SnapLogic でKafka を外部サービスに連携

CData JDBC Driver を使用して、SnapLogic と外部サービスを連携。

古川えりか
コンテンツスペシャリスト

最終更新日:2021-11-05

この記事で実現できるKafka 連携のシナリオ

こんにちは!ドライバー周りのヘルプドキュメントを担当している古川です。

SnapLogic はintegration Platform-as-a-Service(iPaaS)であり、ユーザーはノーコードでデータ連携フローを作成できます。CData JDBC ドライバと組み合わせることで、ユーザーはSnapLogic ワークフローからKafka を含む250を超えるSaaS、ビッグデータ、NoSQL データソースのリアルタイムデータに接続できます。

組み込みの最適化されたデータ処理によって、CData JDBC Driver はリアルタイムKafka のデータを高速に扱えます。プラットフォームがKafka に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作をKafka に直接プッシュし、サポートされていない操作(主にSQL 関数とJOIN 操作)は組み込みSQL エンジンを利用してクライアント側で処理します。組み込みの動的メタデータクエリを使用すると、ネイティブデータソース型を使用してKafka のデータを操作することができます。

SnapLogic からKafka に接続する

SnapLogic からKafka のデータに接続するには、CData Kafka JDBC Driver をダウンロードしてインストールします。インストール画面に従ってください。インストールが完了すると、インストール先のディレクトリ(デフォルトでは、C:/Program Files/CData/CData JDBC Driver for ApacheKafka/lib)にJAR ファイルが作成されます。

Kafka JDBC Driver をアップロードする

インストール後、JDBC JAR ファイルをSnapLogic 内のディレクトリ(例えば、projects/Jerod Johnson)にManager タブからアップロードします。

アップロードされたJDBC Driver(Salesforce とQuickBooks Online の場合)

接続を設定する

JDBC Driver がアップロードされると、Kafka への接続を作成できます。

  1. Designer タブに移動します。
  2. Snaps から「JDBC」を展開して、「Generic JDBC - Select」snap をdesigner にドラッグします。 designer にGeneric JDBC snap を追加
  3. Add Account をクリック(または既存のものを選択)して、「Continue」をクリックします。
  4. 次のフォームでは、JDBC 接続プロパティを設定します。
    • JDBC JARs 以下にアップロードしたJAR ファイルを追加します。
    • JDBC Driver Classcdata.jdbc.apachekafka.ApacheKafkaDriver に設定します。
    • JDBC URL をKafka JDBC Driver 用のJDBC 接続文字列に設定します。例えば、

      jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;RTK=XXXXXX;
      です。

      NOTE: RTK は評価版もしくは製品版のキーです。詳しくは、CData のサポートチームにご連絡ください接続を設定(Salesforce の場合)

      組み込みの接続文字列デザイナー

      JDBC URL の作成の補助として、Kafka JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。

      java -jar cdata.jdbc.apachekafka.jar

      接続プロパティを入力し、接続文字列をクリップボードにコピーします。

      Apache Kafka 接続プロパティの取得・設定方法

      .NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

      Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

      デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

      1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
      2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

      Apache Kafka への認証

      Apache Kafka データソースは、次の認証メソッドをサポートしています:

      • Anonymous
      • Plain
      • SCRAM ログインモジュール
      • SSL クライアント証明書
      • Kerberos

      Anonymous

      Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

      匿名認証を行うには、このプロパティを設定します。

      • AuthSchemeNone

      その他の認証方法については、ヘルプドキュメントを参照してください。 組み込みの接続文字列デザイナーを使ってJDBC URL を生成(Salesforce の場合)

  5. 接続プロパティの入力後、「Validate」、そして「Apply」をクリックします。

Kafka のデータを読み込む

接続を検証、適用後に開くフォームで、クエリを設定します。

  • Schema name を"ApacheKafka" に設定します。
  • Table name を、スキーマ名を使用したKafka 用のテーブルに設定します。例えば、"ApacheKafka"."SampleTable_1" です(ドロップダウンを使用して利用可能なテーブルの全リストを確認できます)。
  • テーブルから、使用したい項目ごとにOutput fields を追加します。
Select snap を設定(Salesforce の場合)

Generic JDBC - Select snap を保存します。

接続とクエリを設定したら、snap の終端部分(以下のハイライト部分)をクリックしてデータをプレビューします。

snap の終端部分をクリックしてデータをプレビュー。

結果が期待どおりのものであることを確認したら、他のsnap を追加してKafka のデータを別のエンドポイントに渡すこともできます。

データをプレビュー(Salesforce の場合)。

Kafka のデータを外部サービスにパイプ

本記事では、データをGoogle Spreadsheet にロードします。他のあらゆるサポートされているsnap が使用でき、Generic JDBC snap を他のCData JDBC ドライバと利用してデータを外部サービスに移すこともできます。

  1. まず、「Worksheet Writer」snap を"Generic JDBC - Select" snap の終端部分にドロップします。
  2. Google Sheets に接続するアカウントを追加します。 Google に接続
  3. Worksheet Writer snap を、Kafka のデータをGoogle Spreadsheet に書き込むよう設定します。 Google Spreadsheet に書き込み

これで、接続済みのパイプラインを実行してKafka からデータを抽出し、Google Spreadsheet にプッシュできます。

Google Spreadsheets に書き込まれたデータ(Salesforce の場合)

外部データをKafka にパイプ

上述のように、JDBC Driver for ApacheKafka をSnapLogic から使用してデータをKafka に書き込むことができます。まずは、Generic JDBC - Insert またはGeneric JDBC - Update snap をダッシュボードに追加します。

  1. 既存の「Account」(接続)を選択するか、新しく作成します。
  2. クエリの設定。
    • Schema name を"ApacheKafka" に設定します。
    • Table name をスキーマ名を使用したKafka のテーブルに設定します。例えば、"ApacheKafka"."SampleTable_1"(ドロップダウンを使用して利用可能な全テーブルの一覧を確認できます)。
    INSERT snap を設定(Salesforce の場合)
  3. Generic JDBC - Insert/Update snap を保存します。

これで、Kafka にデータを書き込んだり、新しいレコードを挿入したり、既存のレコードを更新するsnap が設定できました。

詳細情報と無償評価版

CData JDBC Driver for ApacheKafka を使えば、Kafka のデータを外部サービスに連携するためのパイプラインをSnapLogic で作成できます。Kafka への接続に関する詳細については、CData のKafka 連携ページを参照してください。30日の無償評価版をダウンロードして今すぐ使い始めましょう。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。