Change Data Capture (CDC) is a SQL Server feature that records the changes made to tables in a database and keeps them for a specified retention period (retention window). You can find more information here.
In this example I build a POC (proof of concept) that implements CDC in my data ingestion design. In a few words:
Tables:
- tables_to_extract – the tables to ingest
- extract_parameters – the parameters that drive the ingestion (typically datetime_from and datetime_to)
Dataflow:
- The tables in tables_to_extract are processed one by one in a loop
- The parameters for each table are set and used to filter the incremental data
In this example the parameters that drive the incremental ingestion come from CDC: lsn_from and lsn_to (LSN stands for log sequence number).
Initialize
Create database
-- Recreate the demo database from scratch. If it already exists, switch it to
-- single-user mode first, rolling back open transactions of other sessions,
-- so that the DROP is not blocked by existing connections.
USE [master];
GO
IF DB_ID(N'CDC') IS NOT NULL
BEGIN
    ALTER DATABASE [CDC] SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
    DROP DATABASE [CDC];
END;
GO
CREATE DATABASE [CDC];
GO
Enable CDC on database level
-- Enable CDC at database level, only if it is not enabled yet.
-- The three-part name runs sp_cdc_enable_db in the context of the CDC database.
-- GO is a client-side batch separator and must stand on its own line.
IF (SELECT is_cdc_enabled FROM [master].sys.databases WHERE [name] = N'CDC') = 0
BEGIN
    EXEC CDC.sys.sp_cdc_enable_db;
END
GO
Create tables and enable CDC on table level
USE CDC;
GO
-- Source table 1: single-column IDENTITY primary key, tracked by CDC.
DROP TABLE IF EXISTS dbo.src_table_1;
CREATE TABLE dbo.src_table_1 (
    pk_col_1 INT IDENTITY(1, 1)
    , col_2 NVARCHAR(128)
    , col_3 NVARCHAR(128)
    , CONSTRAINT PK__dbo_src_table_1 PRIMARY KEY (pk_col_1)
);
-- No @capture_instance is given, so CDC uses the default name
-- dbo_src_table_1; dbo.usp_run_cdc_etl builds the same <schema>_<table> name.
IF (SELECT is_tracked_by_cdc FROM sys.tables WHERE SCHEMA_NAME([schema_id]) = N'dbo' AND [name] = N'src_table_1') = 0
BEGIN
    EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'src_table_1', @role_name = NULL;
END

-- Source table 2: composite primary key, tracked by CDC.
DROP TABLE IF EXISTS dbo.src_table_2;
CREATE TABLE dbo.src_table_2 (
    pk_col_1 INT
    , pk_col_2 INT
    , col_3 NVARCHAR(128)
    , col_4 NVARCHAR(128)
    , CONSTRAINT PK__dbo_src_table_2 PRIMARY KEY (pk_col_1, pk_col_2)
);
IF (SELECT is_tracked_by_cdc FROM sys.tables WHERE SCHEMA_NAME([schema_id]) = N'dbo' AND [name] = N'src_table_2') = 0
BEGIN
    EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'src_table_2', @role_name = NULL;
END

-- Destination tables: same column names as the sources, no constraints.
DROP TABLE IF EXISTS dbo.dstn_table_1;
CREATE TABLE dbo.dstn_table_1 (
    pk_col_1 INT
    , col_2 NVARCHAR(128)
    , col_3 NVARCHAR(128)
);
DROP TABLE IF EXISTS dbo.dstn_table_2;
CREATE TABLE dbo.dstn_table_2 (
    pk_col_1 INT
    , pk_col_2 INT
    , col_3 NVARCHAR(128)
    , col_4 NVARCHAR(128)
);

-- Control tables. Drop the child (extract_parameters) before the parent
-- (tables_to_extract): the foreign key would otherwise block dropping the
-- parent when this script is re-run against an existing database.
DROP TABLE IF EXISTS dbo.extract_parameters;
DROP TABLE IF EXISTS dbo.tables_to_extract;

