How to implement a job queue with Redis
In "How to cache with Redis", we implemented a simple cache backed by Redis. That’s just one use case of Redis. Redis is also used as a messaging server to implement the processing of background jobs or other kinds of messaging tasks. This post explores implementing this pattern with Quarkus and the new Redis data source API.
Job Queues and Supes!
A job queue is a data structure storing execution requests. Job dispatchers submit the tasks they want to execute in that data structure. On the other side, job consumers poll the requests and execute them.
There are plenty of variants of that pattern, so let’s focus on the following application. We have an application managing heroes and villains. The application offers the possibility to simulate a fight between a random hero and a random villain. The fight simulation is delegated to fight simulators, applications dedicated to that task.
In this context, the main application submits the fight requests to the job queue. Then, the fight simulators poll the submitted fight requests and execute them.
The fight outcomes are communicated using another Redis feature: pub/sub communication. The simulators send the outcome to a channel consumed by the application. The application then broadcasts these outcomes to a web page.
This post only discusses the interaction with Redis. The rest of the application is straightforward and just uses RESTEasy Reactive and Hibernate ORM with Panache. You can find the full code of the application on https://github.com/cescoffier/quarkus-redis-job-queue-demo.
Submitting jobs
The first task is to model the job queue. We use a Redis list to store the FightRequest instances:
package me.escoffier.quarkus.redis.fight;
public record FightRequest(String id, Hero hero, Villain villain) {
}
Redis lists distinguish the left side of the list from the right side of the list. This distinction allows implementing a FIFO queue where we write on the left side and consume from the right side.
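To make the ordering concrete, here is a minimal in-memory sketch of the idea. It does not talk to Redis: the class FifoListSketch and its methods are hypothetical stand-ins built on a plain Java Deque, where lpush mimics writing on the left side and rpop mimics reading from the right side.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory stand-in for a Redis list, used only to illustrate the ordering. */
class FifoListSketch {
    private final Deque<String> items = new ArrayDeque<>();

    /** Like LPUSH: adds the item on the left side (head) of the list. */
    void lpush(String item) {
        items.addFirst(item);
    }

    /** Like RPOP: removes and returns the right-most item, or null if the list is empty. */
    String rpop() {
        return items.pollLast();
    }

    /** Pushes the items in the given order, then drains the list from the right. */
    static List<String> roundTrip(List<String> submitted) {
        FifoListSketch list = new FifoListSketch();
        submitted.forEach(list::lpush);
        List<String> consumed = new ArrayList<>();
        String next;
        while ((next = list.rpop()) != null) {
            consumed.add(next);
        }
        return consumed;
    }

    public static void main(String[] args) {
        // prints [fight-1, fight-2, fight-3]: first submitted, first consumed
        System.out.println(roundTrip(List.of("fight-1", "fight-2", "fight-3")));
    }
}
```

Writing on one side and reading from the other is what yields first-in, first-out order. Reading from the same side as the writes would turn the list into a stack.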
To manipulate a Redis list, we need the group of commands associated with this data structure. In the SupesService class, we inject the RedisDataSource and retrieve the group of commands:
public SupesService(RedisDataSource dataSource, ...) {
    commands = dataSource.list(FightRequest.class);
    // ...
}
Let’s now look at the submitAFight method:
public FightRequest submitAFight() {
    var hero = Hero.getRandomHero();
    var villain = Villain.getRandomVillain();
    var id = UUID.randomUUID().toString();
    var request = new FightRequest(id, hero, villain);
    commands.lpush("fight-requests", request);
    return request;
}
The submitAFight method retrieves the random fighters, computes an id, builds the FightRequest instance, and executes the LPUSH command. The LPUSH command writes the given item to the left side of the list stored at the given key (fight-requests).
Receiving the job requests
Let’s now look at the other side: the fight simulators. The simulators poll the FightRequests from the Redis list representing our job queue and simulate the fight.
The simulator is implemented in me.escoffier.quarkus.redis.fight.FightSimulator.
The constructor receives a configured name (to distinguish multiple simulators) and the Redis data source. It creates the objects to emit the Redis commands to read from a Redis list:
public FightSimulator(@ConfigProperty(name = "simulator-name") String name, RedisDataSource ds) {
    this.name = name;
    this.queue = ds.list(FightRequest.class);
    // ...
}
The simulator polls the fight requests and simulates a fight for each of them. The implementation is an infinite loop (it only stops when the application is shut down). In each iteration, it reads a pending FightRequest from the right side of the queue with the BRPOP command. If there is no pending request, it restarts from the beginning of the loop. If it has a request, it simulates the fight:
@Override
public void run() {
    logger.infof("Simulator %s starting", name);
    while (!stopped) {
        KeyValue<String, FightRequest> item =
                queue.brpop(Duration.ofSeconds(1), "fight-requests");
        if (item != null) {
            var request = item.value();
            var result = simulate(request);
            //...
        }
    }
}
The BRPOP command retrieves and removes the last (right) element of the list. Unlike RPOP, it waits for a given amount of time (1 second in the code above) if there are no elements in the list. So, if the list contains an element, it gets it. Otherwise, it waits up to one second for an element to arrive before giving up, in which case it returns null. The BRPOP command returns a KeyValue composed of the key of the list and the FightRequest. It uses that structure because you can pass multiple keys, which is convenient when you have lists with priorities.
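The timeout and the multi-key behavior can be illustrated with a small in-memory model. This is a sketch under assumptions, not the Quarkus API: BlockingPopSketch, its KeyValue record, and the fight-requests-urgent key are hypothetical names. The model checks the keys in the order they are passed and pops from the first non-empty list, which is how a list with a higher priority gets served first.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** In-memory model of LPUSH and a multi-key BRPOP; it does not talk to Redis. */
class BlockingPopSketch {
    /** Pair made of the list key and the popped value. */
    record KeyValue(String key, String value) {}

    private final Map<String, Deque<String>> lists = new HashMap<>();

    synchronized void lpush(String key, String value) {
        lists.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value);
        notifyAll(); // wake up the consumers blocked in brpop
    }

    /**
     * Checks the keys in the given order and pops the right-most item of the
     * first non-empty list. Waits up to the timeout, then returns null.
     */
    synchronized KeyValue brpop(Duration timeout, String... keys) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            for (String key : keys) {
                Deque<String> list = lists.get(key);
                if (list != null && !list.isEmpty()) {
                    return new KeyValue(key, list.pollLast());
                }
            }
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000;
            if (remainingMillis <= 0) {
                return null; // timed out without any item
            }
            try {
                wait(remainingMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        BlockingPopSketch lists = new BlockingPopSketch();
        lists.lpush("fight-requests", "regular-1");
        lists.lpush("fight-requests-urgent", "urgent-1");
        Duration timeout = Duration.ofMillis(100);
        // The urgent list is named first, so it is drained before the regular one.
        System.out.println(lists.brpop(timeout, "fight-requests-urgent", "fight-requests"));
        System.out.println(lists.brpop(timeout, "fight-requests-urgent", "fight-requests"));
        // Both lists are now empty: the call waits for the timeout and returns null.
        System.out.println(lists.brpop(timeout, "fight-requests-urgent", "fight-requests"));
    }
}
```

Returning the key alongside the value is what lets the caller know which of the lists the item came from.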
The BRPOP command also avoids spinning indefinitely if the list is empty, as it waits for up to 1 second during each iteration. Finally, the BRPOP command is atomic: if you have multiple simulators, they cannot retrieve the same item. Each item is dispatched once.
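The "each item is dispatched once" property can be pictured with an in-memory analogy. The sketch below replaces Redis with a java.util.concurrent.BlockingDeque shared by several worker threads. The class CompetingConsumersSketch, the Result record, and the worker names are hypothetical, and the guarantee shown here comes from the Java collection, not from Redis itself.

```java
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Several consumers share one queue; each job must reach exactly one of them. */
class CompetingConsumersSketch {
    record Result(int processed, int duplicates) {}

    static Result dispatch(int jobs, int workers) {
        BlockingDeque<Integer> queue = new LinkedBlockingDeque<>();
        for (int job = 0; job < jobs; job++) {
            queue.addFirst(job); // write on the left side
        }
        Map<Integer, String> processedBy = new ConcurrentHashMap<>();
        AtomicInteger duplicates = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            String name = "simulator-" + w;
            pool.execute(() -> {
                try {
                    Integer job;
                    // Read from the right side, waiting briefly when the queue is empty.
                    while ((job = queue.pollLast(100, TimeUnit.MILLISECONDS)) != null) {
                        if (processedBy.putIfAbsent(job, name) != null) {
                            duplicates.incrementAndGet(); // job was seen twice
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(processedBy.size(), duplicates.get());
    }

    public static void main(String[] args) {
        // Every job is processed, and none is processed twice.
        System.out.println(dispatch(100, 4));
    }
}
```

In this sketch a worker exits once the queue stays empty for 100 milliseconds, which keeps the example finite. The simulators in this post keep looping until the application stops.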
Sending the fight outcome
The poll loop retrieves the FightRequests from the queue and simulates the fights, but how do we communicate the results? For this, we use another Redis feature: pub/sub communication.
In simple words, we are going to send the FightResult to a channel. Applications subscribing to that channel will receive the emitted FightResult.
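As a rough mental model of a channel, consider the following in-memory sketch. It is not the Redis or Quarkus API: PubSubSketch and its methods are hypothetical. It captures two properties of Redis pub/sub: every current subscriber of a channel receives each published message, and a message published while nobody is subscribed is not stored for later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** In-memory model of pub/sub channels; messages are delivered, never stored. */
class PubSubSketch {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(handler);
    }

    /** Delivers the message to the current subscribers and returns how many received it. */
    int publish(String channel, String message) {
        List<Consumer<String>> handlers = subscribers.getOrDefault(channel, List.of());
        handlers.forEach(handler -> handler.accept(message));
        return handlers.size();
    }

    public static void main(String[] args) {
        PubSubSketch bus = new PubSubSketch();
        // Nobody listens yet: the message reaches no one and is not kept.
        System.out.println(bus.publish("fight-results", "early result")); // prints 0

        List<String> pageA = new ArrayList<>();
        List<String> pageB = new ArrayList<>();
        bus.subscribe("fight-results", pageA::add);
        bus.subscribe("fight-results", pageB::add);

        // Both subscribers receive the same message.
        System.out.println(bus.publish("fight-results", "hero wins")); // prints 2
        System.out.println(pageA); // prints [hero wins]
        System.out.println(pageB); // prints [hero wins]
    }
}
```

This is the main difference with the job queue: a list keeps its items until a consumer pops them, whereas a channel only reaches the subscribers connected at publication time.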
A FightResult contains the request id, the two fighters, and the name of the winner:
package me.escoffier.quarkus.redis.fight;
public record FightResult(String id, Hero hero, Villain villain, String winner) {
}
To use Redis pub/sub commands, we need the object associated with this group. In the FightSimulator, we also use the pubsub method to get that object:
public FightSimulator(@ConfigProperty(name = "simulator-name") String name, Logger logger, RedisDataSource ds) {
    this.name = name;
    this.logger = logger;
    this.queue = ds.list(FightRequest.class);
    this.publisher = ds.pubsub(FightResult.class); // <--- this is it!
}
Now, we can use this publisher to send the FightResults. After each fight, we call publisher.publish to send the FightResult instance to the fight-results channel:
@Override
public void run() {
    logger.infof("Simulator %s starting", name);
    while (!stopped) {
        KeyValue<String, FightRequest> item = queue.brpop(Duration.ofSeconds(1), "fight-requests");
        if (item != null) {
            var request = item.value();
            var result = simulate(request);
            publisher.publish("fight-results", result); // Send the outcome
        }
    }
}
Receiving the fight outcome
At that point:
- we submit the fight request into the job queue,
- we consume that queue and simulate the fight,
- we send the outcome to the fight-results channel.
So, the only missing piece is the consumption of that channel. Let’s return to the me.escoffier.quarkus.redis.supes.SupesService class. In the constructor, we also inject the ReactiveRedisDataSource, the reactive variant of the Redis data source. Then, in the constructor code, we subscribe to the fight-results channel:
public SupesService(RedisDataSource dataSource, ReactiveRedisDataSource reactiveRedisDataSource) {
    commands = dataSource.list(FightRequest.class);
    stream = reactiveRedisDataSource.pubsub(FightResult.class).subscribe("fight-results")
            .broadcast().toAllSubscribers();
}
Because we use the reactive data source, this subscription returns a Multi<FightResult>, ready to be served by Quarkus as server-sent events (SSE); see SupesResource.java:
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<FightResult> fights() {
    return supes.getFightResults();
}
.broadcast().toAllSubscribers() instructs Quarkus to broadcast every received FightResult to all the connected SSE clients. As a consequence, each browser receives every result and filters out the ones it did not request.
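This post does not show how the page filters the results. One plausible approach, assuming the filter is keyed on the request id, is sketched below. It is written in Java for consistency with the rest of the post, although the real page would do this in the browser. ResultFilterSketch and the simplified Outcome record (id and winner only) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Keeps only the broadcast outcomes that match a fight requested by this client. */
class ResultFilterSketch {
    /** Simplified outcome: only the request id and the winner's name. */
    record Outcome(String id, String winner) {}

    private final Set<String> pendingIds = new HashSet<>();

    /** Remembers the id of a fight this client has requested. */
    void requested(String id) {
        pendingIds.add(id);
    }

    /** Returns the outcomes for requested fights and ignores the others. */
    List<Outcome> accept(List<Outcome> broadcast) {
        List<Outcome> shown = new ArrayList<>();
        for (Outcome outcome : broadcast) {
            // remove() returns true only if the id was pending for this client
            if (pendingIds.remove(outcome.id())) {
                shown.add(outcome);
            }
        }
        return shown;
    }

    public static void main(String[] args) {
        ResultFilterSketch page = new ResultFilterSketch();
        page.requested("fight-1");
        List<Outcome> shown = page.accept(List.of(
                new Outcome("fight-1", "hero"),
                new Outcome("fight-2", "villain"))); // requested by another page
        System.out.println(shown.size()); // prints 1
    }
}
```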
Running the system
The circle is complete! The full source code is available from https://github.com/cescoffier/quarkus-redis-job-queue-demo. To run the system, open three terminals.
First, we start the supes-application. In the first terminal, navigate to the supes-application directory and run mvn quarkus:dev. Quarkus automatically starts the PostgreSQL and Redis instances (if your machine can run containers). In the console, hit h and then c. It displays the running dev services. Look for the redis one, and copy the quarkus.redis.hosts injected configuration:
redis-client - Up About a minute
Container: 348edec50f80/trusting_jennings docker.io/redis:7-alpine
Network: bridge - 0.0.0.0:53853->6379/tcp
Exec command: docker exec -it 348edec50f80 /bin/bash
Injected Config: quarkus.redis.hosts=redis://localhost:53853
In the previous snippet, copy quarkus.redis.hosts=redis://localhost:53853. This is the address of the Redis server. We need to configure the simulators with that address.
If you go to http://localhost:8080, the web page is served. You can hit the fights! button a few times.
The fights won’t happen yet, as we have no simulator. However, the fight requests have been submitted and stored in the list, so they are not lost.
Now, in the second terminal, navigate to the fight-simulator directory, and run:
mvn package
java -Dsimulator-name=A -Dquarkus.redis.hosts=redis://localhost:53853 -jar target/quarkus-app/quarkus-run.jar
IMPORTANT: update the quarkus.redis.hosts value with the one copied above.
As soon as you start it, it processes the pending fight requests:
2022-09-11 15:31:58,914 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Pakku and Tulon Voidgazer
2022-09-11 15:31:59,786 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Comet Zuko and Arishem The Judge (Knullified)
2022-09-11 15:32:01,809 INFO [me.esc.qua.red.fig.FightSimulator] (Thread-3) Simulator A is going to simulate a fight between Ms. America and Kazumi (Devil Form)
If you go back to the web page, the winners get a halo.
Now, in the third terminal, navigate to the fight-simulator directory, and run:
java -Dsimulator-name=B -Dquarkus.redis.hosts=redis://localhost:53853 -jar target/quarkus-app/quarkus-run.jar
IMPORTANT: as in the previous command, update the quarkus.redis.hosts value with the one copied above.
Go back to the web page and click on the fight! button a few times. Check the logs of both simulators to see that the fight requests are now dispatched between the two simulators.
Summary
This post explains how you can implement a job queue with Redis and the Quarkus Redis data source API.
Learn more about the Redis data source API from the Quarkus documentation. We will publish more content about Redis patterns, so stay tuned!