5.6 Event Handling

Fire and consume events.

In the previous section we defined our events. Now let’s have a look at the application logic to fire and consume these.

Event overview

We will build the following event flow. Keep in mind that messages are not directly written to Kafka.

Environment

Creating Orders

New orders are received by requests to our RESTful API /shop-orders. They are handled by the ch.puzzle.mm.debezium.order.boundary.ShopOrderResource.

The required POST body corresponds to the ShopOrderDTO class.

1
2
3
4
5
6
7
8
{
  "articleOrders" : [
    {
      "articleId" : 1,
      "amount" : 3
    }
  ]
}

Task 5.6.1 - Implement order creation and fire event

The actual work for creating an order is done by method createOrder in ch.puzzle.mm.debezium.order.control.ShopOrderService class.

1
2
3
4
5
6
7
8
9
public ShopOrder createOrder(ShopOrderDTO shopOrderDTO) {
    // TODO: implementation - create ArticleOrder list

    // TODO: implementation - create new shopOrder

    // TODO: fire OrderCreatedEvent

    return new ShopOrder();
}

Finish implementation of the createOrder Method:

  • Create list of ArticleOrder entities from article list in ShopOrderDto
  • Create a new ShopOrder
    • status: Set to ShopOrderStatus.NEW
    • articleOrders: Set created list of ArticleOrder
    • persist the order
  • Fire the OrderCreatedEvent event
  • Return the newly created order
Hint list of ArticleOrder creation
1
2
3
    List<ArticleOrder> articleOrders = shopOrderDTO.articleOrders.stream()
                .map(s -> new ArticleOrder(s.articleId, s.amount))
                .collect(Collectors.toList());
Hint ShopOrder creation
1
2
3
4
5
// store order to shopOrder table
ShopOrder shopOrder = new ShopOrder();
shopOrder.setStatus(ShopOrderStatus.NEW);
shopOrder.setArticleOrders(articleOrders);
shopOrder.persist();
Hint event firing

Fire events with

1
2
// fire event (outbox table)
event.fire(new OrderCreatedEvent(Instant.now(), shopOrder));
Complete Task Hint
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@ApplicationScoped
public class ShopOrderService {

    @Inject
    Event<ExportedEvent<?, ?>> event;

    public ShopOrder createOrder(ShopOrderDTO shopOrderDTO) {
        List<ArticleOrder> articleOrders = shopOrderDTO.articleOrders.stream().map(s -> new ArticleOrder(s.articleId, s.amount)).collect(Collectors.toList());

        // store order to shopOrder table
        ShopOrder shopOrder = new ShopOrder();
        shopOrder.setStatus(ShopOrderStatus.NEW);
        shopOrder.setArticleOrders(articleOrders);
        shopOrder.persist();

        // fire event (outbox table)
        event.fire(new OrderCreatedEvent(Instant.now(), shopOrder));

        return shopOrder;
    }

    // ...
}

Cancelling Order

In this lab you can delete orders by POST to /shop-orders/{id}/status. For simplicity cancelling is only allowed if the order is in state completed (stock have been deducted).

Task 5.6.2 - Implement order cancellation and fire event

The actual work for cancelling an order is done by cancelOrder method in ch.puzzle.mm.debezium.order.control.ShopOrderService class.

  • Fetch the ShopOrder by orderId (already done)
  • Check Status (already done)
  • Set status of order to ShopOrderStatus.CANCELLED
  • Fire the OrderCancelledEvent event
  • Return the modified order
Complete Task Hint
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@ApplicationScoped
public class ShopOrderService {

    @Inject
    Event<ExportedEvent<?, ?>> event;

    // ...

    public ShopOrder cancelOrder(long orderId) {
        ShopOrder order = ShopOrder.getByIdOrThrow(orderId);
        if (order.getStatus().canCancel()) {
            order.setStatus(ShopOrderStatus.CANCELLED);
            event.fire(new OrderCancelledEvent(Instant.now(), order));
            return order;
        } else {
            throw new IllegalStateException("Cannot cancel Order " + orderId);
        }
    }
}

Consuming Stock Events

The following configuration defines the incoming stock channel containing the messages from the Kafka topics stock-stockcomplete-events and stock-stockincomplete-events

1
mp.messaging.incoming.stock.topics=stock-stockcomplete-events,stock-stockincomplete-events

Our event consumer class is the ch.puzzle.mm.debezium.event.boundary.KafkaEventConsumer.

Task 5.6.3 - Consuming Kafka messages

Consume incoming events in KafkaEventConsumer and delegate the processing to the StockEventHandler.

Finish implementation of onMessage in ch.puzzle.mm.debezium.event.boundary.KafkaEventConsumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public CompletionStage<Void> onMessage(KafkaRecord<String, String> message) {
    return CompletableFuture.runAsync(() -> {
        try (final Scope span = tracer.buildSpan("handle-stock-message").asChildOf(TracingKafkaUtils.extractSpanContext(message.getHeaders(), tracer)).startActive(true)) {
            // TODO: implementation - read id, eventType from headers
            // TODO: implementation - delegate message to StockEventHandler
        } catch (Exception e) {
            logger.error("Error while preparing articlestock", e);
            throw e;
        }
    }).thenRun(message::ack);
}
  • Annotate method onMessage as handler for the incoming stock channel with @Incoming(...).
  • Read the Kafka Headers id and eventType from the message using getHeadersAsString.
  • Inject the StockEventHandler and delegate processing to onStockEvent of StockEventHandler
Incoming definition Hint

Channels (incoming or outgoing) can be annotated on method level with @Incoming.

