Getting started with Eventuate Tram

Learn how to get started with Eventuate Tram, a framework for transactional messaging. Eventuate Tram sends and receives messages as part of a database transaction, ensuring that your application atomically updates the database and publishes messages. Currently, it supports the following databases:

  • Transaction log tailing: MySQL, Postgres

  • Polling: Other SQL databases

It also supports the following message brokers:

  • Apache Kafka

  • ActiveMQ

  • RabbitMQ
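The guarantee described above, that a database update and its message are published atomically, can be illustrated with a small in-memory sketch. This is not the Eventuate Tram API: `OutboxSketch`, `Database`, and `debit` are hypothetical names, and the "transaction" is simulated by copying two in-memory tables and committing both or neither.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: an in-memory stand-in for a database, not the Eventuate Tram API.
class OutboxSketch {

  // Hypothetical "database" holding a business table and a message (outbox) table.
  static class Database {
    final Map<String, Long> accountBalances = new HashMap<>();
    final List<String> messageTable = new ArrayList<>();

    // Runs the body against copies of both tables and commits them together, or not at all.
    void inTransaction(TransactionBody body) {
      Map<String, Long> balancesCopy = new HashMap<>(accountBalances);
      List<String> messagesCopy = new ArrayList<>(messageTable);
      body.run(balancesCopy, messagesCopy); // an exception here skips the commit below
      accountBalances.clear();
      accountBalances.putAll(balancesCopy);
      messageTable.clear();
      messageTable.addAll(messagesCopy);
    }
  }

  interface TransactionBody {
    void run(Map<String, Long> balances, List<String> messages);
  }

  // Debits an account and records the corresponding message in the same transaction.
  static void debit(Database db, String accountId, long amount) {
    db.inTransaction((balances, messages) -> {
      long balance = balances.getOrDefault(accountId, 0L);
      // The message is written first to show that a later failure discards it too.
      messages.add("AccountDebited:" + accountId + ":" + amount);
      if (balance < amount) {
        throw new IllegalStateException("insufficient funds");
      }
      balances.put(accountId, balance - amount);
    });
  }

  public static void main(String[] args) {
    Database db = new Database();
    db.accountBalances.put("account-1", 100L);

    debit(db, "account-1", 30L);    // commits the balance change and the message together
    try {
      debit(db, "account-1", 500L); // fails: neither the update nor the message is kept
    } catch (IllegalStateException expected) {
      // rolled back
    }

    System.out.println(db.accountBalances); // {account-1=70}
    System.out.println(db.messageTable);    // [AccountDebited:account-1:30]
  }
}
```

In a real deployment the message table lives in the same SQL database as the business data, so an ordinary database transaction provides the all-or-nothing behavior simulated here.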

Project setup

Gradle

In gradle.properties:

eventuateTramVersion=LATEST_VERSION

In build.gradle, add the Maven Central and JCenter repositories:

repositories {
    mavenCentral()
    jcenter()
}

Include one or more of the following dependencies, depending on which API you want to use, along with a messaging transport:

dependencies {

  // Basic messaging

  compile "io.eventuate.tram.core:eventuate-tram-messaging:$eventuateTramVersion"

  // Domain events

  compile "io.eventuate.tram.core:eventuate-tram-events:$eventuateTramVersion"

  // Command/Async Reply messaging

  compile "io.eventuate.tram.core:eventuate-tram-commands:$eventuateTramVersion"

  // Use JDBC and Apache Kafka as the underlying messaging transport

  compile "io.eventuate.tram.core:eventuate-tram-jdbc-kafka:$eventuateTramVersion"

  // Use in-memory messaging transport for testing

  testCompile "io.eventuate.tram.core:eventuate-tram-in-memory:$eventuateTramVersion"

}

Transactional messages

Eventuate Tram has APIs for sending and receiving messages as part of a database transaction.

Sending messages

Send a message using MessageProducer:

public abstract class AbstractTramMessageTest {

  @Autowired
  private MessageProducer messageProducer;

  @Test
  public void shouldReceiveMessage() {
    ...
    messageProducer.send(destination, MessageBuilder.withPayload(payload).build());
    ...
  }
}

See this example of sending messages.

Consuming messages

Receive messages using MessageConsumer:

public abstract class AbstractTramMessageTest {

  @Autowired
  private MessageConsumer messageConsumer;

  @Test
  public void shouldReceiveMessage() throws InterruptedException {
    messageConsumer.subscribe(subscriberId, Collections.singleton(destination), this::handleMessage);
    ...
  }

  private void handleMessage(Message message) {
    ...
  }
}

See this example of consuming messages.
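The send and subscribe calls shown above follow a channel-based publish/subscribe shape. As a rough mental model only, the sketch below routes payloads from a destination to the handlers subscribed to it. `InMemoryChannelsSketch` is a hypothetical class, payloads are plain strings, and delivery is synchronous; none of this reflects how Eventuate Tram is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Illustrative only: mimics the send/subscribe shape, not the Eventuate Tram implementation.
class InMemoryChannelsSketch {

  // channel name -> handlers subscribed to that channel
  private final Map<String, List<Consumer<String>>> handlersByChannel = new HashMap<>();

  // Registers one handler for every channel in the set.
  // subscriberId is accepted to mirror the call shape but is unused in this sketch.
  void subscribe(String subscriberId, Set<String> channels, Consumer<String> handler) {
    for (String channel : channels) {
      handlersByChannel.computeIfAbsent(channel, c -> new ArrayList<>()).add(handler);
    }
  }

  // Delivers the payload synchronously to every handler subscribed to the destination.
  void send(String destination, String payload) {
    List<Consumer<String>> handlers =
        handlersByChannel.getOrDefault(destination, Collections.emptyList());
    for (Consumer<String> handler : handlers) {
      handler.accept(payload);
    }
  }

  public static void main(String[] args) {
    InMemoryChannelsSketch bus = new InMemoryChannelsSketch();
    List<String> received = new ArrayList<>();

    bus.subscribe("subscriber-1", Collections.singleton("orders"), received::add);
    bus.send("orders", "hello");  // delivered to the handler
    bus.send("other", "ignored"); // no subscriber for this channel

    System.out.println(received); // [hello]
  }
}
```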

Transactional domain events

The domain event package builds on the transactional messaging APIs.

Publishing domain events

Publish domain events using the DomainEventPublisher interface:

public abstract class AbstractTramEventTest {

  @Autowired
  private DomainEventPublisher domainEventPublisher;

  @Test
  public void shouldReceiveEvent() throws InterruptedException {
    long uniqueId = config.getUniqueId();
    String accountId = ...;

    DomainEvent domainEvent = new AccountDebited(...);

    domainEventPublisher.publish("Account", accountId, Collections.singletonList(domainEvent));
    ...
  }
}

To publish events, you need to @Import the TramEventsPublisherConfiguration @Configuration class:

@Configuration
@Import(TramEventsPublisherConfiguration.class)
public class AbstractTramEventTestConfiguration {
  ...
}

See this example of transaction events.

Consuming domain events

First, define DomainEventHandlers:

public class TramEventTestEventConsumer {

  public DomainEventHandlers domainEventHandlers() {
    return DomainEventHandlersBuilder
            .forAggregateType("Account")
            .onEvent(AccountDebited.class, this::handleAccountDebited)
            .build();
  }

  public void handleAccountDebited(DomainEventEnvelope<AccountDebited> event) {
    ...
  }

}

Second, configure a DomainEventDispatcher @Bean:

@Configuration
public class AbstractTramEventTestConfiguration {

  @Bean
  public DomainEventDispatcher domainEventDispatcher(AbstractTramEventTestConfig config,
                                                     TramEventTestEventConsumer target,
                                                     MessageConsumer messageConsumer) {
    return new DomainEventDispatcher("eventDispatcherId",
            target.domainEventHandlers(),
            messageConsumer);
  }

  @Bean
  public TramEventTestEventConsumer tramEventTestTarget(AbstractTramEventTestConfig config) {
    return new TramEventTestEventConsumer();
  }
}

You need the following Spring @Configuration in order to publish events:

@Configuration
@Import(TramEventsPublisherConfiguration.class)
public class AbstractTramEventTestConfiguration {

}

See this example of transaction events.
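The handler registration shown above maps an aggregate type and an event class to a handler method. The sketch below shows one way such class-based dispatch can work in plain Java. `EventDispatchSketch`, `Handlers`, and `AccountCredited` are hypothetical names introduced for illustration; this is not the DomainEventHandlersBuilder implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative only: class-based event dispatch, not the Eventuate Tram implementation.
class EventDispatchSketch {

  static class AccountDebited {
    final long amount;
    AccountDebited(long amount) { this.amount = amount; }
  }

  // Hypothetical second event type with no registered handler.
  static class AccountCredited {
    final long amount;
    AccountCredited(long amount) { this.amount = amount; }
  }

  // Holds the handlers registered for one aggregate type, keyed by event class.
  static class Handlers {
    private final String aggregateType;
    private final Map<Class<?>, Consumer<Object>> byEventClass = new HashMap<>();

    Handlers(String aggregateType) { this.aggregateType = aggregateType; }

    // Registers a typed handler; the cast is safe because lookup uses the same class.
    <E> Handlers onEvent(Class<E> eventClass, Consumer<E> handler) {
      byEventClass.put(eventClass, event -> handler.accept(eventClass.cast(event)));
      return this;
    }

    // Returns true if a handler matched both the aggregate type and the event class.
    boolean dispatch(String eventAggregateType, Object event) {
      if (!aggregateType.equals(eventAggregateType)) {
        return false;
      }
      Consumer<Object> handler = byEventClass.get(event.getClass());
      if (handler == null) {
        return false;
      }
      handler.accept(event);
      return true;
    }
  }

  public static void main(String[] args) {
    List<Long> debits = new ArrayList<>();
    Handlers handlers = new Handlers("Account")
        .onEvent(AccountDebited.class, e -> debits.add(e.amount));

    System.out.println(handlers.dispatch("Account", new AccountDebited(25)));  // true
    System.out.println(handlers.dispatch("Account", new AccountCredited(5)));  // false
    System.out.println(handlers.dispatch("Customer", new AccountDebited(1)));  // false
    System.out.println(debits);                                                // [25]
  }
}
```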

Transactional commands

Transactional commands are implemented using transactional messaging.

Sending commands

Send a command using a CommandProducer:

public abstract class AbstractTramCommandTest {

  @Autowired
  private CommandProducer commandProducer;

  @Test
  public void shouldInvokeCommand() throws InterruptedException {

    String commandId = commandProducer.send("CustomerCommandChannel",
            new DoSomethingCommand(),
            "ReplyToChannel",
            Collections.emptyMap());
    ...
  }
}

You also need to @Import the TramCommandProducerConfiguration @Configuration class:

@Configuration
@Import(TramCommandProducerConfiguration.class)
public class AbstractTramCommandTestConfiguration {
  ...
}

See this example of transactional commands.

Handling commands

First, define CommandHandlers:

public class TramCommandTestCommandHandler {

  public Message doSomething(CommandMessage<DoSomethingCommand> cm, PathVariables pvs) {
    ...
    return withSuccess();
  }

  public CommandHandlers getCommandHandlers() {
    return CommandHandlersBuilder
            .fromChannel("CustomerCommandChannel")
            .onMessage(DoSomethingCommand.class, this::doSomething)
            .build();
  }
}

Second, define a CommandDispatcher @Bean:

@Configuration
public class AbstractTramCommandTestConfiguration {

  @Bean
  public CommandDispatcher commandDispatcher(AbstractTramCommandTestConfig config,
                                             TramCommandTestCommandHandler target) {
    return new CommandDispatcher("customerServiceCommandDispatcher", target.getCommandHandlers());
  }

  @Bean
  public TramCommandTestCommandHandler abstractTramCommandTestTarget(AbstractTramCommandTestConfig config) {
    return new TramCommandTestCommandHandler();
  }
}

See this example of transactional commands.
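The command examples above send a command to a channel, name a reply channel, and get back a command ID. The sketch below shows how that ID can correlate a reply with its command. It is a simplified model under stated assumptions: `CommandReplySketch`, `Reply`, `onCommand`, and `repliesOn` are hypothetical names, commands are plain strings, and the handler runs synchronously, whereas real replies arrive asynchronously through the message broker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

// Illustrative only: command/reply correlation, not the Eventuate Tram implementation.
class CommandReplySketch {

  // A reply carries the ID of the command it answers.
  static class Reply {
    final String inReplyTo;
    final String outcome;
    Reply(String inReplyTo, String outcome) {
      this.inReplyTo = inReplyTo;
      this.outcome = outcome;
    }
  }

  // command channel -> handler that turns a command into an outcome
  private final Map<String, Function<String, String>> handlersByChannel = new HashMap<>();
  // reply channel -> replies delivered to that channel
  private final Map<String, List<Reply>> repliesByChannel = new HashMap<>();

  void onCommand(String commandChannel, Function<String, String> handler) {
    handlersByChannel.put(commandChannel, handler);
  }

  // Sends a command, runs its handler immediately, and records the reply on the reply channel.
  String send(String commandChannel, String command, String replyChannel) {
    String commandId = UUID.randomUUID().toString();
    Function<String, String> handler = handlersByChannel.get(commandChannel);
    String outcome = (handler == null) ? "FAILURE" : handler.apply(command);
    repliesByChannel.computeIfAbsent(replyChannel, c -> new ArrayList<>())
        .add(new Reply(commandId, outcome));
    return commandId;
  }

  List<Reply> repliesOn(String replyChannel) {
    return repliesByChannel.getOrDefault(replyChannel, Collections.emptyList());
  }

  public static void main(String[] args) {
    CommandReplySketch messaging = new CommandReplySketch();
    messaging.onCommand("CustomerCommandChannel", command -> "SUCCESS");

    String commandId = messaging.send("CustomerCommandChannel", "DoSomethingCommand", "ReplyToChannel");
    Reply reply = messaging.repliesOn("ReplyToChannel").get(0);

    // The sender matches the reply to its command through the command ID.
    System.out.println(reply.inReplyTo.equals(commandId) + " " + reply.outcome); // true SUCCESS
  }
}
```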

Configuring the transport

JDBC/Apache Kafka transport

To use the JDBC/Apache Kafka transport, @Import TramJdbcKafkaConfiguration:

@Configuration
@EnableAutoConfiguration
@Import({TramJdbcKafkaConfiguration.class})
public class JdbcKafkaTramMessageTestConfiguration {
}

In-memory transport

To use the in-memory transport, @Import TramInMemoryConfiguration:

@Configuration
@EnableAutoConfiguration
@Import({TramInMemoryConfiguration.class})
public class InMemoryTramMessageTestConfiguration {
}

Configuration properties

There are various configuration properties that need to be set for JDBC/Kafka transport. Here, for example, is an application.properties file:

spring.datasource.url=jdbc:mysql://${DOCKER_HOST_IP}/eventuate
spring.datasource.username=mysqluser
spring.datasource.password=mysqlpw
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
eventuatelocal.kafka.bootstrap.servers=${DOCKER_HOST_IP}:9092
eventuatelocal.zookeeper.connection.string=${DOCKER_HOST_IP}:2181
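The properties above rely on `${DOCKER_HOST_IP}` being defined in the environment when the application starts. The sketch below shows the general idea of placeholder substitution in plain Java. `PlaceholderSketch` and `resolve` are hypothetical names, the IP address is a made-up example, and Spring's own resolution is more capable (for instance, it supports default values).

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: simple ${NAME} substitution, not Spring's property resolution.
class PlaceholderSketch {

  private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

  // Replaces each ${NAME} with its value from the map and fails fast if a value is missing.
  static String resolve(String value, Map<String, String> environment) {
    Matcher matcher = PLACEHOLDER.matcher(value);
    StringBuffer resolved = new StringBuffer();
    while (matcher.find()) {
      String name = matcher.group(1);
      String replacement = environment.get(name);
      if (replacement == null) {
        throw new IllegalArgumentException("Missing value for " + name);
      }
      matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
    }
    matcher.appendTail(resolved);
    return resolved.toString();
  }

  public static void main(String[] args) {
    // Example value only; use the address of your own Docker host.
    Map<String, String> environment = Collections.singletonMap("DOCKER_HOST_IP", "192.168.99.100");

    System.out.println(resolve("jdbc:mysql://${DOCKER_HOST_IP}/eventuate", environment));
    // jdbc:mysql://192.168.99.100/eventuate
    System.out.println(resolve("${DOCKER_HOST_IP}:9092", environment));
    // 192.168.99.100:9092
  }
}
```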

Running the CDC service

In addition to a database and a message broker, you need to run the Eventuate Tram CDC service. It reads messages inserted into the database and publishes them to Apache Kafka. It is written using Spring Boot. The easiest way to run this service during development is to use Docker Compose. The Eventuate Tram Core basic examples project has an example docker-compose.yml file, which defines the service as follows:

cdcservice:
  image: eventuateio/eventuate-tram-cdc-mysql-service:0.4.0.RELEASE
  ports:
    - "8099:8080"
  depends_on:
    - mysql
    - kafka
    - zookeeper
  environment:
    SPRING_DATASOURCE_URL: jdbc:mysql://mysql/eventuate
    SPRING_DATASOURCE_USERNAME: mysqluser
    SPRING_DATASOURCE_PASSWORD: mysqlpw
    SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.mysql.jdbc.Driver
    EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
    EVENTUATELOCAL_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181
    EVENTUATELOCAL_CDC_DB_USER_NAME: root
    EVENTUATELOCAL_CDC_DB_PASSWORD: rootpassword
    EVENTUATELOCAL_CDC_BINLOG_CLIENT_ID: 1234567890
    EVENTUATELOCAL_CDC_SOURCE_TABLE_NAME: message
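Conceptually, the CDC service relays rows from the message table to the broker. The sketch below models the polling variant of that relay in memory. `CdcRelaySketch`, `MessageRow`, and `relayOnce` are hypothetical names, and the "broker" is just a list. The MySQL service configured above tails the transaction log rather than polling, so this illustrates the idea and not that service's mechanism.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a polling relay over an in-memory message table.
class CdcRelaySketch {

  // One row of the hypothetical message table.
  static class MessageRow {
    final long id;
    final String destination;
    final String payload;
    boolean published;

    MessageRow(long id, String destination, String payload) {
      this.id = id;
      this.destination = destination;
      this.payload = payload;
    }
  }

  // Publishes every unpublished row in table order, marks it published, and returns the count.
  static int relayOnce(List<MessageRow> messageTable, List<String> broker) {
    int relayed = 0;
    for (MessageRow row : messageTable) {
      if (!row.published) {
        broker.add(row.destination + ":" + row.payload);
        row.published = true;
        relayed++;
      }
    }
    return relayed;
  }

  public static void main(String[] args) {
    List<MessageRow> messageTable = new ArrayList<>();
    List<String> broker = new ArrayList<>();

    messageTable.add(new MessageRow(1, "Account", "AccountDebited"));
    messageTable.add(new MessageRow(2, "Account", "AccountCredited"));

    System.out.println(relayOnce(messageTable, broker)); // 2
    System.out.println(relayOnce(messageTable, broker)); // 0, nothing new to publish

    messageTable.add(new MessageRow(3, "Customer", "CustomerCreated"));
    System.out.println(relayOnce(messageTable, broker)); // 1
    System.out.println(broker);
    // [Account:AccountDebited, Account:AccountCredited, Customer:CustomerCreated]
  }
}
```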