Analytics/Cluster/MediaWiki Avro Logging

Logs generated by the production MediaWiki application servers can be encoded in Avro, shipped through Kafka to Hadoop, and finally stored in HDFS. This documentation is a work in progress. Get a hold of ottomata, nuria, dcausse, or ebernhardson to clear up any questions about this documentation.

MediaWiki

Schemas are stored in the mediawiki/event-schemas repository. Each schema file is named after a revision id: a version number, starting at 10, concatenated with the unix timestamp at which the schema was created. For example, the schema stored at avro/mediawiki/CirrusSearchRequestSet/101446746400.avsc within the event-schemas repository is the first version (10) of the CirrusSearchRequestSet schema, created at timestamp 1446746400.
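
The revision number used elsewhere in this pipeline (for example in the wmf-config settings below) is simply the .avsc file name without its extension. As a small illustrative shell snippet (any schema path works; this one is just an example):

$ basename avro/mediawiki/CirrusSearchRequestSet/111448028943.avsc .avsc
111448028943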

Once a schema has been added it can be referenced from the operations/mediawiki-config repository. You will need to generate a submodule bump for the wmf-config/event-schemas directory within operations/mediawiki-config so that it points to a version that includes the new schema. In addition you will need to edit wmf-config/InitializeSettings.php. First, associate a monolog channel with the Avro schema via the wmgMonologAvroSchemas setting. The schema field must be a string containing a valid Avro schema, sourced from the event-schemas repository. The revision field is the exact schema revision number; it is encoded into the header of every log message so that readers can pick the correct schema to decode the log line.

'wmgMonologAvroSchemas' => array(
    'default' => array(
        'CirrusSearchRequestSet' => array(
            'schema' => file_get_contents( __DIR__ . '/event-schemas/avro/mediawiki/CirrusSearchRequestSet/111448028943.avsc' ),
            'revision' => 111448028943,
        ),
    ),
),

In addition to defining the schema, a Monolog channel needs to be configured for it. The channel name must be the same as the schema name above. When using the Avro + Kafka pipeline the buffer flag should always be set to true, so that latency communicating with Kafka cannot negatively impact page load performance. For any high volume logging the logstash handler for this channel must be disabled. The udp2log handler can be enabled or disabled depending on your needs.

'wmgMonologChannels' => array(
    'default' => array(
        ...
        'CirrusSearchRequestSet' => array(
            'kafka' => 'debug',
            'udp2log' => false,
            'logstash' => false,
            'buffer' => true
        ),
        ...
    )
),

Kafka

With MediaWiki configured as described above, binary Avro log messages will start appearing in the mediawiki_CirrusSearchRequestSet topic in Kafka. Topics are auto-created in Kafka as necessary, but they will only have a single partition. For logging channels with any kind of volume, which should be the only reason a MediaWiki->Kafka->Hadoop pipeline is necessary, you will need to create a Phabricator ticket for analytics engineering to create your topic with an appropriate number of partitions.
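
To check whether a topic already exists and how many partitions it has, kafkacat's metadata listing mode can be used from stat1002 (broker and topic names below are just the ones from the example above):

$ kafkacat -L -b kafka1012 -t mediawiki_CirrusSearchRequestSet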

Test reading an Avro record from a Kafka topic

Individual messages can be read out of Kafka on stat1002.eqiad.wmnet, to verify the data is being encoded as expected. This command will read the most recent message from one partition:

$ kafkacat -b kafka1012 -t mediawiki_CirrusSearchRequestSet -c 1 > csrq.avro

The extracted message is not yet valid Avro: it has a 9-byte header prepended to it (a 1-byte magic number followed by an 8-byte schema revision id). The header can be read with this small bit of Scala:

object AvroTest {
  def main(args: Array[String]): Unit = {
    val file = new java.io.FileInputStream(args(0))
    val data = new java.io.DataInputStream(file)
    // 1-byte magic number; must always be 0
    println("magic = %d".format(data.readByte()))
    // 8-byte writer schema revision id
    println("revid = %d".format(data.readLong()))
  }
}

With that code in a file named avrotest.scala, the following shell command will read and print the header. The magic number must always be 0, and the revid must match the writer schema revision id that MediaWiki used to write the message:

$ scala avrotest.scala csrq.avro

Finally, the Avro message itself can be decoded with the following shell command to confirm it too contains the expected data. This will throw a java.io.EOFException after printing the decoded event; the exception can safely be ignored.

$ dd if=csrq.avro bs=1 skip=9 | \
      java -jar avro-tools-1.7.7.jar fragtojson --schema-file event-schemas/avro/mediawiki/CirrusSearchRequestSet/111448028943.avsc -

Camus

Camus is the piece of software that reads messages out of Kafka and writes them to Hadoop. At 15 minutes past each hour, Camus reads the previous hour's worth of data from Kafka topics named mediawiki_<name> and writes it out to Hadoop under /wmf/data/raw/mediawiki/<name>/<year>/<month>/<day>/<hour>. For this to work, Camus needs to know which schemas to use. Camus reads schemas from the analytics/refinery/source repository, via a git submodule pointing at the event-schemas repository. After adding a schema to the event-schemas repository, a submodule bump needs to be merged to analytics/refinery/source and a new version of the refinery-source jar needs to be deployed to the analytics cluster.
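
Once an import has run, the result can be verified on stat1002 by listing the raw data directory for the relevant hour (the date components below are only an example):

$ hdfs dfs -ls /wmf/data/raw/mediawiki/CirrusSearchRequestSet/2015/11/20/00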

While Camus can read events written with any schema version it knows about, it always writes events out to HDFS using the schema version configured in the mediawiki.erb template of the camus module in puppet. For each Kafka topic Camus reads there needs to be a line like the following:

org.wikimedia.analytics.schemas.CirrusSearchRequestSet.latestRev=111448028943

@TODO something about timestamp handling

Hive

Hive is the final destination for querying logs generated by MediaWiki and stored within Hadoop. An appropriate external table needs to be created in Hive's wmf_raw database. See Analytics/Cluster/Hive/Avro for more information.
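
A rough sketch of what such a table definition can look like, assuming the standard Hive AvroSerDe and Avro container input/output formats (the table name, location, and schema URL below are placeholders; the authoritative DDL is on Analytics/Cluster/Hive/Avro):

-- external table over the raw Avro data written by Camus; columns are derived from the Avro schema
CREATE EXTERNAL TABLE wmf_raw.CirrusSearchRequestSet
PARTITIONED BY (year int, month int, day int, hour int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/wmf/data/raw/mediawiki/CirrusSearchRequestSet'
TBLPROPERTIES ('avro.schema.url' = 'hdfs:///path/to/CirrusSearchRequestSet/111448028943.avsc');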

Oozie

Oozie is the Hadoop workflow scheduler. A job needs to be created within the analytics/refinery repository to add partitions to the Hive table after Camus has written them out. Oozie has extensive documentation on this wiki. See https://gerrit.wikimedia.org/r/#/c/251238/ for an example of how to set up a job that will create the necessary partitions.
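
For each imported hour, what such a job boils down to is essentially an ADD PARTITION statement along these lines (table name and date values are illustrative):

-- register one imported hour as a partition, pointing at the directory Camus wrote
ALTER TABLE wmf_raw.CirrusSearchRequestSet
  ADD IF NOT EXISTS PARTITION (year=2015, month=11, day=20, hour=0)
  LOCATION '/wmf/data/raw/mediawiki/CirrusSearchRequestSet/2015/11/20/00';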

Initial Deployment Checklist

These steps are intended to be done in order.

  1. Commit schema to mediawiki/event-schemas repository
  2. Commit submodule bump to analytics/refinery/source repository
  3. Commit Oozie job to create partitions to analytics/refinery repository
  4. Commit property changes for Camus to operations/puppet repository
  5. Wait for analytics to deploy new versions of refinery and refinery-source to analytics cluster
  6. Have a topic with the proper number of partitions created in Kafka
  7. Commit submodule bump along with proper configuration to operations/mediawiki-config repository
  8. Deploy initial mediawiki-config patch to production with a sampling rate of a few events per minute for testing
  9. Verify events in Kafka are as expected. Check mediawiki logs for errors.
  10. After enough time has passed (Camus runs once per hour) verify events are showing up in HDFS
  11. Create table in Hive pointing at the events in HDFS
  12. Submit coordinator to Oozie to auto-create partitions
  13. Adjust (or remove) sampling of events in operations/mediawiki-config repository
  14. Query your data!

Schema Upgrade Checklist

Schema upgrades need to be performed in a controlled manner to ensure the full pipeline continues processing events throughout the change. The only schema upgrade that has been tested so far is adding a new field with a default value.

  1. Commit new schema version to mediawiki/event-schemas repository
  2. Commit submodule bump to analytics/refinery/source repository
  3. Wait for analytics to deploy new versions of refinery and refinery-source to analytics cluster
  4. Alter the relevant Hive table to use the new schema version (see the sketch after this list)
  5. Commit property changes for Camus to operations/puppet repository to write out data files using the new schema
  6. Adjust and deploy MediaWiki code to provide the new field. Note that the PHP Avro encoder will ignore extra fields it doesn't know about, but will error out if a field it does know about is missing, even if that field has a default value.
  7. Commit submodule bump and schema update to operations/mediawiki-config
  8. After the next Camus run verify it is still writing out all of the events. If things are awry Camus may only partially write out the directory for that hour.
  9. Verify the Oozie job to create partitions completes. If Camus only wrote a partial directory this will fail.
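
For step 4, if the table was created with an avro.schema.url (or avro.schema.literal) table property as in the sketch in the Hive section above, switching to the new schema revision is a matter of updating that property. A hypothetical example (the path is a placeholder for the new schema file):

-- point the table at the new writer schema revision
ALTER TABLE wmf_raw.CirrusSearchRequestSet
  SET TBLPROPERTIES ('avro.schema.url' = 'hdfs:///path/to/CirrusSearchRequestSet/<new revision>.avsc');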