Keep it simple
Keep it simple

T-SQL: Dynamic Ingestion with change data capture (CDC)

Change Data Capture (CDC) is a SQL Server mechanism that captures the changes made in a database and keeps them for a specified retention period (the retention window). You can find more information here.

In this example I create a POC (Proof of Concept) that implements CDC in my data ingestion design. In a few words:

Tables:

  • tables_to_extract – the tables to ingest
  • extract_parameters – the parameters that drive the ingestion (most likely datetime_from, datetime_to)

Dataflow:

  • The tables in tables_to_extract are looped
  • The parameters for each table are set and used to filter the incremental data

In this example the parameters that drive the incremental ingestion are defined by CDC – lsn_from and lsn_to (LSN is log sequence number)

Initialize

Create database

USE [master];
GO

-- Start from a clean slate: when a database named CDC is already present,
-- kick out every other session (rolling back their open work) and drop it.
IF EXISTS (SELECT 1 FROM sys.databases WHERE [name] = N'CDC')
BEGIN
  ALTER DATABASE [CDC] SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
  DROP DATABASE [CDC];
END;
GO

-- Recreate the (now guaranteed absent) database with default settings.
CREATE DATABASE [CDC];
GO

Enable CDC on database level

-- Enable CDC for database CDC only when it is not enabled yet.
-- The three-part name runs the system procedure in the context of CDC,
-- so this batch does not need a USE statement.
IF EXISTS (
  SELECT 1
  FROM [master].sys.databases
  WHERE
    [name]             = N'CDC'
    AND is_cdc_enabled = 0
)
  EXEC CDC.sys.sp_cdc_enable_db;
GO

Create tables and enable CDC on table level

USE CDC;
GO

-- ---------------------------------------------------------------------------
-- Source tables (tracked by CDC)
-- ---------------------------------------------------------------------------

-- Source table with a single-column, identity primary key
DROP TABLE IF EXISTS dbo.src_table_1;
CREATE TABLE dbo.src_table_1 (
  pk_col_1 INT         IDENTITY(1, 1)
  , col_2  NVARCHAR(128)
  , col_3  NVARCHAR(128)
  , CONSTRAINT PK__dbo_src_table_1 PRIMARY KEY (pk_col_1)
);
-- Enable CDC only when the table is not tracked yet.
-- @supports_net_changes is spelled out because the ingestion reads
-- cdc.fn_cdc_get_net_changes_<capture_instance>, which is generated only
-- when net changes are supported.
IF (SELECT is_tracked_by_cdc FROM sys.tables WHERE SCHEMA_NAME([schema_id]) = N'dbo' AND [name] = N'src_table_1') = 0
  BEGIN
    EXEC sys.sp_cdc_enable_table
      @source_schema          = N'dbo'
      , @source_name          = N'src_table_1'
      , @role_name            = NULL
      , @supports_net_changes = 1;
  END

-- Source table with a composite primary key
DROP TABLE IF EXISTS dbo.src_table_2;
CREATE TABLE dbo.src_table_2 (
  pk_col_1   INT
  , pk_col_2 INT
  , col_3    NVARCHAR(128)
  , col_4    NVARCHAR(128)
  , CONSTRAINT PK__dbo_src_table_2 PRIMARY KEY (pk_col_1, pk_col_2)
);
IF (SELECT is_tracked_by_cdc FROM sys.tables WHERE SCHEMA_NAME([schema_id]) = N'dbo' AND [name] = N'src_table_2') = 0
  BEGIN
    EXEC sys.sp_cdc_enable_table
      @source_schema          = N'dbo'
      , @source_name          = N'src_table_2'
      , @role_name            = NULL
      , @supports_net_changes = 1;
  END

-- ---------------------------------------------------------------------------
-- Destination tables (same columns as the sources, no constraints)
-- ---------------------------------------------------------------------------

DROP TABLE IF EXISTS dbo.dstn_table_1;
CREATE TABLE dbo.dstn_table_1 (
  pk_col_1 INT
  , col_2  NVARCHAR(128)
  , col_3  NVARCHAR(128)
);

