各製品の資料を入手。
詳細はこちら →SQLAlchemy ORM を使って、Python でKafka のデータに連携する方法
CData Python Connector を使って、Python アプリケーションおよびスクリプトからSQLAlchemy 経由でKafka にOR マッピング可能に。
最終更新日:2023-09-23
この記事で実現できるKafka 連携のシナリオ
こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。
Pythonエコシステムには、多くのモジュールがあり、システム構築を素早く効率的に行うことができます。CData Python Connector for ApacheKafka は、pandas、Matplotlib モジュール、SQLAlchemy ツールキットから使用することで Kafka にデータ連携するPython アプリケーションを構築し、Kafka のデータを可視化できます。 本記事では、SQLAlchemy でKafka に連携して、データを取得、、更新、挿入、削除 する方法を説明します。
CData Python Connectors の特徴
CData Python Connectors は、以下のような特徴を持った製品です。
- Kafka をはじめとする、CRM、MA、会計ツールなど多様なカテゴリの270種類以上のSaaS / オンプレデータソースに対応
- Python をはじめとする多様なデータ分析・BI ツールにKafka のデータを連携
- ノーコードでの手軽な接続設定
CData Python Connectors では、1.データソースとしてKafka の接続を設定、2.Python からPython Connectors との接続を設定、という2つのステップだけでデータソースに接続できます。以下に具体的な設定手順を説明します。
必要なモジュールのインストール
pip でSQLAlchemy ツールキットをインストールします:
pip install sqlalchemy
モジュールのインポートを忘れずに行います:
import sqlalchemy
Python でKafka のデータをモデル化
次は、接続文字列で接続を確立します。create_engine 関数を使って、Kafka のデータに連携するEngne を作成します。
engine = create_engine("apachekafka///?User=admin&Password=pass&BootStrapServers=https://localhost:9091&Topic=MyTopic")
Apache Kafka 接続プロパティの取得・設定方法
.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:
- UseSSL をtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。
Apache Kafka への認証
Apache Kafka データソースは、次の認証メソッドをサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、このプロパティを設定します。
- AuthScheme:None。
その他の認証方法については、ヘルプドキュメントを参照してください。
Kafka のデータのマッピングクラスの宣言
接続を確立したら、OR マッパーでモデル化するテーブルのマッピングクラスを宣言します。本記事では、SampleTable_1 テーブルを使います。sqlalchemy.ext.declarative.declarative_base 関数を使って、新しいクラスにフィールド(カラム)を定義します。
base = declarative_base() class SampleTable_1(base): __tablename__ = "SampleTable_1" Id = Column(String,primary_key=True) Column1 = Column(String) ...
Kafka のデータをクエリ
マッピングクラスができたので、セッションオブジェクトを使ってデータソースをクエリすることができます。セッションにEngine をバインドして、セッションのquery メソッドにマッピングクラスを提供します。
query メソッドを使う
engine = create_engine("apachekafka///?User=admin&Password=pass&BootStrapServers=https://localhost:9091&Topic=MyTopic") factory = sessionmaker(bind=engine) session = factory() for instance in session.query(SampleTable_1).filter_by(Column2="100"): print("Id: ", instance.Id) print("Column1: ", instance.Column1) print("---------")
ほかの方法としては、execute メソッドを適切なテーブルオブジェクトに使うことが可能です。以下のコードはアクティブなsession に対して有効です。
execute メソッドを使う
SampleTable_1_table = SampleTable_1.metadata.tables["SampleTable_1"] for instance in session.execute(SampleTable_1_table.select().where(SampleTable_1_table.c.Column2 == "100")): print("Id: ", instance.Id) print("Column1: ", instance.Column1) print("---------")
より複雑なクエリとして、JOIN、集計、Limit などが利用可能です。詳細はヘルプドキュメントをご覧ください。
Kafka のデータの挿入(INSERT)
Kafka のデータへの挿入には、マップされたクラスのインスタンスを定義し、アクティブな session に追加します。commit 関数を呼び出して、Kafka にすべての追加インスタンスを送ります。
new_rec = SampleTable_1(Id="placeholder", Column2="100") session.add(new_rec) session.commit()
Kafka のデータを更新(UPDATE)
Kafka のデータの更新には、更新するレコードをフィルタクエリとともにフェッチします。そして、フィールドの値を変更し、セッションでcommit 関数を呼んで、Kafka にレコードを追加します。
updated_rec = session.query(SampleTable_1).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first() updated_rec.Column2 = "100" session.commit()
Kafka のデータを削除(DELETE)
Kafka のデータの削除には、フィルタクエリと一緒に対象となるレコードをフェッチします。そして、アクティブsession でレコードを削除し、セッションでcommit 関数を呼び出して、該当するレコードの削除を実行します。
deleted_rec = session.query(SampleTable_1).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first() session.delete(deleted_rec) session.commit()
Kafka からPython へのデータ連携には、ぜひCData Python Connector をご利用ください
このようにCData Python Connector と併用することで、270を超えるSaaS、NoSQL データをPython からコーディングなしで扱うことができます。30日の無償評価版が利用できますので、ぜひ自社で使っているクラウドサービスやNoSQL と合わせて活用してみてください。
日本のユーザー向けにCData Python Connector は、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。