यह लेख उन लोगों के लिए दिलचस्प होगा जिन्हें अक्सर डेटा एकीकरण से निपटना पड़ता है।
परिचय
मान लें कि एक डेटाबेस है जहां उपयोगकर्ता हमेशा डेटा को संशोधित करते हैं (अपडेट या निकालें)। शायद, इस डेटाबेस का उपयोग एक बड़े एप्लिकेशन द्वारा किया जाता है जो तालिका संरचना को संशोधित करने की अनुमति नहीं देता है। कार्य समय-समय पर एक अलग सर्वर पर इस डेटाबेस से दूसरे डेटाबेस में डेटा लोड करना है। समस्या से निपटने का सबसे सरल तरीका लक्ष्य डेटाबेस को प्रारंभिक सफाई के साथ स्रोत डेटाबेस से लक्ष्य डेटाबेस में नए डेटा को लोड करना है। आप इस पद्धति का उपयोग तब तक कर सकते हैं जब तक डेटा लोड करने का समय स्वीकार्य है और पूर्व निर्धारित समय सीमा से अधिक नहीं है। क्या होगा यदि डेटा लोड करने में कई दिन लगते हैं? इसके अलावा, अस्थिर संचार चैनल उस स्थिति की ओर ले जाते हैं जब डेटा लोड रुक जाता है और पुनरारंभ होता है। यदि आप इन बाधाओं का सामना करते हैं, तो मेरा सुझाव है कि 'डेटा रीलोडिंग' एल्गोरिदम में से एक पर विचार करें। इसका मतलब है कि नवीनतम लोड लोड होने के बाद से केवल डेटा संशोधन हुआ है।
सीडीसी
SQL सर्वर 2008 में, Microsoft ने एक डेटा ट्रैकिंग तंत्र की शुरुआत की, जिसे चेंज डेटा कैप्चर (CDC) कहा जाता है। मोटे तौर पर, इस तंत्र का उद्देश्य यह है कि किसी भी डेटाबेस तालिका के लिए सीडीसी को सक्षम करने से मूल तालिका के समान नाम के साथ उसी डेटाबेस में एक सिस्टम तालिका तैयार हो जाएगी (स्कीमा इस प्रकार होगी:'cdc' उपसर्ग के रूप में प्लस पुराना स्कीमा नाम प्लस "_" और अंत "_CT"। उदाहरण के लिए, मूल तालिका dbo है। उदाहरण, तो सिस्टम तालिका को cdc.dbo_Example_CT कहा जाएगा)। यह संशोधित किए गए सभी डेटा को संग्रहीत करेगा।
दरअसल, सीडीसी में गहरी खुदाई करने के लिए, उदाहरण पर विचार करें। लेकिन पहले, सुनिश्चित करें कि सीडीसी का उपयोग करने वाला SQL एजेंट SQL सर्वर परीक्षण उदाहरण पर काम करता है।
इसके अलावा, हम एक स्क्रिप्ट पर विचार करने जा रहे हैं जो एक डेटाबेस और परीक्षण तालिका बनाता है, इस तालिका को डेटा से भरता है और इस तालिका के लिए सीडीसी को सक्षम करता है।
कार्य को समझने और सरल बनाने के लिए, हम विभिन्न सर्वरों को स्रोत और लक्ष्य डेटाबेस वितरित किए बिना एक SQL सर्वर इंस्टेंस का उपयोग करेंगे।
use master go -- create a source database if not exists (select * from sys.databases where name = 'db_src_cdc') create database db_src_cdc go use db_src_cdc go -- enable CDC if it is disabled if not exists (select * from sys.databases where name = db_name() and is_cdc_enabled=1) exec sys.sp_cdc_enable_db go -- create a role for tables with CDC if not exists(select * from sys.sysusers where name = 'CDC_Reader' and issqlrole=1) create role CDC_Reader go -- create a table if object_id('dbo.Example','U') is null create table dbo.Example ( ID int identity constraint PK_Example primary key, Title varchar(200) not null ) go -- populate the table insert dbo.Example (Title) values ('One'),('Two'),('Three'),('Four'),('Five'); go -- enable CDC for the table if not exists (select * from sys.tables where is_tracked_by_cdc = 1 and name = 'Example') exec sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'Example', @role_name = 'CDC_Reader' go -- populate the table with some data. We will change or delete something update dbo.Example set Title = reverse(Title) where ID in (2,3,4); delete from dbo.Example where ID in (1,2); set identity_insert dbo.Example on; insert dbo.Example (ID, Title) values (1,'One'),(6,'Six'); set identity_insert dbo.Example off; go
अब, आइए देखें कि इस स्क्रिप्ट को dbo.Example और cdc.dbo_Example_CT तालिकाओं में निष्पादित करने के बाद हमारे पास क्या है (यह ध्यान दिया जाना चाहिए कि सीडीसी अतुल्यकालिक है। डेटा उन तालिकाओं में भर जाता है जहां परिवर्तन ट्रैकिंग एक निश्चित अवधि के बाद संग्रहीत होती है। )।
select * from dbo.Example;
ID Title ---- ---------------------- 1 One 3 eerhT 4 ruoF 5 Five 6 Six
select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from cdc.dbo_Example_CT;
__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Title ------ ---------------------- ----------- ---------------------- ------------ ---------------- --- ----------- 1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 One 2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 One 1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 owT 2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Two 3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 owT 1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Three 2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 eerhT 1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Four 2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ruoF 1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0x03
उस तालिका संरचना पर विस्तार से विचार करें जिसमें परिवर्तन ट्रैकिंग संग्रहीत है। __ $start_lsn और __ $seqval फ़ील्ड क्रमशः LSN (डेटाबेस में लॉग अनुक्रम संख्या) और लेनदेन के भीतर लेनदेन संख्या हैं। इन क्षेत्रों में एक महत्वपूर्ण संपत्ति है, अर्थात्, हम यह सुनिश्चित कर सकते हैं कि उच्च एलएसएन के साथ रिकॉर्ड बाद में किया जाएगा। इस संपत्ति के कारण, हम आसानी से क्वेरी में प्रत्येक रिकॉर्ड की नवीनतम स्थिति प्राप्त कर सकते हैं, हमारे चयन को इस शर्त के अनुसार फ़िल्टर कर सकते हैं - जहां __ $ rn =1.
__$ऑपरेशन फ़ील्ड में लेन-देन कोड होता है:
- 1 - रिकॉर्ड हटा दिया गया है
- 2 - रिकॉर्ड डाला गया है
- 3, 4 - रिकॉर्ड अपडेट किया जाता है। अपडेट से पहले पुराना डेटा 3 है, नया डेटा 4 है।
उपसर्ग «__$» के साथ सेवा क्षेत्रों के अलावा, मूल तालिका के क्षेत्र पूरी तरह से डुप्लिकेट हैं। यह जानकारी हमारे लिए वृद्धिशील भार पर आगे बढ़ने के लिए पर्याप्त है।
डेटा लोड करने के लिए डेटाबेस सेट करना
हमारे परीक्षण लक्ष्य डेटाबेस में एक तालिका बनाएं, जिसमें डेटा लोड किया जाएगा, साथ ही लोड लॉग के बारे में डेटा संग्रहीत करने के लिए एक अतिरिक्त तालिका भी बनाएं।
use master go -- create a target database if not exists (select * from sys.databases where name = 'db_dst_cdc') create database db_dst_cdc go use db_dst_cdc go -- create a table if object_id('dbo.Example','U') is null create table dbo.Example ( ID int constraint PK_Example primary key, Title varchar(200) not null ) go -- create a table to store the load log if object_id('dbo.log_cdc','U') is null create table dbo.log_cdc ( table_name nvarchar(512) not null, dt datetime not null default getdate(), lsn binary(10) not null default(0x0), constraint pk_log_cdc primary key (table_name,dt desc) ) go
मैं आपका ध्यान LOG_CDC तालिका के क्षेत्रों की ओर आकर्षित करना चाहता हूं:
- TABLE_NAME इस बारे में जानकारी संग्रहीत करता है कि किस तालिका को लोड किया गया था (भविष्य में विभिन्न डेटाबेस से या विभिन्न सर्वरों से भी कई तालिकाओं को लोड करना संभव है; तालिका प्रारूप 'SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME' है।
- DT लोडिंग दिनांक और समय का एक क्षेत्र है, जो वृद्धिशील लोड के लिए वैकल्पिक है। हालांकि, यह लोडिंग ऑडिटिंग के लिए उपयोगी होगा।
- LSN - एक टेबल लोड होने के बाद, यदि आवश्यक हो, तो हमें उस स्थान के बारे में जानकारी संग्रहीत करने की आवश्यकता है जहां अगला लोड शुरू करना है। तदनुसार, प्रत्येक लोड के बाद, हम इस कॉलम में नवीनतम (अधिकतम) __ $ start_lsn जोड़ते हैं।
डेटा लोड करने के लिए एल्गोरिदम
जैसा कि ऊपर वर्णित है, क्वेरी का उपयोग करके, हम विंडो फ़ंक्शंस की सहायता से तालिका की नवीनतम स्थिति प्राप्त कर सकते हैं। यदि हम नवीनतम लोड के एलएसएन को जानते हैं, तो अगली बार जब हम लोड करते हैं तो हम स्रोत से सभी डेटा को फ़िल्टर कर सकते हैं, जिनमें से परिवर्तन संग्रहीत एलएसएन से अधिक होते हैं, यदि कम से कम एक पूर्ण पिछला लोड था:
with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) select * from incr_Example
फिर, हम पूरे लोड के लिए सभी रिकॉर्ड प्राप्त कर सकते हैं, अगर लोड एलएसएन संग्रहीत नहीं है:
with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) , full_Example as ( select * from db_src_cdc.dbo.Example where @lsn is null ) select ID, Title, __$operation from incr_Example where __$rn = 1 union all select ID, Title, 2 as __$operation from full_Example
इस प्रकार, @LSN मान के आधार पर, यह क्वेरी या तो स्थिति के साथ सभी नवीनतम परिवर्तन (अंतरिम वाले को छोड़कर) प्रदर्शित करेगी या नहीं, या मूल तालिका से सभी डेटा, स्थिति 2 (नया रिकॉर्ड) जोड़कर - यह फ़ील्ड केवल दो चयनों को एकीकृत करने के लिए उपयोग किया जाता है। इस क्वेरी के साथ, हम MERGE कमांड (SQL 2008 संस्करण से शुरू करके) का उपयोग करके या तो पूर्ण लोड या पुनः लोड को आसानी से लागू कर सकते हैं।
बाधाओं से बचने के लिए जो वैकल्पिक प्रक्रियाएं बना सकते हैं और विभिन्न तालिकाओं से मिलान किए गए डेटा को लोड करने के लिए (भविष्य में, हम कई तालिकाओं को लोड करेंगे और संभवतः, उनके बीच संबंधपरक संबंध हो सकते हैं), मैं स्रोत डेटाबेस पर एक डीबी स्नैपशॉट का उपयोग करने का सुझाव देता हूं ( एक और SQL 2008 सुविधा)।
लोड का पूरा टेक्स्ट इस प्रकार है:
[विस्तार शीर्षक ="कोड"]
/* Algorithm of data loading */ -- create a database snapshot if exists (select * from sys.databases where name = 'db_src_cdc_ss' ) drop database db_src_cdc_ss; declare @query nvarchar(max); select @query = N'create database db_src_cdc_ss on ( name = N'''+name+ ''', filename = N'''+[filename]+'.ss'' ) as snapshot of db_src_cdc' from db_src_cdc.sys.sysfiles where groupid = 1; exec ( @query ); -- read LSN from the previous load declare @lsn binary(10) = (select max(lsn) from db_dst_cdc.dbo.log_cdc where table_name = 'localhost.db_src_cdc.dbo.Example'); -- clear a table before the complete load if @lsn is null truncate table db_dst_cdc.dbo.Example; -- load process with incr_Example as ( select row_number() over ( partition by ID order by __$start_lsn desc, __$seqval desc ) as __$rn, * from db_src_cdc_ss.cdc.dbo_Example_CT where __$operation <> 3 and __$start_lsn > @lsn ) , full_Example as ( select * from db_src_cdc_ss.dbo.Example where @lsn is null ) , cte_Example as ( select ID, Title, __$operation from incr_Example where __$rn = 1 union all select ID, Title, 2 as __$operation from full_Example ) merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.ID when matched and __$operation = 1 then delete when matched and __$operation <> 1 then update set trg.Title = src.Title when not matched by target and __$operation <> 1 then insert (ID, Title) values (src.ID, src.Title); -- mark the end of the load process and the latest LSN insert db_dst_cdc.dbo.log_cdc (table_name, lsn) values ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0)) -- delete the database snapshot if exists (select * from sys.databases where name = 'db_src_cdc_ss' ) drop database db_src_cdc_ss
[/विस्तार]