各製品の資料を入手。
詳細はこちら →Azure App サービスでKafka IFTTT フローをトリガー
この記事では、Logic Apps の標準ウィザードを使用してIFTTT (if-this-then-that) ワークフローを自動化する方法を説明します。
最終更新日:2022-12-02
この記事で実現できるKafka 連携のシナリオ
こんにちは!ドライバー周りのヘルプドキュメントを担当している古川です。
OData やSwagger などの標準ベースのインターフェースを介してCData API Server をADO.NET Provider for ApacheKafka(もしくは250+ の他のADO.NET Providers)と組み合わせることで、Kafka を使用してLogic Apps と Power Automate のネイティブエクスペリエンスを提供します。OData は、データへのリアルタイム接続を可能にします。Swagger を使用すると、Logic Apps とPower Automateのウィザードのスキャフォールディング(コード生成) および、Power Apps のスキャフォールディングが可能になります。この記事では、Logic App のIFTTT (if-this-then-that) ワークフローにKafka を追加する方法を説明します。
API サーバーをセットアップ
以下のステップに従って、安全でSwaggerに対応するKafka API の作成を開始します。
デプロイ
API サーバーは独自のサーバーで実行されます。Windows では、スタンドアロンサーバーまたはIIS を使用して展開できます。Java サーブレットコンテナで、API Server WAR ファイルをドロップします。詳細とハウツーについては、ヘルプドキュメントを参照してください。
API サーバーは、Microsoft Azure、Amazon EC2、Heroku にも簡単にデプロイできます。
Kafka に接続
API サーバーとADO.NET Provider for ApacheKafka を展開した後、[Settings]->[Connections]をクリックし、API サーバー管理コンソールで新しい接続を追加して認証値とその他の接続プロパティを指定します。 次に、[Settings]->[Resources]をクリックして、API サーバーへのアクセスを許可する エンティティを選択できます。
Apache Kafka 接続プロパティの取得・設定方法
.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:
- UseSSL をtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。
Apache Kafka への認証
Apache Kafka データソースは、次の認証メソッドをサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、このプロパティを設定します。
- AuthScheme:None。
その他の認証方法については、ヘルプドキュメントを参照してください。
また、CORS を有効にし、[Settings]->[Server]ページで次のセクションを定義する必要があります。[*]なしですべてのドメインを許可するオプションを選択することもできます。
- Access-Control-Allow-Origin:値を[*]に設定します。Logic Apps Designer でAPI サーバーを使用するには、Access-Control-Allow-Origin ヘッダー値[*]が必要です。
- Access-Control-Allow-Methods:値を[GET,PUT,POST,OPTIONS]に設定します。
- Access-Control-Allow-Headers:[x-ms-client-request-id, authorization, content-type]に設定します。
API サーバーユーザーを承認
作成するOData サービスを決定したら、[Settings]->[Users]をクリックしてユーザーを承認します。API サーバーは、認証トークンベースの認証を使用して主要な認証スキームをサポートします。SSL を使用して、接続を認証及び暗号化することができます。アクセスはIP アドレスによって制限することもできます。デフォルトでは、ローカルマシンのみに制限されています。
簡単にするために、API ユーザーの認証トークンをURL で渡すことができます。データディレクトリにあるsettings.cfg ファイルの[Application]セクションに設定を追加する必要があります。Windows でこれは、アプリケーションルートのapp_data サブフォルダにあたります。Java エディションでは、データディレクトリの場所はオペレーティングシステムによって異なります。
- Windows:C:\ProgramData\CData
- Unix or Mac OS X: ~/cdata
[Application]
AllowAuthtokenInURL = true
Logic App でKafka にアクセス
Logic App でAPI サーバーを使用し、Kafka の周りにプロセスフローを作成できます。HTTP + Swagger アクションは、Kafka に対して実行する操作を定義するためのウィザードを提供します。以下のステップでは、Logic App でKafka を取得する方法を説明しています。
テーブルにレコードの作成日を含むカラムがある場合は、以下のステップに従って新しいレコードのカラム値をチェックする関数を作成できます。それ以外の場合は、[Create a Logic App]セクションにスキップし、フィルタに一致するエンティティにメールを送信します。
新しいKafka エンティティを確認
特定の新しいKafka エンティティを見つけるために、インターバルの開始日時の値を取得する関数を作成できます。
- [Azure Portal]で、[New]->[Function App]->[Create]と進みます。
- 名前を入力し、サブスクリプション、リソースグループ、App Service プラン、そしてストレージアカウントを選択します。
- Function App を選択し、Webhook + API シナリオを選択します。
- 言語を選択します。この記事では、JavaScript を使用します。
- 以下のコードを追加し、JSON オブジェクトで前の時間を返します。
module.exports = function (context, data) { var d = new Date(); d.setHours(d.getHours()-1); // Response of the function to be used later. context.res = { body: { start: d } }; context.done(); };
トリガーにKafka を追加
以下のステップに従って、フィルタに一致する結果をKafka で検索するトリガーを作成します。上記の関数を作成した場合は、返されたインターバルの開始後に作成されたオブジェクトを検索できます。
- Azure Portal で[New]をクリックし、[Web + Mobile]セクションで[Logic App]を選択してリソースグループとApp Service プランを選択します。
- これで、Logic App Designer で使用可能なウィザードが使用できます。このウィザードには、Logic App の設定ブレードからアクセスできます。Blank Logic App templateを選択します。
- Kafka オブジェクトをポーリングするRecurrence アクションを追加します。この記事では、一時間ごとにポーリングします。タイムゾーンを選択します。デフォルトはUTC です。
- 関数アクションを追加します。[Add Action]ダイアログのメニューを展開し、リジョンにAzure 関数を表示するオプションを選択します。先に作成したFunction App を選択し、インターバル開始を返す関数を選択します。
- からの中括弧のペア({})を入力し、からのペイロードオブジェクトを関数に渡します。
- HTTP + Swagger アクションを追加し、API サーバーのSwagger URL を入力します。
http://MySite:MyPort/api.rsc/@MyAuthtoken/$oas
- [Return SampleTable_1]操作を選択します。
各プロパティの説明を使用して、取得する列やフィルターなどの追加パラメータを指定します。以下はフィルタの一例です。
Column2 eq '100'
API サーバーは、Swagger ドキュメントにて説明とその他のドキュメントを返します。OData API とサポートされているOData の使用方法についての詳細は、API サーバーのヘルプドキュメントを参照してください。
getInterval 関数から返された日時値を使用するには、SampleTable_1 テーブルの日時列で[ge]演算子を使用し、ダイアログでBody パラメータを選択します。日時値を囲むには、クォーテーションを使用する必要があることに注意して下さい。
[Code View]に切り替え、$filter 式を変更してインターバルの開始を含むプロパティを抽出します。'@{body('MyFunc')['MyProp']' 構文を使用します。
"getAllAccount": { "inputs": { "method": "get", "queries": { "$filter":"CreatedDate ge '@{body('getInterval')['start']}'" }, "uri": "https://MySite:MyPort/api.rsc/@MyAuthtoken/SampleTable_1" }
これで、ワークフローのデータソースおよび宛先としてKafka にアクセスできます。
新しレコードをメールで送信
以下のステップに従って、新しいSampleTable_1 エンティティを含むレポートをメールで送信します。
- [Logic Apps Designer]で[SMTP - Send Email]アクションを追加します。
- SMTP サーバーに必要な情報を構成します。
- From、To、Subject、Body を構成します。返されたKafka 列からパラメータを追加できます。
[Save]をクリックし、[Run]をクリックして過去一時間に作成されたKafka レコードに関する電子メール通知を送信します。
