Sqlserver
 sql >> डेटाबेस >  >> RDS >> Sqlserver

SQL सर्वर में चेंज डेटा कैप्चर का उपयोग करके इंक्रीमेंटल लोड को लागू करना

यह लेख उन लोगों के लिए दिलचस्प होगा जिन्हें अक्सर डेटा एकीकरण से निपटना पड़ता है।

परिचय

मान लें कि एक डेटाबेस है जहां उपयोगकर्ता हमेशा डेटा को संशोधित करते हैं (अपडेट या निकालें)। शायद, इस डेटाबेस का उपयोग एक बड़े एप्लिकेशन द्वारा किया जाता है जो तालिका संरचना को संशोधित करने की अनुमति नहीं देता है। कार्य समय-समय पर एक अलग सर्वर पर इस डेटाबेस से दूसरे डेटाबेस में डेटा लोड करना है। समस्या से निपटने का सबसे सरल तरीका लक्ष्य डेटाबेस को प्रारंभिक सफाई के साथ स्रोत डेटाबेस से लक्ष्य डेटाबेस में नए डेटा को लोड करना है। आप इस पद्धति का उपयोग तब तक कर सकते हैं जब तक डेटा लोड करने का समय स्वीकार्य है और पूर्व निर्धारित समय सीमा से अधिक नहीं है। क्या होगा यदि डेटा लोड करने में कई दिन लगते हैं? इसके अलावा, अस्थिर संचार चैनल उस स्थिति की ओर ले जाते हैं जब डेटा लोड रुक जाता है और पुनरारंभ होता है। यदि आप इन बाधाओं का सामना करते हैं, तो मेरा सुझाव है कि 'डेटा रीलोडिंग' एल्गोरिदम में से एक पर विचार करें। इसका मतलब है कि नवीनतम लोड लोड होने के बाद से केवल डेटा संशोधन हुआ है।

सीडीसी

SQL सर्वर 2008 में, Microsoft ने एक डेटा ट्रैकिंग तंत्र की शुरुआत की, जिसे चेंज डेटा कैप्चर (CDC) कहा जाता है। मोटे तौर पर, इस तंत्र का उद्देश्य यह है कि किसी भी डेटाबेस तालिका के लिए सीडीसी को सक्षम करने से मूल तालिका के समान नाम के साथ उसी डेटाबेस में एक सिस्टम तालिका तैयार हो जाएगी (स्कीमा इस प्रकार होगी:'cdc' उपसर्ग के रूप में प्लस पुराना स्कीमा नाम प्लस "_" और अंत "_CT"। उदाहरण के लिए, मूल तालिका dbo है। उदाहरण, तो सिस्टम तालिका को cdc.dbo_Example_CT कहा जाएगा)। यह संशोधित किए गए सभी डेटा को संग्रहीत करेगा।

दरअसल, सीडीसी में गहरी खुदाई करने के लिए, उदाहरण पर विचार करें। लेकिन पहले, सुनिश्चित करें कि सीडीसी का उपयोग करने वाला SQL एजेंट SQL सर्वर परीक्षण उदाहरण पर काम करता है।

इसके अलावा, हम एक स्क्रिप्ट पर विचार करने जा रहे हैं जो एक डेटाबेस और परीक्षण तालिका बनाता है, इस तालिका को डेटा से भरता है और इस तालिका के लिए सीडीसी को सक्षम करता है।

कार्य को समझने और सरल बनाने के लिए, हम विभिन्न सर्वरों को स्रोत और लक्ष्य डेटाबेस वितरित किए बिना एक SQL सर्वर इंस्टेंस का उपयोग करेंगे।

use master
go
-- create a source database
if not exists (select * from sys.databases where name = 'db_src_cdc')
	create database db_src_cdc
go
use db_src_cdc
go
-- enable CDC if it is disabled
if not exists (select * from sys.databases where name = db_name() and is_cdc_enabled=1)
	exec sys.sp_cdc_enable_db