-- One row per table to ingest. dbo.usp_run_cdc_etl walks this table by id,
-- so id is declared UNIQUE to guarantee that each id names a single table.
CREATE TABLE dbo.tables_to_extract (
    id INT IDENTITY(1, 1)
    , src_server_name SYSNAME NOT NULL
    , src_database_name SYSNAME NOT NULL
    , src_schema_name SYSNAME NOT NULL
    , src_table_name SYSNAME NOT NULL
    , dstn_server_name SYSNAME
    , dstn_database_name SYSNAME
    , dstn_schema_name SYSNAME
    , dstn_table_name SYSNAME
    , CONSTRAINT PK_dbo_tables_to_extract
        PRIMARY KEY (src_server_name, src_database_name, src_schema_name, src_table_name)
    , CONSTRAINT UQ_dbo_tables_to_extract_id UNIQUE (id)
);

-- Per-table LSN window: lsn_from is the lower bound of the next load and
-- lsn_to is the upper bound of the load in progress (NULL between runs).
CREATE TABLE dbo.extract_parameters (
    src_server_name SYSNAME NOT NULL
    , src_database_name SYSNAME NOT NULL
    , src_schema_name SYSNAME NOT NULL
    , src_table_name SYSNAME NOT NULL
    , lsn_from BINARY(10) NOT NULL
    , lsn_to BINARY(10)
    , CONSTRAINT PK_dbo_extract_parameters
        PRIMARY KEY (src_server_name, src_database_name, src_schema_name, src_table_name)
    , CONSTRAINT FK_extract_parameters_tables_to_extract
        FOREIGN KEY (src_server_name, src_database_name, src_schema_name, src_table_name)
        REFERENCES dbo.tables_to_extract (src_server_name, src_database_name, src_schema_name, src_table_name)
);
GO
Define the ingested tables
-- Register the tables to ingest. The server name is stored as <server>.<domain>.
-- The domain is read from the registry with the undocumented xp_regread
-- (NOTE(review): Windows-only; verify on other platforms).
DECLARE @domain NVARCHAR(128);
DECLARE @server_name SYSNAME;

EXEC [master]..xp_regread
    @rootkey = N'HKEY_LOCAL_MACHINE'
    , @key = N'SYSTEM\ControlSet001\Services\Tcpip\Parameters\'
    , @value_name = N'Domain'
    , @value = @domain OUTPUT;

-- Join server and domain with a dot. CONCAT_WS skips NULLs, so a missing or
-- empty domain leaves just the server name (no trailing dot).
SET @server_name = CONCAT_WS(N'.', CAST(SERVERPROPERTY(N'ServerName') AS NVARCHAR(128)), NULLIF(@domain, N''));

-- Two separate INSERTs keep the IDENTITY ids in this order (table 1, then table 2).
INSERT dbo.tables_to_extract (
    src_server_name, src_database_name, src_schema_name, src_table_name
    , dstn_server_name, dstn_database_name, dstn_schema_name, dstn_table_name
)
VALUES (@server_name, N'CDC', N'dbo', N'src_table_1', @server_name, N'CDC', N'dbo', N'dstn_table_1');

