コラボフローでKafka のデータと連携したワークフローを作成

クラウドワークフローのコラボフローで、Kafka のデータと連携したワークフローを作成

杉本和也
リードエンジニア

最終更新日:2022-10-26

こんにちは!リードエンジニアの杉本です。

コラボフロー(www.collabo-style.co.jp/ )は誰でも簡単に作れるクラウドベースのワークフローサービスです。さらにCData Connect Server と連携することで、Kafka のデータへのクラウドベースのアクセスをノーコードで追加できます。本記事では、CData Connect Server 経由でコラボフローからKafka 連携を実現する方法を紹介します。

CData Connect Server はKafka のデータへのクラウドベースのOData インターフェースを提供し、コラボフローからKafka のデータへのリアルタイム連携を実現します。

Kafka の仮想OData API エンドポイントを作成

まずCData Connect Server でデータソースへの接続およびOData API エンドポイント作成を行います。

  1. CData Connect Server にログインして、「DATA MODEL」をクリックします。 データベースを追加
  2. 利用できるデータソースアイコンから"Kafka" を選択します。
  3. Kafka に接続するために必要なプロパティを入力します。

    Apache Kafka 接続プロパティの取得・設定方法

    .NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。

    Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

    デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:

    1. UseSSLtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
    2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。

    Apache Kafka への認証

    Apache Kafka データソースは、次の認証メソッドをサポートしています:

    • Anonymous
    • Plain
    • SCRAM ログインモジュール
    • SSL クライアント証明書
    • Kerberos

    Anonymous

    Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。

    匿名認証を行うには、このプロパティを設定します。

    • AuthSchemeNone

    その他の認証方法については、ヘルプドキュメントを参照してください。 接続を設定

  4. 「Test Connection」をクリックします。
  5. 「USERS」 -> Add をクリックして、新しいユーザーを追加し、適切な権限を指定します。
  6. API タブをクリックして OData API エンドポイントが生成されていることを確認します。

コネクションとOData エンドポイントを設定したら、Collaboflow からKafka のデータに接続できます。

Kafka のデータに接続したワークフローを作成

コラボフローからConnect Server に連携するためのJavaScript の準備

コラボフロー上で使用するConnect Server との接続用JavaScriptを準備します。

(function () { 'use strict'; // Setting Propeties const AutocompleteSetting = { // Autocomplete target field for Collaboflow InputName: 'fid0', // Collaboflow item detils line number ListRowNumber : 15, // Autocomplete tartget field for Connect Server ApiListupFiledColumn : 'apachekafka_column', // Key Column Name for Connect Server resource ApiListupKeyColumn : 'apachekafka_keycolumn', // Mapping between Collaboflow field and Connect Server column Mappings: [ { PartsName: 'fid1', // Collabo flow field name APIName: 'apachekafka_column1' // Connect Server column name }, { PartsName: 'fid2', APIName: 'apachekafka_column2' }, { PartsName: 'fid3', APIName: 'apachekafka_column3' }, { PartsName: 'fid4', APIName: 'apachekafka_column4' } ] }; const CDataConnectServerSetting = { // Connect Server URL ConnectServerUrl : 'http://XXXXXX', // Connect Server Resource Name ConnectServerResourceName : 'apachekafka_table', // Connect Server Key Headers : { Authorization: 'Basic YOUR_BASIC_AUTHENTICATION' }, // General Properties ParseType : 'json', get BaseUrl() { return CDataConnectServerSetting.ApiServerUrl + '/api.rsc/' + CDataConnectServerSetting.ApiServerResourceName } } let results = []; let records = []; // Set autocomplete processing for target input field collaboflow.events.on('request.input.show', function (data) { for (let index = 1; index < AutocompleteSetting.ListRowNumber; index++) { $('#' + AutocompleteSetting.InputName + '_' + index).autocomplete({ source: AutocompleteDelegete, autoFocus: true, delay: 500, minLength: 2 }); } }); // This function get details from Connect Server, Then set values at each input fields based on mappings object. collaboflow.events.on('request.input.' + AutocompleteSetting.InputName + '.change', function (eventData) { debugger; let tartgetParts = eventData.parts.tbl_1.value[eventData.row_index - 1]; let keyId = tartgetParts[AutocompleteSetting.InputName].value.split(':')[1\; let record = records.find(x => x[AutocompleteSetting.ApiListupKeyColumn] == keyId); if (!record) return; AutocompleteSetting.Mappings.forEach(x => tartgetParts[x.PartsName].value = ''); AutocompleteSetting.Mappings.forEach(x => tartgetParts[x.PartsName].value = record[x.APIName]); }); function AutocompleteDelegete(req, res) { let topParam = '&$top=10' let queryParam = '$filter=contains(' + AutocompleteSetting.ApiListupFiledColumn + ',\'' + encodeURIComponent(req.term) + '\')'; collaboflow.proxy.get( CDataConnectServerSetting.BaseUrl + '?' + queryParam + topParam, CDataConnectServerSetting.Headers, CDataConnectServerSetting.ParseType).then(function (response) { results = []; records = []; if (response.body.value.length == 0) { results.push('No Results') res(results); return; } records = response.body.value; records.forEach(x => results.push(x[AutocompleteSetting.ApiListupFiledColumn] + ':' + x[AutocompleteSetting.ApiListupKeyColumn])); res(results); }).catch(function (error) { alert(error); }); } })();
  • 「CDataConnectServerSetting」のそれぞれのプロパティには構成したSSH Server のURL とConnect Server の認証情報をそれぞれ設定してください。
  • 「AutocompleteSetting」はどのフィールドでオートコンプリートを動作させるか?といった設定と、API のプロパティとのマッピングを行います。
  • 今回はコラボフローのデフォルトテンプレートで提供されている「12a.見積書・注文書」で利用しますので、デフォルトでは商品名のフィールドを、Product テーブルのName と紐付けて、Autocomplete を行うように構成しています。値が決定されたら、Key となるProductID を元に「型番、標準単価、仕入単価、御提供単価」をそれぞれAPI から取得した値で自動補完するようになっています。

コラボフロー側でJavaScript を登録

JavaScript を作成したら、後はコラボフローにアップするだけです。

  • コラボフローにログインし「アプリ設定」→「フォーム設定」に移動します。
  • CollaboFlow
  • フォーム一覧から使用するフォームを選択します。
  • CollaboFlow
  • フォーム編集画面に移動後、「カスタマイズ」タブをクリックし、ファイルをアップロードから作成したJSファイルをアップロードし、保存します。
  • CollaboFlow
  • これでConnect Server 経由でKafka のデータを取得し、自動入力補完する機能がコラボフローの申請フォームに追加できました。

クラウドアプリケーションからKafka のデータへのライブ接続

コラボフローからKafka リアルタイムデータに直接接続できるようになりました。これで、Kafka のデータを複製せずにより多くの接続とアプリを作成できます。

クラウドアプリケーションから直接100を超えるSaaS 、ビッグデータ、NoSQL ソースへのリアルタイムデータアクセスを取得するには、CData Connect Server を参照してください。

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。