When we select data from a Delta table into a Spark DataFrame, the data continues to live in the source (the Delta table): the DataFrame is only a lazy query plan over it.
In this example I:
- Read a portion of a Delta table into a Spark DataFrame
- Update this portion
- Delete the updated rows from the source (the Delta table)
- Show the DataFrame or save it back into the Delta table
Spark Dataframe
# Imports
from pyspark.sql import types as st
import pyspark.sql.functions as sf
import delta.tables as dtb
import datetime as dt
# Create dataframe for test_table
schema = st.StructType(
    [
        st.StructField("date", st.DateType())
        , st.StructField("col_1", st.IntegerType())
        , st.StructField("timestamp_dt", st.TimestampType())
    ]
)
df = spark.createDataFrame(
    [
        (dt.datetime(2022, 9, 1), 1, None)
        , (dt.datetime(2022, 9, 2), 2, None)
        , (dt.datetime(2022, 9, 2), 3, None)
        , (dt.datetime(2022, 9, 3), 4, None)
        , (dt.datetime(2022, 9, 3), 5, None)
        , (dt.datetime(2022, 9, 3), 6, None)
    ]
    , schema
)
display(df)
# Save df as delta table "test_table"
df.write\
.format("delta")\
.mode("overwrite")\
.save("/dbfs/FileStore/tables/test_table")

# Select WHERE date = '2022-09-03' from test_table
df = spark.read.format("delta").load("/dbfs/FileStore/tables/test_table").filter(sf.col("date") == "2022-09-03")
df = df.withColumn("timestamp_dt", sf.current_timestamp())
display(df)
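
At this point df is still just a lazy query plan over test_table; no rows have been copied into it. One way to see that (plain Spark, nothing Delta-specific):

# The physical plan shows a scan of the delta table plus the filter and the added column;
# no files are actually read until an action (display, count, write, ...) runs.
df.explain()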

# Delete date = '2022-09-03' from test_table
dtb_test_table = dtb.DeltaTable.forPath(spark, "/dbfs/FileStore/tables/test_table")
dtb_test_table.delete(sf.col("date") == "2022-09-03")
# Show test_table
display(spark.read.format("delta").load("/dbfs/FileStore/tables/test_table").orderBy("date"))

# Show the dataframe
display(df)

Pandas Dataframe
When we select data from a Delta table into a pandas DataFrame, the data is transferred (copied) into the DataFrame, so it no longer lives only in the source.
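A tiny self-contained illustration of that transfer, using a throwaway DataFrame instead of test_table:

# toPandas() collects all rows to the driver as a local in-memory copy;
# once collected, the pandas dataframe no longer depends on the source.
pdf = spark.range(3).toPandas()
print(type(pdf))
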
# Imports
from pyspark.sql import types as st
import pyspark.sql.functions as sf
import delta.tables as dtb
import datetime as dt
# Create dataframe for test_table
schema = st.StructType(
    [
        st.StructField("date", st.DateType())
        , st.StructField("col_1", st.IntegerType())
        , st.StructField("timestamp_dt", st.TimestampType())
    ]
)
df = spark.createDataFrame(
    [
        (dt.datetime(2022, 9, 1), 1, None)
        , (dt.datetime(2022, 9, 2), 2, None)
        , (dt.datetime(2022, 9, 2), 3, None)
        , (dt.datetime(2022, 9, 3), 4, None)
        , (dt.datetime(2022, 9, 3), 5, None)
        , (dt.datetime(2022, 9, 3), 6, None)
    ]
    , schema
)
display(df)
# Save df as delta table "test_table"
df.write\
.format("delta")\
.mode("overwrite")\
.save("/FileStore/tables/test_table")

# Select date = "2022-09-03" from test_table into spark dataframe
df = spark.read.format("delta").load("/FileStore/tables/test_table").filter(sf.col("date") == "2022-09-03")
df = df.withColumn("timestamp_dt", sf.current_timestamp())
display(df)
df = df.toPandas()
print(type(df))
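
From here on df is a plain pandas DataFrame held in driver memory, so any further changes are local and never reach the Delta table; for example, working on a copy:

# Changes to the local (pandas) data do not touch the delta table
local = df.copy()
local["col_1"] = local["col_1"] + 100
print(local)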

# Delete date = "2023-09-03" from test_table
dtb_test_table = dtb.DeltaTable.forPath(spark, "/FileStore/tables/test_table")
dtb_test_table.delete(sf.col("date") == "2022-09-03")
# Show test_table
display(spark.read.format("delta").load("/FileStore/tables/test_table").orderBy("date"))

# Show the dataframe
display(df)

# Insert the updated data back into the delta table
df = spark.createDataFrame(df)
df.write.format("delta").mode("append").save("/FileStore/tables/test_table")
# Show the delta table
df = spark.read.format("delta").load("/FileStore/tables/test_table")
display(df)
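
As an aside, the delete-then-append pattern above can also be written as a single Delta merge (upsert). A rough sketch on the same toy table, assuming col_1 uniquely identifies a row; the values in updates are made up for illustration:

# Build a few updated rows and upsert them into test_table in one transaction
updates = spark.createDataFrame(
    [
        (dt.date(2022, 9, 3), 4, dt.datetime.now())
        , (dt.date(2022, 9, 3), 5, dt.datetime.now())
        , (dt.date(2022, 9, 3), 6, dt.datetime.now())
    ]
    , schema
)
(
    dtb.DeltaTable.forPath(spark, "/FileStore/tables/test_table").alias("t")
    .merge(updates.alias("u"), "t.col_1 = u.col_1")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)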

Keep it simple :-)
