Mysql
 sql >> डेटाबेस >  >> RDS >> Mysql

ETL प्रक्रिया में Python और MySQL का उपयोग करना:Python और SQLAlchemy का उपयोग करना

इस श्रृंखला के पिछले दो लेखों में, हमने चर्चा की कि ETL प्रक्रिया को करने के लिए Python और SQLAlchemy का उपयोग कैसे करें। आज हम भी ऐसा ही करेंगे, लेकिन इस बार Python और SQL कीमिया बिना SQL कमांड के का उपयोग कर रहे हैं पाठ्य प्रारूप में। यह हमें उस डेटाबेस इंजन की परवाह किए बिना SQLAlchemy का उपयोग करने में सक्षम करेगा जिससे हम जुड़े हुए हैं। तो चलिए शुरू करते हैं।

आज हम चर्चा करेंगे कि पायथन और SQLAlchemy का उपयोग करके ETL प्रक्रिया कैसे करें। हम अपने परिचालन डेटाबेस से दैनिक डेटा निकालने, उसे रूपांतरित करने और फिर अपने डेटा वेयरहाउस में लोड करने के लिए एक स्क्रिप्ट बनाएंगे।

यह श्रंखला का तीसरा लेख है। यदि आपने पहले दो लेख नहीं पढ़े हैं (ETL प्रक्रिया और SQLAlchemy में Python और MySQL का उपयोग करना), तो मैं आपको जारी रखने से पहले ऐसा करने के लिए दृढ़ता से प्रोत्साहित करता हूँ।

यह पूरी श्रृंखला हमारी डेटा वेयरहाउस श्रृंखला की निरंतरता है:

  • एक DWH बनाना, भाग एक:एक सदस्यता व्यवसाय डेटा मॉडल
  • एक DWH बनाना, भाग दो:एक सदस्यता व्यवसाय डेटा मॉडल
  • डेटा वेयरहाउस बनाना, भाग 3:एक सदस्यता व्यवसाय डेटा मॉडल

ठीक है, अब चलिए आज के विषय पर शुरू करते हैं। सबसे पहले, आइए डेटा मॉडल देखें।

डेटा मॉडल



ऑपरेशनल (लाइव) डेटाबेस डेटा मॉडल




DWH डेटा मॉडल


ये दो डेटा मॉडल हैं जिनका हम उपयोग करेंगे। डेटा वेयरहाउस (DWH) के बारे में अधिक जानकारी के लिए, इन लेखों को देखें:

  • द स्टार स्कीमा
  • द स्नोफ्लेक स्कीमा
  • स्टार स्कीमा बनाम स्नोफ्लेक स्कीमा

SQLAlchemy क्यों?

SQLAlchemy के पीछे का पूरा विचार यह है कि डेटाबेस आयात करने के बाद, हमें संबंधित डेटाबेस इंजन के लिए विशिष्ट SQL कोड की आवश्यकता नहीं होती है। इसके बजाय, हम वस्तुओं को SQLAlchemy में आयात कर सकते हैं और कथनों के लिए SQLAlchemy सिंटैक्स का उपयोग कर सकते हैं। यह हमें उसी भाषा का उपयोग करने की अनुमति देगा, चाहे हम किसी भी डेटाबेस इंजन से जुड़े हों। यहां मुख्य लाभ यह है कि एक डेवलपर को विभिन्न डेटाबेस इंजनों के बीच अंतर का ध्यान रखने की आवश्यकता नहीं होती है। यदि आप किसी भिन्न डेटाबेस इंजन में माइग्रेट करते हैं तो आपका SQLAlchemy प्रोग्राम ठीक उसी तरह (मामूली परिवर्तनों के साथ) काम करेगा।

मैंने अस्थायी भंडारण और विभिन्न डेटाबेस के बीच संवाद करने के लिए केवल SQLAlchemy कमांड और पायथन सूचियों का उपयोग करने का निर्णय लिया है। इस निर्णय के पीछे मुख्य कारण यह है कि 1) पायथन सूचियाँ प्रसिद्ध हैं, और 2) कोड उन लोगों के लिए पठनीय होगा जिनके पास पायथन कौशल नहीं है।

यह कहना नहीं है कि SQLAlchemy एकदम सही है। इसकी कुछ सीमाएँ हैं, जिनकी चर्चा हम बाद में करेंगे। अभी के लिए, आइए नीचे दिए गए कोड पर एक नज़र डालें:

