PHP でMySQL からKafka のデータにアクセス

PHP から標準MySQL ライブラリを介してKafka に接続。

古川えりか
コンテンツスペシャリスト

最終更新日:2022-07-18

こんにちは!ドライバー周りのヘルプドキュメントを担当している古川です。

CData SQL Gateway とODBC Driver for ApacheKafka を使用することで、ETL を実行したりデータをキャッシュしたりすることなく、MySQL クライアントからKafka にアクセスできます。以下のステップに従って、PHP の標準MySQL インターフェースであるmysqliPDO_MySQL を介してKafka のデータにリアルタイムで接続します。

CData ODBC ドライバとは?

CData ODBC ドライバは、以下のような特徴を持ったリアルタイムデータ連携ソリューションです。

  1. Kafka をはじめとする、CRM、MA、会計ツールなど多様なカテゴリの270種類以上のSaaS / オンプレミスデータソースに対応
  2. 多様なアプリケーション、ツールにKafka のデータを連携
  3. ノーコードでの手軽な接続設定
  4. 標準 SQL での柔軟なデータ読み込み・書き込み

CData ODBC ドライバでは、1.データソースとしてKafka の接続を設定、2.MySQL 側でODBC Driver との接続を設定、という2つのステップだけでデータソースに接続できます。以下に具体的な設定手順を説明します。

CData ODBC ドライバのインストールとKafka への接続設定

まずは、本記事右側のサイドバーからApacheKafka ODBC Driver の無償トライアルをダウンロード・インストールしてください。30日間無償で、製品版の全機能が使用できます。

必要な接続プロパティの値が未入力の場合には、データソース名(DSN)で入力します。組み込みのMicrosoft ODBC Data Source Administrator を使用し、DSN を構成できます。ドライバーのインストールの最後にアドミニストレーターが開きます。Microsoft ODBC データソースアドミニストレーターを使ってDSN を作成および設定する方法は、ヘルプドキュメントの「はじめに」を参照してください。

Apache Kafka 接続プロパティの取得・設定方法

.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

  1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

Apache Kafka への認証

Apache Kafka データソースは、次の認証メソッドをサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、このプロパティを設定します。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントを参照してください。

SQL Gateway を構成する

Kafka のデータへの接続を、仮想MySQL データベースとして設定する方法については、「SQL Gateway Overview」を参照してください。クライアントからのMySQL リクエストをリッスンするMySQL リモートサービスを構成します。このサービスはSQL Gateway UI で設定できます。

Creating a MySQL Remoting Service in SQL Gateway (Salesforce is shown)

PHP から接続する

以下の例は、オブジェクト指向インターフェースを使用してクエリを接続および実行する方法を示します。 以下のパラメータを使用して接続オブジェクトを初期化し、仮想MySQL データベースに接続します。

  • Host:サービスが実行されているリモートホストの場所を指定します。この例では、サービスがローカルマシンで実行されているため、"localhost" をリモートホスト設定に使用します。
  • Username:SQL Gateway の「ユーザー」タブで認証したユーザーのユーザー名を指定します。
  • Password:認証されたユーザーアカウントのパスワードを指定します。
  • Database Name:データベース名としてシステムDSN を指定します。
  • Port:サービスが実行されているポートを指定します。この例では3306 を指定します。

mysqli

<?php
$mysqli = new mysqli("localhost", "user", "password", "CData ApacheKafka Sys","3306");
?>

PDO

<?php
$pdo = new PDO('mysql:host=localhost;dbname=CData ApacheKafka Sys;port=3306', 'user', 'password');
?>

PHP からクエリする

接続が確立されると、テーブルにアクセスできます。以下のステップでは、例を説明します。

  1. テーブル(SampleTable_1)をクエリします。結果は、連想配列として$result オブジェクトに保存されます。
  2. 各行とカラムを順番に処理して、値を出力してPHP ページに表示することができます。
  3. 接続を閉じます。

mysqli

$result = $mysqli->query("SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'");
while($row = $result->fetch_assoc()) {
  foreach ($row as $k=>$v) {
    echo "$k : $v";
    echo "<br />";
  }
}
$mysqli->close();

PDO

$result = $pdo->query("SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'");
while($row = $result->fetch(PDO::FETCH_ASSOC)) {
  foreach ($row as $k=>$v) {
    echo "$k : $v";
    echo "<br />";
  }
}
$result = null;
$pdo = null;

おわりに

このようにCData ODBC ドライバと併用することで、270を超えるSaaS、NoSQL データをコーディングなしで扱うことができます。30日の無償評価版が利用できますので、ぜひ自社で使っているクラウドサービスやNoSQL と合わせて活用してみてください。

CData ODBC ドライバは日本のユーザー向けに、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。