SQLAlchemy ORM を使って、Python でSnowflake のデータに連携する方法

CData Python Connector を使って、Python アプリケーションおよびスクリプトからSQLAlchemy 経由でSnowflake にOR マッピング可能に。

加藤龍彦
デジタルマーケティング

最終更新日:2023-09-23

この記事で実現できるSnowflake 連携のシナリオ

こんにちは!ウェブ担当の加藤です。マーケ関連のデータ分析や整備もやっています。

Pythonエコシステムには、多くのモジュールがあり、システム構築を素早く効率的に行うことができます。CData Python Connector for Snowflake は、pandas、Matplotlib モジュール、SQLAlchemy ツールキットから使用することで Snowflake にデータ連携するPython アプリケーションを構築し、Snowflake のデータを可視化できます。 本記事では、SQLAlchemy でSnowflake に連携して、データを取得、、更新、挿入、削除 する方法を説明します。

CData Python Connectors の特徴

CData Python Connectors は、以下のような特徴を持った製品です。

  1. Snowflake をはじめとする、CRM、MA、会計ツールなど多様なカテゴリの270種類以上のSaaS / オンプレデータソースに対応
  2. Python をはじめとする多様なデータ分析・BI ツールにSnowflake のデータを連携
  3. ノーコードでの手軽な接続設定

CData Python Connectors では、1.データソースとしてSnowflake の接続を設定、2.Python からPython Connectors との接続を設定、という2つのステップだけでデータソースに接続できます。以下に具体的な設定手順を説明します。

必要なモジュールのインストール

pip でSQLAlchemy ツールキットをインストールします:

pip install sqlalchemy

モジュールのインポートを忘れずに行います:

import sqlalchemy

Python でSnowflake のデータをモデル化

次は、接続文字列で接続を確立します。create_engine 関数を使って、Snowflake のデータに連携するEngne を作成します。

engine = create_engine("snowflake///?User=Admin&Password=test123&Server=localhost&Database=Northwind&Warehouse=TestWarehouse&Account=Tester1")

Snowflake データベースに接続するには、認証に加えて次のプロパティを設定します。

  • Url:自身のSnowflake URL、例えばhttps://orgname-myaccount.snowflakecomputing.com
    • Legacy URL を使用する場合:https://myaccount.region.snowflakecomputing.com
    • 自身のURL を見つけるには:
      1. Snowflake UI の左下にある自身の名前をクリックします。
      2. Account ID にカーソルを合わせます。
      3. Copy Account URL アイコンをクリックして、アカウントURL をコピーします。
  • Database(オプション):によって公開されるテーブルとビューを、特定のSnowflake データベースのものに制限します。
  • Schema(オプション):本製品によって公開されるテーブルとビューを、特定のSnowflake データベーススキーマのものに制限します。

Snowflake への認証

本製品は、Snowflake ユーザー認証、フェデレーション認証、およびSSL クライアント認証をサポートしています。認証するには、UserPassword を設定し、AuthScheme プロパティで認証メソッドを選択します。

キーペア

ユーザーアカウントに定義されたプライベートキーを使用してセキュアなトークンを作成することにより、キーペア認証を使用して認証できます。この方法で接続するには、AuthSchemePRIVATEKEY に設定し、次の値を設定します。

  • User:認証に使用するユーザーアカウント。
  • PrivateKey:プライベートキーを含む.pem ファイルへのパスなど、ユーザーに使用されるプライベートキー。
  • PrivateKeyType:プライベートキーを含むキーストアの種類(PEMKEY_FILE、PFXFILE など)。
  • PrivateKeyPassword:指定されたプライベートキーのパスワード。

その他の認証方法は、ヘルプドキュメントの「Snowflake への認証」セクションを参照してください。

Snowflake のデータのマッピングクラスの宣言

接続を確立したら、OR マッパーでモデル化するテーブルのマッピングクラスを宣言します。本記事では、Products テーブルを使います。sqlalchemy.ext.declarative.declarative_base 関数を使って、新しいクラスにフィールド(カラム)を定義します。

base = declarative_base()
class Products(base):
	__tablename__ = "Products"
	Id = Column(String,primary_key=True)
	ProductName = Column(String)
	...

Snowflake のデータをクエリ

マッピングクラスができたので、セッションオブジェクトを使ってデータソースをクエリすることができます。セッションにEngine をバインドして、セッションのquery メソッドにマッピングクラスを提供します。

query メソッドを使う

engine = create_engine("snowflake///?User=Admin&Password=test123&Server=localhost&Database=Northwind&Warehouse=TestWarehouse&Account=Tester1")
factory = sessionmaker(bind=engine)
session = factory()
for instance in session.query(Products).filter_by(Id="1"):
	print("Id: ", instance.Id)
	print("ProductName: ", instance.ProductName)
	print("---------")

ほかの方法としては、execute メソッドを適切なテーブルオブジェクトに使うことが可能です。以下のコードはアクティブなsession に対して有効です。

execute メソッドを使う

Products_table = Products.metadata.tables["Products"]
for instance in session.execute(Products_table.select().where(Products_table.c.Id == "1")):
	print("Id: ", instance.Id)
	print("ProductName: ", instance.ProductName)
	print("---------")

より複雑なクエリとして、JOIN、集計、Limit などが利用可能です。詳細はヘルプドキュメントをご覧ください。

Snowflake のデータの挿入(INSERT)

Snowflake のデータへの挿入には、マップされたクラスのインスタンスを定義し、アクティブな session に追加します。commit 関数を呼び出して、Snowflake にすべての追加インスタンスを送ります。

new_rec = Products(Id="placeholder", Id="1")
session.add(new_rec)
session.commit()

Snowflake のデータを更新(UPDATE)

Snowflake のデータの更新には、更新するレコードをフィルタクエリとともにフェッチします。そして、フィールドの値を変更し、セッションでcommit 関数を呼んで、Snowflake にレコードを追加します。

updated_rec = session.query(Products).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first()
updated_rec.Id = "1"
session.commit()

Snowflake のデータを削除(DELETE)

Snowflake のデータの削除には、フィルタクエリと一緒に対象となるレコードをフェッチします。そして、アクティブsession でレコードを削除し、セッションでcommit 関数を呼び出して、該当するレコードの削除を実行します。

deleted_rec = session.query(Products).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first()
session.delete(deleted_rec)
session.commit()

Snowflake からPython へのデータ連携には、ぜひCData Python Connector をご利用ください

このようにCData Python Connector と併用することで、270を超えるSaaS、NoSQL データをPython からコーディングなしで扱うことができます。30日の無償評価版が利用できますので、ぜひ自社で使っているクラウドサービスやNoSQL と合わせて活用してみてください。

日本のユーザー向けにCData Python Connector は、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。

関連コンテンツ

トライアル・お問い合わせ

30日間無償トライアルで、CData のリアルタイムデータ連携をフルにお試しいただけます。記事や製品についてのご質問があればお気軽にお問い合わせください。