Python pandas を使ってKafka のデータを可視化・分析する方法

CData Python Connector を使えば、Python でKafka をpandas などのライブラリで呼び出してデータ分析や可視化を実行できます。

加藤龍彦
デジタルマーケティング

最終更新日:2023-09-23

この記事で実現できるKafka 連携のシナリオ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

Python エコシステムには多くのライブラリがあり、開発やデータ分析を行う際には必須と言っていいライブラリも多く存在します。CData Python Connector for ApacheKafka は、pandas、Matplotlib、SQLAlchemy から使用することで Kafka にデータ連携するPython アプリケーションを構築したり、Kafka のデータの可視化を実現します。本記事では、pandas、SQLAlchemy、およびMatplotlib のビルトイン機能でKafka にリアルタイムアクセスし、クエリを実行して結果を可視化する方法を説明します。

CData Python Connectors の特徴

CData Python Connectors は、以下のような特徴を持った製品です。

  1. Kafka をはじめとする、CRM、MA、会計ツールなど多様なカテゴリの270種類以上のSaaS / オンプレデータソースに対応
  2. pandas をはじめとする多様なデータ分析・BI ツールにKafka のデータを連携
  3. ノーコードでの手軽な接続設定

CData Python Connectors では、1.データソースとしてKafka の接続を設定、2.Python からPython Connectors との接続を設定、という2つのステップだけでデータソースに接続できます。以下に具体的な設定手順を説明します。

以下の手順に従い、必要なライブラリをインストールし、Python オブジェクト経由でKafka にアクセスします。

必要なライブラリのインストール

pip で、pandas & Matplotlib ライブラリおよび、SQLAlchemy をインストールします。

pip install pandas
pip install matplotlib
pip install sqlalchemy

次にライブラリをインポートします。

import pandas
import matplotlib.pyplot as plt
from sqlalchemy import create_engine

Python でKafka のデータを可視化

次は接続文字列を作成してKafka に接続します。create_engine 関数を使って、Kafka に連携するEngne を作成します。以下はサンプルの接続文字列になりますので、環境に応じてクレデンシャル部分を変更してください。

engine = create_engine("apachekafka:///?User=admin&Password=pass&BootStrapServers=https://localhost:9091&Topic=MyTopic")

Apache Kafka 接続プロパティの取得・設定方法

.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

  1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

Apache Kafka への認証

Apache Kafka データソースは、次の認証メソッドをサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、このプロパティを設定します。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントを参照してください。

Kafka にアクセスするSQL を実行

pandas のread_sql 関数を使って好きなSQL を発行して、DataFrame にデータを格納します。

df = pandas.read_sql("""SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'""", engine)

Kafka のデータを可視化

DataFrame に格納されたクエリ結果に対して、plot 関数をつかって、Kafka のデータをグラフ化してみます。

df.plot(kind="bar", x="Id", y="Column1")
plt.show()
Kafka データ in a Python plot (Salesforce is shown).

Kafka からPython へのデータ連携には、ぜひCData Python Connector をご利用ください

このようにCData Python Connector と併用することで、270を超えるSaaS、NoSQL データをPython からコーディングなしで扱うことができます。30日の無償評価版が利用できますので、ぜひ自社で使っているクラウドサービスやNoSQL と合わせて活用してみてください。

日本のユーザー向けにCData Python Connector は、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。



ソースコード

import pandas
import matplotlib.pyplot as plt
from sqlalchemy import create_engin

engine = create_engine("apachekafka:///?User=admin&Password=pass&BootStrapServers=https://localhost:9091&Topic=MyTopic")
df = pandas.read_sql("""SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'""", engine)

df.plot(kind="bar", x="Id", y="Column1")
plt.show()

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。