各製品の資料を入手。
詳細はこちら →Kafka のデータをPowerShell でMySQL にレプリケーションする方法
PowerShell のシンプルなスクリプトで、Kafka のデータ をMySQL データベースにレプリケーション(複製)する方法を紹介します。
最終更新日:2023-09-26
この記事で実現できるKafka 連携のシナリオ
こんにちは!ドライバー周りのヘルプドキュメントを担当している古川です。
CData Cmdlets for ApacheKafka を使えば、PowerShell からKafka のデータ データにリアルタイムで連携できます。データ同期などのタスクの連携にぴったりの製品です。 本記事では、PowerShell からCData Cmdlets for ApacheKafka およびCData Cmdlets for MySQL を使って、同期スクリプトを作成して実行します。
まずは、PowerShell でKafka への接続を行います。レプリケーションは4つのステップがあります。
Apache Kafka 接続プロパティの取得・設定方法
.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:
- UseSSL をtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。
Apache Kafka への認証
Apache Kafka データソースは、次の認証メソッドをサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、このプロパティを設定します。
- AuthScheme:None。
その他の認証方法については、ヘルプドキュメントを参照してください。
Kafka のデータの取得
-
モジュールのインストール:
Install-Module ApacheKafkaCmdlets
-
Kafka への接続:
$apachekafka = Connect-ApacheKafka -User $User -Password $Password -BootStrapServers $BootStrapServers -Topic $Topic
-
取得ターゲットのリソースの取得:
$data = Select-ApacheKafka -Connection $apachekafka -Table "SampleTable_1"
Invoke-ApacheKafka cmdlet を使って、SQL-92 クエリを使用することもできます:
$data = Invoke-ApacheKafka -Connection $apachekafka -Query 'SELECT * FROM SampleTable_1 WHERE Column2 = @Column2' -Params @{'@Column2'='100'}
-
戻り値からカラム名のリストを保存します。
$columns = ($data | Get-Member -MemberType NoteProperty | Select-Object -Property Name).Name
Kafka のデータをMySQL データベースにレプリケーション
カラム名を指定できるようにして、データをMySQL データベースにレプリケーションします。
-
モジュールのインストール:
Install-Module MySQLCmdlets
-
MySQL DB に、MySQL Server 名、ユーザー、パスワード、レプリケーション先のデータベース名を指定して、接続します:
$mysql = Connect-MySQL -User $User -Password $Password -Database $Database -Server $Server -Port $Port
-
Kafka、保存された値、そしてAdd-MySQL Cmdlet を使って、MySQL にデータを1レコードずつ挿入します。この例では、MySQL 側のテーブルは、Kafka のリソース(SampleTable_1)と同じテーブル名を持っている必要があります。
$data | % { $row = $_ $values = @() $columns | % { $col = $_ $values += $row.$($col) } Add-MySQL -Connection $mysql -Table "SampleTable_1" -Columns $columns -Values $values }
次回以降のレプリケーションをシンプルに実現
-
一度PowerShell でKafka とMySQL に接続したら、次からは1行のコマンドでレプリケーションを実施できます:
Select-ApacheKafka -Connection $apachekafka -Table "SampleTable_1" | % { $row = $_ $values = @() $columns | % { $col = $_ $values += $row.$($col) } Add-MySQL -Connection $mysql -Table "SampleTable_1" -Columns $columns -Values $values }
-
別のPowerShell モジュールで、Kafka を別のデータベースに複製する場合、Select-ApacheKafka cmdlet のデータから、カラム、接続およびテーブルを除外しておきましょう。これらのデータはデータ移動のときだけ必要となるためです。
$columns = ($data | Get-Member -MemberType NoteProperty | Select-Object -Property Name).Name | ? {$_ -NotIn @('Columns','Connection','Table')}
おわりに
これで、Kafka のデータをMySQL に複製できました。分析、BI などでKafka のデータをMySQL から使うことができるようになります。