Azure Data Factory (ADF): Dynamic Extract Driven by SQL Server Table


In this example we create an ADF pipeline that extracts tables from SQL Server and saves them as CSV files in Azure Data Lake. The point is just to demonstrate the logic so you can adapt it to your needs.

The extract is driven by a SQL Server table that holds all the parameters for a dynamic extraction from a SQL Server database into CSV files.

SQL Server

Create database:

USE [master];
GO
IF (DB_ID(N'PL') IS NOT NULL)
  BEGIN
    ALTER DATABASE PL SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
    DROP DATABASE PL;
  END
GO
CREATE DATABASE PL;
GO
USE PL;
GO

CREATE SCHEMA cfg;
GO

CREATE TABLE cfg.table_to_extract
(
  id INT IDENTITY(1, 1) NOT NULL
  , process_name SYSNAME -- To use this table for more than one process (ETL)
  , server_name SYSNAME
  , [database_name] SYSNAME
  , [schema_name] SYSNAME
  , table_name SYSNAME
  , parameter VARCHAR(512) DEFAULT ('') -- Injected in the WHERE clause from table cfg.process_parameter
  , query VARCHAR(8000) DEFAULT ('') -- If not blank, use it instead of the default query
  , file_system_name VARCHAR(128) -- The ADLS container name
  , directory_name_extract VARCHAR(128) -- The ADLS directory where the extracted files will be saved (bronze)
  , [file_name] VARCHAR(128)
  , is_active BIT DEFAULT (1)
  , date_created DATETIME DEFAULT (GETDATE())
  , date_last_modified DATETIME
  , CONSTRAINT pk__cfg_table_to_extract PRIMARY KEY (id)
);
GO

CREATE TRIGGER tr__cfg_table_to_extract__date_last_modified__now
ON cfg.table_to_extract
AFTER UPDATE
AS
BEGIN
  UPDATE cfg.table_to_extract
  SET date_last_modified = GETDATE()
  WHERE id IN (SELECT id FROM inserted);
END
GO

INSERT cfg.table_to_extract (process_name
  , server_name, [database_name], [schema_name], table_name
  , parameter, query
  , file_system_name, directory_name_extract, [file_name])
SELECT           'PL_Test', 'my-sql-server.database.windows.net', 'my_database', 'dbo', 'table_1', '', '', 'mycontainer', 'Test_SQL', 'table_1.csv'
UNION ALL SELECT 'PL_Test', 'my-sql-server.database.windows.net', 'my_database', 'dbo', 'table_2', '', '', 'mycontainer', 'Test_SQL',  'table_2.csv'
UNION ALL SELECT 'PL_Test', 'my-sql-server.database.windows.net', 'my_database', 'dbo', 'table_3', 'id = @ID', '', 'mycontainer', 'Test_SQL', 'person.csv';

SELECT *
FROM cfg.table_to_extract;
GO

CREATE TABLE cfg.process_parameter
(
  process_name SYSNAME -- To use this table for more than one process (ETL)
  , parameter_name VARCHAR(128)
  , parameter_value VARCHAR(1024)
);

INSERT cfg.process_parameter (process_name, parameter_name, parameter_value)
SELECT 'PL_Test', 'ID', '3';
GO

SELECT *
FROM cfg.process_parameter;
GO

Table cfg.table_to_extract keeps the values that are used as parameters in ADF.

Columns:

id The primary key.
process_name Distinguishes the usage of this table between multiple processes (ETLs).
server_name Source SQL Server name.
database_name Source database name.
schema_name Source schema name.
table_name Source table name.
parameter The parameters (from table cfg.process_parameter) that will be injected into the dynamic WHERE clause. The default value is '' (empty string). Example: 'ProductionDate BETWEEN @StartDate AND @EndDate'.
query If not an empty string (the default value), this query is used instead of the generated one (see the example after this list).
file_system_name ADLS container name.
directory_name_extract ADLS directory where the extracted files will be saved.
file_name The name of the extracted file in ADLS.
is_active Only the active rows are selected.
date_created The date when the row was created.
date_last_modified The date when the row was last modified (set by the trigger).
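
For example, a row like this (illustrative only, not part of the test data used below) would make the pipeline run a custom query for table_1 instead of the generated SELECT:

INSERT cfg.table_to_extract (process_name
  , server_name, [database_name], [schema_name], table_name
  , parameter, query
  , file_system_name, directory_name_extract, [file_name])
SELECT 'PL_Test', 'my-sql-server.database.windows.net', 'my_database', 'dbo', 'table_1'
  , '', 'SELECT id, col1 FROM dbo.table_1 WHERE id > 1'
  , 'mycontainer', 'Test_SQL', 'table_1_custom.csv';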

Table cfg.process_parameter, populated during the process execution, keeps the values that will be injected in the WHERE clause.

Columns:

process_name Distinguishes the usage of this table between multiple processes (ETLs).
parameter_name The name of the parameter (without the leading @).
parameter_value The value of the parameter (see the example after this list).
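
For the hypothetical 'ProductionDate BETWEEN @StartDate AND @EndDate' example above, two parameter rows would be needed (the dates are made up for illustration):

INSERT cfg.process_parameter (process_name, parameter_name, parameter_value)
SELECT           'PL_Test', 'StartDate', '2024-01-01'
UNION ALL SELECT 'PL_Test', 'EndDate', '2024-01-31';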

Create and populate the test tables:

CREATE TABLE dbo.table_1
(
  id INT
  , col1 VARCHAR(128)
  , col2 VARCHAR(128)
);

CREATE TABLE dbo.table_2
(
  id INT
  , col1 VARCHAR(128)
  , col2 VARCHAR(128)
  , col3 INT
);

CREATE TABLE dbo.table_3
(
  id INT
  , first_name VARCHAR(128)
  , last_name VARCHAR(128)
);

INSERT dbo.table_1 (id, col1, col2)
SELECT           1, 'test value 11', 'test value 12'
UNION ALL SELECT 2, 'test value 21', 'test value 22';

INSERT dbo.table_2 (id, col1, col2, col3)
SELECT           1, 'test value 11', 'test value 12', '111'
UNION ALL SELECT 2, 'test value 21', 'test value 22', '222'
UNION ALL SELECT 3, 'test value 31', 'test value 32', '333';

INSERT dbo.table_3 (id, first_name, last_name)
SELECT           1, 'John', 'Smith'
UNION ALL SELECT 2, 'Ali', 'Bob'
UNION ALL SELECT 3, 'Samantha', 'Fox'
UNION ALL SELECT 4, 'Krali', 'Marko';


Azure Data Factory (ADF)

Actually, in this example I use Synapse.

[Image: ADF pipeline Dynamic_Extract_Driven_by_SQL_Server_Table]

Get Table List (Lookup) reads the values from table cfg.table_to_extract and passes them to the loop ForEach Table (ForEach):

SELECT
  server_name
  , [database_name]
  , [schema_name]
  , table_name
  , parameter
  , query
  , file_system_name
  , directory_name_extract
  , [file_name]
FROM [cfg].[table_to_extract]
WHERE
  process_name = 'PL_Test'
  AND is_active = 1;
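
ForEach Table then iterates over the Lookup output. A minimal sketch of its Items expression, assuming 'First row only' is disabled on the Get Table List Lookup:

@activity('Get Table List').output.value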

For each row in table cfg.table_to_extract, the loop runs the following activities:

[Image: ADF pipeline Dynamic_Extract_Driven_by_SQL_Server_Table - the ForEach Table loop]

1. Generate dynamic SQL code – Get SQL Statement (Lookup).

Query:

@concat('DECLARE
  @SchemaName VARCHAR(128) = ''', item().schema_name, '''
  , @TableName VARCHAR(128) = ''', item().table_name, '''
  , @Parameter VARCHAR(512) = ''', item().parameter, '''
  , @Query VARCHAR(8000) = ''', item().query, ''';
  
DECLARE
  @DynamicSQL VARCHAR(MAX) = ''SET NOCOUNT ON;
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

''
  , @NewLineChar CHAR(2) = CONCAT(CHAR(13), CHAR(10))
  , @TabChar CHAR(1) = CHAR(9);

IF (@Query = '''')
  IF
  (
    SELECT COUNT(*)
    FROM INFORMATION_SCHEMA.COLUMNS
      WHERE
        TABLE_SCHEMA = @SchemaName
        AND TABLE_NAME = @TableName
  ) = 0
    BEGIN
      SELECT @DynamicSQL += CONCAT(''SELECT ''''''
        , QUOTENAME(@SchemaName)
        , ''.'', QUOTENAME(@TableName)
        , '' does not exist'''' AS [error message] '')
    END
  ELSE
  	BEGIN
      SELECT @DynamicSQL += CONCAT(
        CASE
          WHEN ORDINAL_POSITION = 1 THEN CONCAT(''SELECT '', @NewLineChar, @TabChar)
          ELSE ''''
        END
        , CASE
          WHEN DATA_TYPE = ''uniqueidentifier'' THEN CONCAT(''CAST(A.'', QUOTENAME(COLUMN_NAME), '' AS VARCHAR(50)) '')
          WHEN DATA_TYPE = ''money'' THEN CONCAT(''CAST(A.'', QUOTENAME(COLUMN_NAME), '' AS DECIMAL(19, 4))'')
          WHEN DATA_TYPE IN (''char'', ''nchar'', ''varchar'', ''nvarchar'')
            THEN CONCAT(''LTRIM(RTRIM(REPLACE(REPLACE(A.'', QUOTENAME(COLUMN_NAME), '', CHAR(13), N'''''''' ''''''''), CHAR(10), N'''''''' '''''''')))'')
          WHEN DATA_TYPE = ''float'' THEN CONCAT(''FORMAT('', QUOTENAME(COLUMN_NAME), '', ''''G'''')'')
          ELSE CONCAT(''A.'', QUOTENAME(COLUMN_NAME))
        END, '' AS '', QUOTENAME(COLUMN_NAME)
        , @NewLineChar, @TabChar
        , '', ''
        )
      FROM INFORMATION_SCHEMA.COLUMNS
      WHERE
        TABLE_SCHEMA = @SchemaName
        AND TABLE_NAME = @TableName;

      SELECT @DynamicSQL = LEFT(@DynamicSQL, LEN(@DynamicSQL) - 2)

      SELECT @DynamicSQL += CONCAT(''FROM ''
        , QUOTENAME(@SchemaName), ''.'', QUOTENAME(@TableName)
        , '' AS A''
      );

      SELECT @DynamicSQL += CASE
        WHEN @Parameter != '''' THEN CONCAT(@NewLineChar, ''WHERE '', @Parameter)
        ELSE ''''
      END;

      SELECT @DynamicSQL += '';'';
    END
  ELSE
    BEGIN
      SELECT @DynamicSQL += @Query;
    END

SELECT @DynamicSQL AS stmt;
', '')

This code:

  a. Queries the system view INFORMATION_SCHEMA.COLUMNS (using columns schema_name and table_name from cfg.table_to_extract) to get the column names and data types of the source table.

  b. Cleanses the data: trims string columns and replaces CR/LF characters with spaces, casts uniqueidentifier and money columns, and formats float columns.

  c. Returns the generated SQL statement in column stmt (see the sketch after this list).
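
For illustration, for dbo.table_3 from the test data above the generated statement should look roughly like this (a sketch, not captured output):

SET NOCOUNT ON;
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;

SELECT
  A.[id] AS [id]
  , LTRIM(RTRIM(REPLACE(REPLACE(A.[first_name], CHAR(13), N' '), CHAR(10), N' '))) AS [first_name]
  , LTRIM(RTRIM(REPLACE(REPLACE(A.[last_name], CHAR(13), N' '), CHAR(10), N' '))) AS [last_name]
FROM [dbo].[table_3] AS A
WHERE id = @ID;

At this point @ID is still a literal placeholder in the text; it is replaced in the next step.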

2. Inject the parameters in the WHERE clause – Get Parameters (Lookup).

Query:

@concat('DECLARE @stmt NVARCHAR(MAX) = ''', activity('Get SQL Statement').output.firstRow.stmt ,''';

SELECT @stmt AS stmt INTO #stmt;

DECLARE @dynamic_sql NVARCHAR(MAX) = N'''';

SELECT @dynamic_sql += CONCAT(N''UPDATE #stmt SET stmt = REPLACE(stmt, ''''@'', parameter_name, N'''''', CONCAT(CHAR(39), '''''', parameter_value, N'''''', CHAR(39)));
'')
FROM cfg.process_parameter
WHERE process_name = ''PL_Test''

EXEC (@dynamic_sql);

SELECT stmt FROM #stmt;
')

This code:

  a. Replaces the parameter names with the values from table cfg.process_parameter. The parameter names in column parameter in table cfg.table_to_extract must be the same as in column parameter_name in table cfg.process_parameter (without the @).

[Image: ADF pipeline Dynamic_Extract_Driven_by_SQL_Server_Table - Get Parameters]

  b. Returns the final SQL statement, with the parameter values injected (see the sketch below).
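
For illustration, with the single ID = 3 row in cfg.process_parameter, the T-SQL built by this expression effectively does the following (a sketch; the '...' stands for the rest of the generated statement):

DECLARE @stmt NVARCHAR(MAX) = N'... WHERE id = @ID;'; -- output of Get SQL Statement

SELECT @stmt AS stmt INTO #stmt;

-- One REPLACE per parameter row; CHAR(39) wraps the value in single quotes
UPDATE #stmt SET stmt = REPLACE(stmt, '@ID', CONCAT(CHAR(39), '3', CHAR(39)));

SELECT stmt FROM #stmt; -- ... WHERE id = '3';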

3. Extract the table into a CSV file – Copy Table (Copy data).

Source query:

@replace(replace(activity('Get Parameters').output.firstRow.stmt, '&gt;', '>'), '&lt;', '<')

We need to replace the '&gt;' and '&lt;' that ADF generates back with '>' and '<'.

The sink is in Azure Data Lake, parameterized with columns file_system_name, directory_name_extract and file_name (a sketch follows).
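
A minimal sketch of how the sink dataset parameters can be bound inside the loop (the parameter names FileSystemName, DirectoryName and FileName are assumptions; use whatever your delimited text dataset defines):

FileSystemName: @item().file_system_name
DirectoryName: @item().directory_name_extract
FileName: @item().file_name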

This example is very basic and only briefly explained. If you find it useful and need clarification, leave a comment.

By the way, all the connection strings are parameterized; I will cover this in another post.

Keep it simple :-)
