Prerequisites
- You need a Snowflake account.
- You need the following Snowflake resources:
- A Snowflake user with the required permissions (see below).
- A Snowflake warehouse that will be used to merge changes into the replicated tables.
- A Snowflake database and schema into which the tables will be replicated.
- A Snowflake internal stage configured for the Avro file format (see below).
- You need to add IPv6 to your virtual private cloud (VPC). To do this, read the appropriate documentation for your platform:
- You need to add your IP address or block to the allowed IP addresses in your Snowflake network policy. You’ll need to create a network policy if you don’t have one already.
Replicated tables
The selected source tables will be replicated into tables created in the configured Snowflake schema. The Streaming pipeline will create replica tables if they don't already exist in the target schema. To avoid ambiguous or overlapping table names, the replica tables follow this naming pattern: <source_database>_<source_schema>_<source_table>.
Destination connection
Refer to this section to complete the Destination connection section of the Create Streaming pipeline screen.

Your Snowflake account is the string of text between https:// and .snowflakecomputing.com. To find your Snowflake account URL:
- Visit app.snowflake.com and sign in.
- Once you have signed in, click your account menu in the bottom-left of the UI and hover over the account you wish to use.
- Click the copy button to copy your account URL, then paste it into the Snowflake account field.
Choose to authenticate with Snowflake using Username and password or Key pair credentials. The chosen type determines which of the following properties you will need to complete. If you are using key-pair authentication, read Using Snowflake key-pair authentication for details of how to store your private key as a secret.
Your Snowflake account username.
Choose the service you use to manage secrets. This must be the service you selected when you set up the .
The name of the secret in your secret manager service that references your Snowflake password. Only used if Credentials type is Username and password.
If you are using AWS Secrets Manager, enter the password as plain text in the secret value. Avoid using key/value pairs or other structured formats, as these can cause formatting issues and prevent authentication from succeeding. This requirement is the same as when storing private keys in plaintext.
A named entry created in AWS Secrets Manager or Azure Key Vault denoting the secret that holds your Snowflake private key. Read Using Snowflake key-pair authentication to learn how to store the key as a secret. Only used if Credentials type is Key pair.
A named entry created in AWS Secrets Manager or Azure Key Vault denoting the secret that holds your Snowflake key-pair passphrase. Only used if Credentials type is Key pair.
The key name to retrieve the passphrase when it is stored in a key/value formatted secret. Only used if Credentials type is Key pair.
Specify any parameter:value pairs as part of an advanced connection. Click Save to finish adding parameters.

Click Connect to check that your credentials are valid and a connection can be established.
Destination configuration
Refer to this section to complete the Destination configuration section of the Create a Streaming pipeline screen.

Select a Snowflake role. A role is an entity with privileges. Read Overview of Access Control in the Snowflake documentation to learn more.

The Streaming pipeline will manage the replicated tables and write the change data to your Snowflake warehouse. To do this, the Snowflake role requires the following privileges:
- USAGE on the warehouse.
- CREATE TABLE on the target schema.
- USAGE on the target database, stage schema, and target schema (the same schema may be used as both stage and target).
- READ and WRITE on the stage.
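As an illustrative sketch, these privileges could be granted with the following statements. All object names (STREAMING_ROLE, STREAMING_WH, STREAMING_DB, STREAMING_SCHEMA, STREAMING_STAGE) are placeholders; substitute your own.

```sql
-- Placeholder names throughout: adjust to your own role, warehouse,
-- database, schema, and stage.
GRANT USAGE ON WAREHOUSE STREAMING_WH TO ROLE STREAMING_ROLE;
GRANT USAGE ON DATABASE STREAMING_DB TO ROLE STREAMING_ROLE;
GRANT USAGE ON SCHEMA STREAMING_DB.STREAMING_SCHEMA TO ROLE STREAMING_ROLE;
GRANT CREATE TABLE ON SCHEMA STREAMING_DB.STREAMING_SCHEMA TO ROLE STREAMING_ROLE;
GRANT READ, WRITE ON STAGE STREAMING_DB.STREAMING_SCHEMA.STREAMING_STAGE TO ROLE STREAMING_ROLE;
```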
Select a Snowflake warehouse. Read Overview of Warehouses to learn more.
Select a Snowflake database. Read Databases, Tables and Views - Overview to learn more.
Select a Snowflake schema for staging. Read Database, Schema, and Share DDL to learn more.
Select an internal stage to load data from files into Snowflake tables. Read CREATE STAGE to learn more.

The Streaming pipeline replicates changes into Snowflake by first writing the change records in the Avro file format to an internal stage location. The data is temporarily stored in the Snowflake stage until it's ready to be merged into the replicated tables. The internal stage can be created in Snowflake with the following SQL statement:
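A minimal example, assuming a stage named STREAMING_STAGE (a placeholder name) created in the target schema:

```sql
-- Create an internal stage that expects Avro-formatted files.
-- STREAMING_STAGE is a placeholder name.
CREATE STAGE STREAMING_STAGE
  FILE_FORMAT = (TYPE = 'AVRO');
```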
This property determines the root folder name in the staging area. Assigning a unique prefix to a pipeline’s stage prevents naming conflicts and enables the reuse of a single staging area for multiple pipelines.
The Snowflake schema that will contain the replicated tables.
Select how the replica table names will be prefixed. Assigning a unique prefix to replica tables prevents naming conflicts between multiple pipelines writing replicated tables to the same schema.
- Source Database & Schema (e.g. MYDB_MYSCHEMA_MYTABLE): The source database and schema names (separated by underscores) will be used as the table name prefix.
- Re-Use Staging Prefix (e.g. STAGEROOTFOLDER_MYTABLE): Your chosen staging prefix will be used as the table name prefix.
- No Prefix (e.g. MYTABLE): A table name prefix will not be used.
Replication types
The Snowflake destination supports three replication types that determine how the change data is replicated into Snowflake.

A primary key is required on the source table for the Copy Table and Copy Table with Soft Deletes replication types. The primary key is used to merge updates into the target table. If the source table doesn't have a primary key, the replication type will automatically become Change Log for that table.
Change Log
The Change Log replication type produces Snowflake tables to which an entry is appended for each change event that occurs on the source table. The table contains columns for the corresponding source table, with the values in each row being the values after the change occurred, except for delete changes, where the values are from the row that was deleted. Along with the source data columns, the following metadata columns are created in the destination table to describe the change event.

| Metadata column | Description |
|---|---|
| MTLN_CDC_LAST_CHANGE_TYPE | A VARCHAR column containing a code for the type of change operation: r for a snapshot read, c for create (insert), u for an update, or d for a delete operation. |
| MTLN_CDC_LAST_COMMIT_TIMESTAMP | A TIMESTAMP column containing the timestamp of the event provided by the source database. |
| MTLN_CDC_SEQUENCE_NUMBER | Contains values that provide an ordering key matching the order of the change events. The type and format of these values depend on the source system being captured. |
| MTLN_CDC_LOAD_TIMESTAMP | A TIMESTAMP column containing the timestamp of the record's ingestion into the target table. |
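As an illustration (not part of the product), the latest state of each row can be reconstructed from a Change Log table with a query along these lines, assuming a hypothetical replica table MYDB_MYSCHEMA_MYTABLE whose source primary key is id:

```sql
-- Reconstruct the current rows from a Change Log table (illustrative sketch;
-- MYDB_MYSCHEMA_MYTABLE and id are placeholder names).
SELECT id, value
FROM (
  SELECT t.*,
         ROW_NUMBER() OVER (
           PARTITION BY id
           ORDER BY MTLN_CDC_SEQUENCE_NUMBER DESC
         ) AS rn
  FROM MYDB_MYSCHEMA_MYTABLE t
)
WHERE rn = 1                             -- keep only the latest event per key
  AND MTLN_CDC_LAST_CHANGE_TYPE <> 'd';  -- drop rows whose latest event is a delete
```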
Copy Table
The Copy Table replication type produces a replica in Snowflake of each captured source table. Each column in the target table is directly mapped from the corresponding source table, and the content of each table matches the source table. Along with the source data columns, the following metadata column is created in the destination table to describe the change event.

| Metadata column | Description |
|---|---|
| MTLN_CDC_LOAD_TIMESTAMP | A TIMESTAMP column containing the timestamp of the record's ingestion into the target table. |
Copy Table with Soft Deletes
The Copy Table with Soft Deletes replication type is similar to the Copy Table replication type, but rows deleted in the source are not deleted in the target table. Along with the source data columns, the following metadata columns are created in the destination table to describe the change event.

| Metadata column | Description |
|---|---|
| MTLN_CDC_DELETED | A value of true in this BOOLEAN column indicates that the row has been deleted and is no longer present in the source table; the data values in that row show the values of the row at the point it was deleted. A null value in this column indicates that the row has not been deleted and is still present in the source table. |
| MTLN_CDC_LOAD_TIMESTAMP | A TIMESTAMP column containing the timestamp of the record's ingestion into the target table. |
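For example, a query against a soft-delete replica can exclude deleted rows like this (the table name is a placeholder):

```sql
-- Return only rows still present in the source (illustrative sketch).
-- COALESCE treats a null flag the same as false.
SELECT *
FROM MYDB_MYSCHEMA_MYTABLE
WHERE COALESCE(MTLN_CDC_DELETED, FALSE) = FALSE;
```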
Example
We have a table in a source system with two columns: a primary key id, and a value field. A pipeline is started that captures the changes to that table, and the following series of changes is applied to the source table:
- Insert a new row (1, inserted).
- Insert a new row (2, inserted).
- Update value in row 2 to updated.
- Insert a new row (3, inserted).
- Delete row 2.
- Update value in row 1 to updated.
After these changes have been applied, the source table contains the following rows:

| id | value |
|---|---|
| 1 | updated |
| 3 | inserted |
Copy Table
A replica of the values in the source table.

| id | value |
|---|---|
| 1 | updated |
| 3 | inserted |
Copy Table with Soft Deletes
The rows with id values 1 and 3 match the source. The row with the id of 2 has the value at the time the row was deleted and has been flagged as deleted in the MTLN_CDC_DELETED field.
| id | value | MTLN_CDC_DELETED |
|---|---|---|
| 1 | updated | null |
| 2 | updated | true |
| 3 | inserted | null |
Change Log
Each change to the source table has a corresponding entry.

| id | value | MTLN_CDC_LAST_CHANGE_TYPE | MTLN_CDC_LAST_COMMIT_TIMESTAMP | MTLN_CDC_SEQUENCE_NUMBER |
|---|---|---|---|---|
| 1 | inserted | c | <change_timestamp> | <sequence_value> |
| 2 | inserted | c | <change_timestamp> | <sequence_value> |
| 2 | updated | u | <change_timestamp> | <sequence_value> |
| 3 | inserted | c | <change_timestamp> | <sequence_value> |
| 2 | updated | d | <change_timestamp> | <sequence_value> |
| 1 | updated | u | <change_timestamp> | <sequence_value> |
Dates and times strategy
- Snowflake Native Types: This action will convert the source date/time value to an appropriate Snowflake type. If it’s not possible to convert, the pipeline will output the value as a string. This is the default setting.
- Integers from Epoch: This action will load all date/time values as integers in Snowflake; this applies to all date/time fields in the target table. You can recover the date and time by adding each integer to the epoch value. Although this may be less convenient, it has the benefit of preserving a very accurate date/time value without any errors that conversion in the pipeline may introduce.
The epoch integer value is the number of days since the Unix epoch for all data sources except Oracle, where it is the number of milliseconds since the Unix epoch.
| Source type | Integers from epoch | Cast to Snowflake types |
|---|---|---|
| date | Integer, number of days since epoch | Snowflake DATE |
| timestamp (up to 3 decimal places) | Integer, number of milliseconds since epoch | Snowflake TIMESTAMP_NTZ(9) |
| timestamp (4-6 decimal places) | Integer, number of microseconds since epoch | Snowflake TIMESTAMP_NTZ(9) |
| timestamp (7-9 decimal places) | Integer, number of nanoseconds since epoch | Snowflake TIMESTAMP_NTZ(9) |
| timestamp with time zone | String, ISO formatted, often forced into GMT | Snowflake TIMESTAMP_TZ(9), often forced into GMT |
| time (up to 3 decimal places) | Integer, number of milliseconds past midnight | Snowflake TIME(9) |
| time (4-6 decimal places) | Integer, number of microseconds past midnight | Snowflake TIME(9) |
| time (7-9 decimal places) | Integer, number of nanoseconds past midnight | Snowflake TIME(9) |
| time with time zone | String, ISO formatted, often forced into GMT | String, ISO formatted, often forced into GMT |
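As an illustration of working with the Integers from Epoch strategy, assuming hypothetical columns created_date loaded as days since the epoch and updated_ts loaded as microseconds since the epoch, the original values can be recovered with standard Snowflake functions:

```sql
-- Convert integers-from-epoch back to date/time values (illustrative sketch;
-- created_date, updated_ts, and the table name are placeholders).
SELECT
  DATEADD(day, created_date, DATE '1970-01-01') AS created_date_value,
  TO_TIMESTAMP_NTZ(updated_ts, 6)               AS updated_ts_value  -- scale 6 = microseconds
FROM MYDB_MYSCHEMA_MYTABLE;
```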
Schema drift
Schema drift is supported for this destination. Read Schema drift to learn more.

Example Snowflake configuration
The Streaming pipeline will work with any Snowflake ROLE that has the required privileges; however, we recommend creating a dedicated Snowflake USER and ROLE where possible. The following SQL provides an example of setting up Snowflake by creating a dedicated USER, DATABASE, SCHEMA, STAGE, and WAREHOUSE. Existing Snowflake objects can be used, and multiple pipelines can share the same objects. This example creates one schema that contains both the stage and the target tables.

Creating the database or the schema as TRANSIENT will cause all lower objects (schemas, tables) to also be created as TRANSIENT. Read the Snowflake documentation to learn more.
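A sketch of such a setup follows. Every object name and the password are placeholders you should change; the warehouse size and auto-suspend setting are illustrative assumptions, not requirements.

```sql
-- Illustrative setup sketch: all names and the password are placeholders.
CREATE WAREHOUSE STREAMING_WH WITH WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60;
CREATE DATABASE STREAMING_DB;
CREATE SCHEMA STREAMING_DB.STREAMING_SCHEMA;
CREATE STAGE STREAMING_DB.STREAMING_SCHEMA.STREAMING_STAGE
  FILE_FORMAT = (TYPE = 'AVRO');

-- Dedicated role with the privileges listed under Destination configuration.
CREATE ROLE STREAMING_ROLE;
GRANT USAGE ON WAREHOUSE STREAMING_WH TO ROLE STREAMING_ROLE;
GRANT USAGE ON DATABASE STREAMING_DB TO ROLE STREAMING_ROLE;
GRANT USAGE ON SCHEMA STREAMING_DB.STREAMING_SCHEMA TO ROLE STREAMING_ROLE;
GRANT CREATE TABLE ON SCHEMA STREAMING_DB.STREAMING_SCHEMA TO ROLE STREAMING_ROLE;
GRANT READ, WRITE ON STAGE STREAMING_DB.STREAMING_SCHEMA.STREAMING_STAGE TO ROLE STREAMING_ROLE;

-- Dedicated user for the pipeline.
CREATE USER STREAMING_USER PASSWORD = '<change-me>' DEFAULT_ROLE = STREAMING_ROLE;
GRANT ROLE STREAMING_ROLE TO USER STREAMING_USER;
```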