go
-- create a role for tables with CDC
if not exists(select * from sys.sysusers where name = 'CDC_Reader' and issqlrole=1)
	create role CDC_Reader
go
-- create a table
if object_id('dbo.Example','U') is null
	create table dbo.Example
	(
		ID int identity constraint PK_Example primary key,
		Title varchar(200) not null
	)
go
-- populate the table
insert dbo.Example (Title) values
('One'),('Two'),('Three'),('Four'),('Five');
go
-- enable CDC for the table
if not exists (select * from sys.tables where is_tracked_by_cdc = 1 and name = 'Example')
	exec sys.sp_cdc_enable_table
		@source_schema = 'dbo',
		@source_name = 'Example',
		@role_name = 'CDC_Reader'
go
-- populate the table with some data. We will change or delete something
update dbo.Example
set Title = reverse(Title)
where ID in (2,3,4);

delete from dbo.Example where ID in (1,2);

set identity_insert dbo.Example on;
insert dbo.Example (ID, Title) values
(1,'One'),(6,'Six');
set identity_insert dbo.Example off;
go

अब, आइए देखें कि इस स्क्रिप्ट को dbo.Example और cdc.dbo_Example_CT तालिकाओं में निष्पादित करने के बाद हमारे पास क्या है (यह ध्यान दिया जाना चाहिए कि सीडीसी अतुल्यकालिक है। डेटा उन तालिकाओं में भर जाता है जहां परिवर्तन ट्रैकिंग एक निश्चित अवधि के बाद संग्रहीत होती है। )।

select * from dbo.Example;
ID Title
---- ----------------------
   1 One
   3 eerhT 
   4 ruoF 
   5 Five 
   6 Six
select
	row_number() over
		(
			partition by ID
			order by
				__$start_lsn desc,
				__$seqval desc
		) as __$rn,
	*
from cdc.dbo_Example_CT;
__$rn __$start_lsn           __$end_lsn  __$seqval              __$operation __$update_mask    ID Title
------ ---------------------- ----------- ---------------------- ------------ ---------------- --- -----------
     1 0x0000003A000000580005 NULL        0x0000003A000000580003            2 0x03               1 One
     2 0x0000003A000000560006 NULL        0x0000003A000000560002            1 0x03               1 One
     1 0x0000003A000000560006 NULL        0x0000003A000000560005            1 0x03               2 owT
     2 0x0000003A000000540005 NULL        0x0000003A000000540002            3 0x02               2 Two
     3 0x0000003A000000540005 NULL        0x0000003A000000540002            4 0x02               2 owT
     1 0x0000003A000000540005 NULL        0x0000003A000000540003            3 0x02               3 Three
     2 0x0000003A000000540005 NULL        0x0000003A000000540003            4 0x02               3 eerhT
     1 0x0000003A000000540005 NULL        0x0000003A000000540004            3 0x02               4 Four
     2 0x0000003A000000540005 NULL        0x0000003A000000540004            4 0x02               4 ruoF
     1 0x0000003A000000580005 NULL        0x0000003A000000580004            2 0x03

उस तालिका संरचना पर विस्तार से विचार करें जिसमें परिवर्तन ट्रैकिंग संग्रहीत है। __ $start_lsn और __ $seqval फ़ील्ड क्रमशः LSN (डेटाबेस में लॉग अनुक्रम संख्या) और लेनदेन के भीतर लेनदेन संख्या हैं। इन क्षेत्रों में एक महत्वपूर्ण संपत्ति है, अर्थात्, हम यह सुनिश्चित कर सकते हैं कि उच्च एलएसएन के साथ रिकॉर्ड बाद में किया जाएगा। इस संपत्ति के कारण, हम आसानी से क्वेरी में प्रत्येक रिकॉर्ड की नवीनतम स्थिति प्राप्त कर सकते हैं, हमारे चयन को इस शर्त के अनुसार फ़िल्टर कर सकते हैं - जहां __ $ rn =1.