1
2
@Incoming("stock")
public CompletionStage<Void> onMessage(KafkaRecord<String, String> message) { }
Read Kafka Headers Hint

Use the provided method getHeaderAsString.

1
2
String eventId = getHeaderAsString(message, "id");
String eventType = getHeaderAsString(message, "eventType");
Calling StockEventHandler Hint
1
2
3
4
5
6
stockEventHandler.onStockEvent(
    UUID.fromString(eventId),
    eventType,
    message.getKey(),
    message.getPayload(),
    message.getTimestamp());
Complete Task Hint
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Incoming("stock")
public CompletionStage<Void> onMessage(KafkaRecord<String, String> message) {
    return CompletableFuture.runAsync(() -> {
        try (final Scope span = tracer.buildSpan("handle-stock-message").asChildOf(TracingKafkaUtils.extractSpanContext(message.getHeaders(), tracer)).startActive(true)) {
            logger.debug("Kafka message with key = {} arrived", message.getKey());

            logHeaders(message);

            String eventId = getHeaderAsString(message, "id");
            String eventType = getHeaderAsString(message, "eventType");

            stockEventHandler.onStockEvent(
                    UUID.fromString(eventId),
                    eventType,
                    message.getKey(),
                    message.getPayload(),
                    message.getTimestamp()
            );
        } catch (Exception e) {
            logger.error("Error while preparing articlestock", e);
            throw e;
        }
    }).thenRun(message::ack);
}

Task 5.6.4 - Creating an event log

The EventLog class uses the ConsumedEvent entity to keep track of processed events.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@ApplicationScoped
public class EventLog {

    private static final Logger logger = LoggerFactory.getLogger(EventLog.class);

    public void processed(UUID eventId) {
        // TODO: implementation - store
    }

    public boolean alreadyProcessed(UUID eventId) {
        // TODO: implementation - check exists
        return false;
    }
}

Transaction management

  • Ensure that processed requires a transaction by using Transactional.TxType.MANDATORY
  • Ensure that alreadyProcessed requires a transaction by using Transactional.TxType.MANDATORY
Transaction management Hint

Adding the @Transactional annotation ensures that methods throw an exception if there isn’t an existing transaction context. Annotate both methods with:

1
@Transactional(value = Transactional.TxType.MANDATORY)

Remember that we use the Panache extension and that our entities provide methods to easily query database records. You can get more details about using the Active Record Pattern in Quarkus in the Simplified Hibernate ORM with Panache Guide.

Implement both EventLog methods.

  • Implement method processed to write a new ConsumedEvent with the eventId and the current timestamp to the database.
  • Implement method alreadyProcessed to check the database for having a record with the given eventId.
Complete EventLog Hint
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
@Traced
@ApplicationScoped
public class EventLog {

  private static final Logger logger = LoggerFactory.getLogger(EventLog.class);

  @Transactional(value = Transactional.TxType.MANDATORY)
  public void processed(UUID eventId) {
    ConsumedEvent.persist(new ConsumedEvent(eventId, Instant.now()));
  }

  @Transactional(value = Transactional.TxType.MANDATORY)
  public boolean alreadyProcessed(UUID eventId) {
    logger.info("Looking for event with id {} in message log", eventId);
    return ConsumedEvent.findByIdOptional(eventId).isPresent();
  }
}

Task 5.6.5 - Processing stock events

Implement the event handling in StockEventHandler

  • Check if message is already processed by using the EventLog.
    • If so, skip the message.
  • Depending on eventType call the corresponding method in ShopOrderService
    • StockComplete: deserialize the event and call onStockCompleteEvent
    • StockIncomplete: deserialize the event and call onStockIncompleteEvent
    • If none of them: log warning about an unknown event.
  • Register the event as processed using the EventLog
StockEventHandler Hint
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public void onStockEvent(UUID eventId, String eventType, String key, String event, Instant ts) {
    if (eventLog.alreadyProcessed(eventId)) {
        logger.info("Event with id {} was already processed, ignore.", eventId);
        return;
    }

    logger.info("Received '{}' event {} - OrderId: {}, ts: '{}'", eventType, eventId, key, ts);
    if (eventType.equalsIgnoreCase("StockComplete")) {
        shopOrderService.onStockCompleteEvent(deserialize(event));
    } else if (eventType.equalsIgnoreCase("StockIncomplete")) {
        shopOrderService.onStockIncompleteEvent(deserialize(event));
    } else {
        logger.warn("Ignoring unknown event '{}'", eventType);
    }

    eventLog.processed(eventId);
}

Task 5.6.6 - Complete the order management

For the order to be managed completely we have to implement the methods onStockCompleteEvent and onStockIncompleteEvent in our ShopOrderService.

Implement the stock complete event

  • Find ShopOrder in database using the orderId from the event
  • Set status to ShopOrderStatus.COMPLETED
OnStockCompleteEvent Hint
1
2
3
4
5
public void onStockCompleteEvent(ShopOrderStockResponse stockComplete) {
    ShopOrder.findByIdOptional(stockComplete.orderId).ifPresent(o -> {
        ((ShopOrder)o).setStatus(ShopOrderStatus.COMPLETED);
    });
}

Implement the stock incomplete event

  • Find ShopOrder in database using the orderId from the event
  • Set status to ShopOrderStatus.STOCK_INCOMPLETE
OnStockIncompleteEvent Hint
1
2
3
4
5
public void onStockIncompleteEvent(ShopOrderStockResponse stockIncomplete) {
    ShopOrder.findByIdOptional(stockIncomplete.orderId).ifPresent(o -> {
        ((ShopOrder) o).setStatus(ShopOrderStatus.STOCK_INCOMPLETE);
    });
}