An ETL pipeline that I built recently prompted me to share the following excerpt of Python code. The players in this ETL are:
- Apache Kafka (Source)
- Azure Data Factory (ETL app)
- Azure Databricks (Extract and Transform with Python)
- Azure Data Lake Storage (File storage)
- Cosmos DB (Destination)
In this example I cover only the “T” part of the ETL – the transform.
The messages in Kafka should look like this:
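[
    {
        "id": 1,
        "group_id": 1,
        "first_name": "John",
        "last_name": "Smith",
        "phone": "542-555-5134",
        "salary": 23.456,
        "emails": ["john.smith@my_fake_email.com", "john.smith@my_fake_work-place.com"],
        "has_license": false,
        "date_last_modified": "2011-09-11T14:08:27.136753"
    },
    {
        "id": 1,
        "group_id": 1,
        "first_name": "John",
        "last_name": "Smith",
        "salary": 123.456,
        "emails": ["john.smith@not-existing-email.com", "john.smith@your-work-place.com", "john@fake-smith.com"],
        "has_license": true,
        "date_last_modified": "2012-10-10T10:33:56.236"
    },
    {
        "id": 1,
        "group_id": 2,
        "first_name": "John",
        "last_name": "Smith",
        "phone": "632-555-1111",
        "salary": 52.142,
        "emails": ["john.smith@example.com"],
        "has_license": true,
        "date_last_modified": "2003-05-11T05:17:32.667"
    },
    {
        "id": 2,
        "group_id": 1,
        "first_name": "Jack",
        "last_name": "Dicker",
        "phone": "513-555-5376",
        "number_of_cars": 3,
        "emails": null,
        "has_license": true,
        "date_last_modified": "2011-11-05T15:17:47.967"
    },
    {
        "id": 3,
        "group_id": 1,
        "first_name": "Ana",
        "last_name": "Larson",
        "phone": "714-555-6376",
        "number_of_cars": 0,
        "emails": ["ana@larson.com", "ana.larson@gmail.com"],
        "has_license": true,
        "date_last_modified": "2011-02-13T04:55:11.437"
    }
]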

The result should be:
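[
    {
        "id": "1|1",
        "group_id": 1,
        "first_name": "John",
        "last_name": "Smith",
        "salary": 123.456,
        "emails": ["john.smith@not-existing-email.com", "john.smith@your-work-place.com", "john@fake-smith.com"],
        "has_license": true,
        "date_last_modified": "2012-10-10T10:33:56.236",
        "original_id": 1
    },
    {
        "id": "2|1",
        "group_id": 2,
        "first_name": "John",
        "last_name": "Smith",
        "phone": "632-555-1111",
        "salary": 52.142,
        "emails": ["john.smith@example.com"],
        "has_license": true,
        "date_last_modified": "2003-05-11T05:17:32.667",
        "original_id": 1
    },
    {
        "id": "1|2",
        "group_id": 1,
        "first_name": "Jack",
        "last_name": "Dicker",
        "phone": "513-555-5376",
        "number_of_cars": 3,
        "emails": null,
        "has_license": true,
        "date_last_modified": "2011-11-05T15:17:47.967",
        "original_id": 2
    },
    {
        "id": "1|3",
        "group_id": 1,
        "first_name": "Ana",
        "last_name": "Larson",
        "phone": "714-555-6376",
        "number_of_cars": 0,
        "emails": ["ana@larson.com", "ana.larson@gmail.com"],
        "has_license": true,
        "date_last_modified": "2011-02-13T04:55:11.437",
        "original_id": 3
    }
]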

The modifications (illustrated on a single message right after this list):
- Keep column “id” as “original_id”
- Modify column “id” to “group_id|id” – a composite primary key
- Keep the last modified element if there is a duplicate – the first two rows are such a duplicate
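To make the first two modifications concrete, here is a minimal sketch on a single message. The helper name transform_record is mine, for illustration only – it is not part of the code further down:

def transform_record(record):
    # Keep the original "id" and replace "id" with the composite key "group_id|id"
    record['original_id'] = record['id']
    record['id'] = str(record['group_id']) + '|' + str(record['original_id'])
    return record

msg = {"id": 1, "group_id": 2, "first_name": "John", "date_last_modified": "2003-05-11T05:17:32.667"}
print(transform_record(msg))
# {'id': '2|1', 'group_id': 2, 'first_name': 'John', 'date_last_modified': '2003-05-11T05:17:32.667', 'original_id': 1}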
The dataflow (sketched end to end below):
- Consume messages from Kafka – read them in a loop, one by one
- Apply the modifications mentioned above
- Append each message to a JSON array
- Save the JSON file in ADLS
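As a hedged end-to-end sketch, the dataflow could look like this in a Databricks notebook. It assumes the kafka-python package is installed and ADLS is mounted at /mnt/adls; the topic name, broker address and file path are placeholders, not values from the real pipeline, and the deduplication step is left out here (it is covered by the code in the next section):

import json
from kafka import KafkaConsumer  # kafka-python package

consumer = KafkaConsumer(
    'people',                                    # hypothetical topic name
    bootstrap_servers=['broker-1:9092'],         # hypothetical broker
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000,                   # stop iterating when the topic is drained
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

destination = []
for message in consumer:                         # read the messages one by one
    src = message.value
    src['original_id'] = src['id']
    src['id'] = str(src['group_id']) + '|' + str(src['original_id'])
    destination.append(src)                      # append to the JSON array

# dbutils is available inside Databricks notebooks; it writes the string to the mounted ADLS path
dbutils.fs.put('/mnt/adls/people.json', json.dumps(destination, indent=2), True)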
The Python code for the modifications:
import json

# The five sample messages, as they would arrive from Kafka
people_string = '''
[
    {
        "id": 1,
        "group_id": 1,
        "first_name": "John",
        "last_name": "Smith",
        "phone": "542-555-5134",
        "salary": 23.456,
        "emails": ["john.smith@my_fake_email.com", "john.smith@my_fake_work-place.com"],
        "has_license": false,
        "date_last_modified": "2011-09-11T14:08:27.136753"
    },
    {
        "id": 1,
        "group_id": 1,
        "first_name": "John",
        "last_name": "Smith",
        "salary": 123.456,
        "emails": ["john.smith@not-existing-email.com", "john.smith@your-work-place.com", "john@fake-smith.com"],
        "has_license": true,
        "date_last_modified": "2012-10-10T10:33:56.236"
    },
    {
        "id": 1,
        "group_id": 2,
        "first_name": "John",
        "last_name": "Smith",
        "phone": "632-555-1111",
        "salary": 52.142,
        "emails": ["john.smith@example.com"],
        "has_license": true,
        "date_last_modified": "2003-05-11T05:17:32.667"
    },
    {
        "id": 2,
        "group_id": 1,
        "first_name": "Jack",
        "last_name": "Dicker",
        "phone": "513-555-5376",
        "number_of_cars": 3,
        "emails": null,
        "has_license": true,
        "date_last_modified": "2011-11-05T15:17:47.967"
    },
    {
        "id": 3,
        "group_id": 1,
        "first_name": "Ana",
        "last_name": "Larson",
        "phone": "714-555-6376",
        "number_of_cars": 0,
        "emails": ["ana@larson.com", "ana.larson@gmail.com"],
        "has_license": true,
        "date_last_modified": "2011-02-13T04:55:11.437"
    }
]
'''

#------------------------------

source = json.loads(people_string)
destination = []

for src in source:  # simulates the loop that consumes Kafka
    do_append = True

    # Keep the original "id" and build the composite key "group_id|id"
    src['original_id'] = src['id']
    src['id'] = str(src['group_id']) + '|' + str(src['original_id'])

    # Keep only the last modified object per composite key.
    # Iterate over a copy so removing from "destination" is safe.
    for dstn in destination[:]:
        if src['id'] == dstn['id']:
            if src['date_last_modified'] > dstn['date_last_modified']:
                destination.remove(dstn)  # the incoming message is newer - replace
            else:
                do_append = False  # an older duplicate - skip it

    if do_append:
        destination.append(src)

print(json.dumps(destination, indent=2))
The people_string literal simulates the messages in Kafka.
The first (outer) loop simulates the loop that consumes Kafka.
The second (inner) loop checks whether the key “group_id|id” already exists in the resulting array, and either deletes the older object or skips the append so that only the last modified object is kept.
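For larger message volumes, a dictionary keyed by the composite id avoids the nested loop. This is an alternative I would consider, not the approach used above – it assumes source has already been parsed with json.loads as in the code:

# Alternative: deduplicate with a dict - O(1) lookups instead of scanning the list
newest = {}
for src in source:
    src['original_id'] = src['id']
    src['id'] = str(src['group_id']) + '|' + str(src['original_id'])
    current = newest.get(src['id'])
    # keep the object only if it is the first seen or the most recently modified
    if current is None or src['date_last_modified'] > current['date_last_modified']:
        newest[src['id']] = src

destination = list(newest.values())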
Keep it simple :-)