This Python code extracts data from Kafka and writes it into two files in Azure Data Lake Storage (ADLS):
- extract/kafka/topic/topic_{YYYYMMDD_HHMMSS}.json – no duplicates (PK: parentId|id)
- extract/kafka/topic_history/topic_{YYYYMMDD_HHMMSS}.json – all the rows (PK: parentId|id|date_created)
In case of an error, the KafkaException is exported to a file named error_topic_{YYYYMMDD_HHMMSS}.txt. ADF determines whether there was an error based on the file extension (.json or .txt).
from confluent_kafka import Consumer, KafkaException
from azure.storage.filedatalake import DataLakeServiceClient
import time
from datetime import datetime
import json
import threading
# Debug Info - Start
# Wall-clock start of the run; used later to report the total duration.
start_time = datetime.now()
print(f'start time: {start_time:%Y-%m-%d %H:%M:%S.%f}')
# Debug Info - End

# Parameters
par_group_id = 'my-synapse-workspace_my_topic'  # Kafka consumer group id
par_max_rows = 10000                            # upper bound of messages read per run
par_file_system = 'my_extract_folder/kafka'     # ADLS file system passed to the client
par_directory = 'my_topic'                      # base directory; a suffix is appended per output

# Internal Variables
# Consumer settings: offsets are committed manually (auto commit disabled)
# and a group without stored offsets starts from the earliest message.
kafka_conf = {
    'bootstrap.servers': 'server1:9092,server2:9092',
    'group.id': par_group_id,
    'enable.auto.commit': 'false',
    'auto.offset.reset': 'earliest',
    'sasl.mechanism': 'SCRAM-SHA-512',
    'sasl.username': 'my_username',
    'sasl.password': 'my_password',
    'security.protocol': 'sasl_ssl',
}
adls_account_url = 'https://my-adls-container.dfs.core.windows.net'
adls_credential = 'my_adls_credential'
# UTC timestamp (YYYYMMDD_HHMMSS) embedded in the output file names.
now = time.strftime("%Y%m%d_%H%M%S", time.gmtime())
topics = ['my_topic']
# Accumulators filled while consuming: de-duplicated rows and full history.
json_array = []
json_array_history = []
def data_manipulation(msg, data_type):
    """Parse one Kafka message and add it to the matching output list.

    The message value is decoded as UTF-8 JSON. The record's original ``id``
    is preserved (as a string) under ``original_id`` and ``id`` is replaced
    by a composite key.

    Args:
        msg: Object exposing ``value()`` that returns the JSON payload as
            bytes. The payload must contain ``id`` and ``parent_id``; for
            history records it must also contain ``date_created``.
        data_type: ``'data'`` appends to the module-level ``json_array``,
            replacing any earlier record with the same ``parent_id|id`` key
            (the latest record wins and moves to the end). Any other value
            appends to ``json_array_history`` with the key
            ``parent_id|id|date_created`` and keeps every record.

    Raises:
        KeyError: If a required field is missing from the payload.
        ValueError: If the payload is not valid JSON.
    """
    data = json.loads(msg.value().decode('utf-8'))
    data['original_id'] = str(data['id'])
    # Build the key once instead of re-concatenating it for every comparison.
    composite_key = str(data['parent_id']) + '|' + str(data['id'])

    if data_type == 'data':
        # Drop the previously stored record with the same key, if any.
        for position, existing in enumerate(json_array):
            if existing['id'] == composite_key:
                del json_array[position]
                break
        data['id'] = composite_key
        json_array.append(data)
    else:
        # str() so that a numeric date_created (e.g. an epoch timestamp)
        # does not raise TypeError during concatenation.
        data['id'] = composite_key + '|' + str(data['date_created'])
        json_array_history.append(data)