DROP TABLE IF EXISTS dbo.dstn_table_2;
CREATE TABLE dbo.dstn_table_2 (
  pk_col_1   INT
  , pk_col_2 INT
  , col_3    NVARCHAR(128)
  , col_4    NVARCHAR(128)
);

-- ---------------------------------------------------------------------------
-- Control tables
-- ---------------------------------------------------------------------------

-- dbo.extract_parameters references dbo.tables_to_extract through a foreign
-- key, so the child has to be dropped before the parent; otherwise re-running
-- this script against an existing database fails on the parent's DROP.
DROP TABLE IF EXISTS dbo.extract_parameters;
DROP TABLE IF EXISTS dbo.tables_to_extract;

-- One row per source table to ingest and the destination it is loaded into
CREATE TABLE dbo.tables_to_extract (
  id INT IDENTITY(1, 1)
  , src_server_name    SYSNAME NOT NULL
  , src_database_name  SYSNAME NOT NULL
  , src_schema_name    SYSNAME NOT NULL
  , src_table_name     SYSNAME NOT NULL
  , dstn_server_name   SYSNAME
  , dstn_database_name SYSNAME
  , dstn_schema_name   SYSNAME
  , dstn_table_name    SYSNAME
  , CONSTRAINT PK_dbo_tables_to_extract
    PRIMARY KEY (src_server_name, src_database_name, src_schema_name, src_table_name)
);

-- One row per source table holding the LSN window that drives its ingestion
CREATE TABLE dbo.extract_parameters (
  src_server_name     SYSNAME    NOT NULL
  , src_database_name SYSNAME    NOT NULL
  , src_schema_name   SYSNAME    NOT NULL
  , src_table_name    SYSNAME    NOT NULL
  , lsn_from          BINARY(10) NOT NULL
  , lsn_to            BINARY(10)
  , CONSTRAINT PK_dbo_extract_parameters
    PRIMARY KEY (src_server_name, src_database_name, src_schema_name, src_table_name)
  , CONSTRAINT FK_extract_parameters_tables_to_extract
    FOREIGN KEY (src_server_name, src_database_name, src_schema_name, src_table_name)
    REFERENCES dbo.tables_to_extract (src_server_name, src_database_name, src_schema_name, src_table_name)
);
GO

Define the ingested tables

-- Register the two demo source tables and their destinations.
DECLARE
  @domain        NVARCHAR(128)
  , @server_name NVARCHAR(400);

-- Read the machine's DNS domain from the registry.
-- CurrentControlSet is used instead of a numbered control set (ControlSet001),
-- because the numbered set is not guaranteed to be the active one.
-- @domain stays NULL/empty when the machine has no domain configured.
EXEC [master]..xp_regread
  @rootkey      = N'HKEY_LOCAL_MACHINE'
  , @key        = N'SYSTEM\CurrentControlSet\Services\Tcpip\Parameters\'
  , @value_name = N'Domain'
  , @value      = @domain OUTPUT;

-- Build <machine>[.<domain>][\<instance>] once and reuse it.
-- N'.' + NULL and N'\' + NULL evaluate to NULL, which CONCAT skips, so the
-- separators only appear when the domain / instance name is present.
SET @server_name = CONCAT(
  CAST(SERVERPROPERTY(N'MachineName') AS NVARCHAR(128))
  , N'.' + NULLIF(@domain, N'')
  , N'\' + CAST(SERVERPROPERTY(N'InstanceName') AS NVARCHAR(128))
);

INSERT dbo.tables_to_extract (
  src_server_name, src_database_name, src_schema_name, src_table_name
  , dstn_server_name, dstn_database_name, dstn_schema_name, dstn_table_name
)
VALUES
  (@server_name, N'CDC', N'dbo', N'src_table_1', @server_name, N'CDC', N'dbo', N'dstn_table_1')
  , (@server_name, N'CDC', N'dbo', N'src_table_2', @server_name, N'CDC', N'dbo', N'dstn_table_2');
GO

Create the ingestion stored procedure

SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO
-- =============================================
-- Author:      Peter Lalovsky
-- Create date: 2026-05-06
-- Description:	Run dynamic ETL for specified tables with CDC
--
-- For every row of dbo.tables_to_extract (in id order) the procedure:
--   1. registers the source table in dbo.extract_parameters on first sight
--      (lsn_from = minimum LSN of the capture instance)
--   2. stores the current maximum LSN as lsn_to
--   3. builds a DELETE + MERGE from cdc.fn_cdc_get_net_changes_<schema>_<table>
--      for the window [lsn_from, lsn_to] and runs it against
--      <dstn_schema_name>.<dstn_table_name>
--   4. moves lsn_from just past the processed window and resets lsn_to to NULL
-- Steps 3 and 4 run in one transaction, so a failed load never advances
-- the watermark.
--
-- Parameters: none
-- Result:     no result set; the LSN window and the generated SQL are PRINTed
-- Errors:     THROWs 50000 when a destination name is missing, the net-changes
--             function does not exist, or the source table has no primary key;
--             any error raised by the generated SQL is re-thrown after rollback
--
-- Notes:
--   * Column metadata comes from INFORMATION_SCHEMA of the current database
--     and the destination is addressed by schema and table only;
--     dstn_server_name / dstn_database_name are not used here.
--   * NOTE(review): the capture instance is assumed to carry the default
--     name <schema>_<table> and to track every column of the source table.
-- =============================================

CREATE OR ALTER PROCEDURE dbo.usp_run_cdc_etl AS
BEGIN

SET NOCOUNT ON;
-- Any run-time error dooms the open transaction instead of letting the
-- procedure carry on and advance the watermark
SET XACT_ABORT ON;

-- Variables
DECLARE
  @src_server_name      SYSNAME
  , @src_database_name  SYSNAME
  , @src_schema_name    SYSNAME
  , @src_table_name     SYSNAME
  , @dstn_schema_name   SYSNAME
  , @dstn_table_name    SYSNAME
  , @loop_id            INT
  , @capture_instance   NVARCHAR(257)
  , @net_changes_fn     NVARCHAR(300)  -- quoted, schema-qualified function name
  , @target             NVARCHAR(520)  -- quoted, schema-qualified destination
  , @lsn_from           BINARY(10)
  , @lsn_to             BINARY(10)
  , @star               NVARCHAR(MAX)
  , @on                 NVARCHAR(MAX)
  , @target_columns     NVARCHAR(MAX)
  , @source_columns     NVARCHAR(MAX)
  , @update             NVARCHAR(MAX)
  , @insert             NVARCHAR(MAX)
  , @values             NVARCHAR(MAX)
  , @when_matched       NVARCHAR(MAX)
  , @dynamic_sql        NVARCHAR(MAX)
  , @error_message      NVARCHAR(2048);

-- Columns of the current source table, in table order, with the PKs flagged
DECLARE @src_columns TABLE (
  ordinal_position INT
  , column_name    SYSNAME
  , is_pk          BIT
);

-- Walk the ids that actually exist, so gaps in the identity values neither
-- skip work nor re-process the previously fetched table
SELECT @loop_id = MIN(id)
FROM dbo.tables_to_extract;

