4.4 Event Driven with Kafka

Transition from RESTful communication to event-driven messaging.

In this stage we will have a look at the event-driven setup and implement the order microservice.

The tasks we need to complete are the following:

  • Update dependencies
  • Create and consume shop-order-request events
  • Create and consume shop-order-confirmation events
  • Create and consume shop-order-compensation events

Task 4.4.1 - Update dependencies

First we declare the dependencies we are going to use in this chapter. Add the following dependencies to your pom.xml:

GroupId                  ArtifactId                                  Description
io.quarkus               quarkus-smallrye-reactive-messaging-kafka   Reactive Messaging with Apache Kafka
io.quarkus               quarkus-smallrye-context-propagation        Context Propagation in Quarkus
io.opentracing.contrib   opentracing-jdbc                            OpenTracing JDBC Instrumentation
io.opentracing.contrib   opentracing-kafka-client                    OpenTracing Kafka Client
Dependencies Task Hint

The following dependencies have to be added.

<dependencies>
    ...
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-smallrye-context-propagation</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-client</artifactId>
    </dependency>
    ...
</dependencies>

Task 4.4.2 - Emit the first event

We are now going event driven. Let's create our first message and emit an event whenever a new order comes in. We create a new @ApplicationScoped class ch.puzzle.mm.kafka.order.order.boundary.ShopOrderRequestProducer in the order microservice for emitting events to our shop-order-request topic. To emit messages to a topic manually we have to define a channel. To send things (a payload or a Message) from imperative code to a specific channel, use the @Channel annotation on a field of type Emitter: the @Channel annotation indicates which channel you are going to send your payloads or messages to, and the Emitter is the object used to send them.

In the ShopOrderRequestProducer class we define an Emitter<ShopOrderDTO> emitter field annotated with @Channel("shop-order-request"). We then create a method which takes a ShopOrderDTO as a parameter and simply emits an event to the shop-order-request channel with emitter.send(Message.of(shopOrderDTO)).

Emit Event Task Hint

The class should look like this:

@ApplicationScoped
public class ShopOrderRequestProducer {

    @Inject
    @Channel("shop-order-request")
    Emitter<ShopOrderDTO> emitter;

    @Traced
    public void createRequest(ShopOrderDTO shopOrderDTO) {
      emitter.send(Message.of(shopOrderDTO));
    }
}

We need to update the configuration to establish the communication with our message broker and to define the connectors that map the channels to the topics in our message broker:

Add the following properties to the application.properties file:

kafka.bootstrap.servers=kafka:9092

mp.messaging.outgoing.shop-order-request.connector=smallrye-kafka
mp.messaging.outgoing.shop-order-request.topic=shop-order-request
mp.messaging.outgoing.shop-order-request.key.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
mp.messaging.outgoing.shop-order-request.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
mp.messaging.outgoing.shop-order-request.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor

To have our OpenTracing trace context injected into the message headers, we need to add some code around the message emitter. Update your class to the following:

@ApplicationScoped
public class ShopOrderRequestProducer {

    @Inject
    @Channel("shop-order-request")
    Emitter<ShopOrderDTO> emitter;

    @Inject
    Tracer tracer;

    @Traced
    public void createRequest(ShopOrderDTO shopOrderDTO) {
        HeadersMapInjectAdapter headersMapInjectAdapter = new HeadersMapInjectAdapter();
        try (Scope scope = tracer.buildSpan("create-request").startActive(true)) {
            // inject the active span context into the Kafka record headers
            tracer.inject(scope.span().context(), Format.Builtin.TEXT_MAP, headersMapInjectAdapter);
            // attach key, topic and tracing headers as Kafka metadata to the outgoing message
            OutgoingKafkaRecordMetadata<ShopOrderDTO> metadata = OutgoingKafkaRecordMetadata.<ShopOrderDTO>builder()
                    .withKey(shopOrderDTO)
                    .withTopic("shop-order-request")
                    .withHeaders(headersMapInjectAdapter.getRecordHeaders())
                    .build();
            emitter.send(Message.of(shopOrderDTO, Metadata.of(metadata)));
        }
    }
}
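
The HeadersMapInjectAdapter used above is a small helper class that is not shown in this section. It implements the OpenTracing TextMap carrier interface and collects the injected span context as Kafka record headers. If it does not already exist in your project, a minimal sketch could look like this (class and method names as used above, everything else an assumption):

import io.opentracing.propagation.TextMap;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;

// write-only TextMap carrier that collects the injected span context as Kafka record headers
public class HeadersMapInjectAdapter implements TextMap {

    private final RecordHeaders recordHeaders = new RecordHeaders();

    @Override
    public Iterator<Map.Entry<String, String>> iterator() {
        // injection only writes to the carrier, it never iterates over it
        throw new UnsupportedOperationException("iterator is not supported with Tracer.inject()");
    }

    @Override
    public void put(String key, String value) {
        recordHeaders.add(key, value.getBytes(StandardCharsets.UTF_8));
    }

    public RecordHeaders getRecordHeaders() {
        return recordHeaders;
    }
}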

Now we can replace the REST call made with the help of the rest-client in the previous chapter by injecting the ShopOrderRequestProducer into our ShopOrderService and using its createRequest method to emit an event to the Kafka topic. Please also remove the update of the status flag from ShopOrderService::createOrder: the status of the order should only be updated once we receive the confirmation event from the stock microservice.

Emit event

The createOrder function could look like this:

    public ShopOrder createOrder(ShopOrderDTO shopOrderDTO) {
        List<ArticleOrder> articleOrders = shopOrderDTO.articleOrders.stream().map(s -> new ArticleOrder(s.articleId, s.amount)).collect(Collectors.toList());
        articleOrders.forEach(articleOrder -> articleOrder.persist());

        // store order to shopOrder table
        ShopOrder shopOrder = new ShopOrder();
        shopOrder.setStatus(ShopOrderStatus.NEW);
        shopOrder.setArticleOrders(articleOrders);
        shopOrder.persist();
        shopOrderDTO.id = shopOrder.id;

        // fire event
        shopOrderRequestProducer.createRequest(shopOrderDTO);

        return shopOrder;
    }

Task 4.4.3 - Consume the first event

When we emit a valid shop-order-request event to the topic, the stock microservice will handle the event and confirm the order in case of success or compensate the order in case of failure. It then emits an event to either the shop-order-confirmation topic or the shop-order-compensation topic. Let's create our consumers for these two cases in the order microservice.

Create a class ch.puzzle.mm.kafka.order.order.boundary.ShopOrderConfirmationConsumer which will handle all incoming events on the shop-order-confirmation topic. Define a function CompletionStage<Void> consumeOrders(Message<ShopOrderDTO> message). We annotate the consumeOrders function with @Incoming("shop-order-confirmation") to indicate that incoming events on the shop-order-confirmation channel will be handled by this function. Inside the function we want our ShopOrderService to confirm the order, which updates the status to COMPLETED. Be careful though: changing the status results in a database commit, which is a blocking operation. With Quarkus reactive messaging, blocking operations in a reactive context are only allowed when executed from a worker thread. Spawning worker threads can be achieved with the SmallRyeManagedExecutor class from quarkus-smallrye-context-propagation, which propagates the context properly into the new threads.

The consumer could look something like this:

@ApplicationScoped
@Traced
public class ShopOrderConfirmationConsumer {

    @Inject
    ShopOrderService shopOrderService;

    @Inject
    SmallRyeManagedExecutor executor;

    @Incoming("shop-order-confirmation")
    public CompletionStage<Void> consumeOrders(Message<ShopOrderDTO> message) {
        // run the blocking status update on a worker thread, then acknowledge the message
        executor.runAsync(() -> shopOrderService.confirmOrder(message.getPayload().id));
        return message.ack();
    }
}

To connect the channel defined in the @Incoming annotation to the Kafka broker we need to add a few lines to the application.properties:

mp.messaging.incoming.shop-order-confirmation.connector=smallrye-kafka
mp.messaging.incoming.shop-order-confirmation.topic=shop-order-confirmation
mp.messaging.incoming.shop-order-confirmation.value.deserializer=ch.puzzle.mm.kafka.order.order.boundary.ShopOrderDeserializer
mp.messaging.incoming.shop-order-confirmation.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
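
The ShopOrderDeserializer referenced in the value.deserializer property is a small custom class that turns the JSON payload back into a ShopOrderDTO. With Quarkus this is typically done by extending the provided JsonbDeserializer; a minimal sketch:

import io.quarkus.kafka.client.serialization.JsonbDeserializer;

// deserializes the incoming JSON records back into ShopOrderDTO instances
public class ShopOrderDeserializer extends JsonbDeserializer<ShopOrderDTO> {

    public ShopOrderDeserializer() {
        // tell the generic JSON-B deserializer which target type to produce
        super(ShopOrderDTO.class);
    }
}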


We add some boilerplate code around the consumer to make the propagation of the tracing headers work:

@ApplicationScoped
@Traced
public class ShopOrderConfirmationConsumer {

    private final Logger logger = LoggerFactory.getLogger(ShopOrderConfirmationConsumer.class.getName());

    @Inject
    ShopOrderService shopOrderService;

    @Inject
    Tracer tracer;

    @Inject
    SmallRyeManagedExecutor executor;

    @Incoming("shop-order-confirmation")
    public CompletionStage<Void> consumeOrders(Message<ShopOrderDTO> message) {
        Optional<IncomingKafkaRecordMetadata> metadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
        if (metadata.isPresent()) {
            // extract the parent span context from the incoming Kafka record headers
            SpanContext extract = tracer.extract(Format.Builtin.TEXT_MAP, new HeadersMapExtractAdapter(metadata.get().getHeaders()));
            try (Scope scope = tracer.buildSpan("consume-confirmation").asChildOf(extract).startActive(true)) {
                // run the blocking status update on a worker thread, then acknowledge the message
                executor.runAsync(() -> shopOrderService.confirmOrder(message.getPayload().id));
                return message.ack();
            }
        }
        // without Kafka metadata we cannot continue the trace - reject the message
        return message.nack(new RuntimeException());
    }
}
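
Analogous to the inject adapter on the producer side, the HeadersMapExtractAdapter wraps the incoming Kafka record headers as an OpenTracing TextMap carrier so the tracer can extract the parent span context. A minimal sketch, assuming the constructor used above:

import io.opentracing.propagation.TextMap;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// read-only TextMap carrier that exposes the incoming Kafka record headers to the tracer
public class HeadersMapExtractAdapter implements TextMap {

    private final Map<String, String> map = new HashMap<>();

    public HeadersMapExtractAdapter(Headers headers) {
        // copy the raw byte[] header values into a String map for extraction
        for (Header header : headers) {
            map.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
        }
    }

    @Override
    public Iterator<Map.Entry<String, String>> iterator() {
        return map.entrySet().iterator();
    }

    @Override
    public void put(String key, String value) {
        // extraction only reads from the carrier, it never writes to it
        throw new UnsupportedOperationException("put is not supported with Tracer.extract()");
    }
}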

Extend the ShopOrderService with the confirmOrder method:

    @Transactional
    public void confirmOrder(Long id) {
        ShopOrder shopOrder = ShopOrder.findById(id);
        shopOrder.setStatus(ShopOrderStatus.COMPLETED);
        shopOrder.persistAndFlush();
    }

Task 4.4.4 - Compensation

Now it is up to you to do the same for the shop-order-compensation events. Create a class ch.puzzle.mm.kafka.order.order.boundary.ShopOrderCompensationConsumer, in the same package as the ShopOrderConfirmationConsumer, which listens on the shop-order-compensation topic and compensates your failed orders. Then update your application.properties to connect the defined channel to the Kafka broker.

Compensation consumer

The class should look like this:

@ApplicationScoped
@Traced
public class ShopOrderCompensationConsumer {

    @Inject
    ShopOrderService shopOrderService;

    @Inject
    Tracer tracer;

    @Inject
    SmallRyeManagedExecutor executor;

    @Incoming("shop-order-compensation")
    public CompletionStage<Void> consumeOrders(Message<ShopOrderDTO> message) {
        Optional<IncomingKafkaRecordMetadata> metadata = message.getMetadata(IncomingKafkaRecordMetadata.class);
        if (metadata.isPresent()) {
            // extract the parent span context from the incoming Kafka record headers
            SpanContext extract = tracer.extract(Format.Builtin.TEXT_MAP, new HeadersMapExtractAdapter(metadata.get().getHeaders()));
            try (Scope scope = tracer.buildSpan("consume-compensation").asChildOf(extract).startActive(true)) {
                compensateOrder(message.getPayload());
                return message.ack();
            }
        }
        // without Kafka metadata we cannot continue the trace - reject the message
        return message.nack(new RuntimeException());
    }

    private void compensateOrder(ShopOrderDTO shopOrderDTO) {
        // run the blocking status update on a worker thread
        executor.runAsync(() -> shopOrderService.compensateOrder(shopOrderDTO.id));
    }
}

Do not forget to update your application.properties as well to connect the channel defined in @Incoming("shop-order-compensation") to your Kafka broker and topic.

mp.messaging.incoming.shop-order-compensation.connector=smallrye-kafka
mp.messaging.incoming.shop-order-compensation.topic=shop-order-compensation
mp.messaging.incoming.shop-order-compensation.value.deserializer=ch.puzzle.mm.kafka.order.order.boundary.ShopOrderDeserializer
mp.messaging.incoming.shop-order-compensation.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor

Task 4.4.5 - Add business logic to your service

In this part you will add the business logic to the ShopOrderService to confirm and compensate your orders. Add the following two functions to your service:

  • ShopOrderService::compensateOrder: find the order by its id, set the status to ShopOrderStatus.INCOMPLETE and persist and flush.
  • ShopOrderService::confirmOrder: find the order by its id, set the status to ShopOrderStatus.COMPLETED and persist and flush.
Business hint

These two functions could look like this:

@Transactional
public void compensateOrder(Long id) {
    ShopOrder shopOrder = ShopOrder.findById(id);
    shopOrder.setStatus(ShopOrderStatus.INCOMPLETE);
    shopOrder.persistAndFlush();
}

@Transactional
public void confirmOrder(Long id) {
    ShopOrder shopOrder = ShopOrder.findById(id);
    shopOrder.setStatus(ShopOrderStatus.COMPLETED);
    shopOrder.persistAndFlush();
}