Concurrent asynchronous actions with Mutiny
This week, I’ve been asked about a widespread use case around concurrency. This user wanted to call two microservices in parallel, and when both results are received, join them and continue the processing. Basically, the following pattern:
In a non-reactive approach, both calls block the caller thread, and unless you use a worker thread pool, the calls are not concurrent. Even with a worker thread pool, these threads are likely blocked while waiting for the responses, consuming resources for nothing.
But no worries: Quarkus’s reactive nature and Mutiny provide everything needed to handle this scenario.
Let’s call two services
In this post, I’m going to use the Vert.x Web Client, a reactive HTTP client. It leverages non-blocking I/O to be highly performant and truly non-blocking. It does not rely on a hidden thread pool. You can also use the Quarkus Rest Client, but, at the moment, it still uses worker threads.
No matter which client we use, we need some remote services to call. Let’s use:
- https://programming-quotes-api.herokuapp.com/ - to get a quote about programming
- https://api.chucknorris.io/ - to get a quote about Chuck Norris
First things first, let’s see the code required to retrieve our quotes. While both services are similar, the structure of their responses differs slightly, so we end up with:
    // Fetches a programming quote and formats it as "<quote> (<author>)"
    private static Uni<String> getProgrammingQuote(WebClient client) {
        return client.getAbs(PROGRAMMING_QUOTE)
                .as(BodyCodec.jsonObject())
                .send()
                .onItem().transform(r -> r.body().getString("en") + " (" + r.body().getString("author") + ")");
    }

    // Fetches a Chuck Norris quote, stored in the "value" field of the response
    private static Uni<String> getChuckNorrisQuote(WebClient client) {
        return client.getAbs(CHUCK_NORRIS_QUOTE)
                .as(BodyCodec.jsonObject())
                .send()
                .onItem().transform(r -> r.body().getString("value"));
    }
These two methods receive a WebClient, invoke the services, retrieve the JSON responses, and extract the quotes from them. They both return a Uni, so they are asynchronous: the result (the quote) is provided later, when available.
Also, returning a Uni means that the services will only be invoked when someone subscribes to the returned Uni. If you subscribe multiple times, you will call the service multiple times.
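To make this lazy behavior concrete, here is a minimal sketch that does not touch the network. The class LazyUniDemo, the counter CALLS, and the method fakeQuote() are hypothetical stand-ins introduced for illustration only; fakeQuote() plays the role of getProgrammingQuote(client) and simply counts how often the "call" runs.

```java
import io.smallrye.mutiny.Uni;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyUniDemo {

    // Counts how many times the simulated remote call actually runs.
    public static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for getProgrammingQuote(client): no network involved.
    // The supplier is only invoked when someone subscribes to the Uni.
    public static Uni<String> fakeQuote() {
        return Uni.createFrom().item(() -> "quote #" + CALLS.incrementAndGet());
    }

    public static void main(String[] args) {
        Uni<String> quote = fakeQuote();
        // Nothing has been invoked yet: creating the Uni does not trigger the call.
        System.out.println("Calls after creation: " + CALLS.get());

        // Each subscription runs the supplier again, so the "service" is called twice.
        quote.subscribe().with(item -> System.out.println("First subscriber: " + item));
        quote.subscribe().with(item -> System.out.println("Second subscriber: " + item));
        System.out.println("Calls after two subscriptions: " + CALLS.get());
    }
}
```

With a real HTTP call in place of the supplier, each subscription would translate into one request, which is why you usually want to subscribe exactly once.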
Combining Unis
So far, we have two methods to call our services. But we want to call them concurrently, as depicted above.
Mutiny provides a way to combine items produced by Unis:
    Uni<Tuple2<String, String>> tuple = Uni.combine().all()
            .unis(getProgrammingQuote(client), getChuckNorrisQuote(client))
            .asTuple();
When someone subscribes to the Uni tuple, it subscribes to the getProgrammingQuote(client) and getChuckNorrisQuote(client) Unis, which invoke the services. So the requests are emitted, and the services are invoked concurrently.
When both responses are available, it combines them into a Tuple, a simple structure carrying multiple items.
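One way to convince yourself that the two actions really overlap is to replace the remote services with simulated ones that answer after a fixed delay. The sketch below is only an illustration under that assumption: ConcurrentCombineDemo, slowService(), and callBoth() are hypothetical names, and the delay is produced with the JDK's CompletableFuture.delayedExecutor (Java 9+) rather than a real HTTP request. If the calls were sequential, the total time would be roughly twice the delay; combined, it should be close to a single delay.

```java
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ConcurrentCombineDemo {

    // Hypothetical stand-in for a remote call: emits the value after delayMs milliseconds.
    // The CompletableFuture is created at subscription time, so the Uni stays lazy.
    public static Uni<String> slowService(String value, long delayMs) {
        return Uni.createFrom().completionStage(() ->
                CompletableFuture.supplyAsync(() -> value,
                        CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS)));
    }

    // Combines both simulated services and waits for the tuple.
    public static Tuple2<String, String> callBoth(long delayMs) {
        return Uni.combine().all()
                .unis(slowService("first", delayMs), slowService("second", delayMs))
                .asTuple()
                // Blocking is only acceptable here because this is a demo running on the main thread.
                .await().indefinitely();
    }

    public static void main(String[] args) {
        long delayMs = 500;
        long start = System.nanoTime();
        Tuple2<String, String> result = callBoth(delayMs);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(result.getItem1() + " and " + result.getItem2()
                + " received after about " + elapsedMs + " ms");
    }
}
```

The blocking await() is there only to keep the demo short; in a Quarkus application you would typically return the combined Uni and let the framework subscribe to it.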
In other words, concurrently calling our services is pretty straightforward. Just create the Unis representing the services or the asynchronous actions you want to run, and combine them using Uni.combine().all(). You can decide to combine the results using tuples or to use a combinator function.
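For the combinator variant, a possible sketch looks like the following. It assumes a Mutiny version that exposes combinedWith(...) on the combined group (recent releases appear to prefer a shorter with(...) method, so check the version you use). The class CombinatorDemo and the two placeholder Unis are hypothetical and stand in for getProgrammingQuote(client) and getChuckNorrisQuote(client).

```java
import io.smallrye.mutiny.Uni;

public class CombinatorDemo {

    // Hypothetical stand-in for getProgrammingQuote(client).
    public static Uni<String> programmingQuote() {
        return Uni.createFrom().item("programming quote");
    }

    // Hypothetical stand-in for getChuckNorrisQuote(client).
    public static Uni<String> chuckNorrisQuote() {
        return Uni.createFrom().item("Chuck Norris quote");
    }

    // Instead of producing a Tuple2, merge both items directly with a function.
    public static Uni<String> bothQuotes() {
        return Uni.combine().all()
                .unis(programmingQuote(), chuckNorrisQuote())
                .combinedWith((programming, chuck) -> programming + " / " + chuck);
    }

    public static void main(String[] args) {
        // Subscribing triggers both Unis; the combinator runs once both items are available.
        bothQuotes().subscribe().with(System.out::println);
    }
}
```

A combinator is convenient when the downstream code only needs the merged value; a tuple is handier when each item is processed separately.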
Putting everything together
    // Create a Web Client
    WebClient client = WebClient.create(vertx);

    // Combine the result of our 2 Unis in a tuple
    Uni.combine().all()
            .unis(getProgrammingQuote(client), getChuckNorrisQuote(client))
            .asTuple()
            // Subscribe (which will trigger the calls)
            .subscribe().with(tuple -> {
                System.out.println("Programming Quote: " + tuple.getItem1());
                System.out.println("Chuck Norris Quote: " + tuple.getItem2());
            });
That’s it! If you want to see this code in action, check this gist. You can even run it directly with JBang:
jbang https://gist.github.com/cescoffier/1ed68bef12b798529e10350f77686e9a
Enjoy!