def save_in_adls(file_name, file_content, dir_extension):
    """Write ``file_content`` to a file in ADLS, replacing any existing file.

    Args:
        file_name: Name of the target file.
        file_content: Content to store; it is converted with ``str()``
            before being uploaded.
        dir_extension: Suffix appended to ``par_directory`` to select the
            target directory (e.g. ``''`` or ``'_history'``).
    """
    adls = DataLakeServiceClient(
        account_url=adls_account_url,
        credential=adls_credential,
    )
    # Walk file system -> directory -> file, then upload with overwrite on.
    (
        adls.get_file_system_client(file_system=par_file_system)
        .get_directory_client(par_directory + dir_extension)
        .get_file_client(file_name)
        .upload_data(str(file_content), overwrite=True)
    )
def extract_and_save_in_adls():
    """Consume a batch from Kafka, store it in ADLS and commit the offsets.

    Up to ``par_max_rows`` messages are polled. Each message is passed to
    ``data_manipulation`` twice (de-duplicated set and history set). When at
    least one message was read, both lists are uploaded as JSON and the
    offsets are then committed. On a ``KafkaException`` the error is written
    to an ``error_*.txt`` file instead and nothing is committed.

    Offsets are committed only after both uploads succeeded, so a failed
    upload (or any other exception) never marks unsaved messages as consumed.

    Returns:
        A JSON string with ``file_name``, ``num_of_rows`` and
        ``num_of_rows_history``.

    Raises:
        Exception: Non-Kafka errors (e.g. upload or JSON decoding failures)
            propagate to the caller after the consumer has been closed.
    """
    do_commit = False
    file_name = 'my_topic_' + now + ".json"
    consumer = None  # stays None if the Consumer constructor itself fails
    counter = 0
    try:
        consumer = Consumer(kafka_conf)
        consumer.subscribe(topics)
        consumer_start_time = time.time()
        while counter < par_max_rows:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                # Give up on an empty poll once more than 10 seconds
                # (whole-second granularity) passed since consumption began.
                if int(time.time()) - int(consumer_start_time) > 10:
                    break
                continue
            error = msg.error()
            if error:
                # -191 is KafkaError._PARTITION_EOF: end of partition reached.
                if error.code() == -191:
                    break
                raise KafkaException(error)
            # Called directly: the former threading.Thread(target=f(...))
            # already ran f synchronously and never started a thread.
            data_manipulation(msg, 'data')
            data_manipulation(msg, 'data_history')
            counter += 1
        if counter > 0:
            save_in_adls(file_name, json.dumps(json_array), '')
            save_in_adls(file_name, json.dumps(json_array_history), '_history')
        # Reached only when consuming and saving completed without error.
        do_commit = True
        # Debug Info - Start
        print('try:')
        print(' counter: ', str(counter))
        # Debug Info - End
    except KafkaException as exc:
        do_commit = False
        file_name = 'error_my_topic_' + now + ".txt"
        # Use the caught exception; `msg` may be unbound or None here.
        error_detail = exc.args[0] if exc.args else exc
        save_in_adls(file_name, error_detail, '')
    finally:
        if consumer is not None:
            try:
                # Nothing to commit when no message was consumed. Commit
                # synchronously so it completes (or fails) before close().
                if do_commit and counter > 0:
                    consumer.commit(asynchronous=False)
            finally:
                consumer.close()
        # Debug Info - Start
        print('finally:')
        print(' do_commit: ' + str(do_commit))
        end_time = datetime.now()
        print(' end_time: ' + str(end_time))
        print(' duration: ' + str(end_time - start_time))
        # Debug Info - End
    result = {
        'file_name': file_name,
        'num_of_rows': str(len(json_array)),
        'num_of_rows_history': str(len(json_array_history)),
    }
    return json.dumps(result)
# Run the extraction once and echo the JSON summary it returns.
result = extract_and_save_in_adls()
print(f'Execution result: {result}')
After you have made sure that everything works as expected, you can:
- remove the blocks of code, marked with “Debug Info”
- share your experience.
The cluster has to be set like this:

Keep it simple :-)

2 thoughts on “Python: Extract from Kafka with Azure Data Factory (Synapse) and Databricks”