Durable Queues

Persistent queue system for MQTT and AMQP — durable work queues and stream queues with consumer groups, acknowledgments, and append-only log storage

Last Updated: 2026-02-12

This page covers FluxMQ's two persistent queue types: durable queues (work queue semantics) and stream queues (append-only log semantics). Both are shared across MQTT, AMQP 1.0, and AMQP 0.9.1.

For an overview of all three queue types (including ephemeral queues), see Queue Types.

Overview

Persistent queues are built around a simple model:

  • Producers publish to $queue/<queue-name>/....
  • The broker appends the message to the queue's log (durable storage).
  • Consumers join a consumer group and receive messages from that log.
  • What happens next depends on the queue type:

| Aspect | Durable (Classic) | Stream |
|---|---|---|
| Ack | Remove from pending list, advance truncation point | Checkpoint cursor position |
| Nack | Redeliver the message (with backoff) | No-op (replay via cursor seek) |
| Reject | Discard message (future: DLQ) | Advance cursor past message |
| Retention | Truncate after all groups ack | Time/size/count policy |
| Redelivery | Automatic (visibility timeout + work stealing) | Manual (consumer restarts from checkpoint) |

Design rule: Do not overload ack semantics. "Ack" means "processed, remove it" in a durable queue and "checkpoint my position" in a stream. If you need both task processing and replay for the same data, use two queues with overlapping topic bindings.

Do MQTT Producers Need $queue/?

Yes. In FluxMQ, MQTT publishes are routed to the queue manager only when the topic starts with $queue/.

  • Publish to sensors/temp goes through normal pub/sub routing (subscriptions, retained, cluster pub/sub).
  • Publish to $queue/orders/... goes through the queue manager (log storage + consumer groups + acks).

The $queue/ prefix is what activates queue semantics — whether the target is an ephemeral, durable, or stream queue. If you configured a queue with topic bindings that match a non-$queue/ topic, MQTT publishes to that topic will still not be enqueued, because the MQTT broker does not route non-$queue/ publishes into the queue manager.
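
The routing decision described above comes down to a prefix check. The following is a minimal sketch of that rule, not FluxMQ's actual code; the function name routesToQueueManager is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// queuePrefix is the topic prefix that activates queue semantics.
const queuePrefix = "$queue/"

// routesToQueueManager reports whether an MQTT publish topic would be handed
// to the queue manager instead of normal pub/sub routing.
// Hypothetical helper: the name is not taken from the FluxMQ code base.
func routesToQueueManager(topic string) bool {
	return strings.HasPrefix(topic, queuePrefix)
}

func main() {
	for _, topic := range []string{"sensors/temp", "$queue/orders/created"} {
		fmt.Printf("%-24s queue manager: %v\n", topic, routesToQueueManager(topic))
	}
}
```

Note that the check looks only at the publish topic, which is why a queue binding on a non-$queue/ topic never sees MQTT publishes.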

Architecture

┌──────────────┐  ┌──────────────┐  ┌───────────────┐
│ MQTT Broker  │  │ AMQP Broker  │  │ AMQP091 Broker│
│ (TCP/WS/     │  │ (AMQP 1.0)   │  │ (AMQP 0.9.1)  │
│ HTTP/CoAP)   │  │              │  │               │
└──────┬───────┘  └───────┬──────┘  └──────┬────────┘
       │                  │                │
       └──────────────────┼────────────────┘
                          ▼
              ┌─────────────────────────┐
              │ Shared Queue Manager    │
              │ - Topic bindings        │
              │ - Consumer groups       │
              │ - Retention loop        │
              └───────────┬─────────────┘
                          ▼
              ┌─────────────────────────┐
              │ Log Storage (AOL)       │
              └─────────────────────────┘

Queue Name, Topic, and Routing Key

Queue topics follow this convention:

  • Queue root: $queue/<name>
  • Routing key: everything after the root (optional)

Examples:

  • $queue/orders (root only)
  • $queue/orders/created (routing key created)
  • $queue/orders/images/png (routing key images/png)

When you subscribe to $queue/orders/images/#, FluxMQ treats:

  • queue name: orders
  • group filter pattern: images/#

That pattern is applied inside the queue manager (not in the normal pub/sub router).
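
As an illustration of the naming convention, here is a small sketch that splits a queue topic into the queue name and the remainder (routing key on publish, filter pattern on subscribe). The helper splitQueueTopic is hypothetical and only mirrors the convention described above.

```go
package main

import (
	"fmt"
	"strings"
)

// splitQueueTopic splits "$queue/<name>/<rest>" into the queue name and the
// remainder. ok is false for topics outside the $queue/ namespace.
// Hypothetical helper, written only to illustrate the convention.
func splitQueueTopic(topic string) (name, rest string, ok bool) {
	const prefix = "$queue/"
	if !strings.HasPrefix(topic, prefix) {
		return "", "", false
	}
	name, rest, _ = strings.Cut(strings.TrimPrefix(topic, prefix), "/")
	return name, rest, name != ""
}

func main() {
	for _, t := range []string{"$queue/orders", "$queue/orders/created", "$queue/orders/images/#"} {
		name, rest, ok := splitQueueTopic(t)
		fmt.Printf("%-26s queue=%q rest=%q ok=%v\n", t, name, rest, ok)
	}
}
```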

Wildcard Patterns and Routing Key Filtering

Queue subscriptions support MQTT-style wildcards (+ for single level, # for multiple levels). However, the wildcard handling is split into two stages:

  1. Queue binding: The queue itself is bound to $queue/<name>/# (captures all messages for that queue)
  2. Consumer filter: The pattern after the queue name filters messages during delivery

Example 1: Simple Queue

Subscribe:  $queue/orders
Publish:    $queue/orders
  • Queue name: orders
  • Consumer filter: (none)
  • All messages to $queue/orders are delivered

Example 2: Single-Level Routing Key

Subscribe:  $queue/orders/images
Publish:    $queue/orders/images
  • Queue name: orders
  • Consumer filter: images
  • Only messages with routing key images are delivered

Example 3: Multi-Level Wildcard

Subscribe:  $queue/orders/images/#
Publish:    $queue/orders/images/png
Publish:    $queue/orders/images/resize/thumbnail
  • Queue name: orders
  • Consumer filter: images/#
  • Queue binding: $queue/orders/# (captures everything)
  • During delivery, filter images/# matches images/png and images/resize/thumbnail

Example 4: Single-Level Wildcard

Subscribe:  $queue/orders/+/images/#
Publish:    $queue/orders/eu/images/resize
Publish:    $queue/orders/us/images/png
  • Queue name: orders
  • Consumer filter: +/images/#

Flow for publish to $queue/orders/eu/images/resize:

  1. FindMatchingQueues("$queue/orders/eu/images/resize") matches queue orders (bound to $queue/orders/#)
  2. Message stored in queue with topic $queue/orders/eu/images/resize
  3. During delivery, routing key is extracted: eu/images/resize
  4. Filter +/images/# matches:
    • + matches eu
    • images matches images
    • # matches resize
  5. Message delivered to consumer
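
The matching in step 4 can be sketched with a small MQTT-style matcher. This is an illustrative implementation of the wildcard rules, not the broker's own code; matchFilter is a hypothetical name, and treating an empty filter as "match everything" is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// matchFilter reports whether an MQTT-style filter ("+" = exactly one level,
// "#" = all remaining levels) matches a routing key.
// Assumption: an empty filter means "no filter" and matches every key.
func matchFilter(filter, key string) bool {
	if filter == "" {
		return true
	}
	f := strings.Split(filter, "/")
	k := strings.Split(key, "/")
	for i, seg := range f {
		if seg == "#" {
			return true // matches the rest of the key, including zero levels
		}
		if i >= len(k) || (seg != "+" && seg != k[i]) {
			return false
		}
	}
	return len(f) == len(k)
}

func main() {
	key := "eu/images/resize"
	for _, f := range []string{"+/images/#", "eu/#", "images/#"} {
		fmt.Printf("filter %-12s matches %s: %v\n", f, key, matchFilter(f, key))
	}
}
```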

Example 5: Multiple Consumer Groups with Different Filters

One queue can have multiple consumer groups with different filter patterns:

# Group A: processes all EU orders
Subscribe:  $queue/orders/eu/#
            (consumer-group: "eu-processors")

# Group B: processes all image-related orders
Subscribe:  $queue/orders/+/images/#
            (consumer-group: "image-processors")

# Publish
Publish:    $queue/orders/eu/images/resize

Both groups receive the message because:

  • Group A filter eu/# matches eu/images/resize
  • Group B filter +/images/# matches eu/images/resize

Key Points

  • The publisher must always use the $queue/ prefix for messages to be routed through the queue manager
  • The queue is bound to $queue/<name>/# regardless of the subscription pattern
  • Wildcard filtering happens at delivery time, not at publish time
  • Multiple consumer groups can have different filters on the same queue

Routing Model (How Messages Find Queues)

FluxMQ uses topic patterns on queues to decide where a publish should be stored. This is a fan-out model: one publish can land in multiple queues if multiple patterns match.

  • Each queue has one or more topic patterns (MQTT wildcard syntax).
  • On publish, the queue manager calls FindMatchingQueues(topic) and appends the message to every matching queue.
  • If no queue matches, FluxMQ creates an ephemeral queue whose name and pattern equal the topic, then appends. Ephemeral queues are in-memory, best-effort, and cleaned up after the last consumer disconnects.
  • If no queues are configured at all, FluxMQ creates a reserved mqtt queue that matches $queue/# so queue publishes always have a landing zone.
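
The fan-out and ephemeral fallback can be sketched as follows. The types and method names here (queue, manager, publish) are hypothetical simplifications; only FindMatchingQueues is named in this page, and its real signature is not shown here. The reserved mqtt queue is not modelled.

```go
package main

import (
	"fmt"
	"strings"
)

// queue is a hypothetical, simplified stand-in for a configured queue.
type queue struct {
	name      string
	patterns  []string
	ephemeral bool
	log       []string // appended payloads
}

type manager struct{ queues []*queue }

// topicMatches applies MQTT wildcard rules ("+" one level, "#" the rest).
func topicMatches(pattern, topic string) bool {
	p, t := strings.Split(pattern, "/"), strings.Split(topic, "/")
	for i, seg := range p {
		if seg == "#" {
			return true
		}
		if i >= len(t) || (seg != "+" && seg != t[i]) {
			return false
		}
	}
	return len(p) == len(t)
}

// findMatchingQueues returns every queue with at least one matching pattern.
func (m *manager) findMatchingQueues(topic string) []*queue {
	var out []*queue
	for _, q := range m.queues {
		for _, p := range q.patterns {
			if topicMatches(p, topic) {
				out = append(out, q)
				break
			}
		}
	}
	return out
}

// publish appends to all matching queues. When nothing matches, it creates an
// ephemeral queue whose name and pattern equal the topic.
func (m *manager) publish(topic, payload string) []*queue {
	matched := m.findMatchingQueues(topic)
	if len(matched) == 0 {
		q := &queue{name: topic, patterns: []string{topic}, ephemeral: true}
		m.queues = append(m.queues, q)
		matched = []*queue{q}
	}
	for _, q := range matched {
		q.log = append(q.log, payload)
	}
	return matched
}

func main() {
	m := &manager{queues: []*queue{
		{name: "orders", patterns: []string{"$queue/orders/#"}},
		{name: "audit", patterns: []string{"$queue/+/created"}},
	}}
	for _, topic := range []string{"$queue/orders/created", "$queue/misc/x"} {
		for _, q := range m.publish(topic, "payload") {
			fmt.Printf("%s -> %s (ephemeral=%v)\n", topic, q.name, q.ephemeral)
		}
	}
}
```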

In production, define queues explicitly under the queues: configuration key so that queue names, types, and bindings are stable. Ephemeral queues are a development convenience: they prevent silent message loss but do not provide durability or delivery guarantees. See Queue Types for when to use each type.

Walkthrough: Queue Message Lifecycle

This is the “classic” (work queue) lifecycle:

  1. Producer publishes to a $queue/<name>/... topic (or a queue-capable AMQP address).
  2. Queue manager matches the topic against queue bindings and appends to the queue log(s).
  3. A consumer group claims a message at some offset; the message becomes pending (tracked in the PEL).
  4. The broker delivers the message to the chosen consumer.
  5. The consumer acks/nacks/rejects; the queue manager updates group state and may advance the safe truncation point.

Queue Addressing

Queue topics use $queue/<queue-name>/....

MQTT

  • Publish: $queue/orders
  • Publish with routing key: $queue/orders/eu/images/resize
  • Subscribe to a pattern: $queue/orders/# or $queue/orders/+/images/#
  • Ack: $queue/orders/$ack
  • Nack: $queue/orders/$nack
  • Reject: $queue/orders/$reject

AMQP 0.9.1

AMQP 0.9.1 clients can interact with queues in multiple ways:

Direct queue publish (default exchange with $queue/ routing key):

exchange: "" (default)
routing_key: "$queue/orders"
routing_key: "$queue/orders/eu/images/resize"  # with routing key

Stream queue publish (declare queue with x-queue-type: stream):

exchange: "" (default)
routing_key: "my-stream"  # queue name without $queue/ prefix

Exchange-based routing (bind queue to exchange):

queue.bind(queue="orders", exchange="my-exchange", routing_key="orders.#")
publish(exchange="my-exchange", routing_key="orders.eu.images")

Consume from queue:

basic.consume(queue="$queue/orders/#")
basic.consume(queue="$queue/orders/+/images/#")
basic.consume(queue="my-stream")  # stream queue by name

Acknowledgments:

  • basic.ack → Ack
  • basic.nack(requeue=true) → Nack (retry)
  • basic.reject(requeue=false) → Reject (message is discarded; DLQ routing is not wired up yet, see DLQ Status)

Exchanges and Bindings Are Per-Connection

Important difference from RabbitMQ: In FluxMQ, AMQP 0.9.1 exchanges and bindings are per-connection routing state. They are not shared across connections, not visible to other clients, and not persisted to disk — even when declared as durable.

FluxMQ's actual routing layer is topic-based: queues are matched by topic patterns, not by server-side exchange state. When an AMQP 0.9.1 client declares an exchange and binds a queue to it, the channel stores that binding locally. On publish, the channel consults its local binding table to translate exchange + routing_key into a $queue/<queue>/<routing-key> topic, then routes through the shared queue manager like any other protocol.

This means:

  • Client A declares an exchange and bindings → client B cannot see them. Each connection maintains its own exchange/binding state.
  • Exchanges don't survive reconnection. Clients must re-declare exchanges and bindings on every new connection (which is already standard practice in most AMQP 0.9.1 client libraries).
  • durable: true on exchange.declare is accepted but does not persist. The flag is not rejected, but it has no effect — exchange state lives only in connection memory.
  • Passive exchange.declare checks local state only. It reports whether this connection has declared the exchange, not whether it exists server-wide.
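
To make the per-connection scoping concrete, here is a rough sketch of a binding table that lives on the connection. All names (connState, bind, translate) are hypothetical. The sketch passes the routing key through unchanged and skips binding-key pattern matching; how FluxMQ normalizes AMQP routing keys into topic levels is not covered here.

```go
package main

import "fmt"

// binding is one queue.bind recorded on a connection (hypothetical shape).
type binding struct{ exchange, queue string }

// connState holds exchange/binding state that is private to one
// AMQP 0.9.1 connection. Nothing here is shared or persisted.
type connState struct{ bindings []binding }

func (c *connState) bind(queue, exchange string) {
	c.bindings = append(c.bindings, binding{exchange: exchange, queue: queue})
}

// translate maps exchange + routing key to $queue/<queue>/<routing-key>
// topics using only this connection's bindings.
// Assumption: the routing key is appended as-is; pattern matching is omitted.
func (c *connState) translate(exchange, routingKey string) []string {
	var topics []string
	for _, b := range c.bindings {
		if b.exchange == exchange {
			topics = append(topics, "$queue/"+b.queue+"/"+routingKey)
		}
	}
	return topics
}

func main() {
	connA, connB := &connState{}, &connState{}
	connA.bind("orders", "my-exchange")
	fmt.Println("A:", connA.translate("my-exchange", "eu.images"))
	fmt.Println("B:", connB.translate("my-exchange", "eu.images"))
}
```

Connection B has no local bindings, so the same publish resolves to no queue topic there, which is the behavior the list above warns about.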

Why this design: FluxMQ routes messages through a shared topic index on queues. Exchanges are a compatibility layer that translates AMQP 0.9.1 routing concepts into FluxMQ topic semantics. Building a second server-wide routing layer for exchanges would duplicate what the queue topic index already provides.

Practical impact: For most AMQP 0.9.1 usage patterns, this is transparent — clients typically declare exchanges and bindings on connection setup before publishing. If your application depends on exchanges being shared across connections (e.g., one client declares, another publishes), use direct $queue/ routing keys with the default exchange instead.

Consumer Groups

Consumer groups are what turn a queue log into a scalable worker pool:

  • A single queue can have many groups (independent progress).
  • A single group can have many consumers (load balancing).
  • Each group has its own cursor state and (in classic mode) pending list.

How to set the group ID depends on the protocol:

  • MQTT v5: consumer-group user property on SUBSCRIBE.
  • MQTT v3: falls back to client ID (acks require MQTT v5 user properties).
  • AMQP 1.0: consumer-group in attach properties.
  • AMQP 0.9.1: x-consumer-group argument on basic.consume.

MQTT v5 Example

mosquitto_sub -V mqttv5 -p 1884 -t '$queue/orders/#' -q 1 \
  -D subscribe user-property consumer-group workers

AMQP 0.9.1 Example (Go)

ch.Consume(
    "$queue/orders/#",  // queue
    "consumer-tag",     // consumer tag
    false,              // auto-ack
    false,              // exclusive
    false,              // no-local
    false,              // no-wait
    amqp091.Table{
        "x-consumer-group": "workers",
    },
)

Message Properties

Queue deliveries include properties that make acknowledgments and observability consistent across protocols:

  • message-id (required for ack/nack/reject)
  • group-id (consumer group name)
  • queue (queue name)
  • offset (sequence number)

Stream deliveries also include:

  • x-stream-offset
  • x-stream-timestamp (unix millis)
  • x-work-committed-offset (if primary group is configured)
  • x-work-acked (true when below committed offset)
  • x-work-group (primary work group name)

Acknowledgments

MQTT

Ack/Nack/Reject are implemented by publishing to:

  • $queue/<queue>/$ack
  • $queue/<queue>/$nack
  • $queue/<queue>/$reject

If you include a routing key (for example $queue/orders/images/$ack), the broker still derives the queue name from the first segment after $queue/.

MQTT v5 user properties required:

  • message-id
  • group-id

MQTT v3 can publish and subscribe to queue topics, but acknowledgments require MQTT v5 user properties.
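
A sketch of how an acknowledgment publish could be interpreted, assuming the rules above: the queue name is the first segment after $queue/, the action is the final segment, and both user properties must be present. The helper names parseAckTopic and checkAckProps are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseAckTopic extracts the queue name and action from topics such as
// "$queue/orders/$ack" or "$queue/orders/images/$ack".
func parseAckTopic(topic string) (queue, action string, ok bool) {
	parts := strings.Split(topic, "/")
	if len(parts) < 3 || parts[0] != "$queue" {
		return "", "", false
	}
	switch last := parts[len(parts)-1]; last {
	case "$ack", "$nack", "$reject":
		// The queue name is always the first segment after $queue/.
		return parts[1], strings.TrimPrefix(last, "$"), true
	}
	return "", "", false
}

// checkAckProps verifies the two MQTT v5 user properties an ack needs.
func checkAckProps(props map[string]string) error {
	for _, k := range []string{"message-id", "group-id"} {
		if props[k] == "" {
			return errors.New("missing user property: " + k)
		}
	}
	return nil
}

func main() {
	q, a, ok := parseAckTopic("$queue/orders/images/$ack")
	fmt.Println(q, a, ok)
	fmt.Println(checkAckProps(map[string]string{"message-id": "m-1"}))
}
```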

AMQP 1.0

AMQP dispositions are mapped to queue acknowledgments:

  • Accepted → Ack
  • Released → Nack
  • Rejected → Reject

AMQP 0.9.1

  • basic.ack, basic.nack, basic.reject map to Ack/Nack/Reject

Stream Commit (AMQP 0.9.1)

If a stream group has auto-commit disabled, AMQP 0.9.1 can explicitly commit offsets by publishing to:

  • $queue/<queue>/$commit

Headers:

  • x-group-id
  • x-offset

Queue Types

Classic (Work Queue)

Classic queues (the durable work queues described at the top of this page) are optimized for "do work once" semantics:

  • Messages are claimed and tracked in a Pending Entry List (PEL)
  • ack removes a message from the PEL; nack makes it eligible for redelivery; reject removes it (DLQ wiring is incomplete)
  • A visibility timeout plus work stealing prevents stuck consumers from stalling progress
  • The safe truncation point is derived from group state (see cursor/committed below)
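
The visibility timeout rule can be illustrated with a short sketch: a pending entry becomes stealable once its claim is older than the timeout and it belongs to another consumer. The 30-second timeout, the pelEntry shape, and the function names are assumptions made for this example.

```go
package main

import (
	"fmt"
	"time"
)

// pelEntry is a simplified pending entry (hypothetical shape).
type pelEntry struct {
	offset     uint64
	consumerID string
	claimedAt  time.Time
}

// stealable returns the offsets of pending entries whose claim is older than
// the visibility timeout and that belong to a consumer other than the thief.
func stealable(pel []pelEntry, thief string, now time.Time, visibility time.Duration) []uint64 {
	var out []uint64
	for _, e := range pel {
		if e.consumerID != thief && now.Sub(e.claimedAt) >= visibility {
			out = append(out, e.offset)
		}
	}
	return out
}

// demoSteal builds a small PEL and asks what consumer "c2" may steal
// 45 seconds after the first claim, with an assumed 30s visibility timeout.
func demoSteal() []uint64 {
	t0 := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
	pel := []pelEntry{
		{offset: 3, consumerID: "c1", claimedAt: t0},                       // 45s old
		{offset: 4, consumerID: "c2", claimedAt: t0},                       // thief's own entry
		{offset: 5, consumerID: "c1", claimedAt: t0.Add(40 * time.Second)}, // 5s old
	}
	return stealable(pel, "c2", t0.Add(45*time.Second), 30*time.Second)
}

func main() { fmt.Println("stealable offsets:", demoSteal()) }
```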

Classic Mode Delivery Flow

  1. Consumer requests messages via ClaimBatch
  2. Manager reads message from queue log at cursor position
  3. Creates PEL entry: { offset, consumerID, claimedAt, deliveryCount }
  4. Advances cursor to next position
  5. Delivers message to consumer
  6. On ack: removes PEL entry, advances committed offset
  7. On nack: resets PEL entry for redelivery
  8. On timeout: message becomes stealable by other consumers

Classic Mode State:

Queue Log:  [0] [1] [2] [3] [4] [5] [6] [7] ...
                         ▲           ▲
                         │           └── cursor (next to deliver)
                         └── committed (safe to truncate)

PEL: {
  offset=3: { consumer: "c1", claimedAt: t1, deliveries: 1 }
  offset=4: { consumer: "c2", claimedAt: t2, deliveries: 2 }
  offset=5: { consumer: "c1", claimedAt: t3, deliveries: 1 }
}
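
The delivery flow can be sketched as a tiny in-memory model. This is an illustration under simplifying assumptions, not the queue manager's implementation: it claims one message at a time instead of using ClaimBatch, ignores backoff and timeouts, and all type and method names are hypothetical.

```go
package main

import "fmt"

// pelEntry tracks one pending (claimed but unacked) message.
type pelEntry struct {
	consumerID string
	deliveries int
	inFlight   bool // false after a nack: waiting for redelivery
}

type classicGroup struct {
	cursor uint64               // next never-delivered offset
	pel    map[uint64]*pelEntry // pending entries keyed by offset
}

// claim redelivers the lowest nacked offset if there is one; otherwise it
// takes the offset at the cursor and advances the cursor.
func (g *classicGroup) claim(consumer string, logLen uint64) (uint64, bool) {
	var retry uint64
	found := false
	for off, e := range g.pel {
		if !e.inFlight && (!found || off < retry) {
			retry, found = off, true
		}
	}
	if found {
		e := g.pel[retry]
		e.consumerID, e.inFlight = consumer, true
		e.deliveries++
		return retry, true
	}
	if g.cursor >= logLen {
		return 0, false // nothing left to deliver
	}
	off := g.cursor
	g.pel[off] = &pelEntry{consumerID: consumer, deliveries: 1, inFlight: true}
	g.cursor++
	return off, true
}

func (g *classicGroup) ack(off uint64) { delete(g.pel, off) }

func (g *classicGroup) nack(off uint64) {
	if e, ok := g.pel[off]; ok {
		e.inFlight = false
	}
}

// committed is the lowest pending offset, or the cursor when nothing is pending.
func (g *classicGroup) committed() uint64 {
	floor := g.cursor
	for off := range g.pel {
		if off < floor {
			floor = off
		}
	}
	return floor
}

func main() {
	g := &classicGroup{pel: map[uint64]*pelEntry{}}
	for i := 0; i < 6; i++ {
		g.claim("c1", 8) // claims offsets 0..5
	}
	g.ack(0)
	g.ack(1)
	g.ack(2)
	g.nack(4)
	off, _ := g.claim("c2", 8)
	fmt.Println("redelivered:", off, "deliveries:", g.pel[off].deliveries)
	fmt.Println("cursor:", g.cursor, "committed:", g.committed())
}
```

The final state matches the diagram: offsets 3, 4 and 5 are pending, offset 4 is on its second delivery, and committed stays at 3 until that offset is acked.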

Stream

Stream queues are optimized for "replay and progress" semantics:

  • Consumption is cursor-based (read position in an append-only log)
  • No PEL tracking - messages are simply read, not "claimed"
  • Groups can auto-commit progress (default) or require explicit commits (AMQP 0.9.1)

Stream Mode Delivery Flow

  1. Consumer requests messages via ClaimBatchStream
  2. Manager reads messages from queue log starting at cursor position
  3. No PEL entry created - message is simply read
  4. Advances cursor to next position
  5. Delivers messages to consumer
  6. If autoCommit: true: committed offset is updated periodically
  7. If autoCommit: false: consumer must explicitly commit via $queue/<name>/$commit

Stream Mode State:

Queue Log:  [0] [1] [2] [3] [4] [5] [6] [7] ...
                          ▲     ▲
                          │     └── cursor (next to read)
                          └── committed (checkpoint)

No PEL - just two offsets to track!
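
For comparison with the classic model, a stream group can be sketched with just the two offsets. The names below are hypothetical, and two details are assumptions: auto-commit is modelled as "committed catches up with cursor on each tick", and an explicit commit is modelled as storing the offset to resume from (how the broker interprets x-offset is not specified here).

```go
package main

import "fmt"

// streamGroup needs only two offsets; there is no pending list.
type streamGroup struct {
	cursor     uint64 // next offset to read
	committed  uint64 // checkpoint to resume from
	autoCommit bool
}

// read returns up to n offsets starting at the cursor and advances it.
func (g *streamGroup) read(n, logLen uint64) []uint64 {
	var out []uint64
	for ; n > 0 && g.cursor < logLen; n-- {
		out = append(out, g.cursor)
		g.cursor++
	}
	return out
}

// tick models the periodic auto-commit (assumed to copy cursor to committed).
func (g *streamGroup) tick() {
	if g.autoCommit {
		g.committed = g.cursor
	}
}

// commit models an explicit checkpoint; it never moves backwards or past
// what has been read.
func (g *streamGroup) commit(resumeAt uint64) {
	if resumeAt > g.committed && resumeAt <= g.cursor {
		g.committed = resumeAt
	}
}

// restart models a crash and reconnect: reading resumes at the checkpoint.
func (g *streamGroup) restart() { g.cursor = g.committed }

func main() {
	g := &streamGroup{} // autoCommit=false: explicit commits only
	g.read(5, 8)        // reads offsets 0..4
	g.commit(3)         // checkpoint: resume at offset 3
	g.restart()         // crash + reconnect
	fmt.Println("re-read after restart:", g.read(5, 8))
}
```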

Why No PEL for Streams?

Stream mode is designed for different use cases than classic work queues:

| Aspect | Classic (PEL) | Stream (No PEL) |
|---|---|---|
| Semantics | Process once | Read/replay many times |
| Delivery guarantee | At-least-once with ack | At-least-once with cursor |
| Redelivery trigger | Visibility timeout | Consumer restart/seek |
| Work stealing | Yes (steal from slow peers) | No (each consumer independent) |
| Memory overhead | PEL entry per pending message | Only cursor position |
| Replayability | No (acked = done) | Yes (seek to any offset) |

Classic mode answers: "Has this message been successfully processed?"

Stream mode answers: "Where is my read position in the log?"

In stream mode, the cursor itself serves as the progress indicator. If a consumer crashes:

  • It restarts from the last committed cursor position
  • Messages between committed and actual progress are re-read
  • No per-message tracking needed - just resume from checkpoint

This makes streams more efficient for high-throughput scenarios where:

  • Messages are idempotent or replayable
  • Multiple independent readers need the same data
  • You want Kafka-like log semantics

AMQP 0.9.1 Stream Queues

Declare a stream queue with retention settings:

ch.QueueDeclare(
    "my-stream",  // name
    true,         // durable
    false,        // auto-delete
    false,        // exclusive
    false,        // no-wait
    amqp091.Table{
        "x-queue-type":        "stream",
        "x-max-age":           "7D",         // retain 7 days
        "x-max-length-bytes":  1073741824,   // 1GB max
        "x-max-length":        1000000,      // 1M messages max
    },
)

Consume with cursor positioning:

ch.Consume(
    "my-stream",    // queue name (not $queue/ prefix)
    "consumer-tag",
    false,          // auto-ack
    false,          // exclusive
    false,          // no-local
    false,          // no-wait
    amqp091.Table{
        "x-consumer-group": "readers",
        "x-stream-offset":  "first",      // "first", "last", "next", offset number, or "timestamp=..."
        "x-auto-commit":    false,        // require manual commit
    },
)

Stream offset options:

  • (omitted) - Resume from the last committed offset, or start from the beginning if no prior state exists (default)
  • "first" - Start from the beginning of the log
  • "last" or "next" - Start from the end (new messages only)
  • 123 (number) - Start from specific offset
  • "offset=123" - Same as above, string format
  • "timestamp=1706745600000" - Start from timestamp (unix millis)

When x-stream-offset is omitted the broker uses resume semantics: if the consumer group already has a stored committed offset it picks up where it left off; otherwise it starts from the beginning of the log. This is the recommended default for event-sourcing and long-lived consumers that should never miss messages after a reconnect.
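
The accepted x-stream-offset forms can be summarized with a small parser sketch. The types and the function parseStreamOffset are hypothetical and only encode the options listed above; they are not the broker's parser.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type startKind int

const (
	startResume    startKind = iota // argument omitted: resume from committed
	startFirst                      // beginning of the log
	startLast                       // end of the log ("last" or "next")
	startOffset                     // explicit offset
	startTimestamp                  // unix millis
)

type startPos struct {
	kind  startKind
	value int64 // offset or unix millis, when applicable
}

// parseStreamOffset interprets an x-stream-offset consume argument.
func parseStreamOffset(arg any) (startPos, error) {
	switch v := arg.(type) {
	case nil:
		return startPos{kind: startResume}, nil
	case int:
		return startPos{startOffset, int64(v)}, nil
	case int64:
		return startPos{startOffset, v}, nil
	case string:
		switch {
		case v == "first":
			return startPos{kind: startFirst}, nil
		case v == "last", v == "next":
			return startPos{kind: startLast}, nil
		case strings.HasPrefix(v, "offset="):
			n, err := strconv.ParseInt(strings.TrimPrefix(v, "offset="), 10, 64)
			return startPos{startOffset, n}, err
		case strings.HasPrefix(v, "timestamp="):
			n, err := strconv.ParseInt(strings.TrimPrefix(v, "timestamp="), 10, 64)
			return startPos{startTimestamp, n}, err
		}
	}
	return startPos{}, fmt.Errorf("unsupported x-stream-offset: %v", arg)
}

func main() {
	for _, arg := range []any{nil, "first", "next", 123, "offset=123", "timestamp=1706745600000"} {
		pos, err := parseStreamOffset(arg)
		fmt.Printf("%-28v -> %+v err=%v\n", arg, pos, err)
	}
}
```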

Filtered Stream Subscriptions

A consumer can subscribe to a subset of messages in a stream queue by using a topic filter. The broker routes only matching messages to the consumer while sharing the same underlying log.

ch.Consume(
    "$queue/events/sensors/#",  // queue name + topic filter
    "consumer-tag",
    false, false, false, false,
    amqp091.Table{
        "x-consumer-group": "sensor-readers",
    },
)

The filter follows MQTT topic semantics (+ single-level, # multi-level). Each filter + consumer group combination maintains an independent cursor, so multiple groups can track different subsets of the same stream independently.

Manual commit (when x-auto-commit: false):

// Publish to special commit topic
ch.Publish("", "$queue/my-stream/$commit", false, false, amqp091.Publishing{
    Headers: amqp091.Table{
        "x-group-id": "readers",
        "x-offset":   int64(lastProcessedOffset),
    },
})

Cursors, Pending Lists, and Safe Truncation

Each consumer group tracks progress using two offsets:

  • cursor: the next offset that will be delivered (read position)
  • committed: the oldest offset considered safe for retention/truncation (durability floor)

In classic mode, committed is driven by the minimum pending offset in the PEL.

Practical intuition:

  • Cursor moves forward when the group claims work.
  • Committed moves forward when the group acks (or otherwise clears) the earliest pending work.

Example: Classic Group Progress

Imagine a queue log with offsets 0..5.

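| Event | Cursor | PEL (pending offsets) | Committed |
|---|---|---|---|
| Group starts | 0 | empty | 0 |
| Claims offsets 0,1 | 2 | 0, 1 | 0 |
| Acks offset 0 | 2 | 1 | 1 |
| Acks offset 1 | 2 | empty | 2 |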

This is why committed is the “safe truncation floor”: once everything below an offset is fully processed, the log can be truncated without breaking group semantics.
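
The committed column in the example follows one rule: take the lowest pending offset, or the cursor when nothing is pending. A minimal sketch of that rule, replaying the four events (committedFloor is a hypothetical name):

```go
package main

import "fmt"

// committedFloor returns the lowest pending offset, or the cursor when the
// pending list is empty.
func committedFloor(cursor uint64, pending []uint64) uint64 {
	floor := cursor
	for _, off := range pending {
		if off < floor {
			floor = off
		}
	}
	return floor
}

func main() {
	steps := []struct {
		event   string
		cursor  uint64
		pending []uint64
	}{
		{"group starts", 0, nil},
		{"claims offsets 0,1", 2, []uint64{0, 1}},
		{"acks offset 0", 2, []uint64{1}},
		{"acks offset 1", 2, nil},
	}
	for _, s := range steps {
		fmt.Printf("%-20s cursor=%d pel=%v committed=%d\n",
			s.event, s.cursor, s.pending, committedFloor(s.cursor, s.pending))
	}
}
```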

Retention

Retention policies can be configured per queue:

  • max_age (time-based)
  • max_length_bytes
  • max_length_messages

A background retention loop computes a safe truncation offset and truncates the queue log:

  • Start from the minimum committed offset across queue-mode consumer groups.
  • Apply retention limits (time/size/message count) to compute the oldest offset that should be kept.
  • Truncate the log to the lowest safe offset (segment-granular in log storage).
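
One way to read the three steps is as a minimum over two kinds of floors. The sketch below takes that reading literally; it is an interpretation, not the retention loop's actual code. The function name is hypothetical, the retention floor is passed in precomputed, and segment granularity is not modelled.

```go
package main

import "fmt"

// safeTruncationOffset returns the offset below which the log may be
// truncated. committed holds the committed offsets of queue-mode groups;
// retentionFloor is the oldest offset the retention limits want to keep.
// Assumption: the result is simply the lowest of all these offsets.
func safeTruncationOffset(committed []uint64, retentionFloor uint64) uint64 {
	safe := retentionFloor
	for _, c := range committed {
		if c < safe {
			safe = c
		}
	}
	return safe
}

func main() {
	// A slow group at offset 95 holds truncation back below the retention floor.
	fmt.Println(safeTruncationOffset([]uint64{120, 95}, 110))
	// All groups are ahead, so the retention floor decides.
	fmt.Println(safeTruncationOffset([]uint64{120, 130}, 110))
}
```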

See Storage internals for on-disk format and retention behavior details.

Per-Message TTL

In addition to queue-level retention, each message can have an individual time-to-live controlled by the message_ttl queue setting. When a message's TTL expires, it is silently skipped during delivery — consumers never see it.

queues:
  - name: "tasks"
    topics: ["$queue/tasks/#"]
    limits:
      message_ttl: "30m"   # messages expire 30 minutes after publish

Key behavior:

  • The expiry timestamp is computed at publish time (created_at + message_ttl) and persisted with the message.
  • Both classic (queue-mode) and stream consumers skip expired messages. In queue mode, expired messages never enter the pending entry list.
  • Setting message_ttl to 0 disables per-message expiry. Messages then live until queue-level retention truncates them.
  • Per-message TTL and queue-level retention are complementary. TTL prevents delivery of stale individual messages; retention reclaims storage by truncating the head of the log in bulk.
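
The expiry arithmetic is small enough to sketch directly. The function names are hypothetical, and representing "no expiry" as the zero time is an assumption of this example, not a statement about the stored format.

```go
package main

import (
	"fmt"
	"time"
)

// expiresAt computes the expiry at publish time (created_at + message_ttl).
// A TTL of 0 disables expiry; the zero time.Time stands for "never".
func expiresAt(createdAt time.Time, ttl time.Duration) time.Time {
	if ttl == 0 {
		return time.Time{}
	}
	return createdAt.Add(ttl)
}

// isExpired is the check a delivery path would run before handing a message
// to a consumer; expired messages are skipped.
func isExpired(expiry, now time.Time) bool {
	return !expiry.IsZero() && !now.Before(expiry)
}

// demoTTL evaluates three messages 45 minutes after publish, with TTLs of
// 30 minutes, 1 hour, and 0 (disabled).
func demoTTL() (ttl30m, ttl1h, ttlOff bool) {
	created := time.Date(2026, 1, 1, 12, 0, 0, 0, time.UTC)
	now := created.Add(45 * time.Minute)
	return isExpired(expiresAt(created, 30*time.Minute), now),
		isExpired(expiresAt(created, time.Hour), now),
		isExpired(expiresAt(created, 0), now)
}

func main() { fmt.Println(demoTTL()) }
```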

Clustering and Replication Notes

In clustered deployments, queue behavior depends on cluster.raft.*:

  • write_policy controls how followers handle incoming queue publishes.
  • distribution_mode controls whether deliveries are routed (forward) or logs are replicated (replicate).

Enabling replication per queue

Replication is configured per queue under queues[].replication.

queues:
  - name: "orders"
    topics: ["$queue/orders/#"]
    type: "classic"

    replication:
      enabled: true
      group: "default"          # optional (empty = default)
      mode: "sync"              # sync or async
      ack_timeout: "5s"         # applies to sync mode
      replication_factor: 3
      min_in_sync_replicas: 2

Behavior summary:

  • With replication enabled, appends and consumer-group mutations are leader-owned and replicated.
  • With write_policy=forward, clients may publish to any node and followers forward to the leader.
  • With mode=sync, publish latency increases because the leader waits for commit (bounded by ack_timeout).

Raft groups (sharding)

replication.group lets you assign a queue to a specific Raft replication group (a shard).

Use groups to isolate hot queues or to spread leadership across nodes. Keep the number of groups modest: each group is a Raft instance with background overhead.

See Queue Replication Groups (Raft) for the model and tradeoffs.

See Cluster configuration and Clustering internals for the full picture.

DLQ Status

DLQ support is partial: the main delivery path does not automatically move rejected or expired messages into a DLQ yet. Reject currently removes the message from the pending list without pushing it to a DLQ.

Configuration

Queues are configured under queues in the main config file:

queue_manager:
  auto_commit_interval: "5s"

queues:
  - name: "orders"
    topics: ["$queue/orders/#"]
    type: "classic"               # classic or stream
    primary_group: ""             # optional for stream status

    replication:
      enabled: false
      group: ""
      mode: "sync"
      ack_timeout: "5s"
      replication_factor: 3
      min_in_sync_replicas: 2

    limits:
      max_message_size: 10485760
      max_depth: 100000
      message_ttl: "168h"

    retry:
      max_retries: 10
      initial_backoff: "5s"
      max_backoff: "5m"
      multiplier: 2.0

    dlq:
      enabled: true
      topic: ""                    # optional override

    retention:
      max_age: "168h"
      max_length_bytes: 0
      max_length_messages: 0
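
To give a feel for the retry settings, the sketch below computes a redelivery delay from initial_backoff, multiplier and max_backoff. The formula (exponential growth, capped at the maximum) is an assumption about how these three values combine, and the function names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns the delay before the given delivery attempt (1-based).
// Assumed formula: initial * multiplier^(delivery-1), capped at maxDelay.
func backoff(delivery int, initial, maxDelay time.Duration, multiplier float64) time.Duration {
	d := float64(initial)
	for i := 1; i < delivery; i++ {
		d *= multiplier
		if d >= float64(maxDelay) {
			return maxDelay
		}
	}
	if d > float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(d)
}

// demoBackoffSeconds lists the delays in seconds for deliveries 1..8 using
// the values from the configuration above (5s, 5m, multiplier 2.0).
func demoBackoffSeconds() []int {
	var out []int
	for n := 1; n <= 8; n++ {
		d := backoff(n, 5*time.Second, 5*time.Minute, 2.0)
		out = append(out, int(d.Seconds()))
	}
	return out
}

func main() { fmt.Println(demoBackoffSeconds()) }
```

Under this assumed formula the delay reaches the 5-minute cap on the seventh attempt, well before max_retries: 10 is exhausted.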
