Consuming Messages
Subscribe with MQTT and receive messages from topics or queues
Last Updated: 2026-02-05
MQTT Subscribe
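# Subscribe to every topic under sensors/ (-v prints the topic with each payload)
mosquitto_sub -p 1883 -t "sensors/#" -v
Use QoS 1 (at least once) or QoS 2 (exactly once) when you need delivery guarantees: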
mosquitto_sub -p 1883 -t "sensors/#" -q 1 -v
Queue Consumption
Queue consumers subscribe to a filter of the form $queue/<queue>/... and set a consumer group. How the group is set depends on the protocol.
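To make the filter layout concrete, here is a minimal Go sketch of how a $queue/<queue>/... filter could be built and checked against a message's full path. It assumes standard MQTT wildcard semantics (+ matches exactly one level, # matches all remaining levels and must come last). The helper names queueFilter and matches are hypothetical, and the broker's own matching may differ in details.

```go
package main

import (
	"fmt"
	"strings"
)

// queueFilter builds a subscription filter of the form $queue/<queue>/<pattern>.
// The helper is hypothetical; only the filter layout comes from the text above.
func queueFilter(queue, pattern string) string {
	return "$queue/" + queue + "/" + pattern
}

// matches reports whether topic satisfies filter under standard MQTT
// wildcard rules (an assumption about the broker): "+" matches exactly
// one level, "#" matches the rest, including zero remaining levels.
func matches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			// "#" is only valid as the last level of a filter.
			return i == len(f)-1
		}
		if i >= len(t) {
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	filter := queueFilter("orders", "+/images/#")
	for _, topic := range []string{
		"$queue/orders/eu/images/resize",
		"$queue/orders/us/images/png",
		"$queue/orders/images/resize",
	} {
		fmt.Printf("%-32s %v\n", topic, matches(filter, topic))
	}
}
```

Under these assumptions, the filter $queue/orders/+/images/# accepts eu/images/resize and us/images/png but not images/resize, because + must consume exactly one level.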
Basic Queue Subscription
# Subscribe to all messages in the "orders" queue
mosquitto_sub -p 1883 -t '$queue/orders/#' -q 1 -v
Filtered Queue Subscription
Use wildcards to filter messages within a queue:
# Only receive messages with routing key starting with "images/"
mosquitto_sub -p 1883 -t '$queue/orders/images/#' -q 1 -v
# Only receive messages matching +/images/# (e.g., eu/images/resize, us/images/png)
mosquitto_sub -p 1883 -t '$queue/orders/+/images/#' -q 1 -v
With Consumer Group (MQTT v5)
# Join consumer group "workers" for load balancing
mosquitto_sub -p 1883 -t '$queue/orders/#' -q 1 -v \
  -D subscribe user-property consumer-group workers
AMQP 0.9.1 Queue Consumption
Basic Queue Subscription (Go)
// Subscribe to the "orders" queue as a member of consumer group "workers"
deliveries, err := ch.Consume(
	"$queue/orders/#", // queue filter
	"",                // consumer tag (auto-generated)
	false,             // auto-ack off: acknowledge manually after processing
	false,             // exclusive
	false,             // no-local
	false,             // no-wait
	amqp091.Table{
		"x-consumer-group": "workers",
	},
)
if err != nil {
	log.Fatalf("consume failed: %v", err)
}
for d := range deliveries {
	// Process message
	fmt.Printf("Received: %s\n", d.Body)
	// Acknowledge
	d.Ack(false)
}
Filtered Queue Subscription
// Only receive messages matching +/images/#
deliveries, err := ch.Consume(
	"$queue/orders/+/images/#",
	"", false, false, false, false,
	amqp091.Table{"x-consumer-group": "image-processors"},
)
if err != nil {
	log.Fatalf("consume failed: %v", err)
}
Stream Queue Consumption
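// Declare stream queue first (durable=true; autoDelete, exclusive, noWait=false)
_, err := ch.QueueDeclare("events", true, false, false, false, amqp091.Table{
	"x-queue-type": "stream",
	"x-max-age":    "24h",
})
if err != nil {
	log.Fatalf("queue declare failed: %v", err)
}
// Consume from the beginning of the stream
deliveries, err := ch.Consume(
	"events", "", false, false, false, false,
	amqp091.Table{
		"x-consumer-group": "replay-consumer",
		"x-stream-offset":  "first",
	},
)
if err != nil {
	log.Fatalf("consume failed: %v", err)
}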
for d := range deliveries {
	// Access stream metadata from headers
	offset := d.Headers["x-stream-offset"]
	timestamp := d.Headers["x-stream-timestamp"]
	fmt.Printf("Offset %v at %v: %s\n", offset, timestamp, d.Body)
	d.Ack(false)
}
Acknowledgments
// Acknowledge successful processing
d.Ack(false)
// Negative acknowledgment - retry the message
d.Nack(false, true) // multiple=false, requeue=true
// Reject - send to DLQ (no retry)
d.Reject(false) // requeue=false
See: