Kafka のデータ のPostgreSQL インターフェースを作成

Kafka JDBC Driver のリモート機能を使用し、データアクセス用のPostgreSQL エントリポイントを作成します。

加藤龍彦
デジタルマーケティング

最終更新日:2022-12-02

この記事で実現できるKafka 連携のシナリオ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

PostgreSQL には多くの対応クライアントがあります。標準のドライバーからBI、アナリティクスツールまで、PostgreSQL はデータ接続の人気のインターフェースです。JDBC ドライバーを使用することで、簡単に任意の標準クライアントから接続できるPostgreSQL エントリポイントを作成できます。

Kafka にPostgreSQL データベースとしてアクセスするには、CData JDBC Driver for ApacheKafka とJDBC foreign data wrapper (FDW) を使用します。この記事ではFDW をコンパイルしてインストールし、PostgreSQL サーバーからKafka にクエリを実行します。

JDBC データソースとしてKafka のデータに接続する

JDBC データソースとしてKafka に接続するには、以下が必要です。

  • Driver のJAR パス:JAR ファイルは、インストールディレクトリのlib サブフォルダにあります。
  • Driver クラス

    cdata.jdbc.apachekafka.ApacheKafkaDriver
  • JDBC URL: URL は、"jdbc:apachekafka:" で始まり、セミコロンで区切られた名前と値の組み合わせで任意の接続プロパティを含めることができます。

    Apache Kafka 接続プロパティの取得・設定方法

    .NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

    Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

    デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

    1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
    2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

    Apache Kafka への認証

    Apache Kafka データソースは、次の認証メソッドをサポートしています:

    • Anonymous
    • Plain
    • SCRAM ログインモジュール
    • SSL クライアント証明書
    • Kerberos

    Anonymous

    Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

    匿名認証を行うには、このプロパティを設定します。

    • AuthSchemeNone

    その他の認証方法については、ヘルプドキュメントを参照してください。

    ビルトイン接続文字列デザイナ

    JDBC URL の構成については、Kafka JDBC Driver に組み込まれている接続文字列デザイナを使用できます。JAR ファイルのダブルクリック、またはコマンドラインからJAR ファイルを実行します。

    java -jar cdata.jdbc.apachekafka.jar

    接続プロパティを入力し、接続文字列をクリップボードにコピーします。

    Using the built-in connection string designer to generate a JDBC URL (Salesforce is shown.)

    以下は一般的なJDBC URL です。

    jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;

JDBC FDW を構築する

FDW は、PostgreSQL を再コンパイルせずに、PostgreSQL の拡張機能としてインストールできます。例としてjdbc2_fdw 拡張子を使用します。

  1. ご使用のバージョンのJRE 共有オブジェクトから、/usr/lib/libjvm.so にシンボリックリンクを追加します。コマンド例: ln -s /usr/lib/jvm/java-6-openjdk/jre/lib/amd64/server/libjvm.so /usr/lib/libjvm.so
  2. ビルドするには、以下のコマンドを実行してください。 make install USE_PGXS=1

Kafka のデータをPostgreSQL データベースとしてクエリする

拡張機能をインストールした後、以下のステップに従ってKafka へのクエリの実行を開始します。

  1. データベースにログイン
  2. データベースの拡張機能をロード CREATE EXTENSION jdbc2_fdw;
  3. Kafka のオブジェクトを作成 CREATE SERVER ApacheKafka FOREIGN DATA WRAPPER jdbc2_fdw OPTIONS ( drivername 'cdata.jdbc.apachekafka.ApacheKafkaDriver', url 'jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;', querytimeout '15', jarfile '/home/MyUser/CData/CData\ JDBC\ Driver\ for\ Salesforce MyDriverEdition/lib/cdata.jdbc.apachekafka.jar');
  4. PostgreSQL デーモンに認識されているユーザーのユーザー名とパスワードのユーザーマッピングを作成 CREATE USER MAPPING for postgres SERVER ApacheKafka OPTIONS ( username 'admin', password 'test');
  5. ローカルデータベースに外部テーブルを作成 postgres=# CREATE FOREIGN TABLE sampletable_1 ( sampletable_1_id text, sampletable_1_Id text, sampletable_1_Column1 numeric) SERVER ApacheKafka OPTIONS ( table_name 'sampletable_1');
Kafka に対して 読み取り/書き込みコマンドを実行可能にする postgres=# SELECT * FROM sampletable_1;

おわりに

このようにCData JDBC Driver for ApacheKafka を使って簡単にKafka のデータを取得して検索対象にすることができました。ぜひ、30日の無償評価版 をお試しください。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。