各製品の資料を入手。
詳細はこちら →Python のDash ライブラリを使って、Kafka のデータ に連携するウェブアプリケーションを開発する方法
CData Python Connector を使って、Kafka にデータ連携するPython ウェブアプリケーションを開発できます。pandas とDash を使って作成してみます。
最終更新日:2023-09-23
この記事で実現できるKafka 連携のシナリオ
こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。
Python エコシステムには、多くのモジュールがあり、システム構築を素早く効率的に行うことができます。CData Python Connector for ApacheKafka を使うことで、pandas モジュールとDash フレームワークでKafka にデータ連携するアプリケーションを効率的に開発することができます。本記事では、pandas、Dash とCData Connector を使って、Kafka に連携して、Kafka のデータ をビジュアライズするシンプルなウェブアプリを作る方法をご紹介します。
CData Python Connectors の特徴
CData Python Connectors は、以下のような特徴を持った製品です。
- Kafka をはじめとする、CRM、MA、会計ツールなど多様なカテゴリの270種類以上のSaaS / オンプレデータソースに対応
- Dash をはじめとする多様なデータ分析・BI ツールにKafka のデータを連携
- ノーコードでの手軽な接続設定
必要なモジュールのインストール
まずは、pip で必要なモジュールおよびフレームワークをインストールします:
pip install pandas pip install dash pip install dash-daq
Python でKafka のデータを可視化
必要なモジュールとフレームワークがインストールされたら、ウェブアプリを開発していきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に掲載しているので、参考にしてください。
まず、CData Connector を含むモジュールをインポートします:
import os import dash import dash_core_components as dcc import dash_html_components as html import pandas as pd import cdata.apachekafka as mod import plotly.graph_objs as go
接続文字列を使ってデータへの接続を確立します。connect 関数を使ってCData Kafka Connector からKafka のデータ との接続を確立します。
cnxn = mod.connect("User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")
Apache Kafka 接続プロパティの取得・設定方法
.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:
- UseSSL をtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。
Apache Kafka への認証
Apache Kafka データソースは、次の認証メソッドをサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、このプロパティを設定します。
- AuthScheme:None。
その他の認証方法については、ヘルプドキュメントを参照してください。
Kafka にクエリを実行
read_sql 関数を使って、padas からSQL 文を発行し、DataFrame に結果を格納します。
df = pd.read_sql("""SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'""", cnxn)
ウェブアプリケーションの設定
DataFrame に格納されたクエリ結果を使って、ウェブアプリにname、stylesheet、title を設定していきます。
app_name = 'dash-apachekafkaedataplot' external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css'] app = dash.Dash(__name__, external_stylesheets=external_stylesheets) app.title = 'CData + Dash'
Layout 設定
次に、Kafka のデータ をベースにした棒グラフを作詞し、アプリのレイアウトを設定します。
trace = go.Bar(x=df.Id, y=df.Column1, name='Id') app.layout = html.Div(children=[html.H1("CData Extention + Dash", style={'textAlign': 'center'}), dcc.Graph( id='example-graph', figure={ 'data': [trace], 'layout': go.Layout(alt='Kafka SampleTable_1 Data', barmode='stack') }) ], className="container")
アプリをセットアップして実行
接続、アプリ、レイアウトを定義したら、アプリを実行してみましょう。以下のコードで実行できます。
if __name__ == '__main__': app.run_server(debug=True)
最後に、Python でウェブアプリを起動してブラウザでKafka のデータ を見てみましょう。
python apachekafka-dash.py

ちゃんとデータが表示できてますね!
おわりに
Kafka Python Connector の30日の無償トライアル をぜひダウンロードして、Kafka のデータ への接続をPython アプリやスクリプトから簡単に作成してみてください。