Python: Extract from Kafka with Azure Data Factory (Synapse) and Databricks


This Python code can be used to extract data from a Kafka topic into two files in Azure Data Lake Storage (ADLS):

  • extract/kafka/topic/topic_{YYYYMMDD_HHMMSS}.json – no duplicates, only the latest record per key (PK: parent_id|id)
  • extract/kafka/topic_history/topic_{YYYYMMDD_HHMMSS}.json – all rows (PK: parent_id|id|date_created)

In case of error, the KafkaException is exported to a file named error_topic_{YYYYMMDD_HHMMSS}.txt. ADF determines whether there is an error based on the file extension (.json or .txt).
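
To make the two primary keys from the list above concrete, here is a minimal standalone sketch (the sample messages are invented just for illustration; the field names match the script below):

# Illustration only: sample messages are invented, field names follow the main script.
import json

sample_messages = [
    {'parent_id': 7, 'id': 1, 'date_created': '2024-01-01T10:00:00', 'status': 'new'},
    {'parent_id': 7, 'id': 1, 'date_created': '2024-01-01T11:00:00', 'status': 'updated'},
]

latest = {}     # deduplicated output: one record per parent_id|id, the last message wins
history = []    # history output: every record, keyed by parent_id|id|date_created

for m in sample_messages:
    key = str(m['parent_id']) + '|' + str(m['id'])
    latest[key] = {**m, 'original_id': str(m['id']), 'id': key}
    history.append({**m, 'original_id': str(m['id']), 'id': key + '|' + m['date_created']})

print(json.dumps(list(latest.values())))   # 1 record (the 'updated' one)
print(json.dumps(history))                 # 2 records

The full extraction script follows.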

from confluent_kafka import Consumer, KafkaException
from azure.storage.filedatalake import DataLakeServiceClient
import time
from datetime import datetime
import json

# Debug Info - Start
start_time = datetime.now()
print('start time: ' + start_time.strftime('%Y-%m-%d %H:%M:%S.%f'))
# Debug Info - End



# Parameters
par_group_id = 'my-synapse-workspace_my_topic'
par_max_rows = 10000
par_file_system = 'my_extract_folder/kafka'
par_directory = 'my_topic'

# Internal Variables
kafka_conf = {
    'bootstrap.servers': 'server1:9092,server2:9092',
    'group.id': par_group_id,
    'enable.auto.commit': 'false',
    'auto.offset.reset': 'earliest',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'my_username',
    'sasl.password': 'my_password',
    'security.protocol': 'sasl_ssl'}
adls_account_url = 'https://my-adls-container.dfs.core.windows.net'
adls_credential = 'my_adls_credential'
now = time.strftime("%Y%m%d_%H%M%S", time.gmtime())
topics = ['my_topic']
json_array = []
json_array_history = []



# Decode the Kafka message and route the record:
#   data_type == 'data' -> json_array (deduplicated on parent_id|id, the latest record wins)
#   anything else       -> json_array_history (every record, keyed by parent_id|id|date_created)
def data_manipulation(msg, data_type):
    data = json.loads(bytes.decode(msg.value()))
    data['original_id'] = str(data['id'])    
    if(data_type == 'data'):
        for i in json_array:
            if(i['id'] == str(data['parent_id']) + '|' + str(data['id'])):
                json_array.remove(i)
                break
        data['id'] = str(data['parent_id']) + '|' + str(data['id'])
        json_array.append(data)
    else:
        data['id'] = str(data['parent_id']) + '|' + str(data['id']) + '|' + data['date_created']
        json_array_history.append(data)



# Upload file_content as a single file to ADLS, into the directory
# par_directory + dir_extension ('' for the deduplicated file, '_history' for the history file).
def save_in_adls(file_name, file_content, dir_extension):
    service_client = DataLakeServiceClient(
        account_url = adls_account_url,
        credential = adls_credential
    )    
    file_system_client = service_client.get_file_system_client(file_system = par_file_system)    
    directory_client = file_system_client.get_directory_client(par_directory + dir_extension)
    file_client = directory_client.get_file_client(file_name)
    file_client.upload_data(str(file_content), overwrite = True)



# Main routine: consume up to par_max_rows messages (or stop after ~10 seconds without new
# messages / at end of partition), upload both JSON files to ADLS, commit the offsets and
# return a JSON summary for the calling pipeline.
def extract_and_save_in_adls():
    do_commit = True
    c = None    # defined up front so the finally block is safe even if Consumer() fails
    file_name = 'my_topic_' + now + ".json"

    try:
        c = Consumer(kafka_conf)
        c.subscribe(topics)

        counter = 0
        consumer_start_time = time.time()
            
        while (True):
            if (counter < par_max_rows):
                msg = c.poll(timeout=1.0)

                if (msg is None):                
                    if(int(time.time()) - int(consumer_start_time) > 10):
                        break
                    
                    continue
                
                if msg.error():
                    # -191 is KafkaError._PARTITION_EOF: end of partition reached, not a real error
                    if (msg.error().code() == -191):
                        break
                    else:
                        raise KafkaException(msg.error())
                else:
                    # threading.Thread(target=f(...)) calls f immediately and never starts a
                    # thread, so call the functions directly instead
                    data_manipulation(msg, 'data')
                    data_manipulation(msg, 'data_history')
                    
                    counter += 1
            else:
                break
                
        if (counter > 0):
            # same threading mistake as above: call the uploads directly so they actually run
            save_in_adls(file_name, json.dumps(json_array), '')
            save_in_adls(file_name, json.dumps(json_array_history), '_history')
            
        # Debug Info - Start
        print('try:')
        print('    counter: ', str(counter))
        # Debug Info - End

    except KafkaException as e:
        do_commit = False
        file_name = 'error_my_topic_' + now + ".txt"
        # msg may not exist here (e.g. if the Consumer itself could not be created),
        # so export the exception text instead
        save_in_adls(file_name, str(e), '')

    finally:
        # c stays None if the Consumer could not even be created
        if (c is not None):
            if (do_commit == True):
                c.commit()
            c.close()
        
        # Debug Info - Start
        print('finally:')
        print('    do_commit: ' + str(do_commit))
        end_time = datetime.now()
        print('    end_time: ' + str(end_time))
        print('    duration: ' + str(end_time - start_time))
        # Debug Info - End
        
        result = {}
        result['file_name'] = file_name
        result['num_of_rows'] = str(len(json_array))
        result['num_of_rows_history'] = str(len(json_array_history))
        
        return json.dumps(result)

result = extract_and_save_in_adls()
print('Execution result: ' + result)
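
The script returns a small JSON summary (file_name, num_of_rows, num_of_rows_history). When it runs as a notebook triggered by ADF, that summary still has to reach the pipeline; a minimal sketch of one way to do that, assuming a Databricks notebook (this exit call is my addition, not part of the original script):

# Assumption: the script above runs in a Databricks notebook called from an ADF pipeline.
# dbutils.notebook.exit() hands the JSON summary back to the pipeline, which can then
# check whether the returned file_name ends with '.json' (success) or '.txt' (error).
dbutils.notebook.exit(result)

# In an Azure Synapse notebook the equivalent would be:
# from notebookutils import mssparkutils
# mssparkutils.notebook.exit(result)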

After you make sure that everything works as expected, you can:

  • remove the blocks of code marked with “Debug Info”
  • share your experience.

The cluster has to be set up like this:
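
Whatever the rest of the configuration looks like, the two Python libraries used by the script have to be available on the cluster. A minimal sketch, assuming you install them from a notebook cell (the package names are the PyPI ones matching the imports above):

# Install the dependencies of the script on the cluster / Spark pool.
%pip install confluent-kafka azure-storage-file-datalake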

Keep it simple :-)

