Over the past few years, Kafka has emerged as a solution for a wide variety of use cases. In the simplest case it can serve as a buffer for storing application logs. Combined with a technology like Spark Streaming, it can be used to track data changes and act on that data before saving it to a final destination. And because Kafka makes data available for processing as soon as it arrives, it is a powerful tool for detecting fraud, such as checking the validity of a credit card transaction when it happens rather than waiting for batch processing hours later.
This two-part tutorial introduces Kafka, starting with how to install and run it in your development environment. You'll get an overview of Kafka's architecture, followed by an introduction to developing an out-of-the-box Kafka messaging system. Finally, you'll build a custom producer/consumer application that sends and consumes messages via a Kafka server. In the second half of the tutorial you'll learn how to partition and group messages, and how to control which messages a Kafka consumer will consume.
Apache Kafka is a messaging system built to scale for big data. Similar to Apache ActiveMQ or RabbitMQ, Kafka enables applications built on different platforms to communicate via asynchronous message passing. But Kafka differs from these more traditional messaging systems in key ways:
Before we explore Kafka's architecture, you should know its basic terminology: a producer publishes messages to a named topic; a consumer subscribes to a topic and processes its messages; a broker is a server running a Kafka instance (a cluster may contain many brokers); and each message within a topic is identified by a numeric offset.
Kafka's architecture is very simple, which can result in better performance and throughput in some systems. Every topic in Kafka is like a simple log file. When a producer publishes a message, the Kafka server appends it to the end of the log file for its given topic. The server also assigns an offset, a number that permanently identifies each message. As the number of messages grows, the value of each offset increases; for example, if the producer publishes three messages, the first one might get an offset of 1, the second an offset of 2, and the third an offset of 3.
When the Kafka consumer first starts, it will send a pull request to the server, asking to retrieve any messages for a particular topic with an offset value higher than 0. The server will check the log file for that topic and return the three new messages. The consumer will process the messages, then send a request for messages with an offset higher than 3, and so on.
In Kafka, the client is responsible for remembering the offset count and retrieving messages. The Kafka server doesn't track or manage message consumption. By default, a Kafka server will keep a message for seven days. A background thread in the server checks and deletes messages that are seven days or older. A consumer can access messages as long as they are on the server: it can read a message multiple times, and even read messages in reverse order of receipt. But if the consumer fails to retrieve a message before its seven days are up, it will miss that message.
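To make the offset mechanics concrete, here is a minimal, hypothetical sketch using the Java consumer API covered later in this tutorial. It polls a topic and prints the offset the server assigned to each message; the broker address localhost:9092, the group name offset-peek, and the topic name test are assumptions for a local test setup.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OffsetPeek {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumes a local broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-peek");             // hypothetical group name
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test")); // assumes a topic named "test"
            // Each poll() is a pull request; every record carries the offset the server assigned to it.
            for (int i = 0; i < 10; i++) {
                for (ConsumerRecord<String, String> record : consumer.poll(100)) {
                    System.out.println("offset " + record.offset() + ": " + record.value());
                }
            }
        }
    }
}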
We'll build a custom application in this tutorial, but let's start by installing and testing a Kafka instance with an out-of-the-box producer and consumer.
You've seen how Kafka works out of the box. Next, let's develop a custom producer/consumer application. The producer will retrieve user input from the console and send each new line as a message to a Kafka server. The consumer will retrieve messages for a given topic and print them to the console. The producer and consumer components in this case are your own implementations of kafka-console-producer.sh and kafka-console-consumer.sh.
Let's start by creating a Producer.java class. This client class contains logic to read user input from the console and send that input as a message to the Kafka server.
We configure the producer by creating an object from the java.util.Properties class and setting its properties. The ProducerConfig class defines all the different properties available, but Kafka's default values are sufficient for most uses. For the default config we only need to set three mandatory properties:
BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kafka cluster in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the first broker's host:port. The Kafka client will use this value to make a discovery call to that broker, which will return a list of all the brokers in the cluster. It's still a good idea to specify more than one broker in BOOTSTRAP_SERVERS_CONFIG, so that if the first broker is down the client can try the others.
The Kafka server expects messages in byte[] key, byte[] value format. Rather than requiring us to convert every key and value ourselves, Kafka's client-side library lets us use friendlier types like String and int for sending messages; the library converts these into byte[] using the configured serializers. For example, the sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.
To configure the message key, we set KEY_SERIALIZER_CLASS_CONFIG to org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG to org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].
After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which the message should be published, and the actual message. Don't forget to call the producer's close() method when you're done using it.
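Here's a minimal sketch of what such a producer class might look like, assuming a broker at localhost:9092, a topic name passed as the first command-line argument, and the word exit to stop reading input; these details are illustrative rather than a verbatim copy of Listing 1.

import java.util.Properties;
import java.util.Scanner;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) {
        String topicName = args.length > 0 ? args[0] : "test"; // "test" is a placeholder default

        Properties configProperties = new Properties();
        // List more than one broker here if you have them, so the client can fall back if the first is down.
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The key is always null in this app, so the pass-through ByteArraySerializer is enough.
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // The value is the String typed on the console, so StringSerializer handles it.
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(configProperties);
        Scanner in = new Scanner(System.in);
        System.out.println("Enter messages (type \"exit\" to quit):");
        String line = in.nextLine();
        while (!line.equalsIgnoreCase("exit")) {
            // Two-argument ProducerRecord: topic name and message value; the key defaults to null.
            producer.send(new ProducerRecord<String, String>(topicName, line));
            line = in.nextLine();
        }
        in.close();
        producer.close(); // flush and release the producer's resources
    }
}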
Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:
Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kafka cluster in the host1:port1,host2:port2,... format.
As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementations for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to configure a deserializer to convert byte[] back into the appropriate type.
In the case of the example application, we know the producer is using ByteArraySerializer for the key and StringSerializer for the value. On the client side we therefore need to use org.apache.kafka.common.serialization.ByteArrayDeserializer for the key and org.apache.kafka.common.serialization.StringDeserializer for the value. Setting those classes as values for KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG will enable the consumer to deserialize byte[] encoded types sent by the producer.
Finally, we need to set the value of GROUP_ID_CONFIG. This should be a group name in string format. I'll explain more about this config in a minute. For now, just look at a Kafka consumer with the four mandatory properties set.
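The following sketch builds such a consumer; the broker address and the class and parameter names are placeholders for your own setup rather than part of the original listing.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerConfigExample {
    // Builds a KafkaConsumer with the four mandatory properties set.
    static KafkaConsumer<String, String> createConsumer(String groupId) {
        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // any group name, e.g. "group1"
        return new KafkaConsumer<>(configProperties);
    }
}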
Writing the consumer code in Listing 2 in two parts ensures that we close the Consumer object before exiting. I'll describe each class in turn. First, ConsumerThread is an inner class that takes a topic name and group name as its arguments. In the run() method it creates a KafkaConsumer object with the appropriate properties. It subscribes to the topic that was passed as an argument to the constructor by calling the kafkaConsumer.subscribe() method, then polls the Kafka server every 100 milliseconds to check if there are any new messages in the topic. It will iterate through the list of any new messages and print them to the console.
In the Consumer class we create a new object of ConsumerThread and start it in a different thread. The ConsumerThread runs an infinite loop and keeps polling the topic for new messages. Meanwhile, in the Consumer class, the main thread waits for a user to enter exit on the console. Once a user enters exit, the main thread calls the KafkaConsumer.wakeup() method, causing the KafkaConsumer to stop polling for new messages and throw a WakeupException. We can then close the KafkaConsumer gracefully by calling its close() method.
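Here is a sketch of that two-part structure. The class below follows the shape just described, with a nested ConsumerThread plus a main thread that waits for exit; the broker address, default topic, and group names are illustrative assumptions rather than a verbatim copy of Listing 2.

import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class Consumer {

    private static class ConsumerThread extends Thread {
        private final String topicName;
        private final String groupId;
        private volatile KafkaConsumer<String, String> kafkaConsumer;

        ConsumerThread(String topicName, String groupId) {
            this.topicName = topicName;
            this.groupId = groupId;
        }

        @Override
        public void run() {
            Properties configProperties = new Properties();
            configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

            kafkaConsumer = new KafkaConsumer<>(configProperties);
            kafkaConsumer.subscribe(Arrays.asList(topicName));
            try {
                while (true) {
                    // Poll the broker every 100 ms and print any new messages on the topic.
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            } catch (WakeupException ex) {
                // Thrown when the main thread calls wakeup(); fall through to close the consumer.
            } finally {
                kafkaConsumer.close(); // close the consumer gracefully
            }
        }

        KafkaConsumer<String, String> getKafkaConsumer() {
            return kafkaConsumer;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        String topicName = args.length > 0 ? args[0] : "test";   // placeholder defaults
        String groupId = args.length > 1 ? args[1] : "group1";

        ConsumerThread consumerThread = new ConsumerThread(topicName, groupId);
        consumerThread.start();

        // The main thread waits for the user to type "exit", then wakes the polling consumer up.
        Scanner in = new Scanner(System.in);
        while (!in.nextLine().equalsIgnoreCase("exit")) {
            // keep waiting for "exit"
        }
        consumerThread.getKafkaConsumer().wakeup();
        consumerThread.join();
        in.close();
    }
}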
To test this application you can run the code in Listings 1 and 2 from your IDE, or you can compile both classes and run them from the command line: start the Consumer first, passing it the topic name and a group name, then start the Producer for the same topic and type a few messages into its console; each line you enter should appear in the consumer's console.
In the first half of this tutorial you've learned the basics of big data messaging with Kafka, including a conceptual overview of Kafka, setup instructions, and how to configure a producer/consumer messaging system with Kafka.
As you've seen, Kafka's architecture is both simple and efficient, designed for performance and throughput. In Part 2 I'll introduce some more advanced techniques for distributed messaging with Kafka, starting with using partitions to subdivide topics. I'll also demonstrate how to manage message offsets in order to support different use cases.