Apache Pulsar Reference Guide
This reference guide demonstrates how your Quarkus application can utilize SmallRye Reactive Messaging to interact with Apache Pulsar.
1. Introduction
Apache Pulsar is an open-source, distributed messaging and streaming platform built for the cloud. It provides a multi-tenant, high-performance solution for server-to-server messaging with tiered storage capabilities.
Pulsar implements the publish-subscribe pattern:
-
Producers publish messages to topics.
-
Consumers create subscriptions to those topics to receive and process incoming messages, and send acknowledgments to the broker when processing is finished.
-
When a subscription is created, Pulsar retains all messages, even if the consumer is disconnected. The retained messages are discarded only when a consumer acknowledges that all these messages are processed successfully.
A Pulsar cluster consists of:
-
One or more brokers, which are stateless components.
-
A metadata store for maintaining topic metadata, schema, coordination and cluster configuration.
-
A set of bookies used for persistent storage of messages.
2. Quarkus Extension for Apache Pulsar
Quarkus provides support for Apache Pulsar through the SmallRye Reactive Messaging framework. Based on the Eclipse MicroProfile Reactive Messaging specification 3.0, it proposes a flexible programming model bridging CDI and event-driven architecture.
This guide provides an in-depth look at Apache Pulsar and the SmallRye Reactive Messaging framework. For a quick start, take a look at Getting Started to SmallRye Reactive Messaging with Apache Pulsar.
You can add the smallrye-reactive-messaging-pulsar extension to your project by running one of the following commands in your project base directory:
quarkus extension add smallrye-reactive-messaging-pulsar
./mvnw quarkus:add-extension -Dextensions='smallrye-reactive-messaging-pulsar'
./gradlew addExtension --extensions='smallrye-reactive-messaging-pulsar'
This will add the following to your build file:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-pulsar</artifactId>
</dependency>
implementation("io.quarkus:quarkus-smallrye-reactive-messaging-pulsar")
The extension includes |
3. Configuring SmallRye Pulsar Connector
Because the SmallRye Reactive Messaging framework supports different messaging backends like Apache Kafka, Apache Pulsar, AMQP, Apache Camel, JMS, MQTT, etc., it employs a generic vocabulary:
-
Applications send and receive messages. A Message wraps a payload and can be extended with some metadata. This should not be confused with a PulsarMessage, which consists of a value and a key. With the Pulsar connector, a Reactive Messaging message corresponds to a Pulsar message.
-
Messages transit on channels. Application components connect to channels to publish and consume messages. The Pulsar connector maps channels to Pulsar topics.
-
Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Pulsar is named smallrye-pulsar.
A minimal configuration for the Pulsar connector with an incoming channel looks like the following:
%prod.pulsar.client.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.connector=smallrye-pulsar (2)
(1) Configure the Pulsar broker service url for the production profile. You can configure it globally or per channel using the mp.messaging.incoming.$channel.serviceUrl property. In dev mode and when running tests, Dev Services for Pulsar automatically starts a Pulsar broker.
(2) Configure the connector to manage the prices channel. By default, the topic name is the same as the channel name. You can configure the topic attribute to override it.
The %prod prefix indicates that the property is only used when the
application runs in prod mode (so not in dev or test). Refer to the
Profile documentation for further
details.
Connector auto-attachment
If you have a single connector on your classpath, you can omit the
This auto-attachment can be disabled using:
For more configuration options see Configuring Pulsar clients.
4. Receiving messages from Pulsar
The Pulsar Connector connects to a Pulsar broker using a Pulsar client and creates consumers to receive messages from Pulsar brokers. It maps each Pulsar Message into a Reactive Messaging Message.
4.1. Example
Let’s imagine you have a Pulsar broker running, and accessible using the
pulsar:6650
address. Configure your application to receive Pulsar
messages on the prices
channel as follows:
mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 (1)
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest (2)
-
Configure the Pulsar broker service url.
-
Make sure the consumer subscription starts receiving messages from the Earliest position.
You don’t need to set the Pulsar topic, nor the consumer name. By default,
the connector uses the channel name ( |
In Pulsar, consumers need to provide a |
Then, your application can receive the double
payload directly:
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }
}
Or, you can retrieve the Reactive Messaging type Message<Double>
:
@Incoming("prices")
public CompletionStage<Void> consume(Message<Double> msg) {
    // access record metadata
    var metadata = msg.getMetadata(PulsarIncomingMessageMetadata.class).orElseThrow();
    // process the message payload.
    double price = msg.getPayload();
    // Acknowledge the incoming message (acknowledge the Pulsar message back to the broker)
    return msg.ack();
}
The Reactive Messaging Message
type lets the consuming method access the
incoming message metadata and handle the acknowledgment manually.
If you want to access the Pulsar message objects directly, use:
@Incoming("prices")
public void consume(org.apache.pulsar.client.api.Message<Double> msg) {
    String key = msg.getKey();
    // the value type follows the type parameter of the Pulsar message
    Double value = msg.getValue();
    String topic = msg.getTopicName();
    // ...
}
org.apache.pulsar.client.api.Message
is provided by the underlying Pulsar
client and can be used directly with the consumer method.
Alternatively, your application can inject a Multi
in your bean,
identified with the channel name and subscribe to its events as the
following example:
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Channel;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.jboss.resteasy.reactive.RestStreamElementType;

@Path("/prices")
public class PriceResource {

    @Inject
    @Channel("prices")
    Multi<Double> prices;

    @GET
    @Path("/prices")
    @RestStreamElementType(MediaType.TEXT_PLAIN)
    public Multi<Double> stream() {
        return prices;
    }
}
When consuming messages with |
The following types can be injected as channels:
@Inject @Channel("prices") Multi<Double> streamOfPayloads;
@Inject @Channel("prices") Multi<Message<Double>> streamOfMessages;
@Inject @Channel("prices") Publisher<Double> publisherOfPayloads;
@Inject @Channel("prices") Publisher<Message<Double>> publisherOfMessages;
As with the previous Message example, if your injected channel receives payloads (Multi<T>), it acknowledges the message automatically and supports multiple subscribers. If your injected channel receives Message (Multi<Message<T>>), you are responsible for the acknowledgment and broadcasting.
4.2. Blocking processing
Reactive Messaging invokes your method on an I/O thread. See the Quarkus Reactive Architecture documentation for further details on this topic. However, you often need to combine Reactive Messaging with blocking processing such as database interactions. For this, you need to use the @Blocking annotation, indicating that the processing is blocking and should not be run on the caller thread.
For example, the following code illustrates how you can store incoming payloads to a database using Hibernate with Panache:
import io.smallrye.reactive.messaging.annotations.Blocking;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

@ApplicationScoped
public class PriceStorage {

    @Incoming("prices")
    @Blocking // run on a worker thread instead of the caller (I/O) thread
    @Transactional
    public void store(int priceInUsd) {
        Price price = new Price();
        price.value = priceInUsd;
        price.persist();
    }
}
There are 2
They have the same effect. Thus, you can use both. The first one provides more fine-grained tuning such as the worker pool to use and whether it preserves the order. The second one, used also with other reactive features of Quarkus, uses the default worker pool and preserves the order. Detailed information on the usage of |
@RunOnVirtualThread
For running the blocking processing on Java virtual threads, see the Quarkus Virtual Thread support with Reactive Messaging documentation. |
@Transactional
If your method is annotated with |
4.3. Pulsar Subscription Types
The Pulsar subscriptionType consumer configuration can be used flexibly to achieve different messaging scenarios, such as publish-subscribe or queuing.
-
Exclusive subscription type allows specifying a unique subscription name for "fan-out pub-sub messaging". This is the default subscription type.
-
Shared, Key_Shared or Failover subscription types allow multiple consumers to share the same subscription name, to achieve "message queuing" among consumers.
If a subscription name is not provided, Quarkus generates a unique id.
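As an illustration of the queuing scenario, the following fragment is a minimal sketch that lets several consumers share one subscription on the prices channel. The subscriptionName property is assumed to follow the Pulsar consumer configuration naming, and the value prices-workers is hypothetical:

```properties
# Several application instances using the same subscription name share the messages
mp.messaging.incoming.prices.subscriptionType=Shared
# Assumed property name and hypothetical value
mp.messaging.incoming.prices.subscriptionName=prices-workers
```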
4.4. Deserialization and Pulsar Schema
The Pulsar Connector allows configuring the schema for the underlying Pulsar consumer. See the Pulsar Schema Configuration & Auto Schema Discovery for more information.
4.5. Acknowledgement Strategies
When a message produced from a Pulsar Message is acknowledged, the connector sends an acknowledgement request to the Pulsar broker. All Reactive Messaging messages need to be acknowledged, which is handled automatically in most cases. Acknowledgement requests can be sent to the Pulsar broker using the following two strategies:
-
Individual acknowledgement is the default strategy: an acknowledgement request is sent to the broker for each message.
-
Cumulative acknowledgement, configured using ack-strategy=cumulative: the consumer only acknowledges the last message it received. All messages in the stream up to (and including) the provided message are not redelivered to that consumer.
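For instance, switching the prices channel to cumulative acknowledgement could look like the following fragment. The channel name is only an example taken from earlier sections:

```properties
# Acknowledge only the last received message; earlier ones are covered by it
mp.messaging.incoming.prices.ack-strategy=cumulative
```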
By default, the Pulsar consumer does not wait for the acknowledgement
confirmation from the broker to validate an acknowledgement. You can enable
this using |
4.6. Failure Handling Strategies
If a message produced from a Pulsar message is nacked, a failure strategy is applied. The Quarkus Pulsar extension supports 5 strategies:
-
nack (default) sends a negative acknowledgment to the broker, triggering the broker to redeliver this message to the consumer. The negative acknowledgment can be further configured using the negativeAckRedeliveryDelayMicros and negativeAck.redeliveryBackoff properties.
-
fail fails the application; no more messages will be processed.
-
ignore logs the failure, but the acknowledgement strategy is applied and the processing continues.
-
continue logs the failure, but processing continues without applying acknowledgement or negative acknowledgement. This strategy can be used with the Acknowledgement timeout configuration.
-
reconsume-later sends the message to the retry letter topic using the reconsumeLater API to be reconsumed with a delay. The delay can be configured using the reconsumeLater.delay property and defaults to 3 seconds. Custom delay or properties per message can be configured by adding an instance of io.smallrye.reactive.messaging.pulsar.PulsarReconsumeLaterMetadata to the failure metadata.
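As a small sketch, the default nack strategy with a fixed redelivery delay could be configured as follows. The channel name and the 2-second delay are hypothetical values chosen for illustration:

```properties
mp.messaging.incoming.prices.failure-strategy=nack
# Delay before the broker redelivers a nacked message, in microseconds (hypothetical value)
mp.messaging.incoming.prices.negativeAckRedeliveryDelayMicros=2000000
```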
4.6.1. Acknowledgement timeout
Similar to the negative acknowledgement, with the acknowledgement timeout mechanism the Pulsar client tracks unacknowledged messages for the given ackTimeout period. When the period elapses, it sends a redeliver-unacknowledged-messages request to the broker, and the broker resends the unacknowledged messages to the consumer.
To configure the timeout and redelivery backoff mechanism, you can set the ackTimeoutMillis and ackTimeout.redeliveryBackoff properties. The ackTimeout.redeliveryBackoff value accepts comma-separated values of min delay in milliseconds, max delay in milliseconds and multiplier, respectively:
mp.messaging.incoming.out.failure-strategy=continue
mp.messaging.incoming.out.ackTimeoutMillis=10000
mp.messaging.incoming.out.ackTimeout.redeliveryBackoff=1000,60000,2
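To make the min, max and multiplier triple more concrete, here is a plain Java sketch of how such a backoff value could translate into delays. The class RedeliveryBackoffSketch is hypothetical and not part of the connector, and the growth rule (min delay multiplied on each redelivery, capped at the max delay) is an assumption about how a multiplier backoff behaves:

```java
/** Hypothetical helper, not part of the connector: models a "min,max,multiplier" backoff value. */
public class RedeliveryBackoffSketch {
    private final long minDelayMs;
    private final long maxDelayMs;
    private final double multiplier;

    public RedeliveryBackoffSketch(long minDelayMs, long maxDelayMs, double multiplier) {
        this.minDelayMs = minDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.multiplier = multiplier;
    }

    /** Parses a comma-separated value such as "1000,60000,2". */
    public static RedeliveryBackoffSketch parse(String value) {
        String[] parts = value.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected min,max,multiplier but got: " + value);
        }
        return new RedeliveryBackoffSketch(
                Long.parseLong(parts[0].trim()),
                Long.parseLong(parts[1].trim()),
                Double.parseDouble(parts[2].trim()));
    }

    /** Assumed rule: min * multiplier^redeliveryCount, never above max. */
    public long delayMillis(int redeliveryCount) {
        if (redeliveryCount <= 0) {
            return minDelayMs;
        }
        double raw = minDelayMs * Math.pow(multiplier, redeliveryCount);
        return (long) Math.min(raw, (double) maxDelayMs);
    }

    public static void main(String[] args) {
        RedeliveryBackoffSketch backoff = parse("1000,60000,2");
        for (int count = 0; count <= 7; count++) {
            System.out.println("redelivery " + count + " -> " + backoff.delayMillis(count) + " ms");
        }
    }
}
```

Under this assumed rule, the value 1000,60000,2 starts at one second, doubles on each redelivery, and stops growing once it reaches one minute.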
4.6.2. Reconsume later and retry letter topic
The retry letter topic pushes messages that are not consumed successfully to a dead letter topic and continues message consumption. Note that the dead letter topic can be used with different message redelivery methods, such as acknowledgment timeout, negative acknowledgment or retry letter topic.
mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.reconsumeLater.delay=5000
mp.messaging.incoming.data.enableRetry=true
mp.messaging.incoming.data.negativeAck.redeliveryBackoff=1000,60000,2
4.6.3. Dead-letter topic
The dead letter topic receives messages that are not consumed successfully, so that message consumption can continue. Note that the dead letter topic can be used with different message redelivery methods, such as acknowledgment timeout, negative acknowledgment or retry letter topic.
mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared
Negative acknowledgment or acknowledgment timeout methods for redelivery will redeliver the whole batch of messages containing at least one unprocessed message. See Producer Batching for more information.
4.7. Receiving Pulsar Messages in Batches
By default, incoming methods receive each Pulsar message individually. You can enable batch mode using the batchReceive=true property, or by setting a batchReceivePolicy in the consumer configuration.
@Incoming("prices")
public CompletionStage<Void> consumeMessage(PulsarIncomingBatchMessage<Double> messages) {
    for (PulsarMessage<Double> msg : messages) {
        msg.getMetadata(PulsarIncomingMessageMetadata.class).ifPresent(metadata -> {
            String key = metadata.getKey();
            String topic = metadata.getTopicName();
            long timestamp = metadata.getEventTime();
            //... process messages
        });
    }
    // ack acknowledges the messages of the batch.
    return messages.ack();
}

@Incoming("prices")
public void consumeRecords(Messages<Double> messages) {
    for (Message<Double> msg : messages) {
        //... process messages
    }
}
Or you can directly receive the list of payloads to the consume method:
@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}
Quarkus auto-detects batch types for incoming channels and sets batch
configuration automatically. You can configure batch mode explicitly with
|
5. Sending messages to Pulsar
The Pulsar Connector can write Reactive Messaging Messages as Pulsar messages.
5.1. Example
Let’s imagine you have a Pulsar broker running, and accessible using the pulsar:6650 address. Configure your application to write the messages from the prices channel into Pulsar messages as follows:
mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 (1)
-
Configure the Pulsar broker service url.
You don’t need to set the Pulsar topic, nor the producer name. By default,
the connector uses the channel name ( |
Then, your application must send Message<Double>
to the prices
channel. It can use double
payloads as in the following snippet:
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import jakarta.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class PulsarPriceProducer {

    private final Random random = new Random();

    // the channel name matches the outgoing channel configured above
    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> random.nextDouble());
    }
}
Note that the generate method returns a Multi<Double>
, which implements
the Flow.Publisher
interface. This publisher will be used by the
framework to generate messages and send them to the configured Pulsar topic.
Instead of returning a payload, you can return an io.smallrye.reactive.messaging.pulsar.OutgoingMessage to send Pulsar messages:
@Outgoing("out")
public Multi<OutgoingMessage<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> OutgoingMessage.of("my-key", random.nextDouble()));
}
Payloads can be wrapped inside org.eclipse.microprofile.reactive.messaging.Message to have more control over the written records:
@Outgoing("generated-price")
public Multi<Message<Double>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> Message.of(random.nextDouble())
                    .addMetadata(PulsarOutgoingMessageMetadata.builder()
                            .withKey("my-key")
                            .withProperties(Map.of("property-key", "value"))
                            .build()));
}
When sending Messages
, you can add an instance of
io.smallrye.reactive.messaging.pulsar.PulsarOutgoingMessageMetadata
to
influence how the message is going to be written to Pulsar.
Other than method signatures returning a Flow.Publisher, outgoing methods can also return a single message. In this case, the producer uses this method as a generator to create an infinite stream.
@Outgoing("prices-out") T generate(); // T excluding void
@Outgoing("prices-out") Message<T> generate();
@Outgoing("prices-out") Uni<T> generate();
@Outgoing("prices-out") Uni<Message<T>> generate();
@Outgoing("prices-out") CompletionStage<T> generate();
@Outgoing("prices-out") CompletionStage<Message<T>> generate();
5.2. Serialization and Pulsar Schema
The Pulsar Connector allows configuring the schema for the underlying Pulsar producer. See the Pulsar Schema Configuration & Auto Schema Discovery for more information.
5.3. Sending key/value pairs
In order to send Key/Value pairs to Pulsar, you can configure the Pulsar producer Schema with a KeyValue schema.
package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarKeyValueExample {

    @Identifier("out")
    @Produces
    Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

    @Incoming("in")
    @Outgoing("out")
    public KeyValue<String, Long> process(long in) {
        return new KeyValue<>("my-key", in);
    }
}
If you need more control on the written records, use
PulsarOutgoingMessageMetadata
.
5.4. Acknowledgement
Upon receiving a message from a Producer, a Pulsar broker assigns a
MessageId
to the message and sends it back to the producer, confirming
that the message is published.
By default, the connector waits for Pulsar to acknowledge the record before continuing the processing (acknowledging the received Message). You can disable this by setting the waitForWriteCompletion attribute to false.
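For the prices channel used in the earlier example, disabling the wait could look like the following fragment. This is only a sketch; whether skipping the broker confirmation is acceptable depends on how much message loss the application tolerates:

```properties
# Do not wait for the broker to confirm the write before acknowledging the incoming Message
mp.messaging.outgoing.prices.waitForWriteCompletion=false
```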
If a record cannot be written, the message is nacked.
The Pulsar client automatically retries sending messages in case of failure,
until the send timeout is reached. The send timeout is configurable
with |
5.5. Back-pressure and inflight records
The Pulsar outbound connector handles back-pressure, monitoring the number
of pending messages waiting to be written to the Pulsar broker. The number
of pending messages is configured using the maxPendingMessages
attribute
and defaults to 1000.
The connector only sends that number of messages concurrently. No other messages are sent until at least one pending message gets acknowledged by the broker. The connector then writes a new message to Pulsar each time one of the pending messages gets acknowledged.
You can also remove the limit of pending messages by setting maxPendingMessages to 0. Note that Pulsar also lets you configure the number of pending messages across partitions using maxPendingMessagesAcrossPartitions.
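The pending-message limit described above can be pictured with a small plain Java model. The class InflightLimiterSketch is hypothetical and only mimics the bookkeeping; it is not how the connector or the Pulsar client is implemented:

```java
/** Hypothetical model of a pending-message limit; 0 means "no limit", as with maxPendingMessages=0. */
public class InflightLimiterSketch {
    private final int maxPendingMessages;
    private int pending;

    public InflightLimiterSketch(int maxPendingMessages) {
        this.maxPendingMessages = maxPendingMessages;
    }

    /** Returns true if one more message may be written now, false if the limit is reached. */
    public synchronized boolean tryBeginSend() {
        if (maxPendingMessages > 0 && pending >= maxPendingMessages) {
            return false;
        }
        pending++;
        return true;
    }

    /** Called when the broker confirms one pending message, which frees one slot. */
    public synchronized void onBrokerAck() {
        if (pending > 0) {
            pending--;
        }
    }

    public synchronized int pending() {
        return pending;
    }

    public static void main(String[] args) {
        InflightLimiterSketch limiter = new InflightLimiterSketch(2);
        System.out.println(limiter.tryBeginSend()); // first message is accepted
        System.out.println(limiter.tryBeginSend()); // second message is accepted
        System.out.println(limiter.tryBeginSend()); // third message has to wait
        limiter.onBrokerAck();                      // broker confirms one message
        System.out.println(limiter.tryBeginSend()); // a slot is free again
    }
}
```

In a reactive pipeline the "has to wait" case would translate into not requesting more items from upstream rather than returning false, but the counting logic is the same idea.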
5.6. Producer Batching
By default, the Pulsar producer batches individual messages together to be
published to the broker. You can configure batching parameters using
batchingMaxPublishDelayMicros
,
batchingPartitionSwitchFrequencyByPublishDelay
, batchingMaxMessages
,
batchingMaxBytes
configuration properties, or disable it completely with
batchingEnabled=false
.
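As an illustration, the batching properties named above could be tuned per channel as in the following fragment. The channel name and all values are hypothetical and would need to be adapted to the workload:

```properties
# Hypothetical batching settings for the prices channel
mp.messaging.outgoing.prices.batchingMaxMessages=500
mp.messaging.outgoing.prices.batchingMaxPublishDelayMicros=10000
mp.messaging.outgoing.prices.batchingMaxBytes=131072
# Or turn batching off entirely:
# mp.messaging.outgoing.prices.batchingEnabled=false
```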
When using Key_Shared
consumer subscriptions, the batcherBuilder
can be
configured to BatcherBuilder.KEY_BASED
.
6. Pulsar Transactions and Exactly-Once Processing
Pulsar transactions enable event streaming applications to consume, process, and produce messages in one atomic operation.
Transactions allow one or multiple producers to send a batch of messages to multiple topics where all messages in the batch are eventually visible to any consumer, or none is ever visible to consumers.
In order to be used, transaction support needs to be activated on the broker
configuration, using |
On the client side, the transaction support also needs to be enabled on
PulsarClient
configuration:
mp.messaging.outgoing.tx-producer.enableTransaction=true
Pulsar connector provides PulsarTransactions
custom emitter for writing
records inside a transaction.
It can be used as a regular emitter @Channel
:
package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.OutgoingMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarTransactionalProducer {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<OutgoingMessage<Integer>> txProducer;

    @Inject
    @Channel("other-producer")
    PulsarTransactions<String> producer;

    @Incoming("in")
    public Uni<Void> emitInTransaction(Message<Integer> in) {
        return txProducer.withTransaction(emitter -> {
            emitter.send(OutgoingMessage.of("a", 1));
            emitter.send(OutgoingMessage.of("b", 2));
            emitter.send(OutgoingMessage.of("c", 3));
            producer.send(emitter, "4");
            producer.send(emitter, "5");
            producer.send(emitter, "6");
            return Uni.createFrom().completionStage(in::ack);
        });
    }
}
The function given to the withTransaction
method receives a
TransactionalEmitter
for producing records, and returns a Uni
that
provides the result of the transaction. If the processing completes
successfully, the producer is flushed and the transaction is committed. If
the processing throws an exception, returns a failing Uni
, or marks the
TransactionalEmitter
for abort, the transaction is aborted.
Multiple transactional producers can participate in a single transaction. This ensures all messages are sent using the started transaction and before the transaction is committed, all participating producers are flushed.
If this method is called on a Vert.x context, the processing function is also called on that context. Otherwise, it is called on the sending thread of the producer.
6.1. Exactly-Once Processing
Pulsar Transactions API also allows managing consumer offsets inside a transaction, together with produced messages. This in turn enables coupling a consumer with a transactional producer in a consume-transform-produce pattern, also known as exactly-once processing. It means that an application consumes messages, processes them, publishes the results to a topic, and commits offsets of the consumed messages in a transaction.
The PulsarTransactions
emitter also provides a way to apply exactly-once
processing to an incoming Pulsar message inside a transaction.
The following example includes a batch of Pulsar messages inside a transaction.
mp.messaging.outgoing.tx-out-example.enableTransaction=true
# ...
mp.messaging.incoming.in-channel.enableTransaction=true
mp.messaging.incoming.in-channel.batchReceive=true
package pulsar.outbound;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingBatchMessage;
import io.smallrye.reactive.messaging.pulsar.PulsarMessage;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactions;

@ApplicationScoped
public class PulsarExactlyOnceProcessor {

    @Inject
    @Channel("tx-out-example")
    PulsarTransactions<Integer> txProducer;

    @Incoming("in-channel")
    public Uni<Void> emitInTransaction(PulsarIncomingBatchMessage<Integer> batch) {
        return txProducer.withTransactionAndAck(batch, emitter -> {
            for (PulsarMessage<Integer> record : batch) {
                emitter.send(PulsarMessage.of(record.getPayload() + 1, record.getKey()));
            }
            return Uni.createFrom().voidItem();
        });
    }
}
If the processing completes successfully, the message is acknowledged inside the transaction and the transaction is committed.
When using exactly-once processing, messages can only be acked individually rather than cumulatively.
If the processing needs to abort, the message is nack’ed. One of the failure
strategies can be employed in order to retry the processing or simply
fail-stop. Note that the Uni
returned from the withTransaction
will
yield a failure if the transaction fails and is aborted.
The application can choose to handle the error case, but for the message
consumption to continue, Uni
returned from the @Incoming
method must not
result in failure. PulsarTransactions#withTransactionAndAck
method will
ack and nack the message but will not stop the reactive stream. Ignoring
the failure simply resets the consumer to the last committed offsets and
resumes the processing from there.
In order to avoid duplicates in case of failure, it is recommended to enable message deduplication and batch index level acknowledgment on the broker side:
|
7. Pulsar Schema Configuration & Auto Schema Discovery
Pulsar messages are stored with payloads as unstructured byte arrays. A Pulsar schema defines how to serialize structured data to the raw message bytes. The schema is applied in producers and consumers to write and read with an enforced data structure. It serializes data into raw bytes before they are published to a topic and deserializes the raw bytes before they are delivered to consumers.
Pulsar uses a schema registry as a central repository to store the registered schema information, which enables producers/consumers to coordinate the schema of a topic’s messages through brokers. By default, Apache BookKeeper is used to store schemas.
Pulsar API provides built-in schema information for a number of primitive types and complex types such as Key/Value, Avro and Protobuf.
The Pulsar Connector allows specifying the schema as a primitive type using
the schema
property:
mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=INT32
mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.schema=DOUBLE
If the value for the schema property matches a Schema Type, a simple schema is created with that type and used for that channel.
The Pulsar Connector allows configuring complex schema types by providing
Schema
beans through CDI, identified with the @Identifier
qualifier.
For example, the following bean provides a JSON schema and a Key/Value schema:
package pulsar.configuration;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import io.smallrye.common.annotation.Identifier;

@ApplicationScoped
public class PulsarSchemaProvider {

    @Produces
    @Identifier("user-schema")
    Schema<User> userSchema = Schema.JSON(User.class);

    @Produces
    @Identifier("a-channel")
    Schema<KeyValue<Integer, User>> keyValueSchema() {
        return Schema.KeyValue(Schema.INT32, Schema.JSON(User.class), KeyValueEncodingType.SEPARATED);
    }

    public static class User {
        String name;
        int age;
    }
}
To configure the incoming channel users
with defined schema, you need to
set the schema
property to the identifier of the schema user-schema
:
mp.messaging.incoming.users.connector=smallrye-pulsar
mp.messaging.incoming.users.schema=user-schema
If no schema
property is found, the connector looks for Schema
beans
identified with the channel name. For example, the outgoing channel
a-channel
will use the key/value schema.
mp.messaging.outgoing.a-channel.connector=smallrye-pulsar
If no schema information is provided, incoming channels will use Schema.AUTO_CONSUME(), whereas outgoing channels will use Schema.AUTO_PRODUCE_BYTES() schemas.
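The lookup order described in this section can be summarised with a small plain Java model. The class SchemaResolutionSketch is hypothetical and returns descriptive strings instead of real Schema objects; it ignores auto-detection, lists only a few primitive type names, and its fall-through for an unmatched schema value is an assumption:

```java
import java.util.Set;

/** Hypothetical model of the schema lookup order; not the connector's actual code. */
public class SchemaResolutionSketch {
    // A few primitive schema type names for illustration; the real set is larger
    private static final Set<String> PRIMITIVE_TYPES = Set.of("INT32", "INT64", "DOUBLE", "STRING", "BYTES");

    public static String resolve(String channel, boolean incoming,
                                 String schemaProperty, Set<String> schemaBeanIdentifiers) {
        if (schemaProperty != null) {
            // 1. The schema property names a primitive schema type
            if (PRIMITIVE_TYPES.contains(schemaProperty)) {
                return "primitive:" + schemaProperty;
            }
            // 2. The schema property names a Schema bean identifier
            if (schemaBeanIdentifiers.contains(schemaProperty)) {
                return "bean:" + schemaProperty;
            }
        }
        // 3. No usable schema property: look for a bean identified with the channel name
        if (schemaBeanIdentifiers.contains(channel)) {
            return "bean:" + channel;
        }
        // 4. Nothing found: fall back to the auto schemas
        return incoming ? "AUTO_CONSUME" : "AUTO_PRODUCE_BYTES";
    }

    public static void main(String[] args) {
        Set<String> beans = Set.of("user-schema", "a-channel");
        System.out.println(resolve("prices", true, "INT32", beans));
        System.out.println(resolve("users", true, "user-schema", beans));
        System.out.println(resolve("a-channel", false, null, beans));
        System.out.println(resolve("other", false, null, beans));
    }
}
```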
7.1. Auto Schema Discovery
When using SmallRye Reactive Messaging Pulsar
(io.quarkus:quarkus-smallrye-reactive-messaging-pulsar
), Quarkus can often
automatically detect the correct Pulsar Schema to configure. This
autodetection is based on declarations of @Incoming
and @Outgoing
methods, as well as injected @Channel
s.
For example, if you declare
@Outgoing("generated-price")
public Multi<Integer> generate() {
...
}
and your configuration indicates that the generated-price
channel uses the
smallrye-pulsar
connector, then Quarkus will automatically set the
schema
attribute of the generated-price
channel to Pulsar Schema
INT32
.
Similarly, if you declare
@Incoming("my-pulsar-consumer")
public void consume(org.apache.pulsar.client.api.Message<byte[]> record) {
    ...
}
and your configuration indicates that the my-pulsar-consumer
channel uses
the smallrye-pulsar
connector, then Quarkus will automatically set the
schema
attribute to Pulsar BYTES
Schema.
Finally, if you declare
@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;
and your configuration indicates that the price-create channel uses the smallrye-pulsar connector, then Quarkus will automatically set the schema to Pulsar DOUBLE Schema.
The full set of types supported by the Pulsar Schema autodetection is:
-
short and java.lang.Short
-
int and java.lang.Integer
-
long and java.lang.Long
-
float and java.lang.Float
-
double and java.lang.Double
-
byte[]
-
java.time.Instant
-
java.sql.Timestamp
-
java.time.LocalDate
-
java.time.LocalTime
-
java.time.LocalDateTime
-
java.nio.ByteBuffer
-
classes generated from Avro schemas, as well as Avro GenericRecord, will be configured with the AVRO schema type
-
classes generated from Protobuf schemas will be configured with the PROTOBUF schema type
-
other classes will automatically be configured with the JSON schema type
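A simplified view of this mapping, restricted to the numeric and byte[] entries, is sketched below in plain Java. The class SchemaAutodetectionSketch is hypothetical; the schema type names are those of the Pulsar SchemaType enum, and treating every other class as JSON is a simplification that skips the time, ByteBuffer, Avro and Protobuf entries of the list:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of the payload-type to schema-type mapping. */
public class SchemaAutodetectionSketch {
    private static final Map<Class<?>, String> KNOWN_TYPES = new HashMap<>();

    static {
        KNOWN_TYPES.put(short.class, "INT16");
        KNOWN_TYPES.put(Short.class, "INT16");
        KNOWN_TYPES.put(int.class, "INT32");
        KNOWN_TYPES.put(Integer.class, "INT32");
        KNOWN_TYPES.put(long.class, "INT64");
        KNOWN_TYPES.put(Long.class, "INT64");
        KNOWN_TYPES.put(float.class, "FLOAT");
        KNOWN_TYPES.put(Float.class, "FLOAT");
        KNOWN_TYPES.put(double.class, "DOUBLE");
        KNOWN_TYPES.put(Double.class, "DOUBLE");
        KNOWN_TYPES.put(byte[].class, "BYTES");
    }

    /** Returns the schema type name for a payload class; anything not listed is treated as JSON here. */
    public static String schemaTypeFor(Class<?> payloadType) {
        return KNOWN_TYPES.getOrDefault(payloadType, "JSON");
    }

    public static void main(String[] args) {
        System.out.println(schemaTypeFor(Integer.class));
        System.out.println(schemaTypeFor(byte[].class));
        System.out.println(schemaTypeFor(Double.class));
    }
}
```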
Note that |
In addition to those Pulsar-provided schemas, Quarkus provides following schema implementations without enforcing validation :
-
io.vertx.core.buffer.Buffer will be configured with the io.quarkus.pulsar.schema.BufferSchema schema
-
io.vertx.core.json.JsonObject will be configured with the io.quarkus.pulsar.schema.JsonObjectSchema schema
-
io.vertx.core.json.JsonArray will be configured with the io.quarkus.pulsar.schema.JsonArraySchema schema
-
For schema-less JSON serialization, if the schema configuration is set to ObjectMapper<fully_qualified_name_of_the_bean>, a Schema will be generated using the Jackson ObjectMapper, without enforcing Pulsar Schema validation. io.quarkus.pulsar.schema.ObjectMapperSchema can be used to explicitly configure a JSON schema without validation.
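The type list above can be pictured as a simple lookup table. The following is a minimal, stdlib-only sketch of that idea and not the actual Quarkus implementation: the class `SchemaAutodetectionSketch` and its method are hypothetical, the returned strings are the names of Pulsar's built-in `Schema` constants, and Avro- and Protobuf-generated classes are deliberately left out.

```java
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Quarkus or Pulsar.
final class SchemaAutodetectionSketch {

    // Java payload type -> name of the matching built-in Pulsar schema.
    private static final Map<Class<?>, String> BUILT_IN = new HashMap<>();

    static {
        BUILT_IN.put(short.class, "INT16");
        BUILT_IN.put(Short.class, "INT16");
        BUILT_IN.put(int.class, "INT32");
        BUILT_IN.put(Integer.class, "INT32");
        BUILT_IN.put(long.class, "INT64");
        BUILT_IN.put(Long.class, "INT64");
        BUILT_IN.put(float.class, "FLOAT");
        BUILT_IN.put(Float.class, "FLOAT");
        BUILT_IN.put(double.class, "DOUBLE");
        BUILT_IN.put(Double.class, "DOUBLE");
        BUILT_IN.put(byte[].class, "BYTES");
        BUILT_IN.put(Instant.class, "INSTANT");
        BUILT_IN.put(Timestamp.class, "TIMESTAMP");
        BUILT_IN.put(LocalDate.class, "LOCAL_DATE");
        BUILT_IN.put(LocalTime.class, "LOCAL_TIME");
        BUILT_IN.put(LocalDateTime.class, "LOCAL_DATE_TIME");
        BUILT_IN.put(ByteBuffer.class, "BYTEBUFFER");
    }

    // Returns the schema name for a payload type; any other class falls back to JSON.
    // Assumption: AVRO and PROTOBUF detection is omitted from this sketch.
    static String schemaFor(Class<?> payloadType) {
        return BUILT_IN.getOrDefault(payloadType, "JSON");
    }
}
```

With this model, an `Emitter<Double>` resolves to `DOUBLE` and a `Message<byte[]>` payload resolves to `BYTES`, matching the examples earlier in this section.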
If a schema
is set by configuration, it won’t be replaced by the
auto-detection.
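As a minimal sketch of that rule, the following configuration pins the schema of the generated-price channel explicitly, so autodetection leaves it untouched. It assumes generated-price is an outgoing channel and reuses the INT32 schema value that appears elsewhere in this guide.

```properties
mp.messaging.outgoing.generated-price.connector=smallrye-pulsar
# An explicitly configured schema is not replaced by the autodetected one
mp.messaging.outgoing.generated-price.schema=INT32
```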
In case you have any issues with serializer auto-detection, you can switch
it off completely by setting
quarkus.reactive-messaging.pulsar.serializer-autodetection.enabled=false
.
If you find you need to do this, please file a bug in the
Quarkus issue tracker so
we can fix whatever problem you have.
8. Dev Services for Pulsar
With the Quarkus SmallRye Reactive Messaging Pulsar extension
(quarkus-smallrye-reactive-messaging-pulsar), Dev Services for Pulsar
automatically starts a Pulsar broker in dev mode and when running tests.
So, you don’t have to start a broker manually. The application is
configured automatically.
8.1. Enabling / Disabling Dev Services for Pulsar
Dev Services for Pulsar is automatically enabled unless:
-
quarkus.pulsar.devservices.enabled is set to false
-
the pulsar.client.serviceUrl is configured
-
all the Reactive Messaging Pulsar channels have the serviceUrl attribute set
Dev Services for Pulsar relies on Docker to start the broker. If your
environment does not support Docker, you will need to start the broker
manually, or connect to an already running broker. You can configure the
broker address using pulsar.client.
.
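For instance, to opt out of Dev Services and point the application at a broker you started yourself, a configuration along these lines could be used. The localhost address is an assumption for a locally running broker; according to the list above, setting pulsar.client.serviceUrl alone is already enough to disable Dev Services.

```properties
# Explicitly switch Dev Services for Pulsar off
quarkus.pulsar.devservices.enabled=false
# Assumed address of a manually started broker
pulsar.client.serviceUrl=pulsar://localhost:6650
```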
8.2. Shared broker
Most of the time you need to share the broker between applications. Dev Services for Pulsar implements a service discovery mechanism for your multiple Quarkus applications running in dev mode to share a single broker.
Dev Services for Pulsar starts the container with the
quarkus-dev-service-pulsar label which is used to identify the container.
If you need multiple (shared) brokers, you can configure the
quarkus.pulsar.devservices.service-name
attribute and indicate the broker
name. It looks for a container with the same value, or starts a new one if
none can be found. The default service name is pulsar
.
Sharing is enabled by default in dev mode, but disabled in test mode. You
can disable the sharing with quarkus.pulsar.devservices.shared=false
.
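A sketch of such a setup is shown below. The service name orders-broker is a hypothetical value chosen for illustration; any applications configured with the same name would look for the same container.

```properties
# Hypothetical service name; applications using the same value share one broker
quarkus.pulsar.devservices.service-name=orders-broker
# Sharing is already the default in dev mode; set to false to get a private broker
quarkus.pulsar.devservices.shared=true
```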
8.3. Setting the port
By default, Dev Services for Pulsar picks a random port and configures the
application. You can set the port by configuring the
quarkus.pulsar.devservices.port
property.
Note that the Pulsar advertised address is automatically configured with the chosen port.
8.4. Configuring the image
Dev Services for Pulsar supports the official Apache Pulsar image.
A custom image name can be configured as such:
quarkus.pulsar.devservices.image-name=datastax/lunastreaming-all:2.10_4.7
8.5. Configuring the Pulsar broker
You can configure the Dev Services for Pulsar with custom broker configuration.
The following example enables transaction support:
quarkus.pulsar.devservices.broker-config.transaction-coordinator-enabled=true
quarkus.pulsar.devservices.broker-config.system-topic-enabled=true
9. Configuring Pulsar clients
Pulsar clients, consumers and producers are highly customizable, which lets you control how a Pulsar client application behaves.
The Pulsar connector creates a Pulsar client, plus a consumer or a producer per channel, each with sensible defaults to ease their configuration. Although the connector handles the creation, all available configuration options remain configurable through Pulsar channels.
While the idiomatic way of creating a PulsarClient, PulsarConsumer or
PulsarProducer is through the builder APIs, in essence those APIs each
build a configuration object that is passed on to the implementation. Those
objects are ClientConfigurationData, ConsumerConfigurationData
and ProducerConfigurationData.
Pulsar Connector allows receiving properties for those configuration objects
directly. For example, the broker authentication information for
PulsarClient
is received using authPluginClassName
and authParams
properties. In order to configure the authentication for the incoming
channel data
:
mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}
Note that the Pulsar consumer property subscriptionInitialPosition
is also configured with the Earliest value, which corresponds to the enum value
SubscriptionInitialPosition.Earliest.
This approach covers most of the configuration cases. However,
non-serializable objects such as CryptoKeyReader, ServiceUrlProvider
etc. cannot be configured this way. For these cases, the Pulsar connector
also accepts instances of the Pulsar configuration data objects
(ClientConfigurationData, ConsumerConfigurationData,
ProducerConfigurationData):
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("my-consumer-options")
    public ConsumerConfigurationData<String> getConsumerConfig() {
        ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
        data.setAckReceiptEnabled(true);
        // A CryptoKeyReader is not serializable, so it cannot be set through properties
        data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
                //...
                .build());
        return data;
    }
}
This instance is retrieved and used to configure the consumer used by the
connector. You need to indicate the identifier of the bean using the
client-configuration, consumer-configuration or producer-configuration
attributes:
mp.messaging.incoming.prices.consumer-configuration=my-consumer-options
If no [client|consumer|producer]-configuration
is configured, the
connector will look for instances identified with the channel name:
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.AutoClusterFailover;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData getClientConfig() {
        ClientConfigurationData data = new ClientConfigurationData();
        data.setEnableTransaction(true);
        data.setServiceUrlProvider(AutoClusterFailover.builder()
                // ...
                .build());
        return data;
    }
}
You can also provide a Map<String, Object>
containing configuration values
by key:
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import java.util.Map;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public Map<String, Object> getProducerConfig() {
        return Map.of(
                "batcherBuilder", BatcherBuilder.KEY_BASED,
                "sendTimeoutMs", 3000,
                "customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
    }
}
Different configuration sources are loaded in the following order of precedence, from the least important to the most important:
-
Map<String, Object> config map produced with the default config identifier: default-pulsar-client, default-pulsar-consumer, default-pulsar-producer.
-
Map<String, Object> config map produced with the identifier in the channel configuration or the channel name
-
[Client|Producer|Consumer]ConfigurationData object produced with the identifier in the channel configuration or the channel name
-
Channel configuration properties named with [Client|Producer|Consumer]ConfigurationData field names.
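The precedence order above can be pictured as successive overlays, where each later source overwrites keys set by an earlier one. The following stdlib-only sketch illustrates that idea under simplifying assumptions: every source is modeled as a plain map (including the ConfigurationData object), the class and method names are hypothetical, and the connector's real merging code is not shown in this guide.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of "least important first, most important last".
final class ConfigPrecedenceSketch {

    // Applies the layers in order; a key in a later layer overrides earlier ones.
    static Map<String, Object> merge(List<Map<String, Object>> layersLowToHigh) {
        Map<String, Object> merged = new LinkedHashMap<>();
        for (Map<String, Object> layer : layersLowToHigh) {
            merged.putAll(layer);
        }
        return merged;
    }

    static Map<String, Object> example() {
        // 1. Map produced with the default identifier (e.g. default-pulsar-producer)
        Map<String, Object> defaultMap = Map.of("sendTimeoutMs", 30000, "batchingEnabled", true);
        // 2. Map produced with the channel identifier
        Map<String, Object> channelMap = Map.of("sendTimeoutMs", 3000);
        // 3. ConfigurationData object, modeled here as a map for simplicity
        Map<String, Object> configurationData = Map.of("batchingEnabled", false);
        // 4. Channel configuration properties
        Map<String, Object> channelProperties = Map.of("sendTimeoutMs", 1000);
        return merge(List.of(defaultMap, channelMap, configurationData, channelProperties));
    }
}
```

In this example the channel property wins for sendTimeoutMs, while batchingEnabled comes from the ConfigurationData layer because no higher layer sets it.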
See Configuration Reference for the exhaustive list of configuration options.
9.1. Configuring Pulsar Authentication
Pulsar provides a pluggable authentication framework, and Pulsar brokers/proxies use this mechanism to authenticate clients.
Clients can be configured in the application.properties
file using the
authPluginClassName
and authParams
attributes:
pulsar.client.serviceUrl=pulsar://pulsar:6650
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
pulsar.client.authParams={"userId":"superuser","password":"admin"}
Or programmatically:
import java.util.Map;
import jakarta.enterprise.inject.Produces;
import io.smallrye.common.annotation.Identifier;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

class PulsarConfig {

    @Produces
    @Identifier("prices")
    public ClientConfigurationData config() {
        var data = new ClientConfigurationData();
        var auth = new AuthenticationBasic();
        auth.configure(Map.of("userId", "superuser", "password", "admin"));
        data.setAuthentication(auth);
        return data;
    }
}
9.1.1. Configuring access to DataStax Luna Streaming
Luna Streaming is a production-ready distribution of Apache Pulsar, with tools and support from DataStax. After creating your DataStax Luna Pulsar tenant, note the auto-generated token, and configure the token authentication:
pulsar.client.serviceUrl=pulsar+ssl://pulsar-aws-eucentral1.streaming.datastax.com:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
pulsar.client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODY4MTc4MzQsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50OzA3NGZhOTI4LThiODktNDBhNC04MDEzLWNlNjVkN2JmZWIwZTtjSEpwWTJWejsyMDI5ODdlOGUyIiwidG9rZW5pZCI6IjIwMjk4N2U4ZTIifQ....
Make sure to create topics beforehand, or enable the Auto Topic Creation in the namespace configuration.
Note that the topic configuration needs to reference full name of topics:
mp.messaging.incoming.prices.topic=persistent://my-tenant/default/prices
9.1.2. Configuring access to StreamNative Cloud
StreamNative Cloud is a fully managed Pulsar-as-a-Service available in different deployment options: fully hosted, on a public cloud but managed by StreamNative, or self-managed on Kubernetes.
The StreamNative Pulsar clusters use OAuth2 authentication, so you need to make sure that a service account exists with the required permissions on the Pulsar namespace/topic your application is using.
Next, you need to download the Key file (which serves as private
key) of the service account and note the issuer URL (typically
https://auth.streamnative.cloud/
) and the audience (for example
urn:sn:pulsar:o-rf3ol:redhat
) for your cluster. The Pulsar Clients
page in the Admin section in the StreamNative Cloud console helps you
with this process.
To configure your application with Pulsar OAuth2 authentication:
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.client.authPluginClassName=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
pulsar.client.authParams={"type":"client_credentials","privateKey":"data:application/json;base64,<base64-encoded value>","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:o-rfwel:redhat"}
Note that the pulsar.client.authParams
configuration contains a JSON
string with issuerUrl, audience
and the privateKey in the
data:application/json;base64,<base64-encoded-key-file>
format.
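The privateKey value can be derived from the downloaded key file by Base64-encoding its content and prefixing it with the data URL header. The following stdlib-only sketch shows one way to do that; the class `PrivateKeyDataUrlSketch` and its methods are hypothetical helpers, not part of Quarkus or the Pulsar client.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;

// Hypothetical helper that builds the privateKey value for authParams.
final class PrivateKeyDataUrlSketch {

    private static final String PREFIX = "data:application/json;base64,";

    // Encodes the raw key file content as a data URL.
    static String fromBytes(byte[] keyFileContent) {
        return PREFIX + Base64.getEncoder().encodeToString(keyFileContent);
    }

    // Reads the key file from disk and encodes it as a data URL.
    static String fromFile(Path keyFile) throws IOException {
        return fromBytes(Files.readAllBytes(keyFile));
    }
}
```

The resulting string is what goes into the privateKey field of the JSON passed as pulsar.client.authParams.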
Alternatively, you can configure the authentication programmatically:
package org.acme.pulsar;

import java.net.MalformedURLException;
import java.net.URL;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

@ApplicationScoped
public class PulsarAuth {

    @ConfigProperty(name = "pulsar.issuerUrl")
    String issuerUrl;

    @ConfigProperty(name = "pulsar.credentials")
    String credentials;

    @ConfigProperty(name = "pulsar.audience")
    String audience;

    @Produces
    @Identifier("pulsar-auth")
    public ClientConfigurationData pulsarClientConfig() throws MalformedURLException {
        var data = new ClientConfigurationData();
        // The key file is resolved as a classpath resource
        data.setAuthentication(AuthenticationFactoryOAuth2.clientCredentials(
                new URL(issuerUrl),
                PulsarAuth.class.getResource(credentials),
                audience));
        return data;
    }
}
This assumes that the key file is included in the application classpath as a resource. The configuration would then look like the following:
mp.messaging.incoming.prices.client-configuration=pulsar-auth
pulsar.tenant=public
pulsar.namespace=default
pulsar.client.serviceUrl=pulsar+ssl://quarkus-71eaadbf-a6f3-4355-85d2-faf436b23d86.aws-euc1-prod-snci-pool-slug.streamnative.aws.snio.cloud:6651
pulsar.issuerUrl=https://auth.streamnative.cloud/
pulsar.audience=urn:sn:pulsar:o-rfwel:redhat
pulsar.credentials=/o-rfwel-quarkus-app.json
Note that channels using the client configuration identified with
pulsar-auth
need to set the client-configuration
attribute.
10. Health Checks
The Quarkus extension reports startup, readiness and liveness of each channel managed by the Pulsar connector. Health checks rely on the Pulsar client to verify that a connection is established with the broker.
Startup and Readiness probes for both inbound and outbound channels report OK when the connection with the broker is established.
The Liveness probe for both inbound and outbound channels reports OK when the connection with the broker is established AND no failures have been caught.
Note that a message processing failure nacks the message, which is then
handled by the failure-strategy. It is the responsibility of the
failure-strategy to report the failure and influence the outcome of the
liveness checks. The fail
failure strategy reports the failure, so the
liveness check will report the failure.
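As a sketch of how these settings combine, the configuration below applies the fail failure strategy to one channel and excludes another channel from health reporting through the health-enabled attribute listed in the Configuration Reference. The channel name audit is a hypothetical example.

```properties
# Processing failures on this channel are reported and affect the liveness check
mp.messaging.incoming.prices.failure-strategy=fail
# Hypothetical channel that should not take part in health reporting
mp.messaging.incoming.audit.health-enabled=false
```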
11. Configuration Reference
The following is the list of configuration attributes for the Pulsar connector channels, consumers, producers and clients. See the Pulsar Client Configuration for more information on how the Pulsar clients are configured.
11.1. Incoming channel configuration (receiving from Pulsar)
The following attributes are configured using:
mp.messaging.incoming.your-channel-name.attribute=value
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
ack-strategy | Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be | string | false | |
ackTimeout.redeliveryBackoff | Comma separated values for configuring ack timeout MultiplierRedeliveryBackoff, min delay, max delay, multiplier. | string | false | |
batchReceive | Whether batch receive is used to consume messages | boolean | false | |
client-configuration | Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
consumer-configuration | Identifier of a CDI bean that provides the default Pulsar consumer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
deadLetterPolicy.deadLetterTopic | Name of the dead letter topic where the failing messages will be sent | string | false | |
deadLetterPolicy.initialSubscriptionName | Name of the initial subscription name of the dead letter topic | string | false | |
deadLetterPolicy.maxRedeliverCount | Maximum number of times that a message will be redelivered before being sent to the dead letter topic | int | false | |
deadLetterPolicy.retryLetterTopic | Name of the retry topic where the failing messages will be sent | string | false | |
failure-strategy | Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be | string | false | |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | |
negativeAck.redeliveryBackoff | Comma separated values for configuring negative ack MultiplierRedeliveryBackoff, min delay, max delay, multiplier. | string | false | |
reconsumeLater.delay | Default delay for reconsume failure-strategy, in seconds | long | false | |
schema | The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed | string | false | |
serviceUrl | The service URL for the Pulsar service | string | false | |
topic | The consumed / populated Pulsar topic. If not set, the channel name is used | string | false | |
tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | |
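As a small illustration of the attributes above, the following fragment configures a dead letter policy on the data channel used earlier in this guide. The topic name data-dead-letter and the redelivery count are hypothetical values picked for the example.

```properties
mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.topic=topic
# After 3 redeliveries, the message is sent to the dead letter topic
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=3
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=data-dead-letter
```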
You can also configure properties supported by the underlying Pulsar consumer.
These properties can also be globally configured using the pulsar.consumer
prefix:
pulsar.consumer.subscriptionInitialPosition=Earliest
Attribute | Description | Type | Config file | Default |
---|---|---|---|---|
topicNames | Topic name | Set | true | [] |
topicsPattern | Topic pattern | Pattern | true | |
subscriptionName | Subscription name | String | true | |
subscriptionType | Subscription type. | SubscriptionType | true | Exclusive |
subscriptionProperties | | Map | true | |
subscriptionMode | | SubscriptionMode | true | Durable |
messageListener | | MessageListener | false | |
consumerEventListener | | ConsumerEventListener | false | |
negativeAckRedeliveryBackoff | Interface for custom message is negativeAcked policy. You can specify | RedeliveryBackoff | false | |
ackTimeoutRedeliveryBackoff | Interface for custom message is ackTimeout policy. You can specify | RedeliveryBackoff | false | |
receiverQueueSize | Size of a consumer’s receiver queue. | int | true | 1000 |
acknowledgementsGroupTimeMicros | Group a consumer acknowledgment for a specified time. | long | true | 100000 |
maxAcknowledgmentGroupSize | Group a consumer acknowledgment for the number of messages. | int | true | 1000 |
negativeAckRedeliveryDelayMicros | Delay to wait before redelivering messages that failed to be processed. | long | true | 60000000 |
maxTotalReceiverQueueSizeAcrossPartitions | The max total receiver queue size across partitions. | int | true | 50000 |
consumerName | Consumer name | String | true | |
ackTimeoutMillis | Timeout of unacked messages | long | true | |
tickDurationMillis | Granularity of the ack-timeout redelivery. | long | true | 1000 |
priorityLevel | Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4. | int | true | |
maxPendingChunkedMessage | The maximum size of a queue holding pending chunked messages. When the threshold is reached, the consumer drops pending messages to optimize memory utilization. | int | true | 10 |
autoAckOldestChunkedMessageOnQueueFull | Whether to automatically acknowledge pending chunked messages when the threshold of | boolean | true | false |
expireTimeOfIncompleteChunkedMessageMillis | The time interval to expire incomplete chunks if a consumer fails to receive all the chunks in the specified time period. The default value is 1 minute. | long | true | 60000 |
cryptoKeyReader | | CryptoKeyReader | false | |
messageCrypto | | MessageCrypto | false | |
cryptoFailureAction | Consumer should take action when it receives a message that can not be decrypted. The decompression of message fails. If messages contain batch messages, a client is not be able to retrieve individual messages in batch. Delivered encrypted message contains | ConsumerCryptoFailureAction | true | FAIL |
properties | A name or value property of this consumer. When getting a topic stats, associate this metadata with the consumer stats for easier identification. | SortedMap | true | {} |
readCompacted | If enabling A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal. Only enabling Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a | boolean | true | false |
subscriptionInitialPosition | Initial position at which to set cursor when subscribing to a topic at first time. | SubscriptionInitialPosition | true | Latest |
patternAutoDiscoveryPeriod | Topic auto discovery period when using a pattern for topic’s consumer. The default and minimum value is 1 minute. | int | true | 60 |
regexSubscriptionMode | When subscribing to a topic using a regular expression, you can pick a certain type of topics. * PersistentOnly: only subscribe to persistent topics. | RegexSubscriptionMode | true | PersistentOnly |
deadLetterPolicy | Dead letter policy for consumers. By default, some messages are probably redelivered many times, even to the extent that it never stops. By using the dead letter mechanism, messages have the max redelivery count. When exceeding the maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged automatically. You can enable the dead letter mechanism by setting When specifying the dead letter policy while not specifying | DeadLetterPolicy | true | |
retryEnable | | boolean | true | false |
batchReceivePolicy | | BatchReceivePolicy | false | |
autoUpdatePartitions | If Note: this is only for partitioned consumers. | boolean | true | true |
autoUpdatePartitionsIntervalSeconds | | long | true | 60 |
replicateSubscriptionState | If | boolean | true | false |
resetIncludeHead | | boolean | true | false |
keySharedPolicy | | KeySharedPolicy | false | |
batchIndexAckEnabled | | boolean | true | false |
ackReceiptEnabled | | boolean | true | false |
poolMessages | | boolean | true | false |
payloadProcessor | | MessagePayloadProcessor | false | |
startPaused | | boolean | true | false |
autoScaledReceiverQueueSizeEnabled | | boolean | true | false |
topicConfigurations | | List | true | [] |
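Several consumer defaults in the table above are expressed in microseconds, which can be hard to read at a glance. The following stdlib-only sketch converts two of them into more familiar units; the class name `ConsumerDefaultUnitsSketch` is a hypothetical helper, and the input values are the defaults listed in the table.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper converting microsecond-based defaults to readable units.
final class ConsumerDefaultUnitsSketch {

    static long microsToMillis(long micros) {
        return TimeUnit.MICROSECONDS.toMillis(micros);
    }

    static long microsToSeconds(long micros) {
        return TimeUnit.MICROSECONDS.toSeconds(micros);
    }

    public static void main(String[] args) {
        // acknowledgementsGroupTimeMicros default from the table
        System.out.println(microsToMillis(100_000L) + " ms");
        // negativeAckRedeliveryDelayMicros default from the table
        System.out.println(microsToSeconds(60_000_000L) + " s");
    }
}
```

In other words, acknowledgments are grouped for 100 milliseconds by default, and negatively acknowledged messages are redelivered after one minute.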
11.2. Outgoing channel configuration (publishing to Pulsar)
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
client-configuration | Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | |
maxPendingMessages | The maximum size of a queue holding pending messages, i.e messages waiting to receive an acknowledgment from a broker | int | false | |
producer-configuration | Identifier of a CDI bean that provides the default Pulsar producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map<String, Object> and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false | |
schema | The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed | string | false | |
serviceUrl | The service URL for the Pulsar service | string | false | |
topic | The consumed / populated Pulsar topic. If not set, the channel name is used | string | false | |
tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | |
waitForWriteCompletion | Whether the client waits for the broker to acknowledge the written record before acknowledging the message | boolean | false | |
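By analogy with the incoming channels, these attributes would be set under the mp.messaging.outgoing prefix. The fragment below is a sketch under that assumption; the channel name prices-out and the values are hypothetical.

```properties
mp.messaging.outgoing.prices-out.connector=smallrye-pulsar
mp.messaging.outgoing.prices-out.topic=prices
# Cap the queue of messages still waiting for a broker acknowledgment
mp.messaging.outgoing.prices-out.maxPendingMessages=500
# Acknowledge the message only once the broker has confirmed the write
mp.messaging.outgoing.prices-out.waitForWriteCompletion=true
```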
You can also configure properties supported by the underlying Pulsar producer.
These properties can also be globally configured using the pulsar.producer
prefix:
pulsar.producer.batchingEnabled=false
Attribute | Description | Type | Config file | Default |
---|---|---|---|---|
topicName | Topic name | String | true | |
producerName | Producer name | String | true | |
sendTimeoutMs | Message send timeout in ms. | long | true | 30000 |
blockIfQueueFull | If it is set to The | boolean | true | false |
maxPendingMessages | The maximum size of a queue holding pending messages. For example, a message waiting to receive an acknowledgment from a broker. By default, when the queue is full, all calls to the | int | true | |
maxPendingMessagesAcrossPartitions | The maximum number of pending messages across partitions. Use the setting to lower the max pending messages for each partition ( | int | true | |
messageRoutingMode | Message routing logic for producers on partitioned topics. | MessageRoutingMode | true | |
hashingScheme | Hashing function determining the partition where you publish a particular message (partitioned topics only). | HashingScheme | true | JavaStringHash |
cryptoFailureAction | Producer should take action when encryption fails. | ProducerCryptoFailureAction | true | FAIL |
customMessageRouter | | MessageRouter | false | |
batchingMaxPublishDelayMicros | Batching time period of sending messages. | long | true | 1000 |
batchingPartitionSwitchFrequencyByPublishDelay | | int | true | 10 |
batchingMaxMessages | The maximum number of messages permitted in a batch. | int | true | 1000 |
batchingMaxBytes | | int | true | 131072 |
batchingEnabled | Enable batching of messages. | boolean | true | true |
batcherBuilder | | BatcherBuilder | false | |
chunkingEnabled | Enable chunking of messages. | boolean | true | false |
chunkMaxMessageSize | | int | true | -1 |
cryptoKeyReader | | CryptoKeyReader | false | |
messageCrypto | | MessageCrypto | false | |
encryptionKeys | | Set | true | [] |
compressionType | Message data compression type used by a producer. | CompressionType | true | NONE |
initialSequenceId | | Long | true | |
autoUpdatePartitions | | boolean | true | true |
autoUpdatePartitionsIntervalSeconds | | long | true | 60 |
multiSchema | | boolean | true | true |
accessMode | | ProducerAccessMode | true | Shared |
lazyStartPartitionedProducers | | boolean | true | false |
properties | | SortedMap | true | {} |
initialSubscriptionName | Use this configuration to automatically create an initial subscription when creating a topic. If this field is not set, the initial subscription is not created. | String | true | |
11.3. Pulsar Client Configuration
Following is the configuration reference for the underlying PulsarClient
.
These options can be configured using the channel attribute:
mp.messaging.incoming.your-channel-name.numIoThreads=4
Or configured globally using the pulsar.client
prefix:
pulsar.client.serviceUrl=pulsar://pulsar:6650
Attribute | Description | Type | Config file | Default |
---|---|---|---|---|
serviceUrl | Pulsar cluster HTTP URL to connect to a broker. | String | true | |
serviceUrlProvider | The implementation class of ServiceUrlProvider used to generate ServiceUrl. | ServiceUrlProvider | false | |
authentication | Authentication settings of the client. | Authentication | false | |
authPluginClassName | Class name of authentication plugin of the client. | String | true | |
authParams | Authentication parameter of the client. | String | true | |
authParamMap | Authentication map of the client. | Map | true | |
operationTimeoutMs | Client operation timeout (in milliseconds). | long | true | 30000 |
lookupTimeoutMs | Client lookup timeout (in milliseconds). | long | true | -1 |
statsIntervalSeconds | Interval to print client stats (in seconds). | long | true | 60 |
numIoThreads | Number of IO threads. | int | true | 10 |
numListenerThreads | Number of consumer listener threads. | int | true | 10 |
connectionsPerBroker | Number of connections established between the client and each Broker. A value of 0 means to disable connection pooling. | int | true | 1 |
connectionMaxIdleSeconds | Release the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. If [connectionMaxIdleSeconds] < 0, disabled the feature that auto release the idle connections | int | true | 180 |
useTcpNoDelay | Whether to use TCP NoDelay option. | boolean | true | true |
useTls | Whether to use TLS. | boolean | true | false |
tlsKeyFilePath | Path to the TLS key file. | String | true | |
tlsCertificateFilePath | Path to the TLS certificate file. | String | true | |
tlsTrustCertsFilePath | Path to the trusted TLS certificate file. | String | true | |
tlsAllowInsecureConnection | Whether the client accepts untrusted TLS certificates from the broker. | boolean | true | false |
tlsHostnameVerificationEnable | Whether the hostname is validated when the client creates a TLS connection with brokers. | boolean | true | false |
concurrentLookupRequest | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum prevents overloading a broker. | int | true | 5000 |
maxLookupRequest | Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker. | int | true | 50000 |
maxLookupRedirects | Maximum times of redirected lookup requests. | int | true | 20 |
maxNumberOfRejectedRequestPerConnection | Maximum number of rejected requests of a broker in a certain time frame (60 seconds) after the current connection is closed and the client creating a new connection to connect to a different broker. | int | true | 50 |
keepAliveIntervalSeconds | Seconds of keeping alive interval for each client broker connection. | int | true | 30 |
connectionTimeoutMs | Duration of waiting for a connection to a broker to be established. If the duration passes without a response from a broker, the connection attempt is dropped. | int | true | 10000 |
requestTimeoutMs | Maximum duration for completing a request. | int | true | 60000 |
readTimeoutMs | Maximum read time of a request. | int | true | 60000 |
autoCertRefreshSeconds | Seconds of auto refreshing certificate. | int | true | 300 |
initialBackoffIntervalNanos | Initial backoff interval (in nanosecond). | long | true | 100000000 |
maxBackoffIntervalNanos | Max backoff interval (in nanosecond). | long | true | 60000000000 |
enableBusyWait | Whether to enable BusyWait for EpollEventLoopGroup. | boolean | true | false |
listenerName | Listener name for lookup. Clients can use listenerName to choose one of the listeners as the service URL to create a connection to the broker as long as the network is accessible. "advertisedListeners" must enabled in broker side. | String | true | |
useKeyStoreTls | Set TLS using KeyStore way. | boolean | true | false |
sslProvider | The TLS provider used by an internal client to authenticate with other Pulsar brokers. | String | true | |
tlsKeyStoreType | TLS KeyStore type configuration. | String | true | JKS |
tlsKeyStorePath | Path of TLS KeyStore. | String | true | |
tlsKeyStorePassword | Password of TLS KeyStore. | String | true | |
tlsTrustStoreType | TLS TrustStore type configuration. You need to set this configuration when client authentication is required. | String | true | JKS |
tlsTrustStorePath | Path of TLS TrustStore. | String | true | |
tlsTrustStorePassword | Password of TLS TrustStore. | String | true | |
tlsCiphers | Set of TLS Ciphers. | Set | true | [] |
tlsProtocols | Protocols of TLS. | Set | true | [] |
memoryLimitBytes | Limit of client memory usage (in byte). The 64M default can guarantee a high producer throughput. | long | true | 67108864 |
proxyServiceUrl | URL of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive. | String | true | |
proxyProtocol | Protocol of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive. | ProxyProtocol | true | |
enableTransaction | Whether to enable transaction. | boolean | true | false |
clock | | Clock | false | |
dnsLookupBindAddress | The Pulsar client dns lookup bind address, default behavior is bind on 0.0.0.0 | String | true | |
dnsLookupBindPort | The Pulsar client dns lookup bind port, takes effect when dnsLookupBindAddress is configured, default value is 0. | int | true | |
socks5ProxyAddress | Address of SOCKS5 proxy. | InetSocketAddress | true | |
socks5ProxyUsername | User name of SOCKS5 proxy. | String | true | |
socks5ProxyPassword | Password of SOCKS5 proxy. | String | true | |
description | The extra description of the client version. The length cannot exceed 64. | String | true | |
Configuration properties that cannot be set in configuration files
(non-serializable) are marked false in the Config file column.
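As an illustration of the serializable client attributes, the fragment below sketches a TLS-enabled client using properties from the table above. The host name and certificate path are hypothetical placeholders.

```properties
# Hypothetical TLS endpoint
pulsar.client.serviceUrl=pulsar+ssl://pulsar.example.com:6651
# Hypothetical path to the trusted CA certificate
pulsar.client.tlsTrustCertsFilePath=/etc/pulsar/certs/ca.cert.pem
# Validate the broker host name against its certificate
pulsar.client.tlsHostnameVerificationEnable=true
```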
12. Going further
This guide has shown how you can interact with Pulsar using Quarkus. It utilizes SmallRye Reactive Messaging to build data streaming applications.
If you want to go further, check the documentation of SmallRye Reactive Messaging, the implementation used in Quarkus.