# PostgreSQL streaming connector

export const s_runner = "Streaming runner";

export const maia = "Maia";

The PostgreSQL connector monitors and captures row-level changes within PostgreSQL schemas in a non-intrusive, performant manner. It does this by reading the raw output stream of the `pgoutput` logical decoding plugin. The connector produces a change event record for every insert, update, and delete operation on any table monitored by the {s_runner}.

<Note>
  This source uses the **Debezium PostgreSQL** connector. For more information, read [Debezium connector for PostgreSQL](https://debezium.io/documentation/reference/stable/connectors/postgresql.html).
</Note>

***

## How the connector works

PostgreSQL normally purges write-ahead log (WAL) segments after a period of time, so a complete history of changes isn't available to the connector. To overcome this, the connector initially performs a consistent snapshot of all monitored schemas and tables. This establishes a base state, after which streaming can begin.

<Note>
  Depending on the number of rows within your schema, this may take a while to complete.
</Note>

When the snapshot stage completes, the connector moves to the streaming stage. All changes that occurred after the snapshot started are captured during streaming, so no changes are missed.

During the streaming stage, the connector continues to monitor and consume changes as they occur in the database, and exports the resulting change events to your selected destination in a consistent, predictable pattern. The connector maintains an acceptable time lag behind the source database, which you can monitor through the pipeline dashboard.
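You can also check lag from the database side. The following query is a sketch that assumes the default slot name `matillion_cdc` and is run on the primary server (`pg_current_wal_lsn()` can't be called on a standby during recovery). It measures lag as the amount of WAL written since the position the slot's consumer last confirmed, not as elapsed time:

```sql
-- Sketch: assumes the default slot name matillion_cdc; run on the primary server.
SELECT slot_name,
       active,
       confirmed_flush_lsn,
       -- Bytes of WAL written since the position the consumer last confirmed.
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_size
FROM pg_replication_slots
WHERE slot_name = 'matillion_cdc';
```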

The connector is tolerant of failures. As the connector reads changes and produces events, it records the WAL position for each event. If the connector stops for any reason (including communication failures, network problems, or crashes), it continues reading the WAL from where it last stopped when it restarts.

If the connector stops at any point during the snapshot stage, a new snapshot is taken when the pipeline restarts.

***

## Prerequisites

Observe the following information about supported versions, limitations, and database rules.

### Versions

{maia} supports PostgreSQL versions 12, 13, 14, 15, and 16, including their minor versions. PostgreSQL 12 and later include the `pgoutput` logical decoding plugin by default, so changes can be captured natively without installing an additional plugin.
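To confirm which version your server runs before you create a pipeline, you can query the server settings. This is a minimal check; `server_version_num` encodes the major version in its leading digits (for example, `160002` for version 16.2):

```sql
-- Human-readable version string reported by the server.
SHOW server_version;

-- Major version as an integer; supported values are 12 through 16.
SELECT current_setting('server_version_num')::int / 10000 AS major_version;
```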

### Limitations

* The numeric values `NaN`, `Infinity`, and `-Infinity` are not supported.
* Amazon Aurora Serverless database clusters do *not* support streaming. Only provisioned Amazon Aurora database clusters support streaming.
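If you're unsure whether a numeric column contains these special values, you can check before streaming the table. In this sketch, `<schema>.<table>` and `<numeric_column>` are placeholders. Casting to text lets the same comparison work for `real`, `double precision`, and `numeric` columns, including on versions before PostgreSQL 14, where `numeric` can't represent infinity:

```sql
-- Count rows holding values the connector doesn't support.
SELECT count(*) AS unsupported_values
FROM <schema>.<table>
WHERE <numeric_column>::text IN ('NaN', 'Infinity', '-Infinity');
```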

### Database rules

* Your PostgreSQL database must be configured for streaming. Read [Configuring PostgreSQL database](/docs/streaming/postgresql-configure-database) to learn more.
* For PostgreSQL versions earlier than 16, the connector must connect to the primary server, and this server must be active. PostgreSQL 16 and later support replication slots on standby servers.
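You can spot-check these rules on the server you plan to connect to with the following queries. This is a sketch rather than a complete readiness check; the full requirements are in the configuration guide linked above. Logical decoding, which `pgoutput` relies on, requires `wal_level` to be set to `logical`:

```sql
-- false on a primary server, true on a standby (relevant before PostgreSQL 16).
SELECT pg_is_in_recovery() AS is_standby;

-- Must return "logical" for streaming.
SHOW wal_level;
```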

***

## Source setup

Refer to this section when you [create a streaming pipeline](/docs/streaming/streaming-pipelines/#source-setup).

<ResponseField name="Server address" type="string" required>
  The server address of your PostgreSQL database.
</ResponseField>

<ResponseField name="Port" type="integer" required>
  The port number used to access your PostgreSQL database. The default is `5432`.
</ResponseField>

<ResponseField name="Database name" type="string" required>
  The name of the PostgreSQL database to capture changes from.
</ResponseField>

<ResponseField name="Username" type="string" required>
  The username used to log in to the specified database.
</ResponseField>

<ResponseField name="Secrets Manager" type="drop-down" required>
  Choose the service you use to manage secrets.

  * [AWS Secrets Manager](/docs/administration/aws-secrets-manager)
  * [Azure Key Vault](/docs/administration/azure-key-vault)
</ResponseField>

<ResponseField name="Secret name" type="string" required>
  The name of the secret in your secret manager service that references your PostgreSQL password.
</ResponseField>

<ResponseField name="JDBC parameters and specific connection settings" type="column editor">
  Specify any parameter:value pairs as part of an advanced connection. Click **Save** to finish adding parameters.

  Click **Connect** to establish the connection to your source database.
</ResponseField>
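Before you click **Connect**, it can help to confirm that the values you enter point at the intended database. Running the following query from any SQL client that uses the same connection details returns what the server reports for each field. It's a sketch: `inet_server_addr()` returns `NULL` over a Unix-socket connection and may show an internal address behind a proxy or load balancer:

```sql
-- What the server reports for the current connection.
SELECT inet_server_addr() AS server_address,
       inet_server_port() AS port,
       current_database() AS database_name,
       current_user       AS username;
```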

***

## Unavailable field values

When a column is not part of a table's replica identity and its value is large, PostgreSQL may store the value using [TOAST](https://www.postgresql.org/docs/current/storage-toast.html). For change records in which a TOASTed value wasn't modified, that column in the change record contains the placeholder value `__value_not_modified__`.
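To see which columns of a table could be stored with TOAST, you can inspect each column's storage strategy. In this sketch, `<schema>.<table>` is a placeholder. Columns with a strategy other than `p` (plain) can have large values moved out of line, so those that aren't part of the replica identity are the ones that may carry the placeholder value:

```sql
-- attstorage: p = plain, e = external, m = main, x = extended.
SELECT attname                          AS column_name,
       format_type(atttypid, atttypmod) AS data_type,
       attstorage                       AS storage
FROM pg_attribute
WHERE attrelid = '<schema>.<table>'::regclass
  AND attnum > 0          -- skip system columns
  AND NOT attisdropped    -- skip dropped columns
ORDER BY attnum;
```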

The placeholder isn't treated any differently from an actual value when changes are replicated into the target tables. As a result, rows in the replica table can inadvertently be updated with this placeholder value, and the column's actual value is lost.

To avoid this, set the table's replica identity to `FULL`, which ensures that all column values are always included in change records. You can change a table's replica identity with an [`ALTER TABLE`](https://www.postgresql.org/docs/current/sql-altertable.html#SQL-ALTERTABLE-REPLICA-IDENTITY) statement:

```sql theme={null}
ALTER TABLE <the_table> REPLICA IDENTITY FULL;
```
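To find tables that don't yet use `FULL`, you can query the system catalog. This sketch lists the ordinary and partitioned tables in one schema (`<schema>` is a placeholder); a value of `f` means the table already uses `FULL`:

```sql
-- relreplident: d = default (primary key), n = nothing, f = full, i = index
SELECT n.nspname      AS schema_name,
       c.relname      AS table_name,
       c.relreplident AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind IN ('r', 'p')
  AND n.nspname = '<schema>'
ORDER BY c.relname;
```

Keep in mind that with `FULL`, PostgreSQL writes the entire old row to the WAL for every update and delete, which increases WAL volume on tables that change frequently.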

***

## Advanced properties

Advanced properties are optional and are entered as parameter:value pairs in the **Add advanced properties** dialog. Some parameters you might want to add are described below.

### slot.name

**slot.name:** The name of the PostgreSQL logical decoding slot created for streaming changes from a particular plugin for a particular database/schema. A valid replication slot name contains only lowercase letters, digits, and underscores, and is at most 63 characters long.

If you don't set this property, the slot name defaults to `matillion_cdc`. A replication slot is a PostgreSQL feature that facilitates streaming replication. The {s_runner} connects to the replication slot by referencing its name and consumes the captured changes in real time. When the {s_runner} connects, it looks for the replication slot named `matillion_cdc` (or the name set in `slot.name`), and if it doesn't find one, the {s_runner} creates it.

<Note>
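  If more than one pipeline uses the same source PostgreSQL database, you must set this property so that each pipeline has its own replication slot. A replication slot can only be consumed by one connection at a time.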
</Note>
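To see which replication slots already exist on the server, and whether a consumer is currently connected to each one, you can query `pg_replication_slots`. The second statement is a sketch that checks a candidate name against the naming rule for `slot.name`; `my_pipeline_slot` is a hypothetical example:

```sql
-- Existing slots, their decoding plugin, and whether a consumer is connected.
SELECT slot_name, plugin, slot_type, database, active
FROM pg_replication_slots;

-- Lowercase letters, digits, and underscores only; 1 to 63 characters.
SELECT 'my_pipeline_slot' ~ '^[a-z0-9_]{1,63}$' AS is_valid_slot_name;
```

An inactive slot still prevents PostgreSQL from removing the WAL it hasn't consumed, so a slot left behind by a deleted pipeline can cause WAL to accumulate on disk.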

### time.precision.mode

**time.precision.mode:** Determines how PostgreSQL temporal types are mapped in change events. In simplified terms:

* When the `time.precision.mode` property is set to `adaptive`, which is the default setting, the connector determines the literal type and semantic type for temporal columns based on their data type definition. This approach guarantees that the events accurately reflect the values stored in the database.
* When the `time.precision.mode` property is set to `connect`, the connector uses Kafka Connect logical types to represent temporal types. This can be useful when consumers can handle only the built-in Kafka Connect logical types and can't handle variable-precision time values. However, this mode may lose precision for PostgreSQL columns with a fractional second precision greater than 3.
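If you're considering `connect`, you can first find the columns that could lose precision. This sketch lists time and timestamp columns whose fractional second precision is greater than 3; columns declared without an explicit precision report `6`, PostgreSQL's default:

```sql
SELECT table_schema, table_name, column_name, data_type, datetime_precision
FROM information_schema.columns
WHERE data_type IN ('time without time zone', 'time with time zone',
                    'timestamp without time zone', 'timestamp with time zone')
  AND datetime_precision > 3
  AND table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY table_schema, table_name, column_name;
```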

***

## Configuring new tables

When adding tables to a pipeline, they must be correctly configured to allow the on-demand snapshot to take place and to ensure future changes on those tables will be streamed. Read [Configure your PostgreSQL database](/docs/streaming/postgresql-configure-database) for a more comprehensive outline of these requirements.

### Add tables to the streaming publication

To enable on-demand snapshots and streaming for new tables, you must add the tables to the database publication. Use the following SQL statements:

For adding tables individually:

```sql theme={null}
ALTER PUBLICATION matillion_cdc_publication ADD TABLE <schema>.<table>;
```

For adding all tables in a schema (PostgreSQL 15 and later):

```sql theme={null}
ALTER PUBLICATION matillion_cdc_publication ADD TABLES IN SCHEMA <schema>;
```

Replace `<schema>` and `<table>` with the schema and name of each new table.
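To confirm which tables the publication covers, or to add every table in a schema on PostgreSQL 12 to 14 (where `ADD TABLES IN SCHEMA` isn't available), you can query the catalog. This is a sketch that assumes the publication name `matillion_cdc_publication` shown above. The second query only generates `ALTER PUBLICATION` statements for tables that aren't members yet; run its output, or in psql replace the final semicolon with the `\gexec` meta-command to execute each generated statement. Partitioned tables may need separate handling:

```sql
-- Tables currently included in the publication.
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'matillion_cdc_publication'
ORDER BY schemaname, tablename;

-- Generate ADD TABLE statements for tables in <schema> not yet in the publication.
SELECT format('ALTER PUBLICATION matillion_cdc_publication ADD TABLE %I.%I;',
              t.schemaname, t.tablename)
FROM pg_tables t
WHERE t.schemaname = '<schema>'
  AND NOT EXISTS (
        SELECT 1
        FROM pg_publication_tables p
        WHERE p.pubname = 'matillion_cdc_publication'
          AND p.schemaname = t.schemaname
          AND p.tablename = t.tablename);
```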

### Grant USAGE privileges for new schemas

If any of the new tables are in schemas that previously did not have tables captured in the pipeline, grant USAGE privileges to the database role configured for streaming. Use the following SQL statement:

```sql theme={null}
GRANT USAGE ON SCHEMA <schema> TO <streaming_role>;
```

Replace `<schema>` with the schema of the new tables and `<streaming_role>` with the user configured for streaming. Repeat this step for every new schema.
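To verify the grant, you can check the privilege directly. In this sketch, `<streaming_role>` and `<schema>` are the same placeholders as in the statement above:

```sql
-- true when the role has USAGE on the schema.
SELECT has_schema_privilege('<streaming_role>', '<schema>', 'USAGE') AS has_usage;
```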

### Grant SELECT privileges on new tables

To run on-demand snapshots, the user configured for streaming must have SELECT privileges on the new tables. Use the following SQL statements:

For granting SELECT permissions on tables individually:

```sql theme={null}
GRANT SELECT ON <schema>.<table> TO <streaming_role>;
```

For granting SELECT permissions on all tables in a schema:

```sql theme={null}
GRANT SELECT ON ALL TABLES IN SCHEMA <schema> TO <streaming_role>;
```

Replace `<schema>` and `<table>` with the schema and name of each new table, and `<streaming_role>` with the user configured for streaming.
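To check that the streaming role can read every table in a schema before you run an on-demand snapshot, you can use `has_table_privilege`. This sketch reuses the `<schema>` and `<streaming_role>` placeholders; rows where `can_select` is `false` still need a grant. Note that `GRANT ... ON ALL TABLES IN SCHEMA` only covers tables that exist when it runs, so tables created later need their own grant:

```sql
SELECT t.schemaname,
       t.tablename,
       has_table_privilege('<streaming_role>',
                           format('%I.%I', t.schemaname, t.tablename),
                           'SELECT') AS can_select
FROM pg_tables t
WHERE t.schemaname = '<schema>'
ORDER BY t.tablename;
```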

Following these steps ensures that new tables added to the pipeline are configured for on-demand snapshots and streaming, so their historical data is captured without losing synchronization with the target. If on-demand snapshots are disabled, only changes made after a table is added are captured, and the pipeline has a **STREAMING** status immediately.
