Azure Databricks: Read/Write files from/to Azure Data Lake


Upload the file result.csv into Azure Data Lake (storage account mystorageaccount, container mycontainer).

Databricks commands:

# ADLS Key
spark.conf.set(
  'fs.azure.account.key.mystorageaccount.dfs.core.windows.net'
  , 'my_key'
)

spark.conf.set() defines the access key for the connection to the Data Lake. The access key can be found in the Azure Portal, under the storage account's Access keys.
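Hard-coding the key in a notebook is not ideal; a common alternative is to keep it in a Databricks secret scope and read it with dbutils.secrets.get(). A minimal sketch, assuming a scope named adls-scope with a secret named storage-key already exists (both names are placeholders):

# Read the access key from a Databricks secret scope instead of hard-coding it
# (assumes the scope 'adls-scope' and secret 'storage-key' were created beforehand)
access_key = dbutils.secrets.get(scope='adls-scope', key='storage-key')

spark.conf.set(
  'fs.azure.account.key.mystorageaccount.dfs.core.windows.net'
  , access_key
)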

# List folder in ADLS
dbutils.fs.ls('abfss://mycontainer@mystorageaccount.dfs.core.windows.net/Test_CSV')

To list a folder in ADLS, use dbutils.fs.ls().
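dbutils.fs.ls() returns a list of FileInfo objects, so the listing can also be inspected from Python, for example:

# Inspect the listing from Python: each entry exposes name, path and size
files = dbutils.fs.ls('abfss://mycontainer@mystorageaccount.dfs.core.windows.net/Test_CSV')
for f in files:
  print(f.name, f.path, f.size)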

# Read file from ADLS
source_file = 'abfss://mycontainer@mystorageaccount.dfs.core.windows.net/result.csv'

df = spark.read\
  .format('csv')\
  .option('inferSchema', True)\
  .option('header', True)\
  .option('delimiter', ',')\
  .load(source_file)

display(df)

To read a file from ADLS, use spark.read with the appropriate format and options. The result is loaded into a DataFrame (df), which can then be manipulated in Databricks.

To browse the DataFrame, call display(df).

# Show the schema
df.printSchema()

To show the schema of the DataFrame, call df.printSchema().
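inferSchema makes Spark scan the data to guess the column types; an explicit schema can be supplied instead. A minimal sketch, assuming (only for illustration) that result.csv has a string column name and an integer column score:

# Define the schema explicitly instead of using inferSchema
# (the columns below are an assumption for illustration)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

result_schema = StructType([
  StructField('name', StringType(), True),
  StructField('score', IntegerType(), True)
])

df_typed = spark.read\
  .format('csv')\
  .option('header', True)\
  .schema(result_schema)\
  .load(source_file)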

# Create temp view from the DataFrame
df.createOrReplaceTempView('result_temp_view')

Create a temporary view from the DataFrame; this makes the data available for manipulation with SQL.

%sql
-- Manipulate the DataFrame with SQL
CREATE OR REPLACE TEMPORARY VIEW result_temp_view_1 AS
SELECT
  *
  , CASE
    WHEN score > 200 THEN 'Good'
    ELSE 'Bad'
  END AS derived_column
FROM result_temp_view
ORDER BY score

Use SQL to create a second temporary view and to add a derived column.
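The same derived column can also be added with the DataFrame API instead of SQL; a sketch of the equivalent transformation:

# Equivalent of the SQL above, using the DataFrame API
from pyspark.sql import functions as F

df_derived = df.withColumn(
  'derived_column',
  F.when(F.col('score') > 200, 'Good').otherwise('Bad')
).orderBy('score')

display(df_derived)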

# Create DataFrame from temporary view
df1 = spark.sql('SELECT * FROM result_temp_view_1')

Create a DataFrame from the temporary view with spark.sql().

# Save the DataFrame in file on ADLS
destination_path = 'abfss://mycontainer@mystorageaccount.dfs.core.windows.net'

# Parquet (insert the result from the temporary view)
df1.write.format('parquet')\
  .mode('overwrite')\
  .save(destination_path + '/Databricks_Parquet')

# JSON
df.write.format('json')\
  .mode('overwrite')\
  .save(destination_path + '/Databricks_JSON')

# CSV
df.write.format('csv')\
  .mode('overwrite')\
  .option('header', True)\
  .option('sep', ',')\
  .save(destination_path + '/Databricks_CSV')

Save the DataFrame as Parquet, JSON, or CSV files in ADLS. Note that these writes produce plain Parquet, JSON, and CSV output, not Delta Lake. Delta Lake is an open-source storage layer that brings ACID (atomicity, consistency, isolation, and durability) transactions to Apache Spark and big data workloads, and writing to it requires format('delta').
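A minimal sketch of writing and then reading the same DataFrame in Delta format (the /Databricks_Delta folder name is only an example):

# Delta Lake (ACID transactions)
df1.write.format('delta')\
  .mode('overwrite')\
  .save(destination_path + '/Databricks_Delta')

df_delta = spark.read\
  .format('delta')\
  .load(destination_path + '/Databricks_Delta')

display(df_delta)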

Whatever the output format, Spark writes a folder of partitioned files that do not have friendly names:

# Read the Parquet output
df_parquet = spark.read\
  .parquet(destination_path + '/Databricks_Parquet')

display(df_parquet)

Read the Parquet output back into a DataFrame:

# Read the JSON output
df_json = spark.read\
  .json(destination_path + '/Databricks_JSON')

display(df_json)

Read the JSON output.

# Read the CSV output
df_csv = spark.read\
  .option('header', True)\
  .csv(destination_path + '/Databricks_CSV')

display(df_csv)

Read the CSV output.
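If a single output file is needed, the DataFrame can be coalesced to one partition before writing (reasonable only for small results, since all data passes through one worker). A sketch, with the /Databricks_CSV_single folder name chosen only for illustration:

# Write a single CSV part file by forcing one partition
df1.coalesce(1).write.format('csv')\
  .mode('overwrite')\
  .option('header', True)\
  .save(destination_path + '/Databricks_CSV_single')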

Keep it simple :-)
