Getting started with Eventuate Tram
Learn how to get started with Eventuate Tram, a framework for transactional messaging. Eventuate Tram sends and receives messages as part of a database transaction, ensuring that your application atomically updates the database and publishes messages. Currently, it supports the following databases:
- Transaction log tailing: MySQL, Postgres
- Polling: Other SQL databases
It also supports the following message brokers:
- Apache Kafka
- ActiveMQ
- RabbitMQ
Project setup
Gradle
In gradle.properties:
eventuateTramVersion=LATEST_VERSION
In build.gradle, specify the JCenter Maven repository:
repositories {
  mavenCentral()
  jcenter()
}
Include one or more of the following dependencies, depending on which API you want to use, along with a messaging transport:
dependencies {
  // Basic messaging
  compile "io.eventuate.tram.core:eventuate-tram-messaging:$eventuateTramVersion"
  // Domain events
  compile "io.eventuate.tram.core:eventuate-tram-events:$eventuateTramVersion"
  // Command/Async Reply messaging
  compile "io.eventuate.tram.core:eventuate-tram-commands:$eventuateTramVersion"
  // Use JDBC and Apache Kafka as the underlying messaging transport
  compile "io.eventuate.tram.core:eventuate-tram-jdbc-kafka:$eventuateTramVersion"
  // Use in-memory messaging transport for testing
  testCompile "io.eventuate.tram.core:eventuate-tram-in-memory:$eventuateTramVersion"
}
Transactional messages
Eventuate Tram has APIs for sending and receiving messages as part of a database transaction.
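The idea of sending a message as part of a database transaction can be pictured without the framework. The following is a minimal in-memory sketch of that pattern, not Eventuate Tram's implementation: OutboxSketch, its debit method, and the two lists are hypothetical names standing in for a business table and a message table. Both writes are staged and then applied together, so a failure before the commit leaves neither behind.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a database with a business table and a message table.
final class OutboxSketch {
    final List<String> accountRows = new ArrayList<>();  // business data
    final List<String> messageRows = new ArrayList<>();  // messages waiting to be published

    // Stages a business update and its message, then commits both or neither.
    void debit(String accountId, long amount, boolean failBeforeCommit) {
        List<String> stagedAccounts = new ArrayList<>();
        List<String> stagedMessages = new ArrayList<>();
        stagedAccounts.add(accountId + ":-" + amount);
        stagedMessages.add("AccountDebited:" + accountId + ":" + amount);
        if (failBeforeCommit) {
            // Simulates a rollback: the staged rows are simply discarded.
            throw new IllegalStateException("transaction rolled back");
        }
        // "Commit": both tables change together.
        accountRows.addAll(stagedAccounts);
        messageRows.addAll(stagedMessages);
    }

    public static void main(String[] args) {
        OutboxSketch db = new OutboxSketch();
        db.debit("account-1", 10, false);
        try {
            db.debit("account-2", 5, true);
        } catch (IllegalStateException e) {
            System.out.println("second debit rolled back");
        }
        System.out.println(db.accountRows + " " + db.messageRows);
    }
}
```

In a real deployment the database transaction provides this all-or-nothing behaviour; the staging lists here only imitate it.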
Sending messages
Send a message using MessageProducer:
public abstract class AbstractTramMessageTest {
  @Autowired
  private MessageProducer messageProducer;

  @Test
  public void shouldReceiveMessage() {
    ...
    messageProducer.send(destination, MessageBuilder.withPayload(payload).build());
    ...
  }
}
See this example of sending messages.
Consuming messages
Receive messages using MessageConsumer:
public abstract class AbstractTramMessageTest {
  @Autowired
  private MessageConsumer messageConsumer;

  @Test
  public void shouldReceiveMessage() throws InterruptedException {
    messageConsumer.subscribe(subscriberId, Collections.singleton(destination), this::handleMessage);
    ...
  }

  private void handleMessage(Message message) {
    ...
  }
}
See this example of consuming messages.
Transactional domain events
The domain event package builds on the transactional messaging APIs.
Publishing domain events
Publish domain events using the DomainEventPublisher interface:
public abstract class AbstractTramEventTest {
  @Autowired
  private DomainEventPublisher domainEventPublisher;

  @Test
  public void shouldReceiveEvent() throws InterruptedException {
    long uniqueId = config.getUniqueId();
    String accountId = ...;
    DomainEvent domainEvent = new AccountDebited(...);
    domainEventPublisher.publish("Account", accountId, Collections.singletonList(domainEvent));
    ...
  }
}
To publish events, you need to @Import the TramEventsPublisherConfiguration @Configuration class:
@Configuration
@Import(TramEventsPublisherConfiguration.class)
public class AbstractTramEventTestConfiguration {
  ...
}
See this example of transactional events.
Consuming domain events
First, define DomainEventHandlers:
public class TramEventTestEventConsumer {
  public DomainEventHandlers domainEventHandlers() {
    return DomainEventHandlersBuilder
        .forAggregateType("Account")
        .onEvent(AccountDebited.class, this::handleAccountDebited)
        .build();
  }

  public void handleAccountDebited(DomainEventEnvelope<AccountDebited> event) {
    ...
  }
}
Second, configure a DomainEventDispatcher @Bean:
@Configuration
public class AbstractTramEventTestConfiguration {
  @Bean
  public DomainEventDispatcher domainEventDispatcher(AbstractTramEventTestConfig config,
                                                     TramEventTestEventConsumer target,
                                                     MessageConsumer messageConsumer) {
    return new DomainEventDispatcher("eventDispatcherId",
        target.domainEventHandlers(),
        messageConsumer);
  }

  @Bean
  public TramEventTestEventConsumer tramEventTestTarget(AbstractTramEventTestConfig config) {
    return new TramEventTestEventConsumer();
  }
}
You need the following Spring @Configuration in order to publish events:
@Configuration
@Import(TramEventsPublisherConfiguration.class)
public class AbstractTramEventTestConfiguration {
}
See this example of transactional events.
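Registering one handler per event class can be pictured as a lookup table keyed by class. The following plain-Java sketch illustrates that idea under stated assumptions; it does not use or reproduce the Eventuate Tram classes. EventDispatchSketch and its nested AccountDebited type are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: routes an event object to the handler registered for its class.
final class EventDispatchSketch {
    private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();

    // Registers a typed handler; the cast is safe because lookup uses the same class as the key.
    <E> EventDispatchSketch onEvent(Class<E> eventClass, Consumer<E> handler) {
        handlers.put(eventClass, event -> handler.accept(eventClass.cast(event)));
        return this;
    }

    // Returns true if a handler was found and invoked, false if the event type is unknown.
    boolean dispatch(Object event) {
        Consumer<Object> handler = handlers.get(event.getClass());
        if (handler == null) {
            return false;
        }
        handler.accept(event);
        return true;
    }

    // Hypothetical event type used only for this illustration.
    static final class AccountDebited {
        final long amount;

        AccountDebited(long amount) {
            this.amount = amount;
        }
    }

    public static void main(String[] args) {
        EventDispatchSketch dispatcher = new EventDispatchSketch()
            .onEvent(AccountDebited.class, e -> System.out.println("debited " + e.amount));
        dispatcher.dispatch(new AccountDebited(10));  // handled by the registered handler
        dispatcher.dispatch("an unrelated event");    // no handler registered, ignored
    }
}
```

Events of an unregistered type are skipped rather than treated as errors in this sketch; whether that is the right policy depends on the application.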
Transactional commands
Transactional commands are implemented using transactional messaging.
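One way to picture commands layered on messaging is a request message that names a reply channel, plus a reply message that refers back to the request. The sketch below illustrates this with in-memory queues. It is an assumption-laden illustration, not the framework's wire format: CommandReplySketch and the header names id, command_type, reply_to, in_reply_to, and outcome are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

// Hypothetical sketch of request/reply on top of plain message channels.
final class CommandReplySketch {
    // A message is modelled as a header map; the header names are illustrative.
    private final Map<String, Queue<Map<String, String>>> channels = new HashMap<>();

    void send(String channel, Map<String, String> message) {
        channels.computeIfAbsent(channel, c -> new ArrayDeque<>()).add(message);
    }

    // Returns the next message on the channel, or null if there is none.
    Map<String, String> receive(String channel) {
        Queue<Map<String, String>> queue = channels.get(channel);
        return queue == null ? null : queue.poll();
    }

    // Sends a command and returns its id so the caller can match the reply later.
    String sendCommand(String commandChannel, String commandType, String replyTo) {
        String commandId = UUID.randomUUID().toString();
        Map<String, String> command = new HashMap<>();
        command.put("id", commandId);
        command.put("command_type", commandType);
        command.put("reply_to", replyTo);
        send(commandChannel, command);
        return commandId;
    }

    // Consumes one command, if any, and replies on the channel named in the command.
    void handleOneCommand(String commandChannel) {
        Map<String, String> command = receive(commandChannel);
        if (command == null) {
            return;
        }
        Map<String, String> reply = new HashMap<>();
        reply.put("in_reply_to", command.get("id"));
        reply.put("outcome", "SUCCESS");
        send(command.get("reply_to"), reply);
    }

    public static void main(String[] args) {
        CommandReplySketch bus = new CommandReplySketch();
        String commandId = bus.sendCommand("CustomerCommandChannel", "DoSomethingCommand", "ReplyToChannel");
        bus.handleOneCommand("CustomerCommandChannel");
        Map<String, String> reply = bus.receive("ReplyToChannel");
        System.out.println("reply matches command: " + commandId.equals(reply.get("in_reply_to")));
    }
}
```

The returned command id plays the role of a correlation key: the sender keeps it and compares it with the reply's reference to decide which request a reply belongs to.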
Sending commands
Send a command using a CommandProducer:
public abstract class AbstractTramCommandTest {
  @Autowired
  private CommandProducer commandProducer;

  @Test
  public void shouldInvokeCommand() throws InterruptedException {
    String commandId = commandProducer.send("CustomerCommandChannel",
        new DoSomethingCommand(),
        "ReplyToChannel",
        Collections.emptyMap());
    ...
  }
}
You also need to @Import the TramCommandProducerConfiguration @Configuration class:
@Configuration
@Import(TramCommandProducerConfiguration.class)
public class AbstractTramCommandTestConfiguration {
  ...
}
See this example of transactional commands.
Handling commands
First, define CommandHandlers:
public class TramCommandTestCommandHandler {
  public Message doSomething(CommandMessage<DoSomethingCommand> cm, PathVariables pvs) {
    ...
    // withSuccess() is statically imported from CommandHandlerReplyBuilder
    return withSuccess();
  }

  public CommandHandlers getCommandHandlers() {
    return CommandHandlersBuilder
        .fromChannel("CustomerCommandChannel")
        .onMessage(DoSomethingCommand.class, this::doSomething)
        .build();
  }
}
Second, define a CommandDispatcher @Bean:
@Configuration
public class AbstractTramCommandTestConfiguration {
  @Bean
  public CommandDispatcher commandDispatcher(AbstractTramCommandTestConfig config, TramCommandTestCommandHandler target) {
    return new CommandDispatcher("customerServiceCommandDispatcher", target.getCommandHandlers());
  }

  @Bean
  public TramCommandTestCommandHandler abstractTramCommandTestTarget(AbstractTramCommandTestConfig config) {
    return new TramCommandTestCommandHandler();
  }
}
See this example of transactional commands.
Configuring the transport
JDBC/Apache Kafka transport
If you want to use JDBC/Apache Kafka, @Import TramJdbcKafkaConfiguration:
@Configuration
@EnableAutoConfiguration
@Import({TramJdbcKafkaConfiguration.class})
public class JdbcKafkaTramMessageTestConfiguration {
}
In-memory transport
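If you want to use the in-memory transport, @Import TramInMemoryConfiguration:
@Configuration
@EnableAutoConfiguration
@Import({TramInMemoryConfiguration.class})
// The class name is illustrative; it must differ from the imported TramInMemoryConfiguration
public class InMemoryTramMessageTestConfiguration {
}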
Configuration properties
The JDBC/Kafka transport requires several configuration properties.
Here, for example, is an application.properties file:
spring.datasource.url=jdbc:mysql://${DOCKER_HOST_IP}/eventuate
spring.datasource.username=mysqluser
spring.datasource.password=mysqlpw
spring.datasource.driver.class.name=com.mysql.jdbc.Driver
eventuatelocal.kafka.bootstrap.servers=${DOCKER_HOST_IP}:9092
eventuatelocal.zookeeper.connection.string=${DOCKER_HOST_IP}:2181
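The ${DOCKER_HOST_IP} placeholders in these properties are filled in from the application's environment, for example an environment variable of that name. The following sketch shows the substitution idea in plain Java. It is a simplified illustration rather than Spring's resolver: PlaceholderSketch and resolve are hypothetical names, and features such as default values or nested placeholders are deliberately left out.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of ${NAME} substitution in a property value.
final class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${NAME} with its value from the map; fails if a name is missing.
    static String resolve(String value, Map<String, String> variables) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String replacement = variables.get(name);
            if (replacement == null) {
                throw new IllegalArgumentException("unresolved placeholder: " + name);
            }
            // quoteReplacement keeps '$' and '\' in the value from being treated as group references.
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // The IP address is an example value, not a required setting.
        Map<String, String> env = Collections.singletonMap("DOCKER_HOST_IP", "192.168.99.100");
        System.out.println(resolve("jdbc:mysql://${DOCKER_HOST_IP}/eventuate", env));
    }
}
```

Failing fast on a missing variable mirrors the practical advice for this setup: make sure DOCKER_HOST_IP is set before starting the application.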
Running the CDC service
In addition to a database and message broker, you will need to run the Eventuate Tram CDC service. It reads messages inserted into the database and publishes them to Apache Kafka. It is written using Spring Boot. The easiest way to run this service during development is to use Docker Compose. The Eventuate Tram Core Basic examples project has an example docker-compose.yml file, which defines the service as follows:
cdcservice:
  image: eventuateio/eventuate-tram-cdc-mysql-service:0.4.0.RELEASE
  ports:
    - "8099:8080"
  depends_on:
    - mysql
    - kafka
    - zookeeper
  environment:
    SPRING_DATASOURCE_URL: jdbc:mysql://mysql/eventuate
    SPRING_DATASOURCE_USERNAME: mysqluser
    SPRING_DATASOURCE_PASSWORD: mysqlpw
    SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.mysql.jdbc.Driver
    EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181
    EVENTUATELOCAL_CDC_DB_USER_NAME: root
    EVENTUATELOCAL_CDC_DB_PASSWORD: rootpassword
    EVENTUATELOCAL_CDC_BINLOG_CLIENT_ID: 1234567890
    EVENTUATELOCAL_CDC_SOURCE_TABLE_NAME: message
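The service configured above tails the MySQL transaction log. The simpler polling strategy mentioned in the introduction is easier to picture, so the following sketch shows that variant: a relay scans a message table for unpublished rows, hands each one to the broker, and marks it as published. This is an in-memory illustration under stated assumptions, not the CDC service's code; PollingRelaySketch, MessageRow, and the published flag are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of a polling relay between a message table and a broker.
final class PollingRelaySketch {
    // One row of the message table; the published flag is an illustrative column.
    static final class MessageRow {
        final String payload;
        boolean published;

        MessageRow(String payload) {
            this.payload = payload;
        }
    }

    final List<MessageRow> messageTable = new ArrayList<>();

    // Publishes every unpublished row in insertion order, marks it, and returns the count.
    int pollOnce(Consumer<String> broker) {
        int count = 0;
        for (MessageRow row : messageTable) {
            if (!row.published) {
                broker.accept(row.payload);
                row.published = true;
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        PollingRelaySketch relay = new PollingRelaySketch();
        relay.messageTable.add(new MessageRow("AccountDebited:account-1:10"));
        int first = relay.pollOnce(payload -> System.out.println("publishing " + payload));
        int second = relay.pollOnce(payload -> System.out.println("publishing " + payload));
        System.out.println(first + " published, then " + second);
    }
}
```

Because a row is marked only after it has been handed to the broker, a crash between the two steps would cause the row to be published again on the next poll. A relay built this way therefore gives at-least-once delivery, and consumers should tolerate duplicates.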