Getting started with Eventuate Tram
Read this document to learn how to get started with Eventuate Tram (and Eventuate Tram Quarkus), frameworks for transactional messaging. Eventuate Tram sends and receives messages as part of a database transaction, ensuring that your application atomically updates the database and publishes messages. Currently, it supports the following databases:
- Transaction log tailing: MySQL, Postgres
- Polling: Other SQL databases including Microsoft SQL Server
And the following message brokers:
- Apache Kafka
- ActiveMQ
- RabbitMQ
- Redis
See also
JavaDocs
- See the Eventuate Tram JavaDocs.
Project setup
Latest library versions:
Spring/Micronaut |
|
Quarkus |
Gradle
In gradle.properties:
eventuateTramVersion=LATEST_VERSION
eventuateTramQuarkusVersion=LATEST_VERSION
In build.gradle, specify these Maven repositories:
repositories {
mavenCentral()
}
Include one or more of the following dependencies, depending on which API and implementation you want to use.
API artifacts
dependencies {
  // Basic messaging
  compile "io.eventuate.tram.core:eventuate-tram-<framework>-messaging:$eventuateTramVersion"
  // Domain events
  compile "io.eventuate.tram.core:eventuate-tram-<framework>-events:$eventuateTramVersion"
  // Command/Async Reply messaging
  compile "io.eventuate.tram.core:eventuate-tram-<framework>-commands:$eventuateTramVersion"
}
where
- framework is spring, micronaut or quarkus
Producer implementation artifact
If your service is a producer, you must add this dependency:
dependencies {
// Producer implementation
compile "io.eventuate.tram.core:eventuate-tram-<framework>-producer-jdbc:$eventuateTramVersion"
}
Consumer implementation artifacts
If your service is a consumer, you must add a message broker-specific dependency:
dependencies {
// Consumer implementation
compile "io.eventuate.tram.core:eventuate-tram-<framework>-consumer-<message-broker>:$eventuateTramVersion"
}
where
- framework is spring, micronaut or quarkus
- message-broker is kafka, activemq, rabbitmq or redis.
Note: micronaut and quarkus only support kafka.
If you want to use JDBC-based idempotency (see below), also include this dependency:
dependencies {
// Consumer idempotency
compile "io.eventuate.tram.core:eventuate-tram-<framework>-consumer-jdbc:$eventuateTramVersion"
}
where
- framework is spring or micronaut
See below for more details.
All-in-one artifacts
If you need all of the above artifacts, then you can include this one dependency:
dependencies {
// Consumer/Producer all-in-one
compile "io.eventuate.tram.core:eventuate-tram-<framework>-jdbc-<message-broker>:$eventuateTramVersion"
}
where
- framework is spring or micronaut
- message-broker is kafka, activemq, rabbitmq or redis.
Note: micronaut only supports kafka.
Eventuate BOM
You can use the Eventuate BOM to avoid needing to specify the artifact versions:
dependencies {
implementation(platform("io.eventuate.platform:eventuate-platform-dependencies:$eventuateBomVersion"))
}
You can then specify artifacts as follows:
dependencies {
  compile "io.eventuate.tram.core:eventuate-tram-spring-messaging"
}
Transactional messages
Eventuate Tram has APIs for sending and receiving messages as part of a database transaction.
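To make the "as part of a database transaction" guarantee concrete, here is a minimal conceptual sketch. It uses an in-memory stand-in for a database with a business table and an outbox table, so that a data change and its message are committed or rolled back together. The class and method names (OutboxSketch, Tx, setBalance, send) are hypothetical and are not part of the Eventuate Tram API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch only: an in-memory stand-in for a database that holds
// a business table and an outbox table. Not Eventuate Tram's implementation.
class OutboxSketch {
    final Map<String, Long> balances = new HashMap<>(); // committed business data
    final List<String> outbox = new ArrayList<>();      // committed outgoing messages

    // Changes staged by a transaction; nothing is visible until commit().
    class Tx {
        private final Map<String, Long> pendingBalances = new HashMap<>();
        private final List<String> pendingMessages = new ArrayList<>();

        void setBalance(String accountId, long balance) {
            pendingBalances.put(accountId, balance);
        }

        void send(String message) {
            pendingMessages.add(message);
        }

        // Applies the data change and the message together.
        void commit() {
            balances.putAll(pendingBalances);
            outbox.addAll(pendingMessages);
            rollback(); // clear staged work so a second commit is a no-op
        }

        // Discards both the data change and the message.
        void rollback() {
            pendingBalances.clear();
            pendingMessages.clear();
        }
    }

    Tx begin() {
        return new Tx();
    }

    public static void main(String[] args) {
        OutboxSketch db = new OutboxSketch();

        Tx failed = db.begin();
        failed.setBalance("acc-1", 50L);
        failed.send("AccountDebited:acc-1");
        failed.rollback();
        System.out.println(db.balances + " " + db.outbox);

        Tx ok = db.begin();
        ok.setBalance("acc-1", 50L);
        ok.send("AccountDebited:acc-1");
        ok.commit();
        System.out.println(db.balances + " " + db.outbox);
    }
}
```

In a real deployment the outbox is a database table written in the same JDBC transaction as the business data, and a separate process forwards its rows to the message broker.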
Sending messages
Send a message using MessageProducer:
Spring
public abstract class AbstractTramMessageTest {
@Autowired
private MessageProducer messageProducer;
@Test
public void shouldReceiveMessage() {
...
messageProducer.send(destination, MessageBuilder.withPayload(payload).build());
...
}
}
See this example of sending messages.
Micronaut/Quarkus
public abstract class AbstractTramMessageTest {
@Inject
private MessageProducer messageProducer;
@Test
public void shouldReceiveMessage() {
...
messageProducer.send(destination, MessageBuilder.withPayload(payload).build());
...
}
}
See these examples of sending messages with micronaut and sending messages with quarkus.
Consuming messages
Receive messages using MessageConsumer:
Spring
public abstract class AbstractTramMessageTest {
@Autowired
private MessageConsumer messageConsumer;
@Test
public void shouldReceiveMessage() throws InterruptedException {
messageConsumer.subscribe(subscriberId, Collections.singleton(destination), this::handleMessage);
...
}
private void handleMessage(Message message) {
...
}
}
See this example of consuming messages.
Micronaut/Quarkus
public abstract class AbstractTramMessageTest {
@Inject
private MessageConsumer messageConsumer;
@Test
public void shouldReceiveMessage() throws InterruptedException {
messageConsumer.subscribe(subscriberId, Collections.singleton(destination), this::handleMessage);
...
}
private void handleMessage(Message message) {
...
}
}
See these examples of consuming messages with micronaut and consuming messages with quarkus.
Message interceptors
Eventuate Tram implements message interceptors. Your service can define one or more message interceptors to inject custom logic into the various message processing steps. It could, for example, add custom headers to a message.
A message interceptor must implement the MessageInterceptor interface:
public interface MessageInterceptor {
default void preSend(Message message) {}
default void postSend(Message message, Exception e) {}
default void preReceive(Message message) {}
default void preHandle(String subscriberId, Message message) {}
default void postHandle(String subscriberId, Message message, Throwable throwable) {}
default void postReceive(Message message) {}
}
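As an illustration of the "add custom headers" use case mentioned above, the following sketch implements preSend to stamp a header on each outgoing message. To stay self-contained it uses a simplified stand-in Message class and a two-method subset of the interface; the header name "x-tenant-id" and the TenantHeaderInterceptor class are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class InterceptorSketch {
    // Simplified stand-in for Eventuate Tram's Message; only header access is modeled.
    static class Message {
        private final Map<String, String> headers = new HashMap<>();

        void setHeader(String name, String value) {
            headers.put(name, value);
        }

        Optional<String> getHeader(String name) {
            return Optional.ofNullable(headers.get(name));
        }
    }

    // Subset of the MessageInterceptor interface shown above.
    interface MessageInterceptor {
        default void preSend(Message message) {}
        default void postSend(Message message, Exception e) {}
    }

    // Adds a hypothetical "x-tenant-id" header to every outgoing message.
    static class TenantHeaderInterceptor implements MessageInterceptor {
        private final String tenantId;

        TenantHeaderInterceptor(String tenantId) {
            this.tenantId = tenantId;
        }

        @Override
        public void preSend(Message message) {
            message.setHeader("x-tenant-id", tenantId);
        }
    }

    public static void main(String[] args) {
        Message message = new Message();
        new TenantHeaderInterceptor("acme").preSend(message);
        System.out.println(message.getHeader("x-tenant-id").orElse("<missing>"));
    }
}
```

Because every method on the interface has a default no-op body, an interceptor only needs to override the steps it cares about.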
Message handler decorators
Eventuate Tram implements message handler decorators. They are equivalent to Servlet Filters and are invoked when handling a message. Eventuate Tram uses them to implement message consumer mechanisms such as idempotency and optimistic locking. Your service can also define custom decorators.
MessageHandlerDecorator is the interface that message handler decorators must implement:
public interface MessageHandlerDecorator extends BiConsumer<SubscriberIdAndMessage, MessageHandlerDecoratorChain> {
  int getOrder();
}
The MessageHandlerDecorator beans in the ApplicationContext are executed in the order defined by getOrder().
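The filter-like behavior can be sketched as follows. This is not Eventuate Tram's code: Chain and Decorator are simplified stand-ins (a String replaces SubscriberIdAndMessage, and the invokeNext method name is an assumption), and the sketch assumes that decorators with lower getOrder() values run outermost.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class DecoratorChainSketch {
    // Stand-in for MessageHandlerDecoratorChain; the method name is an assumption.
    interface Chain {
        void invokeNext(String message);
    }

    // Stand-in for MessageHandlerDecorator, with a String in place of SubscriberIdAndMessage.
    interface Decorator extends BiConsumer<String, Chain> {
        int getOrder();
    }

    // A decorator that records when it runs before and after the rest of the chain.
    static Decorator logging(String name, int order, List<String> log) {
        return new Decorator() {
            @Override
            public int getOrder() {
                return order;
            }

            @Override
            public void accept(String message, Chain chain) {
                log.add(name + ":before");
                chain.invokeNext(message);
                log.add(name + ":after");
            }
        };
    }

    // Sorts decorators by getOrder() and wraps the handler from the inside out.
    static Consumer<String> build(List<Decorator> decorators, Consumer<String> handler) {
        List<Decorator> sorted = new ArrayList<>(decorators);
        sorted.sort(Comparator.comparingInt(Decorator::getOrder));
        Consumer<String> next = handler;
        for (int i = sorted.size() - 1; i >= 0; i--) {
            Decorator decorator = sorted.get(i);
            Consumer<String> downstream = next;
            next = message -> decorator.accept(message, downstream::accept);
        }
        return next;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Consumer<String> chain = build(
                Arrays.asList(logging("b", 2, log), logging("a", 1, log)),
                message -> log.add("handler:" + message));
        chain.accept("msg");
        System.out.println(log);
    }
}
```

A decorator that never calls invokeNext short-circuits the chain, which is how a duplicate-detecting decorator can skip the handler for a message it has already seen.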
Idempotent message consumers
A message broker might deliver the same message more than once.
A message consumer must typically be idempotent.
Some message consumers are naturally idempotent, e.g. account.balance = event.newBalance, and can be invoked repeatedly without error.
But others are not, e.g. account.balance += event.amount, and must be made idempotent by tracking the messages that they have consumed.
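A minimal sketch of that tracking idea, using an in-memory set of processed message IDs to guard a non-idempotent update. The class and method names are hypothetical; a real consumer would record the ID and apply the update in the same database transaction.

```java
import java.util.HashSet;
import java.util.Set;

class IdempotentConsumerSketch {
    private final Set<String> processedMessageIds = new HashSet<>();
    private long balance;

    // Applies the non-idempotent update at most once per message ID.
    // Returns false when the message is a duplicate and was skipped.
    boolean handle(String messageId, long amount) {
        if (!processedMessageIds.add(messageId)) {
            return false; // already processed, ignore the redelivery
        }
        balance += amount;
        return true;
    }

    long balance() {
        return balance;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.handle("msg-1", 10);
        consumer.handle("msg-1", 10); // redelivered by the broker
        consumer.handle("msg-2", 5);
        System.out.println(consumer.balance());
    }
}
```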
Eventuate Tram has a pluggable duplicate message detection mechanism.
DuplicateMessageDetector is the strategy interface.
There are the following concrete implementations:
- SqlTableBasedDuplicateMessageDetector - tracks successfully processed message IDs in a RECEIVED_MESSAGES table. Your message handling logic is executed within a transaction managed by this class.
- NoopDuplicateMessageDetector - does nothing. Use this implementation when your message handlers are idempotent. For example, the Order History Service in the FTGO application tracks message IDs in a DynamoDB table.
- TransactionalNoopDuplicateMessageDetector - it doesn’t track message IDs but it does manage transactions.
Your service’s ApplicationContext must define a DuplicateMessageDetector bean.
Spring
|
|
|
|
Dependency |
|
|
- |
|
Either auto-configuration or |
|
|
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-spring-consumer-jdbc:$eventuateTramVersion"
}
Micronaut
|
|
|
|
Dependency |
|
|
- |
Configuration |
do not set |
set |
The default if no other |
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-micronaut-consumer-jdbc:$eventuateTramVersion"
}
Quarkus
|
|
|
|
Dependency |
|
|
- |
Configuration |
do not set |
set |
The default if no other |
Optimistic locking
If your application uses a DuplicateMessageDetector that manages transactions and your message handler uses optimistic locking, then you must configure Eventuate Tram to retry the transaction when an optimistic locking failure occurs.
Spring
To configure retries, you must:
- Add eventuate-tram-optimistic-locking as a dependency
- @Import OptimisticLockingDecoratorConfiguration
This configures an OptimisticLockingDecorator @Bean, which is a MessageHandlerDecorator that retries when an OptimisticLockingFailureException is thrown.
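The retry behavior can be sketched in plain Java as below. This is an illustration of the general technique, not the OptimisticLockingDecorator source: OptimisticLockException is a hypothetical stand-in for Spring's OptimisticLockingFailureException, and the maxAttempts parameter is an assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class OptimisticRetrySketch {
    // Hypothetical stand-in for Spring's OptimisticLockingFailureException.
    static class OptimisticLockException extends RuntimeException {}

    // Re-runs the whole transaction when it fails with an optimistic locking error,
    // giving up and rethrowing after maxAttempts tries.
    static <T> T withRetries(int maxAttempts, Supplier<T> transaction) {
        for (int attempt = 1; ; attempt++) {
            try {
                return transaction.get();
            } catch (OptimisticLockException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String result = withRetries(3, () -> {
            // Simulate two version conflicts before the update succeeds.
            if (calls.incrementAndGet() < 3) {
                throw new OptimisticLockException();
            }
            return "committed";
        });
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

The important point is that the entire transaction is re-executed, so the handler re-reads the current version of the data on each attempt.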
Micronaut
Not yet implemented: Issue #46
Transactional domain events
The domain event package builds on the transactional messaging APIs.
Publishing domain events
Publish domain events using the DomainEventPublisher interface:
Spring
public abstract class AbstractTramEventTest {
@Autowired
private DomainEventPublisher domainEventPublisher;
@Test
public void shouldReceiveEvent() throws InterruptedException {
long uniqueId = config.getUniqueId();
String accountId = ...;
DomainEvent domainEvent = new AccountDebited(...);
domainEventPublisher.publish("Account", accountId, Collections.singletonList(domainEvent));
}
}
To publish events you need to @Import the TramEventsPublisherConfiguration @Configuration class:
@Configuration
@Import(TramEventsPublisherConfiguration.class)
public class AbstractTramEventTestConfiguration {
...
}
Instead of explicitly @Import-ing TramEventsPublisherConfiguration, you can rely on the auto-configuration provided by eventuate-tram-spring-events-publisher-starter:
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-spring-events-publisher-starter:$eventuateTramVersion"
}
Note: eventuate-tram-spring-events-publisher-starter includes eventuate-tram-spring-messaging-starter.
See this example of transactional events.
Micronaut/Quarkus
public abstract class AbstractTramEventTest {
@Inject
private DomainEventPublisher domainEventPublisher;
@Test
public void shouldReceiveEvent() throws InterruptedException {
long uniqueId = config.getUniqueId();
String accountId = ...;
DomainEvent domainEvent = new AccountDebited(...);
domainEventPublisher.publish("Account", accountId, Collections.singletonList(domainEvent));
}
}
To publish events you need to have the TramEventsPublisherFactory class on your classpath (see the dependency configuration section):
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-<micronaut|quarkus>-events:$eventuateTramVersion"
}
See these examples of publishing events:
Consuming domain events
First, define DomainEventHandlers:
public class TramEventTestEventConsumer {
  public DomainEventHandlers domainEventHandlers() {
    return DomainEventHandlersBuilder
        .forAggregateType("Account")
        .onEvent(AccountDebited.class, this::handleAccountDebited)
        .build();
  }
  public void handleAccountDebited(DomainEventEnvelope<AccountDebited> event) {
    ...
  }
}
Second, import TramEventSubscriberConfiguration and configure a DomainEventDispatcher:
Spring
@Configuration
@Import(TramEventSubscriberConfiguration.class)
public class AbstractTramEventTestConfiguration {
  @Bean
  public DomainEventDispatcher domainEventDispatcher(DomainEventDispatcherFactory domainEventDispatcherFactory,
                                                     AbstractTramEventTestConfig config,
                                                     TramEventTestEventConsumer target) {
    return domainEventDispatcherFactory.make("eventDispatcherId" + config.getUniqueId(), target.domainEventHandlers());
  }
  @Bean
  public TramEventTestEventConsumer tramEventTestTarget(AbstractTramEventTestConfig config) {
    return new TramEventTestEventConsumer();
  }
}
See this example of transactional events.
Instead of explicitly @Import-ing TramEventSubscriberConfiguration, you can use the auto-configuration provided by eventuate-tram-spring-events-subscriber-starter:
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-spring-events-subscriber-starter:$eventuateTramVersion"
}
There is also a starter for both event publishers and subscribers:
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-spring-events-starter:$eventuateTramVersion"
}
Micronaut/Quarkus
@Factory
public class AbstractTramEventTestFactory {
@Context
public DomainEventDispatcher domainEventDispatcher(AbstractTramEventTestConfig config,
TramEventTestEventConsumer target,
DomainEventDispatcherFactory domainEventDispatcherFactory) {
return domainEventDispatcherFactory.make("eventDispatcherId" + config.getUniqueId(), target.domainEventHandlers());
}
@Singleton
public TramEventTestEventConsumer tramEventTestTarget(AbstractTramEventTestConfig config) {
return new TramEventTestEventConsumer(config.getAggregateType());
}
}
See this example of transactional events.
Please note: because of how Micronaut behaves, if you want to define several dispatchers, you must use the @Named annotation on the DomainEventDispatcher bean definitions.
Quarkus
@Singleton
public class AbstractTramEventTestConfiguration {
@Singleton
public DomainEventDispatcher domainEventDispatcher(AbstractTramEventTestConfig config,
TramEventTestEventConsumer target,
DomainEventDispatcherFactory domainEventDispatcherFactory) {
return domainEventDispatcherFactory.make("eventDispatcherId" + config.getUniqueId(), target.domainEventHandlers());
}
@Singleton
public TramEventTestEventConsumer tramEventTestTarget(AbstractTramEventTestConfig config) {
return new TramEventTestEventConsumer(config.getAggregateType());
}
}
See this example of transactional events.
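Conceptually, the handlers built with onEvent(...) form a mapping from event class to handler method, and the dispatcher routes each incoming event through that mapping. The sketch below shows that idea in plain Java; EventDispatchSketch and its event classes are hypothetical stand-ins and do not reflect how DomainEventDispatcher is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class EventDispatchSketch {
    // Hypothetical event types used only for this illustration.
    static class AccountDebited {
        final long amount;

        AccountDebited(long amount) {
            this.amount = amount;
        }
    }

    static class AccountCredited {}

    private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();

    // Registers a typed handler for one event class, in the style of onEvent(...).
    <E> EventDispatchSketch onEvent(Class<E> type, Consumer<E> handler) {
        handlers.put(type, event -> handler.accept(type.cast(event)));
        return this;
    }

    // Routes an event to its handler; returns false when no handler is registered.
    boolean dispatch(Object event) {
        Consumer<Object> handler = handlers.get(event.getClass());
        if (handler == null) {
            return false;
        }
        handler.accept(event);
        return true;
    }

    public static void main(String[] args) {
        List<Long> debits = new ArrayList<>();
        EventDispatchSketch dispatcher = new EventDispatchSketch()
                .onEvent(AccountDebited.class, event -> debits.add(event.amount));

        System.out.println(dispatcher.dispatch(new AccountDebited(30L)));
        System.out.println(dispatcher.dispatch(new AccountCredited()));
        System.out.println(debits);
    }
}
```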
Transactional commands
Transactional commands are implemented using transactional messaging.
Sending commands
Send a command using a CommandProducer:
Spring
public abstract class AbstractTramCommandTest {
@Autowired
private CommandProducer commandProducer;
@Test
public void shouldInvokeCommand() throws InterruptedException {
String commandId = commandProducer.send("CustomerCommandChannel",
new DoSomethingCommand(),
"ReplyToChannel",
Collections.emptyMap());
}
}
You also need to @Import the TramCommandProducerConfiguration @Configuration class:
@Configuration
@Import(TramCommandProducerConfiguration.class)
public class AbstractTramCommandTestConfiguration {
}
Alternatively, you can use the auto-configuration provided by eventuate-tram-spring-commands-starter:
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-spring-commands-starter:$eventuateTramVersion"
}
Note: eventuate-tram-spring-commands-starter includes eventuate-tram-spring-messaging-starter.
To handle a reply message, simply subscribe to the reply channel:
messageConsumer.subscribe(subscriberId, Collections.singleton("ReplyToChannel"), this::handleMessage);
See this example of transactional commands.
Micronaut
public abstract class AbstractTramCommandTest {
@Inject
private CommandProducer commandProducer;
@Test
public void shouldInvokeCommand() throws InterruptedException {
String commandId = commandProducer.send("CustomerCommandChannel",
new DoSomethingCommand(),
"ReplyToChannel",
Collections.emptyMap());
}
}
You also need to have the TramCommandProducerFactory on your classpath (see the dependency configuration section):
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-micronaut-commands:$eventuateTramVersion"
}
To handle a reply message, simply subscribe to the reply channel:
messageConsumer.subscribe(subscriberId, Collections.singleton("ReplyToChannel"), this::handleMessage);
See this example of transactional commands.
Quarkus
public abstract class AbstractTramCommandTest {
@Inject
private CommandProducer commandProducer;
@Test
public void shouldInvokeCommand() throws InterruptedException {
String commandId = commandProducer.send("CustomerCommandChannel",
new DoSomethingCommand(),
"ReplyToChannel",
Collections.emptyMap());
}
}
You also need to have the TramCommandProducerFactory on your classpath (see the dependency configuration section):
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-quarkus-commands:$eventuateTramVersion"
}
To handle a reply message, simply subscribe to the reply channel:
messageConsumer.subscribe(subscriberId, Collections.singleton("ReplyToChannel"), this::handleMessage);
See this example of transactional commands.
Handling commands
First, define CommandHandlers:
public class TramCommandTestCommandHandler {
public Message doSomething(CommandMessage<DoSomethingCommand> cm, PathVariables pvs) {
...
return withSuccess();
}
public CommandHandlers getCommandHandlers() {
return CommandHandlersBuilder
.fromChannel("CustomerCommandChannel")
.onMessage(DoSomethingCommand.class, this::doSomething)
.build();
}
}
Second, import TramCommandConsumerConfiguration and define a CommandDispatcher:
Spring
@Configuration
@Import(TramCommandConsumerConfiguration.class)
public class AbstractTramCommandTestConfiguration {
@Bean
public CommandDispatcher commandDispatcher(CommandDispatcherFactory commandDispatcherFactory,
AbstractTramCommandTestConfig config,
TramCommandTestCommandHandler target) {
return commandDispatcherFactory.make("customerServiceCommandDispatcher",
target.getCommandHandlers());
}
@Bean
public TramCommandTestCommandHandler abstractTramCommandTestTarget(AbstractTramCommandTestConfig config) {
return new TramCommandTestCommandHandler(config.getCommandChannel());
}
}
Instead of explicitly @Import-ing TramCommandConsumerConfiguration, you can use the auto-configuration provided by eventuate-tram-spring-commands-starter:
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-spring-commands-starter:$eventuateTramVersion"
}
Micronaut
@Factory
public class AbstractTramCommandTestFactory {
@Singleton
public TramCommandTestCommandHandler abstractTramCommandTestTarget(AbstractTramCommandTestConfig config) {
return new TramCommandTestCommandHandler(config.getCommandChannel());
}
@Singleton
public CommandDispatcher commandDispatcher(CommandDispatcherFactory commandDispatcherFactory,
AbstractTramCommandTestConfig config,
TramCommandTestCommandHandler target) {
return commandDispatcherFactory.make(config.getCommandDispatcheId(), target.getCommandHandlers());
}
}
See this example of transactional commands.
Quarkus
@Singleton
public class AbstractTramCommandTestConfiguration {
@Singleton
public TramCommandTestCommandHandler abstractTramCommandTestTarget(AbstractTramCommandTestConfig config) {
return new TramCommandTestCommandHandler(config.getCommandChannel());
}
@Singleton
public CommandDispatcher commandDispatcher(CommandDispatcherFactory commandDispatcherFactory,
AbstractTramCommandTestConfig config,
TramCommandTestCommandHandler target) {
return commandDispatcherFactory.make(config.getCommandDispatcheId(), target.getCommandHandlers());
}
}
See this example of transactional commands.
Configuring the transport
Spring
You also need to configure the transport mechanism, specifically the message broker.
You do this by importing one of the TramJdbc<MessageBroker>Configuration classes:
- TramJdbcKafkaConfiguration
- TramJdbcActiveMQConfiguration
- TramJdbcRabbitMQConfiguration
- TramJdbcRedisConfiguration
- TramInMemoryConfiguration - in-memory JDBC and messaging
For example, if you want to use JDBC/Apache Kafka then @Import TramJdbcKafkaConfiguration:
@Configuration
@EnableAutoConfiguration
@Import({TramJdbcKafkaConfiguration.class})
public class JdbcKafkaTramMessageTestConfiguration {
}
The TramJdbc<MessageBroker>Configuration @Configuration classes configure both a producer and a consumer.
If you have a service that is either only a producer or only a consumer, you can use a more specific @Configuration class.
To configure a producer, @Import the TramMessageProducerJdbcConfiguration @Configuration class.
To configure a consumer, @Import one of the EventuateTram<MessageBroker>MessageConsumerConfiguration @Configuration classes:
- EventuateTramKafkaMessageConsumerConfiguration
- EventuateTramActiveMQMessageConsumerConfiguration
- EventuateTramRabbitMQMessageConsumerConfiguration
- EventuateTramRedisMessageConsumerConfiguration
For example, see the FTGO application’s Order History Service, which is a consumer-only service.
Instead of explicitly @Import-ing a transport-specific configuration class, you can instead use the auto-configuration provided by eventuate-tram-spring-messaging-starter:
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-spring-messaging-starter:$eventuateTramVersion"
}
Micronaut/Quarkus
For now, the Micronaut and Quarkus versions of Eventuate Tram support only Apache Kafka as the transport mechanism.
You don’t need to create additional configuration; you only need to set up the dependencies (see the dependency configuration section).
Configuration properties
There are various configuration properties that need to be set for each transport.
Since JDBC is used, you must set the usual properties, such as:
Spring
spring.datasource.url=jdbc:mysql://${DOCKER_HOST_IP}/eventuate
spring.datasource.username=mysqluser
spring.datasource.password=mysqlpw
spring.datasource.driver.class.name=com.mysql.jdbc.Driver
Micronaut
datasources:
  default:
    url: jdbc:mysql://${DOCKER_HOST_IP}/eventuate
    driverClassName: com.mysql.jdbc.Driver
    username: mysqluser
    password: mysqlpw
Quarkus
quarkus.datasource.username=mysqluser
quarkus.datasource.password=mysqlpw
quarkus.datasource.jdbc.url=jdbc:mysql://localhost/eventuate
quarkus.datasource.db-kind=mysql
eventuateDatabase=mysql
In addition, you need to define message broker-specific properties.
Spring
Message Broker | Properties |
---|---|
Apache Kafka | eventuatelocal.kafka.bootstrap.servers=${DOCKER_HOST_IP}:9092 |
 | eventuatelocal.zookeeper.connection.string=${DOCKER_HOST_IP}:2181 |
Apache ActiveMQ | activemq.url=... |
RabbitMQ | rabbitmq.broker.addresses=... |
 | eventuate.rabbitmq.partition.count=... |
Redis | eventuate.redis.servers=... |
 | eventuate.redis.partitions=... |
See spring application.properties
Micronaut
Apache Kafka:
eventuatelocal:
  kafka:
    bootstrap:
      servers: ${DOCKER_HOST_IP}:9092
  zookeeper:
    connection:
      string: ${DOCKER_HOST_IP}:2181
See micronaut application.yml
Quarkus
Message Broker | Properties |
---|---|
Apache Kafka | eventuatelocal.kafka.bootstrap.servers=${DOCKER_HOST_IP}:9092 |
See quarkus application.properties
In-memory transport
Spring
If you want to use in-memory transport, @Import TramInMemoryConfiguration:
@Configuration
@EnableAutoConfiguration
@Import({TramInMemoryConfiguration.class})
public class TramInMemoryConfiguration {
}
Micronaut
If you want to use in-memory transport, include the eventuate-tram-micronaut-in-memory dependency:
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-micronaut-in-memory:$eventuateTramVersion"
}
Quarkus
If you want to use in-memory transport, include the eventuate-tram-quarkus-in-memory dependency:
dependencies {
compile "io.eventuate.tram.core:eventuate-tram-quarkus-in-memory:$eventuateTramVersion"
}
ID Generation
By default, Eventuate Tram uses application-generated message IDs. If you want to use database-generated IDs (recommended), specify the outbox ID you used for the Eventuate CDC reader. For more information, please see the CDC documentation.
Spring
eventuate.outbox.id=1
Micronaut
eventuate:
  outbox:
    id: 1
Running the CDC service
In addition to a database and message broker, you will need to run the Eventuate Tram CDC service. It reads events inserted into the database and publishes them to the message broker. It is written using Spring Boot. The easiest way to run this service during development is to use Docker Compose.
The Eventuate Tram Code Basic examples project has an example docker-compose.yml file.
cdcservice:
  image: eventuateio/eventuate-tram-cdc-mysql-service:0.4.0.RELEASE
  ports:
    - "8099:8080"
  depends_on:
    - mysql
    - kafka
    - zookeeper
  environment:
    SPRING_DATASOURCE_URL: jdbc:mysql://mysql/eventuate
    SPRING_DATASOURCE_USERNAME: mysqluser
    SPRING_DATASOURCE_PASSWORD: mysqlpw
    SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.mysql.jdbc.Driver
    EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181
    EVENTUATELOCAL_CDC_DB_USER_NAME: root
    EVENTUATELOCAL_CDC_DB_PASSWORD: rootpassword
    EVENTUATELOCAL_CDC_READER_NAME: MySqlReader
    EVENTUATELOCAL_CDC_OFFSET_STORE_KEY: MySqlBinlog
    EVENTUATELOCAL_CDC_MYSQL_BINLOG_CLIENT_UNIQUE_ID: 1234567890
    EVENTUATELOCAL_CDC_READ_OLD_DEBEZIUM_DB_OFFSET_STORAGE_TOPIC: "false"
    EVENTUATE_OUTBOX_ID: 1
For more information, please see Eventuate Tram CDC.
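To illustrate what "reads events inserted into the database and publishes them to the message broker" means in the polling case, here is a rough sketch of a single relay pass over an outbox. It is not the CDC service's code: OutboxRow, relayOnce and the published flag are hypothetical, the broker is modeled as a simple callback, and transaction log tailing works differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class OutboxRelaySketch {
    // Hypothetical shape of a row in the outbox table.
    static class OutboxRow {
        final String destination;
        final String payload;
        boolean published;

        OutboxRow(String destination, String payload) {
            this.destination = destination;
            this.payload = payload;
        }
    }

    // One polling pass: publishes every unpublished row in insertion order,
    // marks it as published, and returns how many rows were relayed.
    static int relayOnce(List<OutboxRow> outbox, BiConsumer<String, String> broker) {
        int relayed = 0;
        for (OutboxRow row : outbox) {
            if (row.published) {
                continue;
            }
            broker.accept(row.destination, row.payload);
            row.published = true;
            relayed++;
        }
        return relayed;
    }

    public static void main(String[] args) {
        List<OutboxRow> outbox = new ArrayList<>();
        outbox.add(new OutboxRow("Account", "{\"amount\":30}"));
        outbox.add(new OutboxRow("Account", "{\"amount\":12}"));

        List<String> sent = new ArrayList<>();
        BiConsumer<String, String> broker = (destination, payload) -> sent.add(destination + ":" + payload);

        System.out.println(relayOnce(outbox, broker)); // first pass relays both rows
        System.out.println(relayOnce(outbox, broker)); // second pass finds nothing new
        System.out.println(sent);
    }
}
```

If the relay crashes after publishing a row but before marking it, the row is published again on the next pass, which is one reason message consumers need to be idempotent.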