स्क्रिप्ट और परिणाम चलाना

यह हमारी स्क्रिप्ट को कॉल करने के लिए इस्तेमाल किया जाने वाला पायथन कमांड है। स्क्रिप्ट परिचालन डेटाबेस में डेटा की जांच करती है, डीडब्ल्यूएच के साथ मूल्यों की तुलना करती है, और नए मूल्यों को आयात करती है। इस उदाहरण में, हम दो आयाम तालिकाओं और एक तथ्य तालिका में मान अपडेट कर रहे हैं; स्क्रिप्ट उपयुक्त आउटपुट देता है। पूरी स्क्रिप्ट इसलिए लिखी गई है ताकि आप इसे दिन में कई बार चला सकें। यह उस दिन के "पुराने" डेटा को हटा देगा और इसे नए से बदल देगा।

आइए ऊपर से शुरू करते हुए पूरी स्क्रिप्ट का विश्लेषण करें।

SQLAlchemy आयात करना

पहली चीज जो हमें करने की ज़रूरत है वह है उन मॉड्यूल को आयात करना जो हम स्क्रिप्ट में उपयोग करेंगे। आमतौर पर, आप स्क्रिप्ट लिखते समय अपने मॉड्यूल आयात करेंगे। ज्यादातर मामलों में, आपको ठीक से पता नहीं होगा कि आपको शुरुआत में किन मॉड्यूल की आवश्यकता होगी।

from datetime import date

# import SQLAlchemy
from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case

हमने Python का datetime आयात किया है मॉड्यूल, जो हमें तारीखों के साथ काम करने वाली कक्षाओं की आपूर्ति करता है।

इसके बाद, हमारे पास sqlalchemy है मापांक। हम पूरे मॉड्यूल को आयात नहीं करेंगे, बस हमारी जरूरत की चीजें - SQLAlchemy के लिए कुछ विशिष्ट (create_engine , MetaData , Table ), कुछ SQL कथन भाग (select , and_ , case ), और func , जो हमें गिनती () . जैसे कार्यों का उपयोग करने में सक्षम बनाता है और योग ()

डेटाबेस से कनेक्ट करना

हमें अपने सर्वर पर दो डेटाबेस से कनेक्ट करना होगा। यदि आवश्यक हो तो हम विभिन्न सर्वरों से अधिक डेटाबेस (MySQL, SQL सर्वर, या कोई अन्य) से जुड़ सकते हैं। इस मामले में, दोनों डेटाबेस MySQL डेटाबेस हैं और मेरी स्थानीय मशीन पर संग्रहीत हैं।

# connect to databases
engine_live = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_live')
connection_live = engine_live.connect()
engine_dwh = sqlalchemy.create_engine('mysql+pymysql://:@localhost:3306/subscription_dwh')
connection_dwh = engine_dwh.connect()

metadata = MetaData(bind=None)

हमने दो इंजन और दो कनेक्शन बनाए हैं। मैं यहाँ विवरण में नहीं जाऊँगा क्योंकि हम पिछले लेख में इस बात को पहले ही समझा चुके हैं।

dim_time को अपडेट कर रहा है आयाम

लक्ष्य:कल की तारीख डालें यदि वह पहले से तालिका में सम्मिलित नहीं है।

हमारी स्क्रिप्ट में, हम दो आयाम तालिकाओं को नए मानों के साथ अपडेट करेंगे। उनमें से बाकी एक ही पैटर्न का पालन करते हैं, इसलिए हम इसे केवल एक बार देखेंगे; हमें लगभग समान कोड को कुछ और बार लिखने की आवश्यकता नहीं है।

विचार बहुत सरल है। हम कल के लिए नया डेटा डालने के लिए हमेशा स्क्रिप्ट चलाएंगे। इसलिए, हमें यह जांचना होगा कि क्या वह तारीख आयाम तालिका में डाली गई थी। अगर यह पहले से मौजूद है, तो हम कुछ नहीं करेंगे; अगर ऐसा नहीं है, तो हम इसे जोड़ देंगे। आइए dim_time . को अपडेट करने के लिए कोड पर एक नजर डालते हैं टेबल।

सबसे पहले, हम जांच करेंगे कि क्या तारीख मौजूद है। यदि यह मौजूद नहीं है, तो हम इसे जोड़ देंगे। हम कल की तारीख को एक चर में संग्रहीत करने के साथ शुरू करते हैं। पायथन में, आप इसे इस तरह से करते हैं:

