Analytics/Cluster/Kafka

From Wikitech
(Redirected from Kafka)

Introduction

Apache Kafka is a scalable and durable distributed logging buffer.

Administration

See the Kafka Administration page for administration tips and documentation.

Infrastructure

Currently, we run a single Kafka cluster on 6 brokers in the Analytics cluster in eqiad. Replication factor for webrequest* topics is set to 3. Each topic has 12 partitions.

varnishkafka is installed on frontend varnishes. It sends webrequest logs to the Kafka brokers in eqiad.

kafaktee is a replacement for udp2log that consumes from Kafka instead of from the udp2log firehose.

Capacity

We plan to support at least the full webrequest firehose, which currently maxes around 205,000 messages/second.

Each Kafka Broker has about 24 TB disk space for Kafka messages spread across 12 disks. Messages from varnishkafka are Snappy compressed and stored that way on disk on the brokers.

See: Kafka Capacity for capacity planning from 2014.

Monitoring

In Labs

Kafka is puppetized in order to be able to spawn up arbitrary clusters in labs. Here's how.

You'll need a running Zookeeper and Kafka broker. These instructions show how to set up a single node Zookeeper and Kafka on the same host.

Create a new Jessie labs instance. In this example we have named our new instance 'kafka1' and it is in the 'analytics' project. Thus the hostname is kafka1.analytics.eqiad.wmflabs. Wait for the instance to spawn and finish its first puppet run. Make sure you can log in.

Edit hiera data for your project and set the following:

zookeeper_hosts:
    kafka1.analytics.eqiad.wmflabs: "1"
"role::analytics::kafka::config::kafka_cluster_name": my-kafka-cluster
"role::analytics::kafka::config::cluster_config":
    my-kafka-cluster:
        kafka1.analytics.eqiad.wmflabs:
            id: "1"

Go to the configure instance page for your new instance, and check the following boxes to include needed classes:

  • role::zookeeper::server
  • role::analytics::kafka::server

If you don't have these boxes to check, go to https://wikitech.wikimedia.org/wiki/Special:NovaPuppetGroup and add them.

Run puppet on your new instance. Fingers crossed and you should have a new Kafka broker running.

To verify, log into your instance and run

 kafka topic --create --topic test --partitions 2 --replication-factor 1
 kafka topic --describe

If this suceeds, you will have created a topic in your new single node Kafka cluster.

Kafka clients usually take a list of brokers and/or a zookeeper connect string in order to work with Kafka. In this example, those would be:

  • broker list: kafka1.analytics.eqiad.wmflabs:9092
  • zookeeper connect: kafka1.analytics.eqiad.wmflabs:2181/kafka/my-kafka-cluster

Note that the zookeeper connect URL contains a path that has the value of role::analytics::kafka::config::kafka_cluster_name in it. You should substitute this for whatever you named your cluster in your hiera config.


How do I ...

Produce/Consume to kafka

Easiest is to use kafkacat, can be executed in consumer or producer mode

Consume

stat1002$ kafkacat -C -b kafka1012.eqiad.wmnet:9092 -t test

Produce

stat1002$ more test_message.txt
Hola Mundo
stat1002$ cat test_message.txt | kafkacat -P -b kafka1012.eqiad.wmnet:9092 -t test

Consume avro schema from kafka

 from kafka import KafkaConsumer
 import avro.schema
 import avro.io
 import io

 # To consume messages
 consumer = KafkaConsumer('mediawiki_CirrusSearchRequestSet',
                         group_id='my_group',
                         metadata_broker_list=['kafka1012:9092'])

 schema_path="/home/madhuvishy/avro-kafka/CirrusSearchRequestSet.avsc"
 schema = avro.schema.parse(open(schema_path).read())

 for msg in consumer:
    bytes_reader = io.BytesIO(msg.value)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    data = reader.read(decoder)
    print data

See also