Python でKafka のデータを変換・出力するETL 処理を作る方法

CData Python Connector とpetl モジュールを使って、Kafka のデータを変換後にCSV ファイルに吐き出すETL 処理を実装します。

加藤龍彦
デジタルマーケティング

最終更新日:2023-09-23

この記事で実現できるKafka 連携のシナリオ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

Pythonエコシステムには多くのモジュールがあり、システム構築を素早く効率的に行うことができます。本記事では、CData Python Connector for ApacheKafka とpetl フレームワークを使って、Kafka のデータにPython から接続してデータを変換、CSV に出力するETL 変換を実装してみます。

CData Python Connector は効率的なデータ処理によりKafka のデータ にPython から接続し、高いパフォーマンスを発揮します。Kafka にデータをクエリする際、ドライバーはフィルタリング、集計などがサポートされている場合SQL 処理を直接Kafka 側に行わせ、サポートされていないSQL 処理については、組み込みのSQL エンジンによりクライアント側で処理を行います(JOIN やSQL 関数など)。

必要なモジュールのインストール

pip で必要なモジュールおよびフレームワークをインストールします:

pip install petl
pip install pandas

Python でKafka のデータをETL 処理するアプリを構築

モジュールとフレームワークをインストールしたら、ETL アプリケーションを組んでいきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に付いています。

CData Connector を含むモジュールをインポートします。

import petl as etl
import pandas as pd
import cdata.apachekafka as mod

接続文字列で接続を確立します。connect 関数を使って、CData Kafka Connector からKafka への接続を行います

cnxn = mod.connect("User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")

Apache Kafka 接続プロパティの取得・設定方法

.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

  1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

Apache Kafka への認証

Apache Kafka データソースは、次の認証メソッドをサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、このプロパティを設定します。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントを参照してください。

Kafka をクエリするSQL 文の作成

Kafka にはSQL でデータアクセスが可能です。SampleTable_1 エンティティからのデータを読み出します。

sql = "SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'"

Kafka データ のETL 処理

DataFrame に格納されたクエリ結果を使って、petl でETL(抽出・変換・ロード)パイプラインを組みます。この例では、Kafka のデータ を取得して、Column1 カラムでデータをソートして、CSV ファイルにデータをロードします。

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'Column1')

etl.tocsv(table2,'sampletable_1_data.csv')

CData Python Connector for ApacheKafka を使えば、データベースを扱う場合と同感覚で、Kafka のデータ を扱うことができ、petl のようなETL パッケージから直接データにアクセスが可能になります。

おわりに

Kafka Python Connector の30日の無償トライアル をぜひダウンロードして、Kafka のデータ への接続をPython アプリやスクリプトから簡単に作成しましょう。



フルソースコード

import petl as etl
import pandas as pd
import cdata.apachekafka as mod

cnxn = mod.connect("User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")

sql = "SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'"

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'Column1')

etl.tocsv(table2,'sampletable_1_data.csv')

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。