Topics in Kafka can be subdivided into partitions. For example, while creating a topic named Demo, you might configure it to have three partitions. The server would create three log files, one for each of the Demo partitions. When a producer published a message to the topic, it would assign a partition ID to that message. The server would then append the message to the log file for that partition only.
If you then started two consumers, the server might assign partitions 1 and 2 to the first consumer, and partition 3 to the second consumer. Each consumer would read only from its assigned partitions. You can see the Demo topic configured for three partitions in Figure 1.
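As an illustration, here is one way you could create such a topic programmatically. This sketch uses the AdminClient API, which appeared in later Kafka client versions than the one this series was written against; the topic name and server address simply match our running example:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateDemoTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Demo topic: three partitions, replication factor of 1
            NewTopic demo = new NewTopic("Demo", 3, (short) 1);
            admin.createTopics(Collections.singletonList(demo)).all().get();
        }
    }
}
```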
To expand the scenario, imagine a Kafka cluster with two brokers, housed on two machines. When you created the Demo topic, you would configure it to have two partitions and two replicas. For this type of configuration, the Kafka server would assign the two partitions to the two brokers in your cluster. Each broker would be the leader for one of the partitions.
When a producer published a message, it would go to the partition leader. The leader would take the message and append it to the log file on the local machine. The second broker would passively replicate that commit log to its own machine. If the partition leader went down, the second broker would become the new leader and start serving client requests. In the same way, when a consumer sent a request to a partition, that request would go first to the partition leader, which would return the requested messages.
Consider the benefits of partitioning a Kafka-based messaging system: partitions can be spread across multiple brokers, so a topic can grow beyond the capacity of a single machine; consumers can read different partitions in parallel; and message order is preserved within each partition.
The producer is responsible for deciding which partition a message will go to. The producer has two options for controlling this assignment: it can rely on the DefaultPartitioner, which picks a partition based on a hash of the message key, or it can register a custom implementation of the Partitioner interface through the producer configuration.
For the simple producer/consumer example in Part 1, we used a DefaultPartitioner. Now we'll try creating a custom partitioner instead. For this example, let's assume that we have a retail site that customers can use to order products from anywhere in the world. Based on usage, we know that most customers are in either the United States or India. We want to partition our application to send orders from the US or India to their own respective consumers, while orders from anywhere else will go to a third consumer.
To start, we'll create a CountryPartitioner that implements the org.apache.kafka.clients.producer.Partitioner interface. We must implement the following methods: configure(), which Kafka calls with the producer's configuration properties; partition(), which returns the partition ID for a given message; and close(), which releases any resources on shutdown.
Note that when Kafka calls configure(), it passes in all the properties that we've configured for the producer. It is essential that we read only those properties that start with partitions., parse them to get the partitionId, and store each ID in countryToPartitionMap.
Below is our custom implementation of the Partitioner interface, sketched here under the conventions described above: properties of the form partitions.<id>=<country> define the mapping, and any country without a mapping falls through to the topic's last partition.
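```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CountryPartitioner implements Partitioner {

    // Maps a country name (lowercased) to its configured partition ID
    private Map<String, Integer> countryToPartitionMap;

    @Override
    public void configure(Map<String, ?> configs) {
        // Kafka passes in every producer property; we read only those
        // prefixed with "partitions.", e.g. partitions.0=USA
        countryToPartitionMap = new HashMap<>();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            if (entry.getKey().startsWith("partitions.")) {
                int partitionId =
                    Integer.parseInt(entry.getKey().substring("partitions.".length()));
                countryToPartitionMap
                    .put(((String) entry.getValue()).toLowerCase(), partitionId);
            }
        }
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // The message key carries the country name; route known countries
        // to their configured partition and everything else to the last one
        Integer partitionId = countryToPartitionMap.get(((String) key).toLowerCase());
        if (partitionId != null) {
            return partitionId;
        }
        return cluster.partitionCountForTopic(topic) - 1;
    }

    @Override
    public void close() {
        // No resources to release
    }
}
```

Routing unmatched countries to the last partition keeps the "everywhere else" consumer simple; any other fallback scheme would work just as well.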
The Producer class in Listing 2 (below) is very similar to our simple producer from Part 1, with two changes: first, the producer configuration registers CountryPartitioner via ProducerConfig.PARTITIONER_CLASS_CONFIG and supplies the partitions.<id> properties that map countries to partitions; second, each ProducerRecord now carries the country name as its message key, which is what the partitioner uses for routing.
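Condensed, those two changes might look like the following sketch; the bootstrap address and the order payload are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitionedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Change 1: register the custom partitioner and its country mappings
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CountryPartitioner.class.getName());
        props.put("partitions.0", "USA");
        props.put("partitions.1", "India");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Change 2: the country name is the message key
            producer.send(new ProducerRecord<>("Demo", "USA", "order #1234"));
        }
    }
}
```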
The Kafka server guarantees that a partition is assigned to only one consumer within a consumer group, thereby preserving the order of message consumption within each partition. You can assign partitions manually or have them assigned automatically.
If your business logic demands more control, then you'll need to assign partitions manually. In this case you would use KafkaConsumer.assign(<listOfPartitions>) to pass the Kafka server a list of the partitions the consumer is interested in.
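For instance, a consumer that should read only partition 0 of the Demo topic might look like this sketch; the server address is a placeholder:

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualAssignConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Take ownership of partition 0 of the Demo topic directly;
            // no group management or rebalancing happens in this mode
            consumer.assign(Arrays.asList(new TopicPartition("Demo", 0)));
            // poll() now returns messages from partition 0 only
        }
    }
}
```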
Having partitions assigned automatically is the default and most common choice. In this case, the Kafka server will assign a partition to each consumer, and will reassign partitions to scale for new consumers.
Say you're creating a new topic with three partitions. When you start the first consumer for the new topic, Kafka will assign all three partitions to that consumer. If you then start a second consumer, Kafka will reassign all the partitions, assigning one partition to the first consumer and the remaining two partitions to the second consumer. If you add a third consumer, Kafka will reassign the partitions again, so that each consumer is assigned a single partition. Finally, if you start fourth and fifth consumers, then three of the consumers will have an assigned partition, but the others won't receive any messages. If one of the first three consumers goes down, Kafka will use the same assignment logic to reassign that consumer's partition to one of the additional consumers.
We'll use automatic assignment for the example application. Most of our consumer code will be the same as it was for the simple consumer seen in Part 1. The only difference is that we'll pass an instance of ConsumerRebalanceListener as a second argument to our KafkaConsumer.subscribe() method. Kafka will call the methods of this class every time it assigns partitions to or revokes them from this consumer. We'll override ConsumerRebalanceListener's onPartitionsRevoked() and onPartitionsAssigned() methods and print the list of partitions that were assigned to or revoked from this subscriber.
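In outline, the subscription might look like the following, where topic and consumer come from the surrounding consumer setup:

```java
import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called just before a rebalance takes partitions away
        System.out.println("Partitions revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after a rebalance hands partitions to this consumer
        System.out.println("Partitions assigned: " + partitions);
    }
});
```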
We're ready to run and test the current iteration of our producer/consumer application. As you've done previously, you can use the code in Listings 1 through 3, or download the complete source code on GitHub.
Figure 2 shows producer/consumer output in the partitioned topic.
Being able to partition a single topic into multiple parts is essential to Kafka's scalability. Partitioning lets you scale your messaging infrastructure horizontally while maintaining order within each partition. Next we'll look at how Kafka uses message offsets to track and manage complex messaging scenarios.
I mentioned in Part 1 that whenever a producer publishes a message, the Kafka server assigns an offset to that message. A consumer is able to control which messages it wants to consume by setting or resetting the message offset. When developing a consumer you have two options for managing the offset: automatic and manual.
When you start a consumer, the Kafka client reads the value of your ConsumerConfig.AUTO_OFFSET_RESET_CONFIG (auto.offset.reset) configuration. If that config is set to earliest, the consumer will start with the smallest offset available for the topic. In its first request to Kafka, the consumer will effectively say: give me all the messages in this partition with an offset greater than the smallest one available. It will also specify a batch size. The Kafka server will return all the matching messages in batches of the specified size.
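Enabling this behavior is a one-line change to the consumer's configuration (props here is the consumer's Properties object from our earlier setup):

```java
// Start from the oldest available message when this group has no
// committed offset yet; use "latest" to read only new messages instead
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
```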
The consumer keeps track of the offset of the last message it has processed, so it will always request messages with an offset higher than the last one. This setup works when a consumer is functioning normally, but what happens if the consumer crashes, or you want to stop it for maintenance? In this case you would want the consumer to remember the offset of the last message processed, so that it can start with the first unprocessed message.
In order to ensure offset persistence, Kafka uses two types of offset. The current offset tracks messages consumed while the consumer is working normally. The committed offset also tracks the last message offset, but the consumer sends that information to the Kafka server for persistent storage.
If the consumer goes down or is taken down for some reason, it can query the Kafka server for the last committed offset and resume message consumption as if no messages had been missed. For its part, the Kafka broker stores this information in a topic called __consumer_offsets. This data is replicated to multiple brokers so that the offsets survive broker failures.
You have a choice about how often to commit offset data. Committing frequently costs some performance, but if the consumer goes down you'll have fewer messages to reprocess. Your other option is to commit less frequently (for better performance), at the price of reprocessing more messages after a failure. In either case, the consumer has two options for committing the offset: automatically, by leaving enable.auto.commit set to true so the client commits on a timer (controlled by auto.commit.interval.ms), or manually, by calling commitSync() or commitAsync() after processing messages, as sketched below.
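Here's a minimal sketch of the manual approach, assuming the props and consumer objects from our earlier setup and a hypothetical process() method standing in for your business logic:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Disable the periodic auto-commit and take control ourselves
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical business-logic method
    }
    // Commit only after the whole batch is processed, so a crash never
    // marks unprocessed messages as consumed
    consumer.commitSync();
}
```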
Let's consider three use cases where you wouldn't want to use Kafka's default offset management infrastructure. Instead, you'll manually decide which message to start from.
The consumer code that we've developed so far auto-commits offsets every 5 seconds. Now let's update the consumer to take a third argument that manually sets the starting offset.
If you pass 0 as the last argument, the consumer will assume that you want to start from the beginning, so it will call kafkaConsumer.seekToBeginning() for each of its partitions. If you pass a value of -1, it will assume that you want to ignore the existing messages and consume only messages published after the consumer restarts; in this case it will call kafkaConsumer.seekToEnd() on each of the partitions. Finally, if you specify any value other than 0 or -1, the consumer will assume that you have specified the offset you want it to start from; for example, if you pass 5 as the third argument, then on restart the consumer will consume messages from offset 5 onward. For this it will call kafkaConsumer.seek(<topicPartition>, <startingOffset>).
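Pulled together inside onPartitionsAssigned(), that dispatch logic might look like this sketch, where startingOffset stands for our third command-line argument; the Collection-based seekToBeginning()/seekToEnd() signatures assume a reasonably recent client:

```java
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    if (startingOffset == 0) {
        consumer.seekToBeginning(partitions);   // replay from the start
    } else if (startingOffset == -1) {
        consumer.seekToEnd(partitions);         // only new messages
    } else {
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, startingOffset);  // resume from a given offset
        }
    }
}
```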
Once your code is ready, you can test it by running the consumer from the command line with 0 as the last argument.
The Kafka client should print all the messages from an offset of 0, or you could change the value of the last argument to jump around in the message queue.
Traditional messaging use cases can be divided into two main types: point-to-point and publish-subscribe. In a point-to-point scenario, each message is consumed by exactly one consumer. When a message relays a bank transaction, for example, only one consumer should respond by updating the bank account. In a publish-subscribe scenario, multiple consumers consume a single message but respond to it in different ways. When a web server goes down, you want the alert to reach every consumer programmed to respond, each in its own way.
Queue refers to a point-to-point scenario, where a message is consumed by only one consumer. Topic refers to a publish-subscribe scenario, where a message is consumed by every consumer. Kafka doesn't define a separate API for the queue and topic use cases; instead, when you start your consumer you need to specify the ConsumerConfig.GROUP_ID_CONFIG property.
If you use the same GROUP_ID_CONFIG for more than one consumer, Kafka will assume that they are all part of a single group, and it will deliver each message to only one of the consumers. If you start two consumers with separate group.ids, Kafka will assume they are not related, so each consumer will get its own copy of every message.
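The difference comes down to a single property. A hypothetical helper makes the choice explicit; the group names and server address are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

// Same groupId for every instance = queue behavior;
// a unique groupId per instance = topic (publish-subscribe) behavior
private static Properties consumerProps(String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return props;
}
```

Starting two consumers with consumerProps("group1") gives queue behavior, with each message going to only one of them; starting them with "group1" and "group2" gives each consumer its own copy of every message.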
Recall that the partitioned consumer in Listing 3 takes groupId as its second parameter. Now we'll use the groupId parameter to implement both queue and topic use cases for the consumer.
Early use cases for big data message systems called for batch processing, such as running a nightly ETL process or moving data from the RDBMS to a NoSQL datastore at regular intervals. In the past few years the demand for realtime processing has increased, especially for fraud detection and emergency response systems. Kafka was built for just these types of realtime scenarios.
Kafka is a great open source product, but it does have some limitations; for instance, you can't query data from inside a topic before it reaches its destination, or replicate data across multiple geographically distributed clusters. You could combine MapR Streams (a commercial product) with the Kafka API for these and other more complex publish-subscribe scenarios.