Durable Queues

Shared queue system for MQTT and AMQP with consumer groups, acknowledgments, stream queues, and append-only log storage

Last Updated: 2026-02-05

FluxMQ provides durable queues shared across MQTT, AMQP 1.0, and AMQP 0.9.1. The queue manager stores messages in an append-only log, delivers them through consumer groups, and supports both classic work-queue semantics and stream-style consumption.

Overview

Durable queues are built around a simple model:

  • Producers publish to $queue/<queue-name>/....
  • The broker appends the message to the queue’s log (durable storage).
  • Consumers join a consumer group and receive messages from that log.
  • Consumers ack, nack, or reject deliveries to control redelivery and truncation.

Queues can operate in two modes:

  • Classic (work queue): messages are claimed, tracked in a pending list (PEL), and acknowledged.
  • Stream: consumers read sequentially with cursor-based progress (replayable log semantics).

Do MQTT Producers Need $queue/?

Yes. In FluxMQ today, MQTT publishes are treated as durable-queue traffic only when the topic starts with $queue/.

  • Publish to sensors/temp goes through normal pub/sub routing (subscriptions, retained, cluster pub/sub).
  • Publish to $queue/orders/... goes through the queue manager (durable log + consumer groups + acks).

If you configured a queue with topic bindings that match a non-$queue/ topic, MQTT publishes to that topic will still not be enqueued, because the MQTT broker does not route non-$queue/ publishes into the queue manager.

Architecture

┌──────────────┐  ┌──────────────┐  ┌───────────────┐
│ MQTT Broker  │  │ AMQP Broker  │  │ AMQP091 Broker│
│ (TCP/WS/     │  │ (AMQP 1.0)   │  │ (AMQP 0.9.1)  │
│ HTTP/CoAP)   │  │              │  │               │
└──────┬───────┘  └───────┬──────┘  └──────┬────────┘
       │                  │                │
       └──────────────────┼────────────────┘
                          ▼
              ┌─────────────────────────┐
              │ Shared Queue Manager    │
              │ - Topic bindings        │
              │ - Consumer groups       │
              │ - Retention loop        │
              └───────────┬─────────────┘
                          ▼
              ┌─────────────────────────┐
              │ Log Storage (AOL)       │
              └─────────────────────────┘

Queue Name, Topic, and Routing Key

Queue topics follow this convention:

  • Queue root: $queue/<name>
  • Routing key: everything after the root (optional)

Examples:

  • $queue/orders (root only)
  • $queue/orders/created (routing key created)
  • $queue/orders/images/png (routing key images/png)

When you subscribe to $queue/orders/images/#, FluxMQ splits the subscription into:

  • queue name: orders
  • group filter pattern: images/#

That pattern is applied inside the queue manager (not in the normal pub/sub router).
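
The split described above can be sketched in a few lines of Go. This is an illustrative helper, not FluxMQ's actual parser: splitQueueTopic is a hypothetical name, and the sketch assumes the queue name is always the first level after $queue/.

```go
package main

import (
	"fmt"
	"strings"
)

const queuePrefix = "$queue/"

// splitQueueTopic splits "$queue/<name>/<rest>" into the queue name and the
// remainder (routing key on publish, filter pattern on subscribe).
// ok is false when the topic is not a queue topic.
func splitQueueTopic(topic string) (name, rest string, ok bool) {
	if !strings.HasPrefix(topic, queuePrefix) {
		return "", "", false
	}
	name, rest, _ = strings.Cut(strings.TrimPrefix(topic, queuePrefix), "/")
	if name == "" {
		return "", "", false
	}
	return name, rest, true
}

func main() {
	topics := []string{
		"$queue/orders",
		"$queue/orders/created",
		"$queue/orders/images/#",
		"sensors/temp", // normal pub/sub topic, not a queue topic
	}
	for _, t := range topics {
		name, rest, ok := splitQueueTopic(t)
		fmt.Printf("%-24s queue=%q rest=%q ok=%v\n", t, name, rest, ok)
	}
}
```

The same helper covers both sides: on publish the remainder is the routing key, on subscribe it is the group filter pattern.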

Wildcard Patterns and Routing Key Filtering

Queue subscriptions support MQTT-style wildcards (+ for single level, # for multiple levels). However, the wildcard handling is split into two stages:

  1. Queue binding: The queue itself is bound to $queue/<name>/# (captures all messages for that queue)
  2. Consumer filter: The pattern after the queue name filters messages during delivery

Example 1: Simple Queue

Subscribe:  $queue/orders
Publish:    $queue/orders
  • Queue name: orders
  • Consumer filter: (none)
  • All messages to $queue/orders are delivered

Example 2: Single-Level Routing Key

Subscribe:  $queue/orders/images
Publish:    $queue/orders/images
  • Queue name: orders
  • Consumer filter: images
  • Only messages with routing key images are delivered

Example 3: Multi-Level Wildcard

Subscribe:  $queue/orders/images/#
Publish:    $queue/orders/images/png
Publish:    $queue/orders/images/resize/thumbnail
  • Queue name: orders
  • Consumer filter: images/#
  • Queue binding: $queue/orders/# (captures everything)
  • During delivery, filter images/# matches images/png and images/resize/thumbnail

Example 4: Single-Level Wildcard

Subscribe:  $queue/orders/+/images/#
Publish:    $queue/orders/eu/images/resize
Publish:    $queue/orders/us/images/png
  • Queue name: orders
  • Consumer filter: +/images/#

Flow for publish to $queue/orders/eu/images/resize:

  1. FindMatchingQueues("$queue/orders/eu/images/resize") matches queue orders (bound to $queue/orders/#)
  2. Message stored in queue with topic $queue/orders/eu/images/resize
  3. During delivery, routing key is extracted: eu/images/resize
  4. Filter +/images/# matches:
    • + matches eu
    • images matches images
    • # matches resize
  5. Message delivered to consumer

Example 5: Multiple Consumer Groups with Different Filters

One queue can have multiple consumer groups with different filter patterns:

# Group A: processes all EU orders
Subscribe:  $queue/orders/eu/#
            (consumer-group: "eu-processors")

# Group B: processes all image-related orders
Subscribe:  $queue/orders/+/images/#
            (consumer-group: "image-processors")

# Publish
Publish:    $queue/orders/eu/images/resize

Both groups receive the message because:

  • Group A filter eu/# matches eu/images/resize
  • Group B filter +/images/# matches eu/images/resize
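
A minimal sketch of the delivery-time filter check, assuming standard MQTT wildcard rules ("+" matches exactly one level, "#" matches all remaining levels). matchRoutingKey is a hypothetical helper written for illustration; it is not taken from the FluxMQ source, and it does not cover how a subscription without a filter is handled.

```go
package main

import (
	"fmt"
	"strings"
)

// matchRoutingKey reports whether an MQTT-style filter matches a routing key.
// "+" matches exactly one level; "#" matches the remaining levels (including
// none) and is only valid as the last level of the filter.
func matchRoutingKey(filter, key string) bool {
	f := strings.Split(filter, "/")
	k := strings.Split(key, "/")
	for i, level := range f {
		if level == "#" {
			return i == len(f)-1
		}
		if i >= len(k) {
			return false // the key has fewer levels than the filter
		}
		if level != "+" && level != k[i] {
			return false
		}
	}
	return len(f) == len(k)
}

func main() {
	key := "eu/images/resize" // routing key of $queue/orders/eu/images/resize
	groups := []struct{ name, filter string }{
		{"eu-processors", "eu/#"},
		{"image-processors", "+/images/#"},
	}
	for _, g := range groups {
		fmt.Printf("%s (%s): deliver=%v\n", g.name, g.filter, matchRoutingKey(g.filter, key))
	}
}
```

Because the check runs per group at delivery time, adding a group with a new filter does not change what is stored in the queue log.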

Key Points

  • The publisher must always use the $queue/ prefix for messages to be routed through the queue manager
  • The queue is bound to $queue/<name>/# regardless of the subscription pattern
  • Wildcard filtering happens at delivery time, not at publish time
  • Multiple consumer groups can have different filters on the same queue

Routing Model (How Messages Find Queues)

FluxMQ uses topic patterns on queues to decide where a publish should be stored. This is a fan-out model: one publish can land in multiple queues if multiple patterns match.

  • Each queue has one or more topic patterns (MQTT wildcard syntax).
  • On publish, the queue manager calls FindMatchingQueues(topic) and appends the message to every matching queue.
  • If no queue matches, FluxMQ creates an ephemeral queue whose name and pattern equal the topic, then appends.
  • If no queues are configured at all, FluxMQ creates a reserved mqtt queue that matches $queue/# so queue publishes always have a landing zone.

Ephemeral queues are a safety net, but they can be surprising if you publish before configuring queues or before consumers subscribe. In production, prefer explicit configuration under queues: so that queue names and bindings are stable.
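
The routing rules above can be sketched as a small in-memory model. Everything here is illustrative: the router type, its fields, and the lowercase findMatchingQueues are hypothetical stand-ins for the real queue manager, and the audit queue is invented to show fan-out.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// topicMatches applies MQTT wildcard rules ("+" one level, "#" the rest).
func topicMatches(pattern, topic string) bool {
	p, t := strings.Split(pattern, "/"), strings.Split(topic, "/")
	for i, level := range p {
		if level == "#" {
			return i == len(p)-1
		}
		if i >= len(t) || (level != "+" && level != t[i]) {
			return false
		}
	}
	return len(p) == len(t)
}

type router struct {
	patterns map[string][]string // queue name -> topic patterns
	logs     map[string][]string // queue name -> appended payloads
}

// newRouter copies the configured queues; with none configured it adds the
// reserved "mqtt" queue bound to $queue/#.
func newRouter(configured map[string][]string) *router {
	r := &router{patterns: map[string][]string{}, logs: map[string][]string{}}
	for name, pats := range configured {
		r.patterns[name] = pats
	}
	if len(r.patterns) == 0 {
		r.patterns["mqtt"] = []string{"$queue/#"}
	}
	return r
}

func (r *router) findMatchingQueues(topic string) []string {
	var names []string
	for name, pats := range r.patterns {
		for _, p := range pats {
			if topicMatches(p, topic) {
				names = append(names, name)
				break
			}
		}
	}
	sort.Strings(names) // deterministic order for the example
	return names
}

// publish appends to every matching queue, creating an ephemeral queue whose
// name and pattern equal the topic when nothing matches.
func (r *router) publish(topic, payload string) []string {
	names := r.findMatchingQueues(topic)
	if len(names) == 0 {
		r.patterns[topic] = []string{topic}
		names = []string{topic}
	}
	for _, name := range names {
		r.logs[name] = append(r.logs[name], payload)
	}
	return names
}

func main() {
	r := newRouter(map[string][]string{
		"orders": {"$queue/orders/#"},
		"audit":  {"$queue/+/created"}, // hypothetical second queue
	})
	fmt.Println(r.publish("$queue/orders/created", "m1"))  // lands in both queues
	fmt.Println(r.publish("$queue/payments/refund", "m2")) // no match: ephemeral queue
	fmt.Println(newRouter(nil).publish("$queue/anything", "m3"))
}
```

The second publish shows why ephemeral queues can surprise: the message is stored under a queue named after the full topic, which no configured consumer group is bound to.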

Walkthrough: Queue Message Lifecycle

This is the “classic” (work queue) lifecycle:

  1. Producer publishes to a $queue/<name>/... topic (or a queue-capable AMQP address).
  2. Queue manager matches the topic against queue bindings and appends to the queue log(s).
  3. A consumer group claims a message at some offset; the message becomes pending (tracked in the PEL).
  4. The broker delivers the message to the chosen consumer.
  5. The consumer acks/nacks/rejects; the queue manager updates group state and may advance the safe truncation point.

Queue Addressing

Queue topics use $queue/<queue-name>/....

MQTT

  • Publish: $queue/orders
  • Publish with routing key: $queue/orders/eu/images/resize
  • Subscribe to a pattern: $queue/orders/# or $queue/orders/+/images/#
  • Ack: $queue/orders/$ack
  • Nack: $queue/orders/$nack
  • Reject: $queue/orders/$reject

AMQP 0.9.1

AMQP 0.9.1 clients can interact with queues in multiple ways:

Direct queue publish (default exchange with $queue/ routing key):

exchange: "" (default)
routing_key: "$queue/orders"
routing_key: "$queue/orders/eu/images/resize"  # with routing key

Stream queue publish (declare queue with x-queue-type: stream):

exchange: "" (default)
routing_key: "my-stream"  # queue name without $queue/ prefix

Exchange-based routing (bind queue to exchange):

queue.bind(queue="orders", exchange="my-exchange", routing_key="orders.#")
publish(exchange="my-exchange", routing_key="orders.eu.images")

Consume from queue:

basic.consume(queue="$queue/orders/#")
basic.consume(queue="$queue/orders/+/images/#")
basic.consume(queue="my-stream")  # stream queue by name

Acknowledgments:

  • basic.ack → Ack
  • basic.nack(requeue=true) → Nack (retry)
  • basic.reject(requeue=false) → Reject (DLQ)

Consumer Groups

Consumer groups are what turn a queue log into a scalable worker pool:

  • A single queue can have many groups (independent progress).
  • A single group can have many consumers (load balancing).
  • Each group has its own cursor state and (in classic mode) pending list.

How to set the group ID depends on the protocol:

  • MQTT v5: consumer-group user property on SUBSCRIBE.
  • MQTT v3: falls back to client ID (acks require MQTT v5 user properties).
  • AMQP 1.0: consumer-group in attach properties.
  • AMQP 0.9.1: x-consumer-group argument on basic.consume.

MQTT v5 Example

mosquitto_sub -V mqttv5 -p 1883 -t '$queue/orders/#' -q 1 \
  -D subscribe user-property consumer-group workers

AMQP 0.9.1 Example (Go)

ch.Consume(
    "$queue/orders/#",  // queue
    "consumer-tag",     // consumer tag
    false,              // auto-ack
    false,              // exclusive
    false,              // no-local
    false,              // no-wait
    amqp091.Table{
        "x-consumer-group": "workers",
    },
)

Message Properties

Queue deliveries include properties that make acknowledgments and observability consistent across protocols:

  • message-id (required for ack/nack/reject)
  • group-id (consumer group name)
  • queue (queue name)
  • offset (sequence number)

Stream deliveries also include:

  • x-stream-offset
  • x-stream-timestamp (unix millis)
  • x-work-committed-offset (if primary group is configured)
  • x-work-acked (true when below committed offset)
  • x-work-group (primary work group name)

Acknowledgments

MQTT

Ack/Nack/Reject are implemented by publishing to:

  • $queue/<queue>/$ack
  • $queue/<queue>/$nack
  • $queue/<queue>/$reject

If you include a routing key (for example $queue/orders/images/$ack), the broker still derives the queue name from the first segment after $queue/.

MQTT v5 user properties required:

  • message-id
  • group-id

MQTT v3 can publish and subscribe to queue topics, but acknowledgments require MQTT v5 user properties.
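
As a rough illustration of the rules above, the following sketch parses an acknowledgment publish the way the text describes: the queue name comes from the first segment after $queue/, the action from the last segment, and both user properties must be present. The ackRequest type and parseAck function are hypothetical, and user properties are simplified to a map (MQTT v5 allows repeated keys).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

type ackRequest struct {
	Queue     string
	Action    string // "ack", "nack", or "reject"
	MessageID string
	GroupID   string
}

// parseAck validates an acknowledgment publish and extracts its parts.
func parseAck(topic string, userProps map[string]string) (ackRequest, error) {
	if !strings.HasPrefix(topic, "$queue/") {
		return ackRequest{}, errors.New("not a queue topic")
	}
	segs := strings.Split(strings.TrimPrefix(topic, "$queue/"), "/")
	if len(segs) < 2 || segs[0] == "" {
		return ackRequest{}, errors.New("missing queue name or action")
	}
	action := segs[len(segs)-1]
	switch action {
	case "$ack", "$nack", "$reject":
	default:
		return ackRequest{}, fmt.Errorf("unknown action %q", action)
	}
	id, group := userProps["message-id"], userProps["group-id"]
	if id == "" || group == "" {
		return ackRequest{}, errors.New("message-id and group-id user properties are required")
	}
	return ackRequest{
		Queue:     segs[0], // any routing key between name and action is ignored
		Action:    strings.TrimPrefix(action, "$"),
		MessageID: id,
		GroupID:   group,
	}, nil
}

func main() {
	props := map[string]string{"message-id": "m-1", "group-id": "workers"}
	req, err := parseAck("$queue/orders/images/$ack", props)
	fmt.Printf("%+v err=%v\n", req, err)

	_, err = parseAck("$queue/orders/$nack", map[string]string{"message-id": "m-1"})
	fmt.Println("missing group-id:", err)
}
```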

AMQP 1.0

AMQP dispositions are mapped to queue acknowledgments:

  • Accepted → Ack
  • Released → Nack
  • Rejected → Reject

AMQP 0.9.1

  • basic.ack, basic.nack, basic.reject map to Ack/Nack/Reject

Stream Commit (AMQP 0.9.1)

If a stream group has auto-commit disabled, AMQP 0.9.1 can explicitly commit offsets by publishing to:

  • $queue/<queue>/$commit

Headers:

  • x-group-id
  • x-offset

Queue Types

Classic (Work Queue)

Classic queues are optimized for "do work once" semantics:

  • Messages are claimed and tracked in a Pending Entry List (PEL)
  • ack removes a message from the PEL; nack makes it eligible for redelivery; reject removes it (DLQ wiring is incomplete)
  • A visibility timeout plus work stealing prevents stuck consumers from stalling progress
  • The safe truncation point is derived from group state (see cursor/committed below)

Classic Mode Delivery Flow

  1. Consumer requests messages via ClaimBatch
  2. Manager reads message from queue log at cursor position
  3. Creates PEL entry: { offset, consumerID, claimedAt, deliveryCount }
  4. Advances cursor to next position
  5. Delivers message to consumer
  6. On ack: removes PEL entry, advances committed offset
  7. On nack: resets PEL entry for redelivery
  8. On timeout: message becomes stealable by other consumers

Classic Mode State:

Queue Log:  [0] [1] [2] [3] [4] [5] [6] [7] ...
                         ▲           ▲
                         │           └── cursor (next to deliver)
                         └── committed (safe to truncate)

PEL: {
  offset=3: { consumer: "c1", claimedAt: t1, deliveries: 1 }
  offset=4: { consumer: "c2", claimedAt: t2, deliveries: 2 }
  offset=5: { consumer: "c1", claimedAt: t3, deliveries: 1 }
}
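
The claim, ack, nack, and timeout steps can be modelled with a toy in-memory group. This is a sketch under stated assumptions, not the FluxMQ implementation: classicGroup and its methods are hypothetical, time is passed in as unix milliseconds to keep the example deterministic, and redeliverable entries are served before new offsets (the real ordering is not specified here).

```go
package main

import "fmt"

// pelEntry mirrors the fields listed above; the offset is the map key.
type pelEntry struct {
	consumerID    string // empty after a nack: free for redelivery
	claimedAtMs   int64
	deliveryCount int
}

type classicGroup struct {
	cursor    uint64 // next offset to deliver
	pel       map[uint64]*pelEntry
	timeoutMs int64 // visibility timeout
}

func newClassicGroup(timeoutMs int64) *classicGroup {
	return &classicGroup{pel: map[uint64]*pelEntry{}, timeoutMs: timeoutMs}
}

// claim returns the next offset for consumerID. Nacked or timed-out entries
// are re-claimed (stolen) first; otherwise the cursor advances.
func (g *classicGroup) claim(consumerID string, logEnd uint64, nowMs int64) (uint64, bool) {
	var stolen uint64
	found := false
	for off, e := range g.pel {
		redeliverable := e.consumerID == "" || nowMs-e.claimedAtMs >= g.timeoutMs
		if redeliverable && (!found || off < stolen) {
			stolen, found = off, true
		}
	}
	if found {
		e := g.pel[stolen]
		e.consumerID, e.claimedAtMs = consumerID, nowMs
		e.deliveryCount++
		return stolen, true
	}
	if g.cursor >= logEnd {
		return 0, false // nothing new in the log
	}
	off := g.cursor
	g.pel[off] = &pelEntry{consumerID: consumerID, claimedAtMs: nowMs, deliveryCount: 1}
	g.cursor++
	return off, true
}

// ack removes the entry; nack releases it for the next claim.
func (g *classicGroup) ack(off uint64) { delete(g.pel, off) }

func (g *classicGroup) nack(off uint64) {
	if e, ok := g.pel[off]; ok {
		e.consumerID = ""
	}
}

func main() {
	g := newClassicGroup(30_000)
	a, _ := g.claim("c1", 3, 0) // c1 takes the first offset and then stalls
	b, _ := g.claim("c2", 3, 0)
	g.ack(b)
	c, _ := g.claim("c2", 3, 31_000) // after the timeout, c2 steals c1's entry
	fmt.Println(a, b, c, g.pel[c].deliveryCount)
}
```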

Stream

Stream queues are optimized for "replay and progress" semantics:

  • Consumption is cursor-based (read position in an append-only log)
  • No PEL tracking - messages are simply read, not "claimed"
  • Groups can auto-commit progress (default) or require explicit commits (AMQP 0.9.1)

Stream Mode Delivery Flow

  1. Consumer requests messages via ClaimBatchStream
  2. Manager reads messages from queue log starting at cursor position
  3. No PEL entry created - message is simply read
  4. Advances cursor to next position
  5. Delivers messages to consumer
  6. If autoCommit: true: committed offset is updated periodically
  7. If autoCommit: false: consumer must explicitly commit via $queue/<name>/$commit

Stream Mode State:

Queue Log:  [0] [1] [2] [3] [4] [5] [6] [7] ...
                         ▲       ▲
                         │       └── cursor (next to read)
                         └── committed (checkpoint)

No PEL - just two offsets to track!
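
A minimal model of that two-offset state, for illustration only. streamGroup is a hypothetical type, and commit here takes the offset to resume from; whether the offset sent to $commit is inclusive or exclusive in FluxMQ is not specified in this document.

```go
package main

import "fmt"

type streamGroup struct {
	cursor    uint64 // next offset to read
	committed uint64 // checkpoint: offset to resume from after a restart
}

// read returns up to max offsets starting at the cursor and advances it.
// Nothing is tracked per message.
func (g *streamGroup) read(logEnd uint64, max int) []uint64 {
	var batch []uint64
	for len(batch) < max && g.cursor < logEnd {
		batch = append(batch, g.cursor)
		g.cursor++
	}
	return batch
}

// commit moves the checkpoint forward; it rejects offsets that would move
// it backwards or past what has been read.
func (g *streamGroup) commit(resumeFrom uint64) bool {
	if resumeFrom < g.committed || resumeFrom > g.cursor {
		return false
	}
	g.committed = resumeFrom
	return true
}

// restart models a consumer crash: reading resumes from the checkpoint.
func (g *streamGroup) restart() { g.cursor = g.committed }

func main() {
	g := &streamGroup{}
	fmt.Println(g.read(8, 3)) // first batch
	g.commit(2)               // checkpoint: resume from offset 2
	fmt.Println(g.read(8, 2)) // read past the checkpoint
	g.restart()
	fmt.Println(g.read(8, 2)) // offsets after the checkpoint are read again
}
```

The re-read after restart is the at-least-once behaviour: anything between the checkpoint and the actual progress is delivered a second time.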

Why No PEL for Streams?

Stream mode is designed for different use cases than classic work queues:

| Aspect | Classic (PEL) | Stream (No PEL) |
|---|---|---|
| Semantics | Process once | Read/replay many times |
| Delivery guarantee | At-least-once with ack | At-least-once with cursor |
| Redelivery trigger | Visibility timeout | Consumer restart/seek |
| Work stealing | Yes (steal from slow peers) | No (each consumer independent) |
| Memory overhead | PEL entry per pending message | Only cursor position |
| Replayability | No (acked = done) | Yes (seek to any offset) |

Classic mode answers: "Has this message been successfully processed?"

Stream mode answers: "Where is my read position in the log?"

In stream mode, the cursor itself serves as the progress indicator. If a consumer crashes:

  • It restarts from the last committed cursor position
  • Messages between committed and actual progress are re-read
  • No per-message tracking needed - just resume from checkpoint

This makes streams more efficient for high-throughput scenarios where:

  • Messages are idempotent or replayable
  • Multiple independent readers need the same data
  • You want Kafka-like log semantics

AMQP 0.9.1 Stream Queues

Declare a stream queue with retention settings:

ch.QueueDeclare(
    "my-stream",  // name
    true,         // durable
    false,        // auto-delete
    false,        // exclusive
    false,        // no-wait
    amqp091.Table{
        "x-queue-type":        "stream",
        "x-max-age":           "7D",         // retain 7 days
        "x-max-length-bytes":  1073741824,   // 1GB max
        "x-max-length":        1000000,      // 1M messages max
    },
)

Consume with cursor positioning:

ch.Consume(
    "my-stream",    // queue name (not $queue/ prefix)
    "consumer-tag",
    false,          // auto-ack
    false,          // exclusive
    false,          // no-local
    false,          // no-wait
    amqp091.Table{
        "x-consumer-group": "readers",
        "x-stream-offset":  "first",      // "first", "last", "next", offset number, or "timestamp=..."
        "x-auto-commit":    false,        // require manual commit
    },
)

Stream offset options:

  • "first" - Start from the beginning of the log
  • "last" or "next" - Start from the end (new messages only)
  • 123 (number) - Start from specific offset
  • "offset=123" - Same as above, string format
  • "timestamp=1706745600000" - Start from timestamp (unix millis)
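
One way a client or broker might normalize these values is sketched below. The streamStart type and parseStreamOffset function are hypothetical names for illustration; the accepted forms are exactly the ones listed above.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type startKind int

const (
	startFirst startKind = iota
	startLast
	startOffset
	startTimestamp
)

type streamStart struct {
	Kind  startKind
	Value int64 // offset or unix millis; unused for first/last
}

// parseStreamOffset normalizes an x-stream-offset argument.
func parseStreamOffset(arg interface{}) (streamStart, error) {
	switch v := arg.(type) {
	case int:
		return offsetStart(int64(v))
	case int32:
		return offsetStart(int64(v))
	case int64:
		return offsetStart(v)
	case string:
		switch {
		case v == "first":
			return streamStart{Kind: startFirst}, nil
		case v == "last" || v == "next":
			return streamStart{Kind: startLast}, nil
		case strings.HasPrefix(v, "offset="):
			n, err := strconv.ParseInt(strings.TrimPrefix(v, "offset="), 10, 64)
			if err != nil {
				return streamStart{}, fmt.Errorf("bad offset %q: %w", v, err)
			}
			return offsetStart(n)
		case strings.HasPrefix(v, "timestamp="):
			ms, err := strconv.ParseInt(strings.TrimPrefix(v, "timestamp="), 10, 64)
			if err != nil {
				return streamStart{}, fmt.Errorf("bad timestamp %q: %w", v, err)
			}
			return streamStart{Kind: startTimestamp, Value: ms}, nil
		}
	}
	return streamStart{}, fmt.Errorf("unsupported x-stream-offset value: %v", arg)
}

func offsetStart(n int64) (streamStart, error) {
	if n < 0 {
		return streamStart{}, fmt.Errorf("negative offset %d", n)
	}
	return streamStart{Kind: startOffset, Value: n}, nil
}

func main() {
	for _, arg := range []interface{}{"first", "next", 123, "offset=123", "timestamp=1706745600000", "bogus"} {
		s, err := parseStreamOffset(arg)
		fmt.Printf("%v -> %+v err=%v\n", arg, s, err)
	}
}
```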

Manual commit (when x-auto-commit: false):

// Publish to special commit topic
ch.Publish("", "$queue/my-stream/$commit", false, false, amqp091.Publishing{
    Headers: amqp091.Table{
        "x-group-id": "readers",
        "x-offset":   int64(lastProcessedOffset),
    },
})

Cursors, Pending Lists, and Safe Truncation

Each consumer group tracks progress using two offsets:

  • cursor: the next offset that will be delivered (read position)
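  • committed: the truncation floor; every offset below it is fully processed and therefore safe for retention to truncate (durability floor)

In classic mode, committed is driven by the minimum pending offset in the PEL; when the PEL is empty, it catches up to the cursor.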

Practical intuition:

  • Cursor moves forward when the group claims work.
  • Committed moves forward when the group acks (or otherwise clears) the earliest pending work.

Example: Classic Group Progress

Imagine a queue log with offsets 0..5.

| Event | Cursor | PEL | Committed |
|---|---|---|---|
| Group starts | 0 | empty | 0 |
| Claims offsets 0,1 | 2 | 0, 1 | 0 |
| Acks offset 0 | 2 | 1 | 1 |
| Acks offset 1 | 2 | empty | 2 |

This is why committed is the “safe truncation floor”: once everything below an offset is fully processed, the log can be truncated without breaking group semantics.
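
The progression in the example can be reproduced with a short function. This is a sketch of the stated rule (committed follows the minimum pending offset, and equals the cursor once nothing is pending); committedOffset is a hypothetical name, not a FluxMQ API.

```go
package main

import "fmt"

// committedOffset returns the truncation floor for a classic group: the
// lowest pending offset, or the cursor when nothing is pending.
func committedOffset(cursor uint64, pending map[uint64]struct{}) uint64 {
	floor := cursor
	for off := range pending {
		if off < floor {
			floor = off
		}
	}
	return floor
}

func main() {
	pending := map[uint64]struct{}{}
	cursor := uint64(0)
	fmt.Println("start:     ", cursor, committedOffset(cursor, pending))

	// Claim offsets 0 and 1: both become pending and the cursor moves to 2.
	pending[0], pending[1] = struct{}{}, struct{}{}
	cursor = 2
	fmt.Println("claim 0,1: ", cursor, committedOffset(cursor, pending))

	delete(pending, 0) // ack offset 0
	fmt.Println("ack 0:     ", cursor, committedOffset(cursor, pending))

	delete(pending, 1) // ack offset 1
	fmt.Println("ack 1:     ", cursor, committedOffset(cursor, pending))
}
```

Acking out of order does not move the floor: if offset 1 were acked before offset 0, committed would stay at 0 until offset 0 is cleared.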

Retention

Retention policies can be configured per queue:

  • max_age (time-based)
  • max_length_bytes
  • max_length_messages

A background retention loop computes a safe truncation offset and truncates the queue log:

  • Start from the minimum committed offset across queue-mode consumer groups.
  • Apply retention limits (time/size/message count) to compute the oldest offset that should be kept.
  • Truncate the log to the lowest safe offset (segment-granular in log storage).
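
One reading of these three steps, sketched at message granularity. The types and function names are hypothetical, a limit of 0 is assumed to mean "unlimited", and the real loop works on log segments rather than individual messages, so treat this only as an outline of the arithmetic.

```go
package main

import "fmt"

type entry struct {
	offset      uint64
	timestampMs int64
	size        int64
}

// policy holds retention limits; 0 is assumed to mean unlimited.
type policy struct {
	maxAgeMs    int64
	maxBytes    int64
	maxMessages int64
}

// retentionFloor walks from newest to oldest and returns the oldest offset
// that still fits within every configured limit.
func retentionFloor(log []entry, p policy, nowMs int64) uint64 {
	if len(log) == 0 {
		return 0
	}
	keepFrom := len(log)
	var bytes, count int64
	for i := len(log) - 1; i >= 0; i-- {
		bytes += log[i].size
		count++
		tooOld := p.maxAgeMs > 0 && nowMs-log[i].timestampMs > p.maxAgeMs
		tooBig := p.maxBytes > 0 && bytes > p.maxBytes
		tooMany := p.maxMessages > 0 && count > p.maxMessages
		if tooOld || tooBig || tooMany {
			break
		}
		keepFrom = i
	}
	if keepFrom == len(log) {
		return log[len(log)-1].offset + 1 // nothing fits: all entries are droppable
	}
	return log[keepFrom].offset
}

// safeTruncationOffset never truncates past what any group has committed.
func safeTruncationOffset(committed []uint64, floor uint64) uint64 {
	safe := floor
	for _, c := range committed {
		if c < safe {
			safe = c
		}
	}
	return safe
}

func main() {
	var log []entry
	for i := 0; i < 6; i++ {
		log = append(log, entry{offset: uint64(i), timestampMs: int64(i) * 1000, size: 10})
	}
	floor := retentionFloor(log, policy{maxMessages: 3}, 6000)
	fmt.Println("retention floor:", floor)
	fmt.Println("safe truncation:", safeTruncationOffset([]uint64{5, 2}, floor))
}
```

In this reading a slow consumer group holds the log back: retention alone would drop everything below the floor, but the group committed at a lower offset still needs those entries.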

See Storage internals for on-disk format and retention behavior details.

Clustering and Replication Notes

In clustered deployments, queue behavior depends on cluster.raft.*:

  • write_policy controls how followers handle incoming queue publishes.
  • distribution_mode controls whether deliveries are routed (forward) or logs are replicated (replicate).

See Cluster configuration and Clustering internals for the full picture.

DLQ Status

A DLQ handler exists in queue/consumer/dlq.go, but the main delivery path does not automatically move rejected or expired messages into a DLQ yet. Reject currently removes the message from the pending list without pushing it to a DLQ.

Configuration

Queues are configured under queues in the main config file; manager-wide settings such as auto_commit_interval go under queue_manager:

queue_manager:
  auto_commit_interval: "5s"

queues:
  - name: "orders"
    topics: ["$queue/orders/#"]
    type: "classic"               # classic or stream
    primary_group: ""             # optional for stream status

    limits:
      max_message_size: 10485760
      max_depth: 100000
      message_ttl: "168h"

    retry:
      max_retries: 10
      initial_backoff: "5s"
      max_backoff: "5m"
      multiplier: 2.0

    dlq:
      enabled: true
      topic: ""                    # optional override

    retention:
      max_age: "168h"
      max_length_bytes: 0
      max_length_messages: 0
