# Kafka

export const ComponentMetadata = ({warehouses, unsupportedWarehouses = [], componentType, connectionInputs, connectionOutputs}) => {
  const allWarehouses = [...warehouses.map(w => ({
    name: w,
    supported: true
  })), ...unsupportedWarehouses.map(w => ({
    name: w,
    supported: false
  }))];
  return <div style={{
    background: 'var(--colors-background-light, #f9fafb)',
    border: '1px solid var(--colors-border-default, #e5e7eb)',
    borderRadius: '12px',
    padding: '20px 28px',
    marginBottom: '28px',
    boxShadow: '0 1px 4px rgba(0,0,0,0.10)'
  }}>
      <table style={{
    width: '100%',
    borderCollapse: 'collapse'
  }}>
        <tbody>
          <tr>
            <td style={{
    fontWeight: '600',
    paddingRight: '32px',
    paddingBottom: '14px',
    whiteSpace: 'nowrap',
    verticalAlign: 'middle',
    width: '180px'
  }}>Project Availability</td>
            <td style={{
    paddingBottom: '14px',
    verticalAlign: 'middle'
  }}>
              <div style={{
    display: 'flex',
    flexWrap: 'wrap',
    gap: '8px'
  }}>
                {allWarehouses.map((w, i) => <span key={i} style={{
    background: w.supported ? '#dcfce7' : '#fee2e2',
    color: w.supported ? '#15803d' : '#b91c1c',
    border: `1px solid ${w.supported ? '#bbf7d0' : '#fca5a5'}`,
    borderRadius: '9999px',
    padding: '3px 12px',
    fontSize: '0.85rem',
    fontWeight: '500',
    whiteSpace: 'nowrap'
  }}>
                    {w.name} {w.supported ? '✅' : '❌'}
                  </span>)}
              </div>
            </td>
          </tr>
          <tr>
            <td style={{
    fontWeight: '600',
    paddingRight: '32px',
    paddingBottom: '14px',
    whiteSpace: 'nowrap',
    verticalAlign: 'middle'
  }}>Component Type</td>
            <td style={{
    paddingBottom: '14px',
    verticalAlign: 'middle'
  }}>{componentType}</td>
          </tr>
          <tr>
            <td style={{
    fontWeight: '600',
    paddingRight: '32px',
    paddingBottom: '14px',
    whiteSpace: 'nowrap',
    verticalAlign: 'middle'
  }}>Connection Inputs</td>
            <td style={{
    paddingBottom: '14px',
    verticalAlign: 'middle'
  }}>{connectionInputs}</td>
          </tr>
          <tr>
            <td style={{
    fontWeight: '600',
    paddingRight: '32px',
    whiteSpace: 'nowrap',
    verticalAlign: 'middle'
  }}>Connection Outputs</td>
            <td style={{
    verticalAlign: 'middle'
  }}>{connectionOutputs}</td>
          </tr>
        </tbody>
      </table>
    </div>;
};

<ComponentMetadata warehouses={["Snowflake", "Databricks", "Amazon Redshift"]} unsupportedWarehouses={["Google BigQuery"]} componentType="Connector, Orchestration" connectionInputs="One" connectionOutputs="Unlimited" />

The Kafka component is an orchestration component that loads data from Kafka topics, reading messages as either schemaless JSON or JSON Schema objects. The data is then stored in your preferred storage location (Snowflake, Databricks, Amazon Redshift, or cloud storage). New to Kafka? Read Apache Kafka's [Introduction](https://kafka.apache.org/intro). You *do not* need to use the Create Table component with this connector, because the Kafka component creates a new table or replaces an existing table for you, using the Destination parameters you define.

We recommend that you make use of the [Apache Kafka documentation](https://kafka.apache.org/documentation/) when using this component.

If the component requires access to a cloud provider (AWS, Azure, or Google Cloud), it will use the [cloud credentials](/docs/guides/cloud-credentials) associated with your environment to access resources.

To stage data to Azure Blob Storage, the Azure credentials associated with your environment must be assigned the `Storage Blob Data Contributor` role. For more information, read [User assigned with the Storage Blob Data Contributor role](https://learn.microsoft.com/en-us/answers/questions/1155139/user-assigned-with-storage-blob-data-contributor-r).

***

## Properties

Reference material is provided below for the Connect, Configure, Destination, and Advanced Settings properties.

<ResponseField name="Name" type="string" required>
  A human-readable name for the component.
</ResponseField>

### Connect

<ResponseField name="Authentication Type" type="drop-down" required>
  Specify the authentication mechanism to use for connecting to the Kafka broker.

  * **None:** No authentication.
  * **Username & Password:** Authenticate with a username and password (basic authentication). This uses the Simple Authentication and Security Layer (SASL) protocol. In your Kafka configuration file (`server.properties`), you must set `security.inter.broker.protocol` to a value that supports SASL. Read [SASL configuration](https://kafka.apache.org/documentation/#security_sasl_config) to learn more.
  * **OAuth 2.0 Client Credentials:** Authenticate using OAuth 2.0 authorization. You will need an OAuth 2.0 authorization server (such as Auth0, Okta, or Keycloak) to issue access tokens.

  If using OAuth 2.0 Client Credentials, you must edit your `server.properties` file to ensure the server (broker) accepts SASL/OAUTHBEARER tokens. For example:

  ```
  listeners=SASL_SSL://:9092
  sasl.enabled.mechanisms=OAUTHBEARER
  ```
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.connection.overrides.username] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Username" type="string" required>
  The username value defined in your `jaas.conf` (Java Authentication and Authorization Service) file. You can define as many users as required in this file. For example:

  ```
  KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="myUser"
      password="myPassword";
  };
  ```

  To set up additional Kafka users, edit your `jaas.conf` file accordingly. For example, this is a configuration that defines additional users (producer, consumer, registry):

  ```
  KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      serviceName="kafka"
      username="admin"
      password="admin-secret"
      user_producer="producer-secret"
      user_consumer="consumer-secret"
      user_registry="registry-secret";
  };
  ```
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.connection.overrides.password] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Password" type="drop-down" required>
  Use the drop-down menu to select the corresponding secret definition that denotes the value of the password tied to your username in your `jaas.conf` (Java Authentication and Authorization Service) file. For example:

  ```
  KafkaServer {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="myUser"
      password="myPassword";
  };
  ```

  Read [Secrets and secret definitions](/docs/guides/secrets-and-secret-definitions) to learn how to create a new secret definition.
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.connection.overrides.oAuthReferenceId] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Authentication" type="drop-down" required>
  Use the drop-down menu to select an OAuth connection to Kafka or Kafka Confluent Cloud.

  Read [Kafka authentication guide](/docs/guides/kafka-authentication-guide) to learn how to obtain the credentials to create a Kafka or Kafka Confluent Cloud OAuth connection.
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.bootstrapServers] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Bootstrap Servers" type="string list" required>
  Set the address of each bootstrap server (also known as a broker) to connect to when establishing communication with a Kafka cluster. Define multiple bootstrap servers as a comma-separated list. For example, `myBroker1:9092, myBroker2:9092, myBroker3:9092`. If a connection attempt fails, the Kafka client tries the next broker in the list.
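
  As a rough illustration of the comma-separated format, the following Java sketch normalizes a bootstrap server list before it is passed to a client. The `BootstrapServers` class and its methods are hypothetical helpers written for this example. They are not part of the Kafka component or the Kafka client API.

  ```java
  import java.util.ArrayList;
  import java.util.List;

  // Hypothetical helper for illustration only.
  final class BootstrapServers {
      // Splits "host1:9092, host2:9092" into trimmed, non-empty entries.
      static List<String> parse(String value) {
          List<String> servers = new ArrayList<>();
          for (String entry : value.split(",")) {
              String trimmed = entry.trim();
              if (!trimmed.isEmpty()) {
                  servers.add(trimmed);
              }
          }
          return servers;
      }

      // Joins the entries back into a single comma-separated value,
      // the form used by the Kafka client's bootstrap.servers setting.
      static String join(List<String> servers) {
          return String.join(",", servers);
      }
  }
  ```

  Whitespace around each entry is trimmed, so `myBroker1:9092, myBroker2:9092` and `myBroker1:9092,myBroker2:9092` produce the same list.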
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.encryption] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Encryption" type="drop-down" required>
  * **None:** No encryption.
  * **TLS:** Use a TLS certificate (in .pem format) to secure communication to the Kafka cluster. To use TLS, make sure you update your Kafka configuration (`server.properties`) on the server side as well as client properties to enable SSL as the communication protocol. For example, `security.protocol=SASL_SSL`. This setting is recommended if you're using basic authentication (username and password) to authenticate.

  | Security protocol | Description                               |
  | ----------------- | ----------------------------------------- |
  | PLAINTEXT         | Unauthenticated, non-encrypted channel    |
  | SSL               | SSL channel                               |
  | SASL\_PLAINTEXT   | SASL authenticated, non-encrypted channel |
  | SASL\_SSL         | SASL authenticated, SSL channel           |

  <Note>
    You can enable TLS without passing a certificate. In such cases, the security protocol will be set to the SSL equivalent of the authentication type chosen. If you *do* pass a certificate, the trust store certificate and trust store type will be configured.
  </Note>
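
  The table and note above can be read as a mapping from the chosen authentication type and encryption setting to a Kafka security protocol. The Java sketch below is one reading of that mapping, not the component's actual implementation. The `SecurityProtocolSketch` class is hypothetical, and the non-TLS branch is an assumption inferred from the table.

  ```java
  // Hypothetical illustration of how the security protocol could be derived.
  final class SecurityProtocolSketch {
      // usesSasl: true for Username & Password or OAuth 2.0 Client Credentials,
      //           false when Authentication Type is None.
      // tlsEnabled: true when Encryption is set to TLS.
      static String securityProtocol(boolean usesSasl, boolean tlsEnabled) {
          if (usesSasl) {
              return tlsEnabled ? "SASL_SSL" : "SASL_PLAINTEXT";
          }
          return tlsEnabled ? "SSL" : "PLAINTEXT";
      }
  }
  ```

  For example, username and password authentication with TLS enabled corresponds to `SASL_SSL`, which matches the `security.protocol=SASL_SSL` setting mentioned above.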
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.certificate] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Trusted Certificate" type="text editor">
  Add your TLS certificates to the text editor. Supports X.509 certificates in .pem format.

  This certificate should be the trusted certificate of the broker.

  <Note>
    This parameter is optional. It's possible to set **Encryption** to `TLS` and not provide a certificate. This works when the broker's certificate is issued by a well-known [certificate authority (CA)](https://en.wikipedia.org/wiki/Certificate_authority).
  </Note>
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.topic] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Topic" type="drop-down" required>
  Once you have authenticated with your Kafka configuration, this drop-down will display any available Kafka topics in your defined cluster. Topics are high-level categories for data in Kafka, used as the storage mechanism of Kafka messages (events), similar to a folder storing files in a filesystem. Read [Introduction](https://kafka.apache.org/intro) to learn more.

  <Note>
    Internal topics have been filtered out and won't appear in the drop-down list.
  </Note>
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.consumerGroup] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Consumer Group" type="string" required>
  ID of a consumer group, usually defined by the app or client consuming data from Kafka. A consumer group is set as a `group.id`. For example, in a properties file such as `consumer.properties`:

  ```
  bootstrap.servers=myBroker1:9092
  group.id=my-consumer-group
  enable.auto.commit=true
  key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  ```

  The consumer group may also be defined in code, as in Java:

  ```
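  import java.util.Properties;

  // Consumer configuration defined in code instead of a properties file.
  Properties props = new Properties();
  props.put("bootstrap.servers", "myBroker1:9092");
  props.put("group.id", "my-consumer-group");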
  ```
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.generateUniqueConsumerGroup] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Generate Unique Consumer Group" type="boolean" required>
  The primary function of this property is to enable a **Full Load Pattern**: each run uses a new consumer group that reads from the beginning of the topic, ignoring any previously committed offsets. When set to **Yes**, a unique ID is appended to the value specified in **Consumer Group** for each run of this connector.
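
  The effect of this property can be pictured with a small Java sketch. This is an assumption-laden illustration rather than the connector's implementation: the `ConsumerGroupSketch` class is hypothetical, and the suffix format (a hyphen followed by a random UUID) is chosen only for the example, because the actual format of the unique ID is not documented here.

  ```java
  import java.util.UUID;

  // Hypothetical illustration of how a per-run consumer group ID could be built.
  final class ConsumerGroupSketch {
      static String effectiveGroupId(String consumerGroup, boolean generateUnique) {
          if (!generateUnique) {
              // Static group: the broker keeps tracking offsets for the same group.
              return consumerGroup;
          }
          // New group on every run: no committed offsets exist for it yet.
          return consumerGroup + "-" + UUID.randomUUID();
      }
  }
  ```

  Because every generated ID names a group that has no committed offsets, each run has nothing to resume from, which is what makes a full load possible.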

  The table below highlights different ways you might use the Kafka connector, with advice on how to set the **Generate Unique Consumer Group** property.

  | Use case                       | Method                                                                                                                                                                                                                                                                                                                                                                                                                      | Generate Unique Consumer Group? |
  | ------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------- |
  | Single connector, full load    | On each execution of the connector, a new consumer group will be initiated. The read operation will execute from the beginning of a topic.                                                                                                                                                                                                                                                                                  | Yes                             |
  | Multiple connectors, full load | For users who wish to run multiple Kafka connectors in parallel. You will need a unique consumer group per pipeline execution. You can achieve this using variables created at the pipeline level passed into each Kafka connector's **Consumer Group** property. This operation will execute all Kafka connectors in the pipeline as parts of the same consumer group, meaning the full load will be executed in parallel. | No                              |
  | Kafka-native incremental load  | For users who wish to use Kafka's internal incremental load capabilities. Use a static consumer group name. On each execution of the Kafka connector, it will remain part of the same consumer group. This will expose Kafka's internal offset management. The broker will track the offset. When the connector is executed again, it will begin reading from where it finished on the last execution.                      | No                              |

  <Warning>
    If you opt for the parallel load or incremental load use cases described above, make sure you have a pipeline that executes on failure, in which you can set **Generate Unique Consumer Group** to `Yes` so that a full load of the topic is executed successfully. We have created pipelines for this, which you can download:

    * [Parallel load](https://matillion-docs.s3.eu-west-1.amazonaws.com/Attachments/kafka/kafka-parallel-load.zip)
    * [Incremental load](https://matillion-docs.s3.eu-west-1.amazonaws.com/Attachments/kafka/kafka-incremental-load.zip)

    The purpose of these pipelines is to circumvent issues that could occur if the pipeline fails after the read operation. In such instances, data could be lost because the consumer group would resume from its last read position, not from the point at which the pipeline failed.
  </Warning>
</ResponseField>

### Configure

<ResponseField name="Data Format" type="drop-down" required>
  Set the data format of messages in your topics.

  * **Schemaless JSON:** Messages have no predefined structure, meaning messages may vary. While schemaless JSON can offer simplicity and flexibility, discrepancies and missing fields require more manual intervention.
  * **JSON Schema:** Provides validation, compatibility control, and consistency of messages. This format is typically better suited to production-grade applications where data integrity is vital.
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.schemaRegistryUrl] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Schema Registry URL" type="string" required>
  The URL of the schema registry used by the Kafka cluster. This may be specified in your client configuration file, or directly in an application's code. If you're using Confluent Cloud, you can find the URL in the [Confluent Cloud UI](https://confluent.cloud). Read [Quick Start for Schema Management on Confluent Cloud](https://docs.confluent.io/cloud/current/get-started/schema-registry.html#use-curl-to-view-and-manage-schemas) to learn more.

  Only available when **Data Format** is set to a structured type. Currently, the only supported structured data format is `JSON Schema`.
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.schemaRegistryConnection.overrides.authType] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Authentication Type" type="drop-down" required>
  Choose whether to authenticate to the schema registry with a username and password, or not to authenticate (**None**).

  Only available when **Data Format** is set to a structured type, such as `JSON Schema`.
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.schemaRegistryConnection.overrides.schemaRegistryUsername] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Schema Registry Username" type="string" required>
  The username specified in your client app that connects to the schema registry. For example:

  ```
  schema.registry.url=https://your-schema-registry-url
  basic.auth.credentials.source=USER_INFO
  schema.registry.basic.auth.user.info=my-username:my-password
  ```

  Only available when **Data Format** is set to `JSON Schema` and **Authentication Type** is set to `Username & Password`.
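
  With `basic.auth.credentials.source=USER_INFO`, the `username:password` pair is typically sent to the schema registry as HTTP Basic authentication. The Java sketch below shows how such a header value is formed. The `BasicAuthSketch` class is a hypothetical helper for illustration. It is not part of the component, and in practice the password comes from a secret definition and is never hard-coded.

  ```java
  import java.nio.charset.StandardCharsets;
  import java.util.Base64;

  // Hypothetical illustration of an HTTP Basic Authorization header value.
  final class BasicAuthSketch {
      static String authorizationHeader(String username, String password) {
          // Same "user:password" shape as schema.registry.basic.auth.user.info.
          String userInfo = username + ":" + password;
          String encoded = Base64.getEncoder()
                  .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
          return "Basic " + encoded;
      }
  }
  ```

  Base64 is an encoding, not encryption, so this kind of authentication should be paired with an HTTPS schema registry URL.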
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.schemaRegistryConnection.overrides.schemaRegistryPassword] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Schema Registry Password" type="drop-down" required>
  Use the drop-down menu to select the corresponding secret definition that denotes the value of the password tied to your username and schema registry. For example:

  ```
  schema.registry.url=https://your-schema-registry-url
  basic.auth.credentials.source=USER_INFO
  schema.registry.basic.auth.user.info=my-username:my-password
  ```

  Read [Secrets and secret definitions](/docs/guides/secrets-and-secret-definitions) to learn how to create a new secret definition.

  Only available when **Data Format** is set to `JSON Schema` and **Authentication Type** is set to `Username & Password`.
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.schemaRegistryEncryption] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Schema Registry Encryption" type="drop-down" required>
  Determines whether the **Schema Registry Trusted Certificate** parameter is displayed or hidden.

  * **None:** No encryption.
  * **TLS:** Use a TLS certificate (in .pem format). Encryption is based on the protocol specified in **Schema Registry URL**. If the URL uses HTTPS (i.e. begins with https\://...) then TLS will be enabled. Otherwise, traffic will be in plaintext.

  Only available when **Data Format** is set to `JSON Schema`.
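
  The rule that TLS follows the URL scheme can be expressed in a few lines of Java. This is a sketch of the rule as described above, using a hypothetical `SchemaRegistryUrlSketch` class. It is not the component's own code.

  ```java
  import java.net.URI;

  // Hypothetical illustration: TLS is in effect only when the URL scheme is HTTPS.
  final class SchemaRegistryUrlSketch {
      static boolean usesTls(String schemaRegistryUrl) {
          String scheme = URI.create(schemaRegistryUrl).getScheme();
          // The scheme is compared case-insensitively; anything other than
          // "https" is treated as plaintext.
          return "https".equalsIgnoreCase(scheme);
      }
  }
  ```

  For example, `usesTls("https://your-schema-registry-url")` is `true`, while a URL beginning with `http://` gives `false`.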
</ResponseField>

{/* <!-- param-start:[kafka-input-v1.schemaRegistryCertificate] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Schema Registry Trusted Certificate" type="editor">
  Add your TLS certificates to the text editor. Supports X.509 certificates in .pem format.

  Only available when **Data Format** is set to `JSON Schema` and **Schema Registry Encryption** is set to `TLS`.
</ResponseField>

### Destination

Select your cloud data warehouse.

<Tabs>
  <Tab title="Snowflake">
    <ResponseField name="Destination" type="drop-down" required>
      Select the destination for your data. This is either in Snowflake as a table or as files in cloud storage.

      * **Snowflake:** Load your data into a table in Snowflake. The data must first be staged via Snowflake or a cloud storage solution.
      * **Cloud Storage:** Load your data directly into files in your preferred cloud storage location. The format of these files can differ between source systems, and the files will not have a file extension, so we suggest inspecting the output to determine the format of the data.
    </ResponseField>

    <Tabs>
      <Tab title="Snowflake">
        {/* <!-- param-start:[snowflake-output-connector-v0.warehouse, snowflake-output-connector-v1.warehouse] | warehouses: [snowflake] --> */}

        <ResponseField name="Warehouse" type="drop-down" required>
          The Snowflake warehouse used to run the queries. The special value `[Environment Default]` uses the warehouse defined in the environment. Read [Overview of Warehouses](https://docs.snowflake.com/en/user-guide/warehouses-overview.html) to learn more.
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.warehouse, snowflake-output-connector-v1.warehouse] --> */}

        {/* <!-- param-start:[snowflake-output-connector-v0.database, snowflake-output-connector-v1.database] | warehouses: [snowflake] --> */}

        <ResponseField name="Database" type="drop-down" required>
          The Snowflake database to access. The special value `[Environment Default]` uses the database defined in the environment. Read [Databases, Tables and Views - Overview](https://docs.snowflake.com/en/guides-overview-db) to learn more.
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.database, snowflake-output-connector-v1.database] --> */}

        {/* <!-- param-start:[snowflake-output-connector-v0.schema, snowflake-output-connector-v1.schema] | warehouses: [snowflake] --> */}

        <ResponseField name="Schema" type="drop-down" required>
          The Snowflake schema. The special value `[Environment Default]` uses the schema defined in the environment. Read [Database, Schema, and Share DDL](https://docs.snowflake.com/en/sql-reference/ddl-database.html) to learn more.
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.schema, snowflake-output-connector-v1.schema] --> */}

        {/* <!-- param-start:[snowflake-output-connector-v0.tableName, snowflake-output-connector-v1.tableName] | warehouses: [snowflake] --> */}

        <ResponseField name="Table Name" type="string" required>
          The name of the table to be created in your Snowflake database. You can use a [Table Input](/docs/components/table-input) component in a transformation pipeline to access and transform this data after it has been loaded.
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.tableName, snowflake-output-connector-v1.tableName] --> */}

        {/* <!-- param-start:[snowflake-output-connector-v0.createTableMode, snowflake-output-connector-v1.createTableMode] | warehouses: [snowflake] --> */}

        <ResponseField name="Load Strategy" type="drop-down" required>
          Define what happens if a table with the specified name already exists in the specified Snowflake database and schema.

          * **Replace:** If the specified table name already exists, that table will be destroyed and replaced by the table created during this pipeline run.
          * **Truncate and Insert:** Each time the pipeline runs, two operations are performed: first, the table is truncated, meaning all existing rows are deleted. Then, your new rows are inserted. The table itself is never destroyed and recreated.
          * **Fail if Exists:** If the specified table name already exists, this pipeline will fail to run.
          * **Append:** If the specified table name already exists, the data is appended to the existing data in the table without altering or deleting it. If the specified table name doesn't exist, the table is created and your data is inserted into it.
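
          The four strategies can be compared with a small in-memory model in Java. This sketch models only the row-level outcome described above. The `LoadStrategySketch` class is hypothetical, it does not issue any Snowflake SQL, and the behavior of **Truncate and Insert** and **Fail if Exists** when the table does not yet exist (the table is created) is an assumption.

          ```java
          import java.util.ArrayList;
          import java.util.List;
          import java.util.Map;

          // Hypothetical in-memory model of the load strategies; not connector code.
          final class LoadStrategySketch {
              enum Strategy { REPLACE, TRUNCATE_AND_INSERT, FAIL_IF_EXISTS, APPEND }

              // "tables" maps a table name to its rows.
              static void load(Map<String, List<String>> tables, String name,
                               List<String> newRows, Strategy strategy) {
                  switch (strategy) {
                      case REPLACE:
                          // The existing table is destroyed and a new one takes its place.
                          tables.put(name, new ArrayList<>(newRows));
                          break;
                      case TRUNCATE_AND_INSERT: {
                          // The table object survives; only its rows are removed.
                          // Assumption: a missing table is created first.
                          List<String> rows = tables.computeIfAbsent(name, k -> new ArrayList<>());
                          rows.clear();
                          rows.addAll(newRows);
                          break;
                      }
                      case FAIL_IF_EXISTS:
                          if (tables.containsKey(name)) {
                              throw new IllegalStateException("Table already exists: " + name);
                          }
                          // Assumption: a missing table is created and loaded.
                          tables.put(name, new ArrayList<>(newRows));
                          break;
                      case APPEND:
                          // Existing rows are kept; the table is created if missing.
                          tables.computeIfAbsent(name, k -> new ArrayList<>()).addAll(newRows);
                          break;
                  }
              }
          }
          ```

          The difference between **Replace** and **Truncate and Insert** shows up in the model as a new list object versus the same list with its contents cleared.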
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.createTableMode, snowflake-output-connector-v1.createTableMode] --> */}

        {/* <!-- param-start:[snowflake-output-connector-v0.primaryKeys, snowflake-output-connector-v1.primaryKeys] | warehouses: [snowflake] --> */}

        <ResponseField name="Primary Keys" type="dual listbox">
          Select one or more columns to be designated as the table's primary key.
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.primaryKeys, snowflake-output-connector-v1.primaryKeys] --> */}

        {/* <!-- param-start:[snowflake-output-connector-v0.cleanStagedFiles, snowflake-output-connector-v1.cleanStagedFiles] | warehouses: [snowflake] --> */}

        <ResponseField name="Clean Staged files" type="boolean" required>
          * **Yes:** Staged files will be destroyed after data is loaded. This is the default setting.
          * **No:** Staged files are retained in the staging area after data is loaded.
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.cleanStagedFiles, snowflake-output-connector-v1.cleanStagedFiles] --> */}

        {/* <!-- param-start:[snowflake-output-connector-v0.stageAccessStrategyForS3, snowflake-output-connector-v1.stageAccessStrategyForS3] | warehouses: [snowflake] --> */}

        <ResponseField name="Stage Access Strategy" type="drop-down">
          Select the stage access strategy. The strategies available depend on the cloud platform you select in **Stage Platform**.

          * **Credentials:** Connects to the external stage (AWS, Azure) using your configured [cloud provider credentials](/docs/guides/cloud-credentials). Not available for Google Cloud Storage.
          * **Storage Integration:** Use a Snowflake storage integration to grant Snowflake read and write access to a cloud storage location. This will reveal the **Storage Integration** property, through which you can select any of your existing Snowflake storage integrations.
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.stageAccessStrategyForS3, snowflake-output-connector-v1.stageAccessStrategyForS3] --> */}

        {/* <!-- param-start:[snowflake-output-connector-v0.stagePlatform, snowflake-output-connector-v1.stagePlatform] | warehouses: [snowflake] --> */}

        <ResponseField name="Stage Platform" type="drop-down" required>
          Use the drop-down menu to choose where the data is staged before being loaded into your Snowflake table.

          * **Amazon S3:** Stage your data on an AWS S3 bucket.
          * **Snowflake:** Stage your data on a Snowflake internal stage.
          * **Azure Storage:** Stage your data in an Azure Blob Storage container.
          * **Google Cloud Storage:** Stage your data in a Google Cloud Storage bucket.
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.stagePlatform, snowflake-output-connector-v1.stagePlatform] --> */}

        {/* <!-- param-start:[snowflake-output-connector-v0.snowflake#internalStageType, snowflake-output-connector-v1.snowflake#internalStageType] | warehouses: [snowflake] --> */}

        <ResponseField name="Internal Stage Type" type="drop-down" required>
          Select the Snowflake internal stage type. Use the Snowflake links provided to learn more about each type of stage.

          * **User:** Each Snowflake user has a [user stage](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage#user-stages) allocated to them by default for file storage. You may find the user stage convenient if your files will only be accessed by a single user, but need to be copied into multiple tables.
          * **Named:** A [named stage](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage#named-stages) provides high flexibility for data loading. Users with the appropriate privileges on the stage can load data into any table. Furthermore, because the stage is a database object, any security or access rules that apply to all objects will apply to the named stage.

          Named stages can be altered and dropped. User stages cannot.
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.snowflake#internalStageType, snowflake-output-connector-v1.snowflake#internalStageType] --> */}

        {/* <!-- param-start:[snowflake-output-connector-v0.snowflake#internalNamedStage, snowflake-output-connector-v1.snowflake#internalNamedStage] | warehouses: [snowflake] --> */}

        <ResponseField name="Named Stage" type="drop-down" required>
          Select your named stage. Read [Creating a named stage](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage#creating-a-named-stage) to learn how to create a new named stage.

          <Warning>
            There is a known issue where named stages that include special characters or spaces are not supported.
          </Warning>
        </ResponseField>

        {/* <!-- param-end:[snowflake-output-connector-v0.snowflake#internalNamedStage, snowflake-output-connector-v1.snowflake#internalNamedStage] --> */}
      </Tab>

      <Tab title="Cloud Storage">
        {/* <!-- param-start:[storage-only-output-v0.prepareStageStrategy, storage-only-output-v1.prepareStageStrategy] | warehouses: [snowflake] --> */}

        <ResponseField name="Load Strategy" type="drop-down">
          * **Append Files in Folder:** Appends files to the storage folder. This is the default setting.
          * **Overwrite Files in Folder:** Overwrites existing files with matching structure.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.prepareStageStrategy, storage-only-output-v1.prepareStageStrategy] --> */}

        {/* <!-- param-start:[storage-only-output-v0.folderPath, storage-only-output-v1.folderPath] | warehouses: [snowflake] --> */}

        <ResponseField name="Folder Path" type="string">
          The folder path for the files to be written to. Note that this path follows, but does not include, the bucket or container name.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.folderPath, storage-only-output-v1.folderPath] --> */}

        {/* <!-- param-start:[storage-only-output-v0.filePrefix, storage-only-output-v1.filePrefix] | warehouses: [snowflake] --> */}

        <ResponseField name="File Prefix" type="string">
          A string of characters that precedes the name of the written files. This can be useful for organizing database objects.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.filePrefix, storage-only-output-v1.filePrefix] --> */}

        {/* <!-- param-start:[storage-only-output-v0.storage, storage-only-output-v1.storage] | warehouses: [snowflake] --> */}

        <ResponseField name="Storage" type="drop-down" required>
          The cloud storage location where your data is written as files. Choose either Amazon S3, Azure Storage, or Google Cloud Storage.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.storage, storage-only-output-v1.storage] --> */}
      </Tab>
    </Tabs>
  </Tab>

  <Tab title="Databricks">
    <ResponseField name="Destination" type="drop-down" required>
      Select the destination for your data. This is either in Databricks as a table or as files in cloud storage.

      * **Databricks:** Load your data into Databricks. You'll need to set a cloud storage location for temporary staging of the data.
      * **Cloud Storage:** Load your data directly into files in your preferred cloud storage location.
    </ResponseField>

    <Tabs>
      <Tab title="Databricks">
        {/* <!-- param-start:[databricks-output-connector-v0.catalog, databricks-output-connector-v1.catalog] | warehouses: [databricks] --> */}

        <ResponseField name="Catalog" type="drop-down" required>
          Select a [Databricks Unity Catalog](https://docs.databricks.com/en/data-governance/unity-catalog/index.html). The special value `[Environment Default]` uses the catalog defined in the environment. The catalog you select determines which schemas are available in the next parameter.
        </ResponseField>

        {/* <!-- param-end:[databricks-output-connector-v0.catalog, databricks-output-connector-v1.catalog] --> */}

        {/* <!-- param-start:[databricks-output-connector-v0.schema, databricks-output-connector-v1.schema] | warehouses: [databricks] --> */}

        <ResponseField name="Schema" type="drop-down" required>
          The Databricks schema. The special value `[Environment Default]` uses the schema defined in the environment. Read [Create and manage schemas](https://docs.databricks.com/en/data-governance/unity-catalog/create-schemas.html) to learn more.
        </ResponseField>

        {/* <!-- param-end:[databricks-output-connector-v0.schema, databricks-output-connector-v1.schema] --> */}

        {/* <!-- param-start:[databricks-output-connector-v0.tableName, databricks-output-connector-v1.tableName] | warehouses: [databricks] --> */}

        <ResponseField name="Table Name" type="string" required>
          The name of the table to be created in your Databricks schema. You can use a [Table Input](/docs/components/table-input) component in a transformation pipeline to access and transform this data after it has been loaded.
        </ResponseField>

        {/* <!-- param-end:[databricks-output-connector-v0.tableName, databricks-output-connector-v1.tableName] --> */}

        {/* <!-- param-start:[databricks-output-connector-v0.loadStrategy, databricks-output-connector-v1.loadStrategy] | warehouses: [databricks] --> */}

        <ResponseField name="Load Strategy" type="drop-down" required>
          Define what happens if the table name already exists in the specified Databricks schema.

          * **Fail if Exists:** If the specified table name already exists, this pipeline will fail to run.
          * **Replace:** If the specified table name already exists, that table will be destroyed and replaced by the table created during this pipeline run.
          * **Truncate and Insert:** Each time the pipeline runs, two operations are performed: first, the table is truncated, meaning all existing rows are deleted. Then, your new rows are inserted. The table itself is never destroyed and recreated.
          * **Append:** If the specified table already exists, your data is appended to it without altering or deleting the existing rows. If the table doesn't exist, it is created and your data is inserted into it.
        </ResponseField>

        {/* <!-- param-end:[databricks-output-connector-v0.loadStrategy, databricks-output-connector-v1.loadStrategy] --> */}

        {/* <!-- param-start:[databricks-output-connector-v0.cleanStagedFiles, databricks-output-connector-v1.cleanStagedFiles] | warehouses: [databricks] --> */}

        <ResponseField name="Clean Staged Files" type="boolean" required>
          * **Yes:** Staged files will be destroyed after data is loaded. This is the default setting.
          * **No:** Staged files are retained in the staging area after data is loaded.
        </ResponseField>

        {/* <!-- param-end:[databricks-output-connector-v0.cleanStagedFiles, databricks-output-connector-v1.cleanStagedFiles] --> */}

        {/* <!-- param-start:[databricks-output-connector-v0.stagePlatform, databricks-output-connector-v1.stagePlatform] | warehouses: [databricks] --> */}

        <ResponseField name="Stage Platform" type="drop-down" required>
          Use the drop-down menu to choose where the data is staged before being loaded into your Databricks table.

          * **Amazon S3:** Stage your data on an AWS S3 bucket.
          * **Azure Storage:** Stage your data in an Azure Blob Storage container.
        </ResponseField>

        {/* <!-- param-end:[databricks-output-connector-v0.stagePlatform, databricks-output-connector-v1.stagePlatform] --> */}
      </Tab>

      <Tab title="Cloud Storage">
        {/* <!-- param-start:[storage-only-output-v0.prepareStageStrategy, storage-only-output-v1.prepareStageStrategy] | warehouses: [databricks] --> */}

        <ResponseField name="Load Strategy" type="drop-down">
          * **Append Files in Folder:** Append files to the storage folder. This is the default setting.
          * **Overwrite Files in Folder:** Overwrite existing files that have a matching structure.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.prepareStageStrategy, storage-only-output-v1.prepareStageStrategy] --> */}

        {/* <!-- param-start:[storage-only-output-v0.folderPath, storage-only-output-v1.folderPath] | warehouses: [databricks] --> */}

        <ResponseField name="Folder Path" type="string">
          The folder path for the files to be written to. Note that this path follows, but does not include, the bucket or container name.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.folderPath, storage-only-output-v1.folderPath] --> */}

        {/* <!-- param-start:[storage-only-output-v0.filePrefix, storage-only-output-v1.filePrefix] | warehouses: [databricks] --> */}

        <ResponseField name="File Prefix" type="string">
          A string of characters that precedes the name of each written file. This can be useful for organizing files in your storage location.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.filePrefix, storage-only-output-v1.filePrefix] --> */}

        {/* <!-- param-start:[storage-only-output-v0.storage, storage-only-output-v1.storage] | warehouses: [databricks] --> */}

        <ResponseField name="Storage" type="drop-down" required>
          The cloud storage location where your data is written as files. Choose either Amazon S3, Azure Storage, or Google Cloud Storage.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.storage, storage-only-output-v1.storage] --> */}
      </Tab>
    </Tabs>
  </Tab>

  <Tab title="Amazon Redshift">
    <ResponseField name="Destination" type="drop-down" required>
      Select the destination for your data. This is either in Amazon Redshift as a table or as files in cloud storage.

      * **Redshift:** Load your data into Amazon Redshift. You'll need to set a cloud storage location for temporary staging of the data.
      * **Cloud Storage:** Load your data directly into files in your preferred cloud storage location.
    </ResponseField>

    <Tabs>
      <Tab title="Amazon Redshift">
        {/* <!-- param-start:[redshift-output-connector-v0.schema, redshift-output-connector-v1.schema] | warehouses: [redshift] --> */}

        <ResponseField name="Schema" type="drop-down" required>
          Select the Amazon Redshift schema that will contain your table. The special value `[Environment Default]` uses the schema defined in the environment. For information about using multiple schemas, read [Schemas](https://docs.aws.amazon.com/redshift/latest/dg/r_Schemas_and_tables.html).
        </ResponseField>

        {/* <!-- param-end:[redshift-output-connector-v0.schema, redshift-output-connector-v1.schema] --> */}

        {/* <!-- param-start:[redshift-output-connector-v0.table, redshift-output-connector-v1.table] | warehouses: [redshift] --> */}

        <ResponseField name="Table Name" type="string" required>
          The name of the table to be created in your Amazon Redshift database. You can use a [Table Input](/docs/components/table-input) component in a transformation pipeline to access and transform this data after it has been loaded.
        </ResponseField>

        {/* <!-- param-end:[redshift-output-connector-v0.table, redshift-output-connector-v1.table] --> */}

        {/* <!-- param-start:[redshift-output-connector-v0.createTableMode, redshift-output-connector-v1.createTableMode] | warehouses: [redshift] --> */}

        <ResponseField name="Load Strategy" type="drop-down" required>
          Define what happens if the table name already exists in the specified Amazon Redshift database and schema.

          * **Replace:** If the specified table name already exists, that table will be destroyed and replaced by the table created during this pipeline run.
          * **Fail if Exists:** If the specified table name already exists, this pipeline will fail to run.
          * **Truncate and Insert:** Each time the pipeline runs, two operations are performed: first, the table is truncated, meaning all existing rows are deleted. Then, your new rows are inserted. The table itself is never destroyed and recreated.
          * **Append:** If the specified table already exists, your data is appended to it without altering or deleting the existing rows. If the table doesn't exist, it is created and your data is inserted into it.
        </ResponseField>

        {/* <!-- param-end:[redshift-output-connector-v0.createTableMode, redshift-output-connector-v1.createTableMode] --> */}

        {/* <!-- param-start:[redshift-output-connector-v0.cleanStagedFiles, redshift-output-connector-v1.cleanStagedFiles] | warehouses: [redshift] --> */}

        <ResponseField name="Clean Staged Files" type="boolean" required>
          * **Yes:** Staged files will be destroyed after data is loaded. This is the default setting.
          * **No:** Staged files are retained in the staging area after data is loaded.
        </ResponseField>

        {/* <!-- param-end:[redshift-output-connector-v0.cleanStagedFiles, redshift-output-connector-v1.cleanStagedFiles] --> */}

        {/* <!-- param-start:[redshift-output-connector-v0.s3#bucket, redshift-output-connector-v1.s3#bucket] | warehouses: [redshift] --> */}

        <ResponseField name="Amazon S3 Bucket" type="drop-down" required>
          An AWS S3 bucket to stage data into before it is loaded into your Amazon Redshift table. The drop-down menu will include buckets tied to the [cloud provider credentials](/docs/guides/cloud-credentials) that you have associated with your [environment](/docs/guides/environments).
        </ResponseField>

        {/* <!-- param-end:[redshift-output-connector-v0.s3#bucket, redshift-output-connector-v1.s3#bucket] --> */}
      </Tab>

      <Tab title="Cloud Storage">
        {/* <!-- param-start:[storage-only-output-v0.prepareStageStrategy, storage-only-output-v1.prepareStageStrategy] | warehouses: [redshift] --> */}

        <ResponseField name="Load Strategy" type="drop-down">
          * **Append Files in Folder:** Append files to the storage folder. This is the default setting.
          * **Overwrite Files in Folder:** Overwrite existing files that have a matching structure.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.prepareStageStrategy, storage-only-output-v1.prepareStageStrategy] --> */}

        {/* <!-- param-start:[storage-only-output-v0.folderPath, storage-only-output-v1.folderPath] | warehouses: [redshift] --> */}

        <ResponseField name="Folder Path" type="string">
          The folder path for the files to be written to. Note that this path follows, but does not include, the bucket or container name.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.folderPath, storage-only-output-v1.folderPath] --> */}

        {/* <!-- param-start:[storage-only-output-v0.filePrefix, storage-only-output-v1.filePrefix] | warehouses: [redshift] --> */}

        <ResponseField name="File Prefix" type="string">
          A string of characters that precedes the name of each written file. This can be useful for organizing files in your storage location.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.filePrefix, storage-only-output-v1.filePrefix] --> */}

        {/* <!-- param-start:[storage-only-output-v0.storage, storage-only-output-v1.storage] | warehouses: [redshift] --> */}

        <ResponseField name="Storage" type="drop-down" required>
          The cloud storage location where your data is written as files. Choose either Amazon S3, Azure Storage, or Google Cloud Storage.
        </ResponseField>

        {/* <!-- param-end:[storage-only-output-v0.storage, storage-only-output-v1.storage] --> */}
      </Tab>
    </Tabs>
  </Tab>
</Tabs>

### Advanced Settings

<ResponseField name="Poll Timeout" type="integer" required>
  The maximum time, in milliseconds, that a consumer waits for new messages when calling the `poll()` method if none are currently available. If messages are available, they are returned immediately. Setting a poll timeout prevents consumers from looping continuously and consuming CPU resources.

  The default is 10,000 milliseconds (10 seconds).
</ResponseField>
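
The timeout behavior described above can be illustrated with a minimal Python sketch. It uses the standard library `queue` module as a stand-in for a consumer's message buffer; it is not the Kafka client, and the `poll` function and the 200 ms value are hypothetical names and numbers chosen for illustration.

```python
import queue
import time


def poll(buffer, timeout_ms):
    """Return the next message, or None if none arrives within timeout_ms."""
    try:
        # Returns at once when a message is waiting; otherwise blocks
        # for at most timeout_ms before giving up.
        return buffer.get(timeout=timeout_ms / 1000)
    except queue.Empty:
        return None


buffer = queue.Queue()
buffer.put("message-1")

start = time.monotonic()
first = poll(buffer, timeout_ms=200)   # a message is available
first_elapsed = time.monotonic() - start

start = time.monotonic()
second = poll(buffer, timeout_ms=200)  # nothing available: waits, then gives up
second_elapsed = time.monotonic() - start

print(first, round(first_elapsed, 3))
print(second, round(second_elapsed, 3))
```

The first call returns without waiting because a message is already buffered. The second call blocks for roughly the full timeout and then returns nothing, which is what keeps an idle consumer from spinning.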

{/* <!-- param-start:[kafka-input-v1.consumerPropertyOverrides] | warehouses: [snowflake, databricks, redshift] --> */}

<ResponseField name="Consumer Property Overrides" type="column editor" required>
  Override specific Kafka consumer configuration properties at runtime. Use this setting when you have a default consumer setup but want to adjust certain properties for a particular consumer without altering any global or shared configuration.

  * **Property:** The property whose value you wish to override. For example, `max.poll.records`.
  * **Value:** The override value of the corresponding property. For example, `50` would limit the number of records returned by each poll to 50, which you might do to tune performance.

  The following consumer properties cannot be overridden, for security or performance reasons:

  * interceptor.classes
  * key.deserializer
  * metric.reporters
  * sasl.client.callback.handler.class
  * sasl.jaas.config
  * sasl.kerberos.kinit.cmd
  * sasl.kerberos.min.time.before.relogin
  * sasl.kerberos.service.name
  * sasl.kerberos.ticket.renew.jitter
  * sasl.kerberos.ticket.renew.window.factor
  * sasl.login.callback.handler.class
  * sasl.login.class
  * sasl.login.connect.timeout.ms
  * sasl.login.read.timeout.ms
  * sasl.login.refresh.buffer.seconds
  * sasl.login.refresh.min.period.seconds
  * sasl.login.refresh.window.factor
  * sasl.login.refresh.window.jitter
  * sasl.login.retry.backoff.max.ms
  * sasl.login.retry.backoff.ms
  * sasl.mechanism
  * sasl.oauthbearer.clock.skew.seconds
  * sasl.oauthbearer.expected.audience
  * sasl.oauthbearer.expected.issuer
  * sasl.oauthbearer.jwks.endpoint.refresh.ms
  * sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms
  * sasl.oauthbearer.jwks.endpoint.retry.backoff.ms
  * sasl.oauthbearer.jwks.endpoint.url
  * sasl.oauthbearer.scope.claim.name
  * sasl.oauthbearer.sub.claim.name
  * sasl.oauthbearer.token.endpoint.url
  * security.protocol
  * security.providers
  * ssl.cipher.suites
  * ssl.enabled.protocols
  * ssl.endpoint.identification.algorithm
  * ssl.engine.factory.class
  * ssl.key.password
  * ssl.keymanager.algorithm
  * ssl.keystore.certificate.chain
  * ssl.keystore.key
  * ssl.keystore.location
  * ssl.keystore.password
  * ssl.keystore.type
  * ssl.protocol
  * ssl.provider
  * ssl.secure.random.implementation
  * ssl.trustmanager.algorithm
  * ssl.truststore.certificates
  * ssl.truststore.location
  * ssl.truststore.password
  * ssl.truststore.type
  * value.deserializer
</ResponseField>
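
As a rough illustration of how overrides combine with defaults, the Python sketch below merges a set of overrides into a default consumer configuration and rejects any prohibited property. It is not the component's implementation: `PROHIBITED` holds only a small subset of the list above, and `apply_overrides` and the default values are hypothetical.

```python
# Abbreviated, illustrative subset of the prohibited properties listed above.
PROHIBITED = frozenset({
    "key.deserializer",
    "value.deserializer",
    "security.protocol",
    "sasl.mechanism",
    "sasl.jaas.config",
    "ssl.truststore.location",
})


def apply_overrides(defaults, overrides):
    """Return a new config with overrides applied, rejecting prohibited keys."""
    blocked = sorted(set(overrides) & PROHIBITED)
    if blocked:
        raise ValueError("Prohibited consumer properties: " + ", ".join(blocked))
    # Later keys win, so each override replaces the matching default.
    return {**defaults, **overrides}


# Hypothetical defaults, used only for this example.
defaults = {"max.poll.records": "500", "fetch.min.bytes": "1"}

effective = apply_overrides(defaults, {"max.poll.records": "50"})
print(effective)
```

Only the overridden property changes. The shared defaults are left untouched, so other consumers that rely on them are unaffected.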

## Parallel loading

Parallel loading maximizes data ingestion throughput by reading from a Kafka topic concurrently. Instead of a single Kafka component reading every partition in turn, you run multiple components in parallel, each processing a different subset of the topic's partitions. The number of partitions in your Kafka topic caps the useful degree of parallelism: if your topic has 8 partitions, you can run a maximum of 8 parallel Kafka components to read from it.

The recommended method for implementing this is to use a [Loop Iterator](/docs/components/loop-iterator) to run multiple instances of the Kafka component.
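
To see why the partition count caps parallelism, the following Python sketch spreads a topic's partitions across a number of parallel workers in round-robin order. It is only an illustration of the limit. It does not show how the Kafka component or the Kafka broker assigns partitions, and `assign_partitions` is a hypothetical helper.

```python
def assign_partitions(num_partitions, num_workers):
    """Distribute partition numbers across workers in round-robin order."""
    assignments = [[] for _ in range(num_workers)]
    for partition in range(num_partitions):
        assignments[partition % num_workers].append(partition)
    return assignments


# 8 partitions shared by 3 workers: every worker has something to read.
three_workers = assign_partitions(8, 3)
print(three_workers)  # [[0, 3, 6], [1, 4, 7], [2, 5]]

# 8 partitions shared by 10 workers: two workers receive no partitions.
ten_workers = assign_partitions(8, 10)
idle = sum(1 for assigned in ten_workers if not assigned)
print(idle)  # 2
```

With more workers than partitions, the extra workers have nothing to read, so adding them does not increase throughput.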

***

## Deactivate soft delete for Azure blobs (Databricks)

If you intend to set your destination as Databricks *and* your stage platform as Azure Storage, you must turn off the "Enable soft delete for blobs" setting in your Azure storage account for your pipeline to run successfully. To do this:

1. In the [Azure portal](https://portal.azure.com/), navigate to your storage account.
2. In the menu, under **Data management**, click **Data protection**.
3. Clear the **Enable soft delete for blobs** checkbox. For more information, read [Soft delete for blobs](https://learn.microsoft.com/en-gb/azure/storage/blobs/soft-delete-blob-overview).
