各製品の資料を入手。
詳細はこちら →Entity Framework 6 からKafka のデータに連携
この記事は、Entity Framework のcode-first アプローチを使って、Kafka に接続する方法を説明します。Entity Framework 6 は.NET 4.5 以上で利用可能です。
最終更新日:2022-04-04
この記事で実現できるKafka 連携のシナリオ
こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。
Entity Framework はobject-relational mapping フレームワークで、データをオブジェクトとして扱うために使われます。Visual Studio のADO.NET Entity Data Model ウィザードを実行するとEntity Model を作成できますが、このモデルファーストアプローチでは、データソースに変更があった場合やエンティティ操作をより制御したい場合は不都合があります。この記事では、CData ADO.NET Provider を使いコードファーストアプローチでKafka にアクセスします。
- Visual Studio を起動し、新しいWindows Form アプリケーションを作成します。ここでは、.NET 4.5 のC# プロジェクトを使います。
- Visual Studio の [パッケージ マネージャー コンソール]から'Install-Package EntityFramework' コマンドを実行し、最新のEntity Framework をインストールします。
- プロジェクトのApp.config ファイルを修正して、Kafka Entity Framework 6 アセンブリおよびコネクションストリングへの参照を追加します。
Apache Kafka 接続プロパティの取得・設定方法
.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされ、自動的に本製品と一緒にインストールされます。 別のインストール方法を利用する場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0 をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、本製品はデータソースとPLAINTEXT で通信し、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化するには:
- UseSSL をtrue に設定し、本製品がSSL 暗号化を使用するように構成します。
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします。
Apache Kafka への認証
Apache Kafka データソースは、次の認証メソッドをサポートしています:- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 こうした接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、このプロパティを設定します。
- AuthScheme:None。
その他の認証方法については、ヘルプドキュメントを参照してください。
<configuration> ... <connectionStrings> <add name="ApacheKafkaContext" connectionString="Offline=False;User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;" providerName="System.Data.CData.ApacheKafka" /> </connectionStrings> <entityFramework> <providers> ... <provider invariantName="System.Data.CData.ApacheKafka" type="System.Data.CData.ApacheKafka.ApacheKafkaProviderServices, System.Data.CData.ApacheKafka.Entities.EF6" /> </providers> <entityFramework> </configuration> </code>
- インストールディレクトリの[lib] > 4.0 サブフォルダにあるSystem.Data.CData.ApacheKafka.Entities.EF6.dll を設定し、プロジェクトを作成してEntity Framework 6 を使うためのセットアップを完了します。
- この時点でプロジェクトを作成し、すべてが正しく動作していることを確認してください。これで、Entity Framework を使ってコーディングを開始できます。
- プロジェクトに新しい.cs ファイルを追加し、そこにクラスを追加します。これがデータベースのコンテキストとなり、DbContext クラスを拡張します。この例では、クラス名はApacheKafkaContext です。以下のサンプルコードは、OnModelCreating メソッドをオーバーライドして次の変更を加えます:
- PluralizingTableNameConvention をModelBuilder Conventions から削除。
- MigrationHistory テーブルへのリクエストを削除。
using System.Data.Entity; using System.Data.Entity.Infrastructure; using System.Data.Entity.ModelConfiguration.Conventions; class ApacheKafkaContext :DbContext { public ApacheKafkaContext() { } protected override void OnModelCreating(DbModelBuilder modelBuilder) { // To remove the requests to the Migration History table Database.SetInitializer<ApacheKafkaContext>(null); // To remove the plural names modelBuilder.Conventions.Remove<PluralizingTableNameConvention>(); } }
- もう一つ.cs ファイルを作成し、ファイル名を呼び出そうとしているKafka のエンティティ、例えばSampleTable_1 にします。このファイルでは、エンティティとエンティティ設定の両方を定義します。以下に例を示します。
using System.Data.Entity.ModelConfiguration; using System.ComponentModel.DataAnnotations.Schema; public class SampleTable_1 { [DatabaseGeneratedAttribute(DatabaseGeneratedOption.Identity)] public System.String Id { get; set; } public System.String Id { get; set; } } public class SampleTable_1Map :EntityTypeConfiguration<SampleTable_1> { public SampleTable_1Map() { this.ToTable("SampleTable_1"); this.HasKey(SampleTable_1 => SampleTable_1.Id); this.Property(SampleTable_1 => SampleTable_1.Id); } }
- エンティティの作成が済んだので、コンテキストクラスにエンティティを追加します:
public DbSet<SampleTable_1> SampleTable_1 { set; get; }
- コンテキストとエンティティの作成が完了したら、別クラスでデータをクエリできます。例:
ApacheKafkaContext context = new ApacheKafkaContext(); context.Configuration.UseDatabaseNullSemantics = true; var query = from line in context.SampleTable_1 select line;