各製品の資料を入手。
詳細はこちら →Kafka のデータをR で分析:JDBC での接続
CData JDBC Driver で標準的なR 関数とお好みの開発環境を使ってKafka を分析。
最終更新日:2023-09-21
この記事で実現できるKafka 連携のシナリオ
こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。
CData JDBC Driver for ApacheKafka とRJDBC package を使って、R でリモートKafka データ を利用できます。CData Driver を使うことで、業界が認めた基準で書かれたドライバーを活用して、オープンソースでポピュラーなR 言語のデータにアクセスできます。この記事では、ドライバーを使ってKafka にSQL クエリを実行する方法、および標準R 関数を呼び出してKafka をビジュアライズする方法について説明します。
R をインストール
マルチスレッドのMicrosoft R Open を実行すること、またはBLAS/LAPACK ライブラリにリンクされたオープン R を実行することによって、マルチスレッドおよびマネージドコードから利益を得られたドライバーのパフォーマンスにマッチできます。ここでは、Microsoft R Open 3.2.3 を使用します。CRAN レポジトリのJan. 1, 2016 snapshot からパッケージをインストールするために事前設定されています。このsnapshot は再現性を保証します。
RJDBC パッケージをロード
ドライバーを使うにはRJDBC パッケージをダウンロードします。RJDBC パッケージをインストールしたら、次のコードを入力してパッケージをロードします。
library(RJDBC)
JDBC データソースとしてKafka に接続
下記の情報を使いKafka にJDBC データソースとして接続します。
- Driver Class:cdata.jdbc.apachekafka.ApacheKafkaDriver に設定。
- Classpath:Driver JAR の場所を設定します。デフォルトではインストールディレクトリの[lib]サブフォルダです。
dbConnect やdbSendQuery のようなDBI 関数は、R にデータアクセスコードを書くための統一インターフェースを提供します。
driver <- JDBC(driverClass = "cdata.jdbc.apachekafka.ApacheKafkaDriver", classPath = "MyInstallationDir\lib\cdata.jdbc.apachekafka.jar", identifier.quote = "'")
これで、DBI 関数を使ってKafka に接続しSQL クエリを実行する準備が整いました。dbConnect 関数を使ってJDBC 接続を初期化します。一般的なJDBC 接続文字列は次のとおりです。
conn <- dbConnect(driver,"User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")
Apache Kafka 接続プロパティの取得・設定方法
.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:
- UseSSL をtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。
Apache Kafka への認証
Apache Kafka データソースは、次の認証メソッドをサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、このプロパティを設定します。
- AuthScheme:None。
その他の認証方法については、ヘルプドキュメントを参照してください。
スキーマ Discovery
ドライバーはKafka API をリレーショナルデータベース、ビュー、ストアドプロシージャとしてモデルします。次のコードを使ってテーブルリストを検出します。
dbListTables(conn)
SQL クエリの実行
dbGetQuery 関数を使ってKafka API がサポートするすべてのSQL クエリを実行できます:
sampletable_1 <- dbGetQuery(conn,"SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'")
次のコマンドを使って、結果を[data viewer]ウィンドウで見ることができます。
View(sampletable_1)
Kafka データ をプロット
CRAN レポジトリで利用可能なあらゆるデータ初期化パッケージを使ってKafka を分析する準備が整いました。ビルトインバーのplot 関数を使って簡単なバーを作成できます。
par(las=2,ps=10,mar=c(5,15,4,2)) barplot(sampletable_1$Column1, main="Kafka SampleTable_1", names.arg = sampletable_1$Id, horiz=TRUE)