yesterday = date.fromordinal(date.today().toordinal()-1)
yesterday_str = str(yesterday)

पहली पंक्ति एक वर्तमान तिथि लेती है, इसे एक संख्यात्मक मान में परिवर्तित करती है, उस मान से 1 घटाती है, और उस संख्यात्मक मान को एक तिथि में परिवर्तित करती है (कल =आज – 1 ) दूसरी पंक्ति तारीख को टेक्स्ट के रूप में संग्रहीत करती है।

इसके बाद, हम परीक्षण करेंगे कि क्या तारीख पहले से ही डेटाबेस में है:

table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh)
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str)
result = connection_dwh.execute(stmt).fetchall()
date_exists = len(result)

तालिका लोड करने के बाद, हम एक क्वेरी चलाएंगे जो आयाम तालिका से सभी पंक्तियों को वापस कर देगी जहां समय/दिनांक मान कल के बराबर होता है। परिणाम में 0 (तालिका में ऐसी कोई तिथि नहीं) या 1 पंक्ति हो सकती है (तारीख पहले से ही तालिका में है)।

यदि तिथि पहले से तालिका में नहीं है, तो हम इसे जोड़ने के लिए इन्सर्ट () कमांड का उपयोग करेंगे:

if date_exists == 0:
  print("New value added.")
  stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday())
  connection_dwh.execute(stmt)
else:
  print("No new values.")

एक नई चीज जो मैं यहां बताना चाहूंगा, वह है इसका उपयोग। .year , .month , .isocalendar()[1] , और .weekday डेटपार्ट्स प्राप्त करने के लिए।

dim_city को अपडेट कर रहा है आयाम

लक्ष्य:यदि कोई हो तो नए शहर डालें (यानी लाइव डेटाबेस में शहरों की सूची की तुलना DWH में शहरों की सूची से करें और छूटे हुए शहरों को जोड़ें)।

dim_time को अपडेट कर रहा है आयाम बहुत सरल था। हमने बस परीक्षण किया कि क्या कोई तिथि तालिका में थी और यदि वह पहले से नहीं थी तो उसे सम्मिलित किया। DWH डेटाबेस में किसी मान का परीक्षण करने के लिए, हमने पायथन वेरिएबल (कल .) का उपयोग किया ) हम उस प्रक्रिया का फिर से उपयोग करेंगे, लेकिन इस बार सूचियों के साथ।

चूंकि एकल SQLAlchemy क्वेरी में विभिन्न डेटाबेस से तालिकाओं को संयोजित करने का कोई आसान तरीका नहीं है, हम इस श्रृंखला के भाग 1 में उल्लिखित दृष्टिकोण का उपयोग नहीं कर सकते हैं। इसलिए, हमें इन दो डेटाबेस के बीच संवाद करने के लिए आवश्यक मूल्यों को संग्रहीत करने के लिए एक वस्तु की आवश्यकता होगी। मैंने सूचियों का उपयोग करने का निर्णय लिया है, क्योंकि वे सामान्य हैं और वे काम करती हैं।

सबसे पहले, हम country और city एक लाइव डेटाबेस से संबंधित वस्तुओं में टेबल।

# dim_city
print("\nUpdating... dim_city")
table_city = Table('city', metadata, autoload = True, autoload_with = engine_live)
table_country = Table('country', metadata, autoload = True, autoload_with = engine_live)
table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)

अगला, हम dim_city DWH से एक सूची में तालिका:

# load whole dwh table in the list
stmt = select([table_dim_city]);
table_dim_city_list = connection_dwh.execute(stmt).fetchall()

फिर हम लाइव डेटाबेस के मानों के लिए भी ऐसा ही करेंगे। हम टेबल में शामिल होंगे country और city इसलिए हमारे पास इस सूची में आवश्यक सभी डेटा हैं:

# load all live values in the list
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\
	.select_from(table_city\
	.join(table_country))
table_city_list = connection_live.execute(stmt).fetchall()

अब हम लाइव डेटाबेस से डेटा वाली सूची के माध्यम से लूप करेंगे। प्रत्येक रिकॉर्ड के लिए, हम मानों की तुलना करेंगे (city_name , postal_code , और country_name ) अगर हमें ऐसे मान नहीं मिलते हैं, तो हम dim_city में एक नया रिकॉर्ड जोड़ देंगे टेबल।

