Apache Kafka は大容量のメッセージ処理によく使われます。 今回の記事ではユーザの行動分析やサーバの死活監視、ログ集約などの使い方や、Java 以外に Rails でも利用できる点についてご紹介します。

Kafka とは

一言で言うと、Pull 型高スループットな分散メッセージングシステムです。詳しくは公式ページをご覧ください。

一般的なクライアント・サーバーモデルにおいて、クライアントがサーバーにデータを取りに行く動作モードを「Pull」、サーバー側がトリガーとなってクライアント側へデータを送り出す動作モードを「Push」といいます。これをメッセージングシステムに適用すると Client が Consumer、Server が Producer に該当します。

  • Pull … データ受領者が、能動的にデータを取りに行くモード
  • Push … データ受領者から見て、受動的にデータが送り付けられてくるモード

メッセージングシステムの基本要素

用語 意味
message 転送するデータです。
topic message のカテゴリのことです。topic は自分で名前を決められます。
broker message を貯めるところです。上記イメージの kafka cluster に該当します。
producer broker に message を送信する component です。
consumer broker から message を読む component です。
consumer group consumer のグループです。
offset consumer がどこまで consume したかの情報を保持しています。
Kafka ではこの情報を ZooKeeper という別の OSS で管理しています。

Kafka と ZooKeeper

ZooKeeper の詳細

ZooKeeper は、分散アプリケーションのためのオープンソースの分散コーディネーションサービスです。ZooKeeper は Java で動作し、Java と C のためのバインディングが用意されています。

クラスタ (マルチサーバー) セットアップ

ZooKeeper は奇数単位でアンサンブルと呼ぶクラスタを組みます。アンサンブルの過半数が動作している必要があるので、奇数台のマシンを使うのがベストです。たとえば 4 台のマシンを使う場合、ZooKeeper が対応できるのはマシン 1 台の障害までです。2 台のマシンに障害が発生すると、残りの 2 台では過半数にならないためです。5 台のマシンを使えば、ZooKeeper はマシン 2 台までの障害に対応できます。

その中の 1 台がリーダー選挙を通して全ての書き込みを調停・受理するリーダーとなり、他のサーバはフォロワーと呼ばれます。リーダーが選出されるタイミングは以下の 2 つです。

  • 障害等によりリーダーに選出されているサーバが停止し、リーダーであることを放棄した時
  • Zookeeper サーバの起動時

Kafkaの使用例

CentOS6.5、JDK1.8 にて動作確認しました。

インストール

// Kafkaをダウンロードして展開
$ wget http://ftp.riken.jp/net/apache/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
$ tar xvfz kafka_2.11-0.9.0.1.tgz
$ cd kafka_2.11-0.9.0.1

メッセージングテスト

// ZooKeeperサーバスタート
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2016-05-17 13:46:05,059] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-05-17 13:46:05,062] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2016-05-17 13:46:05,062] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
...

// Kafkaサーバスタート
$ bin/kafka-server-start.sh config/server.properties
[2016-05-17 13:47:38,015] INFO KafkaConfig values:
    advertised.host.name = null
    metric.reporters = []
    quota.producer.default = 9223372036854775807
    ...
[2016-05-17 13:47:38,120] INFO starting (kafka.server.KafkaServer)
[2016-05-17 13:47:38,127] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
...

// topic生成:"test"
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

// topic表示
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

// producerよりメッセージ送信(コマンドラインにメッセージを入力する)
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>This is a message
>This is another message

// consumerよりメッセージ受信(コマンドラインにメッセージが表示される)
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

Kafka のユースケース

Kafka はメッセージ処理がメインの機能でありますが、それ以外にも以下のような使い方ができます。 詳しくは公式ページの Use Cases をご覧ください。

  • Messaging Kafka 本来の機能であるメッセージブローカ機能です。
  • Website Activity Tracking サイトアクティビティ ( ページビュー、検索等のユーザが取り得る行動 ) を種別毎にトピック分けされて、中央に集められるということです。これらのフィードはリアルタイム処理やリアルタイム監視に利用することができます。
  • Metrics Kafka は運用監視データとしても使われることがあります。この場合は、運用データの中央フィードを生成するため、分散したアプリケーションの統計を集約するのに用いられます。
  • Log Aggregation ログ集約ソリューションとして Kafka を利用できます。Scribe や Flume といったログ集約システムと比較して、Kafka は同等のパフォーマンスと、複製によるより強い堅牢性保証、及びエンドツーエンドのより低いレイテンシを提供します。
  • Stream Processing 段階的なデータ処理をする場合、データは生データのトピックからコンシュームされ、集約され、肉付けされ、あるいはさらなるコンシュームの為に新たなKafkaトピックへ変換されます。
  • Event Sourcing イベントソーシングはアプリケーション設計手法のひとつで、状態の変更が時系列順のレコード列として記録されるというものです。Kafka は超巨大なログデータを扱えるため、この手法で構築されたアプリケーションの優れたバックエンドとして利用することができます。
  • Commit Log Kafka を分散システムのための外部コミットログとして使うこともできます。ノード間でデータを複製したり、障害ノードの復旧のための再同期機構として、このログを利用することができます。Kafka のログコンパクション機能もこの用途に適しています。この用途では、Kafka と Apache BookKeeper プロジェクトは似ています。

Rails Application での Kafka

ruby-kafka という gem を利用します。

Producer

Rails initializer にて producer 設定

# config/initializers/kafka_producer.rb
require "kafka"

# Configure the Kafka client with the broker hosts and the Rails logger.
$kafka = Kafka.new(
  seed_brokers: ["kafka1:9092", "kafka2:9092"],
  logger: Rails.logger,
)

# Set up an asynchronous producer that delivers its buffered messages every ten seconds:
$kafka_producer = $kafka.async_producer(
  delivery_interval: 10,
)

# Make sure to shut down the producer when exiting.
at_exit { $kafka_producer.shutdown }

Controller にて producer 呼び出し

# app/controllers/orders_controller.rb
class OrdersController
  def create
    @order = Order.create!(params[:order])

    event = {
      order_id: @order.id,
      amount: @order.amount,
      timestamp: Time.now,
    }

    $kafka_producer.produce(event.to_json, topic: "order_events")
  end
end

Consumer

fetch_messages APIを使ってConsuming

require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])

messages = kafka.fetch_messages(topic: "order_events", partition: 42)

messages.each do |message|
  puts message.offset, message.key, message.value
end

consumer API を使って Consuming

require "kafka"

kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"])

# Consumers with the same group id will form a Consumer Group together.
consumer = kafka.consumer(group_id: "my-consumer")

# It's possible to subscribe to multiple topics by calling `subscribe` repeatedly.
consumer.subscribe("order_events")

# This will loop indefinitely, yielding each message in turn.
consumer.each_message do |message|
  puts message.topic, message.partition
  puts message.offset, message.key, message.value
end

可視化

Trifectaというツールを使うとメッセージを可視化することができます。

まとめ

今回はKafkaの基本機能について紹介しました。分散環境で大量のメッセージを高速処理する必要がある場合などに活用できると思いますので、導入を検討してみてはいかがでしょうか。

参考

Kafka 0.9.0 Documentation

ZooKeeper 管理者ガイド

Apache Kafkaについて

ruby-kafka gem

Kafka ドキュメント(和訳)

Apache Kafka ―入門からTrifectaを用いた可視化まで