__$ऑपरेशन फ़ील्ड में लेन-देन कोड होता है:

  • 1 - रिकॉर्ड हटा दिया गया है
  • 2 - रिकॉर्ड डाला गया है
  • 3, 4 - रिकॉर्ड अपडेट किया जाता है। अपडेट से पहले पुराना डेटा 3 है, नया डेटा 4 है।

उपसर्ग «__$» के साथ सेवा क्षेत्रों के अलावा, मूल तालिका के क्षेत्र पूरी तरह से डुप्लिकेट हैं। यह जानकारी हमारे लिए वृद्धिशील भार पर आगे बढ़ने के लिए पर्याप्त है।

डेटा लोड करने के लिए डेटाबेस सेट करना

हमारे परीक्षण लक्ष्य डेटाबेस में एक तालिका बनाएं, जिसमें डेटा लोड किया जाएगा, साथ ही लोड लॉग के बारे में डेटा संग्रहीत करने के लिए एक अतिरिक्त तालिका भी बनाएं।

use master
go
-- create a target database 
if not exists (select * from sys.databases where name = 'db_dst_cdc')
	create database db_dst_cdc
go
use db_dst_cdc
go
-- create a table
if object_id('dbo.Example','U') is null
	create table dbo.Example
	(
		ID int constraint PK_Example primary key,
		Title varchar(200) not null
	)
go
-- create a table to store the load log
if object_id('dbo.log_cdc','U') is null
	create table dbo.log_cdc
	(
		table_name nvarchar(512) not null,
		dt datetime not null default getdate(),
		lsn binary(10) not null default(0x0),
		constraint pk_log_cdc primary key (table_name,dt desc)
	)
go

