大手ヘルスケアIT企業 Cerner社のKafka活用事例

投稿日 2015/06/29

このブログポストはCerner社のビッグデータプラットフォームチームのシニアソフトウェアアーキテクトであるMicah Whitacreによる寄稿です。CDHとApache Kafkaを使用したCerner社のユースケースについて紹介してくれました(訳注: 本記事が公開された2014年11月時、KafkaとCDHの統合はCloudera Labsでベータ版として公開されていましたが、2015年2月、正式にサポート対象となりました)。原文はこちらを参照してください。


大手ヘルスケアITプロバイダであるCerner社は、長年にわたClouderaのソフトウェアプラットフォームであるCDHで利用可能なコアテクノロジーを採用してきました。これには、Apache Hadoopとその関連プロジェクト、HDFS、Apache HBase、Apache Crunch、Apache Hive、およびApache Oozieといったソフトウェアが含まれています。これらの技術をベースに、私たちは多様なデータ取得、およびデータ処理システムを構築してきました。

しかしながら、私たちは様々な点でスケーラビリティの限界に達し、あるテクノロジーに至ってはその意図に沿わない使い方さえしてきました。より良い選択肢を探し続けた末にApache Kafkaを採用し、Kafkaを適切に使うことで、強固なコアインフラストラクチャを構築することに成功したのです。

我々が初期のデータ処理インフラストラクチャを構築する上で直面した問題のひとつが、バッチベースの処理からニアリアルタイムで処理できるストリーミング処理へ移行するというものでした。GoogleのPercolator論文におけるコンセプトをベースに、我々はHBaseの上に同様のインフラを構築しました。特定のソースから来る特定のタイプのデータを待つリスナーは、任意のテーブルに書き込まれたデータに対する興味(interest)を登録します。それぞれの書き込みが実行される際、該当する各リスナーへの通知は、対応する通知テーブルに書き込まれることになります。このとき、リスナーは継続的に通知テーブルの行の小さなデータセットをスキャンし続けて新たな処理対象のデータを待ち続け、完了したらその通知を削除します。

whitacre-f1

HBaseを基盤として我々が構築した低レイテンシの処理インフラは、当初はうまく動作してくれていましたが、すぐにスケーラビリティの限界に達してしまいました。削除通知を取り除くためのコンパクションを頻繁に行わないと、リスナーのスキャン性能が落ちてしまったのです。頻繁にコンパクションを行ってもパフォーマンスは劣化し、処理のスループットを著しく低下させます。頻繁に必要となる処理はHBaseテーブルからの読み込みであり、取得する情報は通知、ペイロード、他のHBaseテーブルからの補完的な情報などです。読み込みの数が多いと、多くの場合、我々の処理インフラによる書き込みと競合します。これは、下流のリスナーのために変換されたペイロードや、追加の通知といったものです。I/O競合やコンパクションは、クラスタ内での負荷を分散させ、特定のリージョンサーバの通知テーブルを分離するために慎重に管理する必要があります。

Kafkaの採用は、通知を読み書きする要件にぴったりでした。HBaseの行をスキャンする代わりに、リスナーはKafkaのトピックからメッセージを処理し、通知が処理されるとオフセットを更新するのです。

whitacre-f2

Kafkaがもつプロデューサとコンシューマという分離方法は、HBaseのリージョンサーバへの大量の読み書き状況下においても競合を排除することができました。Kafkaのコンシューマオフセットトラッキングは通知の削除を不要にし、通知のリプレイでさえもKafkaのオフセットをリセットするというシンプルな方法で実現することができたのです。HBaseから大量の一時データをオフロードすることで、コンパクションや高I/Oによる不必要なオーバヘッドを減らすことができました。

Kafkaベースの通知が成功したことで、我々はKafkaをデータ収集の簡素化と合理化のために使用することを考え始めました。Cerner社のシステムでは、複数の異なるソースやシステムからデータを取り込みます。これらのソースの多くは、データセンターの外部にあります。セキュアなHTTPエンドポイントとしての「コレクタ」が、HBaseに格納される前に名前空間及びデータを識別します。Kafkaの採用以前は、データ取得インフラはHBaseクラスタのような単一のデータストアのみを対象としていました。

whitacre-f3

このシステムは我々の最初のユースケースを満たしていましたが、処理のニーズが変化したことで、データ収集インフラの複雑化を招いてしまったのです。多くの場合、データはほぼリアルタイムで複数のクラスタで取り込む必要があり、すべてのデータがHBaseが提供するようなランダムリード/ライトを必要とするわけではありません。

データ取得インフラにもKafkaを採用することで、永続的なステージング領域を設けることができ、取得したデータを複数の対象に配布することができるようになったのです。Collectorのプロセスはソースから分離し、Kafkaのトピックにデータを永続化するためのシンプルな役目を負うことで残っています。Kafkaにデータをプッシュすることで、データの投入時にHBaseで発生していたコンパクションやリージョン分割による断続的なパフォーマンス劣化が回避でき、顕著なパフォーマンス改善が見られました。

whitacre-f4

Kafkaにデータが到着したあとは、Apache Stormを配置することで独立したクラスタへデータをプッシュして消費(consuming)しています。KafkaとStormの採用により、コレクタが複数の書き込みを行ったり、最も遅い下流のシステムのパフォーマンスへの影響を考慮する必要性を排除したため、コレクタ自身をシンプルなものにすることができたのです。データ配送時のStormのat least onceの保証はデータの永続化が冪等となるため、受け入れやすいものでした。

Kafkaが提供する分離は、必要に応じて処理のためのデータを集約することができます。一部の医療データは非常に大量の小さなペイロードを生成しますが、それらはMapReduceのようなバッチでの処理が必要なものです。LinkedInのCamusプロジェクトを採用することで、データ収集プラットフォームがKafkaのトピック内の小さなペイロードの塊を、バッチ処理用の大きなファイルにしてHDFSに永続化することができました。実際には、我々はKafkaに流し込むすべてのデータをCamusを使用して、Kite SDKのDatasetとしてHDFSにアーカイブしています。このアプローチにより、低レイテンシの処理を必要としないさらなる分析や処理を、HDFSに格納したデータに対して実施することができるのです。データをアーカイブすることで、Kafkaのトピック保持ポリシー(topic retention policy)を超えてデータ配信が遅れている場合のリカバリが可能になります。

データを収集におけるCerner社でのKafkaの利用方法は、新たなユースケースが発見された場合にも実験、進化することができます。Spark StreamingやApache Samza(インキュベーション)、およびApache Flumeなどの技術は、現在のインフラの別の選択肢、あるいは追加技術として検討できるでしょう。我々は、データを生成するプロセスに影響を与えることなく、独立した複数のソリューションのためのLambdaKappaアーキテクチャのプロトタイプを作成することができます。Kafkaのマルチテナント機能の開発により、永続化するデータの一部を下流のHBaseクラスタにプッシュする必要がなくなり、永続化の要件をシンプルなものにすることができるのです。

まとめると、Kafkaは、大規模な分散処理のためのCerner社のインフラストラクチャにおいて重要な役割を果たし、HadoopとHBaseといった既存の投資技術に仲間入りしたと言えるでしょう。

Micah Whitacre(@mkwhit)はCerner社のビッグデータプラットフォームチームのシニアソフトウェアアーキテクトであり、Apache Crunchのコミッターでもあります。

One clap, two clap, three clap, forty?

By clapping more or less, you can signal to us which stories really stand out.