# loop through live_db table
# for each record test if it is missing in the dwh table
new_values_added = 0
for city in table_city_list:
	id = -1;
	for dim_city in table_dim_city_list:
		if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]:
			id = dim_city[0]
	if id == -1:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1
if new_values_added == 0:
	print("No new values.")
else:
	print("New value(s) added.")

यह निर्धारित करने के लिए कि क्या मान पहले से ही DWH में है, हमने विशेषताओं के संयोजन का परीक्षण किया जो अद्वितीय होना चाहिए। (लाइव डेटाबेस से प्राथमिक कुंजी हमें यहां ज्यादा मदद नहीं करती है।) हम अन्य शब्दकोशों को अपडेट करने के लिए समान कोड का उपयोग कर सकते हैं। यह सबसे अच्छा समाधान नहीं है, लेकिन यह अभी भी बहुत सुंदर है। और यह वही करेगा जो हमें चाहिए।

तथ्य_ग्राहक_सदस्यता को अपडेट करना टेबल

लक्ष्य:अगर हमारे पास कल की तारीख का पुराना डेटा है, तो पहले उसे हटा दें। कल के डेटा को DWH में जोड़ें - भले ही हमने पिछले चरण में कुछ हटाया हो या नहीं।

सभी डायमेंशन टेबल को अपडेट करने के बाद हमें फैक्ट टेबल को अपडेट करना चाहिए। हमारी स्क्रिप्ट में, हम केवल एक तथ्य तालिका को अपडेट करेंगे। तर्क पिछले अनुभाग की तरह ही है:अन्य तालिकाओं को अपडेट करने के लिए उसी पैटर्न का पालन किया जाएगा, इसलिए हम ज्यादातर कोड दोहराएंगे।

तथ्य तालिका में मान डालने से पहले, हमें आयाम तालिकाओं से संबंधित कुंजियों के मूल्यों को जानना होगा। ऐसा करने के लिए, हम फिर से आयामों को सूचियों में लोड करेंगे और उनकी तुलना लाइव डेटाबेस के मानों से करेंगे।

सबसे पहले हम ग्राहक को लोड करेंगे और fact_customer_subscribed ऑब्जेक्ट्स में टेबल:

# fact_customer_subscribed
print("\nUpdating... fact_customer_subscribed")

table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live)
table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)

अब, हमें संबंधित समय आयाम के लिए कुंजियां ढूंढनी होंगी। चूंकि हम हमेशा कल के लिए डेटा डाल रहे हैं, हम उस तारीख को dim_time में खोजेंगे तालिका और इसकी आईडी का उपयोग करें। क्वेरी 1 पंक्ति लौटाती है, और आईडी पहली स्थिति में है (सूचकांक 0 से शुरू होता है, इसलिए यह result[0][0] है ):

# find key for the dim_time dimension
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday)
result = connection_dwh.execute(stmt).fetchall()
dim_time_id = result[0][0]

उस समय के लिए, हम तथ्य तालिका से सभी संबद्ध रिकॉर्ड हटा देंगे:

# delete any existing data in the fact table for that time dimension value
stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id)
connection_dwh.execute(stmt)

ठीक है, अब हमारे पास dim_time_id में संग्रहीत समय आयाम की आईडी है चर। यह आसान था क्योंकि हमारे पास केवल एक बार आयाम मान हो सकता है। शहर के आयाम के लिए कहानी अलग होगी। सबसे पहले, हम सभी लोड करेंगे हमें जिन मूल्यों की आवश्यकता है - वे मूल्य जो विशिष्ट रूप से शहर का वर्णन करते हैं (आईडी नहीं), और समग्र मूल्य:

# prepare data for insert
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\
	.select_from(table_customer\
	.join(table_city)\
	.join(table_country))\
	.group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)

उपरोक्त क्वेरी के बारे में मैं कुछ बातों पर ज़ोर देना चाहूंगा:

  • func.sum(...) है SUM(...) "मानक SQL" से।
  • case(...) सिंटैक्स and_ का उपयोग करता है शर्तों से पहले, उनके बीच नहीं।
  • .label(...) SQL AS उपनाम की तरह कार्य करता है।
  • हम \ का उपयोग कर रहे हैं अगली पंक्ति में जाने और क्वेरी की पठनीयता बढ़ाने के लिए। (मुझ पर विश्वास करें, स्लैश के बिना यह काफी अपठनीय है - मैंने इसे आजमाया है :))
  • .group_by(...) SQL के GROUP BY की भूमिका निभाता है।

