FluxMQ
Reference

Configuration Reference

Comprehensive YAML configuration reference for server, broker, storage, clustering, and operational settings

Configuration Reference

Last Updated: 2026-02-25

FluxMQ uses a single YAML configuration file. Start the broker with:

./build/fluxmq --config /path/to/config.yaml

If --config is omitted, defaults are used (see config.Default() in config/config.go).

Looking for a guided walkthrough? See:

Configuration Overview

Top-level keys:

  • server
  • broker
  • session
  • queue_manager
  • queues
  • storage
  • cluster
  • auth
  • webhook
  • ratelimit
  • log

Durations use Go duration strings like 5s, 1m, 24h.

Server

server controls network listeners and telemetry endpoints.

server:
  tcp:
    v3:
      addr: ":1883"
      max_connections: 10000
      read_timeout: "60s"
      write_timeout: "60s"
      protocol: "v3"
    v5:
      addr: ":1884"
      max_connections: 10000
      read_timeout: "60s"
      write_timeout: "60s"
      protocol: "v5"
    tls: {}
    mtls: {}

  websocket:
    plain:
      addr: ":8083"
      path: "/mqtt"
      protocol: "auto" # auto | v3 | v5
      allowed_origins: ["https://app.example.com"]
    tls: {}
    mtls: {}

  http:
    plain:
      addr: ":8080"
    tls: {}
    mtls: {}

  coap:
    plain:
      addr: ":5683"
    dtls: {}
    mdtls: {}

  amqp:
    plain:
      addr: ":5672"
      max_connections: 10000
    tls: {}
    mtls: {}

  amqp091:
    plain:
      addr: ":5682"
      max_connections: 10000
    tls: {}
    mtls: {}

  health_enabled: true
  health_addr: ":8081"

  metrics_enabled: false
  metrics_addr: "localhost:4317" # OTLP endpoint

  otel_service_name: "fluxmq"
  otel_service_version: "1.0.0"
  otel_metrics_enabled: true
  otel_traces_enabled: false
  otel_trace_sample_rate: 0.1

  admin_api_addr: ":8082" # Admin API (HTTP + Connect/gRPC queue service); empty disables

  shutdown_timeout: "30s"

Listener Fields

These apply to listener blocks (for example server.tcp.v3, server.websocket.v3, server.amqp091.tls, and so on).

FieldDescription
addrListener bind address ("<host>:<port>" or ":<port>"). Empty string disables that listener.
max_connectionsConnection cap for that listener (>= 0). 0 means no explicit cap. Applies to TCP/AMQP/AMQP091 listeners.
read_timeoutRead timeout for TCP listeners (time.Duration).
write_timeoutWrite timeout for TCP listeners (time.Duration).
protocolMQTT parser mode. For TCP, use v3 on server.tcp.v3 and v5 on server.tcp.v5; for WebSocket listeners you can use auto, v3, or v5.
pathHTTP path for MQTT-over-WebSocket endpoint.
allowed_originsWebSocket origin allow-list. Empty list allows all origins; use explicit origins for production.

Server Runtime / Telemetry Fields

FieldDefaultDescription
health_enabledtrueEnables /health endpoint.
health_addr:8081Health endpoint bind address.
metrics_enabledfalseEnables OpenTelemetry exporters.
metrics_addrlocalhost:4317OTLP endpoint address (collector target).
otel_service_namefluxmqTelemetry service name.
otel_service_version1.0.0Telemetry service version tag.
otel_metrics_enabledtrueEnables OTel metrics export.
otel_traces_enabledfalseEnables OTel traces export.
otel_trace_sample_rate0.1Trace sampling ratio in [0.0, 1.0].
admin_api_addr:8082Admin API bind address. Set empty string to disable the listener.
shutdown_timeout30sGraceful server shutdown timeout.

TLS / DTLS Settings

TLS fields are shared across tls, mtls, dtls, and mdtls blocks via pkg/tls config:

  • cert_file, key_file
  • ca_file (client CA), server_ca_file
  • client_auth (none, request, require_any, verify_if_given, require)
  • min_version (tls1.0, tls1.1, tls1.2, tls1.3)
  • cipher_suites, prefer_server_cipher_suites
  • ocsp, crl (advanced verification)

