
# Create and manage streaming pipelines via the API

export const s_runner = "Streaming runner";

export const maia = "Maia";

In this guide, you will learn how to use the {maia} API to create and manage a Streaming pipeline. Streaming pipelines provide a complete end-to-end solution for near-real-time data ingestion: they capture data from a source database and write it to either cloud storage or a cloud data warehouse. For more details, read [Streaming pipelines overview](/docs/streaming/streaming-pipelines).

***

## Prerequisites

Before you begin, make sure you have the following:

* A [{maia} account](/docs/administration/registration).
* Valid API credentials for {maia} API access.
* An access token with the required privileges. For more information, read [Obtaining an API access token](/docs/api-reference/maia-api-authentication#step-3-obtaining-an-api-access-token).
* A {maia} [project](/docs/guides/projects). The examples in this article use the project ID `3dda0acf-e646-4f4b-b4f6-e00449b69427`; substitute your own project ID. To retrieve the project ID through the API, see [Get the project ID](#get-the-project-id).
* A [{s_runner}](/docs/streaming/create-streaming-agent) deployed to your cloud infrastructure. The examples in this article use the {s_runner} ID `2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6`; substitute your own {s_runner} ID. To retrieve the {s_runner} ID through the API, see the sections below.
* Any [secrets](/docs/administration/secrets-overview) used in the Streaming pipeline, added to a secrets manager that the {s_runner} can access. The examples in this article use the secrets `sqlserver-password`, `snowflake-private-key`, and `snowflake-private-key-passphrase`, stored in the Azure Key Vault `streaming-key-vault`, which the {s_runner} has permission to read; substitute your own secrets.

### Get the project ID

Retrieve the list of projects using the [Project API](/api-reference/projects/list-all-projects).

Endpoint: `GET /v1/projects`

Example response:

```json theme={null}
{
  "page": 0,
  "results": [
    {
      "description": "project-1",
      "id": "3dda0acf-e646-4f4b-b4f6-e00449b69427",
      "name": "Test-project"
    }
  ],
  "size": 0,
  "total": 0
}
```

This response includes the project ID used in the examples in this article, `3dda0acf-e646-4f4b-b4f6-e00449b69427`.
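If you script these calls, the project lookup can be sketched in Python using only the standard library. This is a minimal sketch, not an official client: it assumes the `us1` base URL used in this article's cURL examples and hypothetical helper names (`build_get_request`, `list_projects`, `find_project_id`), and it only searches the results in a single response, so it ignores pagination.

```python
import json
import urllib.request
from typing import Optional

# Assumption: the us1 region base URL used in this article's cURL examples.
BASE_URL = "https://us1.api.matillion.com/dpc"


def build_get_request(path: str, token: str) -> urllib.request.Request:
    """Build an authenticated GET request for an API path such as /v1/projects."""
    return urllib.request.Request(
        BASE_URL + path,
        headers={"authorization": f"Bearer {token}"},
        method="GET",
    )


def list_projects(token: str) -> dict:
    """Send GET /v1/projects and return the parsed JSON response."""
    with urllib.request.urlopen(build_get_request("/v1/projects", token)) as response:
        return json.load(response)


def find_project_id(projects_response: dict, project_name: str) -> Optional[str]:
    """Return the ID of the first project with this name, or None.

    Only the results in this one response are searched (no pagination).
    """
    for project in projects_response.get("results", []):
        if project.get("name") == project_name:
            return project.get("id")
    return None
```

For example, `find_project_id(list_projects(token), "Test-project")` would return the project ID from the example response above.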

### Get the {s_runner} ID

Once you have the project ID, use the [Environment API](/api-reference/environments/list-all-environments) to retrieve details of the project's environments. These details include the {s_runner} ID (`defaultAgentId`).

Endpoint: `GET /v1/projects/{projectId}/environments`

Example response:

```json theme={null}
{
  "page": 0,
  "results": [
    {
      "defaultAgentId": "2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6",
      "defaultAgentName": "Test-agent",
      "name": "Environment-1"
    }
  ],
  "size": 0,
  "total": 0
}
```

This response includes the {s_runner} ID used in the examples in this article, `2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6`.
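The same pattern can be sketched for environments. The helpers below are hypothetical and assume the `us1` base URL from this article's examples; `find_runner_id` returns the `defaultAgentId` of the environment with a given name, so adjust the matching if you identify environments differently.

```python
import json
import urllib.request
from typing import Optional

# Assumption: the us1 region base URL used in this article's cURL examples.
BASE_URL = "https://us1.api.matillion.com/dpc"


def list_environments(project_id: str, token: str) -> dict:
    """Send GET /v1/projects/{projectId}/environments and return the parsed JSON."""
    request = urllib.request.Request(
        f"{BASE_URL}/v1/projects/{project_id}/environments",
        headers={"authorization": f"Bearer {token}"},
        method="GET",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


def find_runner_id(environments_response: dict, environment_name: str) -> Optional[str]:
    """Return the defaultAgentId (Streaming runner ID) of the named environment, or None."""
    for environment in environments_response.get("results", []):
        if environment.get("name") == environment_name:
            return environment.get("defaultAgentId")
    return None
```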

***

## Create a Streaming pipeline

To create a new Streaming pipeline in a project, send a POST request whose body is a JSON object defining the pipeline configuration:

Endpoint: `POST /v1/projects/{projectId}/streaming-pipelines`

The request requires the following headers:

| Header        | Value               |
| ------------- | ------------------- |
| content-type  | application/json    |
| authorization | Bearer \<api token> |

The following example shows the JSON body needed to create a pipeline that streams from an SQL Server source to Snowflake. Replace the example values with the values for your own pipeline:

```json theme={null}
{
  "name": "my-streaming-pipeline",
  "agentId": "2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6",
  "streamingSource": {
    "type": "sqlserver",
    "connection": {
      "host": "sqlserver.my-org.com",
      "port": 1433,
      "database": "my_database",
      "username": "sa",
      "password": {
        "secretType": "AZURE_KEY_VAULT",
        "secretLocation": "streaming-key-vault",
        "secretName": "sqlserver-password"
      },
      "jdbcProperties" : {
        "lockTimeout": 60
      }
    },
    "tables": [
      {
        "schema": "my_schema",
        "table": "my_first_table"
      },
      {
        "schema": "my_schema",
        "table": "my_second_table"
      }
    ]
  },
  "streamingTarget": {
    "type": "snowflake",
    "connection": {
      "accountName": "my-snowflake-account.eu-central-1",
      "username": "STREAMING_USER",
      "authentication": {
        "type": "key-pair",
        "privateKey": {
          "secretType": "AZURE_KEY_VAULT",
          "secretLocation": "streaming-key-vault",
          "secretName": "snowflake-private-key"
        },
        "passphrase": {
          "secretType": "AZURE_KEY_VAULT",
          "secretLocation": "streaming-key-vault",
          "secretName": "snowflake-private-key-passphrase"
        }
      },
      "jdbcProperties": {
        "networkTimeout": 60
      }
    },
    "role": "STREAMING_ROLE",
    "warehouse": "STREAMING_WH",
    "database": "STREAMING_DB",
    "stageSchema": "STREAMING_SCHEMA",
    "stageName": "STREAMING_STAGE",
    "stagePrefix": "MY_STREAMING_PIPELINE",
    "tableSchema": "STREAMING_SCHEMA",
    "tablePrefixType": "PREFIX",
    "transformationType": "CHANGE_LOG",
    "temporalMapping": "NATIVE"
  },
  "advancedProperties": {
    "max.batch.size": "40000"
  }
}
```

An example of this request in cURL is:

```bash theme={null}
curl --request POST \
  --url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines \
  --header "authorization: Bearer $MAIA_TOKEN" \
  --header 'content-type: application/json' \
  --data @streaming-pipeline.json
```

In this example, the request body is stored in the file `streaming-pipeline.json` and the API access token is stored in the environment variable `MAIA_TOKEN`. The `authorization` header is wrapped in double quotes so that the shell expands `$MAIA_TOKEN`.
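The same request could also be sent from Python. This sketch assumes the `us1` base URL, reads the definition from the same `streaming-pipeline.json` file, and uses hypothetical function names. `urllib.request.urlopen` raises `urllib.error.HTTPError` for error status codes such as `400`, so a caller can catch that exception and read the response body with `error.read()`.

```python
import json
import urllib.request

# Assumption: the us1 region base URL used in this article's cURL examples.
BASE_URL = "https://us1.api.matillion.com/dpc"


def build_create_request(project_id: str, token: str, definition: dict) -> urllib.request.Request:
    """Build the POST request that creates a Streaming pipeline from a definition dict."""
    return urllib.request.Request(
        f"{BASE_URL}/v1/projects/{project_id}/streaming-pipelines",
        data=json.dumps(definition).encode("utf-8"),
        headers={
            "authorization": f"Bearer {token}",
            "content-type": "application/json",
        },
        method="POST",
    )


def create_pipeline(project_id: str, token: str, definition_path: str) -> str:
    """Create the pipeline from a JSON file and return its streamingPipelineId.

    Raises urllib.error.HTTPError for error responses, such as 400 for an invalid definition.
    """
    with open(definition_path, encoding="utf-8") as f:
        definition = json.load(f)
    request = build_create_request(project_id, token, definition)
    with urllib.request.urlopen(request) as response:
        # A successful request returns 201 and the stored pipeline definition.
        created = json.load(response)
    return created["streamingPipelineId"]
```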

A successful request should return a `201` status code and a response body containing the newly created Streaming pipeline definition. For example:

```json theme={null}
{
  "streamingPipelineId": "50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3",
  "name": "my-streaming-pipeline",
  "agentId": "2734aa6f-0c64-4d2b-9406-f44dc8f1f3b6",
  "streamingSource": {
    "type": "sqlserver",
    "connection": {
      "host": "sqlserver.my-org.com",
      "port": 1433,
      "database": "my_database",
      "username": "sa",
      "password": {
        "secretType": "AZURE_KEY_VAULT",
        "secretLocation": "streaming-key-vault",
        "secretName": "sqlserver-password"
      },
      "jdbcProperties" : {
        "lockTimeout": 60
      }
    },
    "tables": [
      {
        "schema": "my_schema",
        "table": "my_first_table"
      },
      {
        "schema": "my_schema",
        "table": "my_second_table"
      }
    ]
  },
  "streamingTarget": {
    "type": "snowflake",
    "connection": {
      "accountName": "my-snowflake-account.eu-central-1",
      "username": "STREAMING_USER",
      "authentication": {
        "type": "key-pair",
        "privateKey": {
          "secretType": "AZURE_KEY_VAULT",
          "secretLocation": "streaming-key-vault",
          "secretName": "snowflake-private-key"
        },
        "passphrase": {
          "secretType": "AZURE_KEY_VAULT",
          "secretLocation": "streaming-key-vault",
          "secretName": "snowflake-private-key-passphrase"
        }
      },
      "jdbcProperties": {
        "networkTimeout": 60
      }
    },
    "role": "STREAMING_ROLE",
    "warehouse": "STREAMING_WH",
    "database": "STREAMING_DB",
    "stageSchema": "STREAMING_SCHEMA",
    "stageName": "STREAMING_STAGE",
    "stagePrefix": "MY_STREAMING_PIPELINE",
    "tableSchema": "STREAMING_SCHEMA",
    "tablePrefixType": "PREFIX",
    "transformationType": "CHANGE_LOG",
    "temporalMapping": "NATIVE"
  },
  "advancedProperties": {
    "max.batch.size": "40000"
  }
}
```

The `streamingPipelineId` field is a UUID generated when the pipeline is created; it uniquely identifies the pipeline. You need this identifier to manage the pipeline through the API, for example to [view](#view-a-streaming-pipeline) it, [start or stop](#start-or-stop-a-streaming-pipeline) it, or [check its status](#check-the-streaming-pipeline-status).

If the Streaming pipeline definition in the POST body is incomplete or invalid, the API returns a `400` status code, and the response body lists each issue along with the path to the relevant field in the request.

For example, if you send the same request body as above but remove the `streamingSource.connection.username` and `streamingTarget.connection.authentication` fields, the response body is:

```json theme={null}
{
  "title": "Bad Request",
  "status": 400,
  "detail": "There are validation errors with the streaming pipeline definition",
  "instance": "/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines",
  "validation-errors": [
    {
      "field": "streamingTarget.connection.authentication",
      "message": "must not be null"
    },
    {
      "field": "streamingSource.connection.username",
      "message": "must not be empty"
    }
  ]
}
```
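To surface these errors in a script, you could flatten the `validation-errors` array into one line per field. This hypothetical helper assumes the response body shape shown above; in Python, the body bytes could come from `error.read()` on the `urllib.error.HTTPError` raised by `urllib.request.urlopen`.

```python
import json
from typing import List


def validation_errors_from_body(body: bytes) -> List[str]:
    """Parse a 400 response body and return one 'field: message' line per error.

    Assumes the body shape shown above; returns [] if there is no validation-errors array.
    """
    problem = json.loads(body)
    return [
        f"{error.get('field')}: {error.get('message')}"
        for error in problem.get("validation-errors", [])
    ]
```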

***

## View a Streaming pipeline

Use the Streaming pipeline ID to view the pipeline definition with the following GET request:

Endpoint: `GET /v1/projects/{projectId}/streaming-pipelines/{streamingPipelineId}`

An example of this request in cURL is:

```bash theme={null}
curl --request GET \
  --url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines/50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3 \
  --header "authorization: Bearer $MAIA_TOKEN"
```
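A Python sketch of the same GET request follows. It assumes the `us1` base URL from this article's examples and uses hypothetical helper names; `get_pipeline` returns the parsed JSON definition.

```python
import json
import urllib.request

# Assumption: the us1 region base URL used in this article's cURL examples.
BASE_URL = "https://us1.api.matillion.com/dpc"


def pipeline_url(project_id: str, pipeline_id: str) -> str:
    """Return the URL of a single Streaming pipeline resource."""
    return f"{BASE_URL}/v1/projects/{project_id}/streaming-pipelines/{pipeline_id}"


def build_view_request(project_id: str, pipeline_id: str, token: str) -> urllib.request.Request:
    """Build the authenticated GET request for a pipeline definition."""
    return urllib.request.Request(
        pipeline_url(project_id, pipeline_id),
        headers={"authorization": f"Bearer {token}"},
        method="GET",
    )


def get_pipeline(project_id: str, pipeline_id: str, token: str) -> dict:
    """Send the GET request and return the parsed pipeline definition."""
    with urllib.request.urlopen(build_view_request(project_id, pipeline_id, token)) as response:
        return json.load(response)
```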

***

## Start or stop a Streaming pipeline

Use the Streaming pipeline ID to start or stop the pipeline with the following POST request:

Endpoint: `POST /v1/projects/{projectId}/streaming-pipelines/{streamingPipelineId}/commands`

The request body must contain either the `start` or the `stop` command:

```json theme={null}
{
  "command": "start"
}
```

```json theme={null}
{
  "command": "stop"
}
```

A successful call will respond with a `200` status code, indicating that the {s_runner} has received the request and initiated the pipeline start or stop process.

An example of this request in cURL is:

```bash theme={null}
curl --request POST \
  --url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines/50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3/commands \
  --header "authorization: Bearer $MAIA_TOKEN" \
  --header 'content-type: application/json' \
  --data '{"command":"start"}'
```
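In Python, the command request could be built as follows. This sketch assumes the `us1` base URL and hypothetical helper names, and it rejects any command other than the documented `start` and `stop` values before sending anything.

```python
import json
import urllib.request

# Assumption: the us1 region base URL used in this article's cURL examples.
BASE_URL = "https://us1.api.matillion.com/dpc"
# The two commands documented for this endpoint.
VALID_COMMANDS = ("start", "stop")


def build_command_request(
    project_id: str, pipeline_id: str, command: str, token: str
) -> urllib.request.Request:
    """Build the POST .../commands request, rejecting unknown commands locally."""
    if command not in VALID_COMMANDS:
        raise ValueError(f"command must be one of {VALID_COMMANDS}, got {command!r}")
    return urllib.request.Request(
        f"{BASE_URL}/v1/projects/{project_id}/streaming-pipelines/{pipeline_id}/commands",
        data=json.dumps({"command": command}).encode("utf-8"),
        headers={
            "authorization": f"Bearer {token}",
            "content-type": "application/json",
        },
        method="POST",
    )


def send_command(project_id: str, pipeline_id: str, command: str, token: str) -> int:
    """Send the command and return the HTTP status (200 means the runner received it)."""
    request = build_command_request(project_id, pipeline_id, command, token)
    with urllib.request.urlopen(request) as response:
        return response.status
```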

***

## Check the Streaming pipeline status

Use the Streaming pipeline ID to check the status of the pipeline with the following GET request:

Endpoint: `GET /v1/projects/{projectId}/streaming-pipelines/{streamingPipelineId}/status`

The response body contains a `status` field. Shortly after the pipeline is started, its value may be `starting` or `streaming`.

An example of this request in cURL is:

```bash theme={null}
curl --request GET \
  --url https://us1.api.matillion.com/dpc/v1/projects/3dda0acf-e646-4f4b-b4f6-e00449b69427/streaming-pipelines/50ead6b2-5d1a-40c7-bb50-0d17c8fb23c3/status \
  --header "authorization: Bearer $MAIA_TOKEN"
```
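Because a pipeline may report `starting` for a while before it reports `streaming`, a script might poll the status endpoint. This is a minimal sketch with hypothetical names: `fetch_status` assumes the `us1` base URL and a top-level `status` field in the response body, and `wait_for_status` gives up after a fixed number of attempts. Only the `starting` and `streaming` values are described in this article, so a real script should also stop on any failure states your pipelines report.

```python
import json
import time
import urllib.request
from typing import Callable

# Assumption: the us1 region base URL used in this article's cURL examples.
BASE_URL = "https://us1.api.matillion.com/dpc"


def fetch_status(project_id: str, pipeline_id: str, token: str) -> str:
    """Send GET .../status and return the value of the status field."""
    request = urllib.request.Request(
        f"{BASE_URL}/v1/projects/{project_id}/streaming-pipelines/{pipeline_id}/status",
        headers={"authorization": f"Bearer {token}"},
        method="GET",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)["status"]


def wait_for_status(
    get_status: Callable[[], str],
    target: str = "streaming",
    max_attempts: int = 30,
    interval_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns target or max_attempts calls have been made.

    Returns the last status seen, so callers can tell success from giving up.
    """
    status = get_status()
    attempts = 1
    while status != target and attempts < max_attempts:
        sleep(interval_s)
        status = get_status()
        attempts += 1
    return status
```

For example, `wait_for_status(lambda: fetch_status(project_id, pipeline_id, token))` polls every 10 seconds for up to 30 attempts. Passing `sleep` as a parameter keeps the loop easy to test without real waiting.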
