Kafka のデータをR で分析:JDBC での接続

CData JDBC Driver で標準的なR 関数とお好みの開発環境を使ってKafka を分析。

加藤龍彦
デジタルマーケティング

最終更新日:2023-09-21

この記事で実現できるKafka 連携のシナリオ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

CData JDBC Driver for ApacheKafka とRJDBC package を使って、R でリモートKafka データ を利用できます。CData Driver を使うことで、業界が認めた基準で書かれたドライバーを活用して、オープンソースでポピュラーなR 言語のデータにアクセスできます。この記事では、ドライバーを使ってKafka にSQL クエリを実行する方法、および標準R 関数を呼び出してKafka をビジュアライズする方法について説明します。

R をインストール

マルチスレッドのMicrosoft R Open を実行すること、またはBLAS/LAPACK ライブラリにリンクされたオープン R を実行することによって、マルチスレッドおよびマネージドコードから利益を得られたドライバーのパフォーマンスにマッチできます。ここでは、Microsoft R Open 3.2.3 を使用します。CRAN レポジトリのJan. 1, 2016 snapshot からパッケージをインストールするために事前設定されています。このsnapshot は再現性を保証します。

RJDBC パッケージをロード

ドライバーを使うにはRJDBC パッケージをダウンロードします。RJDBC パッケージをインストールしたら、次のコードを入力してパッケージをロードします。

library(RJDBC)

JDBC データソースとしてKafka に接続

下記の情報を使いKafka にJDBC データソースとして接続します。

  • Driver Class:cdata.jdbc.apachekafka.ApacheKafkaDriver に設定。
  • Classpath:Driver JAR の場所を設定します。デフォルトではインストールディレクトリの[lib]サブフォルダです。

dbConnect やdbSendQuery のようなDBI 関数は、R にデータアクセスコードを書くための統一インターフェースを提供します。

driver <- JDBC(driverClass = "cdata.jdbc.apachekafka.ApacheKafkaDriver", classPath = "MyInstallationDir\lib\cdata.jdbc.apachekafka.jar", identifier.quote = "'")

これで、DBI 関数を使ってKafka に接続しSQL クエリを実行する準備が整いました。dbConnect 関数を使ってJDBC 接続を初期化します。一般的なJDBC 接続文字列は次のとおりです。

conn <- dbConnect(driver,"User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;")

Apache Kafka 接続プロパティの取得・設定方法

.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

  1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

Apache Kafka への認証

Apache Kafka データソースは、次の認証メソッドをサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、このプロパティを設定します。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントを参照してください。

スキーマ Discovery

ドライバーはKafka API をリレーショナルデータベース、ビュー、ストアドプロシージャとしてモデルします。次のコードを使ってテーブルリストを検出します。

dbListTables(conn)

SQL クエリの実行

dbGetQuery 関数を使ってKafka API がサポートするすべてのSQL クエリを実行できます:

sampletable_1 <- dbGetQuery(conn,"SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'")

次のコマンドを使って、結果を[data viewer]ウィンドウで見ることができます。

View(sampletable_1)

Kafka データ をプロット

CRAN レポジトリで利用可能なあらゆるデータ初期化パッケージを使ってKafka を分析する準備が整いました。ビルトインバーのplot 関数を使って簡単なバーを作成できます。

par(las=2,ps=10,mar=c(5,15,4,2)) barplot(sampletable_1$Column1, main="Kafka SampleTable_1", names.arg = sampletable_1$Id, horiz=TRUE) A basic bar plot. (Salesforce is shown.)

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。