# Streaming to a Snowflake destination

export const s_runner = "Streaming runner";

export const maia = "Maia";

Streaming pipelines can use Snowflake as a direct storage destination. This page describes the prerequisites, process, and other considerations of using Snowflake as a Streaming pipeline destination.

You should have arrived here by first reading [Create a Streaming pipeline](/docs/streaming/streaming-pipelines/#create-a-streaming-pipeline).

***

## Prerequisites

* You need a [Snowflake account](https://www.snowflake.com/).

* You need the following Snowflake resources:

  * A Snowflake user with the required permissions (see below).
  * A Snowflake warehouse that will be used to merge changes into the replicated tables.
  * A Snowflake database and schema into which the tables will be replicated.
  * A Snowflake internal stage configured for the Avro file format (see below).

* You need to add IPv6 to your virtual private cloud (VPC). To do this, read the appropriate documentation for the platform your {s_runner} is installed on:

  * [AWS](https://docs.aws.amazon.com/vpc/latest/userguide/modify-subnets.html#subnet-associate-ipv6-cidr)
  * [Azure](https://learn.microsoft.com/en-us/azure/virtual-network/ip-services/ipv6-overview)

* You need to add your IP address or block to the allowed IP addresses in your [Snowflake network policy](https://docs.snowflake.com/en/user-guide/network-policies). You'll need to create a network policy if you don't have one already.
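
As a minimal sketch of the last prerequisite, the following statements create a network policy and attach it to a single user. The policy name `streaming_policy`, the user `streaming_user`, and the `192.0.2.0/24` range are placeholders rather than values from your environment, and the statements assume you are using a role that is allowed to create network policies.

```sql
-- Hypothetical names and address range; replace with your own.
CREATE NETWORK POLICY streaming_policy
  ALLOWED_IP_LIST = ('192.0.2.0/24');

-- Attach the policy to the streaming user only, rather than the whole account.
ALTER USER streaming_user SET NETWORK_POLICY = streaming_policy;
```

If you already have a network policy, you would add the address to its allowed list instead of creating a new policy.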

***

## Replicated tables

The selected source tables will be replicated into tables created in the configured Snowflake schema. The Streaming pipeline will create replica tables if they don't already exist in the target schema. To avoid ambiguous or overlapping table names, the replica tables follow this naming pattern: `<source_database>_<source_schema>_<source_table>`.
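
Once a pipeline has run, one way to see which replica tables exist is to query the information schema of the target database. This is a sketch only: `streaming_db` and `STREAMING_SCHEMA` are assumed names, and the schema name is written in uppercase because Snowflake stores unquoted identifiers that way.

```sql
-- Hypothetical database and schema names; replace with your target.
SELECT table_name, created
FROM streaming_db.information_schema.tables
WHERE table_schema = 'STREAMING_SCHEMA'
ORDER BY table_name;
```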

***

## Destination connection

Refer to this section to complete the **Destination connection** section of the [Create a Streaming pipeline](/docs/streaming/streaming-pipelines/#destination-connection) screen.

<ResponseField name="Snowflake account" type="string" required>
  Your Snowflake account is the string of text between `https://` and `.snowflakecomputing.com`.

  To find your Snowflake account URL:

  1. Visit [app.snowflake.com](https://app.snowflake.com/) and sign in.
  2. Once you have signed in, click your account menu in the bottom-left of the UI and hover over the account you wish to use.
  3. Click the copy button to copy your account URL and then paste it into the **Snowflake account** field.

  Read [Finding the organization and account name for an account](https://docs.snowflake.com/en/user-guide/admin-account-identifier#finding-the-organization-and-account-name-for-an-account) in the Snowflake documentation for additional help.
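
  If you already have a SQL session open, the following query is one way to cross-check the value. It is a sketch that assumes your account URL uses the organization and account name format; URLs based on a legacy account locator will not match this output.

  ```sql
  -- Returns <organization_name>-<account_name> for the current session.
  SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME() AS account_identifier;
  ```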
</ResponseField>

<ResponseField name="Credentials type" type="drop-down" required>
  Choose to authenticate with Snowflake using **Username and password** or **Key pair** credentials. The chosen type will determine which of the following properties you will need to complete.

  If you are using key-pair authentication, read [Using Snowflake key-pair authentication](/docs/administration/snowflake-key-pair-authentication) for details of how to store your private key as a secret.
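
  On the Snowflake side, key-pair authentication also needs the public key to be registered against the user. A minimal sketch, assuming a user named `streaming_user` and a truncated placeholder key value:

  ```sql
  -- Hypothetical user name; paste your own public key without the PEM header and footer lines.
  ALTER USER streaming_user SET RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...';

  -- Inspect the user to confirm that a key fingerprint (RSA_PUBLIC_KEY_FP) is now set.
  DESCRIBE USER streaming_user;
  ```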
</ResponseField>

<ResponseField name="Username" type="string" required>
  Your Snowflake account username.
</ResponseField>

<ResponseField name="Secrets Manager" type="drop-down" required>
  Choose the service you use to manage secrets. This must be the service you selected when you set up the {s_runner}.

  * [AWS Secrets Manager](/docs/administration/aws-secrets-manager)
  * [Azure Key Vault](/docs/administration/azure-key-vault)
</ResponseField>

<ResponseField name="Password secret name" type="string" required>
  The name of the secret in your secret manager service that references your Snowflake password. Only used if **Credentials type** is **Username and password**.

  <Note>
    If you are using [AWS Secrets Manager](/docs/administration/aws-secrets-manager), enter the password as **plain text** in the secret value. Avoid using key/value pairs or other structured formats, as these can cause formatting issues and prevent authentication from succeeding. This requirement is the same as when storing private keys in plaintext.
  </Note>
</ResponseField>

<ResponseField name="Private key secret name" type="string" required>
  A named entry created in [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) or [Azure Key Vault](https://learn.microsoft.com/en-us/azure/key-vault/general/overview) denoting the secret that holds your Snowflake private key. Read [Using Snowflake key-pair authentication](/docs/administration/snowflake-key-pair-authentication) to learn how to store the key as a secret. Only used if **Credentials type** is **Key pair**.
</ResponseField>

<ResponseField name="Passphrase secret name (optional)" type="string">
  A named entry created in [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html) or [Azure Key Vault](https://learn.microsoft.com/en-us/azure/key-vault/general/overview) denoting the secret that holds your Snowflake key-pair passphrase. Only used if **Credentials type** is **Key pair**.
</ResponseField>

<ResponseField name="Passphrase secret key (optional)" type="string">
  The key name to retrieve the passphrase when it is stored in a key/value formatted secret. Only used if **Credentials type** is **Key pair**.
</ResponseField>

<ResponseField name="JDBC parameters & specific connection settings" type="column editor">
  Specify any parameter:value pairs as part of an advanced connection. Click **Save** to finish adding parameters.

  Click **Connect** to check that your credentials are valid and a connection can be established.
</ResponseField>

## Destination configuration

Refer to this section to complete the **Destination configuration** section of the [Create a Streaming pipeline](/docs/streaming/streaming-pipelines/#destination-configuration) screen.

<ResponseField name="Role" type="drop-down" required>
  Select a [Snowflake role](/docs/administration/snowflake-role-privileges). A role is an entity with privileges. Read [Overview of Access Control](https://docs.snowflake.com/en/user-guide/security-access-control-overview.html) in the Snowflake documentation to learn more.

  The {s_runner} will manage the replicated tables and write the change data to your Snowflake warehouse. To do this, the Snowflake role requires the following privileges:

  * `USAGE` on the warehouse.
  * `CREATE TABLE` on the target schema.
  * `USAGE` on the target database, stage schema, and target schema (the same schema may be used as both stage and target).
  * `READ` and `WRITE` on the stage.

  As part of this setup, grant this role to the Snowflake user that the pipeline connects as.

  Details of the individual permissions are described in [Snowflake Access Control Privileges](https://docs.snowflake.com/en/user-guide/security-access-control-privileges) in the Snowflake documentation.
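
  To check the privileges listed above before you select the role, you could inspect its grants. The names `streaming_role` and `streaming_user` are placeholders for your own role and user.

  ```sql
  -- Hypothetical role and user names; replace with your own.
  SHOW GRANTS TO ROLE streaming_role;   -- privileges held by the role
  SHOW GRANTS TO USER streaming_user;   -- roles granted to the user
  ```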
</ResponseField>

<ResponseField name="Warehouse" type="drop-down" required>
  Select a Snowflake warehouse. Read [Overview of Warehouses](https://docs.snowflake.com/en/user-guide/warehouses-overview.html) to learn more.
</ResponseField>

<ResponseField name="Database" type="drop-down" required>
  Select a Snowflake database. Read [Databases, Tables and Views - Overview](https://docs.snowflake.com/en/guides-overview-db) to learn more.
</ResponseField>

<ResponseField name="Stage schema" type="drop-down" required>
  Select a Snowflake schema for staging. Read [Database, Schema, and Share DDL](https://docs.snowflake.com/en/sql-reference/ddl-database.html) to learn more.
</ResponseField>

<ResponseField name="Snowflake stage" type="drop-down" required>
  Select an internal stage to load data from files into Snowflake tables. Read [CREATE STAGE](https://docs.snowflake.com/en/sql-reference/sql/create-stage) to learn more.

  The Streaming pipeline replicates changes into Snowflake by first writing the change records in the Avro file format to an internal stage location. The data is temporarily stored in the Snowflake stage until it's ready to be merged into the replicated tables. The internal stage can be created in Snowflake with the following SQL statement:

  ```sql theme={null}
  CREATE STAGE <database>.<stage_schema>.<stage> FILE_FORMAT = ( TYPE = AVRO );
  ```
</ResponseField>

<ResponseField name="Unique staging prefix" type="string" required>
  This property determines the root folder name in the staging area. Assigning a unique prefix to a pipeline's stage prevents naming conflicts and enables the reuse of a single staging area for multiple pipelines.
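
  Because the prefix is the root folder name in the stage, you can list the files staged for one pipeline by path. This is a sketch with assumed names: `streaming_stage` is the stage and `my_pipeline_prefix` is the prefix entered in this field.

  ```sql
  -- Hypothetical stage and prefix names; replace with your own.
  LIST @streaming_db.streaming_schema.streaming_stage/my_pipeline_prefix/;
  ```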
</ResponseField>

<ResponseField name="Table schema" type="drop-down" required>
  The Snowflake schema that will contain the replicated tables.
</ResponseField>

<ResponseField name="Table name prefix" type="drop-down" required>
  Select how the replica table names will be prefixed. Assigning a unique prefix to replica tables prevents naming conflicts between multiple pipelines writing replicated tables to the same schema.

  * **Source Database & Schema (e.g. MYDB\_MYSCHEMA\_MYTABLE):** The source database and schema names (separated by underscores) will be used as the table name prefix.
  * **Re-Use Staging Prefix (e.g. STAGEROOTFOLDER\_MYTABLE):** Your chosen staging prefix will be used as the table name prefix.
  * **No Prefix (e.g. MYTABLE):** A table name prefix will not be used.

  This completes the destination setup. The next groups of properties on this screen are for [source setup](/docs/streaming/streaming-pipelines/#source-setup) and [pipeline configuration](/docs/streaming/streaming-pipelines/#pipeline-configuration).
</ResponseField>

## Replication types

The Snowflake destination supports three replication types that determine how the change data is replicated into Snowflake.

<Note>
  A primary key is required on the source table for **Copy Table** and **Copy Table with Soft Deletes** replication types. The primary key is used to merge updates into the target table. If the source table doesn't have a primary key, the replication type will automatically become **Change Log** for that table.
</Note>

### Change Log

The Change Log replication type produces Snowflake tables that append entries for each change event that occurs on the source table. The table contains columns for the corresponding source table with the values for each row being the values after the change has occurred, except for delete changes where the values are from the row that was deleted.

Along with the source data columns, the following metadata columns are created in the destination table to describe the change event.

| Metadata column                  | Description                                                                                                                                                                |
| -------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `MTLN_CDC_LAST_CHANGE_TYPE`      | A VARCHAR column containing a code for the type of change operation: `r` for a snapshot read, `c` for create (insert), `u` for an update, or `d` for a delete operation.   |
| `MTLN_CDC_LAST_COMMIT_TIMESTAMP` | A TIMESTAMP column containing the timestamp of the event provided by the source database.                                                                                  |
| `MTLN_CDC_SEQUENCE_NUMBER`       | Contains values that provide an ordering key that matches the order of the change events. The type and format of these values depends on the source system being captured. |
| `MTLN_CDC_LOAD_TIMESTAMP`        | TIMESTAMP column containing the timestamp of the record's ingestion into the target table.                                                                                 |

<Warning>
  If a snapshot with the Change Log replication type fails and is restarted, a second record will be created for each row that was processed before the point of failure, resulting in duplicate rows in the destination table. This is expected behavior for a Change Log replication type, but you need to be aware of the behavior and have a strategy to deal with the duplicate rows.
</Warning>
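
One possible starting point for such a strategy is to find keys that were read more than once during a snapshot. The query below is a sketch, not a prescribed method: the table name `mydb_myschema_mytable` and the key column `id` are assumptions, and it treats any key with more than one `r` row as a candidate duplicate, which will also flag tables that were deliberately re-snapshotted.

```sql
-- Hypothetical table and key column names; replace with your own.
SELECT id, COUNT(*) AS snapshot_copies
FROM streaming_db.streaming_schema.mydb_myschema_mytable
WHERE mtln_cdc_last_change_type = 'r'   -- snapshot reads only
GROUP BY id
HAVING COUNT(*) > 1
ORDER BY snapshot_copies DESC;
```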

### Copy Table

The Copy Table replication type produces a replica in Snowflake of each captured source table. Each column in the target table is directly mapped from the corresponding source table and the content of each table matches the source table.

Along with the source data columns, the following metadata column is created in the destination table to describe the change event.

| Metadata column           | Description                                                                                |
| ------------------------- | ------------------------------------------------------------------------------------------ |
| `MTLN_CDC_LOAD_TIMESTAMP` | TIMESTAMP column containing the timestamp of the record's ingestion into the target table. |

### Copy Table with Soft Deletes

The Copy Table with Soft Deletes replication type is similar to the Copy Table replication type, but rows deleted in the source are not deleted in the target table.

Along with the source data columns, the following metadata columns are created in the destination table to describe the change event.

| Metadata column           | Description                                                                                                                                                                                                                                                                                                                                                                             |
| ------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `MTLN_CDC_DELETED`        | A value of `true` in this BOOLEAN column indicates that a row has been deleted and is no longer present in the source table, and the data values in that row show the values of the row at the point it was deleted. A value of `false` in this column indicates that a hard delete has taken place instead of a soft delete, and permanently removed the record from the target table. |
| `MTLN_CDC_LOAD_TIMESTAMP` | TIMESTAMP column containing the timestamp of the record's ingestion into the target table.                                                                                                                                                                                                                                                                                              |

### Example

We have a table in a source system with two columns—a primary key ID, and a value field. A pipeline is started that captures the changes to that table, and the following series of changes are applied to the source table:

1. Insert a new row (1, inserted).
2. Insert a new row (2, inserted).
3. Update value in row 2 to updated.
4. Insert a new row (3, inserted).
5. Delete row 2.
6. Update value in row 1 to updated.

Then the resulting source table after these changes would be:

| id | value    |
| -- | -------- |
| 1  | updated  |
| 3  | inserted |

The resulting Snowflake table depends on the replication type configured in the pipeline, as follows.

#### Copy Table

A replica of the values in the source table.

| id | value    |
| -- | -------- |
| 1  | updated  |
| 3  | inserted |

#### Copy Table with Soft Deletes

The rows with `id` values `1` and `3` match the source. The row with the `id` of `2` has the value at the time the row was deleted and has been flagged as deleted in the `MTLN_CDC_DELETED` field.

| id | value    | MTLN\_CDC\_DELETED |
| -- | -------- | ------------------ |
| 1  | updated  | null               |
| 2  | updated  | true               |
| 3  | inserted | null               |
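
To read only the rows that still exist in the source, filter on the deleted flag. The sketch below assumes a replica table named `mydb_myschema_mytable` and treats a `null` flag the same as not deleted, as in the example table above.

```sql
-- Hypothetical table name; replace with your own.
SELECT *
FROM streaming_db.streaming_schema.mydb_myschema_mytable
WHERE COALESCE(mtln_cdc_deleted, FALSE) = FALSE;   -- keeps rows where the flag is null or false
```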

#### Change Log

Each change to the source table has a corresponding entry.

| id | value    | MTLN\_CDC\_LAST\_CHANGE\_TYPE | MTLN\_CDC\_LAST\_COMMIT\_TIMESTAMP | MTLN\_CDC\_SEQUENCE\_NUMBER |
| -- | -------- | ----------------------------- | ---------------------------------- | --------------------------- |
| 1  | inserted | c                             | `<change_timestamp>`               | `<sequence_value>`          |
| 2  | inserted | c                             | `<change_timestamp>`               | `<sequence_value>`          |
| 2  | updated  | u                             | `<change_timestamp>`               | `<sequence_value>`          |
| 3  | inserted | c                             | `<change_timestamp>`               | `<sequence_value>`          |
| 2  | updated  | d                             | `<change_timestamp>`               | `<sequence_value>`          |
| 1  | updated  | u                             | `<change_timestamp>`               | `<sequence_value>`          |
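
If you need the current state of the source from a Change Log table, one approach is to keep the latest entry per key and drop keys whose latest entry is a delete. This sketch assumes a table named `mydb_myschema_mytable`, a key column `id`, and sequence values that sort in change order for your source system; none of these are guaranteed by the pipeline.

```sql
-- Hypothetical table and key column names; replace with your own.
WITH latest AS (
    SELECT *
    FROM streaming_db.streaming_schema.mydb_myschema_mytable
    -- Keep only the most recent change event for each key.
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY mtln_cdc_sequence_number DESC NULLS LAST,
                 mtln_cdc_load_timestamp DESC
    ) = 1
)
SELECT *
FROM latest
WHERE mtln_cdc_last_change_type <> 'd';   -- exclude keys whose last event was a delete
```

Applied to the entries above, this keeps the latest rows for `id` values `1` and `3` and drops `id` `2`, which is the same set of rows as the Copy Table result.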

***

## Dates and times strategy

* **Snowflake Native Types:** This action will convert the source date/time value to an appropriate Snowflake type. If it's not possible to convert, the pipeline will output the value as a string. This is the default setting.
* **Integers from Epoch:** This action will load all date/time values as integers in Snowflake. To recover a date or time, add the stored integer to the epoch, using the unit shown for that source type in the table below; the same approach applies to every date/time field in the target table. Although this adds a conversion step, it preserves the source value exactly, without any errors that conversion in the pipeline might introduce.

<Note>
  The epoch integer value is the number of **days** since the Unix epoch for all data sources except Oracle, where it is the number of **milliseconds** since the Unix epoch.
</Note>

The following table shows how different source time types will be loaded into the destination table, depending on the setting of the **Dates and times strategy** property.

| Source type                        | Integers from Epoch                           | Snowflake Native Types                             |
| ---------------------------------- | --------------------------------------------- | -------------------------------------------------- |
| date                               | Integer, number of days since epoch           | Snowflake `DATE`                                   |
| timestamp (up to 3 decimal places) | Integer, number of milliseconds since epoch   | Snowflake `TIMESTAMP_NTZ(9)`                       |
| timestamp (4-6 decimal places)     | Integer, number of microseconds since epoch   | Snowflake `TIMESTAMP_NTZ(9)`                       |
| timestamp (7-9 decimal places)     | Integer, number of nanoseconds since epoch    | Snowflake `TIMESTAMP_NTZ(9)`                       |
| timestamp with time zone           | String, ISO formatted, often forced into GMT  | Snowflake `TIMESTAMP_TZ(9)`, often forced into GMT |
| time (up to 3 decimal places)      | Integer, number of milliseconds past midnight | Snowflake `TIME(9)`                                |
| time (4-6 decimal places)          | Integer, number of microseconds past midnight | Snowflake `TIME(9)`                                |
| time (7-9 decimal places)          | Integer, number of nanoseconds past midnight  | Snowflake `TIME(9)`                                |
| time with time zone                | String, ISO formatted, often forced into GMT  | String, ISO formatted, often forced into GMT       |
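
When you use **Integers from Epoch**, the stored integers can be converted back in Snowflake at query time. The sketch below uses literal values instead of real columns so that it runs on its own: the day and timestamp literals all correspond to midnight on 2024-01-01, and the time literal corresponds to 12:34:56.789. How the results are displayed depends on your session's output format parameters.

```sql
SELECT
    -- date: days since epoch
    DATEADD(day, 19723, '1970-01-01'::DATE)           AS date_from_days,
    -- timestamp: milliseconds, microseconds, or nanoseconds since epoch
    TO_TIMESTAMP_NTZ(1704067200000, 3)                AS ts_from_millis,
    TO_TIMESTAMP_NTZ(1704067200000000, 6)             AS ts_from_micros,
    TO_TIMESTAMP_NTZ(1704067200000000000, 9)          AS ts_from_nanos,
    -- time: milliseconds past midnight
    DATEADD(millisecond, 45296789, '00:00:00'::TIME)  AS time_from_millis;
```

In practice you would replace each literal with the corresponding integer column and choose the scale (`3`, `6`, or `9`) that matches the precision of the source type.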

***

## Schema drift

Schema drift is supported for this destination. Read [Schema drift](/docs/streaming/streaming-pipelines/#schema-drift) to learn more.

***

## Example Snowflake configuration

The Streaming pipeline will work with any Snowflake ROLE that has the required privileges; however, we recommend creating a dedicated Snowflake USER and ROLE for {maia} where possible.

The following SQL provides an example of setting up Snowflake for {maia} by creating a dedicated USER, DATABASE, SCHEMA, STAGE, and WAREHOUSE. Existing Snowflake objects *can* be used, and multiple pipelines can share the same objects.

This example creates one schema that contains both the stage and the target tables.

```sql theme={null}
-- Create a dedicated user and role for streaming
CREATE USER streaming_user PASSWORD = 'password-goes-here';
CREATE ROLE streaming_role;
GRANT ROLE streaming_role TO USER streaming_user;

-- Create a database and grant the streaming user access
CREATE DATABASE streaming_db;
CREATE SCHEMA streaming_db.streaming_schema;
GRANT USAGE ON DATABASE streaming_db TO ROLE streaming_role;
GRANT USAGE, CREATE TABLE ON SCHEMA streaming_db.streaming_schema TO ROLE streaming_role;

-- Create a stage and grant the streaming user read/write
CREATE STAGE streaming_db.streaming_schema.streaming_stage FILE_FORMAT = ( TYPE = AVRO );
GRANT READ, WRITE ON STAGE streaming_db.streaming_schema.streaming_stage TO ROLE streaming_role;

-- Create a warehouse for this example
CREATE WAREHOUSE streaming_warehouse;
GRANT USAGE ON WAREHOUSE streaming_warehouse TO ROLE streaming_role;
```

<Note>
  Creating the database or the schema as `TRANSIENT` will cause all lower objects (schemas, tables) to also be created as `TRANSIENT`. Read the [Snowflake documentation](https://docs.snowflake.com/en/user-guide/tables-temp-transient#transient-databases-and-schemas) to learn more.
</Note>
