In this example we create an ADF pipeline that extracts data from SQL Server and saves it as CSV files in Azure Data Lake. The point is just to demonstrate the logic so you can adapt it to your needs.
The extract is driven by a SQL Server table that holds all the parameters for a dynamic extraction from a SQL Server database into CSV files.
SQL Server
Create database:
USE [master];
GO

IF (DB_ID(N'PL') IS NOT NULL)
BEGIN
    ALTER DATABASE PL SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
    DROP DATABASE PL;
END
GO

CREATE DATABASE PL;
GO

USE PL;
GO

CREATE SCHEMA cfg;
GO

CREATE TABLE cfg.table_to_extract (
      id                     INT IDENTITY(1, 1) NOT NULL
    , process_name           SYSNAME                    -- To use this table for more than one process (ETL)
    , server_name            SYSNAME
    , [database_name]        SYSNAME
    , [schema_name]          SYSNAME
    , table_name             SYSNAME
    , parameter              VARCHAR(512) DEFAULT ('')  -- Injected in the WHERE clause from table cfg.process_parameter
    , query                  VARCHAR(8000) DEFAULT ('') -- If not blank, use it instead of the default query
    , file_system_name       VARCHAR(128)               -- The ADLS container name
    , directory_name_extract VARCHAR(128)               -- The ADLS directory where the extracted files will be saved (bronze)
    , [file_name]            VARCHAR(128)
    , is_active              BIT DEFAULT (1)
    , date_created           DATETIME DEFAULT (GETDATE())
    , date_last_modified     DATETIME
    , CONSTRAINT pk__cfg_table_to_extract PRIMARY KEY (id)
);
GO

CREATE TRIGGER tr__cfg_table_to_extract__date_last_modified__now
ON cfg.table_to_extract
AFTER UPDATE
AS
BEGIN
    UPDATE cfg.table_to_extract
    SET date_last_modified = GETDATE()
    WHERE id IN (SELECT id FROM inserted);
END
GO

INSERT cfg.table_to_extract (process_name
    , server_name, [database_name], [schema_name], table_name
    , parameter, query
    , file_system_name, directory_name_extract, [file_name])
SELECT 'PL_Test', 'my-sql-server.database.windows.net', 'my_database', 'dbo', 'table_1', '', '', 'mycontainer', 'Test_SQL', 'table_1.csv'
UNION ALL
SELECT 'PL_Test', 'my-sql-server.database.windows.net', 'my_database', 'dbo', 'table_2', '', '', 'mycontainer', 'Test_SQL', 'table_2.csv'
UNION ALL
SELECT 'PL_Test', 'my-sql-server.database.windows.net', 'my_database', 'dbo', 'table_3', 'id = @ID', '', 'mycontainer', 'Test_SQL', 'person.csv';

SELECT * FROM cfg.table_to_extract;
GO

CREATE TABLE cfg.process_parameter (
      process_name    SYSNAME       -- To use this table for more than one process (ETL)
    , parameter_name  VARCHAR(128)
    , parameter_value VARCHAR(1024)
);

INSERT cfg.process_parameter (process_name, parameter_name, parameter_value)
SELECT 'PL_Test', 'ID', '3';
GO

SELECT * FROM cfg.process_parameter;
GO
Table cfg.table_to_extract keeps the values that are used as parameters in ADF.
Columns:
id | Primary key
process_name | Distinguishes the usage of this table for multiple processes (ETLs)
server_name | Source SQL Server name
database_name | Source database name
schema_name | Source schema name
table_name | Source table name
parameter | The parameters (from table cfg.process_parameter) that will be injected in the dynamic WHERE clause. The default value is '' (empty string). Example: 'ProductionDate BETWEEN @StartDate AND @EndDate'.
query | If not an empty string (the default value), this query is used instead of the generated one.
file_system_name | ADLS container name
directory_name_extract | ADLS directory where the extracted files will be saved
file_name | The name of the extracted file in ADLS
is_active | Only the active rows are selected
date_created | The date when the row was created
date_last_modified | The date when the row was last modified
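As a hypothetical example of the query override (this row is not part of the sample data above), inserting a row like this would make the pipeline run the custom SELECT instead of the generated one:

-- Hypothetical row: a non-empty query column overrides the generated SELECT.
INSERT cfg.table_to_extract (process_name
    , server_name, [database_name], [schema_name], table_name
    , parameter, query
    , file_system_name, directory_name_extract, [file_name])
SELECT 'PL_Test', 'my-sql-server.database.windows.net', 'my_database', 'dbo', 'table_2'
    , '', 'SELECT id, col1 FROM dbo.table_2 WHERE col3 > 100'
    , 'mycontainer', 'Test_SQL', 'table_2_filtered.csv';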
Table cfg.process_parameter, populated during the process execution, keeps the values that will be injected in the WHERE clause.
Columns:
process_name | Distinguishes the usage of this table for multiple processes (ETLs)
parameter_name | The name of the parameter
parameter_value | The value of the parameter
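To make the injection mechanism concrete, here is a minimal standalone T-SQL sketch (with assumed values) of what happens to each parameter at run time:

-- Minimal sketch with assumed values: the placeholder @ID is replaced
-- with the quoted parameter value; CHAR(39) is a single quote.
DECLARE @stmt NVARCHAR(MAX) = N'SELECT * FROM dbo.table_3 AS A WHERE id = @ID';
SELECT REPLACE(@stmt, '@ID', CONCAT(CHAR(39), '3', CHAR(39))) AS stmt;
-- Returns: SELECT * FROM dbo.table_3 AS A WHERE id = '3'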
Populate the test tables:
CREATE TABLE dbo.table_1 (
      id   INT
    , col1 VARCHAR(128)
    , col2 VARCHAR(128)
);

CREATE TABLE dbo.table_2 (
      id   INT
    , col1 VARCHAR(128)
    , col2 VARCHAR(128)
    , col3 INT
);

CREATE TABLE dbo.table_3 (
      id         INT
    , first_name VARCHAR(128)
    , last_name  VARCHAR(128)
);

INSERT dbo.table_1 (id, col1, col2)
SELECT 1, 'test value 11', 'test value 12'
UNION ALL
SELECT 2, 'test value 21', 'test value 22';

INSERT dbo.table_2 (id, col1, col2, col3)
SELECT 1, 'test value 11', 'test value 12', 111
UNION ALL
SELECT 2, 'test value 21', 'test value 22', 222
UNION ALL
SELECT 3, 'test value 31', 'test value 32', 333;

INSERT dbo.table_3 (id, first_name, last_name)
SELECT 1, 'John', 'Smith'
UNION ALL
SELECT 2, 'Ali', 'Bob'
UNION ALL
SELECT 3, 'Samantha', 'Fox'
UNION ALL
SELECT 4, 'Krali', 'Marko';
Azure Data Factory (ADF)
Actually, in this example I use Synapse, but the pipeline logic is the same in ADF.
Get Table List (Lookup) reads the values from table cfg.table_to_extract and passes them to the loop ForEach Table (ForEach):
SELECT server_name
    , [database_name]
    , [schema_name]
    , table_name
    , parameter
    , query
    , file_system_name
    , directory_name_extract
    , [file_name]
FROM [cfg].[table_to_extract]
WHERE process_name = 'PL_Test'
    AND is_active = 1;
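The Items property of the ForEach Table loop then simply points at the Lookup output array (assuming the Lookup activity is named Get Table List, as above):

@activity('Get Table List').output.value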
For each row in table cfg.table_to_extract, the loop:
1. Generate dynamic SQL code – Get SQL Statement (Lookup).
Query:
@concat('DECLARE @schema_name VARCHAR(128) = ''', item().schema_name, '''
    , @table_name VARCHAR(128) = ''', item().table_name, '''
    , @parameter VARCHAR(512) = ''', item().parameter, '''
    , @query VARCHAR(8000) = ''', item().query, ''';

DECLARE @dynamic_sql VARCHAR(MAX) = ''SET NOCOUNT ON; SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; ''
    , @new_line_char CHAR(2) = CONCAT(CHAR(13), CHAR(10))
    , @tab_char CHAR(1) = CHAR(9);

IF (@query = '''')
    IF (
        SELECT COUNT(*)
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = @schema_name
            AND TABLE_NAME = @table_name
    ) = 0
    BEGIN
        SELECT @dynamic_sql += CONCAT(''SELECT ''''''
            , QUOTENAME(@schema_name)
            , ''.'', QUOTENAME(@table_name)
            , '' does not exist'''' AS [error message] '')
    END
    ELSE
    BEGIN
        SELECT @dynamic_sql += CONCAT(
            CASE WHEN ORDINAL_POSITION = 1 THEN CONCAT(''SELECT '', @new_line_char, @tab_char) ELSE '''' END
            , CASE
                  WHEN DATA_TYPE = ''uniqueidentifier'' THEN CONCAT(''CAST(A.'', QUOTENAME(COLUMN_NAME), '' AS VARCHAR(50))'')
                  WHEN DATA_TYPE = ''float'' THEN CONCAT(''FORMAT(A.'', QUOTENAME(COLUMN_NAME), '', ''''''''G'''''''')'')
                  WHEN DATA_TYPE = ''timestamp'' THEN CONCAT(''CONVERT(NVARCHAR(18), CONVERT(BINARY(8), A.'', QUOTENAME(COLUMN_NAME), ''), 1)'')
                  WHEN DATA_TYPE IN (''text'', ''ntext'') THEN CONCAT(''LTRIM(RTRIM(CAST(A.'', QUOTENAME(COLUMN_NAME), '' AS VARCHAR(4000))))'')
                  WHEN DATA_TYPE IN (''date'', ''time'', ''datetime'', ''datetime2'') THEN CONCAT(''A.'', QUOTENAME(COLUMN_NAME), '''')
                  ELSE CONCAT(''LTRIM(RTRIM(A.'', QUOTENAME(COLUMN_NAME), ''))'')
              END
            , '' AS '', '''''''''''', REPLACE(COLUMN_NAME, '' '', ''_''), ''''''''''''
            , @new_line_char, @tab_char
            , '', '')
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = @schema_name
            AND TABLE_NAME = @table_name;

        SELECT @dynamic_sql = LEFT(@dynamic_sql, LEN(@dynamic_sql) - 2)

        SELECT @dynamic_sql += CONCAT('' FROM ''
            , QUOTENAME(@schema_name), ''.'', QUOTENAME(@table_name)
            , '' AS A'');

        SELECT @dynamic_sql += CASE WHEN @parameter != '''' THEN CONCAT(@new_line_char, ''WHERE '', @parameter) ELSE '''' END;

        SELECT @dynamic_sql += '';'';
    END
ELSE
BEGIN
    SELECT @dynamic_sql += @query;
END

SELECT @dynamic_sql AS stmt;
', '')
This code:
a. Query the system view INFORMATION_SCHEMA.COLUMNS (filtered by columns schema_name and table_name from cfg.table_to_extract) to get the column names and their data types.
b. Cleanse the data per column, depending on the data type (trim strings, format float values, convert uniqueidentifier and timestamp values to text).
c. Return the generated SQL statement in column stmt, as sketched below.
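For illustration (my assumption, based on the sample tables above), for dbo.table_3 the returned stmt would look roughly like this. Note the doubled single quotes around the aliases, which survive being embedded as a string literal in the next activity's expression:

SET NOCOUNT ON; SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; SELECT
    LTRIM(RTRIM(A.[id])) AS ''id''
    , LTRIM(RTRIM(A.[first_name])) AS ''first_name''
    , LTRIM(RTRIM(A.[last_name])) AS ''last_name''
FROM [dbo].[table_3] AS A
WHERE id = @ID;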
2. Inject the parameters in the WHERE clause – Get Parameters (Lookup).
Query:
@concat('DECLARE @stmt NVARCHAR(MAX) = ''', activity('Get SQL Statement').output.firstRow.stmt, ''';

SELECT @stmt AS stmt INTO #stmt;

DECLARE @dynamic_sql NVARCHAR(MAX) = N'''';

SELECT @dynamic_sql += CONCAT(N''UPDATE #stmt SET stmt = REPLACE(stmt, ''''@'', parameter_name, N'''''', CONCAT(CHAR(39), '''''', parameter_value, N'''''', CHAR(39))); '')
FROM cfg.process_parameter
WHERE process_name = ''PL_Test''

EXEC (@dynamic_sql);

SELECT stmt FROM #stmt;
')
This code:
a. Replace the parameter names with the values from table cfg.process_parameter. The parameter names in column parameter of table cfg.table_to_extract must match the values in column parameter_name of table cfg.process_parameter (without the @).
b. Return the final SQL statement in column stmt, as sketched below.
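Continuing the dbo.table_3 sketch from step 1 (again, my assumption based on the sample data): after this Lookup the doubled quotes have collapsed back to single ones and @ID is replaced with the quoted value from cfg.process_parameter, so stmt becomes roughly:

SET NOCOUNT ON; SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED; SELECT
    LTRIM(RTRIM(A.[id])) AS 'id'
    , LTRIM(RTRIM(A.[first_name])) AS 'first_name'
    , LTRIM(RTRIM(A.[last_name])) AS 'last_name'
FROM [dbo].[table_3] AS A
WHERE id = '3';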
3. Extract the table into a CSV file – Copy Table (Copy data).
Source query:
@replace(replace(activity('Get Parameters').output.firstRow.stmt, '&gt;', '>'), '&lt;', '<')
We need to replace the '&gt;' and '&lt;' entities generated by ADF with '>' and '<'.
The sink is an Azure Data Lake dataset, parameterized with columns file_system_name, directory_name_extract and file_name.
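A minimal sketch of the sink dataset parameter values (the parameter names here are my assumption; use whatever your ADLS dataset defines):

file_system: @item().file_system_name
directory:   @item().directory_name_extract
file_name:   @item().file_name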
This example is very basic and only briefly explained. If you find it useful and need a clarification, leave a comment.
By the way, all the connection strings are parameterized; I will cover this in another post.
2022-02-23: In Azure: Dynamic Extract from Multiple Servers with Error Handling (Log) and Send Error Email I extended the logic explained in this post to the following:
- Extract dynamically multiple tables from multiple servers
- Inject custom parameters for each table
- Collect log for ETL execution reporting
- Send email with the ETL errors to an operator
Keep it simple :-)