An ETL pipeline I built recently prompted me to share the following excerpt of Python code. The players in this ETL are:
- Apache Kafka (Source)
- Azure Data Factory (ETL app)
- Azure Databricks (Extract and Transform with Python)
- Azure Data Lake Storage (File storage)
- Cosmos DB (Destination)
In this example I cover only the “T” part of ETL – transform.
The messages in Kafka look like the JSON array embedded in the code below.
The result should be the deduplicated array that the script prints at the end.
The modifications:
- Keep the original column “id” as “original_id”
- Replace column “id” with the composite primary key “group_id|id”
- When objects share a key, keep the most recently modified one – the first two objects are such a duplicate
The dataflow:
- Consume messages from Kafka – read them in a loop one by one
- Apply the mentioned modifications
- Append each message to a JSON array
- Save JSON file in ADLS
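As a small aside (not part of the original flow description), the final “save” step can be sketched like this. The ADLS upload is swapped for a plain local file write, and the function and file names are placeholders – in the real pipeline the Databricks/ADLS client would do the writing:

```python
import json

def save_json_array(records, path):
    # Serialize the list of dicts to a JSON file.
    # In the real pipeline this write would target ADLS; a local path stands in here.
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(records, f, indent=2)

# hypothetical call with a placeholder record and file name
save_json_array([{"id": "1|1", "original_id": 1, "group_id": 1}], "people.json")
```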
The Python code for the modifications:
import json
people_string = '''
[
  {
    "id": 1,
    "group_id": 1,
    "first_name": "John",
    "last_name": "Smith",
    "phone": "542-555-5134",
    "salary": 23.456,
    "emails": ["john.smith@my_fake_email.com", "john.smith@my_fake_work-place.com"],
    "has_license": false,
    "date_last_modified": "2011-09-11T14:08:27.136753"
  },
  {
    "id": 1,
    "group_id": 1,
    "first_name": "John",
    "last_name": "Smith",
    "salary": 123.456,
    "emails": ["[email protected]", "[email protected]", "[email protected]"],
    "has_license": true,
    "date_last_modified": "2012-10-10T10:33:56.236"
  },
  {
    "id": 1,
    "group_id": 2,
    "first_name": "John",
    "last_name": "Smith",
    "phone": "632-555-1111",
    "salary": 52.142,
    "emails": ["[email protected]"],
    "has_license": true,
    "date_last_modified": "2003-05-11T05:17:32.667"
  },
  {
    "id": 2,
    "group_id": 1,
    "first_name": "Jack",
    "last_name": "Dicker",
    "phone": "513-555-5376",
    "number_of_cars": 3,
    "emails": null,
    "has_license": true,
    "date_last_modified": "2011-11-05T15:17:47.967"
  },
  {
    "id": 3,
    "group_id": 1,
    "first_name": "Ana",
    "last_name": "Larson",
    "phone": "714-555-6376",
    "number_of_cars": 0,
    "emails": ["[email protected]", "[email protected]"],
    "has_license": true,
    "date_last_modified": "2011-02-13T04:55:11.437"
  }
]
'''
#------------------------------
source = json.loads(people_string)
destination = []
for src in source:
    do_append = True
    # keep the original id and build the composite key "group_id|id"
    src['original_id'] = src['id']
    src['id'] = str(src['group_id']) + '|' + str(src['id'])
    # check whether an object with the same composite key was already appended
    for dstn in list(destination):  # iterate over a copy, since we may remove items
        if src['id'] == dstn['id']:
            # ISO-8601 timestamps compare correctly as strings
            if src['date_last_modified'] > dstn['date_last_modified']:
                destination.remove(dstn)  # src is newer - drop the older object
            else:
                do_append = False  # an equal or newer object already exists
    if do_append:
        destination.append(src)
print(json.dumps(destination, indent=2))
The JSON string at the top simulates the messages in Kafka.
The outer loop simulates the loop that consumes Kafka.
The inner loop checks whether the composite key “group_id|id” already exists in the resulting array, and either removes the older object or skips the append, so that only the most recently modified object is kept.
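As a side note – an alternative not used in the post – the same deduplication can be done without the inner loop by keeping a dictionary keyed on the composite key, which avoids rescanning the destination array for every message (the function name below is my own):

```python
import json

def dedupe_latest(records):
    # Keep only the most recently modified record per composite key "group_id|id".
    latest = {}
    for rec in records:
        rec = dict(rec)  # work on a copy, don't mutate the caller's objects
        rec['original_id'] = rec['id']
        rec['id'] = f"{rec['group_id']}|{rec['id']}"
        key = rec['id']
        # ISO-8601 timestamps compare correctly as strings
        if key not in latest or rec['date_last_modified'] > latest[key]['date_last_modified']:
            latest[key] = rec
    return list(latest.values())
```

Since Python 3.7, dicts preserve insertion order, so the output keeps the first-seen order of the keys.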
Keep it simple :-)
