This Python code consumes messages from a Kafka topic and saves two files in Azure Data Lake Storage (ADLS):
- extract/kafka/topic/topic_{YYYYMMDD_HHMMSS}.json – no duplicates (PK: parent_id|id)
- extract/kafka/topic_history/topic_{YYYYMMDD_HHMMSS}.json – all the rows (PK: parent_id|id|date_created)
In case of error, the KafkaException is written to a file named error_topic_{YYYYMMDD_HHMMSS}.txt. ADF (Azure Data Factory) determines whether there was an error based on the file extension (.json or .txt).
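A quick illustration of the two primary keys (the field names come from the code below; the sample values are made up): the message

{"parent_id": 1, "id": 7, "date_created": "2024-05-01T10:15:00"}

gets the key 1|7 in the first file and 1|7|2024-05-01T10:15:00 in the history file. If a later message arrives with the same parent_id and id, it replaces the row in the first file but is appended as a new row in the history file.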
from confluent_kafka import Consumer, KafkaError, KafkaException
from azure.storage.filedatalake import DataLakeServiceClient
import time
from datetime import datetime
import json

# Debug Info - Start
start_time = datetime.now()
print('start time: ' + start_time.strftime('%Y-%m-%d %H:%M:%S.%f'))
# Debug Info - End

# Parameters
par_group_id = 'my-synapse-workspace_my_topic'
par_max_rows = 10000
par_file_system = 'my_extract_folder/kafka'
par_directory = 'my_topic'

# Internal Variables
kafka_conf = {
    'bootstrap.servers': 'server1:9092,server2:9092',
    'group.id': par_group_id,
    'enable.auto.commit': 'false',
    'auto.offset.reset': 'earliest',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'my_username',
    'sasl.password': 'my_password',
    'security.protocol': 'sasl_ssl'
}
adls_account_url = 'https://my-adls-container.dfs.core.windows.net'
adls_credential = 'my_adls_credential'
now = time.strftime('%Y%m%d_%H%M%S', time.gmtime())
topics = ['my_topic']
json_array = []          # deduplicated rows (PK: parent_id|id)
json_array_history = []  # all rows (PK: parent_id|id|date_created)


def data_manipulation(msg, data_type):
    data = json.loads(bytes.decode(msg.value()))
    data['original_id'] = str(data['id'])
    if data_type == 'data':
        # Deduplicate on parent_id|id: remove the previously stored row so the latest one wins
        for i in json_array:
            if i['id'] == str(data['parent_id']) + '|' + str(data['id']):
                json_array.remove(i)
                break
        data['id'] = str(data['parent_id']) + '|' + str(data['id'])
        json_array.append(data)
    else:
        # The history file keeps every row, so date_created is part of the PK
        data['id'] = str(data['parent_id']) + '|' + str(data['id']) + '|' + data['date_created']
        json_array_history.append(data)


def save_in_adls(file_name, file_content, dir_extension):
    service_client = DataLakeServiceClient(account_url=adls_account_url, credential=adls_credential)
    file_system_client = service_client.get_file_system_client(file_system=par_file_system)
    directory_client = file_system_client.get_directory_client(par_directory + dir_extension)
    file_client = directory_client.get_file_client(file_name)
    file_client.upload_data(str(file_content), overwrite=True)


def extract_and_save_in_adls():
    do_commit = True
    file_name = 'my_topic_' + now + '.json'
    c = None
    try:
        c = Consumer(kafka_conf)
        c.subscribe(topics)
        counter = 0
        consumer_start_time = time.time()
        while True:
            if counter < par_max_rows:
                msg = c.poll(timeout=1.0)
                if msg is None:
                    # Give up when nothing has been received for more than 10 seconds
                    if int(time.time()) - int(consumer_start_time) > 10:
                        break
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:  # -191: end of partition reached
                        break
                    raise KafkaException(msg.error())
                # Process the message sequentially; the deduplication in
                # data_manipulation() relies on the message order
                data_manipulation(msg, 'data')
                data_manipulation(msg, 'data_history')
                counter += 1
            else:
                break
        if counter > 0:
            save_in_adls(file_name, json.dumps(json_array), '')
            save_in_adls(file_name, json.dumps(json_array_history), '_history')
        # Debug Info - Start
        print('try:')
        print('    counter: ' + str(counter))
        # Debug Info - End
    except KafkaException as e:
        # On error, write a .txt file instead of a .json one and skip the commit,
        # so the messages are consumed again on the next run
        do_commit = False
        file_name = 'error_my_topic_' + now + '.txt'
        save_in_adls(file_name, str(e), '')
    finally:
        if c is not None:
            if do_commit:
                c.commit()
            c.close()
        # Debug Info - Start
        print('finally:')
        print('    do_commit: ' + str(do_commit))
        end_time = datetime.now()
        print('    end_time: ' + str(end_time))
        print('    duration: ' + str(end_time - start_time))
        # Debug Info - End
    result = {}
    result['file_name'] = file_name
    result['num_of_rows'] = str(len(json_array))
    result['num_of_rows_history'] = str(len(json_array_history))
    return json.dumps(result)


result = extract_and_save_in_adls()
print('Execution result: ' + result)
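If you want to check the deduplication logic without connecting to Kafka, here is a minimal sketch. The FakeMessage class and the sample payloads are made up for illustration; it assumes data_manipulation, json_array and json_array_history from the script above are already defined and the final call to extract_and_save_in_adls() is commented out.

import json

class FakeMessage:
    # Hypothetical stand-in for confluent_kafka.Message; only value() is needed here
    def __init__(self, payload):
        self._value = json.dumps(payload).encode()
    def value(self):
        return self._value

# Two messages with the same parent_id|id but different date_created
for payload in ({'parent_id': 1, 'id': 7, 'date_created': '2024-05-01T10:15:00'},
                {'parent_id': 1, 'id': 7, 'date_created': '2024-05-01T11:20:00'}):
    data_manipulation(FakeMessage(payload), 'data')
    data_manipulation(FakeMessage(payload), 'data_history')

print(len(json_array))          # 1 - the second message replaced the first
print(len(json_array_history))  # 2 - the history keeps both rows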
After you make sure that everything works as expected, you can:
- remove the blocks of code marked with “Debug Info”
- share your experience.
The cluster has to be set up like this:
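The exact configuration is not reproduced here. As a minimal sketch (only the package names are certain; the install method is an assumption), the script needs these two libraries available on the cluster, e.g. via a requirements.txt uploaded to the Spark pool packages:

# requirements.txt
confluent-kafka
azure-storage-file-datalake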

Keep it simple :-)