इसके बाद, हम पिछली क्वेरी का उपयोग करके लौटाए गए प्रत्येक रिकॉर्ड के माध्यम से लूप करेंगे। प्रत्येक रिकॉर्ड के लिए, हम उन मानों की तुलना करेंगे जो किसी शहर को विशिष्ट रूप से परिभाषित करते हैं (city_name , postal_code , country_name ) DWH से बाहर बनाई गई सूची में संग्रहीत मूल्यों के साथ dim_city टेबल। यदि सभी तीन मान मेल खाते हैं, तो हम सूची से आईडी संग्रहीत करेंगे और नया डेटा सम्मिलित करते समय इसका उपयोग करेंगे। इस तरह, प्रत्येक रिकॉर्ड के लिए, हमारे पास दोनों आयामों के लिए आईडी होंगे:

# loop through all new records
# use time dimension
# for each record find key for city dimension
# insert row
new_values = connection_live.execute(stmt).fetchall()
for new_value in new_values:
	dim_city_id = -1;
	for dim_city in table_dim_city_list:
		if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]:
			dim_city_id = dim_city[0]
	if dim_city_id > 0:	
		stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6])
		connection_dwh.execute(stmt_insert)
		dim_city_id = -1
print("Completed.")

और बस। हमने अपने DWH को अपडेट कर दिया है। यदि हम सभी आयामों और तथ्य तालिकाओं को अद्यतन करते हैं तो स्क्रिप्ट बहुत लंबी होगी। जब एक तथ्य तालिका अधिक आयाम तालिकाओं से संबंधित होती है तो जटिलता भी अधिक होती है। उस स्थिति में, हमें के लिए . की आवश्यकता होगी प्रत्येक आयाम तालिका के लिए लूप।

यह काम नहीं करता!

जब मैंने यह स्क्रिप्ट लिखी तो मुझे बहुत निराशा हुई और फिर पता चला कि ऐसा कुछ काम नहीं करेगा:

stmt = select([table_city.columns.city_name])\
	.select_from(table_city\
	.outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\
	.where(table_dim_city.columns.id.is_(None))

इस उदाहरण में, मैं दो अलग-अलग डेटाबेस से तालिकाओं का उपयोग करने का प्रयास कर रहा हूं। यदि हम दो अलग-अलग कनेक्शन स्थापित करते हैं, तो पहला कनेक्शन दूसरे कनेक्शन से टेबल "देख" नहीं पाएगा। अगर हम सीधे सर्वर से कनेक्ट होते हैं, डेटाबेस से नहीं, तो हम टेबल लोड नहीं कर पाएंगे।

जब तक यह परिवर्तन नहीं होता (उम्मीद है कि जल्द ही), आपको दो डेटाबेस के बीच संवाद करने के लिए किसी प्रकार की संरचना (जैसे कि हमने आज क्या किया) का उपयोग करने की आवश्यकता होगी। यह कोड को जटिल बनाता है, क्योंकि आपको एक ही क्वेरी को दो सूचियों से बदलना होगा और के लिए नेस्टेड करना होगा लूप।

SQLAlchemy और Python के बारे में अपने विचार साझा करें

यह इस श्रंखला का अंतिम लेख था। लेकिन कौन जानता है? हो सकता है कि हम आने वाले लेखों में एक और तरीका आजमाएं, इसलिए बने रहें। इस बीच, कृपया डेटाबेस के साथ संयोजन में SQLAlchemy और Python के बारे में अपने विचार साझा करें। आपको क्या लगता है कि इस लेख में हमारे पास क्या कमी है? आप क्या जोड़ेंगे? हमें नीचे कमेंट्स में बताएं।

आप इस लेख में इस्तेमाल की गई पूरी स्क्रिप्ट यहां डाउनलोड कर सकते हैं।

और विशेष धन्यवाद डिर्क जे बोसमैन (@dirkjobosman) को जाता है, जिन्होंने इस लेख श्रृंखला की सिफारिश की थी।


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. MySQL का रूट पासवर्ड रीसेट करें

  2. MySQL में टेबल को कैसे छोटा करें

  3. MySQL में अग्रणी और अनुगामी व्हॉट्सएप को कैसे हटाएं

  4. =नल और IS NULL में क्या अंतर है?

  5. वर्चुअलमिन:पासवर्ड बदलने के बाद आपके पास इस MySQL डेटाबेस तक पहुंच नहीं है