Out of Order Consumer Offset Commit in Kafka | Advantages & Disadvantages

Knowledge Amplifier

This video explains out-of-order consumer offset commits in Kafka, with their advantages & disadvantages.
Kafka Commands:
-------------------------------------
F:/kafka_2.12-3.3.1/bin/windows/zookeeper-server-start.bat F:/kafka_2.12-3.3.1/config/zookeeper.properties
F:/kafka_2.12-3.3.1/bin/windows/kafka-server-start.bat F:/kafka_2.12-3.3.1/config/server.properties
F:/kafka_2.12-3.3.1/bin/windows/kafka-topics.bat --create --topic sensor_data_consumer2 --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
F:/kafka_2.12-3.3.1/bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --from-beginning
F:/kafka_2.12-3.3.1/bin/windows/kafka-topics.bat --create --topic sensor_data_consumer --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
Producer Code:
---------------------------
from time import sleep
from json import dumps
from kafka import KafkaProducer

def custom_partitioner(key, all_partitions, available):
    """
    Custom Kafka partitioner to get the partition corresponding to the key
    :param key: partitioning key (bytes)
    :param all_partitions: list of all partitions sorted by partition ID
    :param available: list of available partitions in no particular order
    :return: one of the values from all_partitions or available
    """
    print("The key is : {}".format(key))
    print("All partitions : {}".format(all_partitions))
    print("After decoding of the key : {}".format(key.decode('UTF-8')))
    return int(key.decode('UTF-8')) % len(all_partitions)

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'),
                         partitioner=custom_partitioner)
topic_name = 'sensor_data_consumer'

for e in range(100):
    data = {"number": e}
    producer.send(topic_name, key=str(e).encode(), value=data)
    sleep(10)
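The partitioner above routes each record by computing int(key) % num_partitions. A minimal broker-free sketch of just that mapping logic (the 3-partition count below is an assumption for illustration; the topics created above have only 1 partition, where every key maps to partition 0):

```python
# Sketch of the custom_partitioner routing logic, with no broker involved:
# each key (the stringified counter) lands on partition int(key) % num_partitions.
def custom_partitioner(key, all_partitions, available):
    return int(key.decode('UTF-8')) % len(all_partitions)

all_partitions = [0, 1, 2]  # hypothetical 3-partition topic for illustration
mapping = {e: custom_partitioner(str(e).encode(), all_partitions, all_partitions)
           for e in range(6)}
print(mapping)  # {0: 0, 1: 1, 2: 2, 3: 0, 4: 1, 5: 2}
```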
Simple Consumer Code:
-------------------------------------------
from kafka import KafkaConsumer
from kafka import TopicPartition, OffsetAndMetadata
import kafka
import json

class MyConsumerRebalanceListener(kafka.ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        print("Partitions %s revoked" % revoked)

    def on_partitions_assigned(self, assigned):
        print("Partitions %s assigned" % assigned)

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         group_id='demo112215sgtrjwrykvjh', auto_offset_reset='earliest',
                         enable_auto_commit=False)
listener = MyConsumerRebalanceListener()
consumer.subscribe(['sensor_data_consumer2'], listener=listener)

for message in consumer:
    print(message)
    print("The value is : {}".format(message.value))
    # Manually commit offset + 1 for this partition after processing each message
    tp = TopicPartition(message.topic, message.partition)
    om = OffsetAndMetadata(message.offset + 1, message.timestamp)
    consumer.commit({tp: om})
    print('*' * 100)
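The reason out-of-order commits are possible at all: the group coordinator keeps only the last committed offset per (group, topic, partition), so a later commit with a smaller offset simply overwrites a larger one. A minimal broker-free sketch of that last-write-wins behavior (group/topic names below are placeholders):

```python
# Broker-free sketch: the group coordinator stores only the LAST committed
# offset per (group, topic, partition) key, like a compacted key-value store.
committed = {}

def commit(group, topic, partition, offset):
    committed[(group, topic, partition)] = offset  # last write wins

# In-order commits after processing offsets 0..4:
for off in range(5):
    commit('demo-group', 'sensor_data_consumer2', 0, off + 1)
print(committed)  # {('demo-group', 'sensor_data_consumer2', 0): 5}

# Out-of-order commit: a worker that finished offset 2 commits AFTER the
# worker that finished offset 4 -- the stored offset moves BACKWARDS.
commit('demo-group', 'sensor_data_consumer2', 0, 3)
resume_from = committed[('demo-group', 'sensor_data_consumer2', 0)]
print(resume_from)  # 3 -> offsets 3 and 4 get re-delivered after a restart
```

This is the core trade-off: out-of-order commits allow parallel processing, but a backwards-moving offset means at-least-once re-delivery of already-processed messages.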
Consumer that commits the offset only when message value = 4, then exits:
---------------------------------------------------------------------------------------------------------------
from kafka import KafkaConsumer
from kafka import TopicPartition, OffsetAndMetadata
import kafka
import json
from sys import exit

class MyConsumerRebalanceListener(kafka.ConsumerRebalanceListener):
    def on_partitions_revoked(self, revoked):
        print("Partitions %s revoked" % revoked)

    def on_partitions_assigned(self, assigned):
        print("Partitions %s assigned" % assigned)

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         group_id='demo112215sgtrjwrykvjh', auto_offset_reset='earliest',
                         enable_auto_commit=False)
listener = MyConsumerRebalanceListener()
consumer.subscribe(['sensor_data_consumer'], listener=listener)

for message in consumer:
    print(message)
    print("The value is : {}".format(message.value))
    if message.value['number'] == 4:
        print("inside if")
        # Commit offset + 1 (i.e. offset 5) and stop the consumer;
        # on restart, consumption resumes from the committed offset
        tp = TopicPartition(message.topic, message.partition)
        om = OffsetAndMetadata(message.offset + 1, message.timestamp)
        consumer.commit({tp: om})
        print('*' * 100)
        exit()
    print('*' * 100)
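What the commit-at-4-then-exit experiment demonstrates can be sketched without a broker: after committing offset 5 and exiting, a restarted consumer in the same group resumes from the committed offset, so messages 0..4 are not re-delivered. A minimal simulation of that resume behavior (the 10-message list is an assumption for illustration):

```python
# Broker-free sketch of the commit-at-4-then-exit flow above.
messages = [{"number": e} for e in range(10)]  # what the producer wrote

def run_consumer(start_offset):
    """Deliver messages from the committed offset onward."""
    return [m["number"] for m in messages[start_offset:]]

# First run: process until number == 4, commit offset + 1, then 'exit'.
committed_offset = 0
for offset, msg in enumerate(messages):
    if msg["number"] == 4:
        committed_offset = offset + 1  # commit offset 5
        break

# Second run: resumes from the committed offset, skipping 0..4.
second_run = run_consumer(committed_offset)
print(second_run)  # [5, 6, 7, 8, 9]
```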
Check this playlist for more Data Engineering related videos:
• Demystifying Data Engi...
Apache Kafka from scratch:
• Apache Kafka for Pytho...
Snowflake complete course from scratch, with an end-to-end project and in-depth explanation:
doc.clickup.co...
🙏 You just need to do 3 things to support my channel: LIKE, SHARE & SUBSCRIBE to my YouTube channel.