मैं आपका ध्यान LOG_CDC तालिका के क्षेत्रों की ओर आकर्षित करना चाहता हूं:

  • TABLE_NAME इस बारे में जानकारी संग्रहीत करता है कि किस तालिका को लोड किया गया था (भविष्य में विभिन्न डेटाबेस से या विभिन्न सर्वरों से भी कई तालिकाओं को लोड करना संभव है; तालिका प्रारूप 'SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME'
  • है।
  • DT लोडिंग दिनांक और समय का एक क्षेत्र है, जो वृद्धिशील लोड के लिए वैकल्पिक है। हालांकि, यह लोडिंग ऑडिटिंग के लिए उपयोगी होगा।
  • LSN - एक टेबल लोड होने के बाद, यदि आवश्यक हो, तो हमें उस स्थान के बारे में जानकारी संग्रहीत करने की आवश्यकता है जहां अगला लोड शुरू करना है। तदनुसार, प्रत्येक लोड के बाद, हम इस कॉलम में नवीनतम (अधिकतम) __ $ start_lsn जोड़ते हैं।

डेटा लोड करने के लिए एल्गोरिदम

जैसा कि ऊपर वर्णित है, क्वेरी का उपयोग करके, हम विंडो फ़ंक्शंस की सहायता से तालिका की नवीनतम स्थिति प्राप्त कर सकते हैं। यदि हम नवीनतम लोड के एलएसएन को जानते हैं, तो अगली बार जब हम लोड करते हैं तो हम स्रोत से सभी डेटा को फ़िल्टर कर सकते हैं, जिनमें से परिवर्तन संग्रहीत एलएसएन से अधिक होते हैं, यदि कम से कम एक पूर्ण पिछला लोड था:

with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
select * from incr_Example

फिर, हम पूरे लोड के लिए सभी रिकॉर्ड प्राप्त कर सकते हैं, अगर लोड एलएसएन संग्रहीत नहीं है:

with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
, full_Example as
(
	select *
	from db_src_cdc.dbo.Example
	where @lsn is null
)
select
	ID, Title, __$operation
from incr_Example
where __$rn = 1
union all
select
	ID, Title, 2 as __$operation
from full_Example

इस प्रकार, @LSN मान के आधार पर, यह क्वेरी या तो स्थिति के साथ सभी नवीनतम परिवर्तन (अंतरिम वाले को छोड़कर) प्रदर्शित करेगी या नहीं, या मूल तालिका से सभी डेटा, स्थिति 2 (नया रिकॉर्ड) जोड़कर - यह फ़ील्ड केवल दो चयनों को एकीकृत करने के लिए उपयोग किया जाता है। इस क्वेरी के साथ, हम MERGE कमांड (SQL 2008 संस्करण से शुरू करके) का उपयोग करके या तो पूर्ण लोड या पुनः लोड को आसानी से लागू कर सकते हैं।

बाधाओं से बचने के लिए जो वैकल्पिक प्रक्रियाएं बना सकते हैं और विभिन्न तालिकाओं से मिलान किए गए डेटा को लोड करने के लिए (भविष्य में, हम कई तालिकाओं को लोड करेंगे और संभवतः, उनके बीच संबंधपरक संबंध हो सकते हैं), मैं स्रोत डेटाबेस पर एक डीबी स्नैपशॉट का उपयोग करने का सुझाव देता हूं ( एक और SQL 2008 सुविधा)।

लोड का पूरा टेक्स्ट इस प्रकार है:

[विस्तार शीर्षक ="कोड"]

/*
	Algorithm of data loading
*/
-- create a database snapshot
if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
	drop database db_src_cdc_ss;

declare
	@query nvarchar(max);

select
	@query = N'create database db_src_cdc_ss on ( name = N'''+name+
		''', filename = N'''+[filename]+'.ss'' ) as snapshot of db_src_cdc'
from db_src_cdc.sys.sysfiles where groupid = 1;

exec ( @query );

-- read LSN from the previous load
declare @lsn binary(10) =
	(select max(lsn) from db_dst_cdc.dbo.log_cdc
	where table_name = 'localhost.db_src_cdc.dbo.Example');

-- clear a table before the complete load
if @lsn is null truncate table db_dst_cdc.dbo.Example;

-- load process
with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc_ss.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
, full_Example as
(
	select *
	from db_src_cdc_ss.dbo.Example
	where @lsn is null
)
, cte_Example as
(
	select
		ID, Title, __$operation
	from incr_Example
	where __$rn = 1
	union all
	select
		ID, Title, 2 as __$operation
	from full_Example
)
merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.ID
when matched and __$operation = 1 then delete
when matched and __$operation <> 1 then update set trg.Title = src.Title
when not matched by target and __$operation <> 1 then insert (ID, Title) values (src.ID, src.Title);

-- mark the end of the load process and the latest LSN
insert db_dst_cdc.dbo.log_cdc (table_name, lsn)
values ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))

-- delete the database snapshot
if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
	drop database db_src_cdc_ss

[/विस्तार]


  1. Database
  2.   
  3. Mysql
  4.   
  5. Oracle
  6.   
  7. Sqlserver
  8.   
  9. PostgreSQL
  10.   
  11. Access
  12.   
  13. SQLite
  14.   
  15. MariaDB
  1. आउटपुट क्लॉज पर फ़िल्टर करें sql

  2. SQL सर्वर में लॉगिन के रूप में सक्रिय निर्देशिका उपयोगकर्ता समूह कैसे जोड़ें

  3. SQL सर्वर में किसी भी / कुछ लॉजिकल ऑपरेटर का उपयोग कैसे करें - SQL सर्वर / TSQL ट्यूटोरियल भाग 127

  4. क्रिस्टल रिपोर्ट बनाम माइक्रोसॉफ्ट एसक्यूएल सर्वर रिपोर्टिंग सेवाएं

  5. T-SQL का उपयोग करके SQL सर्वर में एक विदेशी कुंजी का नाम बदलें