Quarkus Virtual Thread support with Reactive Messaging
This guide explains how to benefit from Java virtual threads when writing message processing applications in Quarkus.
This guide focuses on using virtual threads with Reactive Messaging extensions. Please refer to Writing simpler reactive REST services with Quarkus Virtual Thread support to read more about Java virtual threads in general and the Quarkus Virtual Thread support for REST services. For reference guides of specific Reactive Messaging extensions see Apache Kafka Reference Guide, Reactive Messaging AMQP 1.0 Connector, Reactive Messaging RabbitMQ Connector or Apache Pulsar Reference Guide. |
By default, Reactive Messaging invokes message processing methods on an event-loop thread. See the Quarkus Reactive Architecture documentation for further details on this topic. But, you sometimes need to combine Reactive Messaging with blocking processing such as calling external services or database operations. For this, you can use the @Blocking annotation indicating that the processing is blocking and should be run on a worker thread. You can read more on the blocking processing in SmallRye Reactive Messaging documentation.
The idea behind Quarkus Virtual Thread support for Reactive Messaging is to offload the message processing on virtual threads, instead of running it on an event-loop thread or a worker thread.
To enable virtual thread support on a message consumer method, simply add the @RunOnVirtualThread annotation to the method. If the JDK is compatible (Java 19 or later versions, we recommend 21+) then each incoming message will be offloaded to a new virtual thread. It will then be possible to perform blocking operations without blocking the platform thread upon which the virtual thread is mounted.
Example using the Reactive Messaging Kafka extension
Let’s see an example of how to process Kafka records on virtual threads. First, make sure to have a reactive messaging extension dependency to your build file:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-kafka")
You also need to make sure that you are using Java 19 or later (we recommend
21+), this can be enforced in your pom.xml
file with the following:
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
</properties>
Run the application with:
java -jar target/quarkus-app/quarkus-run.jar
or to use the Quarkus Dev mode, insert the following to the
quarkus-maven-plugin
configuration:
<maven.compiler.release>21</maven.compiler.release>
Then you can start using the annotation @RunOnVirtualThread
on your
consumer methods also annotated with @Incoming
. In the following example
we’ll use the RESTEasy Reactive REST Client
to make a blocking call to a REST endpoint:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PriceConsumer {
@RestClient (2)
PriceAlertService alertService;
@Incoming("prices")
@RunOnVirtualThread (1)
public void consume(double price) {
if (price > 90.0) {
alertService.alert(price); (3)
}
}
@Outgoing("prices-out") (4)
public Multi<Double> randomPriceGenerator() {
return Multi.createFrom().<Random, Double>generator(Random::new, (r, e) -> {
e.emit(r.nextDouble(100));
return r;
});
}
}
1 | @RunOnVirtualThread annotation on the @Incoming method ensures that the
method will be called on a virtual thread. |
2 | the REST client stub is injected with the @RestClient annotation. |
3 | alert method blocks the virtual thread until the REST call returns. |
4 | This @Outgoing method generates random prices and writes them a Kafka
topic to be consumed back by the application. |
Note that by default Reactive Messaging message processing happens
sequentially, preserving the order of messages. In the same way,
@Blocking(ordered = false)
annotation changes this behaviour, using
@RunOnVirtualThread
enforces concurrent message processing without
preserving the order.
Use the @RunOnVirtualThread annotation
Methods signatures eligible to @RunOnVirtualThread
Only method can be annotated with @Blocking
can use
@RunOnVirtualThreads
. The eligible method signatures are:
-
@Outgoing("channel-out") O generator()
-
@Outgoing("channel-out") Message<O> generator()
-
@Incoming("channel-in") @Outgoing("channel-out") O process(I in)
-
@Incoming("channel-in") @Outgoing("channel-out") Message<O> process(I in)
-
@Incoming("channel-in") void consume(I in)
-
@Incoming("channel-in") Uni<Void> consume(I in)
-
@Incoming("channel-in") Uni<Void> consume(Message<I> msg)
-
@Incoming("channel-in") CompletionStage<Void> consume(I in)
-
@Incoming("channel-in") CompletionStage<Void> consume(Message<I> msg)
Use of @RunOnVirtualThread annotation on methods and classes
You can use the @RunOnVirtualThread
annotation:
-
directly on a reactive messaging method - this method will be considered blocking and executed on a virtual thread
-
on the class containing reactive messaging methods - the methods from this class annotation with
@Blocking
will be executed on virtual thread, except if the annotation defines a pool name configured to use regular worker threads
For example, you can use @RunOnVirtualThread
directly on the method:
@ApplicationScoped
public class MyBean {
@Incoming("in")
@Outgoing("out")
@RunOnVirtualThread
public String process(String s) {
// Called on a new virtual thread for every incoming message
}
}
Alternatively, you can use @RunOnVirtualThread
on the class itself:
@ApplicationScoped
@RunOnVirtualThread
public class MyBean {
@Incoming("in1")
@Outgoing("out1")
public String process(String s) {
// Called on the event loop - no @Blocking annotation
}
@Incoming("in2")
@Outgoing("out2")
@Blocking
public String process(String s) {
// Call on a new virtual thread for every incoming message
}
@Incoming("in3")
@Outgoing("out3")
@Blocking("my-worker-pool")
public String process(String s) {
// Called on a regular worker thread from the pool named "my-worker-pool"
}
}
Control the maximum concurrency
In order to leverage the lightweight nature of virtual threads, the default
maximum concurrency for methods annotated with @RunOnVirtualThread
is
1024. As opposed to platform threads, virtual threads are not pooled and
created per message. Therefore the maximum concurrency applies separately to
all @RunOnVirtualThread
methods.
There are two ways to customize the concurrency level:
-
The
@RunOnVirtualThread
annotation can be used together with the @Blocking annotation to specify a worker name.@Incoming("prices") @RunOnVirtualThread @Blocking("my-worker") public void consume(double price) { //... }
Then, for example, to set the maximum concurrency of this method down to 30, set using the config property
smallrye.messaging.worker.my-worker.max-concurrency=30
. -
For every
@RunOnVirtualThread
method that is not configured with a worker name, you can use the config propertysmallrye.messaging.worker.<virtual-thread>.max-concurrency
.