INSERT dbo.tables_to_extract (
    src_server_name, src_database_name, src_schema_name, src_table_name
    , dstn_server_name, dstn_database_name, dstn_schema_name, dstn_table_name
)
VALUES (@server_name, N'CDC', N'dbo', N'src_table_2', @server_name, N'CDC', N'dbo', N'dstn_table_2');
GO
Create the ingestion stored procedure
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- =============================================
-- Author: Peter Lalovsky
-- Create date: 2026-05-06
-- Description: Run dynamic ETL for specified tables with CDC
-- =============================================
-- For each row of dbo.tables_to_extract, in id order:
--   1. Upsert the table's row in dbo.extract_parameters. A new table starts at
--      the minimum LSN of its capture instance; lsn_to is always set to the
--      current maximum LSN (sys.fn_cdc_get_max_lsn).
--   2. List the source table's columns in ordinal order and flag the PK columns.
--   3. Build and run dynamic SQL that applies the net changes in the inclusive
--      range [lsn_from, lsn_to] to the destination table:
--        DELETE for net deletes (__$operation = 1),
--        MERGE  for net inserts / updates (__$operation IN (2, 4)).
--   4. Advance the window: lsn_from = sys.fn_cdc_increment_lsn(lsn_to), lsn_to = NULL.
--   Steps 3 and 4 run in one transaction, so a failed load does not advance the window.
--
-- Assumptions of this POC (not validated by the code):
--   - The capture instance has the default name <schema>_<table> and was
--     enabled with net-changes support (cdc.fn_cdc_get_net_changes_* exists).
--   - Source and destination tables live in the current database; the server
--     and database name columns only identify rows in the control tables.
--   - The destination table has the same column names as the source table.
-- =============================================
CREATE OR ALTER PROCEDURE dbo.usp_run_cdc_etl AS
BEGIN
    SET NOCOUNT ON;
    -- Any run-time error rolls back the open transaction and aborts the procedure.
    SET XACT_ABORT ON;

    -- Variables
    DECLARE
        @src_server_name SYSNAME
        , @src_database_name SYSNAME
        , @src_schema_name SYSNAME
        , @src_table_name SYSNAME
        , @dstn_schema_name SYSNAME
        , @dstn_table_name SYSNAME
        , @loop_id INT
        , @next_id INT
        , @capture_instance NVARCHAR(257)
        , @lsn_from BINARY(10)
        , @lsn_to BINARY(10)
        , @min_lsn BINARY(10)
        , @target NVARCHAR(MAX)
        , @net_changes NVARCHAR(MAX)
        , @star NVARCHAR(MAX)
        , @on NVARCHAR(MAX)
        , @values NVARCHAR(MAX)
        , @src_compare NVARCHAR(MAX)
        , @dstn_compare NVARCHAR(MAX)
        , @update NVARCHAR(MAX)
        , @when_matched NVARCHAR(MAX)
        , @error_message NVARCHAR(2048)
        , @dynamic_sql NVARCHAR(MAX);

    -- Columns of the current source table, their ordinal position and PK flag.
    DECLARE @PKs TABLE (
        column_name SYSNAME
        , ordinal_position INT
        , is_pk BIT
    );

    WHILE (1 = 1)
    BEGIN -- Loop dbo.tables_to_extract
        -- Seek the next id instead of counting 1..MAX(id): with gaps in the
        -- IDENTITY sequence, a missing id would otherwise leave the previous
        -- table's variables in place and process that table twice.
        SET @next_id = (
            SELECT MIN(id)
            FROM dbo.tables_to_extract
            WHERE @loop_id IS NULL OR id > @loop_id
        );
        IF (@next_id IS NULL) BREAK;
        SET @loop_id = @next_id;

        -- Get variables
        SELECT
            @src_server_name = src_server_name
            , @src_database_name = src_database_name
            , @src_schema_name = src_schema_name
            , @src_table_name = src_table_name
            , @capture_instance = CONCAT(src_schema_name, N'_', src_table_name)
            , @dstn_schema_name = dstn_schema_name
            , @dstn_table_name = dstn_table_name
        FROM dbo.tables_to_extract
        WHERE id = @loop_id;

        -- UPSERT the source table in dbo.extract_parameters
        IF NOT EXISTS (
            SELECT 1
            FROM dbo.extract_parameters
            WHERE
                src_server_name = @src_server_name
                AND src_database_name = @src_database_name
                AND src_schema_name = @src_schema_name
                AND src_table_name = @src_table_name
        )
        BEGIN
            -- First run for this table: start at the oldest change still available.
            INSERT dbo.extract_parameters (
                src_server_name, src_database_name, src_schema_name
                , src_table_name, lsn_from, lsn_to
            )
            SELECT
                @src_server_name
                , @src_database_name
                , @src_schema_name
                , @src_table_name
                , sys.fn_cdc_get_min_lsn(@capture_instance)
                , sys.fn_cdc_get_max_lsn();
        END
        ELSE
        BEGIN
            UPDATE dbo.extract_parameters
            SET lsn_to = sys.fn_cdc_get_max_lsn()
            WHERE
                src_server_name = @src_server_name
                AND src_database_name = @src_database_name
                AND src_schema_name = @src_schema_name
                AND src_table_name = @src_table_name;
        END

        -- Get the LSNs from dbo.extract_parameters
        SELECT
            @lsn_from = lsn_from
            , @lsn_to = lsn_to
        FROM dbo.extract_parameters
        WHERE
            src_server_name = @src_server_name
            AND src_database_name = @src_database_name
            AND src_schema_name = @src_schema_name
            AND src_table_name = @src_table_name;

        -- Changes older than the capture instance's minimum LSN have been removed
        -- by CDC cleanup. They cannot be loaded incrementally, and querying from
        -- an earlier LSN fails, so stop with an explicit error.
        SET @min_lsn = sys.fn_cdc_get_min_lsn(@capture_instance);
        IF (@lsn_from <= @lsn_to AND @lsn_from < @min_lsn)
        BEGIN
            SET @error_message = CONCAT(
                N'lsn_from for ', QUOTENAME(@src_schema_name), N'.', QUOTENAME(@src_table_name)
                , N' is older than the minimum LSN of capture instance ', @capture_instance
                , N'; the changes in between were purged and a full reload is required.');
            THROW 50001, @error_message, 1;
        END

        -- List the columns in source table
        DELETE @PKs;
        INSERT @PKs (column_name, ordinal_position, is_pk)
        SELECT
            C.COLUMN_NAME
            , C.ORDINAL_POSITION
            , CASE
                WHEN PK.COLUMN_NAME IS NOT NULL THEN CAST(1 AS BIT)
                ELSE CAST(0 AS BIT)
            END AS is_pk
        FROM
            INFORMATION_SCHEMA.COLUMNS AS C
            LEFT JOIN
            (
                SELECT
                    KU.TABLE_CATALOG
                    , KU.TABLE_SCHEMA
                    , KU.TABLE_NAME
                    , KU.COLUMN_NAME
                FROM
                    INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
                    JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU ON TC.TABLE_CATALOG = KU.TABLE_CATALOG
                        AND TC.TABLE_SCHEMA = KU.TABLE_SCHEMA
                        AND TC.TABLE_NAME = KU.TABLE_NAME
                        AND TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
                WHERE
                    TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
                    AND KU.TABLE_CATALOG = @src_database_name
                    AND KU.TABLE_SCHEMA = @src_schema_name
                    AND KU.TABLE_NAME = @src_table_name
            ) AS PK ON C.TABLE_CATALOG = PK.TABLE_CATALOG
                AND C.TABLE_SCHEMA = PK.TABLE_SCHEMA
                AND C.TABLE_NAME = PK.TABLE_NAME
                AND C.COLUMN_NAME = PK.COLUMN_NAME
        WHERE
            C.TABLE_CATALOG = @src_database_name
            AND C.TABLE_SCHEMA = @src_schema_name
            AND C.TABLE_NAME = @src_table_name;

        -- Rows are matched on the PK; without one the ON clause would be empty.
        IF NOT EXISTS (SELECT 1 FROM @PKs WHERE is_pk = 1)
        BEGIN
            SET @error_message = CONCAT(
                QUOTENAME(@src_schema_name), N'.', QUOTENAME(@src_table_name)
                , N' was not found or has no primary key; its rows cannot be matched to the destination table.');
            THROW 50002, @error_message, 1;
        END

        -- Prepare the clauses for the MERGE dynamic code.
        -- Inputs are cast to NVARCHAR(MAX) so STRING_AGG does not overflow at
        -- 4000 characters. WITHIN GROUP pins the column order, so the INSERT
        -- column list and the VALUES list always line up.
        SELECT @on = STRING_AGG(CAST(CONCAT(N'T.', QUOTENAME(column_name), N' = S.', QUOTENAME(column_name)) AS NVARCHAR(MAX)), N' AND ')
            WITHIN GROUP (ORDER BY ordinal_position)
        FROM @PKs
        WHERE is_pk = 1;

        SELECT
            @star = STRING_AGG(CAST(QUOTENAME(column_name) AS NVARCHAR(MAX)), N', ')
                WITHIN GROUP (ORDER BY ordinal_position)
            , @values = STRING_AGG(CAST(CONCAT(N'S.', QUOTENAME(column_name)) AS NVARCHAR(MAX)), N', ')
                WITHIN GROUP (ORDER BY ordinal_position)
        FROM @PKs;

        -- Non-key columns (aggregates are NULL when the table has none).
        SELECT
            @src_compare = STRING_AGG(CAST(CONCAT(N'S.', QUOTENAME(column_name)) AS NVARCHAR(MAX)), N', ')
                WITHIN GROUP (ORDER BY ordinal_position)
            , @dstn_compare = STRING_AGG(CAST(CONCAT(N'T.', QUOTENAME(column_name)) AS NVARCHAR(MAX)), N', ')
                WITHIN GROUP (ORDER BY ordinal_position)
            , @update = STRING_AGG(CAST(CONCAT(N'T.', QUOTENAME(column_name), N' = S.', QUOTENAME(column_name)) AS NVARCHAR(MAX)), N', ')
                WITHIN GROUP (ORDER BY ordinal_position)
        FROM @PKs
        WHERE is_pk = 0;

        -- UPDATE only when at least one non-key column differs. EXISTS ... EXCEPT
        -- compares NULLs as equal values, so NULL -> value and value -> NULL count
        -- as changes (a chain of != would skip them). Tables with only key
        -- columns get no WHEN MATCHED clause at all.
        SET @when_matched = CASE
            WHEN @update IS NULL THEN N''
            ELSE CONCAT(N'WHEN MATCHED AND EXISTS (SELECT ', @src_compare, N' EXCEPT SELECT ', @dstn_compare, N') THEN
    UPDATE SET
        ', @update, N'
')
        END;

        SET @target = CONCAT(QUOTENAME(@dstn_schema_name), N'.', QUOTENAME(@dstn_table_name));
        SET @net_changes = CONCAT(N'cdc.', QUOTENAME(CONCAT(N'fn_cdc_get_net_changes_', @capture_instance)), N'(@lsn_from, @lsn_to, N''all'')');

        -- MERGE (INSERT, UPDATE, DELETE)
        SET @dynamic_sql = CONCAT(N'/* DELETE */
DELETE T
FROM
    ', @target, N' AS T
    JOIN ', @net_changes, N' AS S
        ON ', @on, N'
WHERE S.__$operation = 1;
/* MERGE (INSERT, UPDATE) */
WITH src_cte AS (
    SELECT ', @star, N'
    FROM ', @net_changes, N'
    WHERE __$operation IN (2, 4)
)
MERGE ', @target, N' AS T
USING src_cte AS S
    ON ', @on, N'
', @when_matched, N'WHEN NOT MATCHED BY TARGET THEN
    INSERT (', @star, N')
    VALUES (', @values, N');');

        print N'@lsn_from:'
        print @lsn_from
        print N'@lsn_to:'
        print @lsn_to

        BEGIN TRANSACTION;

        -- lsn_from > lsn_to means nothing new since the last run (the net-changes
        -- function rejects such a range), so there is nothing to apply.
        IF (@lsn_from <= @lsn_to)
        BEGIN
            print N'@dynamic_sql:'
            print @dynamic_sql
            EXEC sys.sp_executesql
                @dynamic_sql
                , N'@lsn_from BINARY(10), @lsn_to BINARY(10)'
                , @lsn_from = @lsn_from
                , @lsn_to = @lsn_to;
        END
        ELSE
            print N'No new changes.'

        -- UPDATE LSNs in dbo.extract_parameters. The net-changes range includes
        -- both endpoints, so the next run starts one LSN after this run's upper
        -- bound; an empty window keeps its lower bound.
        UPDATE dbo.extract_parameters
        SET
            lsn_from = CASE WHEN lsn_from <= lsn_to THEN sys.fn_cdc_increment_lsn(lsn_to) ELSE lsn_from END
            , lsn_to = NULL
        WHERE
            src_server_name = @src_server_name
            AND src_database_name = @src_database_name
            AND src_schema_name = @src_schema_name
            AND src_table_name = @src_table_name;

        COMMIT TRANSACTION;
    END -- Loop dbo.tables_to_extract
END
GO
The dataflow of the stored procedure:
- Loop tables_to_extract
- UPSERT the source table in extract_parameters
- If the table doesn’t exist – INSERT a row with lsn_from, lsn_to
- If the table exists – UPDATE lsn_to
- List the columns in the source table and flag the PKs into table @PKs
- Prepare the clauses for the MERGE dynamic code from table @PKs
- Create @dynamic_sql and
- DELETE
- MERGE
- UPDATE LSNs in extract_parameters
- lsn_from = lsn_to
- lsn_to = NULL
Testing
Show tables_to_extract
-- List the registered tables in processing (id) order.
SELECT
    id
    , src_server_name
    , src_database_name
    , src_schema_name
    , src_table_name
    , dstn_server_name
    , dstn_database_name
    , dstn_schema_name
    , dstn_table_name
FROM dbo.tables_to_extract
ORDER BY id;

INSERT
-- Seed both CDC-tracked source tables with three rows each.
INSERT dbo.src_table_1 (col_2, col_3)
VALUES
    (N'Value 11', N'Value 12')
    , (N'Value 21', N'Value 22')
    , (N'Value 31', N'Value 32');

INSERT dbo.src_table_2 (pk_col_1, pk_col_2, col_3, col_4)
VALUES
    (1, 2, N'Value 11', N'Value 12')
    , (3, 4, N'Value 21', N'Value 22')
    , (5, 6, N'Value 31', N'Value 32');
Ingest
-- Load the changes captured so far into the destination tables.
EXECUTE dbo.usp_run_cdc_etl;
The generated @dynamic_sql
@lsn_from:
0x00000025000009EC0040
@lsn_to:
0x0000002600000B40001E
@dynamic_sql:
/* DELETE */
DELETE T
FROM
[dbo].[dstn_table_1] AS T
JOIN cdc.fn_cdc_get_net_changes_dbo_src_table_1(@lsn_from, @lsn_to, 'all') AS S
ON T.[pk_col_1] = S.[pk_col_1]
WHERE S.__$operation = 1;
/* MERGE (INSERT, UPDATE) */
DECLARE @merge_results TABLE ([action] NVARCHAR(10));
WITH src_cte AS (
SELECT [pk_col_1], [col_2], [col_3]
FROM cdc.fn_cdc_get_net_changes_dbo_src_table_1(@lsn_from, @lsn_to, 'all')
WHERE __$operation IN (2, 4)
)
MERGE [dbo].[dstn_table_1] AS T
USING src_cte AS S
ON T.[pk_col_1] = S.[pk_col_1]
WHEN MATCHED AND (T.[col_2] != S.[col_2] OR T.[col_3] != S.[col_3]) THEN
UPDATE SET
T.[col_2] = S.[col_2], T.[col_3] = S.[col_3]
WHEN NOT MATCHED BY TARGET THEN
INSERT ([pk_col_1], [col_2], [col_3])
VALUES (S.[pk_col_1], S.[col_2], S.[col_3]);
@lsn_from:
0x000000260000031B0050
@lsn_to:
0x0000002600000B40001E
@dynamic_sql:
/* DELETE */
DELETE T
FROM
[dbo].[dstn_table_2] AS T
JOIN cdc.fn_cdc_get_net_changes_dbo_src_table_2(@lsn_from, @lsn_to, 'all') AS S
ON T.[pk_col_1] = S.[pk_col_1] AND T.[pk_col_2] = S.[pk_col_2]
WHERE S.__$operation = 1;
/* MERGE (INSERT, UPDATE) */
DECLARE @merge_results TABLE ([action] NVARCHAR(10));
WITH src_cte AS (
SELECT [pk_col_1], [pk_col_2], [col_3], [col_4]
FROM cdc.fn_cdc_get_net_changes_dbo_src_table_2(@lsn_from, @lsn_to, 'all')
WHERE __$operation IN (2, 4)
)
MERGE [dbo].[dstn_table_2] AS T
USING src_cte AS S
ON T.[pk_col_1] = S.[pk_col_1] AND T.[pk_col_2] = S.[pk_col_2]
WHEN MATCHED AND (T.[col_3] != S.[col_3] OR T.[col_4] != S.[col_4]) THEN
UPDATE SET
T.[col_3] = S.[col_3], T.[col_4] = S.[col_4]
WHEN NOT MATCHED BY TARGET THEN
INSERT ([pk_col_1], [pk_col_2], [col_3], [col_4])
VALUES (S.[pk_col_1], S.[pk_col_2], S.[col_3], S.[col_4]);
Remarks
- We JOIN on all the PK columns
- We UPDATE only if there is a change
Show table_1
-- Compare source and destination side by side; order by key for stable output.
SELECT pk_col_1, col_2, col_3
FROM dbo.src_table_1
ORDER BY pk_col_1;

SELECT pk_col_1, col_2, col_3
FROM dbo.dstn_table_1
ORDER BY pk_col_1;

Show table_2

UPDATE
-- Update the same src_table_1 row twice; the next ingestion should apply
-- only its final ("New 2") version.
UPDATE dbo.src_table_1
SET
    col_2 = N'Value 21 New 1'
    , col_3 = N'Value 22 New 1'
WHERE pk_col_1 = 2;

WAITFOR DELAY '00:00:05';

UPDATE dbo.src_table_1
SET
    col_2 = N'Value 21 New 2'
    , col_3 = N'Value 22 New 2'
WHERE pk_col_1 = 2;

UPDATE dbo.src_table_2
SET col_3 = N'Value 11 New 1'
WHERE pk_col_1 = 1;
I update the same row several times to show that cdc.fn_cdc_get_net_changes returns only its last (net) change.
Ingest
-- Load the UPDATEs captured since the previous run.
EXECUTE dbo.usp_run_cdc_etl;


DELETE
-- Remove one row from each source table.
DELETE FROM dbo.src_table_1
WHERE pk_col_1 = 3;

DELETE FROM dbo.src_table_2
WHERE pk_col_1 = 1;
Ingest
-- Load the DELETEs captured since the previous run.
EXECUTE dbo.usp_run_cdc_etl;


Show extract_parameters
-- Show the stored LSN window per table, ordered by its key for stable output.
SELECT
    src_server_name
    , src_database_name
    , src_schema_name
    , src_table_name
    , lsn_from
    , lsn_to
FROM dbo.extract_parameters
ORDER BY src_server_name, src_database_name, src_schema_name, src_table_name;

Keep it simple :-)
