Durable Queues
Persistent queue system for MQTT and AMQP — durable work queues and stream queues with consumer groups, acknowledgments, and append-only log storage
Last Updated: 2026-02-12
This page covers FluxMQ's two persistent queue types: durable queues (work queue semantics) and stream queues (append-only log semantics). Both are shared across MQTT, AMQP 1.0, and AMQP 0.9.1.
For an overview of all three queue types (including ephemeral queues), see Queue Types.
Overview
Persistent queues are built around a simple model:
- Producers publish to `$queue/<queue-name>/...`.
- The broker appends the message to the queue's log (durable storage).
- Consumers join a consumer group and receive messages from that log.
- What happens next depends on the queue type:
| | Durable (Classic) | Stream |
|---|---|---|
| Ack | Remove from pending list, advance truncation point | Checkpoint cursor position |
| Nack | Redeliver the message (with backoff) | No-op (replay via cursor seek) |
| Reject | Discard message (future: DLQ) | Advance cursor past message |
| Retention | Truncate after all groups ack | Time/size/count policy |
| Redelivery | Automatic (visibility timeout + work stealing) | Manual (consumer restarts from checkpoint) |
Design rule: Do not overload ack semantics. "Ack" means "processed, remove it" in a durable queue and "checkpoint my position" in a stream. If you need both task processing and replay for the same data, use two queues with overlapping topic bindings.
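The two-queue pattern can be sketched as two queue definitions sharing one topic binding. This uses the `queues:` config schema shown later on this page; the queue names are hypothetical:

```yaml
queues:
  - name: "orders-work"            # hypothetical: task processing, ack = done
    topics: ["$queue/orders/#"]
    type: "classic"
  - name: "orders-replay"          # hypothetical: replayable history
    topics: ["$queue/orders/#"]    # same binding: every publish lands in both
    type: "stream"
    retention:
      max_age: "168h"
```

Because routing fans out to every queue whose pattern matches, one publish to `$queue/orders/...` is stored in both queues, and each keeps its own ack semantics.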
Do MQTT Producers Need $queue/?
Yes. In FluxMQ, MQTT publishes are routed to the queue manager only when the topic starts with $queue/.
- Publish to `sensors/temp` goes through normal pub/sub routing (subscriptions, retained, cluster pub/sub).
- Publish to `$queue/orders/...` goes through the queue manager (log storage + consumer groups + acks).
The $queue/ prefix is what activates queue semantics — whether the target is an ephemeral, durable, or stream queue. If you configured a queue with topic bindings that match a non-$queue/ topic, MQTT publishes to that topic will still not be enqueued, because the MQTT broker does not route non-$queue/ publishes into the queue manager.
Architecture
┌──────────────┐ ┌──────────────┐ ┌───────────────┐
│ MQTT Broker │ │ AMQP Broker │ │ AMQP091 Broker│
│ (TCP/WS/ │ │ (AMQP 1.0) │ │ (AMQP 0.9.1) │
│ HTTP/CoAP) │ │ │ │ │
└──────┬───────┘ └───────┬──────┘ └──────┬────────┘
│ │ │
└──────────────────┼────────────────┘
▼
┌─────────────────────────┐
│ Shared Queue Manager │
│ - Topic bindings │
│ - Consumer groups │
│ - Retention loop │
└───────────┬─────────────┘
▼
┌─────────────────────────┐
│ Log Storage (AOL) │
└─────────────────────────┘

Queue Name, Topic, and Routing Key
Queue topics follow this convention:
- Queue root: `$queue/<name>`
- Routing key: everything after the root (optional)
Examples:
- `$queue/orders` (root only)
- `$queue/orders/created` (routing key `created`)
- `$queue/orders/images/png` (routing key `images/png`)
When you subscribe to $queue/orders/images/#, FluxMQ treats:
- queue name: `orders`
- group filter pattern: `images/#`
That pattern is applied inside the queue manager (not in the normal pub/sub router).
Wildcard Patterns and Routing Key Filtering
Queue subscriptions support MQTT-style wildcards (+ for single level, # for multiple levels). However, the wildcard handling is split into two stages:
- Queue binding: The queue itself is bound to `$queue/<name>/#` (captures all messages for that queue)
- Consumer filter: The pattern after the queue name filters messages during delivery
Example 1: Simple Queue
Subscribe: $queue/orders
Publish: $queue/orders

- Queue name: `orders`
- Consumer filter: (none)
- All messages to `$queue/orders` are delivered
Example 2: Single-Level Routing Key
Subscribe: $queue/orders/images
Publish: $queue/orders/images

- Queue name: `orders`
- Consumer filter: `images`
- Only messages with routing key `images` are delivered
Example 3: Multi-Level Wildcard
Subscribe: $queue/orders/images/#
Publish: $queue/orders/images/png
Publish: $queue/orders/images/resize/thumbnail

- Queue name: `orders`
- Consumer filter: `images/#`
- Queue binding: `$queue/orders/#` (captures everything)
- During delivery, filter `images/#` matches `images/png` and `images/resize/thumbnail`
Example 4: Single-Level Wildcard
Subscribe: $queue/orders/+/images/#
Publish: $queue/orders/eu/images/resize
Publish: $queue/orders/us/images/png

- Queue name: `orders`
- Consumer filter: `+/images/#`
Flow for publish to $queue/orders/eu/images/resize:
- `FindMatchingQueues("$queue/orders/eu/images/resize")` matches queue `orders` (bound to `$queue/orders/#`)
- Message stored in queue with topic `$queue/orders/eu/images/resize`
- During delivery, routing key is extracted: `eu/images/resize`
- Filter `+/images/#` matches: `+` matches `eu`, `images` matches `images`, `#` matches `resize`
- Message delivered to consumer
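The delivery-time filter check above can be sketched in a few lines. This is an illustrative matcher for MQTT-style wildcards, not FluxMQ's implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// matchFilter reports whether an MQTT-style filter ("+" matches one
// topic level, "#" matches all remaining levels) matches a routing key.
func matchFilter(filter, key string) bool {
	f := strings.Split(filter, "/")
	k := strings.Split(key, "/")
	for i, part := range f {
		if part == "#" {
			return true // "#" swallows everything from here on
		}
		if i >= len(k) {
			return false // filter has more levels than the key
		}
		if part != "+" && part != k[i] {
			return false
		}
	}
	return len(f) == len(k) // exact filters must consume the whole key
}

func main() {
	fmt.Println(matchFilter("+/images/#", "eu/images/resize")) // true
	fmt.Println(matchFilter("images/#", "images/png"))         // true
	fmt.Println(matchFilter("images", "images/png"))           // false
}
```

Running the flow from Example 4 through this matcher reproduces the step-by-step match: `+` binds `eu`, the literal `images` level matches, and `#` accepts `resize`.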
Example 5: Multiple Consumer Groups with Different Filters
One queue can have multiple consumer groups with different filter patterns:
# Group A: processes all EU orders
Subscribe: $queue/orders/eu/#
(consumer-group: "eu-processors")
# Group B: processes all image-related orders
Subscribe: $queue/orders/+/images/#
(consumer-group: "image-processors")
# Publish
Publish: $queue/orders/eu/images/resize

Both groups receive the message because:
- Group A filter `eu/#` matches `eu/images/resize`
- Group B filter `+/images/#` matches `eu/images/resize`
Key Points
- The publisher must always use the `$queue/` prefix for messages to be routed through the queue manager
- The queue is bound to `$queue/<name>/#` regardless of the subscription pattern
- Wildcard filtering happens at delivery time, not at publish time
- Multiple consumer groups can have different filters on the same queue
Routing Model (How Messages Find Queues)
FluxMQ uses topic patterns on queues to decide where a publish should be stored. This is a fan-out model: one publish can land in multiple queues if multiple patterns match.
- Each queue has one or more topic patterns (MQTT wildcard syntax).
- On publish, the queue manager calls `FindMatchingQueues(topic)` and appends the message to every matching queue.
- If no queue matches, FluxMQ creates an ephemeral queue whose name and pattern equal the topic, then appends. Ephemeral queues are in-memory, best-effort, and cleaned up after the last consumer disconnects.
- If no queues are configured at all, FluxMQ creates a reserved `mqtt` queue that matches `$queue/#` so queue publishes always have a landing zone.
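The fan-out plus ephemeral-fallback behavior can be sketched as follows. Function and variable names here are illustrative, not FluxMQ internals:

```go
package main

import (
	"fmt"
	"strings"
)

// topicMatch is an MQTT-style wildcard match ("+" one level, "#" rest).
func topicMatch(pattern, topic string) bool {
	p := strings.Split(pattern, "/")
	t := strings.Split(topic, "/")
	for i, part := range p {
		if part == "#" {
			return true
		}
		if i >= len(t) || (part != "+" && part != t[i]) {
			return false
		}
	}
	return len(p) == len(t)
}

// findMatchingQueues sketches the fan-out step: every queue whose
// binding pattern matches the publish topic receives the message. If
// nothing matches, an ephemeral queue named after the topic stands in
// so the publish is not silently dropped.
func findMatchingQueues(bindings map[string]string, topic string) []string {
	var hits []string
	for name, pattern := range bindings {
		if topicMatch(pattern, topic) {
			hits = append(hits, name)
		}
	}
	if len(hits) == 0 {
		hits = append(hits, topic) // ephemeral fallback
	}
	return hits
}

func main() {
	bindings := map[string]string{
		"orders": "$queue/orders/#",
		"audit":  "$queue/#",
	}
	// One publish, two matching queues: fan-out, not pick-one.
	fmt.Println(len(findMatchingQueues(bindings, "$queue/orders/eu"))) // 2
}
```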
In production, prefer explicit queue configuration so queue names, types, and bindings are stable. Ephemeral queues are a development convenience — they prevent silent message loss but don't provide durability or delivery guarantees. See Queue Types for when to use each type.
Walkthrough: Queue Message Lifecycle
This is the “classic” (work queue) lifecycle:
- Producer publishes to a `$queue/<name>/...` topic (or a queue-capable AMQP address).
- Queue manager matches the topic against queue bindings and appends to the queue log(s).
- A consumer group claims a message at some offset; the message becomes pending (tracked in the PEL).
- The broker delivers the message to the chosen consumer.
- The consumer acks/nacks/rejects; the queue manager updates group state and may advance the safe truncation point.
Queue Addressing
Queue topics use $queue/<queue-name>/....
MQTT
- Publish: `$queue/orders`
- Publish with routing key: `$queue/orders/eu/images/resize`
- Subscribe to a pattern: `$queue/orders/#` or `$queue/orders/+/images/#`
- Ack: `$queue/orders/$ack`
- Nack: `$queue/orders/$nack`
- Reject: `$queue/orders/$reject`
AMQP 0.9.1
AMQP 0.9.1 clients can interact with queues in multiple ways:
Direct queue publish (default exchange with $queue/ routing key):

exchange: "" (default)
routing_key: "$queue/orders"
routing_key: "$queue/orders/eu/images/resize"  # with routing key

Stream queue publish (declare queue with x-queue-type: stream):

exchange: "" (default)
routing_key: "my-stream"  # queue name without $queue/ prefix

Exchange-based routing (bind queue to exchange):

queue.bind(queue="orders", exchange="my-exchange", routing_key="orders.#")
publish(exchange="my-exchange", routing_key="orders.eu.images")

Consume from queue:

basic.consume(queue="$queue/orders/#")
basic.consume(queue="$queue/orders/+/images/#")
basic.consume(queue="my-stream")  # stream queue by name

Acknowledgments:
- `basic.ack` → Ack
- `basic.nack(requeue=true)` → Nack (retry)
- `basic.reject(requeue=false)` → Reject (DLQ)
Exchanges and Bindings Are Per-Connection
Important difference from RabbitMQ: In FluxMQ, AMQP 0.9.1 exchanges and bindings are per-connection routing state. They are not shared across connections, not visible to other clients, and not persisted to disk — even when declared as `durable`.
FluxMQ's actual routing layer is topic-based: queues are matched by topic patterns, not by server-side exchange state. When an AMQP 0.9.1 client declares an exchange and binds a queue to it, the channel stores that binding locally. On publish, the channel consults its local binding table to translate exchange + routing_key into a $queue/<queue>/<routing-key> topic, then routes through the shared queue manager like any other protocol.
This means:
- Client A declares an exchange and bindings → client B cannot see them. Each connection maintains its own exchange/binding state.
- Exchanges don't survive reconnection. Clients must re-declare exchanges and bindings on every new connection (which is already standard practice in most AMQP 0.9.1 client libraries).
- `durable: true` on exchange.declare is accepted but does not persist. The flag is not rejected, but it has no effect — exchange state lives only in connection memory.
- Passive exchange.declare checks local state only. It reports whether this connection has declared the exchange, not whether it exists server-wide.
Why this design: FluxMQ routes messages through a shared topic index on queues. Exchanges are a compatibility layer that translates AMQP 0.9.1 routing concepts into FluxMQ topic semantics. Building a second server-wide routing layer for exchanges would duplicate what the queue topic index already provides.
Practical impact: For most AMQP 0.9.1 usage patterns, this is transparent — clients typically declare exchanges and bindings on connection setup before publishing. If your application depends on exchanges being shared across connections (e.g., one client declares, another publishes), use direct $queue/ routing keys with the default exchange instead.
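The translation step can be sketched as a per-connection lookup table. This is a deliberately simplified model (one binding per exchange, no binding-key matching), and the dots-to-slashes mapping of the routing key is an assumption for illustration, not confirmed FluxMQ behavior:

```go
package main

import (
	"fmt"
	"strings"
)

// channelState models the routing state one AMQP 0.9.1 connection
// keeps locally. Another connection has its own, separate copy.
type channelState struct {
	bindings map[string]string // exchange name -> bound queue name
}

func (c *channelState) bindQueue(queue, exchange string) {
	c.bindings[exchange] = queue
}

// publishTopic translates exchange + routing key into the $queue/
// topic the shared queue manager routes on.
func (c *channelState) publishTopic(exchange, routingKey string) string {
	if exchange == "" {
		return routingKey // default exchange: routing key is the topic
	}
	queue := c.bindings[exchange]
	// Assumed mapping: AMQP dotted keys become topic levels.
	return "$queue/" + queue + "/" + strings.ReplaceAll(routingKey, ".", "/")
}

func main() {
	ch := &channelState{bindings: map[string]string{}}
	ch.bindQueue("orders", "my-exchange")
	fmt.Println(ch.publishTopic("my-exchange", "orders.eu.images"))
	fmt.Println(ch.publishTopic("", "$queue/orders")) // direct publish
}
```

The key point the sketch makes: the exchange never exists server-side; it is resolved to a plain topic before the message leaves the connection's channel.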
Consumer Groups
Consumer groups are what turn a queue log into a scalable worker pool:
- A single queue can have many groups (independent progress).
- A single group can have many consumers (load balancing).
- Each group has its own cursor state and (in classic mode) pending list.
How to set the group ID depends on the protocol:
- MQTT v5: `consumer-group` user property on SUBSCRIBE.
- MQTT v3: falls back to client ID (acks require MQTT v5 user properties).
- AMQP 1.0: `consumer-group` in attach properties.
- AMQP 0.9.1: `x-consumer-group` argument on `basic.consume`.
MQTT v5 Example
mosquitto_sub -V mqttv5 -p 1884 -t '$queue/orders/#' -q 1 \
  -D subscribe user-property consumer-group workers

AMQP 0.9.1 Example (Go)
ch.Consume(
"$queue/orders/#", // queue
"consumer-tag", // consumer tag
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
amqp091.Table{
"x-consumer-group": "workers",
},
)

Message Properties
Queue deliveries include properties that make acknowledgments and observability consistent across protocols:
- `message-id` (required for ack/nack/reject)
- `group-id` (consumer group name)
- `queue` (queue name)
- `offset` (sequence number)
Stream deliveries also include:
- `x-stream-offset`
- `x-stream-timestamp` (unix millis)
- `x-work-committed-offset` (if primary group is configured)
- `x-work-acked` (true when below committed offset)
- `x-work-group` (primary work group name)
Acknowledgments
MQTT
Ack/Nack/Reject are implemented by publishing to:
- `$queue/<queue>/$ack`
- `$queue/<queue>/$nack`
- `$queue/<queue>/$reject`
If you include a routing key (for example $queue/orders/images/$ack), the broker still derives the queue name from the first segment after $queue/.
MQTT v5 user properties required:
- `message-id`
- `group-id`
MQTT v3 can publish and subscribe to queue topics, but acknowledgments require MQTT v5 user properties.
AMQP 1.0
AMQP dispositions are mapped to queue acknowledgments:
- Accepted → Ack
- Released → Nack
- Rejected → Reject
AMQP 0.9.1
- `basic.ack`, `basic.nack`, `basic.reject` map to Ack/Nack/Reject
Stream Commit (AMQP 0.9.1)
If a stream group has auto-commit disabled, AMQP 0.9.1 can explicitly commit offsets by publishing to:
$queue/<queue>/$commit
Headers:
- `x-group-id`
- `x-offset`
Queue Types
Classic (Work Queue)
Classic queues are optimized for "do work once" semantics:
- Messages are claimed and tracked in a Pending Entry List (PEL)
- `ack` removes a message from the PEL; `nack` makes it eligible for redelivery; `reject` removes it (DLQ wiring is incomplete)
- A visibility timeout plus work stealing prevents stuck consumers from stalling progress
- The safe truncation point is derived from group state (see cursor/committed below)
Classic Mode Delivery Flow
- Consumer requests messages via `ClaimBatch`
- Manager reads message from queue log at cursor position
- Creates PEL entry: `{ offset, consumerID, claimedAt, deliveryCount }`
- Advances cursor to next position
- Delivers message to consumer
- On `ack`: removes PEL entry, advances committed offset
- On `nack`: resets PEL entry for redelivery
- On timeout: message becomes stealable by other consumers
Classic Mode State:
Queue Log: [0] [1] [2] [3] [4] [5] [6] [7] ...
▲ ▲
│ └── cursor (next to deliver)
└── committed (safe to truncate)
PEL: {
offset=3: { consumer: "c1", claimedAt: t1, deliveries: 1 }
offset=4: { consumer: "c2", claimedAt: t2, deliveries: 2 }
offset=5: { consumer: "c1", claimedAt: t3, deliveries: 1 }
}

Stream
Stream queues are optimized for "replay and progress" semantics:
- Consumption is cursor-based (read position in an append-only log)
- No PEL tracking - messages are simply read, not "claimed"
- Groups can auto-commit progress (default) or require explicit commits (AMQP 0.9.1)
Stream Mode Delivery Flow
- Consumer requests messages via `ClaimBatchStream`
- Manager reads messages from queue log starting at cursor position
- No PEL entry created - message is simply read
- Advances cursor to next position
- Delivers messages to consumer
- If `autoCommit: true`: committed offset is updated periodically
- If `autoCommit: false`: consumer must explicitly commit via `$queue/<name>/$commit`
Stream Mode State:
Queue Log: [0] [1] [2] [3] [4] [5] [6] [7] ...
▲ ▲
│ └── cursor (next to read)
└── committed (checkpoint)
No PEL - just two offsets to track!

Why No PEL for Streams?
Stream mode is designed for different use cases than classic work queues:
| Aspect | Classic (PEL) | Stream (No PEL) |
|---|---|---|
| Semantics | Process once | Read/replay many times |
| Delivery guarantee | At-least-once with ack | At-least-once with cursor |
| Redelivery trigger | Visibility timeout | Consumer restart/seek |
| Work stealing | Yes (steal from slow peers) | No (each consumer independent) |
| Memory overhead | PEL entry per pending message | Only cursor position |
| Replayability | No (acked = done) | Yes (seek to any offset) |
Classic mode answers: "Has this message been successfully processed?"
Stream mode answers: "Where is my read position in the log?"
In stream mode, the cursor itself serves as the progress indicator. If a consumer crashes:
- It restarts from the last committed cursor position
- Messages between committed and actual progress are re-read
- No per-message tracking needed - just resume from checkpoint
This makes streams more efficient for high-throughput scenarios where:
- Messages are idempotent or replayable
- Multiple independent readers need the same data
- You want Kafka-like log semantics
AMQP 0.9.1 Stream Queues
Declare a stream queue with retention settings:
ch.QueueDeclare(
"my-stream", // name
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
amqp091.Table{
"x-queue-type": "stream",
"x-max-age": "7D", // retain 7 days
"x-max-length-bytes": 1073741824, // 1GB max
"x-max-length": 1000000, // 1M messages max
},
)

Consume with cursor positioning:
ch.Consume(
"my-stream", // queue name (not $queue/ prefix)
"consumer-tag",
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
amqp091.Table{
"x-consumer-group": "readers",
"x-stream-offset": "first", // "first", "last", "next", offset number, or "timestamp=..."
"x-auto-commit": false, // require manual commit
},
)

Stream offset options:
- (omitted) - Resume from the last committed offset, or start from the beginning if no prior state exists (default)
- `"first"` - Start from the beginning of the log
- `"last"` or `"next"` - Start from the end (new messages only)
- `123` (number) - Start from specific offset
- `"offset=123"` - Same as above, string format
- `"timestamp=1706745600000"` - Start from timestamp (unix millis)
When x-stream-offset is omitted the broker uses resume semantics: if the consumer group already has a stored committed offset it picks up where it left off; otherwise it starts from the beginning of the log. This is the recommended default for event-sourcing and long-lived consumers that should never miss messages after a reconnect.
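The offset options can be resolved into a starting cursor roughly as follows. This is an illustrative sketch (the function name and the `-1 = no checkpoint` sentinel are our assumptions); the `timestamp=` variant is omitted because it needs a log index lookup:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// resolveStartOffset turns an x-stream-offset value into a starting
// cursor. committed is the group's stored checkpoint (-1 = none) and
// end is the offset one past the last message in the log.
func resolveStartOffset(opt string, committed, end int64) int64 {
	switch {
	case opt == "": // resume semantics
		if committed >= 0 {
			return committed
		}
		return 0 // no prior state: start from the beginning
	case opt == "first":
		return 0
	case opt == "last" || opt == "next":
		return end // new messages only
	case strings.HasPrefix(opt, "offset="):
		n, _ := strconv.ParseInt(strings.TrimPrefix(opt, "offset="), 10, 64)
		return n
	default: // bare number, e.g. "123"
		n, _ := strconv.ParseInt(opt, 10, 64)
		return n
	}
}

func main() {
	fmt.Println(resolveStartOffset("", 42, 100))      // 42: resume from checkpoint
	fmt.Println(resolveStartOffset("", -1, 100))      // 0: fresh group
	fmt.Println(resolveStartOffset("last", 42, 100))  // 100: tail only
	fmt.Println(resolveStartOffset("offset=7", 0, 9)) // 7: explicit seek
}
```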
Filtered Stream Subscriptions
A consumer can subscribe to a subset of messages in a stream queue by using a topic filter. The broker routes only matching messages to the consumer while sharing the same underlying log.
ch.Consume(
"$queue/events/sensors/#", // queue name + topic filter
"consumer-tag",
false, false, false, false,
amqp091.Table{
"x-consumer-group": "sensor-readers",
},
)

The filter follows MQTT topic semantics (+ single-level, # multi-level). Each filter + consumer group combination maintains an independent cursor, so multiple groups can track different subsets of the same stream independently.
Manual commit (when x-auto-commit: false):
// Publish to special commit topic
ch.Publish("", "$queue/my-stream/$commit", false, false, amqp091.Publishing{
Headers: amqp091.Table{
"x-group-id": "readers",
"x-offset": int64(lastProcessedOffset),
},
})

Cursors, Pending Lists, and Safe Truncation
Each consumer group tracks progress using two offsets:
- `cursor`: the next offset that will be delivered (read position)
- `committed`: the oldest offset considered safe for retention/truncation (durability floor)
In classic mode, committed is driven by the minimum pending offset in the PEL.
Practical intuition:
- Cursor moves forward when the group claims work.
- Committed moves forward when the group acks (or otherwise clears) the earliest pending work.
Example: Classic Group Progress
Imagine a queue log with offsets 0..5.
| Event | Cursor | PEL | Committed |
|---|---|---|---|
| Group starts | 0 | empty | 0 |
| Claims offsets 0,1 | 2 | {0, 1} | 0 |
| Acks offset 0 | 2 | {1} | 1 |
| Acks offset 1 | 2 | empty | 2 |
This is why committed is the “safe truncation floor”: once everything below an offset is fully processed, the log can be truncated without breaking group semantics.
Retention
Retention policies can be configured per queue:
- `max_age` (time-based)
- `max_length_bytes`
- `max_length_messages`
A background retention loop computes a safe truncation offset and truncates the queue log:
- Start from the minimum committed offset across queue-mode consumer groups.
- Apply retention limits (time/size/message count) to compute the oldest offset that should be kept.
- Truncate the log to the lowest safe offset (segment-granular in log storage).
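The computation can be sketched as a simple minimum. This is an illustrative model, not FluxMQ's code: the retention-derived keep-from offset is assumed to be computed elsewhere from the limits above, and taking the conservative minimum reflects "the lowest safe offset" so queue-mode groups never lose unprocessed work:

```go
package main

import "fmt"

// safeTruncationOffset returns the offset the log may be truncated to:
// retention limits propose a floor, and each queue-mode group's
// committed offset can pull that floor lower.
func safeTruncationOffset(committed []int64, retentionKeepFrom int64) int64 {
	floor := retentionKeepFrom
	for _, c := range committed {
		if c < floor {
			floor = c
		}
	}
	return floor
}

func main() {
	// Retention wants to keep from offset 100, but one group has only
	// committed up to 10: truncate to 10, the lowest safe offset.
	fmt.Println(safeTruncationOffset([]int64{10, 25}, 100)) // 10
	// No queue-mode groups (pure stream): retention alone decides.
	fmt.Println(safeTruncationOffset(nil, 100)) // 100
}
```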
See Storage internals for on-disk format and retention behavior details.
Per-Message TTL
In addition to queue-level retention, each message can have an individual time-to-live controlled by the message_ttl queue setting. When a message's TTL expires, it is silently skipped during delivery — consumers never see it.
queues:
- name: "tasks"
topics: ["$queue/tasks/#"]
limits:
      message_ttl: "30m"  # messages expire 30 minutes after publish

Key behavior:
- The expiry timestamp is computed at publish time (`created_at + message_ttl`) and persisted with the message.
- Both classic (queue-mode) and stream consumers skip expired messages. In queue mode, expired messages never enter the pending entry list.
- Setting `message_ttl` to `0` disables per-message expiry. Messages then live until queue-level retention truncates them.
- Per-message TTL and queue-level retention are complementary. TTL prevents delivery of stale individual messages; retention reclaims storage by truncating the head of the log in bulk.
Clustering and Replication Notes
In clustered deployments, queue behavior depends on cluster.raft.*:
- `write_policy` controls how followers handle incoming queue publishes.
- `distribution_mode` controls whether deliveries are routed (forward) or logs are replicated (replicate).
Enabling replication per queue
Replication is configured per queue under queues[].replication.
queues:
- name: "orders"
topics: ["$queue/orders/#"]
type: "classic"
replication:
enabled: true
group: "default" # optional (empty = default)
mode: "sync" # sync or async
ack_timeout: "5s" # applies to sync mode
replication_factor: 3
    min_in_sync_replicas: 2

Behavior summary:
- With replication enabled, appends and consumer-group mutations are leader-owned and replicated.
- With `write_policy=forward`, clients may publish to any node and followers forward to the leader.
- With `mode=sync`, publish latency increases because the leader waits for commit (bounded by `ack_timeout`).
Raft groups (sharding)
replication.group lets you assign a queue to a specific Raft replication group (a shard).
Use groups to isolate hot queues or to spread leadership across nodes. Keep the number of groups modest: each group is a Raft instance with background overhead.
See Queue Replication Groups (Raft) for the model and tradeoffs.
See Cluster configuration and Clustering internals for the full picture.
DLQ Status
DLQ support is partial: the main delivery path does not automatically move rejected or expired messages into a DLQ yet. Reject currently removes the message from the pending list without pushing it to a DLQ.
Configuration
Queues are configured under queues in the main config file:
queue_manager:
auto_commit_interval: "5s"
queues:
- name: "orders"
topics: ["$queue/orders/#"]
type: "classic" # classic or stream
primary_group: "" # optional for stream status
replication:
enabled: false
group: ""
mode: "sync"
ack_timeout: "5s"
replication_factor: 3
min_in_sync_replicas: 2
limits:
max_message_size: 10485760
max_depth: 100000
message_ttl: "168h"
retry:
max_retries: 10
initial_backoff: "5s"
max_backoff: "5m"
multiplier: 2.0
dlq:
enabled: true
topic: "" # optional override
retention:
max_age: "168h"
max_length_bytes: 0
max_length_messages: 0