Emitter - Bridging the imperative and the reactive worlds
In a previous blog post about Kafka and Avro, we used an emitter to send Kafka messages.
In this post, we are going look at this emitter construct a little bit more closely.
Injecting an Emitter
Injecting an emitter is straightforward. You indicate the targeted channel, i.e., where do you send your messages:
@Inject @Channel("movies") Emitter<Movie> emitter;
Remember that reactive messaging uses channels as a primary abstraction. They can be in-memory channels or mapped to a remote broker.
In the previous code snippet, we inject an Emitter<Movie>
. It means you
will send messages containing movies as payload. So, the specified type is
the payload type. That lets you send: payloads directly (wrapped
automatically in a message) or more detailed messages containing a movie as
payload:
Movie movie = ...
// Send payloads directly
emitter.send(movie);
// Send messages
emitter.send(Message.of(movie));
Sending payloads
Sending payload is the simplest way to send data. You just pass the payload
to the send
method like an instance of Movie
. Under the hood, it just
creates a simple Message
wrapping the payload.
When used with payload, the send
method returns a CompletionStage
indicating if the message processing succeeded or failed:
emitter.send(movie)
.whenComplete((success, failure) -> {
if (failure != null) {
System.out.println("D'oh! " + failure.getMessage());
} else {
System.out.println("Message processed successfully");
}
});
Processing, and will see later event the emission, happens asynchronously.
So, the returned CompletionStage
lets you know when the message is
processed. The CompletionStage
is completed successfully when the message
is acknowledged. Most of the time, it means that the processing has been
completed smoothly, or the message has been sent to a broker successfully.
If something wrong happens, the CompletionStage
is completed
exceptionally. The passed exception gives you an idea of the reason.
Sending messages
While sending payloads is more straightforward, sometimes you want to attach metadata to the message, like configuring how it should be written in Kafka, tracing information, etc. The emitter also allows sending messages, and so attach the metadata you want. In the following example, we configure the outbound Kafka record. We set the key, the topic, and so on. That way, you can dispatch messages to different topics and even decide dynamically:
OutgoingKafkaRecordMetadata<?> metadata = OutgoingKafkaRecordMetadata.builder()
.withTopic("movies")
.withKey(movie.getYear())
.build();
emitter.send(Message.of(movie).addMetadata(metadata));
Emissions are asynchronous
Emitters form a bridge between the imperative and the reactive worlds. When you emit a message, this message is not processed immediately. The downstream component consuming the message are part of a Reactive Streams. Passing the message immediately would violate the Reactive Streams protocol. We must be sure that the downstream components are ready to accept this message. As a result, the emitter is not pushing the message directly, but enqueue it in a buffer used to handle the downstream capacity (requests in Reactive Streams lingo).
The downstream component receives the messages according to the requests it makes, ensuring its capacity is never exceeded.
Overflow management
But with buffer comes… overflow. If you emit too many messages and the downstream cannot keep up, the messages are stored in the buffer until it reaches its maximum capacity. Then, you cannot emit anymore, and attempting to emit will throw exceptions. But what can we do in this case? When injecting the emitter, you can configure an Overflow strategy. For example, you can set the buffer size, use an unbounded buffer, drop the messages, fail, or just ignore the back pressure and let the downstream handle it. By default, it uses a buffer, but depending on your use case, you may want to configure it differently:
@Inject
@Channel("movies")
@OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 1000)
Emitter<Movie> emitter1;
@Inject
@Channel("movies")
@OnOverflow(value = OnOverflow.Strategy.NONE)
Emitter<Movie> emitter2;
@Inject
@Channel("movies")
@OnOverflow(value = OnOverflow.Strategy.UNBOUNDED_BUFFER)
Emitter<Movie> emitter3;
Conclusion
This post is a brief introduction to the Emitter
construct from Reactive
Messaging. More information is available on the
SmallRye
Reactive Messaging documentation.
In the next Quarkus version (1.9), this feature will be improved with two very nice enhancements. First, it will offer a Mutiny variant, easing the integration with Mutiny APIs. Then, it would be possible for the Kafka case to directly emit key/value pairs without needing to use metadata.
Stay tuned! Will will cover these in a follow-up post!