Python のDash ライブラリを使って、Kafka のデータ に連携するウェブアプリケーションを開発する方法

CData Python Connector を使って、Kafka にデータ連携するPython ウェブアプリケーションを開発できます。pandas とDash を使って作成してみます。

加藤龍彦
デジタルマーケティング

最終更新日:2023-09-23

この記事で実現できるKafka 連携のシナリオ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

Python エコシステムには、多くのモジュールがあり、システム構築を素早く効率的に行うことができます。CData Python Connector for ApacheKafka を使うことで、pandas モジュールとDash フレームワークでKafka にデータ連携するアプリケーションを効率的に開発することができます。本記事では、pandas、Dash とCData Connector を使って、Kafka に連携して、Kafka のデータ をビジュアライズするシンプルなウェブアプリを作る方法をご紹介します。

CData Python Connectors の特徴

CData Python Connectors は、以下のような特徴を持った製品です。

  1. Kafka をはじめとする、CRM、MA、会計ツールなど多様なカテゴリの270種類以上のSaaS / オンプレデータソースに対応
  2. Dash をはじめとする多様なデータ分析・BI ツールにKafka のデータを連携
  3. ノーコードでの手軽な接続設定

必要なモジュールのインストール

まずは、pip で必要なモジュールおよびフレームワークをインストールします:

pip install pandas
pip install dash
pip install dash-daq

Python でKafka のデータを可視化

必要なモジュールとフレームワークがインストールされたら、ウェブアプリを開発していきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に掲載しているので、参考にしてください。

まず、CData Connector を含むモジュールをインポートします:

import os
import dash
import dash_core_components as dcc
import dash_html_components as html
import pandas as pd
import cdata.apachekafka as mod
import plotly.graph_objs as go

接続文字列を使ってデータへの接続を確立します。connect 関数を使ってCData Kafka Connector からKafka のデータ との接続を確立します。

cnxn = mod.connect("User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")

Apache Kafka 接続プロパティの取得・設定方法

.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

  1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

Apache Kafka への認証

Apache Kafka データソースは、次の認証メソッドをサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、このプロパティを設定します。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントを参照してください。

Kafka にクエリを実行

read_sql 関数を使って、padas からSQL 文を発行し、DataFrame に結果を格納します。

df = pd.read_sql("""SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'""", cnxn)

ウェブアプリケーションの設定

DataFrame に格納されたクエリ結果を使って、ウェブアプリにname、stylesheet、title を設定していきます。

app_name = 'dash-apachekafkaedataplot'

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.title = 'CData + Dash'

Layout 設定

次に、Kafka のデータ をベースにした棒グラフを作詞し、アプリのレイアウトを設定します。

trace = go.Bar(x=df.Id, y=df.Column1, name='Id')

app.layout = html.Div(children=[html.H1("CData Extention + Dash", style={'textAlign': 'center'}),
	dcc.Graph(
		id='example-graph',
		figure={
			'data': [trace],
			'layout':
			go.Layout(alt='Kafka SampleTable_1 Data', barmode='stack')
		})
], className="container")

アプリをセットアップして実行

接続、アプリ、レイアウトを定義したら、アプリを実行してみましょう。以下のコードで実行できます。

if __name__ == '__main__':
    app.run_server(debug=True)

最後に、Python でウェブアプリを起動してブラウザでKafka のデータ を見てみましょう。

python apachekafka-dash.py
Dash のウェブアプリでKafka のデータ を表示

ちゃんとデータが表示できてますね!

おわりに

Kafka Python Connector の30日の無償トライアル をぜひダウンロードして、Kafka のデータ への接続をPython アプリやスクリプトから簡単に作成してみてください。



import os
import dash
import dash_core_components as dcc
import dash_html_components as html
import pandas as pd
import cdata.apachekafka as mod
import plotly.graph_objs as go

cnxn = mod.connect("User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")

df = pd.read_sql("SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'", cnxn)
app_name = 'dash-apachekafkadataplot'

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.title = 'CData + Dash'
trace = go.Bar(x=df.Id, y=df.Column1, name='Id')

app.layout = html.Div(children=[html.H1("CData Extention + Dash", style={'textAlign': 'center'}),
	dcc.Graph(
		id='example-graph',
		figure={
			'data': [trace],
			'layout':
			go.Layout(alt='Kafka SampleTable_1 Data', barmode='stack')
		})
], className="container")

if __name__ == '__main__':
    app.run_server(debug=True)

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。