Broker

broker:
  max_message_size: 1048576
  max_retained_messages: 10000
  retry_interval: "20s"
  max_retries: 0
  max_qos: 2
  async_fan_out: false    # true = send PUBCOMP immediately, fan-out in worker pool
  fan_out_workers: 0      # worker pool size; 0 = GOMAXPROCS
FieldDefaultDescription
max_message_size1048576Maximum PUBLISH payload size in bytes (>= 1024).
max_retained_messages10000Cap on retained messages in the store.
retry_interval20sQoS 1/2 retry interval for unacknowledged outbound messages (>= 1s).
max_retries0Maximum retries before dropping; 0 = unlimited.
max_qos2Maximum QoS accepted from publishers (0, 1, or 2).
async_fan_outfalseWhen true, sends PUBCOMP immediately after PUBREL and dispatches fan-out to a worker pool.
fan_out_workers0Async fan-out worker count; 0 = GOMAXPROCS.

Fan-out Modes

  • async_fan_out: false (default): publisher acknowledgment and subscriber fan-out stay coupled.
  • async_fan_out: true: publisher PUBCOMP is sent earlier; subscriber fan-out runs in background workers.
  • fan_out_workers: tune worker pool size for high fan-out workloads.

Session

session:
  max_sessions: 10000
  default_expiry_interval: 300
  max_offline_queue_size: 1000
  max_inflight_messages: 256
  max_send_queue_size: 0         # 0 = synchronous writes, >0 = async buffered sends
  disconnect_on_full: false      # when async queue is full: false=block, true=disconnect client
  offline_queue_policy: "evict" # evict or reject
  inflight_overflow: 0           # 0 = backpressure, 1 = pending queue
  pending_queue_size: 1000       # per-subscriber buffer depth when inflight_overflow=1
FieldDefaultDescription
max_sessions10000Maximum concurrent sessions (>= 1).
default_expiry_interval300Session expiry interval in seconds when client does not set one.
max_offline_queue_size1000Maximum QoS 1/2 messages buffered for a disconnected client (>= 10).
max_inflight_messages256Per-session inflight window size (unacknowledged outbound messages).
max_send_queue_size0Per-connection async send queue depth. 0 = synchronous writes.
disconnect_on_fullfalseAsync send queue full behavior: false = block/backpressure, true = disconnect client.
offline_queue_policyevictevict drops oldest when full; reject drops newest incoming message.
inflight_overflow0Inflight full behavior: 0 = backpressure; 1 = per-subscriber pending queue.
pending_queue_size1000Pending queue depth when inflight_overflow=1 (must be >= 1).

Inflight Overflow

  • inflight_overflow: 0 (backpressure): delivery waits for ACK window to free.
  • inflight_overflow: 1 (pending queue): overflow is buffered per subscriber and drained as ACKs arrive.

Queue Manager

queue_manager:
  auto_commit_interval: "5s"
FieldDefaultDescription
auto_commit_interval5sStream-group auto-commit cadence. 0 means commit on every delivery batch.

Queues

Queue configuration controls durable queues and stream queues.

queues:
  - name: "mqtt"
    topics: ["$queue/#"]
    reserved: true
    type: "classic"               # classic or stream
    primary_group: ""             # stream status reporting

    retention:
      max_age: "0s"               # 0 = unlimited
      max_length_bytes: 0          # 0 = unlimited
      max_length_messages: 0       # 0 = unlimited

    limits:
      max_message_size: 10485760
      max_depth: 100000
      message_ttl: "168h"

    retry:
      max_retries: 10
      initial_backoff: "5s"
      max_backoff: "5m"
      multiplier: 2.0

    dlq:
      enabled: true
      topic: ""                    # optional override

    replication:
      enabled: false
      group: ""
      replication_factor: 3
      mode: "sync"                 # sync or async
      min_in_sync_replicas: 2
      ack_timeout: "5s"
      heartbeat_timeout: "0s"      # 0 = inherit group/default
      election_timeout: "0s"       # 0 = inherit group/default
      snapshot_interval: "0s"      # 0 = inherit group/default
      snapshot_threshold: 0         # 0 = inherit group/default

Queue Fields

