Getting started with Eventuate Tram

Read this document to learn how to get started with Eventuate Tram(and Eventuate Tram Quarkus), frameworks for transactional messaging. Eventuate Tram sends and receives messages as part of a database transaction ensuring that your application atomically updates the database and publishes messages. Currently, it supports the following databases:

  • Transaction log tailing: MySQL, Postgres

  • Polling: Other SQL databases including Microsoft SQL Server

And, the following message brokers:

  • Apache Kafka

  • ActiveMQ

  • RabbitMQ

  • Redis

JavaDocs

Project setup

Latest library versions:

Spring/Micronaut

eventuate tram bom

Quarkus

eventuate tram quarkus bom

Gradle

In gradle.properties:

eventuateTramVersion=LATEST_VERSION
eventuateTramQuarkusVersion=LATEST_VERSION

In build.gradle, specify these Maven repositories:

repositories {
    mavenCentral()
}

Include one or more of the following dependencies, depending on which API and implementation you want to use.

API artifacts
dependencies {

  // Basic messaging

  compile "io.eventuate.tram.core:eventuate-tram-<framework>-messaging:$eventuateTramVersion"

  // Domain events

  compile "io.eventuate.tram.core:eventuate-tram-<framework>-events:$eventuateTramVersion"

  // Command/Async Reply messaging

  compile "io.eventuate.tram.core:eventuate-tram-<framework>-commands:$eventuateTramVersion"

where

  • framework is spring, micronaut or quarkus

Producer implementation artifact

If your service is a producer, you must add this dependency:

dependencies {

  // Producer implementation

  compile "io.eventuate.tram.core:eventuate-tram-<framework>-producer-jdbc:$eventuateTramVersion"

}
Consumer implementation artifacts

If your service is a consumer, you must add a message broker-specific dependency:

dependencies {

  // Consumer implementation

  compile "io.eventuate.tram.core:eventuate-tram-<framework>-consumer-<message-broker>:$eventuateTramVersion"

}

where

  • framework is spring, micronaut or quarkus

  • message-broker is kafka, apachemq, rabbitmq or redis.

Note: micronaut and quarkus only support kafka

If you want to use JDBC-based idempotency (see below) also include this dependency:

dependencies {

  // Consumer idempotency

  compile "io.eventuate.tram.core:eventuate-tram-<framework>-consumer-jdbc:$eventuateTramVersion"

}

where

  • framework is spring or micronaut

See below for more details.

All-in-one artifacts

If you need all of the above artifacts, then you can include this one dependency:

dependencies {

  // Consumer/Producer all-in-one

  compile "io.eventuate.tram.core:eventuate-tram-<framework>-jdbc-<message-broker>:$eventuateTramVersion"

}

where

  • framework is spring or micronaut

  • message-broker is kafka, apachemq, rabbitmq or redis.

Note: micronaut only supports kafka

Eventuate BOM

eventuate platform dependencies

You can use the Eventuate BOM to avoid needing to specify the artifact versions:

dependencies {
    implementation(platform("io.eventuate.platform:eventuate-platform-dependencies:$eventuateBomVersion"))
}

You can then specify artifacts as follows:

dependencies {
  compile "io.eventuate.tram.core:eventuate-tram-spring-messaging"
}

Transactional messages

Eventuate Tram has APIs for sending and receiving messages as part of a database transaction.

Sending messages

Send a message using MessageProducer:

Spring
public abstract class AbstractTramMessageTest {

  @Autowired
  private MessageProducer messageProducer;

  @Test
  public void shouldReceiveMessage() {
    ...
    messageProducer.send(destination, MessageBuilder.withPayload(payload).build());
    ...
  }

See this example of sending messages.

Micronaut/Quarkus
public abstract class AbstractTramMessageTest {

  @Inject
  private MessageProducer messageProducer;

  @Test
  public void shouldReceiveMessage() {
    ...
    messageProducer.send(destination, MessageBuilder.withPayload(payload).build());
    ...
  }

Consuming messages

Receive messages using MessageConsumer:

Spring
public abstract class AbstractTramMessageTest {

  @Autowired
  private MessageConsumer messageConsumer;

  @Test
  public void shouldReceiveMessage() throws InterruptedException {
    messageConsumer.subscribe(subscriberId, Collections.singleton(destination), this::handleMessage);
    ...
  }

  private void handleMessage(Message message) {
    ...
  }
}

See this example of consuming messages.

Micronaut/Quarkus
public abstract class AbstractTramMessageTest {

  @Inject
  private MessageConsumer messageConsumer;

  @Test
  public void shouldReceiveMessage() throws InterruptedException {
    messageConsumer.subscribe(subscriberId, Collections.singleton(destination), this::handleMessage);
    ...
  }

  private void handleMessage(Message message) {
    ...
  }
}

Message interceptors

Eventuate Tram implements message interceptors. Your service can define one or more message interceptors to inject custom logic into the various message processing steps. It could, for example, add custom headers to a message.

A message interceptor must implement the MessageInterceptor interface.

public interface MessageInterceptor {

  default void preSend(Message message) {}
  default void postSend(Message message, Exception e) {}

  default void preReceive(Message message) {}
  default void preHandle(String subscriberId, Message message) {}
  default void postHandle(String subscriberId, Message message, Throwable throwable) {}
  default void postReceive(Message message) {}

}

Message handler decorators

Eventuate Tram implements message handler decorators. They are equivalent to Servlet Filters and are invoked when handling a message. Eventuate Tram uses them to implement message consumer mechanisms such as idempotency and optimistic locking. Your service can also define custom decorators.

MessageHandlerDecorator is the interface that message handler decorators must implement:

public interface MessageHandlerDecorator
     extends BiConsumer<SubscriberIdAndMessage, MessageHandlerDecoratorChain> {
  int getOrder();
}

The @MessageHandlerDecorator`’s in the `ApplicationContext are executed in the order defined by getOrder().

Idempotent message consumers

A message broker might deliver the same message more than once. A message consumer must typically be idempotent. Some message consumers are naturally idempotent, e.g. account.balance = event.newBalance, and can be invoked repeatedly without error. But others are not - e.g. account.balance += event.amount - and must be made idempotent by tracking the messages that they have consumed.

Eventuate Tram has an pluggable duplicate message detection mechanism. DuplicateMessageDetector is the strategy interface. There are the following concrete implementations:

  • SqlTableBasedDuplicateMessageDetector - tracks successfully processed message IDs in a RECEIVED_MESSAGES tables. Your message handling logic is executed within a transaction managed by this class.

  • NoopDuplicateMessageDetector - does nothing. Use this implementation when your message handlers are idempotent. For example, the Order History Service in the FTGO application tracks message IDs in a Dynamodb table

  • TransactionalNoopDuplicateMessageDetector - it doesn’t track message IDs but it does manage transactions.

A note about transaction management

If you use a DuplicateMessageDetector that manages transactions then you must not manage transactions (e.g. use @Transactional) in your message handlers.

Your service’s ApplicationContext must define an DuplicateMessageDetector bean.

Spring

SqlTableBasedDuplicateMessageDetector

TransactionalNoopDuplicateMessageDetector

NoopDuplicateMessageDetector

Dependency

eventuate-tram-spring-consumer-jdbc (or indirectly via eventuate-tram-spring-jdbc-<message-broker>)

eventuate-tram-spring-consumer-jdbc (or indirectly via eventuate-tram-spring-jdbc-<message-spring-jdbc-<message-broker>)

-

@Configuration

Either auto-configuration or @Import TramConsumerJdbcAutoConfiguration

@Import TransactionalNoopDuplicateMessageDetectorConfiguration

@Import NoopDuplicateMessageDetector

dependency {
  compile "io.eventuate.tram.core:eventuate-tram-spring-consumer-jdbc:$eventuateTramVersion"
}
Micronaut

SqlTableBasedDuplicateMessageDetector

TransactionalNoopDuplicateMessageDetector

NoopDuplicateMessageDetector

Dependency

eventuate-tram-micronaut-consumer-jdbc directly or indirectly via eventuate-tram-micronaut-jdbc-<message-broker>

eventuate-tram-micronaut-consumer-jdbc directly or indirectly via eventuate-tram-micronaut-jdbc-<message-broker>

-

Configuration

do not set transactional.noop.duplicate.message.detector.factory.enabled

set transactional.noop.duplicate.message.detector.factory.enabled

The default if no other DuplicateMessageDetector bean is defined

dependency {
  compile "io.eventuate.tram.core:eventuate-tram-micronaut-consumer-jdbc:$eventuateTramVersion"
}
Quarkus

SqlTableBasedDuplicateMessageDetector

TransactionalNoopDuplicateMessageDetector

NoopDuplicateMessageDetector

Dependency

eventuate-tram-quarkus-consumer-jdbc directly or indirectly via eventuate-tram-quarkus-jdbc-<message-broker>

eventuate-tram-quarkus-consumer-jdbc directly or indirectly via eventuate-tram-quarkus-jdbc-<message-broker>

-

Configuration

do not set transactional.noop.duplicate.message.detector.configuration.enabled

set transactional.noop.duplicate.message.detector.configuration.enabled

The default if no other DuplicateMessageDetector bean is defined

Optimistic locking

If your application uses a DuplicateMessageDetector that manages transactions and your message handler uses optimistic locking then you must configure Eventuate Tram to retry the transaction when an optimistic locking failure occurs.

Spring

To configure retries, you must:

  • Add eventuate-tram-optimistic-locking as a dependency

  • @Import OptimisticLockingDecoratorConfiguration

This configures an OptimisticLockingDecorator @Bean, which is a MessageHandlerDecorator that retries when an OptimisticLockingFailureException is thrown.

Micronaut

Not yet implemented: Issue #46

Transactional domain events

The domain event package builds on the transaction messaging APIs.

Publishing domain events

Publish domain events using the DomainEventPublisher interface:

Spring
public abstract class AbstractTramEventTest {

  @Autowired
  private DomainEventPublisher domainEventPublisher;

  @Test
  public void shouldReceiveEvent() throws InterruptedException {
    long uniqueId = config.getUniqueId();
    String accountId = ...;

    DomainEvent domainEvent = new AccountDebited(...);

    domainEventPublisher.publish("Account", accountId, Collections.singletonList(domainEvent));

To publish events you need to @Import the TramEventsPublisherConfiguration @Configuration class:

@Configuration
@Import(TramEventsPublisherConfiguration.class)
public class AbstractTramEventTestConfiguration {
...

Instead of explicitly @Import-ing TramEventsPublisherConfiguration you can rely on the auto-configuration provided by eventuate-tram-spring-events-publisher-starter:

dependencies {
  compile "io.eventuate.tram.core:eventuate-tram-spring-events-publisher-starter:$eventuateTramVersion"
}

Note: eventuate-tram-spring-events-publisher-starter includes eventuate-tram-spring-messaging-starter

See this example of transaction events.

Micronaut/Quarkus
public abstract class AbstractTramEventTest {

  @Inject
  private DomainEventPublisher domainEventPublisher;

  @Test
  public void shouldReceiveEvent() throws InterruptedException {
    long uniqueId = config.getUniqueId();
    String accountId = ...;

    DomainEvent domainEvent = new AccountDebited(...);

    domainEventPublisher.publish("Account", accountId, Collections.singletonList(domainEvent));

To publish events you need to have TramEventsPublisherFactory class in your classpath (see dependency configuration section):

dependencies {
    compile "io.eventuate.tram.core:eventuate-tram-<micronaut|quarkus>-events:$eventuateTramVersion"
}

See these example of publishing events:

Consuming domain events

First, define DomainEventHandlers:

public class TramEventTestEventConsumer {

  public DomainEventHandlers domainEventHandlers() {
    return DomainEventHandlersBuilder
            .forAggregateType("Account")
            .onEvent(AccountDebited.class, this::handleAccountDebited)
            .build();
  }

  public void handleAccountDebited(DomainEventEnvelope<AccountDebited> event) {
    ...
  }

}

Second, import TramEventSubscriberConfiguration and configure a DomainEventDispatcher:

Spring
@Configuration
@Import(TramEventSubscriberConfiguration.class)
public class AbstractTramEventTestConfiguration {

  @Bean
  public DomainEventDispatcher domainEventDispatcher(DomainEventDispatcherFactory domainEventDispatcherFactory,
                                                     AbstractTramEventTestConfig config,
                                                     TramEventTestEventConsumer target) {
    return domainEventDispatcherFactory.make("eventDispatcherId" + config.getUniqueId(),
                                             target.domainEventHandlers());
  }

  @Bean
  public TramEventTestEventConsumer tramEventTestTarget(AbstractTramEventTestConfig config) {
    return new TramEventTestEventConsumer();
  }

See this example of transaction events.

Instead of explicitly TramEventSubscriberConfiguration you can use the auto-configuration provided by eventuate-tram-spring-events-subscriber-starter:

dependencies {
  compile "io.eventuate.tram.core:eventuate-tram-spring-events-subscriber-starter:$eventuateTramVersion"
}

There is also a starter for both event publishers and subscribers:

dependencies {
  compile "io.eventuate.tram.core:eventuate-tram-spring-events-starter:$eventuateTramVersion"
}
Micronaut/Quarkus
@Factory
public class AbstractTramEventTestFactory {

  @Context
  public DomainEventDispatcher domainEventDispatcher(AbstractTramEventTestConfig config,
                                                     TramEventTestEventConsumer target,
                                                     DomainEventDispatcherFactory domainEventDispatcherFactory) {
    return domainEventDispatcherFactory.make("eventDispatcherId" + config.getUniqueId(), target.domainEventHandlers());
  }

  @Singleton
  public TramEventTestEventConsumer tramEventTestTarget(AbstractTramEventTestConfig config) {
    return new TramEventTestEventConsumer(config.getAggregateType());
  }
}

See this example of transactional events.

Please note: because of how Micronaut behaves, if you want to define several dispatchers, you must used the @Named annotation on the DomainEventDispatcher bean definitions.

Quarkus
@Singleton
public class AbstractTramEventTestConfiguration {
  @Singleton
  public DomainEventDispatcher domainEventDispatcher(AbstractTramEventTestConfig config,
                                                     TramEventTestEventConsumer target,
                                                     DomainEventDispatcherFactory domainEventDispatcherFactory) {
    return domainEventDispatcherFactory.make("eventDispatcherId" + config.getUniqueId(), target.domainEventHandlers());
  }
  @Singleton
  public TramEventTestEventConsumer tramEventTestTarget(AbstractTramEventTestConfig config) {
    return new TramEventTestEventConsumer(config.getAggregateType());
  }
}

See this example of transaction events.

Transactional commands

Transactional commands are implemented using transactional messaging.

Sending commands

Send a command using a CommandProducer:

Spring
public abstract class AbstractTramCommandTest {

  @Autowired
  private CommandProducer commandProducer;

  @Test
  public void shouldInvokeCommand() throws InterruptedException {

    String commandId = commandProducer.send("CustomerCommandChannel",
            new DoSomethingCommand(),
            "ReplyToChannel",
            Collections.emptyMap());

You also need to @Import the TramCommandProducerConfiguration @Configuration class:

@Configuration
@Import(TramCommandProducerConfiguration.class)
public class AbstractTramCommandTestConfiguration {

Alternatively, you can use the auto-configuration provided by eventuate-tram-spring-commands-starter:

dependencies {
  compile "io.eventuate.tram.core:eventuate-tram-spring-commands-starter:$eventuateTramVersion"
}

Note: eventuate-tram-spring-commands-starter includes eventuate-tram-spring-messaging-starter

To handle a reply message, simply subscribe to the ReplyChannel.

messageConsumer.subscribe(subscriberId, "ReplyToChannel", this::handleMessage);

See this example of transactional commands.

Micronaut
public abstract class AbstractTramCommandTest {

  @Inject
  private CommandProducer commandProducer;

  @Test
  public void shouldInvokeCommand() throws InterruptedException {

    String commandId = commandProducer.send("CustomerCommandChannel",
            new DoSomethingCommand(),
            "ReplyToChannel",
            Collections.emptyMap());

You also need to have the TramCommandProducerFactory in your classpath (see dependency configuration section):

dependencies {
    compile "io.eventuate.tram.core:eventuate-tram-micronaut-commands:$eventuateTramVersion"
}

To handle a reply message, simply subscribe to the ReplyChannel.

messageConsumer.subscribe(subscriberId, "ReplyToChannel", this::handleMessage);

See this example of transactional commands.

Quarkus
public abstract class AbstractTramCommandTest {
  @Inject
  private CommandProducer commandProducer;
  @Test
  public void shouldInvokeCommand() throws InterruptedException {
    String commandId = commandProducer.send("CustomerCommandChannel",
            new DoSomethingCommand(),
            "ReplyToChannel",
            Collections.emptyMap());

You also need to have the TramCommandProducerFactory in your classpath (see dependency configuration section):

dependencies {
    compile "io.eventuate.tram.core:eventuate-tram-quarkus-commands:$eventuateTramVersion"
}

To handle a reply message, simply subscribe to the ReplyChannel.

messageConsumer.subscribe(subscriberId, "ReplyToChannel", this::handleMessage);

See this example of transactional commands.

Handling commands

First, define CommandHandlers:

public class TramCommandTestCommandHandler {

  public Message doSomething(CommandMessage<DoSomethingCommand> cm, PathVariables pvs) {
    ...
    return withSuccess();
  }

  public CommandHandlers getCommandHandlers() {
    return CommandHandlersBuilder
            .fromChannel("CustomerCommandChannel")
            .onMessage(DoSomethingCommand.class, this::doSomething)
            .build();

  }

Second, import TramCommandConsumerConfiguration and define a CommandDispatcher:

Spring
@Configuration
@Import(TramCommandConsumerConfiguration.class)
public class AbstractTramCommandTestConfiguration {

  @Bean
  public CommandDispatcher commandDispatcher(CommandDispatcherFactory commandDispatcherFactory,
                                             AbstractTramCommandTestConfig config,
                                             AbstractTramCommandTestCommandHandler target) {
  return commandDispatcherFactory.make("customerServiceCommandDispatcher",
                                       target.getCommandHandlers());
}

@Bean
public TramCommandTestCommandHandler abstractTramCommandTestTarget(AbstractTramCommandTestConfig config) {
  return new TramCommandTestCommandHandler(config.getCommandChannel());
}

Instead of explicitly @Import-ing TramCommandConsumerConfiguration you can use the auto-configuration provided by eventuate-tram-spring-commands-starter:

dependencies {
  compile "io.eventuate.tram.core:eventuate-tram-spring-commands-starter:$eventuateTramVersion"
}
Micronaut
@Factory
public class AbstractTramCommandTestFactory {

  @Singleton
  public TramCommandTestCommandHandler abstractTramCommandTestTarget(AbstractTramCommandTestConfig config) {
    return new TramCommandTestCommandHandler(config.getCommandChannel());
  }

  @Singleton
  public CommandDispatcher commandDispatcher(CommandDispatcherFactory commandDispatcherFactory,
                                             AbstractTramCommandTestConfig config,
                                             TramCommandTestCommandHandler target) {
    return commandDispatcherFactory.make(config.getCommandDispatcheId(), target.getCommandHandlers());
  }
}

See this example of transactional commands.

Quarkus
@Singleton
public class AbstractTramCommandTestConfiguration {
  @Singleton
  public TramCommandTestCommandHandler abstractTramCommandTestTarget(AbstractTramCommandTestConfig config) {
    return new TramCommandTestCommandHandler(config.getCommandChannel());
  }
  @Singleton
  public CommandDispatcher commandDispatcher(CommandDispatcherFactory commandDispatcherFactory,
                                             AbstractTramCommandTestConfig config,
                                             TramCommandTestCommandHandler target) {
    return commandDispatcherFactory.make(config.getCommandDispatcheId(), target.getCommandHandlers());
  }
}

See this example of transactional commands.

Configuring the transport

Spring

You also need to configure the transport mechanism, specifically the message broker. You do this importing one of the TramJdbc<MessageBroker>Configuration classes:

  • TramJdbcKafkaConfiguration

  • TramJdbcActiveMQConfiguration

  • TramJdbcRabbitMQConfiguration

  • TramJdbcRedisConfiguration

  • TramInMemoryConfiguration - in-memory JDBC and messaging

For example, if you want to use JDBC/Apache Kafka then @Import TramJdbcKafkaConfiguration:

@Configuration
@EnableAutoConfiguration
@Import({TramJdbcKafkaConfiguration.class})
public class JdbcKafkaTramMessageTestConfiguration {
}

The TramJdbc<MessageBroker>Configuration @Configuration classes configure both a producer and a consumer. If you have a service that is either only a producer or only a consumer you can use a more specific @Configuration class.

To configure a producer, @Import the TramMessageProducerJdbcConfiguration @Configuration class.

To consumer a consumer, @Import one of the EventuateTram<MessageBroker>MessageConsumerConfiguration @Configuration classes:

  • EventuateTramKafkaMessageConsumerConfiguration

  • EventuateTramActiveMQMessageConsumerConfiguration

  • EventuateTramRabbitMQMessageConsumerConfiguration

  • EventuateTramRedisMessageConsumerConfiguration

For example, see the FTGO application’s Order History Service, which is a consumer-only service.

Instead of explicitly @Import-ing a transport-specific configuration class, you can instead using the auto-configuration provided by eventuate-tram-spring-messaging-starter:

dependencies {
  compile "io.eventuate.tram.core:eventuate-tram-spring-messaging-starter:$eventuateTramVersion"
}

Micronaut/Quarkus

For now, the Micronaut and Quarkus version of Eventuate support only Apache Kafka as the transport mechanism.

You don’t need to create additional configuration, you only need to setup dependencies (see dependency configuration section):

Configuration properties

There are various configuration properties that need to be set for each transport.

Since JDBC is used, you must set the usual properties, such as:

Spring

spring.datasource.url=jdbc:mysql://${DOCKER_HOST_IP}/eventuate
spring.datasource.username=mysqluser
spring.datasource.password=mysqlpw
spring.datasource.driver.class.name=com.mysql.jdbc.driver

Micronaut

datasources:
  default:
    url: dbc:mysql://${DOCKER_HOST_IP}/eventuate
    driverClassName: com.mysql.jdbc.driver
    username: mysqluser
    password: mysqlpw

Quarkus

quarkus.datasource.username=mysqluser
quarkus.datasource.password=mysqlpw
quarkus.datasource.jdbc.url=jdbc:mysql://localhost/eventuate
quarkus.datasource.db-kind=mysql
eventuateDatabase=mysql

In addition, you need to define message broker-specific properties.

Spring

Message Broker Properties

Apache Kafka

eventuatelocal.kafka.bootstrap.servers=${DOCKER_HOST_IP}:9092
eventuatelocal.zookeeper.connection.string=${DOCKER_HOST_IP}:2181

Apache ActiveMQ

activemq.url=...

RabbitMQ

rabbitmq.broker.addresses=...
eventuate.rabbitmq.partition.count=...

Redis

eventuate.redis.servers=...
eventuate.redis.partitions=...

Micronaut

Message Broker Properties

Apache Kafka

eventuatelocal:
  kafka:
    bootstrap:
      servers: ${DOCKER_HOST_IP}:9092
  zookeeper:
    connection:
      string: ${DOCKER_HOST_IP}:2181

Quarkus

See micronaut application.yml

Message Broker Properties

Apache Kafka

eventuatelocal.kafka.bootstrap.servers=${DOCKER_HOST_IP}:9092

See micronaut application.properties

In-memory transport

Spring

If you want to use in-memory transport @Import TramInMemoryConfiguration:

@Configuration
@EnableAutoConfiguration
@Import({TramInMemoryConfiguration.class})
public class TramInMemoryConfiguration {
}
Micronaut

If you want to use in-memory transport include the eventuate-tram-micronaut-in-memory dependency:

dependencies {
    compile "io.eventuate.tram.core:eventuate-tram-micronaut-in-memory:$eventuateTramVersion"
}
Quarkus

If you want to use in-memory transport include eventuate-tram-quarkus-in-memory dependency:

dependencies {
    compile "io.eventuate.tram.core:eventuate-tram-quarkus-in-memory:$eventuateTramVersion"
}

ID Generation

By default, eventuate tram uses application-generated message IDs. If you want to use database-generated IDs (recommended), specify the outbox id you used for the Eventuate CDC reader. For more information, please see CDC Documentation.

Spring

eventuate.outbox.id=1

Micronaut

eventuate:
    outbox:
        id: 1

Running the CDC service

In addition to a database and message broker, you will need to run the Eventuate Tram CDC service. It reads events inserted into the database and publishes them to the message broker. It is written using Spring Boot. The easiest way to run this service during development is to use Docker Compose.

cdcservice:
  image: eventuateio/eventuate-tram-cdc-mysql-service:0.4.0.RELEASE
  ports:
    - "8099:8080"
  depends_on:
    - mysql
    - kafka
    - zookeeper
  environment:
    SPRING_DATASOURCE_URL: jdbc:mysql://mysql/eventuate
    SPRING_DATASOURCE_USERNAME: mysqluser
    SPRING_DATASOURCE_PASSWORD: mysqlpw
    SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.mysql.jdbc.Driver
    EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181
    EVENTUATELOCAL_CDC_DB_USER_NAME: root
    EVENTUATELOCAL_CDC_DB_PASSWORD: rootpassword
    EVENTUATELOCAL_CDC_READER_NAME: MySqlReader
    EVENTUATELOCAL_CDC_OFFSET_STORE_KEY: MySqlBinlog
    EVENTUATELOCAL_CDC_MYSQL_BINLOG_CLIENT_UNIQUE_ID: 1234567890
    EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC: "false"
    EVENTUATE_OUTBOX_ID: 1

For more information, please see Eventuate Tram CDC