Exploring Kafka Consumer’s Internals

This is the second part of a series where we explore the internals of Kafka clients. This post focuses on the Kafka Consumer.

In a previous blog post, we covered the Kafka Producer’s internals and outlined the scale of the traffic we handle: 310B msg/day, with 300 TB/day IN and 920 TB/day OUT. Today we are sharing our expertise on the Kafka Consumer’s internals.

The Kafka Consumer is a client that consumes messages from a Kafka cluster.

Components:

  • Consumer Metadata — manages the metadata needed by the consumer: topics and partitions in the cluster, the broker node serving as the leader of each partition, etc.
  • Subscriptions — tracks the consumer’s subscription state
  • Deserializers — record key and value deserializers; a deserializer converts a byte array into an object
  • Partition Assignor — the partition assignment strategy, given as a fully qualified class name implementing org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
  • Interceptors — interceptors that can inspect and possibly mutate the records
  • Consumer Coordinator — manages group membership and offsets
  • Network Client — handles requests to brokers
  • Fetcher — fetches batches of records from brokers
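
To make the deserializer’s role concrete, here is a minimal sketch in plain Java. SimpleDeserializer is a simplified stand-in for the real org.apache.kafka.common.serialization.Deserializer interface (the real one also defines configure() and close()):

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for org.apache.kafka.common.serialization.Deserializer<T>.
interface SimpleDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

public class DeserializerSketch {
    public static void main(String[] args) {
        // A minimal String deserializer, mirroring what the stock
        // StringDeserializer does: turn the record's raw bytes back into a String.
        SimpleDeserializer<String> stringDeserializer =
                (topic, data) -> data == null ? null : new String(data, StandardCharsets.UTF_8);

        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(stringDeserializer.deserialize("orders", raw)); // prints "hello"
    }
}
```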

Configuring Kafka Consumer

Kafka Consumer has four required properties:

  • bootstrap.servers — a list of host/port pairs used to establish the initial connection to the Kafka cluster. Format: “host1:port1,host2:port2,…”
  • key.deserializer — fully qualified class name of a key deserializer implementing the org.apache.kafka.common.serialization.Deserializer interface
  • value.deserializer — fully qualified class name of a value deserializer implementing the org.apache.kafka.common.serialization.Deserializer interface
  • group.id — unique consumer group id. Required if the client uses the subscribe() method for consuming messages or uses offset management capabilities
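
A minimal sketch of these four properties using plain java.util.Properties; the host names and group id below are placeholders, and the deserializer class names are the stock String deserializers shipped with the Kafka clients library:

```java
import java.util.Properties;

public class ConsumerConfigExample {
    // Builds the four required consumer properties. "host1:9092,host2:9092"
    // and "my-consumer-group" are placeholders for this example.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:9092,host2:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "my-consumer-group");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In a real application, this Properties object would be passed straight to the KafkaConsumer constructor.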

Subscribing to topics

Kafka Consumer offers two ways to subscribe to topics: the subscribe() and assign() methods. Each supports a different set of features.

subscribe()

  • supports subscribing to a list of topics or to a regular expression
  • group membership with consumer failure detection (client-side and server-side)
  • dynamic partition assignment
  • automatic or manual offset management
  • a single consumer per partition (when using the standard partition assignors)

assign()

  • finer control via topic-partition subscription
  • automatic or manual offset management
  • supports multiple consumers per partition

Apache Flink and Apache Spark use assign() to subscribe to topics and manage the distribution of topic-partition pairs across workers themselves.
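
As a rough illustration of that pattern, here is a sketch in plain Java of how a framework might decide which partitions each worker should assign() itself. The modulo scheme is a hypothetical simplification for this example, not the actual Flink or Spark logic:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSharding {
    // Decide which partitions of a topic belong to a given worker, using a
    // simple modulo scheme. Each worker would then call consumer.assign()
    // with its own slice of the topic-partitions.
    static List<Integer> partitionsForWorker(int workerIndex, int numWorkers, int numPartitions) {
        List<Integer> mine = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % numWorkers == workerIndex) {
                mine.add(p);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // 6 partitions spread over 2 workers: worker 0 gets [0, 2, 4], worker 1 gets [1, 3, 5].
        System.out.println(partitionsForWorker(0, 2, 6));
        System.out.println(partitionsForWorker(1, 2, 6));
    }
}
```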

Consuming messages

Once subscribed to the topics, the user can consume messages by invoking the poll() method. Depending on the subscription method, several calls happen behind the scenes, shown in the sequence diagrams below.

Consuming with subscribe()

In the case of subscribe(), the Kafka Consumer won’t consume records until it becomes an active member of a consumer group. We have seen teams miss this point and spend time debugging stopped consumption when the root cause was an unstable consumer group.

Consuming with assign()

In the case of assign(), the Kafka Consumer does not invoke any group membership functionality, such as heartbeating and joining/rejoining the consumer group.

Fetcher

Regardless of the subscription method, Kafka Consumer uses the Fetcher to retrieve batches of records from brokers.

The Fetcher keeps batches in memory in compressed form, as sent by the producer, and decompresses records on poll(). Once a batch is consumed, it is discarded from memory. fetch.min.bytes and fetch.max.wait.ms are the key configs for tuning the fetcher for throughput or latency: increasing the bytes and wait time results in higher throughput, while decreasing them results in better latency.
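
The trade-off can be sketched as two illustrative tuning profiles; the numbers are assumptions for the example, not recommendations:

```java
import java.util.Properties;

public class FetcherTuning {
    // Throughput-leaning profile: let the broker accumulate a larger batch
    // before answering the fetch request.
    static Properties throughputProfile() {
        Properties p = new Properties();
        p.put("fetch.min.bytes", "1048576"); // wait for ~1 MiB of data...
        p.put("fetch.max.wait.ms", "500");   // ...or at most 500 ms
        return p;
    }

    // Latency-leaning profile: have the broker answer as soon as any data exists.
    static Properties latencyProfile() {
        Properties p = new Properties();
        p.put("fetch.min.bytes", "1");       // respond immediately when data is available
        p.put("fetch.max.wait.ms", "100");
        return p;
    }

    public static void main(String[] args) {
        System.out.println("throughput: " + throughputProfile());
        System.out.println("latency:    " + latencyProfile());
    }
}
```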

Consumer group

When a user consumes via subscribe(), consumers with the same group.id form a consumer group and cooperate to consume messages from the subscribed topic(s). The Kafka cluster elects one of the brokers as the Group Coordinator. The group coordinator is responsible for managing the group membership list, receiving heartbeats, triggering rebalances on group membership changes, etc. The coordinator elects one consumer as the Group Leader and asks it to compute the partition assignment across consumers. Each partition is assigned to only one consumer.

Consumer group rebalances

Changes in group membership trigger consumer group rebalances. During a rebalance, the group leader recalculates the partition assignment across the current members. A rebalance is triggered when:

  • a consumer joins the group
  • a consumer leaves the group
  • a client-side failure is detected via max.poll.interval.ms
  • a server-side failure is detected via session.timeout.ms
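
The client-side check behind max.poll.interval.ms can be sketched as a simple rule (a simplified model, not the actual consumer code): if the gap between two consecutive poll() calls exceeds the configured interval, the member is considered failed and leaves the group:

```java
public class PollIntervalCheck {
    // Simplified model of max.poll.interval.ms enforcement: if the time between
    // two consecutive poll() calls exceeds the configured interval, the member
    // is treated as failed and a rebalance is triggered.
    static boolean exceededPollInterval(long lastPollMs, long nowMs, long maxPollIntervalMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // the default is 5 minutes
        System.out.println(exceededPollInterval(0, 200_000, maxPollIntervalMs)); // false
        System.out.println(exceededPollInterval(0, 301_000, maxPollIntervalMs)); // true
    }
}
```

This is why long message processing inside the poll() loop (next list, second bullet) is such a common rebalance trigger.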

Possible causes that can trigger a consumer group rebalance:

  • service scaling in or out
  • poll() and long message processing happening in the same thread
  • failure of heartbeats to the Group Coordinator
  • JVM garbage collection pauses
  • Kubernetes pod CPU throttling
  • Kubernetes cluster upgrades, resulting in pod evictions
  • networking issues (latencies, packet drops, etc.)

Consumer Partition Assignor

Kafka Consumer offers several options for how partitions are distributed during rebalances. The user controls this via the partition.assignment.strategy configuration parameter, whose value is a fully qualified class name implementing org.apache.kafka.clients.consumer.ConsumerPartitionAssignor. Out of the box, Kafka offers the following strategies:

Range — stop-the-world strategy that works on a per-topic basis. It might generate imbalanced assignments. More details in the javadoc.

RoundRobin — stop-the-world strategy that uniformly distributes partitions for identical subscriptions. More details in the javadoc.

Sticky — stop-the-world strategy whose initial distribution is close to RoundRobin. It tries to minimize partition movement during rebalances but might generate imbalanced assignments. More details in the javadoc.

CooperativeSticky — incremental rebalancing without stopping consumption. Same logic as Sticky, but with incremental support. This strategy can also generate imbalanced assignments. More details in the javadoc.
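
To illustrate why Range can produce imbalanced assignments, here is a simplified single-topic model of its logic (a sketch, not the actual RangeAssignor code): partitions are split into contiguous ranges, and the first consumers get one extra partition each when the counts don’t divide evenly. Across many topics, the same leading consumers collect the extras every time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AssignorSketch {
    // Simplified single-topic model of range assignment: contiguous ranges,
    // with the first (numPartitions % numConsumers) consumers each taking
    // one extra partition.
    static Map<Integer, List<Integer>> rangeAssign(int numConsumers, int numPartitions) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        int perConsumer = numPartitions / numConsumers;
        int extras = numPartitions % numConsumers;
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < extras ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int i = 0; i < count; i++) parts.add(next++);
            assignment.put(c, parts);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 2 consumers, 5 partitions: consumer 0 gets [0, 1, 2], consumer 1 gets [3, 4].
        System.out.println(rangeAssign(2, 5));
    }
}
```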

What’s next?

In the next blog posts, we will cover monitoring the Producer and Consumer, performance tuning, and a couple of additional technical aspects.

References

https://kafka.apache.org/documentation/#consumerconfigs

https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/clients/consumer