Scylla CDC Source Connector is a source connector that captures row-level changes in the tables of a Scylla cluster. It is a Debezium connector, compatible with Kafka Connect (with Kafka 2.6.0+) and built on top of the scylla-cdc-java library.
The connector reads the CDC log for the specified tables and produces Kafka messages for each row-level INSERT, UPDATE or DELETE operation. The connector can split reading the CDC log across multiple processes: it can start a separate Kafka Connect task for reading each vnode of the Scylla cluster, allowing for high throughput. You can limit the number of started tasks with the tasks.max property.
Scylla CDC Source Connector seamlessly handles schema changes and topology changes (adding or removing nodes from the Scylla cluster). The connector is fault-tolerant and retries reading data from Scylla in case of failure. It periodically saves the current position in the Scylla CDC log using Kafka Connect offset tracking (the interval is configurable with the offset.flush.interval.ms parameter). If the connector is stopped, it can resume reading from the previously saved offset. Scylla CDC Source Connector has at-least-once semantics.
The connector has the following capabilities:
- Kafka Connect connector using Debezium framework
- Replication of row-level changes from Scylla using Scylla CDC:
  - INSERT
  - UPDATE
  - DELETE (single row deletes)
- High scalability - able to split work across multiple Kafka Connect workers
- Fault tolerant - connector periodically saves its progress and can resume from previously saved offset (with at-least-once semantics)
- Support for many standard Kafka Connect converters, such as JSON and Avro
- Compatible with standard Kafka Connect transformations
- Metadata about CDC events - each generated Kafka message contains information about its source, such as the timestamp and table name
- Seamless handling of schema changes and topology changes (adding, removing nodes from Scylla cluster)
- Preimage and postimage support (optional) - messages generated for row-level changes can have their before and/or after fields filled with the complete row state from the corresponding preimage/postimage rows.
- Full support for collection types (LIST, SET, MAP) and User Defined Types (UDT) - see Collection and UDT Type Support
The connector has the following limitations:
- Only Kafka 2.6.0+ is supported
- Only row-level operations are produced (INSERT, UPDATE, DELETE):
  - Row range deletes - those changes are ignored
  - Partition deletes - see the Partition Delete Support section below
- By default, changes only contain the columns that were modified, not the entire row before/after the change. To include the full row state, configure preimage/postimage support (see Preimage/Postimage Configuration).
The connector is tested against the latest Apache Kafka 3.x and 4.x releases and the six most recent minor Confluent Platform releases, and it should remain compatible with those versions.
Below is a summary of the Debezium and Kafka (Connect API) dependency versions used by the last few tagged connector releases.
| Connector version | Debezium version | Connect API |
|---|---|---|
| v2.0.0 | 2.7.4.Final | 8.1.1-ccs |
| v1.2.6 | 2.6.2.Final | 3.9.1 |
| v1.2.5 | 2.6.2.Final | 3.3.1 |
| v1.2.4 | 2.6.2.Final | 3.3.1 |
| v1.2.3 | 2.6.2.Final | 3.3.1 |
| v1.2.2 | 1.4.1.Final | 3.3.1 |
For older releases, consult the corresponding tag's pom.xml for the <debezium.version> and <kafka.version> properties.
You can download the connector as a prebuilt package:
- JAR with dependencies from github releases (fat JAR)
- Confluent Hub package (ZIP)
The artifacts are also available in the Maven Central Repository; we recommend using the "JAR with dependencies" file there.
You can also build the connector from source by using the following commands:
git clone https://github.com/scylladb/scylla-cdc-source-connector.git
cd scylla-cdc-source-connector
mvn clean package
The connector JAR file will be available in the target/fat-jar directory.
Copy the connector JAR file into your Kafka Connect deployment and append the directory containing it to Kafka Connect's plugin path (the plugin.path configuration property).
Scylla CDC Source Connector exposes many configuration properties. These are the most important:
| Property | Required | Description |
|---|---|---|
| topic.prefix (since 1.2.3), scylla.name (until 1.2.2) | Yes | A unique name that identifies the Scylla cluster and is used as a prefix for all schemas and topics. The logical name allows you to easily differentiate between your different Scylla cluster deployments. Each distinct Scylla installation should have a separate namespace and be monitored by at most one Scylla CDC Source Connector. It should consist only of alphanumeric characters and underscores (_). |
| scylla.cluster.ip.addresses | Yes | List of IP addresses of nodes in the Scylla cluster that the connector will use to open initial connections to the cluster. In the form of a comma-separated list of <host>:<port> pairs (host1:port1,host2:port2). |
| scylla.table.names | Yes | List of CDC-enabled table names for the connector to read. See Change Data Capture (CDC) for more information about configuring CDC on Scylla. Provided as a comma-separated list of <keyspace name>.<table name> pairs. |
| scylla.user | No | The username to connect to Scylla with. If not set, no authentication is performed. |
| scylla.password | No | The password to connect to Scylla with. If not set, no authentication is performed. |
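Before deploying, it can help to sanity-check these values against the formats described above. The following Python sketch is a hypothetical pre-flight check; the helper names are illustrative and not part of the connector, which performs its own validation:

```python
import re


def parse_contact_points(value: str):
    """Split a scylla.cluster.ip.addresses value ('host1:port1,host2:port2')."""
    points = []
    for item in value.split(","):
        # rpartition keeps everything before the last ':' as the host.
        host, sep, port = item.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected <host>:<port>, got {item!r}")
        points.append((host, int(port)))
    return points


def parse_table_names(value: str):
    """Split a scylla.table.names value ('ks1.t1,ks2.t2') into (keyspace, table)."""
    tables = []
    for item in value.split(","):
        keyspace, sep, table = item.strip().partition(".")
        if not sep or not keyspace or not table or "." in table:
            raise ValueError(f"expected <keyspace>.<table>, got {item!r}")
        tables.append((keyspace, table))
    return tables


def is_valid_topic_prefix(value: str) -> bool:
    """Alphanumeric characters and underscores only, as recommended above."""
    return re.fullmatch(r"[A-Za-z0-9_]+", value) is not None
```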
⚠️ Deprecation Warning: The legacy output format is deprecated and will be removed in version 3.0.0. New deployments should use the advanced format (cdc.output.format=advanced). Existing deployments using legacy format should plan to migrate before upgrading to 3.0.0.
The connector supports two output formats. Choose based on your requirements:
| Feature | Legacy Format (Default) | Advanced Format |
|---|---|---|
| Configuration | cdc.output.format=legacy | cdc.output.format=advanced |
| Column value representation | Wrapped in Cell structs ({"value": ...}) | Direct values |
| Preimage support | experimental.preimages.enabled=true | cdc.include.before=full or only-updated |
| Postimage support | Not supported | cdc.include.after=full or only-updated |
| Primary key placement options | Fixed | Configurable via cdc.include.primary-key.placement |
| Recommended for | Existing deployments, backward compatibility | New deployments, cleaner message structure |
See Legacy Format Details and Advanced Format Details for complete documentation.
See additional configuration properties in the "Advanced administration" section.
Example configuration (as .properties file):
name=ScyllaCDCSourceConnector
connector.class=com.scylladb.cdc.debezium.connector.ScyllaConnector
# use scylla.name instead for versions < 1.2.3
topic.prefix=MyScyllaCluster
scylla.cluster.ip.addresses=127.0.0.1:9042,127.0.0.2:9042
scylla.table.names=ks.my_table
tasks.max=10
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
auto.create.topics.enable=true
This configuration captures row-level changes in the ks.my_table table from the Scylla cluster (127.0.0.1, 127.0.0.2). Change data capture events will appear on the MyScyllaCluster.ks.my_table Kafka topic, encoded as JSON with schema information.
Properties that you do not specify are set according to their defaults or to the common connector configuration of your Kafka Connect installation. For example, if you omit the converter settings, the converters used can differ depending on your Kafka Connect version.
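If you manage connectors through the Kafka Connect REST API instead of a .properties file, the same settings are submitted as a JSON body of the form {"name": ..., "config": {...}} to POST /connectors. The sketch below converts part of the example above into such a body. It assumes a Connect worker listening on localhost:8083 and uses a deliberately simplified properties parser (no escapes or line continuations):

```python
import json
import urllib.request

PROPERTIES = """\
name=ScyllaCDCSourceConnector
connector.class=com.scylladb.cdc.debezium.connector.ScyllaConnector
# use scylla.name instead for versions < 1.2.3
topic.prefix=MyScyllaCluster
scylla.cluster.ip.addresses=127.0.0.1:9042,127.0.0.2:9042
scylla.table.names=ks.my_table
tasks.max=10
"""


def properties_to_connector_request(text: str) -> dict:
    """Turn simple key=value lines into a 'create connector' request body."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and comment lines.
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # The connector name goes at the top level of the body.
    name = config.pop("name")
    return {"name": name, "config": config}


body = properties_to_connector_request(PROPERTIES)

# Build (but do not send) the request; assumes a worker on localhost:8083.
request = urllib.request.Request(
    "http://localhost:8083/connectors",
    data=json.dumps(body).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
```

Calling urllib.request.urlopen(request) would submit it to the worker.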
Scylla CDC Source Connector writes events to a separate Kafka topic for each source Scylla table. The topic name is logicalName.keyspaceName.tableName, where the logical name is configured by the topic.prefix property (scylla.name until 1.2.2). You can turn on automatic topic creation by using the auto.create.topics.enable property.
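The naming rule is simple enough to compute up front, for example to pre-create topics or to subscribe a consumer. A minimal sketch with hypothetical helper names:

```python
def cdc_topic_name(logical_name: str, keyspace: str, table: str) -> str:
    """Topic naming rule described above: <logicalName>.<keyspaceName>.<tableName>."""
    return f"{logical_name}.{keyspace}.{table}"


def topics_for(logical_name: str, table_names: str) -> list:
    """One topic per entry of a scylla.table.names value such as 'ks.t1,ks.t2'."""
    return [
        cdc_topic_name(logical_name, *entry.strip().split(".", 1))
        for entry in table_names.split(",")
    ]
```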
Scylla CDC Source Connector generates a data change event for each row-level INSERT, UPDATE or DELETE operation. Each event consists of a key and a value.
Debezium and Kafka Connect are designed around continuous streams of event messages, and the structure of these events may change over time. Because this can be difficult for consumers to handle, Kafka Connect makes each event self-contained: every message key and value has two parts, a schema and a payload. The schema describes the structure of the payload, while the payload contains the actual data.
Important: The connector supports two output formats: Legacy (default) and Advanced. The format affects the structure of messages, particularly how non-primary-key column values are represented. See Output Format Configuration for details on choosing a format.
The data change event's key will contain a field for each column in the primary key (partition key and clustering key).
For example, given this Scylla table and INSERT operation:
CREATE TABLE ks.my_table(
pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 'example row');
The data change event's key will look like this (with JSON serializer and schema enabled):
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": true,
"field": "ck"
},
{
"type": "int32",
"optional": true,
"field": "pk"
}
],
"optional": false,
"name": "MyScyllaCluster.ks.my_table.Key"
},
"payload": {
"ck": 1,
"pk": 1
}
}
The data change event's value consists of a schema and a payload section. The payload of every data change event contains the following fields:
- op: type of operation: c for INSERT, u for UPDATE, d for DELETE.
- before: an optional field with the state of the row before the event occurred. Present in DELETE data change events (and in other events when preimages are enabled).
- after: an optional field with the state of the row after the event occurred. Present in UPDATE and INSERT data change events.
- ts_ms: the time at which the connector processed the event.
- source: metadata about the source of the event:
  - name: the logical name of the Scylla cluster (topic.prefix, or scylla.name until 1.2.2).
  - ts_ms: the time that the change was made in the database (in milliseconds). You can compute the difference between source.ts_ms and the top-level ts_ms to determine the lag between the source Scylla change and the connector.
  - ts_us: the time that the change was made in the database (in microseconds).
  - keyspace_name, table_name: the names of the keyspace and table this data change event originated from.
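As an illustration of these fields, the sketch below reads a JSON-encoded event value (as produced by JsonConverter, with or without schemas.enable) and reports the operation, the source table, and the processing lag computed from the two ts_ms fields. It is a hypothetical consumer-side helper; records with a null value should be filtered out before calling it.

```python
import json

OPERATIONS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE"}


def summarize_event(value_json: str) -> dict:
    """Extract the fields described above from a JSON-encoded event value."""
    value = json.loads(value_json)
    # With schemas.enable=true the data sits under "payload"; otherwise at the top.
    payload = value.get("payload", value)
    source = payload["source"]
    return {
        "operation": OPERATIONS[payload["op"]],
        "table": f'{source["keyspace_name"]}.{source["table_name"]}',
        # Lag between the write in Scylla and the connector processing it.
        "lag_ms": payload["ts_ms"] - source["ts_ms"],
        # DELETE events carry the row in "before"; others in "after".
        "row": payload["after"] if payload["after"] is not None else payload["before"],
    }
```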
⚠️ Deprecation Warning: The legacy output format is deprecated and will be removed in version 3.0.0. Please migrate to the advanced format. See Migrating from Legacy to Advanced Format.
This section describes the message structure and behavior specific to the legacy output format (cdc.output.format=legacy, the default). For advanced format, see Advanced Format Details.
Operations in Scylla, such as INSERT or UPDATE, do not have to modify all columns of a row. In the legacy output format, to distinguish between a column that was not modified and a column that was set to NULL, all non-primary-key columns are wrapped in a structure containing a single value field. For example, given this Scylla table and UPDATE operation:
CREATE TABLE ks.t(
pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};
INSERT INTO ks.t(pk, ck, v) VALUES (1, 1, 'example row');
UPDATE ks.t SET v = 'new value' WHERE pk = 1 AND ck = 1;
The v column will be represented as:
...
"v": {
"value": "new value"
}
...
In case of an UPDATE setting v to NULL:
UPDATE ks.t SET v = NULL WHERE pk = 1 AND ck = 1;
The v column will be represented as:
...
"v": {
"value": null
}
...
If the operation did not modify the v column, the data event will contain the following representation of v:
...
"v": null
...
See the UPDATE example for the full data change event value.
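A consumer of the legacy format therefore has to treat each non-primary-key column as three-valued. A minimal Python sketch, assuming the before/after struct has already been deserialized from JSON (NOT_MODIFIED is a hypothetical sentinel, not something the connector emits):

```python
NOT_MODIFIED = object()  # sentinel: the operation did not touch the column


def legacy_column_value(row: dict, column: str):
    """Interpret a non-primary-key column of a legacy-format before/after struct.

    null                -> NOT_MODIFIED
    {"value": null}     -> None (the column was set to NULL)
    {"value": x}        -> x
    """
    cell = row.get(column)
    if cell is None:
        return NOT_MODIFIED
    return cell["value"]
```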
To enable preimage support in legacy format, use the experimental.preimages.enabled configuration option:
experimental.preimages.enabled=true
Note: Legacy format does not support postimages. For postimage support, use the advanced format with the cdc.include.after configuration option.
The connector provides two single message transformations (SMTs) that are primarily useful for the legacy format: ScyllaExtractNewRecordState and ScyllaFlattenColumns.
ScyllaExtractNewRecordState (class: com.scylladb.cdc.debezium.connector.transforms.ScyllaExtractNewRecordState) works exactly like io.debezium.transforms.ExtractNewRecordState (in fact, it calls it internally), but also flattens the structure by extracting values from Cell structs (the {"value": ...} wrapper used in legacy format).
Note: This transform works with both output formats. For legacy format (the default), it unwraps Cell structs. For advanced format (where values are already direct), it is effectively a pass-through for the value extraction part.
Such a transformation makes the message structure simpler (and easier to use with, for example, Elasticsearch), but when used with the legacy format it makes it impossible to distinguish between a NULL value and a non-modification. For example, if the message is:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": true,
"field": "ck"
},
{
"type": "int32",
"optional": true,
"field": "pk"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": true,
"field": "value"
}
],
"optional": true,
"name": "NS2.ks.t.v.Cell",
"field": "v"
}
],
"optional": false,
"name": "NS2.ks.t.After"
},
"payload": {
"ck": 2,
"pk": 20,
"v": {
"value": 3
}
}
}
Then the same message transformed by ScyllaExtractNewRecordState would be:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": true,
"field": "ck"
},
{
"type": "int32",
"optional": true,
"field": "pk"
},
{
"type": "int32",
"optional": true,
"field": "v"
}
],
"optional": false,
"name": "NS2.ks.t.After"
},
"payload": {
"ck": 2,
"pk": 20,
"v": 3
}
}
Notice that the v field is no longer wrapped in a value struct.
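Both ScyllaExtractNewRecordState and ScyllaFlattenColumns unwrap Cell structs. The connector's transforms are Java SMTs operating on Connect records; purely to illustrate the unwrapping step, here is a Python sketch that works on an already-deserialized, schemaless before/after struct. It recognizes Cells by shape (a dict whose only key is value), which is an assumption: a UDT with a single field named value would be unwrapped by mistake, whereas a schema-aware implementation could check the Cell schema name instead.

```python
def flatten_cells(row):
    """Replace legacy {"value": x} Cell wrappers with x, leaving other fields as-is."""
    if row is None:
        return None
    return {
        name: (
            field["value"]
            # Heuristic Cell detection: a dict with exactly one key, "value".
            if isinstance(field, dict) and field.keys() == {"value"}
            else field
        )
        for name, field in row.items()
    }
```

Both {"value": null} and null come out as None, which is exactly the loss of information described above.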
ScyllaFlattenColumns (class: com.scylladb.cdc.debezium.connector.transforms.ScyllaFlattenColumns) flattens columns that are wrapped in Cell structures (the {"value": ...} wrapper used in legacy format), such as:
"v": {
"value": 3
}
transforming it into:
"v": 3
Compared to the ScyllaExtractNewRecordState transformation, ScyllaFlattenColumns does not remove any additional metadata or modify the message in any other way.
For example, running the transformation on this message:
{
"source": {
"version": "1.1.4",
"connector": "scylla",
"name": "SMTExample",
"ts_ms": 1706890860030,
"snapshot": {
"string": "false"
},
"db": "ks",
"keyspace_name": "ks",
"table_name": "t",
"ts_us": 1706890860030414
},
"before": null,
"after": {
"SMTExample.ks.t.Before": {
"ck": 7,
"pk": 1,
"v": {
"value": 7
}
}
},
"op": {
"string": "c"
},
"ts_ms": {
"long": 1706890892952
},
"transaction": null
}
will result in the following message:
{
"source": {
"version": "1.1.4",
"connector": "scylla",
"name": "SMTExample",
"ts_ms": 1706890860030,
"snapshot": {
"string": "false"
},
"db": "ks",
"keyspace_name": "ks",
"table_name": "t",
"ts_us": 1706890860030414
},
"before": null,
"after": {
"SMTExample.ks.t.Before": {
"ck": 7,
"pk": 1,
"v": 7
}
},
"op": {
"string": "c"
},
"ts_ms": {
"long": 1706890892952
},
"transaction": null
}
while ScyllaExtractNewRecordState would produce:
{
"ck": 7,
"pk": 1,
"v": 7
}
The following examples show data change events in legacy format (the default). Notice how non-primary-key columns like v are wrapped in Cell structs ({"value": ...}).
Given this Scylla table and INSERT operation:
CREATE TABLE ks.my_table(
pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 'example row');
The connector will generate the following data change event's value (with JSON serializer and schema enabled):
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "keyspace_name"
},
{
"type": "string",
"optional": false,
"field": "table_name"
}
],
"optional": false,
"name": "com.scylladb.cdc.debezium.connector",
"field": "source"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": true,
"field": "ck"
},
{
"type": "int32",
"optional": true,
"field": "pk"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "value"
}
],
"optional": true,
"name": "MyScyllaCluster.ks.my_table.v.Cell",
"field": "v"
}
],
"optional": true,
"name": "MyScyllaCluster.ks.my_table.Before",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": true,
"field": "ck"
},
{
"type": "int32",
"optional": true,
"field": "pk"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "value"
}
],
"optional": true,
"name": "MyScyllaCluster.ks.my_table.v.Cell",
"field": "v"
}
],
"optional": true,
"name": "MyScyllaCluster.ks.my_table.After",
"field": "after"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "MyScyllaCluster.ks.my_table.Envelope"
},
"payload": {
"source": {
"version": "1.0.1-SNAPSHOT",
"connector": "scylla",
"name": "MyScyllaCluster",
"ts_ms": 1611578778701,
"ts_us": 1611578778701813,
"snapshot": "false",
"db": "ks",
"keyspace_name": "ks",
"table_name": "my_table"
},
"before": null,
"after": {
"ck": 1,
"pk": 1,
"v": {
"value": "example row"
}
},
"op": "c",
"ts_ms": 1611578838754,
"transaction": null
}
}
Given this Scylla table and UPDATE operations:
CREATE TABLE ks.my_table(
pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};
UPDATE ks.my_table SET v = 'new value' WHERE pk = 1 AND ck = 1;
UPDATE ks.my_table SET v = NULL WHERE pk = 1 AND ck = 1;
The connector will generate the following data change event value (with JSON serializer and schema enabled) for the first UPDATE. The schema is omitted because it is the same as in the INSERT example:
{
"schema": {},
"payload": {
"source": {
"version": "1.0.1-SNAPSHOT",
"connector": "scylla",
"name": "MyScyllaCluster",
"ts_ms": 1611578808701,
"ts_us": 1611578808701321,
"snapshot": "false",
"db": "ks",
"keyspace_name": "ks",
"table_name": "my_table"
},
"before": null,
"after": {
"ck": 1,
"pk": 1,
"v": {
"value": "new value"
}
},
"op": "u",
"ts_ms": 1611578868758,
"transaction": null
}
}
Data change event's value for the second UPDATE:
{
"schema": {},
"payload": {
"source": {
"version": "1.0.1-SNAPSHOT",
"connector": "scylla",
"name": "MyScyllaCluster",
"ts_ms": 1611578808701,
"ts_us": 1611578808701341,
"snapshot": "false",
"db": "ks",
"keyspace_name": "ks",
"table_name": "my_table"
},
"before": null,
"after": {
"ck": 1,
"pk": 1,
"v": {
"value": null
}
},
"op": "u",
"ts_ms": 1611578868758,
"transaction": null
}
}
Given this Scylla table and DELETE operation:
CREATE TABLE ks.my_table(
pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};
DELETE FROM ks.my_table WHERE pk = 1 AND ck = 1;
The connector will generate the following data change event value (with JSON serializer and schema enabled). The schema is omitted because it is the same as in the INSERT example:
{
"schema": {},
"payload": {
"source": {
"version": "1.0.1-SNAPSHOT",
"connector": "scylla",
"name": "MyScyllaCluster",
"ts_ms": 1611578808701,
"ts_us": 1611578808701919,
"snapshot": "false",
"db": "ks",
"keyspace_name": "ks",
"table_name": "my_table"
},
"before": {
"ck": 1,
"pk": 1,
"v": null
},
"after": null,
"op": "d",
"ts_ms": 1611578868759,
"transaction": null
}
}
This section describes the message structure and behavior specific to the advanced output format (cdc.output.format=advanced). For legacy format details, see Legacy Format Details.
In the advanced format, non-primary-key column values are emitted directly without the Cell struct wrapper. This results in cleaner, more compact messages.
The following examples show data change events in advanced format. Notice how non-primary-key columns like v contain values directly (no {"value": ...} wrapper).
Given this Scylla table and INSERT operation:
CREATE TABLE ks.my_table(
pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};
INSERT INTO ks.my_table(pk, ck, v) VALUES (1, 1, 'example row');
The connector will generate the following data change event value (payload only, schema omitted for brevity):
{
"payload": {
"source": {
"version": "1.0.1-SNAPSHOT",
"connector": "scylla",
"name": "MyScyllaCluster",
"ts_ms": 1611578778701,
"ts_us": 1611578778701813,
"snapshot": "false",
"db": "ks",
"keyspace_name": "ks",
"table_name": "my_table"
},
"before": null,
"after": {
"ck": 1,
"pk": 1,
"v": "example row"
},
"op": "c",
"ts_ms": 1611578838754,
"transaction": null
}
}
Note: The v column value is "example row" directly, not {"value": "example row"} as in legacy format.
Given an UPDATE operation:
UPDATE ks.my_table SET v = 'new value' WHERE pk = 1 AND ck = 1;
The connector will generate:
{
"payload": {
"source": {
"version": "1.0.1-SNAPSHOT",
"connector": "scylla",
"name": "MyScyllaCluster",
"ts_ms": 1611578808701,
"ts_us": 1611578808701321,
"snapshot": "false",
"db": "ks",
"keyspace_name": "ks",
"table_name": "my_table"
},
"before": null,
"after": {
"ck": 1,
"pk": 1,
"v": "new value"
},
"op": "u",
"ts_ms": 1611578868758,
"transaction": null
}
}
Given a DELETE operation:
DELETE FROM ks.my_table WHERE pk = 1 AND ck = 1;
The connector will generate:
{
"payload": {
"source": {
"version": "1.0.1-SNAPSHOT",
"connector": "scylla",
"name": "MyScyllaCluster",
"ts_ms": 1611578808701,
"ts_us": 1611578808701919,
"snapshot": "false",
"db": "ks",
"keyspace_name": "ks",
"table_name": "my_table"
},
"before": {
"ck": 1,
"pk": 1,
"v": null
},
"after": null,
"op": "d",
"ts_ms": 1611578868759,
"transaction": null
}
}
The advanced format supports flexible preimage and postimage configuration through the cdc.include.before and cdc.include.after options. See Preimage/Postimage Configuration for detailed documentation.
Example configuration:
cdc.output.format=advanced
cdc.include.before=full
cdc.include.after=full
With cdc.include.before=full and cdc.include.after=full (and preimage and postimage enabled on the Scylla table), an UPDATE will include the complete row state:
{
"payload": {
"source": { ... },
"before": {
"ck": 1,
"pk": 1,
"v": "old value"
},
"after": {
"ck": 1,
"pk": 1,
"v": "new value"
},
"op": "u",
"ts_ms": 1611578868758,
"transaction": null
}
}
The connector fully supports Scylla collection types (LIST, SET, MAP) and User Defined Types (UDT). Both frozen and non-frozen collections are supported, with different CDC behavior for each.
| Type | Frozen | Non-Frozen |
|---|---|---|
| CDC Behavior | Entire collection is replaced atomically | Element-level changes (add, remove, update) |
| Message Content | Complete collection value | Only the modified elements |
| Use Case | Small, rarely modified collections | Large collections with frequent element updates |
Lists are represented as JSON arrays:
{
"after": {
"pk": 1,
"my_list": ["item1", "item2", "item3"]
}
}
For non-frozen lists, Scylla CDC tracks element-level changes. The connector reconstructs the list from the CDC log entries.
Sets are represented as JSON arrays (order is not guaranteed):
{
"after": {
"pk": 1,
"my_set": ["value1", "value2", "value3"]
}
}
Maps are represented as arrays of key-value structs:
{
"after": {
"pk": 1,
"my_map": [
{"key": "key1", "value": "value1"},
{"key": "key2", "value": "value2"}
]
}
}
This representation is used instead of a JSON object to support non-string key types.
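Consumers that prefer a native mapping can rebuild one from the array of key-value structs. A minimal Python sketch, assuming the map column has already been deserialized from JSON and its keys are hashable scalars (UDT or frozen collection keys arrive as JSON objects or arrays and would need converting to tuples first):

```python
def map_entries_to_dict(entries):
    """Convert the [{"key": k, "value": v}, ...] map encoding into a dict."""
    if entries is None:
        return None
    return {entry["key"]: entry["value"] for entry in entries}
```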
UDTs are represented as structs with fields matching the UDT definition:
CREATE TYPE ks.address (
street text,
city text,
zip int
);
CREATE TABLE ks.users (
id int PRIMARY KEY,
home_address frozen<address>
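) WITH cdc = {'enabled': true};
INSERT INTO ks.users (id, home_address) VALUES (1, {street: '123 Main St', city: 'Springfield', zip: 12345});
The resulting message: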
{
"after": {
"id": 1,
"home_address": {
"street": "123 Main St",
"city": "Springfield",
"zip": 12345
}
}
}
The connector supports nested structures such as:
- LIST<frozen<UDT>>
- MAP<text, frozen<LIST<int>>>
- UDTs containing collection fields
Nested types are represented using the same conventions as top-level types.
To enable CDC for tables with collections:
-- Frozen collection (atomic updates only)
CREATE TABLE ks.with_frozen (
pk int PRIMARY KEY,
data frozen<list<text>>
) WITH cdc = {'enabled': true};
-- Non-frozen collection (element-level tracking)
CREATE TABLE ks.with_non_frozen (
pk int PRIMARY KEY,
data list<text>
) WITH cdc = {'enabled': true};
In addition to the configuration parameters described in the "Configuration" section, Scylla CDC Source Connector exposes the following (optional) configuration parameters:
| Property | Description |
|---|---|
| scylla.query.time.window.size | The size of the windows queried by the connector. Changes are queried using SELECT statements with a time restriction whose width is defined by this parameter. Value expressed in milliseconds. |
| scylla.confidence.window.size | The size of the confidence window. The connector needs it to avoid reading data from the CDC log that is too fresh, due to the eventual consistency of Scylla. The problem can appear when a newer write reaches a replica before an older write: for a short period of time, a read from that replica may return only the newer write. The connector mitigates this problem by not reading a window of the most recent changes (controlled by this parameter). Value expressed in milliseconds. |
| scylla.consistency.level | The consistency level of CDC table read queries. This consistency level is used only for read queries to the CDC log table. By default, QUORUM level is used. |
| scylla.local.dc | The name of the Scylla local datacenter. This name is used to set up the connection to Scylla so that requests are preferentially sent to nodes in the local datacenter. If not set, no particular datacenter is prioritized. |
| scylla.initial.lookback.ms | Maximum time in milliseconds to look back when the connector starts without saved offsets. When set to a positive value, the connector will begin reading CDC changes from (current time - this value) instead of from the beginning of the first CDC generation. This prevents the connector from scanning through a potentially large number of empty windows on first start, reducing cluster load. Set to 0 to start from the beginning of the first CDC generation (default behavior). |
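The three time-related properties interact when a connector starts without saved offsets. The following Python function is a deliberately simplified model, not the scylla-cdc-java algorithm: it assumes fixed-width [start, end) windows that are queried only once their end is older than the confidence window.

```python
def plan_query_windows(generation_start_ms, now_ms, window_ms, confidence_ms,
                       initial_lookback_ms=0):
    """Simplified model of the time windows a fresh connector would query."""
    start = generation_start_ms
    if initial_lookback_ms > 0:
        # scylla.initial.lookback.ms: skip history older than now - lookback.
        start = max(start, now_ms - initial_lookback_ms)
    # scylla.confidence.window.size: never read the most recent changes.
    readable_until = now_ms - confidence_ms
    windows = []
    # scylla.query.time.window.size: fixed-width [start, end) slices.
    while start + window_ms <= readable_until:
        windows.append((start, start + window_ms))
        start += window_ms
    return windows
```

For example, plan_query_windows(0, 100_000, 30_000, 10_000) yields three windows ending at 90000 ms, while adding initial_lookback_ms=50_000 leaves only the window starting at 50000 ms.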
The connector supports SSL/TLS connections to Scylla. The following properties configure SSL:
| Property | Description |
|---|---|
| scylla.ssl.enabled | Flag to determine if SSL is enabled when connecting to Scylla. Default: false. |
| scylla.ssl.provider | The SSL provider to use. Valid values: JDK, OPENSSL, OPENSSL_REFCNT. Default: JDK. |
| scylla.ssl.truststore.path | Path to the Java truststore. |
| scylla.ssl.truststore.password | Password to open the Java truststore with. |
| scylla.ssl.keystore.path | Path to the Java keystore. |
| scylla.ssl.keystore.password | Password to open the Java keystore with. |
| scylla.ssl.cipherSuites | The cipher suites to enable (comma-separated list). Defaults to none, resulting in a minimal quality of service according to the JDK documentation. |
| scylla.ssl.openssl.keyCertChain | Path to the SSL certificate chain file, when using the OpenSSL provider. |
| scylla.ssl.openssl.privateKey | Path to the private key file, when using the OpenSSL provider. |
These properties control how the connector's worker tasks connect to Scylla and handle retries:
| Property | Default | Description |
|---|---|---|
| worker.retry.backoff.base | 50 | Initial backoff in milliseconds for retried queries to Scylla. Each consecutive retry doubles the backoff, up to the maximum set by worker.maximum.backoff. |
| worker.maximum.backoff | 30000 | Maximum backoff in milliseconds for retried queries to Scylla. |
| worker.jitter.percentage | 20 | Jitter percentage applied to retry backoffs to spread out retry surges. A value of 20 means that up to 20% of the backoff value is randomly subtracted. Min: 1, Max: 100. |
| worker.pooling.core.pool.local | 1 | Target number of connections per local Scylla node. The driver session will aim to maintain this many connections per node. |
| worker.pooling.max.pool.local | 1 | Maximum number of connections per local Scylla node. Additional connections are opened when existing ones exceed the concurrent request threshold. |
| worker.pooling.max.queue.size | 512 | Maximum request queue size per connection pool. Requests are enqueued when no connection is available. Increase this for setups with many nodes/shards but few connector tasks. |
| worker.pooling.max.requests.per.connection | 1024 | Maximum concurrent requests per connection to a local Scylla node. Requests above this limit are enqueued. |
| worker.pooling.pool.timeout.ms | 5000 | Timeout in milliseconds for acquiring a connection from a host's pool. |
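To get a feel for the retry defaults, the following sketch computes the delay range before a given retry. It assumes that retry 0 uses the base backoff and that the jitter is drawn uniformly; both are assumptions about the implementation rather than documented behavior.

```python
import random


def backoff_bounds(retry, base_ms=50, max_ms=30_000, jitter_pct=20):
    """(min, max) delay in ms before the given retry (0 = first retry).

    The delay doubles per retry, is capped at max_ms, and then loses up to
    jitter_pct percent of its value at random.
    """
    capped = min(max_ms, base_ms * 2 ** retry)
    return capped - capped * jitter_pct / 100, capped


def backoff_ms(retry, rng=random, **settings):
    """One random delay drawn from backoff_bounds (uniform jitter assumed)."""
    low, high = backoff_bounds(retry, **settings)
    # Subtract a random share of the jitter range from the capped delay.
    return high - rng.random() * (high - low)
```

With the defaults, the first retry waits 40 to 50 ms, and from retry index 10 onward the delay stays between 24000 and 30000 ms.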
The connector supports two output formats for CDC messages, controlled by the cdc.output.format configuration option.
For detailed documentation with examples, see:
- Legacy Format Details - Default format with Cell struct wrappers
- Advanced Format Details - Direct values, advanced preimage/postimage support
| Property | Default | Values | Description |
|---|---|---|---|
| cdc.output.format | legacy | legacy, advanced | Specifies the output format for CDC messages. See format descriptions below. |
| experimental.preimages.enabled | false | true, false | Enable preimage support in legacy mode only. For advanced mode, use cdc.include.before instead. |
| Format | Description |
|---|---|
| legacy | Default. V1 format where non-PK column values are wrapped in Cell structs ({"value": <actual_value>}). Uses experimental.preimages.enabled for simple preimage support. Does not support postimages or advanced configuration options. |
| advanced | Non-PK column values are emitted directly without wrapping. Supports advanced preimage/postimage configuration via cdc.include.before and cdc.include.after. Supports flexible primary key placement via cdc.include.primary-key.placement. |
⚠️ Deprecated: Will be removed in version 3.0.0. Use advanced format for new deployments.
The legacy format wraps non-PK column values in Cell structs:
{
"after": {
"pk": 1,
"ck": 1,
"v": {
"value": "example value"
}
}
}
Key characteristics:
- Non-PK columns are wrapped in {"value": <actual_value>} structs
- Use experimental.preimages.enabled=true for preimage support (not cdc.include.before)
- Postimages are not supported in legacy mode
- The cdc.include.* configuration options do not apply in legacy mode
The advanced format emits column values directly:
{
"after": {
"pk": 1,
"ck": 1,
"v": "example value"
}
}
This format supports:
- Preimage and postimage configuration via cdc.include.before and cdc.include.after
- Flexible primary key placement via cdc.include.primary-key.placement
- Cleaner message structure without Cell wrappers
If you want to migrate from the legacy format to the advanced format:
- Schema changes: The advanced format has a different message schema (no Cell wrappers). Downstream consumers may need updates.
- Configuration changes:
  - Add cdc.output.format=advanced
  - Remove experimental.preimages.enabled=true
  - Add cdc.include.before=full (or only-updated) if you need preimages
  - Optionally configure cdc.include.after for postimage support
- SMT changes: If you use the ScyllaExtractNewRecordState or ScyllaFlattenColumns transforms, they work with both formats. With the advanced format, their Cell-unwrapping step is a no-op because values are not wrapped.
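The configuration steps above can be applied mechanically to a connector config dict. The following is a hypothetical Python helper; it does not update downstream consumers or SMTs, and it assumes the Scylla table already has preimages enabled if you request them.

```python
def migrate_to_advanced(legacy_config: dict, before_mode: str = "full") -> dict:
    """Apply the listed configuration changes to a legacy connector config.

    before_mode ("full" or "only-updated") is used only when the legacy
    config had experimental.preimages.enabled=true.
    """
    config = dict(legacy_config)  # leave the caller's dict untouched
    preimages = config.pop("experimental.preimages.enabled", "false")
    config["cdc.output.format"] = "advanced"
    if str(preimages).lower() == "true":
        config["cdc.include.before"] = before_mode
    return config
```

For a legacy configuration with preimages enabled, the relevant lines of the result correspond to: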
```properties
# Use advanced output format
cdc.output.format=advanced
# Enable preimage support (advanced mode)
cdc.include.before=full
```

Note: This section applies to the advanced output format (`cdc.output.format=advanced`). For the legacy format (the default), use `experimental.preimages.enabled` instead. See Legacy Format Details.
The connector supports including the complete row state before and/or after a change in CDC messages. This is useful when you need the full context of a row, not just the columns that were modified.
| Property | Default | Values | Description |
|---|---|---|---|
| `cdc.include.before` | `none` | `none`, `full`, `only-updated` | Specifies whether to include the 'before' state of the row in CDC messages. Requires the Scylla table to have preimage enabled (`WITH cdc = {'preimage': true}`) for `full` or `only-updated` modes. |
| `cdc.include.after` | `none` | `none`, `full`, `only-updated` | Specifies whether to include the 'after' state of the row in CDC messages. Requires the Scylla table to have postimage enabled (`WITH cdc = {'postimage': true}`) for `full` or `only-updated` modes. |
| `cdc.include.primary-key.placement` | `kafka-key,payload-after,payload-before` | Comma-separated list of: `kafka-key`, `payload-after`, `payload-before`, `payload-key`, `kafka-headers` | Specifies where primary key (PK) and clustering key (CK) columns should be included in the output. See Primary Key Placement for details. |
| `cdc.include.primary-key.payload-key-name` | `key` | Any valid field name | Specifies the field name for the primary key object in the message payload when `payload-key` is included in `cdc.include.primary-key.placement`. |
| `cdc.incomplete.task.timeout.ms` | `15000` | Positive integer (milliseconds) | Timeout for incomplete CDC tasks waiting for preimage/postimage events. Tasks that remain incomplete longer than this duration are dropped and logged as errors. |
| Mode | Description |
|---|---|
| `none` | The field (`before` or `after`) will be null. No preimage/postimage data is fetched from Scylla. |
| `full` | The field will contain the complete row state with all columns. For `before`, this shows the full row before the change. For `after`, this shows the full row after the change. |
| `only-updated` | The field will contain only the columns that were modified by the operation (plus primary key columns based on `cdc.include.primary-key.placement`). This reduces message size when you only care about what changed. |
| Operation | `before` field | `after` field |
|---|---|---|
| INSERT | Always null (no previous state) | Full image regardless of mode (all columns with values) |
| UPDATE | Depends on mode: `full` = all columns, `only-updated` = only modified columns, `none` = null | Depends on mode: `full` = all columns, `only-updated` = only modified columns, `none` = null |
| DELETE | Full preimage regardless of mode (all columns) | Always null (row was deleted) |
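The following Python model makes the UPDATE row of the table concrete. It is a hypothetical function, not the connector's code. It shows how a mode and the primary key placement shape one image field. It assumes the full pre- or post-image row is available as a dict. It also assumes that leaving `payload-before`/`payload-after` out of the placement drops the PK columns from that field.

```python
from typing import Any, Dict, Iterable, Optional


def project_image(
    mode: str,
    row: Dict[str, Any],
    modified_columns: Iterable[str],
    pk_columns: Iterable[str],
    include_pk: bool,
) -> Optional[Dict[str, Any]]:
    """Model the content of a before/after field for an UPDATE.

    row: full row state (pre- or post-image), including PK columns.
    include_pk: whether payload-before/payload-after is in the placement list.
    """
    if mode == "none":
        return None
    pk = set(pk_columns)
    if mode == "full":
        keep = set(row) - pk
    elif mode == "only-updated":
        keep = set(modified_columns) - pk
    else:
        raise ValueError(f"unknown mode: {mode!r}")
    if include_pk:
        keep |= pk
    # Preserve the column order of the original row.
    return {column: value for column, value in row.items() if column in keep}


post_image = {"pk": 1, "ck": 1, "a": "new", "b": "unchanged"}
print(project_image("only-updated", post_image, {"a"}, {"pk", "ck"}, include_pk=True))
# {'pk': 1, 'ck': 1, 'a': 'new'}
```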
To use preimage/postimage support, you must enable the corresponding CDC options on your Scylla table:

```sql
-- Enable preimage only
CREATE TABLE ks.my_table (
    pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true, 'preimage': true};

-- Enable postimage only
CREATE TABLE ks.my_table (
    pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true, 'postimage': true};

-- Enable both preimage and postimage
CREATE TABLE ks.my_table (
    pk int, ck int, v text, PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true, 'preimage': true, 'postimage': true};
```

```properties
# Include full row state before and after changes
cdc.include.before=full
cdc.include.after=full

# Include only modified columns (reduces message size for UPDATE operations)
cdc.include.before=only-updated
cdc.include.after=only-updated

# Mixed mode: full before state, only modified columns after
cdc.include.before=full
cdc.include.after=only-updated
```

The `only-updated` mode is particularly useful for:
- Reducing message size: When tables have many columns but updates typically modify only a few, `only-updated` significantly reduces Kafka message sizes.
- Change-focused consumers: When downstream consumers only need to know what changed rather than the complete row state.
- Audit trails: When combined with `cdc.include.before=full`, you get the complete "before" state for auditing while keeping "after" minimal.
The connector validates that the Scylla table's CDC options match the connector configuration:

- If you configure `cdc.include.before=full` or `cdc.include.before=only-updated` but the table does not have preimage enabled, the connector reports a configuration error at startup.
- If you configure `cdc.include.after=full` or `cdc.include.after=only-updated` but the table does not have postimage enabled, the connector reports a configuration error at startup.
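You can run an equivalent pre-flight check before deploying, for example in CI. The following is a hypothetical Python sketch, not the connector's actual validation code. It assumes the table's CDC options have already been read (for example, from the `WITH cdc = {...}` clause) into a dict of booleans.

```python
from typing import Dict, List

IMAGE_MODES = {"none", "full", "only-updated"}


def validate_image_options(
    connector_config: Dict[str, str], table_cdc_options: Dict[str, bool]
) -> List[str]:
    """Return human-readable errors; an empty list means the options match."""
    errors: List[str] = []
    # Each connector property depends on one table-level CDC option.
    checks = [("cdc.include.before", "preimage"), ("cdc.include.after", "postimage")]
    for prop, table_option in checks:
        mode = connector_config.get(prop, "none")  # documented default
        if mode not in IMAGE_MODES:
            errors.append(f"{prop}: unsupported value {mode!r}")
        elif mode != "none" and not table_cdc_options.get(table_option, False):
            errors.append(
                f"{prop}={mode} requires the table to have CDC option "
                f"'{table_option}': true"
            )
    return errors


config = {"cdc.include.before": "full", "cdc.include.after": "only-updated"}
table = {"enabled": True, "preimage": True}
for error in validate_image_options(config, table):
    print(error)  # reports the missing 'postimage' option
```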
The cdc.include.primary-key.placement option controls where the primary key (partition key and clustering key) columns appear in the generated Kafka messages. This provides flexibility for different consumption patterns and downstream system requirements.
Note: This option only applies to the advanced output format (`cdc.output.format=advanced`).
| Location | Description |
|---|---|
| `kafka-key` | Include PK/CK columns in the Kafka record key. This is essential for proper message partitioning, ordering, and log compaction. |
| `payload-after` | Include PK/CK columns inside the `after` field in the message value. Useful when consumers need complete row data in the after image. |
| `payload-before` | Include PK/CK columns inside the `before` field in the message value. Useful when consumers need complete row data in the before image. |
| `payload-key` | Include PK/CK columns as a top-level key object in the message value. The field name can be customized using `cdc.include.primary-key.payload-key-name`. |
| `kafka-headers` | Include PK/CK columns as Kafka message headers. Useful for routing or filtering without parsing the message body. |
```properties
# Default: PK in Kafka key and both before/after payloads
cdc.include.primary-key.placement=kafka-key,payload-after,payload-before

# Minimal: Only in Kafka key (for partitioning/compaction)
cdc.include.primary-key.placement=kafka-key

# Include PK as separate top-level field named "primaryKey"
cdc.include.primary-key.placement=kafka-key,payload-key
cdc.include.primary-key.payload-key-name=primaryKey

# Include PK in headers for routing without message parsing
cdc.include.primary-key.placement=kafka-key,kafka-headers
```

- Log compaction: Always include `kafka-key` to ensure proper compaction behavior based on the row's primary key.
- Simplified consumers: Use `payload-key` to provide a dedicated key object separate from the before/after images.
- Header-based routing: Use `kafka-headers` when downstream systems need to route or filter messages based on PK values without deserializing the message body.
- Minimal message size: Remove `payload-after` and `payload-before` if PK columns are only needed in the Kafka key.
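The following Python sketch illustrates how each placement value affects the resulting record. It uses a simplified, hypothetical record model with `key`, `value`, and `headers` entries. The helpers `parse_placement` and `build_record` are illustrative only. The exact header names and encoding the connector uses are an assumption here; headers are shown as name/string pairs.

```python
from typing import Any, Dict, List, Optional, Tuple

VALID_PLACEMENTS = {"kafka-key", "payload-after", "payload-before", "payload-key", "kafka-headers"}


def parse_placement(value: str) -> List[str]:
    """Split the comma-separated property value and reject unknown entries."""
    placements = [p.strip() for p in value.split(",") if p.strip()]
    unknown = sorted(set(placements) - VALID_PLACEMENTS)
    if unknown:
        raise ValueError(f"unknown placement(s): {unknown}")
    return placements


def build_record(
    placement: str,
    pk_values: Dict[str, Any],
    before: Optional[Dict[str, Any]],
    after: Optional[Dict[str, Any]],
    payload_key_name: str = "key",
) -> Dict[str, Any]:
    """Assemble a simplified record (key, value, headers) for one change."""
    places = set(parse_placement(placement))

    def with_pk(image: Optional[Dict[str, Any]], location: str) -> Optional[Dict[str, Any]]:
        if image is None:
            return None
        if location in places:
            return {**pk_values, **image}  # PK columns first, then the image
        return {c: v for c, v in image.items() if c not in pk_values}

    value: Dict[str, Any] = {
        "before": with_pk(before, "payload-before"),
        "after": with_pk(after, "payload-after"),
    }
    if "payload-key" in places:
        value[payload_key_name] = dict(pk_values)
    headers: List[Tuple[str, str]] = []
    if "kafka-headers" in places:
        headers = [(column, str(v)) for column, v in pk_values.items()]
    return {
        "key": dict(pk_values) if "kafka-key" in places else None,
        "value": value,
        "headers": headers,
    }


print(build_record("kafka-key,payload-key", {"pk": 1, "ck": 1}, None, {"v": "x"},
                   payload_key_name="primaryKey"))
# {'key': {'pk': 1, 'ck': 1}, 'value': {'before': None, 'after': {'v': 'x'}, 'primaryKey': {'pk': 1, 'ck': 1}}, 'headers': []}
```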
The connector's handling of partition deletes (DELETE FROM table WHERE pk = ?) depends on the table schema and Scylla version.
For tables that have clustering keys, partition deletes are not supported and are ignored by the connector. This is because a partition delete in such tables can affect multiple rows, and Scylla CDC does not provide individual row-level information for these operations.
```sql
-- Table with clustering key - partition deletes are IGNORED
CREATE TABLE ks.with_ck (
    pk int,
    ck int,
    v text,
    PRIMARY KEY(pk, ck)
) WITH cdc = {'enabled': true};

DELETE FROM ks.with_ck WHERE pk = 1; -- This delete is IGNORED by the connector
```

For tables that have only a partition key (no clustering key), partition deletes are effectively single-row deletes. Starting with Scylla 2026.1.0, these operations are fully supported:

- The connector emits a `DELETE` event (`op: "d"`) for the operation.
- When preimage is enabled on the table and `cdc.include.before` is set to `full` or `only-updated`, the `before` field is populated with the row state before deletion.
- Scylla generates a `PRE_IMAGE` record for these operations, allowing the connector to include the complete row data.
```sql
-- Table without clustering key - partition deletes ARE SUPPORTED (Scylla 2026.1.0+)
CREATE TABLE ks.without_ck (
    pk int PRIMARY KEY,
    v text
) WITH cdc = {'enabled': true, 'preimage': true};

DELETE FROM ks.without_ck WHERE pk = 1; -- This delete IS captured with preimage data
```

Connector configuration for partition delete preimage:

```properties
cdc.include.before=full
# or
cdc.include.before=only-updated
```

| Scylla Version | Table Type | Partition Delete Support |
|---|---|---|
| All versions | With clustering key | Not supported (ignored) |
| < 2026.1.0 | Without clustering key | Supported, but no preimage data |
| >= 2026.1.0 | Without clustering key | Fully supported with preimage data |
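The support matrix above can be expressed as a small lookup. The following is a hypothetical Python helper, not part of the connector. It assumes plain numeric version strings such as `2026.1.0`; suffixes like `-rc1` are not handled. Missing version components are padded with zeros, so `2026.1` compares equal to `2026.1.0`.

```python
from typing import Tuple

PREIMAGE_SUPPORT_VERSION = (2026, 1, 0)


def parse_version(version: str) -> Tuple[int, int, int]:
    """Parse 'X.Y.Z' (or 'X.Y') into a comparable 3-tuple."""
    parts = [int(part) for part in version.split(".")]
    parts += [0] * (3 - len(parts))  # pad '2026.1' to (2026, 1, 0)
    return (parts[0], parts[1], parts[2])


def partition_delete_behavior(scylla_version: str, has_clustering_key: bool) -> str:
    """Return the documented behavior for DELETE FROM table WHERE pk = ?."""
    if has_clustering_key:
        return "not supported (ignored)"
    if parse_version(scylla_version) >= PREIMAGE_SUPPORT_VERSION:
        return "supported with preimage data"
    return "supported, no preimage data"


print(partition_delete_behavior("2025.4.2", has_clustering_key=False))
# supported, no preimage data
```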
Scylla CDC Source Connector reads the CDC log by querying at Vnode granularity. It uses Kafka Connect to store the current progress (offset) for each Vnode. By default, there are 256 Vnodes per Scylla node. Kafka Connect stores these offsets in its `connect-offsets` internal topic, which can grow large for big Scylla clusters. You can minimize the size of this topic by adjusting the following configuration options on it:
- `segment.bytes` or `segment.ms`: lowering them makes the compaction process trigger more often.
- `cleanup.policy=delete` with `retention.ms` set to at least the TTL of your Scylla CDC table (in milliseconds; the Scylla default is 24 hours). With this configuration, older offsets are deleted. Because `retention.ms` is at least the CDC TTL, only offsets whose data has already expired in the source Scylla CDC table are removed.
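The arithmetic behind these recommendations is sketched below in Python. The function names and the optional safety margin are hypothetical. The estimate of roughly one offset entry per Vnode follows the description above. It is an approximation, not an exact count of the topic's keys.

```python
DEFAULT_VNODES_PER_NODE = 256
DEFAULT_CDC_TTL_SECONDS = 24 * 60 * 60  # Scylla default CDC TTL: 24 hours


def offset_entry_estimate(scylla_nodes: int, vnodes_per_node: int = DEFAULT_VNODES_PER_NODE) -> int:
    """Rough number of per-Vnode offsets the connector tracks."""
    return scylla_nodes * vnodes_per_node


def offsets_topic_config(
    cdc_ttl_seconds: int = DEFAULT_CDC_TTL_SECONDS, safety_margin_ms: int = 0
) -> dict:
    """Topic-level settings for the delete-based cleanup option.

    retention.ms must be at least the CDC TTL, converted from seconds to ms.
    """
    return {
        "cleanup.policy": "delete",
        "retention.ms": str(cdc_ttl_seconds * 1000 + safety_margin_ms),
    }


print(offset_entry_estimate(6))  # 1536
print(offsets_topic_config())    # {'cleanup.policy': 'delete', 'retention.ms': '86400000'}
```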
By adjusting the `tasks.max` property, you can configure how many Kafka Connect worker tasks are started. By scaling up the number of nodes in your Kafka Connect cluster (and `tasks.max`), you can achieve higher throughput. In general, `tasks.max` should be greater than or equal to the number of nodes in the Kafka Connect cluster, so that the connector can start on each node. `tasks.max` should also be greater than or equal to the number of nodes in your Scylla cluster, especially if those nodes have a high shard count (32 or greater), as they have a large number of CDC streams.
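These two lower bounds can be checked mechanically. The following is a hypothetical Python helper, not a connector feature. It encodes only the rules stated above and returns warnings rather than computing an "optimal" value.

```python
from typing import List

HIGH_SHARD_COUNT = 32  # shard count at which the advice above becomes more pressing


def check_tasks_max(
    tasks_max: int, connect_nodes: int, scylla_nodes: int, shards_per_scylla_node: int
) -> List[str]:
    """Return warnings when tasks.max is below the documented lower bounds."""
    warnings: List[str] = []
    if tasks_max < connect_nodes:
        warnings.append(
            f"tasks.max={tasks_max} is below the {connect_nodes} Kafka Connect "
            "nodes; the connector cannot start on every node"
        )
    if tasks_max < scylla_nodes:
        hint = "strongly " if shards_per_scylla_node >= HIGH_SHARD_COUNT else ""
        warnings.append(
            f"tasks.max={tasks_max} is below the {scylla_nodes} Scylla nodes; "
            f"raising it is {hint}recommended"
        )
    return warnings


for warning in check_tasks_max(tasks_max=3, connect_nodes=4, scylla_nodes=6, shards_per_scylla_node=32):
    print(warning)
```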
When the connector starts for the first time, it may need to catch up on a large backlog of historical CDC data. During this catch-up phase the connector can issue rapid-fire queries to Scylla, consuming significant cluster resources. The following configuration parameters allow you to throttle the connector and control the rate at which it reads data.
The connector reads the CDC log in a series of time windows. Each window is a SELECT query covering a slice of the CDC log defined by scylla.query.time.window.size (default: 30000 ms). After reading one window, the connector moves to the next one. When it reaches data that is too recent (within scylla.confidence.window.size of the present), it waits before proceeding. On first start the connector has many windows to read through sequentially, and this is where throttling matters.
| Property | Default | Description |
|---|---|---|
| `scylla.minimal.wait.for.window.time` | `0` (disabled) | Minimum time in milliseconds between reading consecutive CDC log windows. This is the primary throttling knob. Setting it to a positive value introduces a mandatory pause after each window read. |
| `scylla.query.time.window.size` | `30000` | Size of each query window in milliseconds. Smaller windows mean less data per query. |
| `scylla.query.options.fetch.size` | `0` (driver default, typically 5000) | Number of rows fetched per CQL page within each query. A smaller value reduces per-query memory usage but increases the number of network round trips. |
| `poll.interval.ms` | `500` | How often (in milliseconds) Kafka Connect polls the connector's internal queue for records. Higher values reduce how frequently batches are sent to Kafka. |
| `max.batch.size` | `2048` | Maximum number of records returned per poll cycle. Smaller values cap the throughput on the Kafka side. |
| `max.queue.size` | `8192` | Maximum size of the internal in-memory queue between the CDC reader and Kafka Connect. When the queue is full, the CDC reader blocks, creating backpressure toward Scylla. |
To estimate the right values, start from the target throughput you want to allow and work backwards.
Step 1: Decide your target read rate.
For example, suppose you want the connector to read at most R = 1 window per second during catch-up.
Step 2: Set scylla.minimal.wait.for.window.time.
This parameter directly controls the pause between windows. If each window read takes approximately T_read milliseconds, set:
```text
scylla.minimal.wait.for.window.time = (1000 / R) - T_read
```

For example, if each window read takes ~200 ms and you want 1 window/s:

```text
scylla.minimal.wait.for.window.time = 1000 - 200 = 800
```
If you are unsure how long a window read takes, start with scylla.minimal.wait.for.window.time = 1000 and adjust down from there.
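The formula from Step 2 can be written as a small Python function. The function name is hypothetical. It adds one detail the formula leaves implicit: if reads already take longer than the target interval, the result would be negative, so it is clamped to 0 (no extra pause).

```python
def minimal_wait_for_window_ms(windows_per_second: float, read_time_ms: float) -> int:
    """(1000 / R) - T_read, clamped at zero and rounded to whole milliseconds."""
    if windows_per_second <= 0:
        raise ValueError("target rate must be positive")
    wait = 1000.0 / windows_per_second - read_time_ms
    # A negative result means reads are already slower than the target rate.
    return max(0, round(wait))


print(minimal_wait_for_window_ms(1, 200))    # 800, matching the worked example
print(minimal_wait_for_window_ms(0.5, 200))  # 1800: one window every 2 seconds
```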
Step 3: Adjust the window size if needed.
Each window covers scylla.query.time.window.size milliseconds of CDC log data. The default of 30 seconds is reasonable for most workloads. If your tables have a very high write rate and each window returns a large amount of data, reduce the window size:
```properties
scylla.query.time.window.size=10000
```
This makes each query smaller, which reduces per-query load on Scylla. The trade-off is that the connector needs more windows to cover the same time span.
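The following Python sketch quantifies this trade-off. The function names are hypothetical. It estimates how many windows a backlog needs and how long a single sequential reader would take to get through them, assuming every window costs `read_time_ms + wait_ms`. It ignores the parallelism across tasks and Vnodes and the confidence-window waits, so treat the result as a rough figure only. A backlog of 24 hours (the default CDC TTL) is used as the example.

```python
import math

DAY_MS = 24 * 60 * 60 * 1000  # a 24-hour backlog, matching the default CDC TTL


def windows_to_cover(backlog_ms: int, window_size_ms: int) -> int:
    """Number of query windows needed to read backlog_ms of CDC log."""
    return math.ceil(backlog_ms / window_size_ms)


def catch_up_estimate_s(backlog_ms: int, window_size_ms: int, read_time_ms: float, wait_ms: float) -> float:
    """Rough catch-up time in seconds for one sequential reader."""
    return windows_to_cover(backlog_ms, window_size_ms) * (read_time_ms + wait_ms) / 1000


print(windows_to_cover(DAY_MS, 30_000))              # 2880 windows with the default size
print(windows_to_cover(DAY_MS, 10_000))              # 8640 windows with 10-second windows
print(catch_up_estimate_s(DAY_MS, 10_000, 200, 800))  # 8640.0 seconds (2.4 hours)
```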
Step 4: Limit the output side.
To prevent bursts on the Kafka side, reduce batch and queue sizes:
```properties
poll.interval.ms=1000
max.batch.size=512
max.queue.size=2048
```
When max.queue.size is small, the internal queue fills up faster and the CDC reader blocks until Kafka Connect drains it. This creates natural backpressure.
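The following simplified Python model illustrates this backpressure. Python's `queue.Queue` stands in for the connector's internal queue, which is implemented in Java and is not shown here. The function name `drain_batch` is hypothetical. A bounded queue makes a blocking producer wait once it is full, and each poll cycle removes at most `max.batch.size` records.

```python
import queue
from typing import Any, List


def drain_batch(q: "queue.Queue[Any]", max_batch_size: int) -> List[Any]:
    """Take up to max_batch_size records without blocking (one poll cycle)."""
    batch: List[Any] = []
    while len(batch) < max_batch_size:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            break
    return batch


records: "queue.Queue[int]" = queue.Queue(maxsize=4)  # stands in for max.queue.size
for i in range(4):
    records.put(i)
print(records.full())                          # True: a blocking put() would now wait
print(drain_batch(records, max_batch_size=3))  # [0, 1, 2]
print(records.full())                          # False: the reader can continue
```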
Step 5: Reduce CQL page size for memory-constrained environments.
If Scylla nodes are under memory pressure, limit how many rows the driver fetches per network round trip:
```properties
scylla.query.options.fetch.size=1000
```
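The round-trip cost of a smaller page size is simple to estimate. The following is a hypothetical Python helper. It treats `fetch.size = 0` as the driver default of 5000 rows, which the table describes as typical rather than guaranteed. It counts at least one round trip per query, even when the window returns no rows.

```python
import math

DRIVER_DEFAULT_FETCH_SIZE = 5000  # "typically 5000" per the table above


def cql_round_trips(rows_in_window: int, fetch_size: int = 0) -> int:
    """Pages (network round trips) needed to read one window."""
    page_size = fetch_size if fetch_size > 0 else DRIVER_DEFAULT_FETCH_SIZE
    return max(1, math.ceil(rows_in_window / page_size))


print(cql_round_trips(12_000))        # 3 pages with the driver default
print(cql_round_trips(12_000, 1000))  # 12 pages, each holding fewer rows in memory
```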
The following configuration keeps the connector well-behaved during initial catch-up:

```properties
# Pause 1 second between each window read
scylla.minimal.wait.for.window.time=1000
# Read 10-second windows instead of 30-second
scylla.query.time.window.size=10000
# Fetch 1000 rows per CQL page
scylla.query.options.fetch.size=1000
# Slow down the Kafka output side
poll.interval.ms=1000
max.batch.size=512
max.queue.size=2048
```

Once the connector has caught up with the real-time CDC stream, these throttles remain in effect but have minimal impact, since the connector naturally waits for new data to appear. You can relax the values after catch-up by updating the connector configuration.
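One way to relax the throttles is through the Kafka Connect REST API (`PUT /connectors/{name}/config`). That endpoint replaces the whole configuration, so the request must carry the complete current config with the throttling keys removed. The following Python sketch builds the request without sending it. The Connect URL, the connector name `scylla-cdc`, and the helper names are hypothetical placeholders.

```python
import json
import urllib.parse
import urllib.request
from typing import Dict

THROTTLE_KEYS = (
    "scylla.minimal.wait.for.window.time",
    "scylla.query.time.window.size",
    "scylla.query.options.fetch.size",
    "poll.interval.ms",
    "max.batch.size",
    "max.queue.size",
)


def relax_throttles(config: Dict[str, str]) -> Dict[str, str]:
    """Drop the catch-up throttles so the documented defaults apply again."""
    return {k: v for k, v in config.items() if k not in THROTTLE_KEYS}


def config_update_request(
    connect_url: str, connector_name: str, config: Dict[str, str]
) -> urllib.request.Request:
    """Build (but do not send) PUT /connectors/{name}/config with the full config."""
    name = urllib.parse.quote(connector_name, safe="")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


current = {"tasks.max": "4", "scylla.minimal.wait.for.window.time": "1000", "max.batch.size": "512"}
request = config_update_request("http://localhost:8083", "scylla-cdc", relax_throttles(current))
print(request.get_method(), request.full_url)
# urllib.request.urlopen(request) would send it to a running Kafka Connect cluster.
```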