Azure: Dynamic Extract from Multiple Servers with Error Handling (Log) and Send Error Email


In this example I will explain, in a simple way, my logic for extracting dynamically in Azure Data Factory or Azure Synapse from multiple servers and saving the result in Azure Data Lake.

This post extends the logic explained earlier in another post – Azure Data Factory (ADF): Dynamic Extract Driven by SQL Server Table.

The database tables that drive this process are located on the destination SQL Server (Azure).

If an error occurs during the ETL, an email is sent to the operator(s) via an Azure Automation Runbook.

Overview

This example is built in Azure Synapse or ADF and shows how to:

  1. Dynamically extract multiple tables from multiple servers
  2. Inject custom parameters for each table
  3. Collect a log for ETL execution reporting
  4. Send an email with the ETL errors to an operator

The explanation is kept simple, but everything you need to replicate the process (the T-SQL code, the JSON for the ADF pipelines and the Runbook that sends the email) is included.

Servers

In this scenario there is one “master” server and multiple “slave” servers. This could be the corporation’s server (master) and the subcontractors’ or franchisees’ servers (slaves).

When a new slave is added, the master replicates all the vital information (contacts, departments, financial formulas, etc.) to the slave server, and then the slave appends its own info.

Basic requirements:

  • All the databases have to be “mirrors” of the master one, i.e. the same table names, column names, data types, etc.
  • All the databases have to include the same user/password that is used in the ETL.
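
One simple way to verify the “mirror” requirement is to compare the column catalog of every slave against the master. This minimal check is my suggestion, not part of the original setup; it has to return identical result sets on the master and on every slave:

-- Run on the master and on every slave; the outputs have to match exactly.
SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION
FROM INFORMATION_SCHEMA.COLUMNS
ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION;
GO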

Tables that Drive the ETL

On the destination server I create the following tables:

  • cfg.table_to_extract – the tables that will be extracted.
  • cfg.process_parameter – the parameters that we need to:
    • run the ETL.
    • inject parameters in the T-SQL.
  • log.dynamic_extract – the errors and successes that occurred during the ETL.

Create Database Objects

Create database:

USE master;
GO

IF (DB_ID('pl_database_1') IS NOT NULL)
  BEGIN
    ALTER DATABASE pl_database_1 SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
    DROP DATABASE pl_database_1;
  END
GO

CREATE DATABASE pl_database_1;
GO

USE pl_database_1;
GO

Create Schema:

CREATE SCHEMA cfg
GO

CREATE SCHEMA [log]
GO

cfg.table_to_extract:

DROP TABLE IF EXISTS cfg.table_to_extract;
GO

CREATE TABLE cfg.table_to_extract
(
  id INT IDENTITY(1, 1)
  , process_name SYSNAME
  , server_name SYSNAME
  , [database_name] SYSNAME
  , [schema_name] SYSNAME
  , table_name SYSNAME
  , parameter VARCHAR(512)
  , query VARCHAR(8000)
  , file_system_name VARCHAR(128)
  , directory_name_bronze VARCHAR(128)
  , [file_name] VARCHAR(128)
  , is_active BIT
  , CONSTRAINT PK__cfg_table_to_extract PRIMARY KEY (id)
);
GO

SET IDENTITY_INSERT cfg.table_to_extract ON;
INSERT cfg.table_to_extract (id, process_name, server_name, [database_name], [schema_name], table_name, parameter, query, file_system_name, directory_name_bronze, [file_name], is_active)
SELECT 1, N'PL_Test', N'pl-sql-server-1.database.windows.net', N'pl_database_1', N'dbo', N'test_table_one', N'', N'', N'plcontainer1', N'Test_SQL', N'1_test_table_one.csv', 1
UNION ALL SELECT 2, N'PL_Test', N'pl-sql-server-1.database.windows.net', N'pl_database_1', N'dbo', N'test_table_two', N'', N'SELECT A.product_id, A.order_id, A.qty, A.line_total_usd
FROM dbo.product AS A
WHERE
  EXISTS
  (
    SELECT B.id
    FROM table_14 AS B
    WHERE
      A.product_id = B.product_id
      AND B.report_date = CAST(GETDATE() AS DATE)
      AND B.order_date >= @order_date
  );', N'plcontainer1', N'Test_SQL', N'1_test_table_two.csv', 0
UNION ALL SELECT 3, N'PL_Test', N'pl-sql-server-1.database.windows.net', N'pl_database_1', N'dbo', N'test_table_three', N'id = @ID', N'', N'plcontainer1', N'Test_SQL', N'1_person.csv', 0
UNION ALL SELECT 4, N'PL_Test', N'pl-sql-server-1.database.windows.net', N'pl_database_1', N'dbo', N'test_table_four', N'order_date_start >= @order_date_start AND order_date_end < @order_date_end', N'', N'plcontainer1', N'Test_SQL', N'1_new_name.csv', 0;
SET IDENTITY_INSERT cfg.table_to_extract OFF;
GO

ALTER TABLE cfg.table_to_extract ADD DEFAULT ('') FOR parameter;
GO

ALTER TABLE cfg.table_to_extract ADD DEFAULT ('') FOR query;
GO

ALTER TABLE cfg.table_to_extract ADD DEFAULT ((1)) FOR is_active;
GO
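
For reference, the Lookup activity that feeds the ForEach loop (point 6 of pipeline extract_db below) can be as simple as the following query (the filter values are from this example):

SELECT id, server_name, [database_name], [schema_name], table_name, parameter, query, file_system_name, directory_name_bronze, [file_name]
FROM cfg.table_to_extract
WHERE
  process_name = N'PL_Test'
  AND is_active = 1;
GO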

cfg.process_parameter:

DROP TABLE IF EXISTS cfg.process_parameter
GO

CREATE TABLE cfg.process_parameter
(
  id INT IDENTITY(1, 1) NOT NULL
  , process_name SYSNAME
  , server_name NVARCHAR(128)
  , [database_name] NVARCHAR(128)
  , [schema_name] NVARCHAR(128)
  , table_name NVARCHAR(128)
  , [name] VARCHAR(128)
  , [value] VARCHAR(1024)
  , [description] VARCHAR(512)
  , CONSTRAINT PK_cfg_process_parameter PRIMARY KEY (id)
)
GO

SET IDENTITY_INSERT cfg.process_parameter ON;
INSERT cfg.process_parameter (id, process_name, server_name, [database_name], [schema_name], table_name, [name], [value], [description])
SELECT 1, N'PL_Test', NULL, NULL, NULL, NULL, N'email_on_error', N'peter@lalovsky.com', N'Emails, separated by semicolon (;) that will receive the error messages.'
UNION ALL SELECT 2, N'PL_Test', NULL, NULL, NULL, NULL, N'days_to_keep_log', N'365', N'The days to keep the data in the log tables';
SET IDENTITY_INSERT cfg.process_parameter OFF;
GO
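
To illustrate the parameter injection (point 5 of activity foreach_table below), here is a minimal T-SQL sketch. It assumes a hypothetical row (table_name = N'test_table_three', [name] = N'ID', [value] = N'42') exists in cfg.process_parameter; in the real pipeline such rows are maintained by the ETL itself:

DECLARE @parameter VARCHAR(512);

-- Start from the raw parameter template of the table, e.g. 'id = @ID'.
SELECT @parameter = parameter
FROM cfg.table_to_extract
WHERE table_name = N'test_table_three';

-- Replace every @name placeholder with its stored value.
SELECT @parameter = REPLACE(@parameter, '@' + p.[name], p.[value])
FROM cfg.process_parameter AS p
WHERE
  p.process_name = N'PL_Test'
  AND p.table_name = N'test_table_three';

SELECT @parameter AS injected_parameter;  -- returns 'id = 42'
GO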

log.dynamic_extract:

DROP TABLE IF EXISTS [log].dynamic_extract
GO

CREATE TABLE [log].dynamic_extract
(
  unique_identifier CHAR(15)
  , data_factory NVARCHAR(512)
  , pipeline NVARCHAR(512)
  , activity_name NVARCHAR(512)
  , group_id NVARCHAR(512)
  , run_id NVARCHAR(512)
  , trigger_id NVARCHAR(512)
  , trigger_name NVARCHAR(512)
  , trigger_time datetime2(7)
  , trigger_type NVARCHAR(512)
  , triggered_by_pipeline_name NVARCHAR(512)
  , triggered_by_pipeline_run_id NVARCHAR(512)
  , [error_message] NVARCHAR(4000)
  , parameter NVARCHAR(4000)
  , logged_datetime DATETIME
)
GO
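
In the pipelines the log rows are written with values taken from the ADF system variables (pipeline().DataFactory, pipeline().Pipeline, pipeline().RunId, pipeline().TriggerTime, etc.). A hedged illustration with hard-coded values; note that the yyyyMMdd_HHmmss format of unique_identifier fits exactly in CHAR(15):

INSERT [log].dynamic_extract
  (unique_identifier, data_factory, pipeline, activity_name, group_id, run_id, trigger_id, trigger_name, trigger_time, trigger_type, triggered_by_pipeline_name, triggered_by_pipeline_run_id, [error_message], parameter, logged_datetime)
SELECT
  '20240101_120000'        -- variable unique_identifier (yyyyMMdd_HHmmss)
  , N'pl-synapse-1'        -- pipeline().DataFactory (illustrative name)
  , N'extract_db'          -- pipeline().Pipeline
  , N'copy_table'          -- the failing activity (illustrative name)
  , N'', N'', N'', N''     -- group_id, run_id, trigger_id, trigger_name
  , SYSDATETIME()          -- pipeline().TriggerTime
  , N'', N'', N''          -- trigger_type, triggered_by_*
  , N'Sample error message'
  , N'id = @ID'
  , GETDATE();
GO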

I added a fake table with the slaves. It should be located on the master:

DROP TABLE IF EXISTS dbo.Slaves
GO

CREATE TABLE dbo.Slaves
(
  SlaveID INT
  , ServerName NVARCHAR(128)
  , SlaveType CHAR(1)
  , IsActive BIT
);
GO

INSERT dbo.Slaves (SlaveID, ServerName, SlaveType, IsActive)
SELECT 1, N'pl-sql-server-1.database.windows.net', N'S', 1
UNION ALL SELECT 2, N'pl-sql-server-2.database.windows.net', N'S', 1;
GO
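
For illustration, here is one possible shape of point 4 of pipeline extract_db below (an assumption, not the exact pipeline code): every active slave that is missing from cfg.table_to_extract receives a copy of the table definitions of a “template” server, with the SlaveID as the file name prefix (matching the 1_ prefix in the sample rows above):

INSERT cfg.table_to_extract
  (process_name, server_name, [database_name], [schema_name], table_name, parameter, query, file_system_name, directory_name_bronze, [file_name], is_active)
SELECT
  t.process_name
  , s.ServerName
  , t.[database_name]
  , t.[schema_name]
  , t.table_name
  , t.parameter
  , t.query
  , t.file_system_name
  , t.directory_name_bronze
  -- Swap the numeric prefix of the file name with the SlaveID.
  , CAST(s.SlaveID AS VARCHAR(10)) + SUBSTRING(t.[file_name], CHARINDEX('_', t.[file_name]), 128)
  , s.IsActive
FROM dbo.Slaves AS s
CROSS JOIN cfg.table_to_extract AS t
WHERE
  t.server_name = N'pl-sql-server-1.database.windows.net'  -- the "template" server
  AND NOT EXISTS
  (
    SELECT 1
    FROM cfg.table_to_extract AS x
    WHERE
      x.server_name = s.ServerName
      AND x.table_name = t.table_name
  );
GO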

The tables that will be extracted:

DROP TABLE IF EXISTS dbo.test_table_one
GO

CREATE TABLE dbo.test_table_one
(
  id INT
  , col1 VARCHAR(128)
  , col2 VARCHAR(128)
);
GO

INSERT dbo.test_table_one (id, col1, col2)
SELECT 1, N'test value 11', N'test value 12'
UNION ALL SELECT 2, N'test value 21', N'test value 22';
GO

DROP TABLE IF EXISTS dbo.test_table_two
GO

CREATE TABLE dbo.test_table_two
(
  id INT
  , col1 VARCHAR(128)
  , col2 VARCHAR(128)
  , col3 INT NULL
);
GO

INSERT dbo.test_table_two (id, col1, col2, col3)
SELECT 1, N'test value 11', N'test value 12', 111
UNION ALL SELECT 2, N'test value 21', N'test value 22', 222
UNION ALL SELECT 3, N'test value 31', N'test value 32', 333;
GO

DROP TABLE IF EXISTS dbo.test_table_three
GO

CREATE TABLE dbo.test_table_three
(
  id INT
  , first_name VARCHAR(128)
  , last_name VARCHAR(128)
);
GO

INSERT dbo.test_table_three (id, first_name, last_name)
SELECT 1, N'John', N'Smith'
UNION ALL SELECT 2, N'Ali', N'Bob'
UNION ALL SELECT 3, N'Samantha', N'Fox'
UNION ALL SELECT 4, N'Krali', N'Marko';
GO

DROP TABLE IF EXISTS dbo.test_table_four
GO

CREATE TABLE dbo.test_table_four
(
  id INT
  , col1 VARCHAR(128)
  , order_date_start DATETIME
  , order_date_end DATETIME
);
GO

INSERT dbo.test_table_four (id, col1, order_date_start, order_date_end)
SELECT 1, N'test value 11', DATEADD(MM, -3, GETDATE()), GETDATE()
UNION ALL SELECT 2, N'test value 21', DATEADD(YY, -3, GETDATE()), GETDATE()
UNION ALL SELECT 3, N'test value 31', DATEADD(YY, -1, GETDATE()), GETDATE();
GO

Azure

I created two pipelines (click to see the JSON) in Azure Synapse (ADF):

Quick explanation:

Pipeline extract_db

1 – Set current yyyyMMdd_HHmmss to variable unique_identifier

2 – Get the “global” parameters from table cfg.process_parameter

3 – Get the list of the active slaves

4 – Add new slaves in table cfg.table_to_extract and activate the active ones (a possible implementation is sketched after the dbo.Slaves script above)

5 – Update table cfg.process_parameter

6 – Get the list of the tables to extract from table cfg.table_to_extract

7 – Loop the list of the tables to extract

8 – Check the log table for errors

9 – If there is an error, send email

Note: in point 7 I replace CHAR(13) and CHAR(10) with spaces. This is only needed when we extract to .csv; if we extract to .parquet, it is not needed.

10 – Remove old records from the log table
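
Point 10 relies on the days_to_keep_log value stored in cfg.process_parameter. A minimal sketch of what the cleanup statement can look like:

-- Delete log records older than the configured retention (365 days in this example).
DELETE FROM [log].dynamic_extract
WHERE
  logged_datetime < DATEADD(
    DAY
    , -1 * (
        SELECT TOP (1) CAST([value] AS INT)
        FROM cfg.process_parameter
        WHERE
          process_name = N'PL_Test'
          AND [name] = N'days_to_keep_log'
      )
    , GETDATE());
GO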

Activity foreach_table

1 – Generate the dynamic statement for extraction from INFORMATION_SCHEMA.COLUMNS (see the sketch after this list)

2 – Insert error in the log table

3 – Set value now() to variable “now”

4 – Set variable in table cfg.process_parameter

5 – Inject the parameters from table cfg.process_parameter

6 – Insert error in the log table

7 – Extract the table

8 – Insert error in the log table

9 – Set variables in table cfg.process_parameter

10 – Insert success in the log table
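
To make point 1 (and the CHAR(13)/CHAR(10) note above) concrete, here is my reconstruction of how such a dynamic SELECT can be built from INFORMATION_SCHEMA.COLUMNS. It is a sketch, not the exact pipeline code:

DECLARE
  @schema_name SYSNAME = N'dbo'
  , @table_name SYSNAME = N'test_table_one'
  , @parameter VARCHAR(512) = ''  -- from cfg.table_to_extract, parameters already injected
  , @sql NVARCHAR(MAX);

SELECT @sql =
  N'SELECT '
  + STRING_AGG(
      CONVERT(NVARCHAR(MAX),
        CASE
          WHEN DATA_TYPE IN ('char', 'nchar', 'varchar', 'nvarchar')
            -- Strip embedded line breaks so they do not break the .csv rows.
            THEN 'REPLACE(REPLACE(' + QUOTENAME(COLUMN_NAME) + ', CHAR(13), '' ''), CHAR(10), '' '') AS ' + QUOTENAME(COLUMN_NAME)
          ELSE QUOTENAME(COLUMN_NAME)
        END)
      , ', ') WITHIN GROUP (ORDER BY ORDINAL_POSITION)
  + N' FROM ' + QUOTENAME(@schema_name) + N'.' + QUOTENAME(@table_name)
  + CASE WHEN @parameter <> '' THEN N' WHERE ' + @parameter ELSE N'' END
  + N';'
FROM INFORMATION_SCHEMA.COLUMNS
WHERE
  TABLE_SCHEMA = @schema_name
  AND TABLE_NAME = @table_name;

SELECT @sql AS extract_query;
GO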

Activity send_error_email

1 – Get the errors in HTML format for the current execution, determined by variable unique_identifier

2 – Send email to an operator. See the code for the Runbook here.
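
Point 1 can be served by a query similar to this sketch, which shapes the errors of the current run as an HTML table for the email body:

DECLARE @unique_identifier CHAR(15) = '20240101_120000';

SELECT
  '<table border="1"><tr><th>Pipeline</th><th>Activity</th><th>Error</th></tr>'
  + STRING_AGG(
      CONVERT(NVARCHAR(MAX), '<tr><td>' + pipeline + '</td><td>' + activity_name + '</td><td>' + [error_message] + '</td></tr>')
      , '')
  + '</table>' AS html_body
FROM [log].dynamic_extract
WHERE
  unique_identifier = @unique_identifier
  AND [error_message] <> N'';
GO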

The Result

Extracted tables:

Extraction errors email:

Keep it simple :-)