FieldDescription
nameUnique queue name.
topicsTopic filters routed into this queue (must be non-empty).
reservedMarks system-managed/builtin queue definitions.
typeQueue mode: classic or stream. Empty value falls back to default mode.
primary_groupFor stream queues: consumer group used for status reporting.

queues[].retention

FieldDescription
max_ageTime retention limit. 0s disables age-based retention.
max_length_bytesByte-size retention cap. 0 means unlimited.
max_length_messagesMessage-count retention cap. 0 means unlimited.

queues[].limits

FieldDescription
max_message_sizeQueue-level max payload size in bytes.
max_depthMax queued message count.
message_ttlPer-message TTL. Messages older than this are skipped at delivery time. 0 disables per-message expiry (default: 168h).

queues[].retry

FieldDescription
max_retriesMax delivery retries per message (>= 0).
initial_backoffInitial retry delay.
max_backoffMaximum retry delay.
multiplierExponential backoff multiplier (>= 1.0).

queues[].dlq

FieldDescription
enabledEnables dead-letter queue routing for exhausted messages.
topicOptional DLQ topic override; empty uses generated/default topic.

queues[].replication

FieldDescription
enabledEnables per-queue Raft replication.
groupRaft group ID for this queue. Empty means default.
replication_factorNumber of replicas (1..10 when enabled).
modesync or async.
min_in_sync_replicasMinimum replicas required to ACK (1..replication_factor).
ack_timeoutTimeout for sync replication acknowledgments (> 0).
heartbeat_timeoutOptional per-queue heartbeat override. 0 inherits cluster/group value.
election_timeoutOptional per-queue election timeout override. 0 inherits cluster/group value.
snapshot_intervalOptional per-queue snapshot interval override. 0 inherits cluster/group value.
snapshot_thresholdOptional per-queue snapshot threshold override. 0 inherits cluster/group value.

Storage

storage:
  type: "badger"      # memory or badger
  badger_dir: "/tmp/fluxmq/data"
  sync_writes: false
  recover_on_startup: false
FieldDefaultDescription
typebadgerStorage backend: memory or badger.
badger_dir/tmp/fluxmq/dataData directory for Badger backend (required when type=badger).
sync_writesfalseIf true, fsync-like durability on write path; if false, better throughput.
recover_on_startupfalseRun segment recovery before loading queues. Truncates corrupted segments at the last valid batch and rebuilds indexes.

Cluster

Clustering combines:

  • Embedded etcd (cluster.etcd): metadata coordination (session ownership, subscriptions, queue consumers, retained/will metadata).
  • gRPC transport (cluster.transport): cross-node routing (publishes, queue messages, session takeover, hybrid payload fetch), including delivery to local MQTT, AMQP 1.0, and AMQP 0.9.1 clients.
  • Optional Raft (cluster.raft): replicates durable queue operations.

For a “how it works” deep dive, see Clustering internals.

cluster:
  enabled: true
  node_id: "broker-1"

  etcd:
    data_dir: "/tmp/fluxmq/etcd"
    bind_addr: "0.0.0.0:2380"
    client_addr: "0.0.0.0:2379"
    initial_cluster: "broker-1=http://0.0.0.0:2380"
    bootstrap: true
    hybrid_retained_size_threshold: 1024

  transport:
    bind_addr: "0.0.0.0:7948"
    peers: {}
    route_batch_max_size: 256
    route_batch_max_delay: "5ms"
    route_batch_flush_workers: 4
    route_publish_timeout: "15s"
    tls_enabled: false
    tls_cert_file: ""
    tls_key_file: ""
    tls_ca_file: ""

  raft:
    enabled: false
    auto_provision_groups: true
    replication_factor: 3
    sync_mode: true
    min_in_sync_replicas: 2
    ack_timeout: "5s"
    write_policy: "forward"        # local, reject, forward
    distribution_mode: "replicate" # forward, replicate
    bind_addr: "127.0.0.1:7100"
    data_dir: "/tmp/fluxmq/raft"
    peers: {}
    heartbeat_timeout: "1s"
    election_timeout: "3s"
    snapshot_interval: "5m"
    snapshot_threshold: 8192

    groups:
      default:
        bind_addr: "127.0.0.1:7100"
        data_dir: "/tmp/fluxmq/raft"
        peers: {}
      hot:
        bind_addr: "127.0.0.1:7200"
        data_dir: "/tmp/fluxmq/raft/groups/hot"
        peers: {}

