Kafka is a high-throughput, publish-subscribe, distributed messaging system. A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients, and each broker can hold terabytes of messages without a performance impact. Kafka is designed to let a single cluster serve as the central data backbone for a large organization, and it can be expanded elastically and transparently without downtime. Data streams are partitioned and spread over a cluster of machines, which allows streams larger than any single machine can handle and allows groups of coordinated consumers. Messages are persisted on disk and replicated within the cluster to prevent data loss. Kafka's modern, cluster-centric design offers strong durability and fault-tolerance guarantees.
Configure Kafka
Install Kafka
Kafka is distributed as a CDH parcel and includes the following services:
- Kafka Broker – Add the Kafka Broker service to a node with plenty of memory and CPU.
- Note: On installation you will configure the Kafka Broker Default Group: change the Kafka data directory property log.dirs from /var/local/kafka/data to /space#/kafka/data (one entry per data volume; see the example after this list). See the Kafka configuration section below for more settings.
- Kafka MirrorMaker – Does not need to be installed on the same node as the Broker, and is not needed for every installation. A common use of MirrorMaker is to move data between data centers.
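A minimal sketch of the data-directory setting noted above, assuming three data volumes mounted at /space1 through /space3 (adjust the paths and count to your hardware):
# Kafka Broker Default Group: log.dirs takes a comma-separated list, one directory per volume
log.dirs=/space1/kafka/data,/space2/kafka/data,/space3/kafka/data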
How to install Kafka
- Install Kafka Parcel
- In Cloudera Manager, click on Parcels.
- Find the Kafka Parcel, click Distribute.
- Click Activate, but do not restart the cluster.
- Install Kafka Service
- In Cloudera Manager, click on the cluster name.
- In the upper right-hand corner, click Actions, then Add a Service.
- Select Kafka in the list.
- Follow the instructions and configure Kafka using the list below.
Kafka Configuration
Configure Kafka as described in the following sections.
Upgrade Kafka to 0.9.x
With the Kafka CDH parcel, upgrading is straightforward: use Cloudera Manager to distribute and activate the new Kafka parcel.
- By default, the new version of the Kafka broker binds only to the hostname interface instead of all interfaces. This breaks any environment that contacts the Kafka Brokers via localhost instead of the hostname. Add the following to the Kafka Broker Advanced Configuration Snippet (Safety Valve) for kafka.properties:
- listeners=PLAINTEXT://0.0.0.0:9092
- Producers must define bootstrap.servers.
- New-style consumers must define bootstrap.servers; old-style consumers can continue to use zookeeper.connect (see the example after this list).
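For illustration, minimal client properties along these lines might look like the following (hostnames are placeholders):
# producer.properties and new-style consumer.properties
bootstrap.servers=kafka.broker.servername01:9092,kafka.broker.servername02:9092
# old-style consumer.properties
zookeeper.connect=zk.servername01:2181/kafka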
Web Service Buffering
An important design consideration for a web service using Kafka is to build in buffering to handle unexpected Kafka service failures or planned maintenance. Bruce is one example:
https://techbeacon.com/when-kafka-met-bruce-how-one-service-can-boost-app-messaging-reliability
https://github.com/ifwe/bruce
Administer Kafka
Kafka Command-line Tools
Important Kafka command-line tools are located in /usr/bin; others are located in /opt/cloudera/parcels/KAFKA-0.8.2.0-1.kafka1.3.1.p0.9/lib/kafka/bin/:
Topics
kafka-topics
Create, alter, list, and describe topics. For example:
To create: kafka-topics --create --topic testtopic --partitions 1 --replication-factor 1 --zookeeper zk.servername01/kafka
To list: kafka-topics --list --zookeeper zk.servername01/kafka
sink1
t1
t2
Describe
Describe a topic:
kafka-topics --zookeeper zk.servername01:2181/kafka --describe --topic the_name_of_topic
Or:
kafka-topics --zookeeper servername01 --describe --topic the_name_of_topic
Consume
Kafka Consumer using ZooKeeper to handle offsets:
kafka-console-consumer
Read data from a Kafka topic and write it to standard output. For example:
kafka-console-consumer --zookeeper zk.servername01/kafka --topic topic_name
kafka-console-consumer --zookeeper servername01:2181 --consumer.config consumer.config --topic topic_name
Use the new Kafka consumer, which uses Kafka itself to handle offsets:
/usr/bin/kafka-console-consumer --new-consumer --bootstrap-server kafka.broker.servername01:9092 --topic topic_name --max-messages 10 --from-beginning
Limit the number of records you consume with --max-messages:
--max-messages 10
Consume from the beginning with --from-beginning:
--from-beginning
Produce
kafka-console-producer
Read data from standard input and write it to a Kafka topic. For example:
kafka-console-producer --broker-list kafka.broker.servername01:9092 --topic topic_name
Get Offsets
GetOffsetShell
It can be crucial to know the most recent (latest) offsets for your topic. Use the GetOffsetShell class:
/usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka.broker.servername01:9092,kafka.broker.servername02:9092 --topic topic_name --time -1
Note the --time option:
--time <Long: timestamp | -1 (latest) | -2 (earliest)>; a timestamp returns the offsets of messages produced before that time.
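For example, to get the earliest available offsets instead of the latest, the same command can be run with --time -2 (same placeholder broker names as above):
/usr/bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list kafka.broker.servername01:9092,kafka.broker.servername02:9092 --topic topic_name --time -2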
ConsumerOffsetChecker
Look at offsets:
/usr/bin/kafka-run-class kafka.tools.ConsumerOffsetChecker --group cloudera_mirrormaker --zookeeper zk.servername01
kafka-consumer-offset-checker
Check the number of messages read and written, as well as the lag for each consumer in a specific consumer group. For example:
kafka-consumer-offset-checker --group flume --topic topic_name --zookeeper zk.servername01
Note: The forceStartOffsetTime value can be -2, -1, or a timestamp in milliseconds:
- -1 to read from the latest offset of the topic
- -2 to read from the beginning
- a timestamp to read from a specific time
Get a list of Kafka consumer groups
If you’re using a console consumer, it will use a random(ish) consumer group ID unless you specifically tell it otherwise. To set one, create a consumer.properties file containing a group.id=(some name) property, then run the consumer with the --consumer.config=consumer.properties command-line argument. For now, feel free to make up your own consumer group name; you can then use that group name later for the query you want to run.
There’s probably a better way to go about it, but you can use the ZooKeeper client to get a list of existing consumer groups under /consumers.
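For example, using the same ZooKeeper CLI used later in this document (hbase zkcli), and assuming your consumers register under the /kafka chroot:
hbase zkcli
ls /kafka/consumers
# or, if your consumers do not use the /kafka chroot:
ls /consumers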
Change the Log Retention Time
For any existing topic, you can change the log retention time. It takes a few minutes to take effect, so it’s best to do this well before an outage situation in which it suddenly becomes extremely important.
Get current retention time for a topic:
kafka-topics --zookeeper zk.servername01:2181 --describe --topic topic_name
Change the log retention to 7 days on one topic:
kafka-topics --zookeeper zk.servername01:2181 --alter --topic topic_name --config retention.ms=604800000
Here’s a way to change the log retention to 7 days for every topic in a cluster, from that cluster’s app server:
ZOOKEEPER="zk.servername01:2181"
# milliseconds for one week: 604800000
MILLIS_PER_WEEK=604800000
kafka-topics --zookeeper $ZOOKEEPER --list | while read -r f; do
  kafka-topics --zookeeper $ZOOKEEPER --alter --topic "$f" --config retention.ms=$MILLIS_PER_WEEK
done
Add Partitions to an Existing Topic
There are times when you need additional partitions.
First describe the topic:
kafka-topics --zookeeper zk.servername01:2181/kafka --describe --topic topic_name
Then add the additional partitions:
kafka-topics --zookeeper zk.servername01:2181/kafka --alter --topic topic_name --partitions 5
Change the Replication Factor on an Existing Topic
Changing the replication factor on an existing topic can be a little tricky.
Reference: http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
Identify the broker IDs in the Kafka cluster, for example 160 and 161.
Create a JSON file (for example, increase-replication-factor.json) like this:
{
  "version": 1,
  "partitions": [
    { "topic": "topic_name", "partition": 0, "replicas": [160,161] },
    { "topic": "topic_name", "partition": 1, "replicas": [161,160] },
    { "topic": "topic_name", "partition": 2, "replicas": [160,161] }
  ]
}
Then run the script, which on a server with Kafka installed (or a Gateway) is under /opt/cloudera/parcels/KAFKA/lib/kafka/bin:
./kafka-reassign-partitions.sh --zookeeper zk.servername01:2181 --reassignment-json-file increase-replication-factor.json --execute
Then verify it:
./kafka-reassign-partitions.sh --zookeeper zk.servername01:2181 --reassignment-json-file increase-replication-factor.json --verify
And finally, get a description of the topic to make sure it’s right:
kafka-topics --zookeeper zk.servername01:2181/kafka --describe --topic topic_name
Topic: topic_name PartitionCount:3 ReplicationFactor:2 Configs:
Topic: topic_name Partition: 0 Leader: 161 Replicas: 160,161 Isr: 161,160
Topic: topic_name Partition: 1 Leader: 161 Replicas: 161,160 Isr: 161,160
Topic: topic_name Partition: 2 Leader: 160 Replicas: 160,161 Isr: 160,161
Kafka Logs
The Kafka parcel is configured to log all Kafka log messages to a single file, /var/log/kafka/server.log by default. You can view, filter, and search this log using Cloudera Manager.
Reference: http://www.cloudera.com/content/cloudera/en/documentation/cloudera-kafka/latest/PDF/cloudera-kafka.pdf, http://kafka.apache.org
Delete a Topic
To delete a Kafka topic after the Broker has lost connection to the topic:
Typical delete command:
kafka-topics --zookeeper zk_node:port/chroot --delete --topic topic_name
The topic will be “marked for deletion” and will be removed on the next event. If the delete command doesn’t work right away, try restarting the Kafka service.
More information: Although the command may appear to delete the topic and return on success, it actually only creates an /admin/delete_topics/<topic> node in ZooKeeper and triggers deletion asynchronously. As soon as a broker sees this update, the topic no longer accepts any new produce or consume requests, and the topic is eventually deleted. The actual deletion process is fairly involved and goes through multiple state-machine transitions; once the topic's data has been cleaned up, the ZooKeeper node is removed and the topic is considered dead.
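To confirm that a deletion has actually been triggered, you can look for that node with the same ZooKeeper CLI used below, assuming the /kafka chroot used elsewhere in this document:
hbase zkcli
ls /kafka/admin/delete_topics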
Manually delete the topic:
If the delete command fails (the topic stays marked for deletion indefinitely):
1. Using Cloudera Manager, stop the Kafka Broker.
2. Delete the topic from disk. To get the physical location of the topic, you can find the root under Cloudera Manager, Kafka, Configurations, Broker Service, Data Directory.
For example:
sudo mv /space1/kafka/topic_name-0 /space1/kafka/topic_name-0.bak
3. Delete the topic znode from ZooKeeper:
Here are the ZooKeeper commands we used to delete the topic from ZK:
Log onto zkcli:
hbase zkcli
Search for topics:
ls /kafka/brokers/topics
rmr /kafka/brokers/topics/topic_name
ls /kafka/brokers/topics
4. Start Kafka.
5. You can then recreate the Kafka topic.
Kafka Offset Monitor
This is an app to monitor your Kafka consumers and their position (offset) in the queue. We currently do not use this monitor.
- Copy the KafkaOffsetMonitor-assembly-0.2.1.jar to /tmp/ on the node.
- Copy the jar into /opt/kafka/offsetmonitor/ and set permissions on the monitor directory.
- Start the offset monitor:
sudo mkdir -p /opt/kafka/offsetmonitor/
sudo cp /tmp/KafkaOffsetMonitor-assembly-0.2.1.jar /opt/kafka/offsetmonitor/
sudo chmod -R 774 /opt/kafka/offsetmonitor/
cd /opt/kafka/offsetmonitor/
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk zk.servername01/kafka --port 8080 --refresh 10.seconds --retain 2.days
Reference: https://github.com/quantifind/KafkaOffsetMonitor/blob/master/README.md
Kafka MirrorMaker
Kafka MirrorMaker is used to copy data from one Kafka cluster to another (this is called mirroring to avoid confusion with intra-cluster replication). Kafka MirrorMaker ships with CDH.
Mirroring is often used in cross-DC scenarios, and there are a few configuration options that you may want to tune to help deal with inter-DC communication latencies and performance bottlenecks on your specific hardware. In general, you should set a high value for the socket buffer size on the mirror-maker’s consumer configuration (socket.buffersize) and the source cluster’s broker configuration (socket.send.buffer). Also, the mirror-maker consumer’s fetch size (fetch.size) should be higher than the consumer’s socket buffer size. Note that the socket buffer size configurations are a hint to the underlying platform’s networking code. If you enable trace logging, you can check the actual receive buffer size and determine whether the setting in the OS networking layer also needs to be adjusted.
Set up a mirror
Setting up a mirror is easy – simply start the mirror-maker processes after bringing up the target cluster. At minimum, the mirror maker takes one or more consumer configurations, a producer configuration, and either a whitelist or a blacklist of topics. Point the consumer at the source cluster’s ZooKeeper, and point the producer at the target cluster’s brokers (bootstrap.servers, or broker.list on older versions).
In Cloudera Manager update the corresponding configurations in the UI and start the MirrorMaker instance.
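For reference, outside of Cloudera Manager a standalone MirrorMaker instance can be launched along these lines; the hostnames, file names, and consumer group below are illustrative placeholders:
# source-consumer.properties (sketch): zookeeper.connect=zk.servername01:2181/kafka, group.id=cloudera_mirrormaker
# target-producer.properties (sketch): bootstrap.servers=kafka.broker.servername01:9092
kafka-run-class kafka.tools.MirrorMaker --consumer.config source-consumer.properties --producer.config target-producer.properties --num.streams 2 --whitelist 'topic_name.*'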
Reference: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330
Check whether a mirror is keeping up
Cloudera publishes health checks: http://www.cloudera.com/content/www/en-us/documentation/enterprise/latest/topics/cm_ht_kafka_mirrormaker.html
ConsumerOffsetChecker
The consumer offset checker tool is useful to gauge how well your mirror is keeping up with the source cluster. Note that the --zkconnect argument should point to the source cluster’s ZooKeeper (zk.servername01 in this example). Also, if the topic is not specified, then the tool prints information for all topics under the given consumer group. For example:
Group Topic Pid Offset logSize Lag Owner
KafkaMirror test-topic 0 5 5 0 none
KafkaMirror test-topic 1 3 4 1 none
KafkaMirror test-topic 2 6 9 3 none
For example, check that we are consuming:
kafka-console-consumer --zookeeper zk.servername01:2181 --consumer.config consumer.config --topic topic_name
Check KafkaMirror’s offset:
/usr/bin/kafka-run-class kafka.tools.ConsumerOffsetChecker --group cloudera_mirrormaker --zookeeper zk.servername01:2181 --topic topic_name
This connects to ZooKeeper and displays information about the offsets for that consumer group and topic. Example results of running this (with consumer group 'signatures' and topic 'ingest') are:
Group Topic Pid Offset logSize Lag Owner
signatures ingest 0 5158355 6655120 1496765 none
signatures ingest 1 5111118 6604278 1493160 none
signatures ingest 2 5080952 6571573 1490621 none
signatures ingest 3 5055358 6543351 1487993 none
The Pid represents the partition id, so in the case above we can see the info for each of the 4 partitions 0 through 3.
The offset tells you the offset currently held in zookeeper for this consumer.
The Lag tells you what the difference is between this offset and the latest message on the topic.
You can use these to check that your consumer is consuming messages faster than they are appearing on the queue, and to dispel any worries you might have that it’s never catching up.
Beware that the consumer only commits its offset to ZooKeeper at a certain interval (10 seconds by default), so if you run this command a few times in a row you’ll likely see the offset remain constant while the lag increases, until a commit from the consumer suddenly brings the offset up, and hence the lag down, significantly in one go. When we first used this, we were worried that we were falling further and further behind, until we realized that offset commits to ZooKeeper are delayed.
The commit interval is configurable in the consumer config via the "autocommit.interval.ms" property; the default is 10000 ms (10 seconds). There is a design decision to be made around what you set this to: set it too long and, if the consumer goes down, you may have to replay a huge number of messages; set it too short and you’ll overload ZooKeeper with constant commits.
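For illustration only, a consumer.properties combining this interval with the group ID discussed earlier might look like the following (the property name is given as in the text above; the newer consumer configuration spells it auto.commit.interval.ms):
# consumer.properties (illustrative values)
group.id=signatures
autocommit.interval.ms=10000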
ConsumerGroupCommand
The ConsumerGroupCommand will replace the ConsumerOffsetChecker. It provides the same information as the ConsumerOffsetChecker, but it also supports Kafka’s new consumer, which stores offsets in Kafka rather than ZooKeeper.
Using the example from above, you would use the following:
/usr/bin/kafka-run-class kafka.admin.ConsumerGroupCommand --describe --group cloudera_mirrormaker --zookeeper zk.servername01:2181
Example results:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
cloudera_mirrormaker topic_name1 0 18713 428478 409765 none
cloudera_mirrormaker topic_name1 1 547 24997 24450 none
cloudera_mirrormaker topic_name1 2 107 409511 409404 none
cloudera_mirrormaker topic_name2 0 404108447 452060078 47951631 none
cloudera_mirrormaker topic_name2 1 426193345 465930519 39737174 none
cloudera_mirrormaker topic_name2 2 422415396 470367125 47951729 none
Kafka Manager
I use the open source Kafka Manager tool (https://github.com/yahoo/kafka-manager) to help identify throughput on my Kafka Brokers. The tool also allows for some basic management of the brokers (partition creation, reassignment, etc).
It is a Scala application built with SBT on the Play framework.
Kafka REST Proxy
Developed by Confluent, the Kafka REST Proxy provides a RESTful interface to a Kafka cluster. It makes it easy to produce and consume messages, view the state of the cluster, and perform administrative actions without using the native Kafka protocol or clients. Examples of use cases include reporting data to Kafka from any front-end app built in any language, ingesting messages into a stream processing framework that doesn’t yet support Kafka, and scripting administrative actions.
The Kafka REST Proxy relies on the Schema Registry, which provides a serving layer for your metadata. The Schema Registry provides a RESTful interface for storing and retrieving Avro schemas. It stores a versioned history of all schemas, provides multiple compatibility settings and allows evolution of schemas according to the configured compatibility setting. It provides serializers that plug into Kafka clients that handle schema storage and retrieval for Kafka messages that are sent in the Avro format.
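As a quick illustration of the REST interface (not specific to this environment), a JSON message can be produced with an HTTP POST; the hostname, port (8082 is the Confluent default), and topic below are placeholders:
# produce a JSON-encoded message to topic_name through the REST proxy
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \
  --data '{"records":[{"value":{"foo":"bar"}}]}' \
  http://rest-proxy.servername01:8082/topics/topic_name
# list the topics visible to the proxy
curl http://rest-proxy.servername01:8082/topics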
Kafka REST Proxy does not start on restart
Error: kafka-rest does not start on restart because there is a mismatch in PIDs.
Solution: Kill the kafka-rest process, then start kafka-rest:
sudo service kafka-rest start
Logging
All logs related to this service are available at /opt/confluent/logs/
- kafka-rest-startup.log – captures stdout and stderr during startup
- kafka-rest.log – log4j target for all other logging, rotates between 10 files up to 10MB each
- schema-registry-startup.log – captures stdout and stderr during startup
- schema-registry.log – log4j target for all other logging, rotates between 10 files up to 10MB each
Deciphering HTTP Status Codes
Reference: http://www.restpatterns.org/HTTP_Status_Codes
Troubleshooting
Unexpected error from SyncGroup: Messages are rejected since there are fewer in-sync replicas than required
Resolution: This means that the source Kafka broker has fewer partitions than expected. Cloudera Manager will attempt to restart MirrorMaker, in the hope that by reconnecting, MirrorMaker will pull a new replica list from one of the brokers. Only after the service is DOWN should you use Cloudera Manager to manually restart MirrorMaker so it picks up the change to the number of partitions. Because Cloudera Manager does this automatically, user intervention is usually not required.
Next, contact the provider to make sure they are aware that their system is flapping. In the past, our client had a problem where older consumers were attempting to connect – causing instability in their Brokers.
Alternate explanation: This is occurring in a SyncGroup call so it’s probably due to the fact that the new consumer needs to write to the __consumer_offsets topic and can’t because __consumer_offsets isn’t meeting the min ISR requirements for the cluster.
Error:
In the Role Logs you see the following error:
[mirrormaker-thread-0] Mirror maker thread failure due to
org.apache.kafka.common.KafkaException: Unexpected error from SyncGroup: Messages are rejected since there are fewer in-sync replicas than required.
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupRequestHandler.handle(AbstractCoordinator.java:436)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupRequestHandler.handle(AbstractCoordinator.java:403)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
…
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
…
Further errors:
Sep 8, 5:13:07.214 PM ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Group cloudera_group1 failed to commit partition topic_name-0 at offset 827610278: Messages are rejected since there are fewer in-sync replicas than required.
Cloudera Manager will attempt to retry (by restarting the process). However, that will eventually fail.
And the fatal error that finally kills MM:
Sep 9, 12:44:52.247 AM FATAL kafka.tools.MirrorMaker$MirrorMakerThread [mirrormaker-thread-0] Mirror maker thread exited abnormally, stopping the whole mirror maker.
Cloudera Manager will report the following error:
KAFKA_KAFKA_MIRROR_MAKER_SCM_HEALTH Role health test bad
Critical
The health test result for KAFKA_KAFKA_MIRROR_MAKER_SCM_HEALTH has become bad: This role’s process is starting. This role is supposed to be started.
Further explanation:
I’d suggest using the kafka-topics command to determine the current state of the replicas for the _schemas topic (or whatever topic you are using). Here’s how I used it on a local set of test services:
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic _schemas
Topic:_schemas PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: _schemas Partition: 0 Leader: 0 Replicas: 0 Isr: 0
The root-cause exception (NotEnoughReplicasException) indicates that the in-sync replicas for the topic partition have fallen below min.insync.replicas, so the output of this command shows you the difference between the full list of replicas and the in-sync replicas (labeled Isr in the output). From there you can determine why one or more of the brokers is falling behind.
References
Kafka-Storm (and Spark) integration: http://www.michael-noll.com/blog/2014/05/27/kafka-storm-integration-example-tutorial/
Kafka-Storm-Spark GitHub: https://github.com/miguno/kafka-storm-starter
Good scaling question: https://grokbase.com/t/kafka/users/158n9a4sf3/painfully-slow-kafka-recovery