Example
Runnable Go example demonstrating MQTT pub/sub, cross-protocol queue interop, and stream consumption
Messaging Example
A single Go program that demonstrates FluxMQ's messaging capabilities across three scenarios:
- MQTT pub/sub — standard topic-based publish and subscribe
- MQTT → AMQP 0.9.1 queue — publish via MQTT, consume via AMQP 0.9.1 on a durable queue
- AMQP 0.9.1 stream queue — declare a stream, publish events, and replay from the beginning
The example uses only third-party MQTT and AMQP client libraries — no FluxMQ-specific SDK is required.
Prerequisites
- Go 1.21+ installed
- FluxMQ running locally with default ports:
- MQTT:
1883 - AMQP 0.9.1:
5682
- MQTT:
Start FluxMQ with Docker or from source:
# Docker
docker compose -f docker/compose.yaml up -d
# From source
go run ./cmd/ --config examples/no-cluster.yamlSee Quick Start (Docker) for details.
Running the Example
go run ./examples/messaging/Custom addresses:
go run ./examples/messaging/ \
-mqtt localhost:1883 \
-amqp091 localhost:5682What It Does
Scenario 1: MQTT Pub/Sub
Standard MQTT messaging — a publisher sends a JSON payload to demo/sensors/temperature, and a subscriber on the same topic receives it. This is vanilla MQTT with no queues involved.
// Publisher
pub.Publish("demo/sensors/temperature", 1, false, `{"sensor":"temp-1","value":22.5}`)
// Subscriber
sub.Subscribe("demo/sensors/temperature", 1, func(_ mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received: %s\n", msg.Payload())
})Scenario 2: MQTT → AMQP 0.9.1 (Durable Queue)
Demonstrates cross-protocol interoperability. An MQTT client publishes orders to the $queue/demo-orders topic. An AMQP 0.9.1 consumer reads from the same durable queue — messages traverse the protocol boundary transparently.
The AMQP 0.9.1 consumer joins a consumer group and manually acknowledges each message:
deliveries, _ := ch.Consume(
"$queue/demo-orders/#",
"demo-consumer",
false, false, false, false,
amqp091.Table{
"x-consumer-group": "demo-workers",
},
)
for d := range deliveries {
fmt.Printf("Received: %s\n", d.Body)
d.Ack(false)
}Key points:
- The
$queue/prefix routes messages through the durable queue manager x-consumer-groupsets the consumer group for load balancing- Manual acks ensure at-least-once delivery
Scenario 3: AMQP 0.9.1 Stream Queue
Demonstrates stream consumption — a replayable, append-only log. The example declares a stream queue, publishes events, then consumes them from the beginning using x-stream-offset: "first".
// Declare stream queue
ch.QueueDeclare("demo-events", true, false, false, false, amqp091.Table{
"x-queue-type": "stream",
"x-max-age": "1h",
})
// Publish to stream
ch.PublishWithContext(ctx, "", "demo-events", false, false, amqp091.Publishing{
Body: []byte(`{"event":"user.action","seq":1}`),
})
// Consume from beginning (replay)
deliveries, _ := ch.Consume("demo-events", "stream-reader", false, false, false, false,
amqp091.Table{
"x-consumer-group": "demo-readers",
"x-stream-offset": "first",
},
)Key points:
- Stream queues use the queue name directly — no
$queue/prefix x-stream-offsetcontrols where consumption starts:"first","last","next", or a specific offset number- Each delivered message includes an
x-stream-offsetheader with its position in the log - Stream queues retain messages based on
x-max-age, unlike classic queues which remove messages after acknowledgment
Expected Output
=== Scenario 1: MQTT Pub/Sub ===
[MQTT sub] Subscribed to demo/sensors/temperature
[MQTT pub] Publishing to demo/sensors/temperature: {"sensor":"temp-1","value":22.5}
[MQTT sub] Received on demo/sensors/temperature: {"sensor":"temp-1","value":22.5}
[OK] MQTT pub/sub round-trip successful
=== Scenario 2: MQTT → AMQP 0.9.1 (Durable Queue) ===
[AMQP 0.9.1] Consuming from queue 'demo-orders' in group 'demo-workers'
[MQTT pub] Publishing to $queue/demo-orders: {"order_id":"order-1",...}
[AMQP 0.9.1] Received: {"order_id":"order-1",...}
...
[OK] All 5 messages published via MQTT, consumed via AMQP 0.9.1
=== Scenario 3: AMQP 0.9.1 Stream Queue ===
[AMQP 0.9.1] Declared stream queue 'demo-events'
[AMQP 0.9.1] Published to stream: {"event":"user.action","seq":1}
...
[AMQP 0.9.1] Consuming stream 'demo-events' from offset 'first'
[AMQP 0.9.1] Stream message (offset=1): {"event":"user.action","seq":1}
...
[OK] Published 5 events, replayed all 5 from stream
All scenarios completed.Libraries Used
| Library | Protocol | Purpose |
|---|---|---|
| eclipse/paho.mqtt.golang | MQTT 3.1.1 | Publish and subscribe |
| rabbitmq/amqp091-go | AMQP 0.9.1 | Queue consume, stream declare/publish/consume |
Next Steps
- Durable Queues — queue types, routing keys, retention, and acknowledgment semantics
- Consumer Groups — fan-out, load balancing, and group configuration
- Queue Client Example — a more advanced order-processing pipeline with multiple consumer groups across all three protocols