Subscribing to messages in Togo MQ allows you to receive messages published to specific topics. The SDK provides a simple channel-based API for receiving messages asynchronously.
{danger} Important: Topic is required for subscriptions. Use wildcards like
"orders.*"for pattern matching, or"*"to receive messages from all topics.
When you receive a message, it contains:
type Message struct {
Topic string // Message topic
UUID string // Unique message identifier
Body []byte // Message payload
Variables map[string]string // Custom key-value metadata
}
Here's a simple example of subscribing to a specific topic:
package main
import (
"context"
"log"
"github.com/TogoMQ/togomq-sdk-go"
)
func main() {
// Create client
config := togomq.NewConfig(togomq.WithToken("your-token"))
client, err := togomq.NewClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Subscribe to specific topic
opts := togomq.NewSubscribeOptions("orders")
msgChan, errChan, err := client.Sub(context.Background(), opts)
if err != nil {
log.Fatal(err)
}
// Receive messages
for {
select {
case msg, ok := <-msgChan:
if !ok {
log.Println("Subscription ended")
return
}
log.Printf("Received message from %s: %s\n", msg.Topic, string(msg.Body))
log.Printf("Message UUID: %s\n", msg.UUID)
// Access custom variables
if priority, ok := msg.Variables["priority"]; ok {
log.Printf("Priority: %s\n", priority)
}
case err := <-errChan:
log.Printf("Subscription error: %v\n", err)
return
}
}
}
You can customize subscription behavior with additional options:
Control how many messages to receive at once:
// Receive up to 10 messages at once
opts := togomq.NewSubscribeOptions("orders").
WithBatch(10)
msgChan, errChan, err := client.Sub(context.Background(), opts)
Default: 0 (uses server default of 1000)
Limit message delivery rate per second:
// Limit to 100 messages per second
opts := togomq.NewSubscribeOptions("orders").
WithSpeedPerSec(100)
msgChan, errChan, err := client.Sub(context.Background(), opts)
Default: 0 (unlimited)
Use both batch size and rate limiting:
// Receive batches of 50 messages, max 200/second
opts := togomq.NewSubscribeOptions("events").
WithBatch(50).
WithSpeedPerSec(200)
msgChan, errChan, err := client.Sub(context.Background(), opts)
if err != nil {
log.Fatal(err)
}
// Process messages
for {
select {
case msg := <-msgChan:
if msg == nil {
return
}
processMessage(msg)
case err := <-errChan:
log.Printf("Error: %v\n", err)
return
}
}
Use "*" to receive messages from all topics:
// Subscribe to all topics
opts := togomq.NewSubscribeOptions("*")
msgChan, errChan, err := client.Sub(ctx, opts)
// Process all messages
for msg := range msgChan {
log.Printf("Received from %s: %s\n", msg.Topic, string(msg.Body))
}
Use wildcard patterns to match multiple topics:
// Subscribe to all order-related topics (orders.new, orders.updated, etc.)
opts := togomq.NewSubscribeOptions("orders.*")
msgChan, errChan, err := client.Sub(ctx, opts)
// Subscribe to all user events
opts := togomq.NewSubscribeOptions("users.*")
msgChan, errChan, err := client.Sub(ctx, opts)
Pattern Examples:
"orders.*" - Matches orders.new, orders.updated, orders.cancelled"*.critical" - Matches alerts.critical, events.critical"*" - Matches all topicsUse contexts to control subscription lifecycle:
// Subscribe for maximum 30 seconds
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
opts := togomq.NewSubscribeOptions("events")
msgChan, errChan, err := client.Sub(ctx, opts)
if err != nil {
log.Fatal(err)
}
// Process messages until timeout
for {
select {
case msg := <-msgChan:
if msg == nil {
return
}
processMessage(msg)
case err := <-errChan:
log.Printf("Error: %v\n", err)
return
case <-ctx.Done():
log.Println("Subscription timeout")
return
}
}
// Create cancellable context
ctx, cancel := context.WithCancel(context.Background())
opts := togomq.NewSubscribeOptions("orders")
msgChan, errChan, err := client.Sub(ctx, opts)
if err != nil {
log.Fatal(err)
}
// Cancel subscription after processing 100 messages
count := 0
for {
select {
case msg := <-msgChan:
if msg == nil {
return
}
processMessage(msg)
count++
if count >= 100 {
cancel() // Stop subscription
return
}
case err := <-errChan:
log.Printf("Error: %v\n", err)
return
case <-ctx.Done():
log.Println("Subscription cancelled")
return
}
}
func processMessage(msg *togomq.Message) {
log.Printf("Processing message %s from topic %s\n", msg.UUID, msg.Topic)
// Parse message body
data := string(msg.Body)
// Access metadata
if userID, ok := msg.Variables["user_id"]; ok {
log.Printf("User ID: %s\n", userID)
}
// Your business logic here
// ...
}
For high-throughput scenarios, process messages concurrently:
func main() {
// ... create client and subscription ...
// Create worker pool
workers := 10
workerChan := make(chan *togomq.Message, 100)
// Start workers
for i := 0; i < workers; i++ {
go worker(workerChan)
}
// Distribute messages to workers
for {
select {
case msg := <-msgChan:
if msg == nil {
close(workerChan)
return
}
workerChan <- msg
case err := <-errChan:
log.Printf("Error: %v\n", err)
close(workerChan)
return
}
}
}
func worker(msgChan <-chan *togomq.Message) {
for msg := range msgChan {
processMessage(msg)
}
}
Never ignore the error channel:
// Good ✅
for {
select {
case msg := <-msgChan:
processMessage(msg)
case err := <-errChan:
log.Printf("Error: %v\n", err)
return
}
}
// Bad ❌ - Ignores errors
for msg := range msgChan {
processMessage(msg)
}
Check for channel closure:
case msg := <-msgChan:
if msg == nil {
log.Println("Subscription ended")
return
}
processMessage(msg)
Choose batch sizes based on your processing capacity:
// For real-time processing
opts := togomq.NewSubscribeOptions("events").
WithBatch(1)
// For batch processing
opts := togomq.NewSubscribeOptions("reports").
WithBatch(100)
Use contexts for clean shutdown:
// Listen for interrupt signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-sigChan
log.Println("Shutting down...")
cancel()
}()
// Subscription will stop when context is cancelled
msgChan, errChan, err := client.Sub(ctx, opts)
Protect downstream services with rate limiting:
// Limit to 50 messages/second to protect database
opts := togomq.NewSubscribeOptions("database-updates").
WithSpeedPerSec(50)
Always log message UUIDs for troubleshooting:
log.Printf("Processing message UUID: %s from topic: %s\n", msg.UUID, msg.Topic)
Here's a production-ready subscription example:
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/TogoMQ/togomq-sdk-go"
)
func main() {
// Create client
config := togomq.NewConfig(
togomq.WithToken(os.Getenv("TOGOMQ_TOKEN")),
togomq.WithLogLevel("info"),
)
client, err := togomq.NewClient(config)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
defer client.Close()
// Setup graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("Received shutdown signal")
cancel()
}()
// Subscribe with options
opts := togomq.NewSubscribeOptions("orders.*").
WithBatch(10).
WithSpeedPerSec(100)
msgChan, errChan, err := client.Sub(ctx, opts)
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
log.Println("Subscribed successfully, waiting for messages...")
// Process messages
for {
select {
case msg, ok := <-msgChan:
if !ok {
log.Println("Message channel closed")
return
}
if msg == nil {
log.Println("Subscription ended")
return
}
processMessage(msg)
case err := <-errChan:
log.Printf("Subscription error: %v\n", err)
return
case <-ctx.Done():
log.Println("Shutting down gracefully...")
time.Sleep(1 * time.Second) // Allow in-flight messages to complete
return
}
}
}
func processMessage(msg *togomq.Message) {
log.Printf("[%s] Topic: %s, Body: %s\n", msg.UUID, msg.Topic, string(msg.Body))
// Your business logic here
// ...
}
{success} Next: Learn about Error Handling to properly handle errors in your application.