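An ETL that I built recently prompted me to share the following excerpt of Python code. The players in this ETL include Kafka as the source, Python for the transform and ADLS (Azure Data Lake Storage) as the destination.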

In this example I cover only the “T” part of the ETL: transform.

The messages in Kafka should look like this:

To simplify the visualization, I present the JSON as a table.

The result should be:

The modifications:

  • Copy the value of column “id” into a new column “original_id”.
  • Rewrite column “id” as “group_id|id”, a composite primary key.
  • If several messages share the same composite key, keep only the most recently modified one; the first two rows are such a duplicate.
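The first two rules only touch the current message. A minimal sketch of them, assuming each Kafka message has already been parsed into a dict; `add_composite_key` is a hypothetical helper name, and it returns a copy instead of changing the message in place:

```python
def add_composite_key(message: dict) -> dict:
    """Return a copy of the message with "original_id" kept and "id" rewritten as "group_id|id"."""
    result = dict(message)  # shallow copy, so the caller's dict is left unchanged
    result["original_id"] = message["id"]
    result["id"] = f"{message['group_id']}|{message['id']}"
    return result


msg = {"id": 1, "group_id": 2, "first_name": "John"}
print(add_composite_key(msg))
# {'id': '2|1', 'group_id': 2, 'first_name': 'John', 'original_id': 1}
```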

The dataflow

  • Consume messages from Kafka, reading them one by one in a loop
  • Apply the modifications listed above
  • Append each message to a JSON array
  • Save the JSON file to ADLS
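The four steps of the dataflow can be sketched as a single function. This is only an illustration under assumptions: any iterable of JSON strings stands in for the Kafka consumer, and the `save` callback stands in for the upload to ADLS; `run_etl`, `transform` and `save` are hypothetical names, not part of a Kafka or Azure SDK.

```python
import json
from typing import Callable, Iterable, List


def run_etl(
    messages: Iterable[str],
    transform: Callable[[dict], dict],
    save: Callable[[str], None],
) -> List[dict]:
    """Consume, transform, append and save, following the four steps of the dataflow."""
    destination: List[dict] = []
    for raw in messages:                        # 1. consume messages one by one (Kafka stand-in)
        message = transform(json.loads(raw))    # 2. apply the modifications
        destination.append(message)             # 3. append the message to the JSON array
    save(json.dumps(destination, indent=2))     # 4. save the JSON file (ADLS stand-in)
    return destination


# Usage: a trivial transform and a list that collects what would be written to storage.
saved: List[str] = []
result = run_etl(
    ['{"id": 2, "group_id": 1}'],
    lambda m: {**m, "original_id": m["id"]},
    saved.append,
)
print(saved[0])
```

Passing the consumer and the storage step in from outside keeps the loop testable without a broker or a storage account. The duplicate check is not part of this sketch, because it has to compare the current message with what is already in the array rather than look at one message alone.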

The Python code for the modifications:

import json
from datetime import datetime

# The JSON array simulates the messages consumed from Kafka.
people_string = '''
[
    {
        "id": 1,
        "group_id": 1,
        "first_name": "John",
        "last_name": "Smith",
        "phone": "542-555-5134",
        "salary": 23.456,
        "emails": ["", ""],
        "has_license": false,
        "date_last_modified": "2011-09-11T14:08:27.136753"
    },
    {
        "id": 1,
        "group_id": 1,
        "first_name": "John",
        "last_name": "Smith",
        "salary": 123.456,
        "emails": ["", "", ""],
        "has_license": true,
        "date_last_modified": "2012-10-10T10:33:56.236"
    },
    {
        "id": 1,
        "group_id": 2,
        "first_name": "John",
        "last_name": "Smith",
        "phone": "632-555-1111",
        "salary": 52.142,
        "emails": [""],
        "has_license": true,
        "date_last_modified": "2003-05-11T05:17:32.667"
    },
    {
        "id": 2,
        "group_id": 1,
        "first_name": "Jack",
        "last_name": "Dicker",
        "phone": "513-555-5376",
        "number_of_cars": 3,
        "emails": null,
        "has_license": true,
        "date_last_modified": "2011-11-05T15:17:47.967"
    },
    {
        "id": 3,
        "group_id": 1,
        "first_name": "Ana",
        "last_name": "Larson",
        "phone": "714-555-6376",
        "number_of_cars": 0,
        "emails": ["", ""],
        "has_license": true,
        "date_last_modified": "2011-02-13T04:55:11.437"
    }
]
'''

source = json.loads(people_string)
destination = []

# Outer loop: simulates consuming the Kafka messages one by one.
for src in source:
    do_append = True

    # Keep the original id and build the composite key "group_id|id".
    src['original_id'] = src['id']
    src['id'] = str(src['group_id']) + '|' + str(src['original_id'])

    # Inner loop: look for an object that already has the same composite key.
    for dstn in destination:
        if src['id'] == dstn['id']:
            # Parse the ISO 8601 timestamps instead of comparing raw strings.
            if datetime.fromisoformat(src['date_last_modified']) > datetime.fromisoformat(dstn['date_last_modified']):
                # The incoming message is newer: drop the stored object.
                destination.remove(dstn)
            else:
                # The stored object is newer (or equal): skip the incoming message.
                do_append = False
            break  # each key appears at most once in destination

    if do_append:
        destination.append(src)

print(json.dumps(destination, indent=2))

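The JSON array in people_string simulates the messages in Kafka.

The outer loop, for src in source, simulates the loop that consumes messages from Kafka.

The inner loop, for dstn in destination, checks whether the key “group_id|id” already exists in the resulting array. If it does, either the stored object is removed (the incoming message is newer) or the incoming message is not appended (the stored object is newer), so only the most recently modified object is kept.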

Keep it simple :-)

