Analytics/Cluster/Kafka/Administration
Safe Broker Restarts
Since partitions have at least 2 (usually 3) replicas, you should be able to restart a broker without losing any messages. Brokers, consumers, and producers will automatically rebalance themselves when a broker dies, but it is nice to allow them to do so gracefully. service kafka stop will perform a graceful shutdown. Before you run this, you should make sure that every topic-partition for which the target broker is the leader also has other brokers in its in-sync replica set (ISR):
root@kafka1014:~# kafka topic --describe
...
Topic:webrequest_text  PartitionCount:12  ReplicationFactor:3  Configs:
  Topic: webrequest_text  Partition: 0   Leader: 18  Replicas: 18,22,12  Isr: 18,22,12
  Topic: webrequest_text  Partition: 1   Leader: 20  Replicas: 20,12,13  Isr: 20,13,12
  Topic: webrequest_text  Partition: 2   Leader: 22  Replicas: 22,13,14  Isr: 22,13,14
  Topic: webrequest_text  Partition: 3   Leader: 12  Replicas: 12,14,18  Isr: 18,12,14
  Topic: webrequest_text  Partition: 4   Leader: 13  Replicas: 13,18,20  Isr: 18,13,20
  Topic: webrequest_text  Partition: 5   Leader: 14  Replicas: 14,20,22  Isr: 22,20,14
  Topic: webrequest_text  Partition: 6   Leader: 18  Replicas: 18,12,13  Isr: 18,13,12
  Topic: webrequest_text  Partition: 7   Leader: 20  Replicas: 20,13,14  Isr: 20,13,14
  Topic: webrequest_text  Partition: 8   Leader: 22  Replicas: 22,14,18  Isr: 22,18,14
  Topic: webrequest_text  Partition: 9   Leader: 12  Replicas: 12,18,20  Isr: 20,18,12
  Topic: webrequest_text  Partition: 10  Leader: 13  Replicas: 13,20,22  Isr: 22,20,13
  Topic: webrequest_text  Partition: 11  Leader: 14  Replicas: 14,22,12  Isr: 22,12,14
...

# Partitions with Leader: 14 have several brokers in the ISR for that partition.
# It is safe to stop kafka1014.
root@kafka1014:~# service kafka stop
Notice how (eventually) after the broker stops, broker 14 is no longer the leader for any partitions when you run kafka topic --describe.
Once you are ready to start the broker back up, you can do so with a simple service kafka start.
It will likely take a few minutes for the broker to recover after restarting. It needs to check all of its recent logs to make sure that it doesn't have any incomplete messages, and it will also resume replicating partitions from where it left off when it was restarted. Keep checking kafka topic --describe until all topic-partitions have all brokers in the ISR. Once the topic-partitions are up to date on all brokers, you can start a replica election to balance the leaders across brokers.
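Rather than eyeballing the full describe output, a small filter can flag the partitions that are still catching up. This is a hypothetical helper (not part of the kafka wrapper); it just compares the length of the Replicas list against the Isr list on each partition line:

```shell
# Feed `kafka topic --describe` output to this function; it prints any
# partition whose ISR is smaller than its replica list.
under_replicated() {
  awk '/Leader:/ {
    for (i = 1; i <= NF; i++) {
      if ($i == "Replicas:") replicas = $(i + 1)
      if ($i == "Isr:")      isr      = $(i + 1)
    }
    # A partition is lagging if fewer brokers are in sync than assigned.
    if (split(replicas, r, ",") != split(isr, s, ",")) print $0
  }'
}

# Example: poll until every partition is fully replicated again.
# while kafka topic --describe | under_replicated | grep -q .; do sleep 10; done
```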
Replica Elections
Kafka will not automatically rebalance partition leadership; it will only do so if it is explicitly asked to.
List topics to see the current leader assignments:
root@kafka1014:~$ kafka topic --describe
Topic:webrequest_bits  PartitionCount:12  ReplicationFactor:3  Configs:
  Topic: webrequest_bits    Partition: 0   Leader: 22  Replicas: 22,12,18  Isr: 12,18,22
  Topic: webrequest_bits    Partition: 1   Leader: 12  Replicas: 12,18,21  Isr: 12,18,21
  Topic: webrequest_bits    Partition: 2   Leader: 18  Replicas: 18,21,22  Isr: 18,22,21
  Topic: webrequest_bits    Partition: 3   Leader: 21  Replicas: 21,22,12  Isr: 12,22,21
  Topic: webrequest_bits    Partition: 4   Leader: 22  Replicas: 22,18,21  Isr: 18,22,21
  Topic: webrequest_bits    Partition: 5   Leader: 12  Replicas: 12,21,22  Isr: 12,22,21
  Topic: webrequest_bits    Partition: 6   Leader: 18  Replicas: 18,22,12  Isr: 12,18,22
  Topic: webrequest_bits    Partition: 7   Leader: 21  Replicas: 21,12,18  Isr: 12,18,21
  Topic: webrequest_bits    Partition: 8   Leader: 22  Replicas: 22,21,12  Isr: 12,22,21
  Topic: webrequest_bits    Partition: 9   Leader: 12  Replicas: 12,22,18  Isr: 12,18,22
  Topic: webrequest_bits    Partition: 10  Leader: 18  Replicas: 18,12,21  Isr: 12,18,21
  Topic: webrequest_bits    Partition: 11  Leader: 21  Replicas: 21,18,22  Isr: 18,22,21
Topic:webrequest_mobile  PartitionCount:12  ReplicationFactor:3  Configs:
  Topic: webrequest_mobile  Partition: 0   Leader: 12  Replicas: 12,21,22  Isr: 12,22,21
  Topic: webrequest_mobile  Partition: 1   Leader: 18  Replicas: 18,22,12  Isr: 12,18,22
  Topic: webrequest_mobile  Partition: 2   Leader: 21  Replicas: 21,12,18  Isr: 12,18,21
  Topic: webrequest_mobile  Partition: 3   Leader: 22  Replicas: 22,18,21  Isr: 18,22,21
  Topic: webrequest_mobile  Partition: 4   Leader: 12  Replicas: 12,22,18  Isr: 12,18,22
  Topic: webrequest_mobile  Partition: 5   Leader: 18  Replicas: 18,12,21  Isr: 12,18,21
  Topic: webrequest_mobile  Partition: 6   Leader: 21  Replicas: 21,18,22  Isr: 18,22,21
  Topic: webrequest_mobile  Partition: 7   Leader: 22  Replicas: 22,21,12  Isr: 12,22,21
  Topic: webrequest_mobile  Partition: 8   Leader: 12  Replicas: 12,18,21  Isr: 12,18,21
  Topic: webrequest_mobile  Partition: 9   Leader: 18  Replicas: 18,21,22  Isr: 18,22,21
  Topic: webrequest_mobile  Partition: 10  Leader: 21  Replicas: 21,22,12  Isr: 12,22,21
  Topic: webrequest_mobile  Partition: 11  Leader: 22  Replicas: 22,12,18  Isr: 12,18,22
Topic:webrequest_text  PartitionCount:12  ReplicationFactor:3  Configs:
  Topic: webrequest_text    Partition: 0   Leader: 22  Replicas: 22,21,12  Isr: 12,22,21
  Topic: webrequest_text    Partition: 1   Leader: 12  Replicas: 12,22,18  Isr: 12,18,22
  Topic: webrequest_text    Partition: 2   Leader: 18  Replicas: 18,12,21  Isr: 12,18,21
  Topic: webrequest_text    Partition: 3   Leader: 21  Replicas: 21,18,22  Isr: 18,22,21
  Topic: webrequest_text    Partition: 4   Leader: 22  Replicas: 22,12,18  Isr: 12,18,22
  Topic: webrequest_text    Partition: 5   Leader: 12  Replicas: 12,18,21  Isr: 12,18,21
  Topic: webrequest_text    Partition: 6   Leader: 18  Replicas: 18,21,22  Isr: 18,22,21
  Topic: webrequest_text    Partition: 7   Leader: 21  Replicas: 21,22,12  Isr: 12,22,21
  Topic: webrequest_text    Partition: 8   Leader: 22  Replicas: 22,18,21  Isr: 18,22,21
  Topic: webrequest_text    Partition: 9   Leader: 12  Replicas: 12,21,22  Isr: 12,22,21
  Topic: webrequest_text    Partition: 10  Leader: 18  Replicas: 18,22,12  Isr: 12,18,22
  Topic: webrequest_text    Partition: 11  Leader: 21  Replicas: 21,12,18  Isr: 12,18,21
Topic:webrequest_upload  PartitionCount:12  ReplicationFactor:3  Configs:
  Topic: webrequest_upload  Partition: 0   Leader: 18  Replicas: 18,12,21  Isr: 12,18,21
  Topic: webrequest_upload  Partition: 1   Leader: 21  Replicas: 21,18,22  Isr: 22,18,21
  Topic: webrequest_upload  Partition: 2   Leader: 22  Replicas: 22,21,12  Isr: 12,22,21
  Topic: webrequest_upload  Partition: 3   Leader: 12  Replicas: 12,22,18  Isr: 12,18,22
  Topic: webrequest_upload  Partition: 4   Leader: 18  Replicas: 18,21,22  Isr: 18,22,21
  Topic: webrequest_upload  Partition: 5   Leader: 21  Replicas: 21,22,12  Isr: 22,12,21
  Topic: webrequest_upload  Partition: 6   Leader: 22  Replicas: 22,12,18  Isr: 12,18,22
  Topic: webrequest_upload  Partition: 7   Leader: 12  Replicas: 12,18,21  Isr: 12,18,21
  Topic: webrequest_upload  Partition: 8   Leader: 18  Replicas: 18,22,12  Isr: 12,18,22
  Topic: webrequest_upload  Partition: 9   Leader: 21  Replicas: 21,12,18  Isr: 12,18,21
  Topic: webrequest_upload  Partition: 10  Leader: 22  Replicas: 22,18,21  Isr: 18,22,21
  Topic: webrequest_upload  Partition: 11  Leader: 12  Replicas: 12,21,22  Isr: 12,22,21
In this case, you can see that leaders are balanced across all brokers. If they weren't (e.g. broker 21 not appearing as a leader anywhere), you can ask Kafka to run a leader election by running
kafka preferred-replica-election
Wait a few seconds, no more than a minute. If all goes well, run kafka topic --describe again and you should see the leaders properly balanced.
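To check the balance without reading every line of the describe output, a quick tally of leaders per broker helps. This is a hypothetical helper (not part of the kafka wrapper), sketched over `kafka topic --describe` output:

```shell
# Count how many partitions each broker currently leads.
leaders_per_broker() {
  awk '{ for (i = 1; i < NF; i++) if ($i == "Leader:") count[$(i + 1)]++ }
       END { for (b in count) print b, count[b] }'
}

# Usage: every broker should lead a roughly equal number of partitions.
# kafka topic --describe | leaders_per_broker | sort -k2 -rn
```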
Recovering a laggy broker replica
If a Kafka broker goes offline for a long while, it will likely come back online far behind on its logs. It will need to catch up from the remaining brokers before it can be put back into the ISR. During normal operation, replicas should be able to stay in sync with each other, but when one broker is far behind you may need to tweak settings to encourage Kafka to spend more resources keeping replicas up to date.
num.replica.fetchers
This setting controls the number of threads dedicated to fetching logs from other replicas. Bump this number up temporarily and restart Kafka to try to get it to consume faster.
replica.fetch.max.bytes
This is the number of bytes that each fetch request will attempt to consume from each topic-partition. The actual number of bytes requested at a time will be this value multiplied by the number of topic-partitions being fetched, so be careful not to set it too high.
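As a sketch, both settings above live in the broker's server.properties. The values here are purely illustrative, not tuned recommendations; the defaults are 1 fetcher thread and 1 MB fetches:

```properties
# Temporary values while a lagging replica catches up; revert and restart
# once the broker is back in the ISR.
num.replica.fetchers=4
replica.fetch.max.bytes=2097152
```

Remember that a broker replicating N topic-partitions may request up to N × replica.fetch.max.bytes at once, so memory use grows with partition count.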
Checking consumer offsets
As of November 2015, LinkedIn's Burrow is installed and running on krypton.eqiad.wmnet. It is configured to email analytics admins if consumers start lagging. You can also query it directly via its HTTP interface. E.g., to see if a particular consumer group is lagging, and where its latest offset commits are:
curl http://krypton.eqiad.wmnet:8000/v2/kafka/eqiad/consumer/mysql-m4-master/topic/eventlogging-valid-mixed

{"error":false,"message":"consumer group topic offsets returned","offsets":[79014572,79014599,79003923,79014602,79014593,79014599,79014574,79014599,79003640,79014585,79014592,79014597]}
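The offsets array is one entry per partition, in partition order. A small jq filter (hypothetical helper, assumes jq is installed) makes the response easier to read:

```shell
# Print each partition's latest committed offset from a Burrow response
# like the one above.
burrow_offsets() {
  jq -r '.offsets | to_entries[] | "partition \(.key): \(.value)"'
}

# curl -s http://krypton.eqiad.wmnet:8000/v2/kafka/eqiad/consumer/mysql-m4-master/topic/eventlogging-valid-mixed | burrow_offsets
```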
New Broker Install
Partitioning
As of 2014-06, all of our brokers have 12 2TB disks. The first two disks have a 30GB RAID 1 / and a 1GB RAID 1 swap. Partman will create these partitions when the node is installed. Partman is not fancy enough to do the rest, so you will have to do this manually. Copy/pasting the following should do everything necessary to set up the broker data partitions.
data_directory='/var/spool/kafka'
# sda3 and sdb3 already exist; set their partition type to Linux (83).
# They get formatted as ext4 below.
for disk in /dev/sd{a,b}; do sudo fdisk $disk <<EOF
t
3
83
w
EOF
done
# sd{c..l}1 are full physical ext4 partitions
# Create sd{c..l}1 as single partitions spanning each full disk.
# The two blank lines in the heredoc accept fdisk's default first
# and last sectors.
for disk in /dev/sd{c,d,e,f,g,h,i,j,k,l}; do sudo fdisk $disk <<EOF
n
p
1


w
EOF
done
# Run partprobe to make the kernel pick up the partition changes.
sudo apt-get install parted
sudo partprobe
# mkfs.ext4 all data partitions
for disk_letter in a b c d e f g h i j k l; do
# use partition 3 on sda and sdb
if [ "${disk_letter}" = 'a' -o "${disk_letter}" = 'b' ]; then
partition_number=3
else
partition_number=1
fi
partition="/dev/sd${disk_letter}${partition_number}"
disk_data_directory="${data_directory}/${disk_letter}"
# Run mkfs.ext4 in background so we don't have to wait
# for this to complete synchronously.
sudo mkfs.ext4 "$partition" &
done
### IMPORTANT!
# Wait for all the above ext4 filesystems to be formatted
# before running the following loop.
#
sudo mkdir -p $data_directory
for disk_letter in a b c d e f g h i j k l; do
# use partition 3 on sda and sdb
if [ "${disk_letter}" = 'a' -o "${disk_letter}" = 'b' ]; then
partition_number=3
else
partition_number=1
fi
partition="/dev/sd${disk_letter}${partition_number}"
mount_point="${data_directory}/${disk_letter}"
# don't reserve any blocks for OS on these partitions
sudo tune2fs -m 0 "$partition"
# make the mount point
sudo mkdir -pv "$mount_point"
grep -q $mount_point /etc/fstab || echo -e "# Kafka log partition ${disk_letter}\n${partition}\t${mount_point}\text4\tdefaults,noatime,data=writeback,nobh,delalloc\t0\t2" | sudo tee -a /etc/fstab
sudo mount -v "$mount_point"
done
Note: ext4 settings were taken from recommendations found here: https://kafka.apache.org/08/ops.html
Upgrade Checklist
The Analytics team experienced data loss and a lot of headaches when performing a routine Kafka upgrade from 0.8.2.0 to 0.8.2.1 in August 2015. The following is a deployment checklist we came up with as part of the postmortem after that outage. When upgrading, please follow this checklist before you proceed in production.
- Check Release Notes for new versions. e.g. https://archive.apache.org/dist/kafka/0.8.2.1/RELEASE_NOTES.html
- Check Apache JIRA for bugs that may affect new version(s). e.g. https://issues.apache.org/jira/browse/KAFKA-2496?jql=project%20%3D%20KAFKA%20AND%20affectedVersion%20%3D%200.8.2.1
- Stress test to see if varnishkafka latency goes up. It may be difficult to do this, but it is worth a try.
- Set up a varnish+varnishkafka instance and Kafka cluster in labs (if there is not one in deployment-prep already).
- 1. Use ab (Apache Benchmark) to force varnishkafka to send requests as fast as you can.
- 2. Record rtt times in varnishkafka stats.json
- 3. Record data file sizes for Kafka partitions on Kafka brokers in /var/spool/kafka/
- 4. Upgrade Kafka cluster
- 5. Repeat steps 1 - 3 and compare results to previous version. There should be no (negative) change.
- When doing the production upgrade, document all steps in a deployment plan. Review and then execute the plan with a peer. Take notes on all steps along the way, including execution times for each step. This allows for easier documentation and correlation later if there are any problems. Be sure to keep an eye on the Kafka dashboard while deploying.
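For step 2 above, the rtt numbers can be pulled out of varnishkafka's stats JSON with jq. This is a hypothetical helper; the '.brokers[].rtt.avg' layout is an assumption based on librdkafka's stats format, so verify it against your actual stats.json before relying on it:

```shell
# Print each broker's average round-trip time from a librdkafka-style
# stats object (assumes jq is installed; field layout is an assumption).
broker_rtts() {
  jq -r '.brokers | to_entries[] | "\(.key) avg_rtt_us=\(.value.rtt.avg)"'
}

# broker_rtts < stats.json   # path depends on your varnishkafka config
```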