各製品の資料を入手。
詳細はこちら →Oracle Data Integrator でKafka のデータをETL する方法
この記事では、Oracle Data Integrator を使ってKafka のデータをデータウェアハウスに転送する方法を説明します。
最終更新日:2022-01-31
この記事で実現できるKafka 連携のシナリオ
こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。
Oracle Data Integrator(ODI)はOracle エコシステムのハイパフォーマンスなデータ統合プラットフォームです。CData JDBC Driver for ApacheKafka を使えば、OCI をはじめとするETL ツールからKafka のデータにJDBC 経由で簡単に読み取りと書き込みを実現できます。リアルタイムKafka のデータをデータウェアハウス、BI・帳票ツール、CRM、基幹システムなどに統合すれば、データ活用もぐっと楽に。
CData のコネクタを使えば、Kafka API にリアルタイムで直接接続して、ODI 上で通常のデータベースと同じようにKafka のデータを操作できます。Kafka エンティティのデータモデルを構築、マッピングを作成し、データの読み込み方法を選択するだけの簡単なステップでKafka のデータのETL が実現できます。
ドライバーのインストール
ドライバーをインストールするには、インストールフォルダにあるドライバーのJAR ファイルと.lic ファイルをODI の適切なディレクトリにコピーします。
- UNIX/Linux(Agent なし):~/.odi/oracledi/userlib
- UNIX/Linux(Agent):$ODI_HOME/odi/agent/lib
- Windows(Agent なし):%APPDATA%\Roaming\odi\oracledi\userlib
- Windows(Agent):%APPDATA%\Roaming\odi\agent\lib
ODI を再起動してインストールを完了します。
モデルのリバースエンジニアリング
ODI の機能を使ってモデルをリバースエンジニアリングすることで、ドライバー側で取得したKafka のデータのリレーショナルビューに関するメタデータが取得できます。リバースエンジニアリング後、リアルタイムKafka のデータにクエリを実行してKafka テーブルのマッピングを作成できます。
-
ODI でリポジトリに接続し、「New」->「Model and Topology Objects」をクリックします。
- 表示されるダイアログの「Model」画面で、以下の情報を入力します。
- Name:ApacheKafka と入力します。
- Technology:Technology:Generic SQL(ODI がVersion 12.2+ の場合はMicrosoft SQL Server)を選択します。
- Logical Schema:ApacheKafka と入力します。
- Context:Global を選択します。
- 表示されるダイアログの「Data Server」画面で、以下の情報を入力します。
- Name:ApacheKafka と入力します。
- Driver List:Oracle JDBC Driver を選択します。
- Driver:cdata.jdbc.apachekafka.ApacheKafkaDriver と入力します。
- URL:接続文字列を含むJDBC URL を入力します。
Apache Kafka 接続プロパティの取得・設定方法
.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:
- UseSSL をtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。
Apache Kafka への認証
Apache Kafka データソースは、次の認証メソッドをサポートしています:- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、このプロパティを設定します。
- AuthScheme:None。
その他の認証方法については、ヘルプドキュメントを参照してください。
組み込みの接続文字列デザイナー
JDBC URL の作成の補助として、Kafka JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.apachekafka.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
一般的な接続文字列は次のとおりです。
jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
- Physical Schema 画面で、以下の情報を入力します。
- Name:ドロップダウンメニューから選択します。
- Database (Catalog):CData と入力します。
- Owner (Schema):Kafka にSchema を選択した場合は、選択したSchema を入力し、それ以外の場合はApacheKafka と入力します。
- Database (Work Catalog):CData と入力します。
- Owner (Work Schema):Kafka にSchema を選択した場合は、選択したSchema を入力し、それ以外の場合はApacheKafka と入力します。
- 開いたモデルで「Reverse Engineer」をクリックしてKafka テーブルのメタデータを取得します。
Kafka のデータの編集と保存
リバースエンジニアリング後、ODI でKafka のデータを操作できるようになります。
Kafka のデータを編集し保存するには、Designer ナビゲーターでモデルアコーディオンを展開し、テーブルを右クリックして「Data」をクリックします。「Refresh」をクリックしてデータの変更を取得します。変更が完了したら「Save Changes」をクリックします。
ETL プロジェクトの作成
次の手順に従って、Kafka からETL を作成します。SampleTable_1 エンティティをODI Getting Started VM に含まれているサンプルデータウェアハウスにロードします。
SQL Developer を開き、Oracle データベースに接続します。Connections ぺインでデータベースのノードを右クリックし、「New SQL Worksheet」をクリックします。
もしくは、SQLPlus を使用することもできます。コマンドプロンプトから、以下のように入力します。
sqlplus / as sysdba
- 以下のクエリを入力し、ODI_DEMO スキーマにあるサンプルデータウェアハウスに新しいターゲットテーブルを作成します。以下のクエリは、Kafka のSampleTable_1 テーブルに一致するいくつかのカラムを定義します。
CREATE TABLE ODI_DEMO.TRG_SAMPLETABLE_1 (COLUMN1 NUMBER(20,0),Id VARCHAR2(255));
- ODI でDesigner ナビゲーターのModels アコーディオンを展開し、ODI_DEMO フォルダの「Sales Administration」ノードをダブルクリックします。Model Editor でモデルが開きます。
- 「Reverse Engineer」をクリックします。TRG_SAMPLETABLE_1 テーブルがモデルに追加されます。
- プロジェクトの「Mappings」ノードを右クリックし、「New Mapping」をクリックします。マッピングの名前を入力し、「Create Empty Dataset」オプションを無効にします。Mapping Editor が表示されます。
- TRG_SAMPLETABLE_1 テーブルをSales Administration モデルからマッピングにドラッグします。
- SampleTable_1 テーブルをKafka モデルからマッピングにドラッグします。
- ソースコネクタポイントをクリックしてターゲットコネクタポイントにドラッグします。Attribute Matching ダイアログが表示されます。ここでは、デフォルトオプションを使用します。その場合、目的の動作はターゲットカラムのプロパティに表示されます。
- Mapping Editor のPhysical タブを開き、TARGET_GROUP の「SAMPLETABLE_1_AP」をクリックします。
- SAMPLETABLE_1_AP プロパティで、Loading Knowledge Module タブの「LKM SQL to SQL (Built-In)」を選択します。
これで、マッピングを実行してKafka のデータをOracle にロードできます。