Let's talk about Kafka

Apache Kafka is a distributed event streaming platform used by thousands of organizations (including more than 80% of the Fortune 100) for data integration, streaming analytics, data pipelines, and mission-critical applications.

Kafka is named after Franz Kafka, the novelist whose protagonists are stuck in socio-bureaucratic loops. Jay Kreps, co-creator of Kafka and cofounder of Confluent, admired Franz Kafka's writing and chose the name for a system optimized for writing data streams.

Why Kafka?

Why do distributed systems need Kafka? Why do top-notch organizations like Airbnb, Netflix, LinkedIn, Uber, and Walmart use it?

Hypothetically, let us assume Uber has a ride service responsible for making car bookings. Once a booking is made, it also needs to relay the booking's information to several subsystems, such as:

  1. Recommendation Engine — serves the passenger recommendations based on the route.
  2. Surge Forecaster — computes real-time demand.
  3. Other systems like analytics, logging, etc.

Now suppose the booking's information also needs to be sent to a data lake; the ride service will have to set up the data lake's connection as well. Similarly, if Uber comes up with a bike service, the "Bike Microservice" will have to connect to all of those downstream systems too.

Do you see the problem?

Streaming data in a multiple-source, multiple-target setup is a nightmare, and that is exactly what Kafka solves efficiently. Kafka works on a push-pull model wherein producers push data to Kafka while consumers pull data from Kafka.

Kafka offers:

  1. A high-throughput distributed messaging system.
  2. Horizontal scaling up to hundreds of brokers and millions of messages per second.
  3. Latency as low as 10 ms, which is near real-time.

All of the above traits make it a highly scalable, highly reliable, and fault-tolerant setup.

Building Blocks of Kafka

Let us look at the various building blocks of Kafka.

Topics

A topic refers to a stream of data; it is roughly what a table is to a database. Producers write to a topic, and consumers consume that set of data from the topic.

Data in a topic lives in partitions. Each partition is an ordered data set, and every record in a partition is assigned an incremental id called its offset. Data pushed into Kafka is also immutable.

Brokers

Each Kafka cluster has multiple brokers (servers) that hold the data. Each broker holds some partitions of a topic but not all of them, which is what makes Kafka distributed.

If a producer or consumer is connected to any one of the brokers, it is connected to the entire cluster. The broker you connect to first is called the bootstrap broker.

The data is also replicated across brokers (the replication factor is configurable), making the setup highly reliable. At any point in time, only one broker can be the leader for a given partition, and only that leader can receive and serve data for that partition; the other brokers holding the partition sync the data and act as in-sync replicas (ISRs). The leader and ISRs are determined with Zookeeper's help.

kafka-topics --bootstrap-server 127.0.0.1:9092 --topic first_topic_1 --create --partitions 3 --replication-factor 2

To create a Kafka topic, we connect to the bootstrap server and define the number of partitions and the replication factor.
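The same thing can be done programmatically. Here is a minimal sketch using Kafka's Java AdminClient (the class name is just for illustration; the broker address and topic settings mirror the command above):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2 -- same as the CLI example
            NewTopic topic = new NewTopic("first_topic_1", 3, (short) 2);
            admin.createTopics(List.of(topic)).all().get(); // block until the topic exists
        }
    }
}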

Producers

A producer is the building block that writes data to topics. Producers know exactly which topic and partition to write to, and they can recover in case a broker goes down.

When writing data, a producer can choose to receive an acknowledgement of its writes. This can be configured in three ways:

  1. Acks = 0: the producer just sends the data and does not wait for an acknowledgement. This setting gives the lowest latency and highest throughput at the cost of a much higher risk of message loss.
  2. Acks = 1: the producer waits for the leader's acknowledgement. The leader writes the record to its log but responds without waiting for the followers to replicate it. The message is lost only if the leader fails immediately after acknowledging the record but before the followers have replicated it. This setting is the middle ground for latency, throughput, and durability; it is slower but more durable than acks=0 and is the default value.
  3. Acks = all: the producer gets an acknowledgement once all in-sync replicas have received the record. The leader waits for the full set of in-sync replicas to acknowledge it, so sending a message with acks=all takes longer, but it gives the strongest durability.
kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic first_topic_1 --producer-property acks=all
>msg1
>msg2
>msg3
>

The above command opens up a simple input data stream from where you can publish messages.
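In application code you would use the producer API instead of the console tool. Here is a minimal sketch with the Java client, assuming kafka-clients is on the classpath (the class name is illustrative); the printed metadata shows which partition and offset the record landed at, tying back to the offsets described earlier:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("first_topic_1", "msg1");
            // send() is asynchronous; get() blocks for the broker's acknowledgement
            RecordMetadata meta = producer.send(record).get();
            System.out.printf("written to partition %d at offset %d%n",
                meta.partition(), meta.offset());
        }
    }
}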

What if we try to publish messages to a topic that doesn’t exist?

kafka-console-producer --bootstrap-server 127.0.0.1:9092 --topic non_existing_topic

We get a warning that says the leader is not available for this topic.

>msg1
[2021-09-15 08:48:45,471] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {non_existing_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

But the topic is created with default configs and the message is published as well.

kafka-topics --bootstrap-server 127.0.0.1:9092 --topic non_existing_topic --describe
Topic: non_existing_topic TopicId: uINeTkH_S7qsJxFBynUwWw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: non_existing_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Here, one partition with a replication factor of 1 was created. If you want different defaults, set num.partitions and default.replication.factor in the broker's server.properties.
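For example, a broker could be configured to auto-create topics with three partitions and two replicas (a sketch of the relevant server.properties lines; the replication factor of 2 assumes at least two brokers):

# allow clients to auto-create missing topics (on by default)
auto.create.topics.enable=true
# defaults applied to auto-created topics
num.partitions=3
default.replication.factor=2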

Consumers

Consumers are the building blocks that read data from topics. They know which broker to read from and how to recover in case a broker goes down. The data within each partition is ordered by design, so it is consumed in order within each partition, but not across partitions.

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic_1 --property print.partition=true
Partition:0 msg2
Partition:0 msg3

When a consumer is set up, by default it starts reading only the messages produced from that point on (pass --from-beginning to read the topic from the start).
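Programmatically, a minimal consumer sketch with the Java client looks like this (the class name is illustrative; auto.offset.reset controls where a brand-new group starts when it has no committed offsets):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cgid3");
        // start from the earliest offset when the group has no committed position
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("first_topic_1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("partition %d, offset %d: %s%n",
                        r.partition(), r.offset(), r.value());
                }
            }
        }
    }
}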

Consumer Groups

Consumers read data in groups called consumer groups. Each partition is read by exactly one consumer within a group. Consumer groups use the broker-side GroupCoordinator and the client-side ConsumerCoordinator to assign consumers to partitions.

If the number of consumers is greater than the number of partitions, some consumers will be idle. If the number of consumers is less than the number of partitions, some consumers will read from more than one partition.

Additionally, when you add a new consumer to a consumer group, the consumer-partition assignment is automatically rebalanced. For example, for topic T we had three partitions P1, P2, P3 and two consumers C1, C2 in group CG1. Let's say C1 was consuming from P1 and P2 while C2 was consuming from P3. If we now add C3 to the mix, the assignment is rebalanced and can look like:

  • C1 reads from P1
  • C2 reads from P3
  • C3 reads from P2

On the flip side, if consumers leave the group, the partitions are redistributed among those remaining; if only C1 were left, it would read from all of P1, P2, and P3.
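You can observe these rebalances with a ConsumerRebalanceListener, which the Java client calls whenever partitions are revoked from or assigned to a consumer. A minimal sketch (the class name is illustrative):

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class LoggingRebalanceListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called before a rebalance takes these partitions away
        System.out.println("revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called once the rebalance has handed these partitions to us
        System.out.println("assigned: " + partitions);
    }
}

// usage: consumer.subscribe(List.of("first_topic_1"), new LoggingRebalanceListener());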

To start a consumer within a consumer group,

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic_1 --group cgid3

And we can check the active consumers with:

kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --group cgid3 --describe

GROUP   TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                HOST  CLIENT-ID
cgid3   first_topic_1   0          1               1               0    consumer-cgid3-1-2e2465bc-31f0-4c2a-84a8-

Offsets

Kafka stores bookmarks of where a consumer group has read up to in the form of offsets. These offsets are committed live to an internal topic called __consumer_offsets.

Whenever a consumer group processes data, it needs to commit the latest offset it has read and processed. This mechanism gives high reliability: if a consumer dies, another or a new consumer can resume reading from the last committed offset.

Delivery Semantics

The power to commit offsets lies with the consumer, and it can choose when to update them. There are three delivery semantics at play:

  • At most once — offsets are committed as soon as the message is received by the consumer. If the message isn't processed, the offset is committed anyway and the message is lost.
  • At least once — offsets are committed only after the message is processed (see the sketch after this list). The consumer's processing should be idempotent since a message can be read more than once.
  • Exactly once — each message is processed, and its effect recorded, exactly once. This is generally achievable only for Kafka-to-Kafka workflows using the Kafka Streams API.
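In the Java client, the difference between the first two semantics mostly comes down to when offsets are committed relative to processing. A minimal at-least-once sketch with auto-commit disabled (the class name and the process() helper are hypothetical stand-ins for your own code):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cgid3");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("first_topic_1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                // at-least-once: process first, commit after; a crash in between
                // replays the batch, so process() must be idempotent
                for (ConsumerRecord<String, String> r : records) {
                    process(r);
                }
                consumer.commitSync();
                // at-most-once would instead commit right after poll(), before processing,
                // so a crash can lose messages but never duplicates them
            }
        }
    }

    // hypothetical stand-in for your own processing logic
    static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}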

Broker Discovery

Every broker in a cluster can act as a bootstrap server, i.e. each one knows about all the brokers, topics, and partitions in the cluster, and a client can connect to any one of them. When a client connects, it also requests metadata; the bootstrap broker returns the list of all brokers in the cluster, enabling the client to set up connections with the other brokers as well.

The metadata response looks like this:

Metadata Response (Version: 0) => [brokers] [topics]
  brokers => node_id host port
    node_id => INT32
    host => STRING
    port => INT32
  topics => error_code name [partitions]
    error_code => INT16
    name => STRING
    partitions => error_code partition_index leader_id [replica_nodes] [isr_nodes]
      error_code => INT16
      partition_index => INT32
      leader_id => INT32
      replica_nodes => INT32
      isr_nodes => INT32
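You can fetch this metadata yourself via the AdminClient. A minimal sketch that lists every broker the bootstrap server knows about (the class name is illustrative):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class ShowBrokers {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // the bootstrap broker answers with every node in the cluster
            for (Node node : admin.describeCluster().nodes().get()) {
                System.out.printf("node %d at %s:%d%n", node.id(), node.host(), node.port());
            }
        }
    }
}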

Zookeeper

In distributed systems, the major challenges are data inconsistency and the lack of agreement. Zookeeper provides a centralized service that maintains configuration information, handles naming, and provides distributed synchronization.

Zookeeper has a leader that handles writes and followers that handle reads. A Zookeeper ensemble is designed to run with an odd number of servers.

Kafka can't work without Zookeeper, as:

  • Zookeeper manages brokers and keeps a list of them (see the sketch after this list)
  • It helps in leader election for partitions
  • It notifies Kafka of changes like a new topic, a broker going down or coming up, topic deletion, etc.
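You can peek at what Zookeeper stores for Kafka using the Zookeeper Java client; for instance, each live broker registers itself under the /brokers/ids znode. A minimal sketch, assuming Zookeeper listens on 127.0.0.1:2181 (the class name is illustrative):

import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;

public class ListBrokers {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        // wait until the session reaches SyncConnected before issuing requests
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 3000, event -> {
            if (event.getState() == KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();
        // each live broker registers an ephemeral znode under /brokers/ids
        System.out.println(zk.getChildren("/brokers/ids", false));
        zk.close();
    }
}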