
Kafka Consumers

Writing a Kafka consumer seems simple: subscribe to a topic, and consume messages as they appear. Unfortunately life's not that simple; there are a lot of things to take into account.

Configuration

  • Topics

    • Kafka puts messages into topics; you can think of these as subreddits, or IRC channels. Messages placed onto a topic can be read by one or more consumers.
  • Partitions

    • Topics can be cut up into partitions. The best way to think of these is as shards; that is, each partition is going to carry a different subset of the messages placed on the topic.
  • Offsets

    • A consumer usually wants to know two things to save it repeating work already done by other consumers: first, which messages have already been read, and second, which messages have already been processed. These may be two different things; for example, if I get a text message on my phone asking me to call someone, I read that message, and then I need to do the action that the message asks me to complete.

    • The two offsets are known as the consumed offset (which messages have been read) and the committed offset (which messages have had their actions completed).

    • With its default configuration, Kafka automatically commits offsets every 5 seconds (enable.auto.commit with auto.commit.interval.ms=5000), whether or not the messages have actually been processed - which is inaccurate for long running tasks or, worse, for consumers erroring and/or crashing during processing.

  • Asynchronous/Synchronous consumers

    • Kafka allows consumers to act either synchronously or asynchronously when consuming messages. Synchronous consumers block the topic/partition that they are reading from, preventing other consumers from accessing the next message(s) on that topic/partition until the blocking consumer says that it has finished processing the message.

    • Asynchronous consumers will not block the topic/partition, meaning that many consumers can act on many messages in that topic/partition concurrently.

  • Consumer Groups

    • Kafka consumer groups are groups of consumers that cooperatively manage partition assignment, handling consumers being added to and leaving the group (a minimal sketch of these configuration knobs follows this list).
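
To make these knobs concrete, here is a minimal sketch of where they surface in the Java client's configuration. The broker address, group id, and topic name are placeholders, not anything from a real deployment:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // Consumer group: consumers sharing this id divide the topic's partitions between them.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group"); // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Committed offset: by default the client auto-commits every 5 seconds,
        // whether or not the messages have actually been processed.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // the default

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic")); // placeholder topic
            consumer.poll(Duration.ofMillis(500)); // the consumed offset advances as records are read
        }
    }
}
```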

Multiple Consumers per topic

A lot of people make a fundamental mistake: their goal is to have multiple consumers on a topic/partition so that those consumers can all work concurrently, many hands making light work.

To do this they set Kafka to allow asynchronous consumers, which is absolutely fine. Kafka dutifully records which messages have been read by the asynchronous consumers, which is, again, perfectly adequate.

Unfortunately the next part is the problem: determining which of the messages that have been consumed have been successfully processed.

The committed offset that Kafka maintains is no longer of any use - there are no guarantees that the consumers will complete their tasks in the same order that they were given them. That is, the consumer dealing with message 1 will not necessarily complete its task before the consumer dealing with message 11, so the consumer handling message 11 telling Kafka that the committed offset can be set to 11 completely ignores whether messages 1-10 have been completed too. If the consumer working on message 1 finishes after the consumer handling message 11, what should the committed offset be?

People following this thought pattern then fall into the trap of using a third-party storage system to store the committed values, but this quickly leads to a complicated and broken solution - storing a 'state' for each message, showing whether it has been completed, has retryable errors, or needs to be handled manually. The information that is required is: what is the earliest message that needs to be (re)fetched, which messages need to be retried, and which messages are impossible to deal with.

A slightly better, but still broken, approach is to have consumers take a message and, if it errors with a retryable error, place it onto a new topic/queue so that the message can be tried again. If a message has been tried too many times and is still erroring, or has errors that cannot be recovered at all, then place it in a Dead Letter Queue, which can be another topic/partition.
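
For illustration, here's a hedged sketch of that routing, assuming hypothetical topic names ("orders.retry", "orders.dlq"), a retry count carried in a record header, and an application-supplied classification of which errors are retryable:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class FailureRouter {
    private static final int MAX_ATTEMPTS = 5; // assumed retry budget

    private final KafkaProducer<String, String> producer;

    public FailureRouter(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    // Route a record that failed processing to a retry topic, or to the DLQ
    // once it has exhausted its retries or the error is unrecoverable.
    public void routeFailure(ConsumerRecord<String, String> record, Exception error) {
        int attempts = readAttempts(record);
        String target = (isRetryable(error) && attempts < MAX_ATTEMPTS)
                ? "orders.retry"   // hypothetical retry topic
                : "orders.dlq";    // hypothetical dead letter topic
        ProducerRecord<String, String> out =
                new ProducerRecord<>(target, record.key(), record.value());
        out.headers().add("attempts",
                Integer.toString(attempts + 1).getBytes(StandardCharsets.UTF_8));
        producer.send(out);
    }

    private int readAttempts(ConsumerRecord<String, String> record) {
        Header h = record.headers().lastHeader("attempts");
        return h == null ? 0 : Integer.parseInt(new String(h.value(), StandardCharsets.UTF_8));
    }

    private boolean isRetryable(Exception error) {
        // Application-specific classification; everything is treated as retryable here.
        return true;
    }
}
```

Note that the sketch deliberately leaves open when the original offset gets committed relative to this routing - which is exactly where the approach falls apart, as described next.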

Whilst this seems complete, it's still broken - in both solutions, if a consumer dies after reading a message but before the message can be marked as complete (including the error states), then what happens? Does another service need to run to determine whether messages are being dropped and not completed? How long should a consumer work on a message before it's considered broken? (A consumer will produce a heartbeat, which eliminates some of this issue.)

To be clear, this race condition is impossible to navigate around, and it’s pure folly to even try.

Using asynchronous consumers, then, is only feasible when it’s acceptable that some (or even all) of the messages are never processed.

Improved solution

I think that a much better solution is to make use of synchronous consumers (meaning that the blocking behavior ensures that the committed offset does not produce any gaps), to have those consumers managed in a consumer group, and, to ensure that many hands make light work, to cut the topic up into multiple partitions.
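
A minimal sketch of that loop, assuming manual offset management (enable.auto.commit=false) and placeholder broker/topic/group names; the synchronous commit after each record is what keeps the committed offset gap-free:

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SynchronousWorker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-workers");         // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // we commit, not the clock

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // do the work first...
                    // ...then synchronously commit the offset *after* this record,
                    // so the committed offset never runs ahead of completed work
                    // and can never contain gaps.
                    consumer.commitSync(Map.of(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1)));
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // Application-specific work goes here.
    }
}
```

Concurrency then comes from the partition count, rather than from racing consumers within a single partition.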

By having the consumers in a group, the number of consumers can range up or down as determined by the number of messages being handled at any given point in time; that is, when a large number of messages arrive to be processed, consumers can be "spun up" to process those messages, and when the number of messages reduces, any excess consumers can be switched off.

The number of partitions created determines the maximum number of messages that can be processed concurrently, so getting the number of partitions right is important. Kafka supports 200k partitions per cluster (2018):

As a rule of thumb, we recommend each broker to have up to 4,000 partitions and each cluster to have up to 200,000 partitions

How many partitions should be created? Little's Law guides this; I have created some examples of Little's Law usage for buffer/cache size calculations, which I recommend for determining the number of partitions that would be required for a given workload (a worked sizing example follows below).

Edit: A single cluster will not provide infinite resources, so developers need to calculate the maximum number of messages that can be processed per broker/cluster, and, if necessary, create more clusters.

For example, if the system is expecting 80,000 messages per second, the broker will allow a maximum of 4,000 partitions, and the consumers cannot average the 20 messages per second per partition required to keep up, then another broker needs to be created and the messages need to be split between the brokers, load balancing (or sharding) the traffic.
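
As a back-of-the-envelope sketch of that sizing (the rate and service time here are assumed figures, not measurements): Little's Law says the concurrency needed, L, is the arrival rate λ multiplied by the average time W a consumer spends on one message.

```java
public class PartitionSizing {
    public static void main(String[] args) {
        double arrivalRate = 80_000.0; // λ: messages per second (assumed workload)
        double serviceTime = 0.05;     // W: seconds per message per consumer (assumed)

        // Little's Law: L = λW, the number of messages in flight at once,
        // which is the number of consumers (and so partitions) needed to keep up.
        long partitions = (long) Math.ceil(arrivalRate * serviceTime); // 4,000 here

        double perPartitionRate = arrivalRate / partitions; // 20 messages/second each
        System.out.printf("Partitions needed: %d (each consumer must sustain %.0f msg/s)%n",
                partitions, perPartitionRate);
        // If 'partitions' exceeds what one broker (~4,000) or one cluster (~200,000)
        // supports, the traffic has to be sharded across more brokers/clusters.
    }
}
```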

Summary

Kafka is a highly configurable messaging system that comes with a requirement that you understand the effects of the configuration choices you make. Choosing wisely allows consumers to be created that provide the guarantees that use cases might demand.
