各製品の資料を入手。
詳細はこちら →JDBI からKafka データ のデータアクセスオブジェクトを作成
JDBI でKafka のデータ 用のSQL オブジェクトAPIを作成する方法を概説します。
最終更新日:2022-05-25
この記事で実現できるKafka 連携のシナリオ
こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。
JDBI は、Fluent スタイルとSQL オブジェクトスタイルという2つの異なるスタイルAPI を公開する、Java 用のSQL コンビニエンスライブラリです。CData JDBC Driver for ApacheKafka は、Java アプリケーションとリアルタイムKafka のデータ のデータ連携を実現します。これらの技術を組み合わせることによって、Kafka のデータ へのシンプルなコードアクセスが可能になります。ここでは、基本的なDAO(Data Access Object )とそれに付随するKafka のデータ の読み書きのためのコードの作成について説明します。
Kafka SampleTable_1 Entity のDAO を作成
以下のインターフェースは、実装されるSQL ステートメントごとに単一のメソッドを作成するためのSQL オブジェクトの正しい動作を宣言します。
public interface MySampleTable_1DAO {
//insert new data into Kafka
@SqlUpdate("INSERT INTO SampleTable_1 (Column2, Column1) values (:column2, :column1)")
void insert(@Bind("column2") String column2, @Bind("column1") String column1);
//request specific data from Kafka (String type is used for simplicity)
@SqlQuery("SELECT Column1 FROM SampleTable_1 WHERE Column2 = :column2")
String findColumn1ByColumn2(@Bind("column2") String column2);
/*
* close with no args is used to close the connection
*/
void close();
}
Kafka への接続を開く
必要な接続プロパティを収集し、Kafka に接続するための適切なJDBC URL を作成します。
Apache Kafka 接続プロパティの取得・設定方法
.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:
- UseSSL をtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。
Apache Kafka への認証
Apache Kafka データソースは、次の認証メソッドをサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、このプロパティを設定します。
- AuthScheme:None。
その他の認証方法については、ヘルプドキュメントを参照してください。
ビルトイン接続文字列デザイナー
JDBC URL の構成については、Kafka JDBC Driver に組み込まれている接続文字列デザイナーを使用してください。JAR ファイルのダブルクリック、またはコマンドラインからJAR ファイルを実行します。
java -jar cdata.jdbc.apachekafka.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。

Kafka の接続文字列は、通常次のようになります。
jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
構成済みのJDBC URL を使用して、DAO インターフェースのインスタンスを取得します。以下に示す特定のメソッドはインスタンスにバインドされたハンドルを開くため、ハンドルとバインドされたJDBC 接続を開放するには、インスタンスを明示的に閉じる必要があります。
DBI dbi = new DBI("jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;");
MySampleTable_1DAO dao = dbi.open(MySampleTable_1DAO.class);
//do stuff with the DAO
dao.close();
Kafka データ について
Kafka への接続を開いた状態で以前定義したメソッドを呼び出すだけで、Kafka のSampleTable_1 エンティティからデータを取得できます。
//disply the result of our 'find' method
String column1 = dao.findColumn1ByColumn2("100");
System.out.println(column1);
Kafka データ の書き方
以前定義した方法を使用すれば、Kafka にデータを書き込むことも簡単になります。
//add a new entry to the SampleTable_1 entity
dao.insert(newColumn2, newColumn1);
JDBI ライブラリはJDBC 接続を処理できるため、CData JDBC Driver for ApacheKafka と統合することで、SQL Object API for ApacheKafka を簡単に作成できます。今すぐ無料トライアルをダウンロードし、Java アプリケーションでリアルタイムKafka を操作しましょう。