Oracle Data Integrator でKafka のデータをETL する方法

この記事では、Oracle Data Integrator を使ってKafka のデータをデータウェアハウスに転送する方法を説明します。

加藤龍彦
デジタルマーケティング

最終更新日:2022-01-31

この記事で実現できるKafka 連携のシナリオ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

Oracle Data Integrator(ODI)はOracle エコシステムのハイパフォーマンスなデータ統合プラットフォームです。CData JDBC Driver for ApacheKafka を使えば、OCI をはじめとするETL ツールからKafka のデータにJDBC 経由で簡単に読み取りと書き込みを実現できます。リアルタイムKafka のデータをデータウェアハウス、BI・帳票ツール、CRM、基幹システムなどに統合すれば、データ活用もぐっと楽に。

CData のコネクタを使えば、Kafka API にリアルタイムで直接接続して、ODI 上で通常のデータベースと同じようにKafka のデータを操作できます。Kafka エンティティのデータモデルを構築、マッピングを作成し、データの読み込み方法を選択するだけの簡単なステップでKafka のデータのETL が実現できます。

ドライバーのインストール

ドライバーをインストールするには、インストールフォルダにあるドライバーのJAR ファイルと.lic ファイルをODI の適切なディレクトリにコピーします。

  • UNIX/Linux(Agent なし):~/.odi/oracledi/userlib
  • UNIX/Linux(Agent):$ODI_HOME/odi/agent/lib
  • Windows(Agent なし):%APPDATA%\Roaming\odi\oracledi\userlib
  • Windows(Agent):%APPDATA%\Roaming\odi\agent\lib

ODI を再起動してインストールを完了します。

モデルのリバースエンジニアリング

ODI の機能を使ってモデルをリバースエンジニアリングすることで、ドライバー側で取得したKafka のデータのリレーショナルビューに関するメタデータが取得できます。リバースエンジニアリング後、リアルタイムKafka のデータにクエリを実行してKafka テーブルのマッピングを作成できます。

  1. ODI でリポジトリに接続し、「New」->「Model and Topology Objects」をクリックします。 新しいモデルを作成
  2. 表示されるダイアログの「Model」画面で、以下の情報を入力します。
    • Name:ApacheKafka と入力します。
    • Technology:Technology:Generic SQL(ODI がVersion 12.2+ の場合はMicrosoft SQL Server)を選択します。
    • Logical Schema:ApacheKafka と入力します。
    • Context:Global を選択します。
    モデルを設定
  3. 表示されるダイアログの「Data Server」画面で、以下の情報を入力します。
    • Name:ApacheKafka と入力します。
    • Driver List:Oracle JDBC Driver を選択します。
    • Driver:cdata.jdbc.apachekafka.ApacheKafkaDriver と入力します。
    • URL:接続文字列を含むJDBC URL を入力します。

      Apache Kafka 接続プロパティの取得・設定方法

      .NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

      Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

      デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

      1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
      2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

      Apache Kafka への認証

      Apache Kafka データソースは、次の認証メソッドをサポートしています:

      • Anonymous
      • Plain
      • SCRAM ログインモジュール
      • SSL クライアント証明書
      • Kerberos

      Anonymous

      Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

      匿名認証を行うには、このプロパティを設定します。

      • AuthSchemeNone

      その他の認証方法については、ヘルプドキュメントを参照してください。

      組み込みの接続文字列デザイナー

      JDBC URL の作成の補助として、Kafka JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。

      java -jar cdata.jdbc.apachekafka.jar

      接続プロパティを入力し、接続文字列をクリップボードにコピーします。

      組み込みの接続文字列デザイナーを使ってJDBC URL を生成(Salesforce の場合)

      一般的な接続文字列は次のとおりです。

      jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
    Data Server を設定
  4. Physical Schema 画面で、以下の情報を入力します。
    • Name:ドロップダウンメニューから選択します。
    • Database (Catalog):CData と入力します。
    • Owner (Schema):Kafka にSchema を選択した場合は、選択したSchema を入力し、それ以外の場合はApacheKafka と入力します。
    • Database (Work Catalog):CData と入力します。
    • Owner (Work Schema):Kafka にSchema を選択した場合は、選択したSchema を入力し、それ以外の場合はApacheKafka と入力します。
    Physical Schema を設定
  5. 開いたモデルで「Reverse Engineer」をクリックしてKafka テーブルのメタデータを取得します。 モデルをリバースエンジニアリング

Kafka のデータの編集と保存

リバースエンジニアリング後、ODI でKafka のデータを操作できるようになります。 Kafka のデータを編集し保存するには、Designer ナビゲーターでモデルアコーディオンを展開し、テーブルを右クリックして「Data」をクリックします。「Refresh」をクリックしてデータの変更を取得します。変更が完了したら「Save Changes」をクリックします。 データを表示

ETL プロジェクトの作成

次の手順に従って、Kafka からETL を作成します。SampleTable_1 エンティティをODI Getting Started VM に含まれているサンプルデータウェアハウスにロードします。

  1. SQL Developer を開き、Oracle データベースに接続します。Connections ぺインでデータベースのノードを右クリックし、「New SQL Worksheet」をクリックします。

    もしくは、SQLPlus を使用することもできます。コマンドプロンプトから、以下のように入力します。

    sqlplus / as sysdba
  2. 以下のクエリを入力し、ODI_DEMO スキーマにあるサンプルデータウェアハウスに新しいターゲットテーブルを作成します。以下のクエリは、Kafka のSampleTable_1 テーブルに一致するいくつかのカラムを定義します。 CREATE TABLE ODI_DEMO.TRG_SAMPLETABLE_1 (COLUMN1 NUMBER(20,0),Id VARCHAR2(255));
  3. ODI でDesigner ナビゲーターのModels アコーディオンを展開し、ODI_DEMO フォルダの「Sales Administration」ノードをダブルクリックします。Model Editor でモデルが開きます。
  4. 「Reverse Engineer」をクリックします。TRG_SAMPLETABLE_1 テーブルがモデルに追加されます。
  5. プロジェクトの「Mappings」ノードを右クリックし、「New Mapping」をクリックします。マッピングの名前を入力し、「Create Empty Dataset」オプションを無効にします。Mapping Editor が表示されます。
  6. TRG_SAMPLETABLE_1 テーブルをSales Administration モデルからマッピングにドラッグします。
  7. SampleTable_1 テーブルをKafka モデルからマッピングにドラッグします。
  8. ソースコネクタポイントをクリックしてターゲットコネクタポイントにドラッグします。Attribute Matching ダイアログが表示されます。ここでは、デフォルトオプションを使用します。その場合、目的の動作はターゲットカラムのプロパティに表示されます。
  9. Mapping Editor のPhysical タブを開き、TARGET_GROUP の「SAMPLETABLE_1_AP」をクリックします。
  10. SAMPLETABLE_1_AP プロパティで、Loading Knowledge Module タブの「LKM SQL to SQL (Built-In)」を選択します。 Kafka へのSQL ベースのアクセスには、標準データベース間のナレッジモジュールを使用できます

これで、マッピングを実行してKafka のデータをOracle にロードできます。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。