Combining Apache Kafka and the Rest client

Another week, another interesting question. This week, someone asks me about combining Kafka and the Rest Client. That is a recurring subject, and most of the time, the goal is to achieve the following process:

kafka rest architecture

In other words, we want to call a remote service for each Kafka message we receive. Therefore, we have a first topic ("in") containing the data we are consuming, for instance, "transactions". Then, we have the central piece of the architecture: the processing component. It consumes the incoming transactions, and for each of them, calls a remote service. It also writes the response (produced by the remote service) to another Kafka topic "out".

Implementing this with Quarkus is straightforward, and that’s what we will cover in this post. Thanks to Reactive Messaging and the Rest Client, this should not take more than 20 lines of code!

The Remote Service

Let’s start with the remote service. Quarkus offers multiple ways to invoke a remote HTTP service, but let’s use the Rest Client as it provides an excellent way to interact with HTTP services without having to handle the low-level details of HTTP.

You can use any HTTP API, but to simplify, let’s consider a straightforward remote service, something like:

@RegisterRestClient(configKey = "transaction-service")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public interface TransactionService {

    @Path("/transactions")
    @POST
    TransactionResult postSync(Transaction transaction);

    @Path("/transactions")
    @POST
    Uni<TransactionResult> postAsync(Transaction transaction);

}

This service contains two methods calling the same HTTP endpoint. The first one is synchronous, and so blocks the caller thread until the response is received. The second is asynchronous, and the returned Uni gets the response when received. In this case, the caller thread is not blocked and can do something else. We are going to see how to use these methods later, but first, a bit of configuration. In the application.properties add:

# Configure the transaction-service (rest client)
transaction-service/mp-rest/url=http://localhost:8080

Of course, update the URL. The https://quarkus.io/guides/rest-client guide gives more details about the usage and configuration of the Rest Client.

Invoking the service for each incoming transaction

Ok, we can call our service, but remember, we want to invoke it for every incoming transaction, and these transactions come from a Kafka topic. With Reactive Messaging, there is no need to handle Kafka right now. We can focus on the logic. Let’s say we have a channel (a stream of data), in which to transit our transactions. We call this first channel in.

We also want to write the responses from the remote service into another Kafka topic. Again, no need to handle Kafka right now. Let’s say we write the responses into a channel named out.

So, we have the following (incomplete) code:

@ApplicationScoped
public class TransactionProcessor {

    @Incoming("in") // The first channel - we read from it
    @Outgoing("out") // The second channel - we write to it
    public TransactionResult sendToTransactionService(Transaction transaction) {
       // Need to call our service here
    }

}

@Incoming configures the read channel. @Outgoing configures the written channel. But, something is missing…​ we need to call our remote service:

@ApplicationScoped
public class TransactionProcessor {

    private static final Logger LOGGER = Logger.getLogger("TransactionProcessor");

    @Inject @RestClient TransactionService service;

    @Incoming("in")
    @Outgoing("out")
    @Blocking
    public TransactionResult sendToTransactionService(Transaction transaction) {
        LOGGER.infof("Sending %s transaction service", transaction);
        return service.postSync(transaction);
    }

}

First, we inject the Rest Client. Then, we just invoke it in our method.

You may wonder about @Blocking. With reactive messaging, you need to indicate when you are using blocking code, as by default, it uses an event loop architecture. While convenient, you should not abuse @Blocking, as it relies on a thread pool limiting your concurrency. But, it keeps your logic synchronous.

Using Asynchronous operations

We can get rid of @Blocking annotation by using the second method provided by the TransactionService: postAsync:

@ApplicationScoped
public class TransactionProcessor {

    private static final Logger LOGGER = Logger.getLogger("TransactionProcessor");

    @Inject @RestClient TransactionService service;

    @Incoming("in")
    @Outgoing("out")
    public Uni<TransactionResult> sendToTransactionService(Transaction transaction) {
        LOGGER.infof("Sending %s transaction service", transaction);
        return service.postAsync(transaction);
    }

}

Using the async variant of the post method allows us to remove @Blocking. We return the Uni directly. When that Uni receives the remote service’s response, it writes the value to the out channel.

Mapping channel to Kafka

So far, so good. It’s time to connect our code with Kafka. With Reactive Messaging, we map channels to connectors, here Kafka. So, we just need to configure the application to indicate that the in and out channels are Kafka topics. Once again, edit the application.properties file, and add:

mp.messaging.incoming.in.topic=transactions
mp.messaging.incoming.in.value.deserializer=org.acme.model.TransactionDeserializer
mp.messaging.incoming.in.auto.offset.reset=earliest
mp.messaging.incoming.in.enable.auto.commit=false

mp.messaging.outgoing.out.connector=smallrye-kafka
mp.messaging.outgoing.out.topic=output
mp.messaging.outgoing.out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

The first block is about the in channel. It’s connected to the transactions Kafka topic. The data is deserialized with a custom deserializer. The last other properties disable the auto-commit (Reactive Messaging is handling commits for you) and read the data since the last committed offset.

The second block configures the out channel. We connect it with the output Kafka topic and configure the value serializer. For this simple example, we write the data as JSON.

So, when a transaction is written to the Kafka transaction topic, it gets received by our processing component, sent to the remote service, and the result is written to the output Kafka topic:

2020-08-27 10:04:44,141 INFO  [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='MacroHard', amount=20} transaction service
2020-08-27 10:04:44,196 INFO  [TransactionResource] (executor-thread-2) Handling transaction MacroHard / 20
2020-08-27 10:04:44,240 INFO  [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='BlueHat', amount=10} transaction service
2020-08-27 10:04:44,245 INFO  [TransactionResource] (executor-thread-2) Handling transaction BlueHat / 10

If you look inside the output topic, you will see the TransactionResult flowing:

output

We are done!

With a few lines of code and a bit of configuration, we can read data from a Kafka topic, call a remote service, and write the result to another Kafka topic. Plain simple.

Want to try by yourself? Check out the code in this GitHub repository and follow the instructions from the readme.

Reactive Messaging and the Rest client contain other gems, check the related guides and documentation to learn more about them: