Keep it simple
Keep it simple

Databricks: a Spark DataFrame is not materialized, but a Pandas DataFrame is

When we select data into a Spark dataframe, the data continues to live in the source (the Delta table).

In this example I:

  • Read a portion from delta table into spark dataframe
  • Update this portion
  • Delete the updated rows from the source (the delta table)
  • Show the dataframe or save back into the delta table

Spark Dataframe

# Imports

from pyspark.sql import types as st
import pyspark.sql.functions as sf
import delta.tables as dtb
import datetime as dt
# Create dataframe for test_table

# Schema: a date, an integer payload column, and a timestamp that starts empty.
schema = st.StructType([
    st.StructField("date", st.DateType()),
    st.StructField("col_1", st.IntegerType()),
    st.StructField("timestamp_dt", st.TimestampType()),
])

# Six sample rows spread over three days; timestamp_dt is filled in later.
rows = [
    (dt.datetime(2022, 9, 1), 1, None),
    (dt.datetime(2022, 9, 2), 2, None),
    (dt.datetime(2022, 9, 2), 3, None),
    (dt.datetime(2022, 9, 3), 4, None),
    (dt.datetime(2022, 9, 3), 5, None),
    (dt.datetime(2022, 9, 3), 6, None),
]

df = spark.createDataFrame(rows, schema)

display(df)

# Save df as delta table "test_table"
# NOTE(review): Spark paths are DBFS-rooted; the "/dbfs/" prefix is normally
# used only by local file APIs — verify it matches the read paths below.

(
    df.write
    .format("delta")
    .mode("overwrite")
    .save("/dbfs/FileStore/tables/test_table")
)
test_table
# Select WHERE date = '2022-09-03' from test_table

# Load the table, keep only the rows for 2022-09-03, and stamp them.
source = spark.read.format("delta").load("/dbfs/FileStore/tables/test_table")
df = source.where(sf.col("date") == "2022-09-03")
df = df.withColumn("timestamp_dt", sf.current_timestamp())
display(df)
Select one day in dataframe
# Delete date = '2022-09-03' from test_table
# (removes from the source Delta table the same rows we just loaded into df)

dtb_test_table = dtb.DeltaTable.forPath(spark, "/dbfs/FileStore/tables/test_table")
dtb_test_table.delete(sf.col("date") == "2022-09-03")
# Show test_table

display(spark.read.format("delta").load("/dbfs/FileStore/tables/test_table").orderBy("date"))
test_table after delete
# Show the dataframe
# NOTE(review): df was defined lazily against the Delta table, so what this
# displays after the delete depends on when Spark (re-)reads the source —
# this is the behavior the article sets out to demonstrate.
display(df)
The dataframe

Pandas Dataframe

When we select data from a Delta table into a pandas dataframe, the data is copied into the dataframe.

# Imports

from pyspark.sql import types as st
import pyspark.sql.functions as sf
import delta.tables as dtb
import datetime as dt
# Create dataframe for test_table

# Schema: a date, an integer payload column, and a timestamp that starts empty.
schema = st.StructType([
    st.StructField("date", st.DateType()),
    st.StructField("col_1", st.IntegerType()),
    st.StructField("timestamp_dt", st.TimestampType()),
])

# Six sample rows spread over three days; timestamp_dt is filled in later.
rows = [
    (dt.datetime(2022, 9, 1), 1, None),
    (dt.datetime(2022, 9, 2), 2, None),
    (dt.datetime(2022, 9, 2), 3, None),
    (dt.datetime(2022, 9, 3), 4, None),
    (dt.datetime(2022, 9, 3), 5, None),
    (dt.datetime(2022, 9, 3), 6, None),
]

df = spark.createDataFrame(rows, schema)

display(df)

# Save df as delta table "test_table"

(
    df.write
    .format("delta")
    .mode("overwrite")
    .save("/FileStore/tables/test_table")
)
test_table
# Select date = "2022-09-03" from test_table into spark dataframe

# Load the table, keep only the rows for 2022-09-03, and stamp them.
source = spark.read.format("delta").load("/FileStore/tables/test_table")
df = source.where(sf.col("date") == "2022-09-03")
df = df.withColumn("timestamp_dt", sf.current_timestamp())
display(df)

# Convert to pandas — this collects the rows into driver memory.
df = df.toPandas()
print(type(df))
Select one day in pandas dataframe
# Delete date = '2022-09-03' from test_table
# (removes from the source Delta table the same rows we just copied into the
# pandas dataframe)

dtb_test_table = dtb.DeltaTable.forPath(spark, "/FileStore/tables/test_table")
dtb_test_table.delete(sf.col("date") == "2022-09-03")
# Show test_table

display(spark.read.format("delta").load("/FileStore/tables/test_table").orderBy("date"))
test_table after delete
# Show the dataframe
# The pandas dataframe holds its own copy of the data (collected by
# toPandas() above), so its rows survive the delete on the source table.
display(df)
The dataframe is not empty
# Insert the updated data back into the delta table

# Turn the pandas dataframe back into a Spark dataframe and append it.
updated = spark.createDataFrame(df)
updated.write.format("delta").mode("append").save("/FileStore/tables/test_table")

# Show the delta table
df = spark.read.format("delta").load("/FileStore/tables/test_table")
display(df)
The updated data is appended back to the delta table

Keep it simple :-)

Leave a comment

Your email address will not be published. Required fields are marked *

Time limit exceeded. Please complete the captcha once again.