WHILE (@loop_id IS NOT NULL)
BEGIN -- Loop dbo.tables_to_extract
  -- Get variables
  SELECT
    @src_server_name      = src_server_name
    , @src_database_name  = src_database_name
    , @src_schema_name    = src_schema_name
    , @src_table_name     = src_table_name
    , @capture_instance   = CONCAT(src_schema_name, N'_', src_table_name)
    , @dstn_schema_name   = dstn_schema_name
    , @dstn_table_name    = dstn_table_name
  FROM dbo.tables_to_extract
  WHERE id = @loop_id;

  -- Advance right away so CONTINUE below cannot loop forever;
  -- MIN() over no rows yields NULL, which ends the loop
  SELECT @loop_id = MIN(id)
  FROM dbo.tables_to_extract
  WHERE id > @loop_id;

  -- Quote every identifier that is spliced into the dynamic SQL.
  -- "+" is used on purpose: a NULL part makes the whole name NULL.
  SET @target         = QUOTENAME(@dstn_schema_name) + N'.' + QUOTENAME(@dstn_table_name);
  SET @net_changes_fn = N'cdc.' + QUOTENAME(N'fn_cdc_get_net_changes_' + @capture_instance);

  IF (@target IS NULL)
    BEGIN
      SET @error_message = CONCAT(
        N'usp_run_cdc_etl: destination schema/table is not defined for source '
        , @src_schema_name, N'.', @src_table_name, N'.'
      );
      THROW 50000, @error_message, 1;
    END

  IF (OBJECT_ID(@net_changes_fn) IS NULL)
    BEGIN
      SET @error_message = CONCAT(
        N'usp_run_cdc_etl: net changes function for capture instance '
        , @capture_instance, N' was not found in the current database.'
      );
      THROW 50000, @error_message, 1;
    END

  -- UPSERT the source table in dbo.extract_parameters
  SET @lsn_to = sys.fn_cdc_get_max_lsn();

  IF NOT EXISTS (
    SELECT 1
    FROM dbo.extract_parameters
    WHERE
      src_server_name       = @src_server_name
      AND src_database_name = @src_database_name
      AND src_schema_name   = @src_schema_name
      AND src_table_name    = @src_table_name
  )
    BEGIN
      INSERT dbo.extract_parameters (
        src_server_name, src_database_name, src_schema_name
        , src_table_name, lsn_from, lsn_to
      )
      SELECT
        @src_server_name
        , @src_database_name
        , @src_schema_name
        , @src_table_name
        , sys.fn_cdc_get_min_lsn(@capture_instance)
        , @lsn_to;
    END
  ELSE
    BEGIN
      UPDATE dbo.extract_parameters
      SET lsn_to = @lsn_to
      WHERE
        src_server_name       = @src_server_name
        AND src_database_name = @src_database_name
        AND src_schema_name   = @src_schema_name
        AND src_table_name    = @src_table_name;
    END

  -- Get the LSNs from dbo.extract_parameters
  SELECT
    @lsn_from = lsn_from
    , @lsn_to = lsn_to
  FROM dbo.extract_parameters
  WHERE
    src_server_name       = @src_server_name
    AND src_database_name = @src_database_name
    AND src_schema_name   = @src_schema_name
    AND src_table_name    = @src_table_name;

  -- Style 1 renders the binary value as a 0x... hex string
  PRINT N'@lsn_from:';
  PRINT CONVERT(NVARCHAR(22), @lsn_from, 1);
  PRINT N'@lsn_to:';
  PRINT CONVERT(NVARCHAR(22), @lsn_to, 1);

  -- Nothing to load: no LSN is available yet, or nothing was captured after
  -- the previous window. The net-changes function rejects such a range, so
  -- close the window without moving lsn_from.
  IF (@lsn_to IS NULL OR @lsn_from > @lsn_to)
    BEGIN
      UPDATE dbo.extract_parameters
      SET lsn_to = NULL
      WHERE
        src_server_name       = @src_server_name
        AND src_database_name = @src_database_name
        AND src_schema_name   = @src_schema_name
        AND src_table_name    = @src_table_name;

      CONTINUE;
    END

  -- List the columns in source table
  DELETE FROM @src_columns;

  INSERT @src_columns (ordinal_position, column_name, is_pk)
  SELECT
    C.ORDINAL_POSITION
    , C.COLUMN_NAME
    , CASE
      WHEN PK.COLUMN_NAME IS NOT NULL THEN CAST(1 AS BIT)
      ELSE CAST(0 AS BIT)
    END AS is_pk
  FROM
    INFORMATION_SCHEMA.COLUMNS AS C
    LEFT JOIN
    (
      SELECT
        KU.TABLE_CATALOG
        , KU.TABLE_SCHEMA
        , KU.TABLE_NAME
        , KU.COLUMN_NAME
      FROM
        INFORMATION_SCHEMA.TABLE_CONSTRAINTS     AS TC
        JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU ON  TC.TABLE_CATALOG   = KU.TABLE_CATALOG
                                                       AND TC.TABLE_SCHEMA    = KU.TABLE_SCHEMA
                                                       AND TC.TABLE_NAME      = KU.TABLE_NAME
                                                       AND TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
      WHERE
        TC.CONSTRAINT_TYPE   = 'PRIMARY KEY'
        AND KU.TABLE_CATALOG = @src_database_name
        AND KU.TABLE_SCHEMA  = @src_schema_name
        AND KU.TABLE_NAME    = @src_table_name
  ) AS PK ON  C.TABLE_CATALOG = PK.TABLE_CATALOG
          AND C.TABLE_SCHEMA  = PK.TABLE_SCHEMA
          AND C.TABLE_NAME    = PK.TABLE_NAME
          AND C.COLUMN_NAME   = PK.COLUMN_NAME
  WHERE
    C.TABLE_CATALOG    = @src_database_name
    AND C.TABLE_SCHEMA = @src_schema_name
    AND C.TABLE_NAME   = @src_table_name;

  -- Prepare the clauses for the MERGE dynamic code.
  -- CAST to NVARCHAR(MAX) lifts STRING_AGG's 8000-byte result limit;
  -- WITHIN GROUP pins the column order so the INSERT and VALUES lists line up.
  SELECT @on = STRING_AGG(
      CAST(CONCAT(N'T.', QUOTENAME(column_name), N' = S.', QUOTENAME(column_name)) AS NVARCHAR(MAX))
      , N' AND '
    ) WITHIN GROUP (ORDER BY ordinal_position)
  FROM @src_columns
  WHERE is_pk = 1;

  SELECT
    @star     = STRING_AGG(CAST(QUOTENAME(column_name) AS NVARCHAR(MAX)), N', ')
                  WITHIN GROUP (ORDER BY ordinal_position)
    , @insert = STRING_AGG(CAST(QUOTENAME(column_name) AS NVARCHAR(MAX)), N', ')
                  WITHIN GROUP (ORDER BY ordinal_position)
    , @values = STRING_AGG(CAST(CONCAT(N'S.', QUOTENAME(column_name)) AS NVARCHAR(MAX)), N', ')
                  WITHIN GROUP (ORDER BY ordinal_position)
  FROM @src_columns;

  SELECT
    @target_columns   = STRING_AGG(CAST(CONCAT(N'T.', QUOTENAME(column_name)) AS NVARCHAR(MAX)), N', ')
                          WITHIN GROUP (ORDER BY ordinal_position)
    , @source_columns = STRING_AGG(CAST(CONCAT(N'S.', QUOTENAME(column_name)) AS NVARCHAR(MAX)), N', ')
                          WITHIN GROUP (ORDER BY ordinal_position)
    , @update         = STRING_AGG(
                          CAST(CONCAT(N'T.', QUOTENAME(column_name), N' = S.', QUOTENAME(column_name)) AS NVARCHAR(MAX))
                          , N', '
                        ) WITHIN GROUP (ORDER BY ordinal_position)
  FROM @src_columns
  WHERE is_pk = 0;

  -- Without key columns there is nothing to join source and destination on
  IF (@on IS NULL)
    BEGIN
      SET @error_message = CONCAT(
        N'usp_run_cdc_etl: no primary key columns found for '
        , @src_schema_name, N'.', @src_table_name, N' in the current database.'
      );
      THROW 50000, @error_message, 1;
    END

  -- UPDATE branch: emitted only when the table has non-key columns.
  -- "SELECT T... EXCEPT SELECT S..." returns a row when any column differs
  -- and treats NULLs as equal to each other, so a change to or from NULL is
  -- detected (a plain T.col != S.col evaluates to UNKNOWN and is skipped).
  SET @when_matched = CASE
    WHEN @update IS NOT NULL THEN CONCAT(N'
WHEN MATCHED AND EXISTS (
  SELECT ', @target_columns, N'
  EXCEPT
  SELECT ', @source_columns, N'
) THEN
  UPDATE SET
    ', @update)
    ELSE N''
  END;

  -- MERGE (INSERT, UPDATE, DELETE)
  -- __$operation: 1 = delete, 2 = insert, 4 = update
  SET @dynamic_sql = CONCAT(N'/* DELETE */
DELETE T
FROM
  ', @target, N' AS T
  JOIN ', @net_changes_fn, N'(@lsn_from, @lsn_to, ''all'') AS S
    ON ', @on, N'
WHERE S.__$operation = 1;

/* MERGE (INSERT, UPDATE) */
WITH src_cte AS (
  SELECT ', @star, N'
  FROM ', @net_changes_fn, N'(@lsn_from, @lsn_to, ''all'')
  WHERE __$operation IN (2, 4)
)
MERGE ', @target, N' AS T
USING src_cte AS S
  ON ', @on, @when_matched, N'
WHEN NOT MATCHED BY TARGET THEN
  INSERT (', @insert, N')
  VALUES (', @values, N');');

  PRINT N'@dynamic_sql:';
  PRINT @dynamic_sql;

  -- Load the window and advance the watermark atomically
  BEGIN TRY
    BEGIN TRANSACTION;

    EXEC sys.sp_executesql
      @dynamic_sql
      , N'@lsn_from BINARY(10), @lsn_to BINARY(10)'
      , @lsn_from = @lsn_from
      , @lsn_to   = @lsn_to;

    -- UPDATE LSNs in dbo.extract_parameters.
    -- The window is inclusive on both ends, so the next one starts at the
    -- LSN right after the upper bound that was just processed.
    UPDATE dbo.extract_parameters
    SET
      lsn_from = sys.fn_cdc_increment_lsn(@lsn_to)
      , lsn_to = NULL
    WHERE
      src_server_name       = @src_server_name
      AND src_database_name = @src_database_name
      AND src_schema_name   = @src_schema_name
      AND src_table_name    = @src_table_name;

    COMMIT TRANSACTION;
  END TRY
  BEGIN CATCH
    IF (@@TRANCOUNT > 0)
      BEGIN
        ROLLBACK TRANSACTION;
      END;

    -- Surface the original error; tables processed earlier stay committed
    THROW;
  END CATCH

END -- Loop dbo.tables_to_extract

END
GO

The dataflow of the stored procedure:

  • Loop tables_to_extract
  • UPSERT the source table in extract_parameters
    • If the table doesn’t exist – INSERT a row with lsn_from, lsn_to
    • If the table exists – UPDATE lsn_to
  • List the columns in the source table and flag the PKs into table @PKs
  • Prepare the clauses for the MERGE dynamic code from table @PKs
  • Create @dynamic_sql and
    • DELETE
    • MERGE
  • UPDATE LSNs in extract_parameters
    • lsn_from = lsn_to
    • lsn_to = NULL

Testing

Show tables_to_extract

-- Show the configured source -> destination mappings
SELECT
  id
  , src_server_name
  , src_database_name
  , src_schema_name
  , src_table_name
  , dstn_server_name
  , dstn_database_name
  , dstn_schema_name
  , dstn_table_name
FROM dbo.tables_to_extract

INSERT

-- Three new rows in the identity-keyed source table
INSERT INTO dbo.src_table_1 (col_2, col_3)
VALUES
  (N'Value 11', N'Value 12')
  , (N'Value 21', N'Value 22')
  , (N'Value 31', N'Value 32');

-- Three new rows in the source table with the composite key
INSERT INTO dbo.src_table_2 (pk_col_1, pk_col_2, col_3, col_4)
VALUES
  (1, 2, N'Value 11', N'Value 12')
  , (3, 4, N'Value 21', N'Value 22')
  , (5, 6, N'Value 31', N'Value 32');

Ingest

-- Load the captured INSERTs into the destination tables
EXECUTE dbo.usp_run_cdc_etl;

The generated @dynamic_sql

@lsn_from:
0x00000025000009EC0040
@lsn_to:
0x0000002600000B40001E
@dynamic_sql:
/* DELETE */
DELETE T
FROM
  [dbo].[dstn_table_1] AS T
  JOIN cdc.fn_cdc_get_net_changes_dbo_src_table_1(@lsn_from, @lsn_to, 'all') AS S
    ON T.[pk_col_1] = S.[pk_col_1]
WHERE S.__$operation = 1;

/* MERGE (INSERT, UPDATE) */
DECLARE @merge_results TABLE ([action] NVARCHAR(10));
  
WITH src_cte AS (
  SELECT [pk_col_1], [col_2], [col_3]
  FROM cdc.fn_cdc_get_net_changes_dbo_src_table_1(@lsn_from, @lsn_to, 'all')
  WHERE __$operation IN (2, 4)
)
MERGE [dbo].[dstn_table_1] AS T
USING src_cte AS S
  ON T.[pk_col_1] = S.[pk_col_1]
WHEN MATCHED AND (T.[col_2] != S.[col_2] OR T.[col_3] != S.[col_3]) THEN
  UPDATE SET
    T.[col_2] = S.[col_2], T.[col_3] = S.[col_3]
WHEN NOT MATCHED BY TARGET THEN
  INSERT ([pk_col_1], [col_2], [col_3])
  VALUES (S.[pk_col_1], S.[col_2], S.[col_3]);
@lsn_from:
0x000000260000031B0050
@lsn_to:
0x0000002600000B40001E
@dynamic_sql:
/* DELETE */
DELETE T
FROM
  [dbo].[dstn_table_2] AS T
  JOIN cdc.fn_cdc_get_net_changes_dbo_src_table_2(@lsn_from, @lsn_to, 'all') AS S
    ON T.[pk_col_1] = S.[pk_col_1] AND T.[pk_col_2] = S.[pk_col_2]
WHERE S.__$operation = 1;

/* MERGE (INSERT, UPDATE) */
DECLARE @merge_results TABLE ([action] NVARCHAR(10));
  
WITH src_cte AS (
  SELECT [pk_col_1], [pk_col_2], [col_3], [col_4]
  FROM cdc.fn_cdc_get_net_changes_dbo_src_table_2(@lsn_from, @lsn_to, 'all')
  WHERE __$operation IN (2, 4)
)
MERGE [dbo].[dstn_table_2] AS T
USING src_cte AS S
  ON T.[pk_col_1] = S.[pk_col_1] AND T.[pk_col_2] = S.[pk_col_2]
WHEN MATCHED AND (T.[col_3] != S.[col_3] OR T.[col_4] != S.[col_4]) THEN
  UPDATE SET
    T.[col_3] = S.[col_3], T.[col_4] = S.[col_4]
WHEN NOT MATCHED BY TARGET THEN
  INSERT ([pk_col_1], [pk_col_2], [col_3], [col_4])
  VALUES (S.[pk_col_1], S.[pk_col_2], S.[col_3], S.[col_4]);

Remarks

  • We JOIN on all the PK columns
  • We UPDATE only if there is a change

Show table_1

-- Source and destination side by side
SELECT pk_col_1, col_2, col_3
FROM dbo.src_table_1
SELECT pk_col_1, col_2, col_3
FROM dbo.dstn_table_1

Show table_2

UPDATE

-- First change to row 2 of src_table_1.
-- The columns are NVARCHAR, so the literals carry the N prefix like the rest
-- of the script (avoids an implicit VARCHAR -> NVARCHAR conversion).
UPDATE dbo.src_table_1
SET
  col_2 = N'Value 21 New 1'
  , col_3 = N'Value 22 New 1'
WHERE pk_col_1 = 2;

-- Pause between the two changes to the same row
WAITFOR DELAY '00:00:05';

-- Second change to the same row
UPDATE dbo.src_table_1
SET
  col_2 = N'Value 21 New 2'
  , col_3 = N'Value 22 New 2'
WHERE pk_col_1 = 2;

-- Single change in the table with the composite key
UPDATE dbo.src_table_2
SET col_3 = N'Value 11 New 1'
WHERE pk_col_1 = 1;

I run multiple UPDATEs on the same row to verify that cdc.fn_cdc_get_net_changes returns only the last change.

Ingest

-- Load the captured UPDATEs into the destination tables
EXECUTE dbo.usp_run_cdc_etl;

DELETE

-- Remove one row from each source table
DELETE FROM dbo.src_table_1
WHERE pk_col_1 = 3;

DELETE FROM dbo.src_table_2
WHERE pk_col_1 = 1;

Ingest

-- Load the captured DELETEs into the destination tables
EXECUTE dbo.usp_run_cdc_etl;

Show extract_parameters

-- Show the LSN window stored per source table
SELECT
  src_server_name
  , src_database_name
  , src_schema_name
  , src_table_name
  , lsn_from
  , lsn_to
FROM dbo.extract_parameters

Keep it simple :-)

Leave a comment

Your email address will not be published. Required fields are marked *

Time limit exceeded. Please complete the captcha once again.