Durable Queues
Shared queue system for MQTT and AMQP with consumer groups, acknowledgments, stream queues, and append-only log storage
Last Updated: 2026-02-05
FluxMQ provides durable queues shared across MQTT, AMQP 1.0, and AMQP 0.9.1. The queue manager stores messages in an append-only log, tracks progress through consumer groups, and supports both classic work-queue semantics and stream-style consumption.
Overview
Durable queues are built around a simple model:
- Producers publish to `$queue/<queue-name>/...`.
- The broker appends the message to the queue’s log (durable storage).
- Consumers join a consumer group and receive messages from that log.
- Consumers ack, nack, or reject deliveries to control redelivery and truncation.
Queues can operate in two modes:
- Classic (work queue): messages are claimed, tracked in a pending list (PEL), and acknowledged.
- Stream: consumers read sequentially with cursor-based progress (replayable log semantics).
Do MQTT Producers Need $queue/?
Yes. In FluxMQ today, MQTT publishes are treated as durable-queue traffic only when the topic starts with $queue/.
- Publishing to `sensors/temp` goes through normal pub/sub routing (subscriptions, retained messages, cluster pub/sub).
- Publishing to `$queue/orders/...` goes through the queue manager (durable log + consumer groups + acks).
If you configure a queue with topic bindings that match a non-`$queue/` topic, MQTT publishes to that topic will still not be enqueued: the MQTT broker never routes non-`$queue/` publishes into the queue manager.
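For example, a minimal sketch with the Eclipse Paho Go client (the client library and broker address are assumptions; any MQTT client behaves the same way):

```go
package main

import (
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883")
	c := mqtt.NewClient(opts)
	if t := c.Connect(); t.Wait() && t.Error() != nil {
		panic(t.Error())
	}
	defer c.Disconnect(250)

	// Normal pub/sub: fanned out to live subscribers, never enqueued.
	c.Publish("sensors/temp", 1, false, "21.5").Wait()

	// Durable queue: the $queue/ prefix routes the publish into the queue
	// manager, which appends it to the "orders" queue log.
	c.Publish("$queue/orders/created", 1, false, `{"order":42}`).Wait()
}
```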
Architecture
┌──────────────┐ ┌──────────────┐ ┌───────────────┐
│ MQTT Broker │ │ AMQP Broker │ │ AMQP091 Broker│
│ (TCP/WS/ │ │ (AMQP 1.0) │ │ (AMQP 0.9.1) │
│ HTTP/CoAP) │ │ │ │ │
└──────┬───────┘ └───────┬──────┘ └──────┬────────┘
│ │ │
└──────────────────┼────────────────┘
▼
┌─────────────────────────┐
│ Shared Queue Manager │
│ - Topic bindings │
│ - Consumer groups │
│ - Retention loop │
└───────────┬─────────────┘
▼
┌─────────────────────────┐
│ Log Storage (AOL) │
└─────────────────────────┘

Queue Name, Topic, and Routing Key
Queue topics follow this convention:
- Queue root: `$queue/<name>`
- Routing key: everything after the root (optional)

Examples:

- `$queue/orders` (root only)
- `$queue/orders/created` (routing key `created`)
- `$queue/orders/images/png` (routing key `images/png`)
When you subscribe to `$queue/orders/images/#`, FluxMQ treats:
- queue name: `orders`
- group filter pattern: `images/#`
That pattern is applied inside the queue manager (not in the normal pub/sub router).
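As a sketch of that split (illustrative only, not FluxMQ's actual parser):

```go
package main

import (
	"fmt"
	"strings"
)

// splitQueueTopic divides a $queue/ subscription into the queue name and
// the group filter pattern, as described above.
func splitQueueTopic(topic string) (queue, filter string, ok bool) {
	rest, ok := strings.CutPrefix(topic, "$queue/")
	if !ok {
		return "", "", false // not queue traffic
	}
	if i := strings.IndexByte(rest, '/'); i >= 0 {
		return rest[:i], rest[i+1:], true
	}
	return rest, "", true // root only, no filter
}

func main() {
	q, f, _ := splitQueueTopic("$queue/orders/images/#")
	fmt.Println(q, f) // orders images/#
}
```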
Wildcard Patterns and Routing Key Filtering
Queue subscriptions support MQTT-style wildcards (+ for single level, # for multiple levels). However, the wildcard handling is split into two stages:
- Queue binding: the queue itself is bound to `$queue/<name>/#` (captures all messages for that queue).
- Consumer filter: the pattern after the queue name filters messages during delivery.
Example 1: Simple Queue
Subscribe: $queue/orders
Publish: $queue/orders

- Queue name: `orders`
- Consumer filter: (none)
- All messages to `$queue/orders` are delivered
Example 2: Single-Level Routing Key
Subscribe: $queue/orders/images
Publish: $queue/orders/images

- Queue name: `orders`
- Consumer filter: `images`
- Only messages with routing key `images` are delivered
Example 3: Multi-Level Wildcard
Subscribe: $queue/orders/images/#
Publish: $queue/orders/images/png
Publish: $queue/orders/images/resize/thumbnail

- Queue name: `orders`
- Consumer filter: `images/#`
- Queue binding: `$queue/orders/#` (captures everything)
- During delivery, filter `images/#` matches `images/png` and `images/resize/thumbnail`
Example 4: Single-Level Wildcard
Subscribe: $queue/orders/+/images/#
Publish: $queue/orders/eu/images/resize
Publish: $queue/orders/us/images/png

- Queue name: `orders`
- Consumer filter: `+/images/#`

Flow for a publish to `$queue/orders/eu/images/resize`:
- `FindMatchingQueues("$queue/orders/eu/images/resize")` matches queue `orders` (bound to `$queue/orders/#`)
- Message is stored in the queue with topic `$queue/orders/eu/images/resize`
- During delivery, the routing key is extracted: `eu/images/resize`
- Filter `+/images/#` matches: `+` matches `eu`, `images` matches `images`, `#` matches `resize`
- Message is delivered to the consumer
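The delivery-time filtering in these examples boils down to MQTT-style segment matching. A minimal sketch of such a matcher (illustrative, not FluxMQ's implementation):

```go
package main

import (
	"fmt"
	"strings"
)

// matchFilter applies an MQTT-style filter to a routing key:
// + matches exactly one level, # matches the remainder.
func matchFilter(filter, key string) bool {
	f := strings.Split(filter, "/")
	k := strings.Split(key, "/")
	for i, seg := range f {
		if seg == "#" {
			return true // # swallows everything from here on
		}
		if i >= len(k) {
			return false // filter is longer than the key
		}
		if seg != "+" && seg != k[i] {
			return false // literal segment mismatch
		}
	}
	return len(f) == len(k)
}

func main() {
	fmt.Println(matchFilter("+/images/#", "eu/images/resize")) // true
	fmt.Println(matchFilter("+/images/#", "us/images/png"))    // true
	fmt.Println(matchFilter("+/images/#", "eu/audio/mp3"))     // false
}
```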
Example 5: Multiple Consumer Groups with Different Filters
One queue can have multiple consumer groups with different filter patterns:
# Group A: processes all EU orders
Subscribe: $queue/orders/eu/#
(consumer-group: "eu-processors")

# Group B: processes all image-related orders
Subscribe: $queue/orders/+/images/#
(consumer-group: "image-processors")

# Publish
Publish: $queue/orders/eu/images/resize

Both groups receive the message because:
- Group A filter `eu/#` matches `eu/images/resize`
- Group B filter `+/images/#` matches `eu/images/resize`
Key Points
- The publisher must always use the `$queue/` prefix for messages to be routed through the queue manager
- The queue is bound to `$queue/<name>/#` regardless of the subscription pattern
- Wildcard filtering happens at delivery time, not at publish time
- Multiple consumer groups can have different filters on the same queue
Routing Model (How Messages Find Queues)
FluxMQ uses topic patterns on queues to decide where a publish should be stored. This is a fan-out model: one publish can land in multiple queues if multiple patterns match.
- Each queue has one or more topic patterns (MQTT wildcard syntax).
- On publish, the queue manager calls `FindMatchingQueues(topic)` and appends the message to every matching queue.
- If no queue matches, FluxMQ creates an ephemeral queue whose name and pattern equal the topic, then appends.
- If no queues are configured at all, FluxMQ creates a reserved `mqtt` queue that matches `$queue/#`, so queue publishes always have a landing zone.
Ephemeral queues are a safety net, but they can be surprising if you publish before configuring queues or before consumers subscribe. In production, prefer an explicit `queues:` configuration so queue names and bindings are stable.
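A sketch of the fan-out and the ephemeral fallback, reusing `matchFilter` from the earlier sketch (the `Queue` type and `findMatchingQueues` are illustrative, not FluxMQ's internals):

```go
// Queue is an illustrative stand-in for a configured queue.
type Queue struct {
	Name     string
	Patterns []string // MQTT wildcard patterns, e.g. "$queue/orders/#"
}

// findMatchingQueues fans a publish out to every queue whose pattern
// matches, creating an ephemeral queue when nothing matches.
func findMatchingQueues(queues []*Queue, topic string) []*Queue {
	var matched []*Queue
	for _, q := range queues {
		for _, p := range q.Patterns {
			if matchFilter(p, topic) { // matcher from the earlier sketch
				matched = append(matched, q)
				break
			}
		}
	}
	if len(matched) == 0 {
		// Safety net: name and pattern equal the publish topic.
		matched = append(matched, &Queue{Name: topic, Patterns: []string{topic}})
	}
	return matched
}
```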
Walkthrough: Queue Message Lifecycle
This is the “classic” (work queue) lifecycle:
- Producer publishes to a `$queue/<name>/...` topic (or a queue-capable AMQP address).
- Queue manager matches the topic against queue bindings and appends to the matching queue log(s).
- A consumer group claims a message at some offset; the message becomes pending (tracked in the PEL).
- The broker delivers the message to the chosen consumer.
- The consumer acks/nacks/rejects; the queue manager updates group state and may advance the safe truncation point.
Queue Addressing
Queue topics use $queue/<queue-name>/....
MQTT
- Publish: `$queue/orders`
- Publish with routing key: `$queue/orders/eu/images/resize`
- Subscribe to a pattern: `$queue/orders/#` or `$queue/orders/+/images/#`
- Ack: `$queue/orders/$ack`
- Nack: `$queue/orders/$nack`
- Reject: `$queue/orders/$reject`
AMQP 0.9.1
AMQP 0.9.1 clients can interact with queues in multiple ways:
Direct queue publish (default exchange with $queue/ routing key):

exchange: "" (default)
routing_key: "$queue/orders"
routing_key: "$queue/orders/eu/images/resize" # with routing key

Stream queue publish (declare queue with x-queue-type: stream):

exchange: "" (default)
routing_key: "my-stream" # queue name without $queue/ prefix

Exchange-based routing (bind queue to exchange):

queue.bind(queue="orders", exchange="my-exchange", routing_key="orders.#")
publish(exchange="my-exchange", routing_key="orders.eu.images")

Consume from queue:

basic.consume(queue="$queue/orders/#")
basic.consume(queue="$queue/orders/+/images/#")
basic.consume(queue="my-stream") # stream queue by name

Acknowledgments:

- basic.ack → Ack
- basic.nack(requeue=true) → Nack (retry)
- basic.reject(requeue=false) → Reject (DLQ)
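Putting the mapping together, a sketch of a consumer loop with rabbitmq/amqp091-go (broker URL, queue, and group names are assumptions):

```go
package main

import (
	"errors"
	"log"

	amqp091 "github.com/rabbitmq/amqp091-go"
)

var errRetry = errors.New("transient failure")

func process(body []byte) error {
	// Real work goes here; return errRetry for transient failures.
	return nil
}

func main() {
	conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}

	deliveries, err := ch.Consume(
		"$queue/orders/#", "worker-1",
		false, false, false, false, // manual ack
		amqp091.Table{"x-consumer-group": "workers"},
	)
	if err != nil {
		log.Fatal(err)
	}

	for d := range deliveries {
		switch err := process(d.Body); {
		case err == nil:
			d.Ack(false) // Ack: processing done
		case errors.Is(err, errRetry):
			d.Nack(false, true) // Nack with requeue: eligible for redelivery
		default:
			d.Reject(false) // Reject: drop (DLQ wiring is incomplete)
		}
	}
}
```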
Consumer Groups
Consumer groups are what turn a queue log into a scalable worker pool:
- A single queue can have many groups (independent progress).
- A single group can have many consumers (load balancing).
- Each group has its own cursor state and (in classic mode) pending list.
How to set the group ID depends on the protocol:
- MQTT v5: `consumer-group` user property on SUBSCRIBE.
- MQTT v3: falls back to client ID (acks require MQTT v5 user properties).
- AMQP 1.0: `consumer-group` in attach properties.
- AMQP 0.9.1: `x-consumer-group` argument on `basic.consume`.
MQTT v5 Example
mosquitto_sub -p 1883 -V mqttv5 -t '$queue/orders/#' -q 1 \
  -D subscribe user-property consumer-group workers

AMQP 0.9.1 Example (Go)
ch.Consume(
"$queue/orders/#", // queue
"consumer-tag", // consumer tag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
amqp091.Table{
"x-consumer-group": "workers",
},
)

Message Properties
Queue deliveries include properties that make acknowledgments and observability consistent across protocols:
- `message-id` (required for ack/nack/reject)
- `group-id` (consumer group name)
- `queue` (queue name)
- `offset` (sequence number)
Stream deliveries also include:
- `x-stream-offset`
- `x-stream-timestamp` (unix millis)
- `x-work-committed-offset` (if a primary group is configured)
- `x-work-acked` (true when below the committed offset)
- `x-work-group` (primary work group name)
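A sketch of reading these from an AMQP 0.9.1 delivery, assuming the broker surfaces them as message headers with integer values (the value types are guesses, hence the defensive assertions):

```go
import (
	"log"
	"time"

	amqp091 "github.com/rabbitmq/amqp091-go"
)

// logStreamMetadata prints stream properties from each delivery.
func logStreamMetadata(deliveries <-chan amqp091.Delivery) {
	for d := range deliveries {
		if off, ok := d.Headers["x-stream-offset"].(int64); ok {
			log.Printf("offset=%d", off)
		}
		if ts, ok := d.Headers["x-stream-timestamp"].(int64); ok {
			log.Printf("timestamp=%s", time.UnixMilli(ts).UTC())
		}
		d.Ack(false)
	}
}
```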
Acknowledgments
MQTT
Ack/Nack/Reject are implemented by publishing to:
- `$queue/<queue>/$ack`
- `$queue/<queue>/$nack`
- `$queue/<queue>/$reject`
If you include a routing key (for example $queue/orders/images/$ack), the broker still derives the queue name from the first segment after $queue/.
MQTT v5 user properties required:
- `message-id`
- `group-id`
MQTT v3 can publish and subscribe to queue topics, but acknowledgments require MQTT v5 user properties.
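A sketch of an MQTT v5 ack using the eclipse/paho.golang client (the library choice, broker address, and placeholder message ID are assumptions):

```go
package main

import (
	"context"
	"net"

	"github.com/eclipse/paho.golang/paho"
)

func main() {
	ctx := context.Background()

	conn, err := net.Dial("tcp", "localhost:1883")
	if err != nil {
		panic(err)
	}
	c := paho.NewClient(paho.ClientConfig{Conn: conn})
	if _, err := c.Connect(ctx, &paho.Connect{
		ClientID: "worker-1", KeepAlive: 30, CleanStart: true,
	}); err != nil {
		panic(err)
	}

	// Ack = publish to $queue/<queue>/$ack carrying the two required
	// user properties. The message-id value comes from the delivery.
	msgID := "message-id-from-the-delivery" // placeholder
	if _, err := c.Publish(ctx, &paho.Publish{
		Topic: "$queue/orders/$ack",
		QoS:   1,
		Properties: &paho.PublishProperties{
			User: paho.UserProperties{
				{Key: "message-id", Value: msgID},
				{Key: "group-id", Value: "workers"},
			},
		},
	}); err != nil {
		panic(err)
	}
}
```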
AMQP 1.0
AMQP dispositions are mapped to queue acknowledgments:
- Accepted → Ack
- Released → Nack
- Rejected → Reject
AMQP 0.9.1
`basic.ack`, `basic.nack`, and `basic.reject` map to Ack, Nack, and Reject respectively.
Stream Commit (AMQP 0.9.1)
If a stream group has auto-commit disabled, AMQP 0.9.1 can explicitly commit offsets by publishing to:
$queue/<queue>/$commit
Headers:
- `x-group-id`
- `x-offset`

See the manual commit example under AMQP 0.9.1 Stream Queues below.
Queue Types
Classic (Work Queue)
Classic queues are optimized for "do work once" semantics:
- Messages are claimed and tracked in a Pending Entry List (PEL)
- `ack` removes a message from the PEL; `nack` makes it eligible for redelivery; `reject` removes it (DLQ wiring is incomplete)
- A visibility timeout plus work stealing prevents stuck consumers from stalling progress
- The safe truncation point is derived from group state (see cursor/committed below)
Classic Mode Delivery Flow
- Consumer requests messages via `ClaimBatch`
- Manager reads the message from the queue log at the cursor position
- Creates a PEL entry: `{ offset, consumerID, claimedAt, deliveryCount }`
- Advances the cursor to the next position
- Delivers the message to the consumer
- On `ack`: removes the PEL entry, advances the committed offset
- On `nack`: resets the PEL entry for redelivery
- On timeout: the message becomes stealable by other consumers
Classic Mode State:
Queue Log: [0] [1] [2] [3] [4] [5] [6] [7] ...
▲ ▲
│ └── cursor (next to deliver)
└── committed (safe to truncate)
PEL: {
offset=3: { consumer: "c1", claimedAt: t1, deliveries: 1 }
offset=4: { consumer: "c2", claimedAt: t2, deliveries: 2 }
offset=5: { consumer: "c1", claimedAt: t3, deliveries: 1 }
}

Stream
Stream queues are optimized for "replay and progress" semantics:
- Consumption is cursor-based (read position in an append-only log)
- No PEL tracking - messages are simply read, not "claimed"
- Groups can auto-commit progress (default) or require explicit commits (AMQP 0.9.1)
Stream Mode Delivery Flow
- Consumer requests messages via `ClaimBatchStream`
- Manager reads messages from the queue log starting at the cursor position
- No PEL entry is created - the messages are simply read
- Advances the cursor to the next position
- Delivers the messages to the consumer
- If `autoCommit: true`: the committed offset is updated periodically
- If `autoCommit: false`: the consumer must explicitly commit via `$queue/<name>/$commit`
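The same idea in miniature: stream progress is just two offsets (illustrative, not FluxMQ's internals):

```go
package main

import "fmt"

type streamGroup struct {
	cursor    uint64 // next offset to read
	committed uint64 // restart checkpoint
}

// readBatch returns up to n entries and advances the cursor; no PEL entry
// is created, the messages are simply read.
func (g *streamGroup) readBatch(log []string, n int) []string {
	end := g.cursor + uint64(n)
	if end > uint64(len(log)) {
		end = uint64(len(log))
	}
	batch := log[g.cursor:end]
	g.cursor = end
	return batch
}

// commit records a checkpoint; the auto-commit loop (or an explicit
// $queue/<name>/$commit) would land here.
func (g *streamGroup) commit(offset uint64) {
	if offset > g.committed {
		g.committed = offset
	}
}

func main() {
	g := &streamGroup{}
	msgs := []string{"m0", "m1", "m2", "m3"}
	fmt.Println(g.readBatch(msgs, 2)) // [m0 m1]
	g.commit(g.cursor)
	fmt.Println(g.cursor, g.committed) // 2 2
}
```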
Stream Mode State:
Queue Log: [0] [1] [2] [3] [4] [5] [6] [7] ...
▲ ▲
│ └── cursor (next to read)
└── committed (checkpoint)
No PEL - just two offsets to track!

Why No PEL for Streams?
Stream mode is designed for different use cases than classic work queues:
| Aspect | Classic (PEL) | Stream (No PEL) |
|---|---|---|
| Semantics | Process once | Read/replay many times |
| Delivery guarantee | At-least-once with ack | At-least-once with cursor |
| Redelivery trigger | Visibility timeout | Consumer restart/seek |
| Work stealing | Yes (steal from slow peers) | No (each consumer independent) |
| Memory overhead | PEL entry per pending message | Only cursor position |
| Replayability | No (acked = done) | Yes (seek to any offset) |
Classic mode answers: "Has this message been successfully processed?"
Stream mode answers: "Where is my read position in the log?"
In stream mode, the cursor itself serves as the progress indicator. If a consumer crashes:
- It restarts from the last committed cursor position
- Messages between committed and actual progress are re-read
- No per-message tracking needed - just resume from checkpoint
This makes streams more efficient for high-throughput scenarios where:
- Messages are idempotent or replayable
- Multiple independent readers need the same data
- You want Kafka-like log semantics
AMQP 0.9.1 Stream Queues
Declare a stream queue with retention settings:
ch.QueueDeclare(
"my-stream", // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
amqp091.Table{
"x-queue-type": "stream",
"x-max-age": "7D", // retain 7 days
"x-max-length-bytes": 1073741824, // 1GB max
"x-max-length": 1000000, // 1M messages max
},
)

Consume with cursor positioning:
ch.Consume(
"my-stream", // queue name (not $queue/ prefix)
"consumer-tag",
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
amqp091.Table{
"x-consumer-group": "readers",
"x-stream-offset": "first", // "first", "last", "next", offset number, or "timestamp=..."
"x-auto-commit": false, // require manual commit
},
)

Stream offset options:
"first"- Start from the beginning of the log"last"or"next"- Start from the end (new messages only)123(number) - Start from specific offset"offset=123"- Same as above, string format"timestamp=1706745600000"- Start from timestamp (unix millis)
Manual commit (when x-auto-commit: false):
// Publish to special commit topic
ch.Publish("", "$queue/my-stream/$commit", false, false, amqp091.Publishing{
Headers: amqp091.Table{
"x-group-id": "readers",
"x-offset": int64(lastProcessedOffset),
},
})

Cursors, Pending Lists, and Safe Truncation
Each consumer group tracks progress using two offsets:
- `cursor`: the next offset that will be delivered (read position)
- `committed`: the oldest offset considered safe for retention/truncation (durability floor)
In classic mode, committed is driven by the minimum pending offset in the PEL.
Practical intuition:
- Cursor moves forward when the group claims work.
- Committed moves forward when the group acks (or otherwise clears) the earliest pending work.
Example: Classic Group Progress
Imagine a queue log with offsets 0..5.
| Event | Cursor | PEL | Committed |
|---|---|---|---|
| Group starts | 0 | empty | 0 |
| Claims offsets 0,1 | 2 | {0, 1} | 0 |
| Acks offset 0 | 2 | {1} | 1 |
| Acks offset 1 | 2 | empty | 2 |
This is why committed is the “safe truncation floor”: once everything below an offset is fully processed, the log can be truncated without breaking group semantics.
Retention
Retention policies can be configured per queue:
- `max_age` (time-based)
- `max_length_bytes`
- `max_length_messages`
A background retention loop computes a safe truncation offset and truncates the queue log:
- Start from the minimum committed offset across queue-mode consumer groups.
- Apply retention limits (time/size/message count) to compute the oldest offset that should be kept.
- Truncate the log to the lowest safe offset (segment-granular in log storage).
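A sketch of that computation under a conservative reading of the steps above (both the group floor and the retention limits keep data; names are illustrative, not FluxMQ's internals):

```go
package main

import "fmt"

type group struct{ committed uint64 }

// safeTruncateBefore returns the offset below which the log may be dropped:
// the minimum committed offset across groups, further capped by the oldest
// offset the retention limits say must be kept.
func safeTruncateBefore(groups []group, keepFrom uint64) uint64 {
	floor := ^uint64(0)
	for _, g := range groups {
		if g.committed < floor {
			floor = g.committed
		}
	}
	if keepFrom < floor {
		return keepFrom
	}
	return floor
}

func main() {
	groups := []group{{committed: 120}, {committed: 95}}
	// Retention limits (max_age/size/count) allow dropping below offset 100.
	fmt.Println(safeTruncateBefore(groups, 100)) // 95: the slowest group wins
}
```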
See Storage internals for on-disk format and retention behavior details.
Clustering and Replication Notes
In clustered deployments, queue behavior depends on cluster.raft.*:
- `write_policy` controls how followers handle incoming queue publishes.
- `distribution_mode` controls whether deliveries are routed (`forward`) or logs are replicated (`replicate`).
See Cluster configuration and Clustering internals for the full picture.
DLQ Status
A DLQ handler exists in queue/consumer/dlq.go, but the main delivery path does not automatically move rejected or expired messages into a DLQ yet. Reject currently removes the message from the pending list without pushing it to a DLQ.
Configuration
Queues are configured under queues in the main config file:
queue_manager:
auto_commit_interval: "5s"
queues:
- name: "orders"
topics: ["$queue/orders/#"]
type: "classic" # classic or stream
primary_group: "" # optional for stream status
limits:
max_message_size: 10485760
max_depth: 100000
message_ttl: "168h"
retry:
max_retries: 10
initial_backoff: "5s"
max_backoff: "5m"
multiplier: 2.0
dlq:
enabled: true
topic: "" # optional override
retention:
max_age: "168h"
max_length_bytes: 0
max_length_messages: 0