Mutiny - Async for bare mortal
Mutiny is an intuitive, reactive programming library. It is the primary model to write reactive applications with Quarkus.
An event-driven reactive programming API
Mutiny is very different from the other reactive programming libraries. It takes a different approach to design your program. With Mutiny everything is event-driven: you receive events, and you react to them. This event-driven aspect embraces the asynchronous nature of distributed systems and provides an elegant and precise way to express continuation.
Mutiny offers two types that are both event-driven and lazy:
- A Uni emits a single event (an item or a failure). Unis are convenient to represent asynchronous actions that return 0 or 1 result. A good example is the result of sending a message to a message broker queue.
- A Multi emits multiple events (n items, 1 failure or 1 completion). Multis can represent streams of items, potentially unbounded. A good example is receiving messages from a message broker queue.
These two types can represent any type of interaction. They are event sources. You observe them (subscription) and you get notified when they emit an item, a failure, or, in the case of a bounded Multi, a completion event. When you (the subscriber) receive the event, you can process it (e.g., transform it, filter it). With Mutiny, you are going to write code like onX().action(), which reads as “on item X do action”.
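To make the event model concrete, here is a minimal sketch, assuming the Mutiny library is on the classpath. The class and method names (EventTypesSketch, observe) are made up for illustration. It subscribes to a Uni and to a bounded Multi built from in-memory values and records each event it receives.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.util.ArrayList;
import java.util.List;

public class EventTypesSketch {

    // Records every event observed on a Uni and on a bounded Multi, in order of arrival.
    public static List<String> observe() {
        List<String> events = new ArrayList<>();

        // A Uni emits a single event: here, one item.
        Uni<String> uni = Uni.createFrom().item("hello");
        uni.subscribe().with(
                item -> events.add("uni item: " + item),
                failure -> events.add("uni failure: " + failure));

        // A bounded Multi emits its items, then a completion event.
        Multi<Integer> multi = Multi.createFrom().items(1, 2, 3);
        multi.subscribe().with(
                item -> events.add("multi item: " + item),
                failure -> events.add("multi failure: " + failure),
                () -> events.add("multi completion"));

        return events;
    }

    public static void main(String[] args) {
        observe().forEach(System.out::println);
    }
}
```

Because both sources are created from values already in memory, the events are delivered during the call to subscribe; with real I/O they would arrive later.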
If you want to know more about Mutiny, and the concepts behind it, check the Mutiny Reference documentation.
Mutiny in Quarkus
Mutiny is the primary API when dealing with the reactive features from Quarkus. It means that most extensions support Mutiny either by exposing an API returning Unis and Multis (such as reactive data sources or rest clients) or understanding when your methods return a Uni or a Multi (such as RESTEasy Reactive or Reactive Messaging).
These integrations make Mutiny a prominent and cohesive model for every reactive application developed with Quarkus. In addition, Mutiny's architecture allows fine-grained dead-code elimination, which improves memory usage when compiled to native (such as with Quarkus native mode or the GraalVM native image compiler).
Why another reactive programming API?
Seasoned reactive developers may wonder why Quarkus introduced yet another reactive programming API while there are existing ones. Mutiny takes a very different angle:
Event-Driven - Mutiny places events at the core of its design. With Mutiny, you observe events, react to them, and create elegant and readable processing pipelines. A Ph.D. in functional programming is not required.
Navigable - Even with intelligent code completion, classes with hundreds of methods are confusing. Mutiny provides a navigable and explicit API driving you towards the operator you need.
Non-Blocking I/O - Mutiny is the perfect companion to tame the asynchronous nature of applications with non-blocking I/O (which powers Quarkus). Declaratively compose operations, transform data, enforce progress, recover from failures, and more.
Made for an asynchronous world - Mutiny can be used in any asynchronous application such as event-driven microservices, message-based applications, network utilities, data stream processing, and of course… reactive applications!
Reactive Streams and Converters Built-In - Mutiny is based on the Reactive Streams specification, and so it can be integrated with any other reactive programming library. In addition, it proposes converters to interact with other popular libraries.
Mutiny and the I/O Threads
Quarkus is powered by a reactive engine, and when developing a reactive application, your code is executed on one of the few I/O threads. Remember that you must never block these threads: the model collapses if you do. So, you can’t use blocking I/O. Instead, you need to schedule the I/O operation and pass a continuation.
The Mutiny event-driven paradigm is tailored for this. When the I/O operation completes successfully, the Uni that represents it emits an item event. When it fails, it emits a failure event. The continuation is simply and naturally expressed using the event-driven API.
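As a rough illustration of "schedule the operation and pass a continuation", the sketch below wraps an asynchronous call in a Uni and attaches the continuation with onItem(). It assumes the Mutiny library is on the classpath. The fetchGreeting method is a hypothetical stand-in for a non-blocking I/O call, simulated here with a CompletableFuture.

```java
import io.smallrye.mutiny.Uni;

import java.util.concurrent.CompletableFuture;

public class ContinuationSketch {

    // Hypothetical stand-in for a non-blocking I/O call: it completes on another thread.
    static CompletableFuture<String> fetchGreeting(String name) {
        return CompletableFuture.supplyAsync(() -> "Hello " + name);
    }

    // Schedules the operation lazily and expresses the continuation with onItem().
    public static Uni<Integer> greetingLength(String name) {
        Uni<String> greeting = Uni.createFrom().completionStage(() -> fetchGreeting(name));
        return greeting.onItem().transform(String::length);
    }

    public static void main(String[] args) {
        // Blocking with await() is acceptable here only because main is not an I/O thread.
        System.out.println(greetingLength("Mutiny").await().indefinitely());
    }
}
```

The caller never waits inside greetingLength: the transformation runs when the item event is emitted.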
Mutiny through Examples
Many Quarkus extensions expose Mutiny APIs. In this section, we use the MongoDB extension to illustrate how to use Mutiny.
Let’s imagine a simple structure representing an element from the periodic table:
public class Element {

    public String name;
    public String symbol;
    public int position;

    public Element(String name, String symbol, int position) {
        this.name = name;
        this.symbol = symbol;
        this.position = position;
    }

    public Element() {
        // Used by JSON mappers
    }
}
This structure contains the name, symbol, and position of the element. To retrieve and store elements into a Mongo collection, you can use the following code:
@ApplicationScoped
public class ElementService {

    final ReactiveMongoCollection<Element> collection;

    @Inject
    ElementService(ReactiveMongoClient client) {
        // Retrieve and store the collection used by the methods below
        collection = client.getDatabase("quarkus")
                .getCollection("elements", Element.class);
    }

    public void add(Element element) {
        // Schedule the insertion; nothing happens until we subscribe
        Uni<InsertOneResult> insertion = collection.insertOne(element);
        insertion
                .onItem().transform(r -> r.getInsertedId().asString())
                .subscribe().with(
                        result -> System.out.println("inserted " + result),
                        failure -> System.out.println("D'oh! " + failure));
    }

    public void getAll() {
        // find() returns a Multi emitting one item per stored element
        collection.find()
                .subscribe().with(
                        element -> System.out.println("Element: " + element),
                        failure -> System.out.println("D'oh! " + failure),
                        () -> System.out.println("No more elements")
                );
    }
}
First, the Mongo client is injected. Note that it uses the reactive variant (io.quarkus.mongodb.reactive.ReactiveMongoClient). In the constructor, we retrieve and store the collection in which elements will be inserted.
The add method inserts an element in the collection. It receives the element as a parameter and uses the reactive API of the collection. Interacting with the database involves I/O. The reactive principles forbid blocking while waiting for the interaction to complete. Instead, we schedule the operation and pass a continuation. The insertOne method returns a Uni, i.e., an asynchronous operation. That’s the scheduled I/O. We now need to express the continuation, which is done using the .onItem() method. .onItem() allows configuring what needs to happen when the observed Uni emits an item, in our case when the scheduled I/O completes. In this example, we extract the inserted document id. The final step is the subscription. Without it, nothing would ever happen. Subscribing triggers the operation. The subscription method can also define handlers: one receives the id value on success, the other receives the failure when the insertion fails.
Let’s now look at the second method. It retrieves all the stored elements. In this case, it returns multiple items (one per stored element), so we are using a Multi. As for the insertion, getting the stored elements involves I/O. The find method is our operation. As for Uni, you need to subscribe to trigger the operation. The subscriber receives item events, a failure event, or a completion event when all the elements have been received.
Subscribing to a Uni or a Multi is essential, as without it, the operation is never executed. In Quarkus, some extensions deal with the subscription for you. For example, in RESTEasy Reactive, your HTTP methods can return a Uni or a Multi, and RESTEasy Reactive handles the subscription.
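The effect of laziness can be checked with a small sketch, assuming the Mutiny library is on the classpath. The class name LazinessSketch and the counter are made up for illustration; the supplier stands in for the real operation.

```java
import io.smallrye.mutiny.Uni;

import java.util.concurrent.atomic.AtomicInteger;

public class LazinessSketch {

    // Returns the number of times the operation ran before and after subscribing.
    public static int[] run() {
        AtomicInteger calls = new AtomicInteger();

        // The supplier stands in for an operation; creating the Uni does not invoke it.
        Uni<Integer> uni = Uni.createFrom().item(() -> calls.incrementAndGet());
        int beforeSubscription = calls.get();

        // Each subscription triggers the operation again.
        uni.subscribe().with(item -> { });
        uni.subscribe().with(item -> { });
        int afterTwoSubscriptions = calls.get();

        return new int[] { beforeSubscription, afterTwoSubscriptions };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("before: " + counts[0] + ", after: " + counts[1]);
    }
}
```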
Mutiny Patterns
The example from the last section was simplistic on purpose. Let’s have a look at a few common patterns.
Observing events
You can observe the various kinds of events using:
on{event}().invoke(ev -> System.out.println(ev));
For example, for items use: onItem().invoke(item -> …);
For failure, use: onFailure().invoke(failure -> …)
The invoke method is synchronous. Sometimes you need to execute an asynchronous action. In this case, use call, as in: onItem().call(item -> someAsyncAction(item)). Note that call does not change the item: it just calls an asynchronous action, and when that action completes, it emits the original item downstream.
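The difference between invoke and call can be sketched as follows, assuming the Mutiny library is on the classpath. The audit method is a hypothetical asynchronous side effect, and the log list exists only so the order of events can be inspected.

```java
import io.smallrye.mutiny.Uni;

import java.util.ArrayList;
import java.util.List;

public class InvokeCallSketch {

    // Hypothetical asynchronous side effect, e.g. writing an audit record.
    static Uni<Void> audit(String item, List<String> log) {
        return Uni.createFrom().voidItem()
                .onItem().invoke(() -> log.add("audited " + item));
    }

    // Observes the item with invoke (synchronous) and call (asynchronous).
    public static String run(List<String> log) {
        return Uni.createFrom().item("neon")
                .onItem().invoke(item -> log.add("saw " + item))
                .onItem().call(item -> audit(item, log))
                // Neither operator changes the item that flows downstream.
                .await().indefinitely();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(run(log) + " " + log);
    }
}
```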
Transforming items
The first instrumental pattern consists of transforming the item events you receive. As we have seen in the previous section, an item could indicate a successful insertion or be one of the elements stored in the database.
Transforming an item is done using: onItem().transform(item -> ….).
More details about transformation can be found in the Mutiny documentation.
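A short sketch of item transformation on both types, assuming the Mutiny library is on the classpath. The class and method names are made up for illustration. The first method also shows that an exception thrown by the function reaches the downstream as a failure event, which is recovered here with a fallback value.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.util.List;

public class TransformSketch {

    // Transforms the single item of a Uni with a synchronous function.
    public static int parse(String text) {
        return Uni.createFrom().item(text)
                .onItem().transform(s -> Integer.parseInt(s))
                // If the function throws, downstream receives a failure instead of an item.
                .onFailure().recoverWithItem(-1)
                .await().indefinitely();
    }

    // Transforms every item of a Multi, then collects the results into a list.
    public static List<String> symbols(String... names) {
        return Multi.createFrom().items(names)
                .onItem().transform(name -> name.substring(0, 2))
                .collect().asList()
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(parse("42"));
        System.out.println(symbols("Helium", "Neon"));
    }
}
```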
Sequential composition
Sequential composition allows chaining dependent asynchronous operations. This is achieved using onItem().transformToUni(item -> …). Unlike transform, the function passed to transformToUni returns a Uni.
Uni<String> uni1 = …
uni1
    .onItem().transformToUni(item -> anotherAsynchronousAction(item));
More details about asynchronous transformation can be found in the Mutiny documentation.
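Here is a self-contained sketch of sequential composition, assuming the Mutiny library is on the classpath. The two lookups (findPosition and findName) are hypothetical asynchronous operations backed by hard-coded values; the second depends on the result of the first.

```java
import io.smallrye.mutiny.Uni;

public class CompositionSketch {

    // Hypothetical asynchronous lookup: symbol -> position in the periodic table.
    static Uni<Integer> findPosition(String symbol) {
        return Uni.createFrom().item(() -> "He".equals(symbol) ? 2 : 1);
    }

    // Hypothetical asynchronous lookup: position -> element name.
    static Uni<String> findName(int position) {
        return Uni.createFrom().item(() -> position == 2 ? "Helium" : "Hydrogen");
    }

    // Chains the two operations: the second starts once the first has emitted its item.
    public static Uni<String> nameOf(String symbol) {
        return findPosition(symbol)
                .onItem().transformToUni(position -> findName(position));
    }

    public static void main(String[] args) {
        System.out.println(nameOf("He").await().indefinitely());
    }
}
```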
Failure handling
So far, we have only handled the item events, but handling failures is essential. You can handle failures using onFailure().
For example, you can recover with a fallback item using onFailure().recoverWithItem(fallback):
Uni<String> uni1 = …
uni1
    .onFailure().recoverWithItem("my fallback value");
You can also retry the operation, as in:
Uni<String> uni1 = …
uni1
    .onFailure().retry().atMost(5);
More information about failure recovery can be found in the handling failure documentation and the retrying on failures documentation.
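Both recovery strategies can be tried with the following sketch, assuming the Mutiny library is on the classpath. The flaky method is a hypothetical operation that fails a fixed number of times before succeeding; the counter only exists to make the retries visible.

```java
import io.smallrye.mutiny.Uni;

import java.util.concurrent.atomic.AtomicInteger;

public class FailureSketch {

    // Hypothetical flaky operation: fails on the first `failures` subscriptions, then succeeds.
    static Uni<String> flaky(AtomicInteger attempts, int failures) {
        return Uni.createFrom().item(() -> {
            int attempt = attempts.incrementAndGet();
            if (attempt <= failures) {
                throw new IllegalStateException("attempt " + attempt + " failed");
            }
            return "ok after " + attempt + " attempts";
        });
    }

    // Retrying re-subscribes to the upstream Uni, which runs the operation again.
    public static String withRetry() {
        AtomicInteger attempts = new AtomicInteger();
        return flaky(attempts, 2)
                .onFailure().retry().atMost(5)
                .await().indefinitely();
    }

    // Recovering replaces the failure event with a fallback item.
    public static String withFallback() {
        return Uni.createFrom().<String>failure(new IllegalStateException("boom"))
                .onFailure().recoverWithItem("my fallback value")
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(withRetry());
        System.out.println(withFallback());
    }
}
```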
Events and Actions
The following tables list the events that you can receive for Uni and Multi. Each of them is associated with its own group (onX). The second table lists the classic actions you can take upon an event. Note that some groups offer more possibilities.
Type | Events from the upstream | Events from the downstream
Uni | Subscription (1), Item (0..1), failure (0..1) | Cancellation
Multi | Subscription (1), Item (0..n), failure (0..1), completion (0..1) | Cancellation, Request
Check the full list of events on the event documentation.
Action | API | Comment
transform | onItem().transform(Function<I, O> function) | Transform the event into another event using a synchronous function. The downstream receives the result of the function (or a failure if the transformation failed).
transformToUni | onItem().transformToUni(Function<I, Uni<O>> function) | Transform the event into another event using an asynchronous function. The downstream receives the item emitted by the produced Uni (or a failure if the transformation failed). If the produced Uni emits a failure, that failure is passed to the downstream.
invoke | onItem().invoke(Consumer<I> consumer) | Invokes the synchronous consumer. This is particularly convenient to execute side-effect actions. The downstream receives the original event, or a failure if the consumer throws an exception.
call | onItem().call(Function<I, Uni<?>> function) | Invokes the asynchronous function. This is particularly convenient to execute asynchronous side-effect actions. The downstream receives the original event, or a failure if the function throws an exception or if the produced Uni emits a failure.
fail | onItem().failWith(Function<I, Throwable> function) | Emits a failure when it receives the event.
complete (Multi only) | onItem().complete() | Emits the completion event when it receives the event.
Other patterns
Mutiny provides lots of other features. Head over to the Mutiny documentation to see many more examples, including the whole list of events and how to handle them.
Some frequently consulted guides are the following:
- merge vs. concatenation - https://smallrye.io/smallrye-mutiny/latest/guides/merging-and-concatenating-streams/
- controlling the emission thread - https://smallrye.io/smallrye-mutiny/latest/guides/emit-on-vs-run-subscription-on/
- explicit blocking - https://smallrye.io/smallrye-mutiny/latest/guides/imperative-to-reactive/
Shortcuts
When using Uni, having to write onItem() can be cumbersome. Fortunately, Mutiny provides a set of shortcuts to make your code more concise:
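Shortcut | Equivalent
uni.map(x -> y) | uni.onItem().transform(x -> y)
uni.flatMap(x -> uni2) | uni.onItem().transformToUni(x -> uni2)
uni.chain(x -> uni2) | uni.onItem().transformToUni(x -> uni2)
uni.invoke(x -> System.out.println(x)) | uni.onItem().invoke(x -> System.out.println(x))
uni.call(x -> uni2) | uni.onItem().call(x -> uni2)
uni.eventually(() -> System.out.println("eventually")) | uni.onItemOrFailure().invoke((ignoredItem, ignoredException) -> System.out.println("eventually"))
uni.eventually(() -> uni2) | uni.onItemOrFailure().call((ignoredItem, ignoredException) -> uni2)
uni.replaceWith(x) | uni.onItem().transform(ignored -> x)
uni.replaceWith(uni2) | uni.onItem().transformToUni(ignored -> uni2)
uni.replaceIfNullWith(x) | uni.onItem().ifNull().continueWith(x)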
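As an illustration of how shortcuts shorten a pipeline, the sketch below writes the same processing twice, once with the onItem() groups and once with the map and chain shortcuts. It assumes the Mutiny library is on the classpath. The exclaim helper is a hypothetical asynchronous action.

```java
import io.smallrye.mutiny.Uni;

public class ShortcutSketch {

    // Hypothetical asynchronous action used by both pipelines.
    static Uni<String> exclaim(String value) {
        return Uni.createFrom().item(value + "!");
    }

    // Explicit form, using the onItem() group.
    public static String longForm(String input) {
        return Uni.createFrom().item(input)
                .onItem().transform(String::trim)
                .onItem().transformToUni(value -> exclaim(value))
                .await().indefinitely();
    }

    // Same pipeline written with the map and chain shortcuts.
    public static String shortForm(String input) {
        return Uni.createFrom().item(input)
                .map(String::trim)
                .chain(value -> exclaim(value))
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(longForm("  hi "));
        System.out.println(shortForm("  hi "));
    }
}
```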
Reactive Streams
Mutiny uses Reactive Streams. Multi implements Publisher and enforces the back-pressure protocol. Emissions are constrained by the requests emitted by the downstream subscribers. Thus, it does not overload the subscribers. Note that in some cases, you can’t follow this protocol (because the Multi emits events that can’t be controlled, such as time or sensor measurements). In this case, Mutiny provides a way to control the overflow using onOverflow().
Uni does not implement Reactive Streams Publisher. A Uni can only emit one event, so subscribing to the Uni is enough to express your intent to use the result, and it does not need the request protocol ceremony.
Mutiny and Vert.x
Vert.x is a toolkit to build reactive applications and systems. It provides a huge ecosystem of libraries following the reactive principles (i.e., non-blocking and asynchronous). Vert.x is an essential part of Quarkus, as it provides its reactive capabilities.
In addition, the whole Vert.x API can be used with Quarkus. To provide a cohesive experience, the Vert.x API is also available using a Mutiny variant, i.e., returns Uni and Multi.
More details about this API can be found on: https://quarkus.io/blog/mutiny-vertx/.
Mutiny Integration in Quarkus
The integration of Mutiny in Quarkus goes beyond just the library. Mutiny exposes hooks that allow Quarkus and Mutiny to be closely integrated:
- Calling await or toIterable fails if you are running on an I/O thread, which prevents blocking the I/O thread;
- The log() operator uses the Quarkus logger;
- The default Mutiny thread pool is the Quarkus worker thread pool;
- Context Propagation is enabled by default when using Mutiny Uni and Multi.
More details about the infrastructure integration can be found on https://smallrye.io/smallrye-mutiny/latest/guides/framework-integration/.