Cluster Root Fields

FieldDefaultDescription
enabledtrueEnables clustering features. Use false for standalone deployments.
node_idbroker-1Unique node identifier in the cluster.

cluster.etcd

FieldDescription
data_dirLocal etcd data directory.
bind_addretcd peer address (:2380) for member replication.
client_addretcd client address (:2379) used by broker components.
initial_clusterComma-separated cluster map (name=http://host:2380,...).
bootstraptrue when bootstrapping new cluster; false when joining existing cluster.
hybrid_retained_size_thresholdPayload size threshold for retained/will hybrid storage strategy.

cluster.transport

FieldDefaultDescription
bind_addr0.0.0.0:7948Inter-node gRPC transport bind address.
peers{}Map of node_id -> transport address.
route_batch_max_size256Flush batch after this many queued messages (>= 0).
route_batch_max_delay5msFlush partial batch after this delay (>= 0).
route_batch_flush_workers4Concurrent flush workers per remote node (>= 0).
route_publish_timeout15sMax time for cross-node publish operation (0 uses default).
tls_enabledfalseEnables mTLS/TLS for transport gRPC channel.
tls_cert_file""Required when tls_enabled=true.
tls_key_file""Required when tls_enabled=true.
tls_ca_file""Required when tls_enabled=true.

cluster.raft

FieldDefaultDescription
enabledfalseEnables queue Raft replication engine.
auto_provision_groupstrueAllows dynamic creation of queue-referenced groups not listed under groups.
replication_factor3Target replica count (1..10 when enabled).
sync_modetruetrue waits for apply/commit path; false returns earlier (async path).
min_in_sync_replicas2Minimum in-sync replicas required for sync behavior.
ack_timeout5sTimeout for sync commit/apply acknowledgments.
write_policyforwardFollower write behavior: local, reject, forward.
distribution_modereplicateCross-node delivery strategy: forward or replicate.
bind_addr127.0.0.1:7100Base Raft bind address for default group runtime.
data_dir/tmp/fluxmq/raftBase Raft data directory.
peers{}Map of node_id -> raft address.
heartbeat_timeout1sRaft heartbeat interval/tick.
election_timeout3sRaft election timeout.
snapshot_interval5mSnapshot interval.
snapshot_threshold8192Snapshot threshold in log entries.
groups{}Optional per-group overrides (default, hot, etc.).

cluster.raft.groups.<group_id>

FieldDescription
enabledOptional group enable switch. If omitted, inherits enabled behavior.
bind_addrGroup-specific Raft bind address (required for non-default groups).
data_dirGroup-specific data directory.
peersGroup-specific peer map (node_id -> raft address; required for non-default groups).
replication_factorOptional override for this group. 0 inherits base value.
sync_modeOptional per-group sync-mode override.
min_in_sync_replicasOptional per-group ISR override. 0 inherits base value.
ack_timeoutOptional per-group ack timeout override. 0 inherits base value.
heartbeat_timeoutOptional per-group heartbeat override. 0 inherits base value.
election_timeoutOptional per-group election timeout override. 0 inherits base value.
snapshot_intervalOptional per-group snapshot interval override. 0 inherits base value.
snapshot_thresholdOptional per-group snapshot threshold override. 0 inherits base value.

Transport Batching

The gRPC transport batches outbound messages per remote node before flushing them over the wire.

SettingDefaultDescription
route_batch_max_size256Maximum number of messages collected before flush.
route_batch_max_delay5msMaximum wait before flushing partial batch.
route_batch_flush_workers4Concurrent flush goroutines per remote node.
route_publish_timeout15sMaximum time for cross-cluster publish completion.

Raft Behavior (What The Knobs Mean)

Two fields control most queue behavior tradeoffs:

  • cluster.raft.write_policy: behavior on follower writes.
  • cluster.raft.distribution_mode: how deliveries are routed across nodes.

Other durability and timing fields:

  • sync_mode, ack_timeout
  • heartbeat_timeout, election_timeout
  • snapshot_interval, snapshot_threshold

Implementation notes:

  • FluxMQ supports multiple Raft replication groups; queues choose group via queues[].replication.group.
  • Group membership comes from peer configuration. replication_factor and min_in_sync_replicas are validated and used by policy logic, but do not replace Raft quorum mechanics.

Webhooks

webhook:
  enabled: false
  queue_size: 10000
  drop_policy: "oldest" # oldest or newest
  workers: 5
  include_payload: false
  shutdown_timeout: "30s"

  defaults:
    timeout: "5s"
    retry:
      max_attempts: 3
      initial_interval: "1s"
      max_interval: "30s"
      multiplier: 2.0
    circuit_breaker:
      failure_threshold: 5
      reset_timeout: "60s"

  endpoints:
    - name: "analytics"
      type: "http"
      url: "https://example.com/webhook"
      events: ["message.published"]
      topic_filters: ["sensors/#"]
      headers:
        Authorization: "Bearer token"
      timeout: "10s"

Only http endpoints are currently supported.

Webhook Fields

FieldDefaultDescription
enabledfalseEnables webhook event delivery.
queue_size10000In-memory webhook queue depth (>= 100 when enabled).
drop_policyoldestQueue full behavior: oldest or newest.
workers5Concurrent webhook workers (>= 1).
include_payloadfalseIncludes message payload in webhook body.
shutdown_timeout30sGraceful drain timeout during shutdown.
defaultsDefault delivery settings applied to endpoints.
endpoints[]List of webhook endpoint configs.

webhook.defaults

FieldDescription
timeoutDefault endpoint timeout.
retryRetry policy defaults.
circuit_breakerCircuit breaker defaults.

webhook.defaults.retry

FieldDescription
max_attemptsMax delivery attempts (>= 1).
initial_intervalInitial retry delay.
max_intervalMax retry delay.
multiplierExponential backoff multiplier (>= 1.0).

webhook.defaults.circuit_breaker

FieldDescription
failure_thresholdFailures before opening breaker (>= 1).
reset_timeoutTime before half-open probe/reset.

webhook.endpoints[]

FieldDescription
nameUnique endpoint identifier.
typeEndpoint type. Currently only http is supported.
urlTarget endpoint URL.
eventsEvent-type filter. Empty means all events.
topic_filtersTopic filter list for message events. Empty means all topics.
headersStatic headers attached to webhook requests.
timeoutOptional endpoint-specific timeout override.
retryOptional endpoint-specific retry override.

Rate Limiting

ratelimit:
  enabled: false

  connection:
    enabled: true
    rate: 1.6667           # connections per second per IP
    burst: 20
    cleanup_interval: "5m"

  message:
    enabled: true
    rate: 1000             # messages per second per client
    burst: 100

  subscribe:
    enabled: true
    rate: 100              # subscriptions per second per client
    burst: 10
FieldDescription
enabledGlobal rate-limit feature switch.
connection.enabledEnables per-IP connection rate limiting.
connection.rateAllowed connection attempts per second per IP.
connection.burstToken-bucket burst allowance for connection limiter.
connection.cleanup_intervalCleanup interval for stale connection limiter entries.
message.enabledEnables per-client publish/message limiter.
message.rateAllowed messages per second per client.
message.burstToken-bucket burst for message limiter.
subscribe.enabledEnables per-client subscribe limiter.
subscribe.rateAllowed subscribe operations per second per client.
subscribe.burstToken-bucket burst for subscribe limiter.

Auth

auth:
  url: "auth-service:7016"
  transport: "grpc"
  timeout: 5s
  protocols:
    mqtt: true
    http: true
    coap: true
    amqp: true
    amqp091: false
FieldDefaultDescription
url""Auth service address. Empty disables auth callout entirely.
transportgrpcWire format for callout: grpc or http.
timeout0Per-call timeout (e.g. 5s). Zero uses the transport default.
protocols{}Per-protocol auth toggle. Empty map = all protocols require auth. When set, only true entries get auth.

Valid protocols keys: mqtt, amqp, amqp091, http, coap.

See Security configuration for detailed examples.

Logging

log:
  level: "info"   # debug, info, warn, error
  format: "text"  # text or json
FieldDefaultDescription
levelinfoLog level: debug, info, warn, error.
formattextLog format: text or json.

On this page