Azure Databricks: Extract from SQL Server and save CSV or JSON file in Azure Data Lake


Create table and insert data:

CREATE TABLE dbo.Person
(
	PersonID INT IDENTITY(1, 1) PRIMARY KEY
	, FirstName VARCHAR(32)
	, LastName VARCHAR(32)
	, PayRate DECIMAL(19, 4)
	, StartDate DATETIME
	, EndDate DATETIME
	, DateCreated DATETIME
	, DateLastModified DATETIME
);

INSERT dbo.Person (FirstName, LastName, PayRate, StartDate, EndDate, DateCreated)
SELECT            'John', 'Smith', 1.2300, '1966-03-07', '2087-01-13', GETDATE()
UNION ALL SELECT  'George', 'Benson', 432.5210, '1987-11-23', '2003-04-07', GETDATE()
UNION ALL SELECT  'Anna', 'Larson', 2.4120, '1954-03-17', '1966-07-15', GETDATE()
UNION ALL SELECT  'Bilbo', 'Beggins', 77.4500, '1968-11-14', '1970-03-08', GETDATE()
UNION ALL SELECT  'Stephanie', 'Garcia', -3.1400, '2073-03-17', '2086-12-21', GETDATE()
UNION ALL SELECT  'Gregory', 'Jacson', 3.1400, '2005-08-05', '2009-07-03', GETDATE();

SELECT *
FROM Person;

Databricks commands:

# SQL Server Configuration
hostname = 'my-sql-server.database.windows.net'
port = 1433
database = 'my_database'
username = 'my_username'
password = 'my_password'
url = 'jdbc:sqlserver://{0}:{1}'.format(hostname, port)

connectionProperties = {
  'user': username,
  'password': password,
  'driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
  'database': database
}

The values for hostname, port, database and username can be found in the database's connection string in the Azure Portal.
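Alternatively, the database name can be embedded directly in the JDBC URL instead of being passed as a connection property; a minimal sketch of that variant:

# Alternative: put the database name in the JDBC URL itself
# (instead of the 'database' entry in connectionProperties)
url = 'jdbc:sqlserver://{0}:{1};databaseName={2}'.format(hostname, port, database)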

%sql
-- Pass parameters from ADF pipeline
CREATE WIDGET TEXT table_name DEFAULT 'Person1';

Create a widget to pass parameters from Azure Data Factory.
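The same widget can also be created from Python with dbutils.widgets; a minimal sketch mirroring the %sql command above (same widget name and default value):

# Python equivalent of the %sql CREATE WIDGET above
dbutils.widgets.text('table_name', 'Person1')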

# Query the database
query = '(SELECT * FROM dbo.' + dbutils.widgets.get("table_name") + ') AS Person' # Dynamic SQL

df = spark.read.jdbc(url = url
  , table = query
  , properties = connectionProperties
)

As many parameters as needed can be declared and used in a dynamic SQL query. The result is stored in a DataFrame (df); its content can be shown with the command display(df).
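As a sketch of using several parameters, assume a second, hypothetical widget named schema_name has also been created; the widgets can then be combined into one dynamic query:

# Hypothetical example: combine several widgets into one dynamic query
schema_name = dbutils.widgets.get('schema_name')  # assumes this widget exists
table_name = dbutils.widgets.get('table_name')
query = '(SELECT * FROM {0}.{1}) AS src'.format(schema_name, table_name)

df = spark.read.jdbc(url = url, table = query, properties = connectionProperties)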

# ADLS Configuration
spark.conf.set(
  'fs.azure.account.key.mystorageaccount.dfs.core.windows.net'
  , 'my_key'
)

spark.conf.set() defines the access key for the connection to the Data Lake Storage account. The access key can be found in the Azure Portal.
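Rather than hard-coding the key in the notebook, it can be read from a Databricks secret scope; a minimal sketch, assuming a scope named 'my-scope' with a secret named 'adls-key' has already been set up:

# Assumes a secret scope 'my-scope' containing the key under 'adls-key'
storage_key = dbutils.secrets.get(scope = 'my-scope', key = 'adls-key')

spark.conf.set(
  'fs.azure.account.key.mystorageaccount.dfs.core.windows.net'
  , storage_key
)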

# Save to single CSV file (instead of partitioned ones) --> coalesce(1)
destination_path = 'abfss://mycontainer@mystorageaccount.dfs.core.windows.net/my_folder_name'

df.coalesce(1).write\
  .format('csv')\
  .option('header', True)\
  .mode('overwrite')\
  .option('sep', '¤')\
  .option("quoteAll", True)\
  .save(destination_path + '/tempPerson')

# Give the saved file a friendly name
files = dbutils.fs.ls(destination_path + '/tempPerson')
csv_file = [x.path for x in files if x.path.endswith('.csv')][0]
dbutils.fs.mv(csv_file, destination_path + '/Person.csv')
dbutils.fs.rm(destination_path + '/tempPerson', recurse = True)

The coalesce() method changes the default behavior of Spark (saving into many partition files) and writes the output into a single file.

As the generated file name is not friendly, the file is renamed.

Finally, the temp folder is removed.
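To verify the export, the file can be read back with the same options that were used to write it (a quick sanity check, not part of the pipeline):

# Read the exported CSV back to check header, separator and content
check_df = spark.read\
  .option('header', True)\
  .option('sep', '¤')\
  .csv(destination_path + '/Person.csv')

display(check_df)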

To export to a JSON file:

# Save to single JSON file (instead of partitioned ones) --> coalesce(1)
destination_path = 'abfss://mycontainer@mystorageaccount.dfs.core.windows.net/my_folder_name'

df.coalesce(1).write\
  .format('json')\
  .mode('overwrite')\
  .save(destination_path + '/tempPerson')

# Give the saved file a friendly name
files = dbutils.fs.ls(destination_path + '/tempPerson')
json_file = [x.path for x in files if x.path.endswith('.json')][0]
dbutils.fs.mv(json_file, destination_path + '/Person.json')
dbutils.fs.rm(destination_path + '/tempPerson', recurse = True)

Keep it simple :-)
