Peeking at the streams
Mutiny is an event-driven Reactive Programming library. Like other reactive programming libraries, it uses streams as the primary construct. These streams convey events, and your code processes these events. Most of the time, your code is only interested in item and failure events. But there are other kinds of events such as cancellation, request, completion, and so on.
It’s not rare that you need to look at these various events to understand better what’s going on or implement specific side effects. For example, you may need to close a resource after a completion event or log a message on failure or cancellation.
For each kind of event, there is an associated group providing the methods
to handle that specific event: onItem()
, onFailure()
, onCompletion()
and so on. These groups provide two methods to peek at the various events
without impacting its distribution: invoke
and call
. It does not
transform the received event; it notifies you that something happened and
let you react.
The invoke method
The invoke
method is synchronous and does not return anything. Mutiny
invokes the configured callback when the observed stream dispatches the
event:
Uni<Integer> u = uni.onItem()
.invoke(i -> System.out.println("Received item: " + i));
Multi<Integer> m = multi.onItem()
.invoke(i -> System.out.println("Received item: " + i));
As said above, invoke
is synchronous. Mutiny invokes the callback and
propagates the event downstream when the callback returns. It blocks the
dispatching.
Of course, we highly recommend you not to block.
The following snippets show how you can log the different types of events.
multi
.onSubscribe().invoke(() -> System.out.println("⬇️ Subscribed"))
.onItem().invoke(i -> System.out.println("⬇️ Received item: " + i))
.onFailure().invoke(f -> System.out.println("⬇️ Failed with " + f))
.onCompletion().invoke(() -> System.out.println("⬇️ Completed"))
.onCancellation().invoke(() -> System.out.println("⬆️ Cancelled"))
.onRequest().invoke(l -> System.out.println("⬆️ Requested: " + l))
The arrows from the previous code snippet indicate if the event comes from the upstream (source) or downstream (consumer). |
The invoke
method does not change the event, except in one case. If the
invoke
callback throws an exception, the downstream does not get the
actual event but get a failure event instead.
When observing the failure event, if the callback throws an exception,
Mutiny propagates a CompositeException aggregating the original failure
and the callback failure.
|
The call method
Unlike invoke
, call
is asynchronous, and the callback returns a Uni<?>
object.
call
is often used when you need to implement asynchronous side-effects,
such as closing resources.
Mutiny does not dispatch the original event downstream until the Uni returned by the callback emits an item:
multi
.onItem().call(i ->
Uni.createFrom().nullItem()
.onItem().delayIt().by(Duration.ofSeconds(1))
)
As shown in the previous snippet, you can use this approach to delay items. But the primary use case is about completing asynchronous actions:
multi
.onCompletion().call(() -> resource.close())
Under the hood, Mutiny gets the Uni
(by invoking the callback) and subscribes to it.
It observes the item or failure event from that Uni
.
It discards the item value as only the emission matters in this case.
If the callback throws an exception or the produced Uni
produces a failure, Mutiny propagates that failure (or a CompositeException
) downstream, replacing the original event.
Summary
The invoke
and call
methods are handy when you need to observe a stream
without changing the transiting events.
Use invoke
for implementing synchronous side-effects or logging events.
The asynchronous nature of call
makes it perfect for implementing
asynchronous side-effects, such as closing resources, flushing data, delay
items, and so on.
The following table highlights the key differences:
Method |
|
|
Nature |
synchronous |
asynchronous |
Return type |
|
|
Main Use cases |
logging |
closing resources, flushing data |
These methods are available for every kind of event in the associated group.