This section describes how to configure the Eventuate CDC service (Eventuate Local 0.30.X, Eventuate Tram 0.20.X).
Introduction to the CDC service
The Eventuate CDC service is responsible for reading the events/messages inserted into a Transactional OUTBOX
table and publishing them to the message broker.
The following diagram shows the architecture.

The key parts are as follows:
- Publisher service - the service that inserts a message/event into an OUTBOX table
- Consumer service - the service that consumes the message/event
- OUTBOX table - the EVENTS table for an Eventuate Local application and the MESSAGE table for an Eventuate Tram application
- Eventuate CDC service - the service that publishes messages/events to the message broker
- Reader - a component of the CDC that retrieves inserted messages/events using either transaction log tailing or polling with a SELECT statement
- Pipeline - invoked by the reader to publish the message
- Offset Store - used by the reader to record the current transaction log offset, e.g. in the MySQL binlog
- Publisher - used by the pipeline to publish the message/event to the message broker
- Zookeeper - used by the reader for leadership election when clustered
The Eventuate CDC service is a Spring Boot application. There are two flavors of the Eventuate CDC service:
- Eventuate Local CDC service - Eventuate Local only
- Eventuate Tram CDC service - Eventuate Local and Eventuate Tram
Supported databases
Eventuate CDC supports the following databases:
- MySQL
- Postgres
It can also support any RDBMS with a JDBC driver, although the CDC service would need to be rebuilt with the driver.
Supported message brokers
Eventuate CDC supports the following message brokers:
- Apache Kafka - Eventuate Local and Eventuate Tram
- Apache ActiveMQ - Eventuate Tram only
- RabbitMQ - Eventuate Tram only
Docker images
The Eventuate CDC service is conveniently packaged as a Docker image.
Image | Description |
---|---|
eventuateio/eventuateio-local-new-cdc-service:0.30.0.M12 | Eventuate Local |
eventuateio/eventuate-tram-cdc-mysql-service:0.20.0.M8 | Eventuate Tram + Eventuate Local |
TODO WIP renames:
- eventuateio-local-new-cdc-service → eventuateio-local-cdc-service
- eventuate-tram-cdc-mysql-service → eventuateio-tram-cdc-service
How it works
To use the Eventuate CDC service, you configure one or more readers and pipelines, and a publisher:
- Reader - reads events/messages inserted into the database by either tailing the transaction log or polling using queries
- Pipeline - transforms the inserted message/event into a message to publish to a message broker destination (topic, queue, exchange, etc.)
- Publisher - invoked by the pipeline to publish the message to the message broker
About Readers
A reader retrieves each message/event inserted into the database and invokes the pipeline that’s interested in that message/event.
Strategies for reading messages/events
A CDC reader reads messages/events using one of the following mechanisms:
- MySQL - transaction log tailing using the MySQL binlog protocol
- Postgres - transaction log tailing using the Postgres WAL
- Generic JDBC - polling the OUTBOX table by periodically executing a SQL SELECT statement to retrieve messages and an UPDATE to mark them as having been read.
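The polling strategy can be illustrated with a minimal Python sketch. It is not the CDC service's implementation: it uses an in-memory SQLite database, and the `published` column, the `poll_once` function and the `publish` callback are hypothetical names chosen for illustration.

```python
import sqlite3


def poll_once(conn, publish, max_events=1000):
    """One polling cycle: SELECT unread rows, publish each, UPDATE it as read."""
    rows = conn.execute(
        "SELECT id, destination, payload FROM message "
        "WHERE published = 0 ORDER BY id LIMIT ?",
        (max_events,),
    ).fetchall()
    for row_id, destination, payload in rows:
        publish(destination, payload)  # hand the message to the broker client
        # Mark the row only after publishing, so a crash in between
        # leads to a re-publish rather than a lost message.
        conn.execute("UPDATE message SET published = 1 WHERE id = ?", (row_id,))
    conn.commit()
    return len(rows)


def demo():
    """Insert two messages, then poll twice; the second poll finds nothing."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE message (id INTEGER PRIMARY KEY, destination TEXT, "
        "payload TEXT, published INTEGER DEFAULT 0)"  # 'published' is an assumed column
    )
    conn.executemany(
        "INSERT INTO message (destination, payload) VALUES (?, ?)",
        [("orders", "a"), ("orders", "b")],
    )
    sent = []
    first = poll_once(conn, lambda d, p: sent.append((d, p)))
    second = poll_once(conn, lambda d, p: sent.append((d, p)))
    return first, second, sent
```

Because the row is marked after the publish call, this sketch gives at-least-once delivery: consumers may occasionally see a duplicate.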
MySQL binlog current offset storage
The MySQL binlog reader records its current position (file, offset) in the MySQL binlog in an offset store. There are two types of offset store, depending on which message broker is being used:
- Apache Kafka topic - when using Apache Kafka
- Database table - when using a message broker other than Apache Kafka
The offset is stored under a key given by the value of the reader.mySqlBinLogClientName property, which enables multiple readers to share the same store.
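As a rough illustration of why keying by client name lets several readers share one store, here is a small in-memory sketch. The class name, the method names and the JSON serialization format are assumptions made for this example, not the format the CDC service actually uses.

```python
import json


class InMemoryOffsetStore:
    """Keeps one serialized offset per client name (one entry per reader)."""

    def __init__(self):
        self._rows = {}  # client name -> serialized offset (JSON string, assumed format)

    def save(self, client_name, binlog_file, position):
        # Overwrites only the entry that belongs to this client name.
        self._rows[client_name] = json.dumps({"file": binlog_file, "offset": position})

    def load(self, client_name):
        serialized = self._rows.get(client_name)
        if serialized is None:
            return None  # this reader has not recorded an offset yet
        data = json.loads(serialized)
        return data["file"], data["offset"]
```

Two readers with different client names can save and load independently without overwriting each other's position.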
CDC Leadership election
When running a cluster of Eventuate CDC services, a reader uses a Zookeeper-based leadership election to determine which reader is active.
About Pipelines
A pipeline, which is invoked by a reader, transforms the inserted message/event into a message to publish to a message broker destination (topic, queue, exchange etc). There are two types of pipelines:
- Eventuate Local
  - Processes events inserted into an EVENTS table
  - An event’s aggregate type determines which message broker channel the event is published to
- Eventuate Tram
  - Processes messages inserted into a MESSAGE table
  - The row’s DESTINATION column specifies the message broker channel the message is published to
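The difference between the two pipeline types comes down to how the destination channel is chosen. The following Python sketch shows that rule only; the dictionary keys `aggregate_type` and `destination`, the function name, and the `eventuate-tram` type string are assumptions for illustration (the `eventuate-local` type string is the one used in this document's configuration example).

```python
def destination_for(pipeline_type, row):
    """Pick the broker channel for a row, depending on the pipeline type."""
    if pipeline_type == "eventuate-local":
        # Eventuate Local: the event's aggregate type selects the channel.
        return row["aggregate_type"]
    if pipeline_type == "eventuate-tram":
        # Eventuate Tram: the row's DESTINATION column names the channel.
        return row["destination"]
    raise ValueError(f"unknown pipeline type: {pipeline_type}")
```

For example, `destination_for("eventuate-tram", {"destination": "orderService"})` returns `"orderService"`.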
About Publishers
A publisher is invoked by the pipeline and publishes the message/event to a message broker destination. There are three types of publishers:
- Apache Kafka
- Apache ActiveMQ
- RabbitMQ
Configuring the Eventuate CDC service
The Eventuate CDC is configured using Spring Boot-style properties. You can, for example, configure an Eventuate CDC service that is deployed as a container by setting the container’s environment variables.
There are two styles of configuration:
- Single pipeline - this is intended for simple scenarios and to simplify upgrading from previous versions
- Multi pipeline - allows the configuration of multiple pipelines, such as when each service has its own OUTBOX table, or when an application uses both Eventuate Local and Eventuate Tram
Single pipeline configuration
This style of configuration is almost identical to how previous versions of the CDC were configured. It creates a single reader and pipeline.
Reader:
- The default reader type is MySQL binlog
- Enable Postgres WAL by setting spring.profiles.active=PostgresWal
- Enable polling by setting spring.profiles.active=EventuatePolling
Pipeline:
- The pipeline type is determined by the service
  - Eventuate Local CDC service - Eventuate Local pipeline
  - Eventuate Tram CDC service - Eventuate Tram pipeline
Publishing:
- Eventuate Local CDC service - Apache Kafka only
- Eventuate Tram CDC service - The default is Apache Kafka but you can use Apache ActiveMQ or RabbitMQ by activating the appropriate profile (described below).
There are two sets of properties, one for the reader and another for the pipeline.
Single pipeline - Reader Properties
property | Description | Default value | Reader property name |
---|---|---|---|
spring.datasource.url | JDBC connection URL | - | dataSourceUrl |
spring.datasource.username | Username to use for the connection | - | dataSourceUserName |
spring.datasource.password | Password to use for the connection | - | dataSourcePassword |
spring.datasource.driver.class.name | JDBC driver class name | - | dataSourceDriverClassName |
eventuatelocal.cdc.leadership.lock.path | Zookeeper node path used for the leadership election. Only one reader with the same properties may be active at a time; the Zookeeper leadership recipe is used to enforce this. | | leadershipLockPath |
eventuatelocal.cdc.mysql.binlog.client.name | (mysql-binlog only) The client key used to store the current offset in the offset store. | - | mySqlBinlogClientName |
eventuatelocal.cdc.offset.storage.topic.name | (mysql-binlog) Apache Kafka topic that stores the current binlog offset. | | offsetStorageTopicName |
eventuatelocal.cdc.binlog.connection.timeout.in.milliseconds | (mysql-binlog, postgres-wal only) If the CDC cannot connect to the database, it will retry after the specified timeout. | 5000 | binlogConnectionTimeoutInMilliseconds |
eventuatelocal.cdc.max.attempts.for.binlog.connection | (mysql-binlog, postgres-wal only) The number of connection attempts that the CDC service will make. | 100 | maxAttemptsForBinlogConnection |
eventuatelocal.cdc.db.user.name | (mysql-binlog only) The MySQL reader uses a separate user name with administrator privileges to read events from the database. Usually ‘root’. | - | cdcDbUserName |
eventuatelocal.cdc.db.password | (mysql-binlog only) Password of the MySQL reader user. | - | cdcDbPassword |
eventuatelocal.cdc.binlog.client.id | (mysql-binlog only) Unique identifier across the whole replication group. | - | binlogClientId |
eventuatelocal.cdc.read.old.debezium.db.offset.storage.topic | (mysql-binlog only) Boolean flag. Set it to "true" to start reading records from the old Debezium Kafka topic, or to "false" to start reading records from the new CDC Kafka topic. | - | readOldDebeziumDbOffsetStorageTopic |
eventuatelocal.cdc.polling.interval.in.milliseconds | (polling only) Sleep time between polling queries | 500 | pollingIntervalInMilliseconds |
eventuatelocal.cdc.max.events.per.polling | (polling only) Number of events requested by each polling query | 1000 | maxEventsPerPolling |
eventuatelocal.cdc.max.attempts.for.polling | (polling only) If polling fails, the reader retries, but no more than the specified number of times. | 100 | maxAttemptsForPolling |
eventuatelocal.cdc.polling.retry.interval.in.milleseconds | (polling only) If polling fails, the reader retries after the specified interval. | 500 | pollingRetryIntervalInMilliseconds |
Single pipeline - Pipeline Properties
property | Description | Default value | Pipeline property name |
---|---|---|---|
eventuate.database.schema | Schema that the CDC service listens to | eventuate | eventuateDatabaseSchema |
eventuatelocal.cdc.source.table.name | Name of the table to read events/messages from | Depends on the pipeline type. | sourceTableName |
Multi-pipeline configuration
The new style of configuration supports multiple readers and pipelines. You can, for example, have a single CDC service that supports both Eventuate Local and Eventuate Tram services. See, for example, the FTGO application.
Configuring a reader
A reader is defined using Spring framework properties that obey the following naming convention: eventuate.cdc.reader.<reader name>.<property name>.
For example, you can configure a reader called READER1 using environment variables such as these:
EVENTUATE_CDC_READER_READER1_TYPE: mysql-binlog
EVENTUATE_CDC_READER_READER1_DATASOURCEURL: jdbc:mysql://${DOCKER_HOST_IP}:3306/eventuate
EVENTUATE_CDC_READER_READER1_DATASOURCEUSERNAME: mysqluser
...
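The mapping from a property name to an environment variable name follows Spring Boot's relaxed binding convention: replace periods with underscores, remove dashes, and convert to uppercase. The Python helper below is a sketch of that convention only; the function names `to_env_var` and `reader_env` are hypothetical and not part of Eventuate.

```python
def to_env_var(property_name):
    """Convert a Spring property name to its environment variable form."""
    return property_name.replace(".", "_").replace("-", "").upper()


def reader_env(reader_name, properties):
    """Build environment variables for one reader from its property values."""
    prefix = f"eventuate.cdc.reader.{reader_name}."
    return {to_env_var(prefix + name): value for name, value in properties.items()}
```

For instance, `reader_env("reader1", {"type": "mysql-binlog"})` yields `{"EVENTUATE_CDC_READER_READER1_TYPE": "mysql-binlog"}`.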
A reader has the properties shown in the following table.
Name | Description | Default Value |
---|---|---|
type | Type of the reading mechanism. The types referenced in this table are mysql-binlog, postgres-wal and polling. | - |
dataSourceUrl | JDBC connection URL | - |
dataSourceUserName | Username to use for the connection | - |
dataSourcePassword | Password to use for the connection | - |
dataSourceDriverClassName | JDBC driver class name | - |
leadershipLockPath | Zookeeper node path used for the leadership election. The Zookeeper leadership recipe is used to select a leader. | - |
mySqlBinlogClientName | (mysql-binlog only) The client key used to store the current offset in the offset store. | - |
offsetStorageTopicName | (mysql-binlog) Apache Kafka topic used to store the current reader offset | |
binlogConnectionTimeoutInMilliseconds | (mysql-binlog, postgres-wal only) If the connection to the database fails, the CDC service tries to connect again after the specified timeout. | 5000 |
maxAttemptsForBinlogConnection | (mysql-binlog, postgres-wal only) If the connection to the database fails, the CDC service tries to connect again, but no more than the specified number of times. | 100 |
cdcDbUserName | (mysql-binlog only) The MySQL reader uses a separate user name with administrator privileges to read events from the database. Usually ‘root’. | - |
cdcDbPassword | (mysql-binlog only) Password of the MySQL reader user. | - |
binlogClientId | (mysql-binlog only) Unique identifier across the whole replication group. | - |
readOldDebeziumDbOffsetStorageTopic | (mysql-binlog only) Boolean flag. Set it to "true" to start reading records from the old Debezium Kafka topic, or to "false" to start reading records from the new CDC Kafka topic. | - |
postgresWalIntervalInMilliseconds | (postgres-wal only) Specifies the time interval between status packets sent back to Postgres. A value of zero disables the periodic status updates completely, although an update will still be sent when requested by Postgres, to avoid a timeout disconnect. | 10 |
postgresReplicationStatusIntervalInMilliseconds | (postgres-wal only) Time to sleep when events are not received. | 1000 |
postgresReplicationSlotName | (postgres-wal only) Name of the replication slot used to read events/messages. | |
pollingIntervalInMilliseconds | (polling only) Sleep time between polling queries | 500 |
maxEventsPerPolling | (polling only) Number of events requested by each polling query | 1000 |
maxAttemptsForPolling | (polling only) If polling fails, the reader retries, but no more than the specified number of times. | 100 |
pollingRetryIntervalInMilliseconds | (polling only) If polling fails, the reader retries after the specified interval. | 500 |
Configuring a pipeline
A pipeline is configured using Spring framework properties that obey the following naming convention: eventuate.cdc.pipeline.<pipeline name>.<property name>.
For example, you can configure a pipeline called PIPELINE1 using environment variables such as these:
EVENTUATE_CDC_PIPELINE_PIPELINE1_TYPE: eventuate-local
EVENTUATE_CDC_PIPELINE_PIPELINE1_READER: reader1
...
A pipeline has the properties shown in the following table.
Name | Description | Default Value |
---|---|---|
type | Type of the pipeline, for example eventuate-local. The Eventuate Local CDC service supports only Eventuate Local pipelines; the Eventuate Tram CDC service supports both Eventuate Local and Eventuate Tram pipelines. | - |
eventuateDatabaseSchema | The schema of the transactional outbox table | |
sourceTableName | Name of the transactional outbox table | Depends on the pipeline type. |
reader | Name of the reader. | - |
Configuring the publisher
The publisher is invoked by the pipeline to publish a message/event to the message broker. The Eventuate Local CDC service only supports Apache Kafka, since it relies on Kafka’s message retention capability. By default, the Eventuate Tram CDC service publishes messages to Apache Kafka. But it also supports Apache ActiveMQ and RabbitMQ.
Apache Kafka
Name | Description | Default Value |
---|---|---|
eventuatelocal.kafka.bootstrap.servers | Comma-separated list of host and port pairs | - |
Apache Kafka Consumer Properties
The Eventuate CDC uses an Apache Kafka Consumer to read the current offset from the Apache Kafka-based offset store topic. The currently configured consumer properties are as follows:
Name | Value |
---|---|
auto.offset.reset | earliest |
group.id | Random UUID |
enable.auto.commit | false |
auto.commit.interval.ms | 1000 |
session.timeout.ms | 30000 |
key.deserializer | org.apache.kafka.common.serialization.StringDeserializer |
value.deserializer | org.apache.kafka.common.serialization.StringDeserializer |
You can override these properties and supply other Apache Kafka Consumer properties by defining properties prefixed with eventuate.local.kafka.consumer.properties.
Apache Kafka Producer Properties
The Eventuate CDC uses an Apache Kafka producer to publish messages/events to Apache Kafka and to record the current offset in the Apache Kafka-based offset store topic. The currently configured producer properties are as follows.
Name | Default Value |
---|---|
acks | all |
retries | 0 |
batch.size | 16384 |
linger.ms | 1 |
buffer.memory | 33554432 |
key.serializer | org.apache.kafka.common.serialization.StringSerializer |
value.serializer | org.apache.kafka.common.serialization.StringSerializer |
You can override these properties and supply other Apache Kafka Producer properties by defining properties prefixed with eventuate.local.kafka.producer.properties.
Apache ActiveMQ
To publish messages to Apache ActiveMQ, enable the ActiveMQ Spring profile using spring.profiles.active=ActiveMQ.
Name | Description | Default Value |
---|---|---|
activemq.url | URL of the ActiveMQ server. | Format: <protocol>://<ip>:<port> Example: tcp://172.17.0.1:61616 |
RabbitMQ
To publish messages to RabbitMQ, enable the RabbitMQ Spring profile using spring.profiles.active=RabbitMQ.
Name | Description | Default Value |
---|---|---|
rabbitmq.host | Host of the RabbitMQ server. Example: 172.17.0.1 | - |
rabbitmq.port | Port of the RabbitMQ server | - |
Other configuration
The Eventuate CDC service also has the following configuration properties.
Name | Description | Default value |
---|---|---|
eventuatelocal.cdc.max.event.interval.to.assume.reader.healthy | The number of milliseconds within which an event must be received in order for a MySQL binlog/Postgres WAL-based reader to be considered healthy | 60000 |
TODO: This should probably be replaced with N x reader.replicationLagMeasuringIntervalInMilliseconds
Configuring infrastructure services
The CDC service requires various infrastructure services including:
- Relational database, such as MySQL or Postgres
- Message broker, such as Apache Kafka
CDC Database schema
The Eventuate CDC service requires several tables. The Eventuate MySQL and Postgres images define these tables.
CDC_MONITORING table
The CDC service uses the CDC_MONITORING table to implement a 'heartbeat' mechanism.
Each reader that uses transaction log tailing (MySQL binlog/Postgres WAL) periodically updates a row in this table and measures the delay in receiving the update from the transaction log.
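The heartbeat measurement can be pictured with a small Python sketch. It assumes that the `last_time` column (see the schema migration script in this document) holds the write time in epoch milliseconds; that interpretation, and the function names, are assumptions for illustration rather than a description of the service's code.

```python
def heartbeat_row(reader_id, now_ms):
    """The row a reader would write into CDC_MONITORING at time now_ms."""
    return {"reader_id": reader_id, "last_time": now_ms}


def replication_lag_ms(observed_row, now_ms):
    """Lag between writing the heartbeat and seeing it in the transaction log."""
    # Clamp at zero in case the two timestamps come from slightly skewed clocks.
    return max(0, now_ms - observed_row["last_time"])
```

If a heartbeat written at time 1000 is observed in the log at time 1040, `replication_lag_ms` reports a lag of 40 milliseconds.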
OFFSET_STORE table
When publishing messages to Apache ActiveMQ or RabbitMQ, the MySQL binlog reader records the current binlog position in this table.
Apache Kafka broker configuration
The MySQL binlog reader records the current binlog position in the offsetStorageTopicName topic.
This topic should have the following configuration:
- Compacted topic with a relatively short retention time
- Replication factor of 3
CDC service observability
The Eventuate CDC service has several features that enable monitoring in a production environment.
Health Check endpoint
The Eventuate CDC service has a standard Spring Boot health check endpoint: <eventuate-local-base-url>/actuator/health.
It reports on the health of the following:
- Apache Zookeeper - verifies that the CDC service can connect to Apache Zookeeper
- Apache Kafka - verifies that the CDC service can connect to Apache Kafka
- CDC readers - the health of a reader is determined as follows:
  - A reader is always healthy when it is not the leader
  - A reader that is the leader is unhealthy when one of the following is true:
    - It is not connected to the database (PostgresWal, MySqlBinlog only)
    - It did not receive events recently, as defined by the eventuatelocal.cdc.max.event.interval.to.assume.reader.healthy property, which defaults to 60 seconds (PostgresWal, MySqlBinlog only)
- Publisher - is unhealthy when the last attempt at publishing a message failed.
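The reader health rules for transaction-log-tailing readers can be summarized as a small decision function. This Python sketch restates the rules listed above; the function and parameter names are hypothetical, and it is not the service's actual health indicator.

```python
def reader_is_healthy(is_leader, connected, ms_since_last_event,
                      max_event_interval_ms=60000):
    """Apply the health rules for a MySQL binlog or Postgres WAL reader."""
    if not is_leader:
        return True  # a non-leader reader is always considered healthy
    if not connected:
        return False  # the leader must be connected to the database
    # The leader must have received an event within the configured interval.
    return ms_since_last_event <= max_event_interval_ms
```

For example, a connected leader whose last event arrived 61 seconds ago is reported unhealthy under the default 60000 ms interval.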
Metrics
The Eventuate CDC service publishes the standard Spring Boot/Prometheus metrics via the following endpoint: <eventuate-local-base-url>/actuator/prometheus.
Name | Type | Description |
---|---|---|
eventuate.cdc.messages.processed | Counter | Count of processed events (inserts/updates into the CDC-related tables). |
eventuate.cdc.binlog.entries.processed | Counter | Count of inserts/updates into all tables. (PostgresWal, MySqlBinlog only) |
eventuate.cdc.leader | Gauge | 1 if the binlog entry reader is the leader, 0 otherwise. |
eventuate.cdc.connection.attempts | Counter | Increased by 1 each time the reader reconnects to the database. (PostgresWal, MySqlBinlog only) |
eventuate.cdc.replication.lag | Gauge (should be replaced by DistributionSummary) | Time in milliseconds between an event being inserted into the table and being read by the binlog entry reader. (PostgresWal, MySqlBinlog only) |
eventuate.cdc.replication.lag.age | Gauge | Time in milliseconds since the last lag measurement. (PostgresWal, MySqlBinlog only) |
eventuate.cdc.connected.to.database | Gauge | 1 if the reader is connected to the database, 0 otherwise. (PostgresWal, MySqlBinlog only) |
eventuate.cdc.event.age | Gauge | Time in milliseconds between an event being created and being published. |
eventuate.cdc.events.published | Counter | Number of events published to the message broker. |
eventuate.cdc.events.duplicates | Counter | Number of duplicate events found. |
eventuate.cdc.events.retries | Counter | Increased by 1 each time message publishing fails and is retried. |
Upgrading to the UCDC
The configuration properties of the UCDC are different than older versions of the CDC. When upgrading, you need to specify configuration properties that correctly correspond to those of the older CDC. In particular, if you are using MySQL you need to ensure that the UCDC is correctly configured to read the binlog offset maintained by the older CDC.
The upgrade process consists of the following steps:
- Stop the old CDC
- Apply the schema migration to add the new tables
- Configure the new UCDC
- Start the new UCDC
Migrating the schema
You will need to update the database schema with a script similar to:
CREATE TABLE cdc_monitoring (reader_id BIGINT PRIMARY KEY, last_time BIGINT);
CREATE TABLE offset_store(client_name VARCHAR(255) NOT NULL PRIMARY KEY, serialized_offset VARCHAR(255));
ALTER TABLE message ADD creation_time BIGINT;
ALTER TABLE received_messages ADD creation_time BIGINT;
This script creates two new tables and adds the creation_time column to the messaging tables.
Configuring the new CDC
There are a few different upgrade scenarios depending on your starting point:
- Old (Debezium-based) CDC - eventuateio/eventuateio-local-cdc-service:0.22.1.RELEASE or older
- 'New' Eventuate Local CDC - eventuateio/eventuateio-local-new-cdc-service:0.22.1.RELEASE or older
- Eventuate Tram CDC - eventuateio/eventuate-tram-cdc-mysql-service:0.11.1.RELEASE or older
- Multiple older Eventuate Local/Eventuate Tram CDCs
Upgrading from old (Debezium-based) Eventuate Local CDC
To upgrade from the Debezium-based Eventuate Local CDC for MySQL (eventuateio/eventuateio-local-cdc-service:0.22.1.RELEASE or older), define these additional properties:
Name | Value |
---|---|
EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC | true |
EVENTUATELOCAL_CDC_BINLOG_CLIENT_ID | 85744 (the value used by the Debezium-based Eventuate Local CDC) |
EVENTUATELOCAL_CDC_MYSQL_BINLOG_CLIENT_NAME | MySqlBinlog |
The EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC property configures the UCDC to read the Debezium-maintained offset from the old offset store Apache Kafka topic.
Upgrading from 'new' Eventuate Local CDC
When upgrading from the 'new' Eventuate Local CDC (eventuateio/eventuateio-local-new-cdc-service:0.22.1.RELEASE or older), you need to make a few changes.
Some properties have changed names and default values.
New Eventuate Local CDC name | Default value | UCDC name | Default value |
---|---|---|---|
eventuatelocal.cdc.my.sql.bin.log.client.name | MySqlBinLog | eventuatelocal.cdc.mysql.binlog.client.name | - |
eventuatelocal.cdc.db.history.topic.name | db.history.topic | eventuatelocal.cdc.offset_storage.topic.name | offset.storage.topic |
eventuatelocal.cdc.old.debezium.db.offset.storage.topic.name | eventuate.local.cdc.my-sql-connector.offset.storage | eventuatelocal.cdc.read.old.debezium.db.history.topic | - |
For example, define these additional properties:
Name | Value |
---|---|
EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC | false |
EVENTUATELOCAL_CDC_OFFSET_STORAGE_TOPIC_NAME | db.history.topic (or whatever was specified previously) |
EVENTUATELOCAL_CDC_BINLOG_CLIENT_ID | 1 (or whatever was specified previously) |
Upgrading from Eventuate Tram CDC
To upgrade from the Eventuate Tram CDC (eventuateio/eventuate-tram-cdc-mysql-service:0.11.1.RELEASE or older), you must define these properties.
Name | Value |
---|---|
EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC | false |
EVENTUATELOCAL_CDC_MYSQL_BINLOG_CLIENT_NAME | MySqlBinLog (or whatever was specified previously) |
Upgrading an application that has multiple CDC services to a single CDC service
You can replace multiple CDC services, such as one for Eventuate Local and another for Eventuate Tram, with a single CDC service configured with multiple readers and pipelines. For example, the FTGO application uses a single CDC service with Eventuate Local and Eventuate Tram pipelines.
Before | After |
---|---|
|
|