इस श्रृंखला के पिछले दो लेखों में, हमने चर्चा की कि ETL प्रक्रिया को करने के लिए Python और SQLAlchemy का उपयोग कैसे करें। आज हम भी ऐसा ही करेंगे, लेकिन इस बार Python और SQL कीमिया बिना SQL कमांड के का उपयोग कर रहे हैं पाठ्य प्रारूप में। यह हमें उस डेटाबेस इंजन की परवाह किए बिना SQLAlchemy का उपयोग करने में सक्षम करेगा जिससे हम जुड़े हुए हैं। तो चलिए शुरू करते हैं।
आज हम चर्चा करेंगे कि पायथन और SQLAlchemy का उपयोग करके ETL प्रक्रिया कैसे करें। हम अपने परिचालन डेटाबेस से दैनिक डेटा निकालने, उसे रूपांतरित करने और फिर अपने डेटा वेयरहाउस में लोड करने के लिए एक स्क्रिप्ट बनाएंगे।
यह श्रंखला का तीसरा लेख है। यदि आपने पहले दो लेख नहीं पढ़े हैं (ETL प्रक्रिया और SQLAlchemy में Python और MySQL का उपयोग करना), तो मैं आपको जारी रखने से पहले ऐसा करने के लिए दृढ़ता से प्रोत्साहित करता हूँ।
यह पूरी श्रृंखला हमारी डेटा वेयरहाउस श्रृंखला की निरंतरता है:
- एक DWH बनाना, भाग एक:एक सदस्यता व्यवसाय डेटा मॉडल
- एक DWH बनाना, भाग दो:एक सदस्यता व्यवसाय डेटा मॉडल
- डेटा वेयरहाउस बनाना, भाग 3:एक सदस्यता व्यवसाय डेटा मॉडल
ठीक है, अब चलिए आज के विषय पर शुरू करते हैं। सबसे पहले, आइए डेटा मॉडल देखें।
डेटा मॉडल
ऑपरेशनल (लाइव) डेटाबेस डेटा मॉडल
DWH डेटा मॉडल
ये दो डेटा मॉडल हैं जिनका हम उपयोग करेंगे। डेटा वेयरहाउस (DWH) के बारे में अधिक जानकारी के लिए, इन लेखों को देखें:
- द स्टार स्कीमा
- द स्नोफ्लेक स्कीमा
- स्टार स्कीमा बनाम स्नोफ्लेक स्कीमा
SQLAlchemy क्यों?
SQLAlchemy के पीछे का पूरा विचार यह है कि डेटाबेस आयात करने के बाद, हमें संबंधित डेटाबेस इंजन के लिए विशिष्ट SQL कोड की आवश्यकता नहीं होती है। इसके बजाय, हम वस्तुओं को SQLAlchemy में आयात कर सकते हैं और कथनों के लिए SQLAlchemy सिंटैक्स का उपयोग कर सकते हैं। यह हमें उसी भाषा का उपयोग करने की अनुमति देगा, चाहे हम किसी भी डेटाबेस इंजन से जुड़े हों। यहां मुख्य लाभ यह है कि एक डेवलपर को विभिन्न डेटाबेस इंजनों के बीच अंतर का ध्यान रखने की आवश्यकता नहीं होती है। यदि आप किसी भिन्न डेटाबेस इंजन में माइग्रेट करते हैं तो आपका SQLAlchemy प्रोग्राम ठीक उसी तरह (मामूली परिवर्तनों के साथ) काम करेगा।
मैंने अस्थायी भंडारण और विभिन्न डेटाबेस के बीच संवाद करने के लिए केवल SQLAlchemy कमांड और पायथन सूचियों का उपयोग करने का निर्णय लिया है। इस निर्णय के पीछे मुख्य कारण यह है कि 1) पायथन सूचियाँ प्रसिद्ध हैं, और 2) कोड उन लोगों के लिए पठनीय होगा जिनके पास पायथन कौशल नहीं है।
यह कहना नहीं है कि SQLAlchemy एकदम सही है। इसकी कुछ सीमाएँ हैं, जिनकी चर्चा हम बाद में करेंगे। अभी के लिए, आइए नीचे दिए गए कोड पर एक नज़र डालें:
स्क्रिप्ट और परिणाम चलाना
यह हमारी स्क्रिप्ट को कॉल करने के लिए इस्तेमाल किया जाने वाला पायथन कमांड है। स्क्रिप्ट परिचालन डेटाबेस में डेटा की जांच करती है, डीडब्ल्यूएच के साथ मूल्यों की तुलना करती है, और नए मूल्यों को आयात करती है। इस उदाहरण में, हम दो आयाम तालिकाओं और एक तथ्य तालिका में मान अपडेट कर रहे हैं; स्क्रिप्ट उपयुक्त आउटपुट देता है। पूरी स्क्रिप्ट इसलिए लिखी गई है ताकि आप इसे दिन में कई बार चला सकें। यह उस दिन के "पुराने" डेटा को हटा देगा और इसे नए से बदल देगा।
आइए ऊपर से शुरू करते हुए पूरी स्क्रिप्ट का विश्लेषण करें।
SQLAlchemy आयात करना
पहली चीज जो हमें करने की ज़रूरत है वह है उन मॉड्यूल को आयात करना जो हम स्क्रिप्ट में उपयोग करेंगे। आमतौर पर, आप स्क्रिप्ट लिखते समय अपने मॉड्यूल आयात करेंगे। ज्यादातर मामलों में, आपको ठीक से पता नहीं होगा कि आपको शुरुआत में किन मॉड्यूल की आवश्यकता होगी।
from datetime import date # import SQLAlchemy from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case
हमने Python का datetime
आयात किया है मॉड्यूल, जो हमें तारीखों के साथ काम करने वाली कक्षाओं की आपूर्ति करता है।
इसके बाद, हमारे पास sqlalchemy
है मापांक। हम पूरे मॉड्यूल को आयात नहीं करेंगे, बस हमारी जरूरत की चीजें - SQLAlchemy के लिए कुछ विशिष्ट (create_engine
, MetaData
, Table
), कुछ SQL कथन भाग (select
, and_
, case
), और func
, जो हमें गिनती () . जैसे कार्यों का उपयोग करने में सक्षम बनाता है और योग () ।
डेटाबेस से कनेक्ट करना
हमें अपने सर्वर पर दो डेटाबेस से कनेक्ट करना होगा। यदि आवश्यक हो तो हम विभिन्न सर्वरों से अधिक डेटाबेस (MySQL, SQL सर्वर, या कोई अन्य) से जुड़ सकते हैं। इस मामले में, दोनों डेटाबेस MySQL डेटाबेस हैं और मेरी स्थानीय मशीन पर संग्रहीत हैं।
# connect to databases engine_live = sqlalchemy.create_engine('mysql+pymysql://: @localhost:3306/subscription_live') connection_live = engine_live.connect() engine_dwh = sqlalchemy.create_engine('mysql+pymysql:// : @localhost:3306/subscription_dwh') connection_dwh = engine_dwh.connect() metadata = MetaData(bind=None)
हमने दो इंजन और दो कनेक्शन बनाए हैं। मैं यहाँ विवरण में नहीं जाऊँगा क्योंकि हम पिछले लेख में इस बात को पहले ही समझा चुके हैं।
dim_time को अपडेट कर रहा है आयाम
लक्ष्य:कल की तारीख डालें यदि वह पहले से तालिका में सम्मिलित नहीं है।
हमारी स्क्रिप्ट में, हम दो आयाम तालिकाओं को नए मानों के साथ अपडेट करेंगे। उनमें से बाकी एक ही पैटर्न का पालन करते हैं, इसलिए हम इसे केवल एक बार देखेंगे; हमें लगभग समान कोड को कुछ और बार लिखने की आवश्यकता नहीं है।
विचार बहुत सरल है। हम कल के लिए नया डेटा डालने के लिए हमेशा स्क्रिप्ट चलाएंगे। इसलिए, हमें यह जांचना होगा कि क्या वह तारीख आयाम तालिका में डाली गई थी। अगर यह पहले से मौजूद है, तो हम कुछ नहीं करेंगे; अगर ऐसा नहीं है, तो हम इसे जोड़ देंगे। आइए dim_time
. को अपडेट करने के लिए कोड पर एक नजर डालते हैं टेबल।
सबसे पहले, हम जांच करेंगे कि क्या तारीख मौजूद है। यदि यह मौजूद नहीं है, तो हम इसे जोड़ देंगे। हम कल की तारीख को एक चर में संग्रहीत करने के साथ शुरू करते हैं। पायथन में, आप इसे इस तरह से करते हैं:
yesterday = date.fromordinal(date.today().toordinal()-1) yesterday_str = str(yesterday)
पहली पंक्ति एक वर्तमान तिथि लेती है, इसे एक संख्यात्मक मान में परिवर्तित करती है, उस मान से 1 घटाती है, और उस संख्यात्मक मान को एक तिथि में परिवर्तित करती है (कल =आज – 1 ) दूसरी पंक्ति तारीख को टेक्स्ट के रूप में संग्रहीत करती है।
इसके बाद, हम परीक्षण करेंगे कि क्या तारीख पहले से ही डेटाबेस में है:
table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh) stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str) result = connection_dwh.execute(stmt).fetchall() date_exists = len(result)
तालिका लोड करने के बाद, हम एक क्वेरी चलाएंगे जो आयाम तालिका से सभी पंक्तियों को वापस कर देगी जहां समय/दिनांक मान कल के बराबर होता है। परिणाम में 0 (तालिका में ऐसी कोई तिथि नहीं) या 1 पंक्ति हो सकती है (तारीख पहले से ही तालिका में है)।
यदि तिथि पहले से तालिका में नहीं है, तो हम इसे जोड़ने के लिए इन्सर्ट () कमांड का उपयोग करेंगे:
if date_exists == 0: print("New value added.") stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday()) connection_dwh.execute(stmt) else: print("No new values.")
एक नई चीज जो मैं यहां बताना चाहूंगा, वह है इसका उपयोग। .year
, .month
, .isocalendar()[1]
, और .weekday
डेटपार्ट्स प्राप्त करने के लिए।
dim_city को अपडेट कर रहा है आयाम
लक्ष्य:यदि कोई हो तो नए शहर डालें (यानी लाइव डेटाबेस में शहरों की सूची की तुलना DWH में शहरों की सूची से करें और छूटे हुए शहरों को जोड़ें)।
dim_time
को अपडेट कर रहा है आयाम बहुत सरल था। हमने बस परीक्षण किया कि क्या कोई तिथि तालिका में थी और यदि वह पहले से नहीं थी तो उसे सम्मिलित किया। DWH डेटाबेस में किसी मान का परीक्षण करने के लिए, हमने पायथन वेरिएबल (कल .) का उपयोग किया ) हम उस प्रक्रिया का फिर से उपयोग करेंगे, लेकिन इस बार सूचियों के साथ।
चूंकि एकल SQLAlchemy क्वेरी में विभिन्न डेटाबेस से तालिकाओं को संयोजित करने का कोई आसान तरीका नहीं है, हम इस श्रृंखला के भाग 1 में उल्लिखित दृष्टिकोण का उपयोग नहीं कर सकते हैं। इसलिए, हमें इन दो डेटाबेस के बीच संवाद करने के लिए आवश्यक मूल्यों को संग्रहीत करने के लिए एक वस्तु की आवश्यकता होगी। मैंने सूचियों का उपयोग करने का निर्णय लिया है, क्योंकि वे सामान्य हैं और वे काम करती हैं।
सबसे पहले, हम country
और city
एक लाइव डेटाबेस से संबंधित वस्तुओं में टेबल।
# dim_city print("\nUpdating... dim_city") table_city = Table('city', metadata, autoload = True, autoload_with = engine_live) table_country = Table('country', metadata, autoload = True, autoload_with = engine_live) table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)
अगला, हम dim_city
DWH से एक सूची में तालिका:
# load whole dwh table in the list stmt = select([table_dim_city]); table_dim_city_list = connection_dwh.execute(stmt).fetchall()
फिर हम लाइव डेटाबेस के मानों के लिए भी ऐसा ही करेंगे। हम टेबल में शामिल होंगे country
और city
इसलिए हमारे पास इस सूची में आवश्यक सभी डेटा हैं:
# load all live values in the list stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\ .select_from(table_city\ .join(table_country)) table_city_list = connection_live.execute(stmt).fetchall()
अब हम लाइव डेटाबेस से डेटा वाली सूची के माध्यम से लूप करेंगे। प्रत्येक रिकॉर्ड के लिए, हम मानों की तुलना करेंगे (city_name
, postal_code
, और country_name
) अगर हमें ऐसे मान नहीं मिलते हैं, तो हम dim_city
में एक नया रिकॉर्ड जोड़ देंगे टेबल।
# loop through live_db table # for each record test if it is missing in the dwh table new_values_added = 0 for city in table_city_list: id = -1; for dim_city in table_dim_city_list: if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]: id = dim_city[0] if id == -1: stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2]) connection_dwh.execute(stmt) new_values_added = 1 if new_values_added == 0: print("No new values.") else: print("New value(s) added.")
यह निर्धारित करने के लिए कि क्या मान पहले से ही DWH में है, हमने विशेषताओं के संयोजन का परीक्षण किया जो अद्वितीय होना चाहिए। (लाइव डेटाबेस से प्राथमिक कुंजी हमें यहां ज्यादा मदद नहीं करती है।) हम अन्य शब्दकोशों को अपडेट करने के लिए समान कोड का उपयोग कर सकते हैं। यह सबसे अच्छा समाधान नहीं है, लेकिन यह अभी भी बहुत सुंदर है। और यह वही करेगा जो हमें चाहिए।
तथ्य_ग्राहक_सदस्यता को अपडेट करना टेबल
लक्ष्य:अगर हमारे पास कल की तारीख का पुराना डेटा है, तो पहले उसे हटा दें। कल के डेटा को DWH में जोड़ें - भले ही हमने पिछले चरण में कुछ हटाया हो या नहीं।
सभी डायमेंशन टेबल को अपडेट करने के बाद हमें फैक्ट टेबल को अपडेट करना चाहिए। हमारी स्क्रिप्ट में, हम केवल एक तथ्य तालिका को अपडेट करेंगे। तर्क पिछले अनुभाग की तरह ही है:अन्य तालिकाओं को अपडेट करने के लिए उसी पैटर्न का पालन किया जाएगा, इसलिए हम ज्यादातर कोड दोहराएंगे।
तथ्य तालिका में मान डालने से पहले, हमें आयाम तालिकाओं से संबंधित कुंजियों के मूल्यों को जानना होगा। ऐसा करने के लिए, हम फिर से आयामों को सूचियों में लोड करेंगे और उनकी तुलना लाइव डेटाबेस के मानों से करेंगे।
सबसे पहले हम ग्राहक को लोड करेंगे और fact_customer_subscribed
ऑब्जेक्ट्स में टेबल:
# fact_customer_subscribed print("\nUpdating... fact_customer_subscribed") table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live) table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)
अब, हमें संबंधित समय आयाम के लिए कुंजियां ढूंढनी होंगी। चूंकि हम हमेशा कल के लिए डेटा डाल रहे हैं, हम उस तारीख को dim_time
में खोजेंगे तालिका और इसकी आईडी का उपयोग करें। क्वेरी 1 पंक्ति लौटाती है, और आईडी पहली स्थिति में है (सूचकांक 0 से शुरू होता है, इसलिए यह result[0][0]
है ):
# find key for the dim_time dimension stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday) result = connection_dwh.execute(stmt).fetchall() dim_time_id = result[0][0]
उस समय के लिए, हम तथ्य तालिका से सभी संबद्ध रिकॉर्ड हटा देंगे:
# delete any existing data in the fact table for that time dimension value stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id) connection_dwh.execute(stmt)
ठीक है, अब हमारे पास dim_time_id
में संग्रहीत समय आयाम की आईडी है चर। यह आसान था क्योंकि हमारे पास केवल एक बार आयाम मान हो सकता है। शहर के आयाम के लिए कहानी अलग होगी। सबसे पहले, हम सभी लोड करेंगे हमें जिन मूल्यों की आवश्यकता है - वे मूल्य जो विशिष्ट रूप से शहर का वर्णन करते हैं (आईडी नहीं), और समग्र मूल्य:
# prepare data for insert stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\ .select_from(table_customer\ .join(table_city)\ .join(table_country))\ .group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)
उपरोक्त क्वेरी के बारे में मैं कुछ बातों पर ज़ोर देना चाहूंगा:
func.sum(...)
है SUM(...) "मानक SQL" से।- द
case(...)
सिंटैक्सand_
का उपयोग करता है शर्तों से पहले, उनके बीच नहीं। .label(...)
SQL AS उपनाम की तरह कार्य करता है।- हम
\
का उपयोग कर रहे हैं अगली पंक्ति में जाने और क्वेरी की पठनीयता बढ़ाने के लिए। (मुझ पर विश्वास करें, स्लैश के बिना यह काफी अपठनीय है - मैंने इसे आजमाया है :)) .group_by(...)
SQL के GROUP BY की भूमिका निभाता है।
इसके बाद, हम पिछली क्वेरी का उपयोग करके लौटाए गए प्रत्येक रिकॉर्ड के माध्यम से लूप करेंगे। प्रत्येक रिकॉर्ड के लिए, हम उन मानों की तुलना करेंगे जो किसी शहर को विशिष्ट रूप से परिभाषित करते हैं (city_name
, postal_code
, country_name
) DWH से बाहर बनाई गई सूची में संग्रहीत मूल्यों के साथ dim_city
टेबल। यदि सभी तीन मान मेल खाते हैं, तो हम सूची से आईडी संग्रहीत करेंगे और नया डेटा सम्मिलित करते समय इसका उपयोग करेंगे। इस तरह, प्रत्येक रिकॉर्ड के लिए, हमारे पास दोनों आयामों के लिए आईडी होंगे:
# loop through all new records # use time dimension # for each record find key for city dimension # insert row new_values = connection_live.execute(stmt).fetchall() for new_value in new_values: dim_city_id = -1; for dim_city in table_dim_city_list: if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]: dim_city_id = dim_city[0] if dim_city_id > 0: stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6]) connection_dwh.execute(stmt_insert) dim_city_id = -1 print("Completed.")
और बस। हमने अपने DWH को अपडेट कर दिया है। यदि हम सभी आयामों और तथ्य तालिकाओं को अद्यतन करते हैं तो स्क्रिप्ट बहुत लंबी होगी। जब एक तथ्य तालिका अधिक आयाम तालिकाओं से संबंधित होती है तो जटिलता भी अधिक होती है। उस स्थिति में, हमें के लिए . की आवश्यकता होगी प्रत्येक आयाम तालिका के लिए लूप।
यह काम नहीं करता!
जब मैंने यह स्क्रिप्ट लिखी तो मुझे बहुत निराशा हुई और फिर पता चला कि ऐसा कुछ काम नहीं करेगा:
stmt = select([table_city.columns.city_name])\ .select_from(table_city\ .outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\ .where(table_dim_city.columns.id.is_(None))
इस उदाहरण में, मैं दो अलग-अलग डेटाबेस से तालिकाओं का उपयोग करने का प्रयास कर रहा हूं। यदि हम दो अलग-अलग कनेक्शन स्थापित करते हैं, तो पहला कनेक्शन दूसरे कनेक्शन से टेबल "देख" नहीं पाएगा। अगर हम सीधे सर्वर से कनेक्ट होते हैं, डेटाबेस से नहीं, तो हम टेबल लोड नहीं कर पाएंगे।
जब तक यह परिवर्तन नहीं होता (उम्मीद है कि जल्द ही), आपको दो डेटाबेस के बीच संवाद करने के लिए किसी प्रकार की संरचना (जैसे कि हमने आज क्या किया) का उपयोग करने की आवश्यकता होगी। यह कोड को जटिल बनाता है, क्योंकि आपको एक ही क्वेरी को दो सूचियों से बदलना होगा और के लिए नेस्टेड करना होगा लूप।
SQLAlchemy और Python के बारे में अपने विचार साझा करें
यह इस श्रंखला का अंतिम लेख था। लेकिन कौन जानता है? हो सकता है कि हम आने वाले लेखों में एक और तरीका आजमाएं, इसलिए बने रहें। इस बीच, कृपया डेटाबेस के साथ संयोजन में SQLAlchemy और Python के बारे में अपने विचार साझा करें। आपको क्या लगता है कि इस लेख में हमारे पास क्या कमी है? आप क्या जोड़ेंगे? हमें नीचे कमेंट्स में बताएं।
आप इस लेख में इस्तेमाल की गई पूरी स्क्रिप्ट यहां डाउनलोड कर सकते हैं।
और विशेष धन्यवाद डिर्क जे बोसमैन (@dirkjobosman) को जाता है, जिन्होंने इस लेख श्रृंखला की सिफारिश की थी।