Build a Singer.io Tap for Snowflake using CData Python Connector



Singer.io is a free and open-source ETL framework that connects data sources to data destinations, using standardized JSON-based Taps (extractors) and Targets (loaders). It works by extracting data from a source (like a CRM or database) through a Tap and loading it into a Target system (like Snowflake) for analysis — all while maintaining data integrity and structure.

CData Python Connectors enhance this process by providing reliable, high-performance connections to hundreds of data sources and destinations, ensuring smooth, optimized data movement without complex coding.

In this article, we'll build a Tap for Snowflake using a CData connector with Singer.io in Python, then move the data from Snowflake into CSV files using the Singer target-csv Target. We'll use a Free Community License for the CData Python Connector for Snowflake along with VS Code. We'll also explore how to dynamically retrieve schema and metadata using CData's sys_tables system tables to simplify data extraction and improve pipeline efficiency.

Let's begin!

PREREQUISITES

  1. CData Python Connector for Snowflake. Request a Free Community Edition License here (download links included). If you already have a license, you can download the CData Python Connector for Snowflake here.
  2. Python distribution. Download the latest version for your machine here. Make sure to check the "Add Python to PATH" option during installation.
  3. Visual Studio Code. Download it here.


GETTING STARTED

OVERVIEW

Here's an overview of the steps we are going to follow:

  1. Install: Install and configure the CData Connector, Singer.io, and the Singer CSV Target (target-csv).
  2. Connect: Establish the connection to Snowflake and retrieve the metadata and schema information.
  3. Build: Build the Snowflake Tap to move and replicate the data.

You're going to be interacting with a lot of files and subfolders. It's a good idea to create a dedicated folder for this project — for example: Singer-Snowflake — and place all the project files in it.


STEP 1: Installation Setup

1.1 Install CData Python Connector for Snowflake

Follow these steps to install and configure the CData Python Connector for Snowflake:

Dependencies Note: The Python connector supports Python versions 3.8 through 3.12. If your system Python falls outside this range, you may need to install a supported version and run the connector inside a virtual environment built on it.

  1. Extract the downloaded connector ZIP to your desired location.
  2. Open a terminal or command prompt and navigate to the installation directory containing the .whl file for your platform and Python version. For example:
    C:\Users\Public\Downloads\CDataPythonConnectorforSnowflake\CData.Python.Snowflake\win\Python312\64
  3. For Windows: Install the .whl file using pip. Use the appropriate version for your Python version and architecture. Example command:
    pip install cdata_snowflake_connector-24.0.9111-cp312-cp312-win_amd64.whl
  4. For Linux or macOS: Install the .tar.gz file using pip. Example command:
    pip install cdata_snowflake_connector-24.0.####-python3.tar.gz
  5. Confirm that the installation was successful by running pip list. If cdata-snowflake-connector is listed, the installation was successful.

1.2 Install the License for the CData Connector

This step is optional. A Free Community Edition license should already have been installed on your machine when you installed the connector from the .whl file.

However, in some cases, if you don't see the license or if you're using a trial version of the CData Connector, you can install your Free Community Edition license using the license key that was sent to your email.

If you didn't receive it, you can request a Community License for the CData Snowflake Connector here.

On Windows

  1. Download and extract the ZIP file containing the license.
  2. Open a terminal or command prompt.
  3. Navigate to the license installer location, typically: C:\Users\Username\AppData\Local\Programs\Python\Python312\Lib\site-packages\cdata\installlic_snowflake
  4. Or, if you downloaded it separately: C:\Downloads\cdata\installlic_snowflake
  5. Run the installer with:
    .\license-installer.exe [YOUR LICENSE KEY HERE]
  6. Enter the name and email registration prompted on the screen to complete the installation.

On macOS/Linux

  1. Download and extract the ZIP file containing the license.
  2. Open a terminal inside the extracted directory. Example:
    cd ~/Downloads/CData-Python-Snowflake
  3. Navigate to the license installer location, typically: /usr/local/lib/python3.12/site-packages/cdata/installlic_snowflake
  4. Run the installer:
    ./license-installer [YOUR LICENSE KEY HERE]
  5. Enter the name and email registration prompted on the screen to complete the installation.

1.3 Install Singer.io and target-csv

  1. Install Singer.io with pip by running:
    pip install singer-python
  2. Install target-csv by creating a virtual environment:

    For Windows:

    python -m venv %USERPROFILE%\target-csv  
    %USERPROFILE%\target-csv\Scripts\activate  
    pip install target-csv  
    deactivate

    For macOS/Linux:

    python3 -m venv ~/.virtualenvs/target-csv  
    source ~/.virtualenvs/target-csv/bin/activate  
    pip install target-csv  
    deactivate
  3. Conflict: In Python 3.10 and later, the MutableMapping alias was removed from the collections module (it now lives only in collections.abc), which may cause an AttributeError in your target-csv virtual environment. To fix:
    1. Open target_csv.py in a text editor; it lives in the virtual environment's site-packages directory (for example, %USERPROFILE%\target-csv\Lib\site-packages on Windows or ~/.virtualenvs/target-csv/lib/python3.x/site-packages on macOS/Linux).
    2. Replace:
      import collections
      with:
      from collections.abc import MutableMapping
    3. Update this line:
      isinstance(v, collections.MutableMapping)
      to:
      isinstance(v, MutableMapping)
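The isinstance check above sits inside the target's dict-flattening logic, which turns nested record fields into flat CSV column names. A simplified sketch of that pattern (target-csv's actual implementation differs in details, so treat this as illustrative):

```python
from collections.abc import MutableMapping

def flatten(d, parent_key="", sep="__"):
    # Flatten nested dicts into single-level keys, recursing whenever a
    # value is itself a mapping -- the check the MutableMapping fix restores
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

print(flatten({"a": {"b": 1}, "c": 2}))
# → {'a__b': 1, 'c': 2}
```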

STEP 2: Connect, Configure, and Query

Now that you've installed the necessary components, it's time to establish a successful connection to Snowflake and retrieve metadata and schema information. This will help you build your schema to move or replicate data efficiently.

2.1 Establish Connection

  1. In your project folder, create a new Python file named meta_snowflake.py, and open the file.
  2. Import the following statements at the beginning of your file:
    import singer  
    import cdata.snowflake as mod  
    
    import sys  
    import os  
    import time  
    from datetime import date, datetime
  3. Create a connection function and replace the placeholder values with your actual Snowflake details:
    def create_connection():  
        conn = mod.connect(  
            "AuthScheme=Password;"  
            "url=YourSnowflakeURL.snowflakecomputing.com;"  
            "user=Your_Username;"  
            "password=Your_Password;"  
            "Database=Your_DB;"  
            "Warehouse=Your_WH;"  
        )  
        return conn
  4. Run the script to check if the connection works without any errors.
  5. If you're using advanced authentication methods like OAuth, SSO, or Proxy, update AuthScheme= accordingly. You can find the complete documentation in the HTML help file located in your downloaded connector folder: ~\SnowflakePythonConnector\CData.Python.Snowflake\help\help.htm.
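Since the connection string is just semicolon-delimited key=value pairs, you can also assemble it from a dict and pull credentials from environment variables instead of hardcoding them. A small sketch (the SNOWFLAKE_* variable names are our own convention, not something the connector requires):

```python
import os

def build_conn_str(props):
    # Join key=value pairs into the semicolon-delimited string mod.connect expects
    return "".join(f"{k}={v};" for k, v in props.items())

props = {
    "AuthScheme": "Password",
    "url": os.environ.get("SNOWFLAKE_URL", "YourSnowflakeURL.snowflakecomputing.com"),
    "user": os.environ.get("SNOWFLAKE_USER", "Your_Username"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD", "Your_Password"),
    "Database": os.environ.get("SNOWFLAKE_DB", "Your_DB"),
    "Warehouse": os.environ.get("SNOWFLAKE_WH", "Your_WH"),
}
print(build_conn_str(props))
```

You would then pass the result to mod.connect() in create_connection(), keeping secrets out of source control.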

2.2 Retrieve Metadata by Querying

You can retrieve metadata and schema information from your Snowflake warehouse using the system tables in CData's connectors. By querying sys_tables and sys_tablecolumns, the connector automatically retrieves all the table details, including column names, data types, and row counts, giving you a complete overview of your Snowflake schema. Check out the help file for your connector for more info.

Paste the following code below the connection properties, save it, and run it. Make sure to update YOUR_TABLE_NAME with the name of your preferred table in your Snowflake database schema:

conn = create_connection()

table_count = conn.execute("SELECT COUNT(*) FROM sys_tables").fetchone()[0]
account_columns = conn.execute(
    """
    SELECT ColumnName, DataTypeName 
    FROM sys_tablecolumns 
    WHERE TableName = 'YOUR_TABLE_NAME' 
    """
).fetchall()

print(f"Tables: {table_count}, Columns in 'YOUR_TABLE_NAME': {len(account_columns)}\n")
print("\n".join(f"{col[0]} ({col[1]})" for col in account_columns))

conn.close()

This will display the total number of tables in your database and the total number of columns in YOUR_TABLE_NAME, along with a detailed list of all columns and their respective data types. This helps in understanding the schema structure and enables better data management.

You can use this metadata to build schemas, map table structures, and migrate data efficiently between systems. This approach ensures you have the necessary schema insights before performing operations such as ETL (Extract, Transform, Load) processes, data validation, or analytics.
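For example, the (ColumnName, DataTypeName) rows returned by sys_tablecolumns can be turned into a Singer-style schema dict automatically. The type mapping below is a hypothetical sketch; extend it to cover the DataTypeName values your own tables actually return:

```python
# Hypothetical mapping from DataTypeName values (as returned by
# sys_tablecolumns) to JSON Schema types; unknown types fall back to string
TYPE_MAP = {
    "varchar": {"type": "string"},
    "double": {"type": "number"},
    "decimal": {"type": "number"},
    "int": {"type": "integer"},
    "bool": {"type": "boolean"},
}

def build_schema(columns):
    """Turn (ColumnName, DataTypeName) rows into a Singer-style schema dict."""
    props = {name: dict(TYPE_MAP.get(dtype.lower(), {"type": "string"}))
             for name, dtype in columns}
    return {"properties": props}

rows = [("Id", "VARCHAR"), ("Name", "VARCHAR"), ("AnnualRevenue", "DOUBLE")]
print(build_schema(rows))
```

Feeding it the account_columns list from the query above would produce a schema for every column in the table, rather than the hand-picked subset we use next.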

For our example, we will create a schema using the following columns:

{
    'Id': {'type': 'string'},
    'Name': {'type': 'string'}, 
    'BillingCity': {'type': 'string'}, 
    'AnnualRevenue': {'type': 'number'}
}
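When Step 3 passes this schema to singer.write_schema, the tap prints a one-line SCHEMA message to stdout. Per the Singer specification, that message wraps the schema with a stream name and key properties; you can preview the shape without running the tap:

```python
import json

schema = {
    "properties": {
        "Id": {"type": "string"},
        "Name": {"type": "string"},
        "BillingCity": {"type": "string"},
        "AnnualRevenue": {"type": "number"},
    }
}

# Assemble the SCHEMA message by hand, mirroring the Singer message format
message = {
    "type": "SCHEMA",
    "stream": "account",
    "schema": schema,
    "key_properties": ["Id"],
}
print(json.dumps(message))
```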

STEP 3: Building the Tap

Now that you've connected to your Snowflake warehouse and retrieved the schema metadata, it's time to build the tap by defining your schema for data replication. Follow these steps to create your tap, configure it, and export the data to a CSV file:

  1. Create a new file in your project directory called tap_snowflake.py, and add the previous import statements and connection properties. Then, paste the following code after the connection setup. Make sure to define and map the schema according to your table structure, point the query at your own table, and update the execution log file path in the with open("YOUR_PATH/execution_log_snow.txt") line.

    # Define schema for the Account table
    schema = {
        'properties': {
            'Id': {'type': 'string'},
            'Name': {'type': 'string'},
            'BillingCity': {'type': 'string'},
            'AnnualRevenue': {'type': 'number'}
        }
    }
    
    # Fetch Account data from Snowflake
    def fetch_sf_data():
        conn = create_connection()
        try:
            query = 'SELECT "Id", "Name", "BillingCity", "AnnualRevenue" FROM "DEMO_DB"."CRM"."Account"'
            return conn.execute(query).fetchall()
        except Exception as e:
            sys.stderr.write(f"Error fetching data: {e}\n")
            return []
        finally:
            conn.close()
    
    # Write schema, records, and log performance
    def write_records():
        start_time = time.perf_counter_ns()
    
        singer.write_schema('account', schema, ['Id'])
        records = fetch_sf_data()
    
        for record in records:
            singer.write_record('account', {
                'Id': record[0] or 'N/A',
                'Name': record[1] or 'Unknown',
                'BillingCity': record[2] or '',
                'AnnualRevenue': record[3] or 0
            })
    
        duration_ms = (time.perf_counter_ns() - start_time) / 1_000_000
        with open("YOUR_PATH/execution_log_snow.txt", "w") as log_file: 
            log_file.write(f"Execution Time: {duration_ms:.3f} ms\nTotal Records: {len(records)}\n")
    
    # Main guard
    if __name__ == "__main__":
        write_records()
  2. The target-csv tool accepts an optional JSON config file for customization. Create a file named my-config.json and configure it as needed. Here's a basic example:

    {
    "delimiter": "\t",
    "quotechar": "'",
    "destination_path": "YOUR/PATH/FOR/CSV/OUTPUTS",
    "disable_collection": true
    }
  3. Open your terminal and navigate to your project directory. There's no need to activate the target-csv virtual environment for this step; activating it would also switch python to the environment's interpreter, which doesn't have the CData connector or singer-python installed.

  4. Run the following command to execute the tap and pipe its output to the target (adjust the path if you created the virtual environment elsewhere):

    For Windows:

    python tap_snowflake.py | %USERPROFILE%\target-csv\Scripts\target-csv -c my-config.json

    For macOS/Linux:

    python3 tap_snowflake.py | ~/.virtualenvs/target-csv/bin/target-csv -c my-config.json

You have now successfully built a Singer Tap for Snowflake. This writes the records for your defined schema to a CSV file using the target-csv Target. You can check the exported CSV file in your project folder or in the location defined in your my-config.json.

Feel free to review the execution_log_snow.txt file in your defined location in the code to verify the number of records moved/replicated and the total execution time (in milliseconds).
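Under the hood, the pipe carries nothing more exotic than newline-delimited JSON: the tap emits SCHEMA and RECORD messages, and the target turns them into CSV rows. A heavily simplified sketch of that consuming side (target-csv itself handles batching, flattening, and file naming, so this is illustrative only):

```python
import csv
import io
import json

def messages_to_csv(lines):
    """Simplified sketch of the target-csv pattern: the SCHEMA message
    sets the header row, and each RECORD message becomes a data row."""
    out = io.StringIO()
    writer = None
    for line in lines:
        msg = json.loads(line)
        if msg["type"] == "SCHEMA":
            fields = list(msg["schema"]["properties"])
            writer = csv.DictWriter(out, fieldnames=fields)
            writer.writeheader()
        elif msg["type"] == "RECORD" and writer is not None:
            writer.writerow(msg["record"])
    return out.getvalue()

lines = [
    '{"type": "SCHEMA", "stream": "account", "schema": {"properties": '
    '{"Id": {"type": "string"}, "Name": {"type": "string"}}}, "key_properties": ["Id"]}',
    '{"type": "RECORD", "stream": "account", "record": {"Id": "1", "Name": "Acme"}}',
]
print(messages_to_csv(lines))
```

This decoupling is the whole point of Singer: any tap that speaks the message format works with any target, with the pipe as the only contract between them.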


Target Ahead

You've successfully built a Singer Tap for Snowflake and moved your data. But this is just scratching the surface of what's possible with CData Python Connectors. You can explore further with the following:

  • Build Taps for Other Sources: Extend your setup by creating taps for databases like PostgreSQL, MySQL, or APIs like Salesforce and HubSpot to unify data pipelines.
  • Incorporate Incremental Data Loads: Modify your tap to track and load only new or updated records, improving efficiency for large datasets.
  • Transform Data Mid-Stream: Add preprocessing steps to clean, format, or enrich data before it reaches the target destination.
  • Integrate with Other Targets: Expand beyond CSV — push data to cloud warehouses like BigQuery, Redshift, or even live dashboards.
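The incremental-load idea, for instance, maps onto Singer STATE messages: the tap records a bookmark (such as a last-modified timestamp), filters the next extraction with it, and emits the updated bookmark when done. A hypothetical sketch, assuming your table has a LastModifiedDate column (the column and bookmark names here are our own, not part of the connector):

```python
import json

def build_incremental_query(state, table="Account"):
    """Use the bookmark from the previous run (if any) to fetch only new rows."""
    bookmark = state.get("bookmarks", {}).get("account", {}).get("last_modified")
    query = f'SELECT * FROM "{table}"'
    if bookmark:
        # Hypothetical bookmark column; swap in whatever tracks changes in your table
        query += f' WHERE "LastModifiedDate" > \'{bookmark}\''
    return query

state = {"bookmarks": {"account": {"last_modified": "2024-01-01T00:00:00Z"}}}
print(build_incremental_query(state))

# After a successful load, the tap would emit the new bookmark as a STATE message
print(json.dumps({"type": "STATE", "value": state}))
```

On the next run, the tap reads the saved state back in and starts from the bookmark instead of re-extracting the full table.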

For more info on this, check out the documentation and tutorial guides at our Developer Resource Center.



Full Code

For meta_snowflake.py

import singer  
import cdata.snowflake as mod  

import sys  
import os  
import time  
from datetime import date, datetime  

# Create Snowflake connection  
def create_connection():  
    conn = mod.connect(  
        "AuthScheme=Password;"  
        "url=YourSnowflakeURL.snowflakecomputing.com;"  
        "user=Your_Username;"  
        "password=Your_Password;"  
        "Database=Your_DB;"  
        "Warehouse=Your_WH;"  
    )  
    return conn  

# Fetch table count and column metadata  
conn = create_connection()  

table_count = conn.execute("SELECT COUNT(*) FROM sys_tables").fetchone()[0]  
account_columns = conn.execute(  
    """  
    SELECT ColumnName, DataTypeName  
    FROM sys_tablecolumns  
    WHERE TableName = 'YOUR_TABLE_NAME'  
    """  
).fetchall()  

# Output metadata  
print(f"Tables: {table_count}, Columns in 'YOUR_TABLE_NAME': {len(account_columns)}\n")  
print("\n".join(f"{col[0]} ({col[1]})" for col in account_columns))  

conn.close()

For tap_snowflake.py

import singer  
import cdata.snowflake as mod  

import sys  
import os  
import time  
from datetime import date, datetime  

# Create Snowflake connection  
def create_connection():  
    conn = mod.connect(  
        "AuthScheme=Password;"  
        "url=YourSnowflakeURL.snowflakecomputing.com;"  
        "user=Your_Username;"  
        "password=Your_Password;"  
        "Database=Your_DB;"  
        "Warehouse=Your_WH;"  
    )  
    return conn  

# Define schema for the Account table  
schema = {  
    'properties': {  
        'Id': {'type': 'string'},  
        'Name': {'type': 'string'},  
        'BillingCity': {'type': 'string'},  
        'AnnualRevenue': {'type': 'number'}  
    }  
}  

# Fetch Account data from Snowflake  
def fetch_sf_data():  
    conn = create_connection()  
    try:  
        query = 'SELECT "Id", "Name", "BillingCity", "AnnualRevenue" FROM "DEMO_DB"."CRM"."Account"'  
        return conn.execute(query).fetchall()  
    except Exception as e:  
        sys.stderr.write(f"Error fetching data: {e}\n")  
        return []  
    finally:  
        conn.close()  

# Write schema, records, and log performance  
def write_records():  
    start_time = time.perf_counter_ns()  

    singer.write_schema('account', schema, ['Id'])  
    records = fetch_sf_data()  

    for record in records:  
        singer.write_record('account', {  
            'Id': record[0] or 'N/A',  
            'Name': record[1] or 'Unknown',  
            'BillingCity': record[2] or '',  
            'AnnualRevenue': record[3] or 0  
        })  

    duration_ms = (time.perf_counter_ns() - start_time) / 1_000_000  
    with open("YOUR_PATH/execution_log_snow.txt", "w") as log_file:  
        log_file.write(f"Execution Time: {duration_ms:.3f} ms\nTotal Records: {len(records)}\n")  

# Main guard  
if __name__ == "__main__":  
    write_records()