Subscribing to Messages


Overview

Subscribing to messages in Togo MQ allows you to receive messages published to specific topics. The SDK provides a simple channel-based API for receiving messages asynchronously.

{danger} Important: A topic is required for every subscription. Use a wildcard pattern such as "orders.*" to match a group of topics, or "*" to receive messages from all topics.

Received Message Structure

When you receive a message, it contains:

// Message is a single message delivered to a subscriber.
type Message struct {
    Topic     string            // Topic the message was received from
    UUID      string            // Unique message identifier
    Body      []byte            // Message payload
    Variables map[string]string // Custom key-value metadata
}

Basic Subscription

Here's a simple example of subscribing to a specific topic:

package main

import (
    "context"
    "log"

    "github.com/TogoMQ/togomq-sdk-go"
)

// main creates a client, subscribes to the "orders" topic, and logs every
// received message until the message channel is closed or the subscription
// reports an error.
func main() {
    // Create client
    config := togomq.NewConfig(togomq.WithToken("your-token"))
    client, err := togomq.NewClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Subscribe to specific topic
    opts := togomq.NewSubscribeOptions("orders")
    msgChan, errChan, err := client.Sub(context.Background(), opts)
    if err != nil {
        // log.Fatal exits the process without running deferred calls,
        // so close the client explicitly before exiting.
        client.Close()
        log.Fatal(err)
    }

    // Receive messages
    for {
        select {
        case msg, ok := <-msgChan:
            if !ok {
                // The message channel was closed: nothing more to receive
                log.Println("Subscription ended")
                return
            }

            log.Printf("Received message from %s: %s\n", msg.Topic, string(msg.Body))
            log.Printf("Message UUID: %s\n", msg.UUID)

            // Access custom variables (the key may be absent)
            if priority, ok := msg.Variables["priority"]; ok {
                log.Printf("Priority: %s\n", priority)
            }

        case err := <-errChan:
            log.Printf("Subscription error: %v\n", err)
            return
        }
    }
}

Advanced Subscription Options

You can customize subscription behavior with additional options:

Batch Size

Control how many messages to receive at once:

// Receive up to 10 messages at once
opts := togomq.NewSubscribeOptions("orders").
    WithBatch(10)

msgChan, errChan, err := client.Sub(context.Background(), opts)

Default: 0 (uses server default of 1000)

Rate Limiting

Limit message delivery rate per second:

// Limit to 100 messages per second
opts := togomq.NewSubscribeOptions("orders").
    WithSpeedPerSec(100)

msgChan, errChan, err := client.Sub(context.Background(), opts)

Default: 0 (unlimited)

Combined Options

Use both batch size and rate limiting:

// Receive batches of 50 messages, max 200/second
opts := togomq.NewSubscribeOptions("events").
    WithBatch(50).
    WithSpeedPerSec(200)

msgChan, errChan, err := client.Sub(context.Background(), opts)
if err != nil {
    log.Fatal(err)
}

// Process messages
for {
    select {
    case msg := <-msgChan:
        if msg == nil {
            return
        }
        processMessage(msg)

    case err := <-errChan:
        log.Printf("Error: %v\n", err)
        return
    }
}

Wildcard Subscriptions

Subscribe to All Topics

Use "*" to receive messages from all topics:

// Subscribe to all topics
opts := togomq.NewSubscribeOptions("*")
msgChan, errChan, err := client.Sub(ctx, opts)
if err != nil {
    log.Fatal(err)
}

// Process all messages while also watching the error channel
for {
    select {
    case msg := <-msgChan:
        if msg == nil {
            // A nil message means the message channel was closed
            return
        }
        log.Printf("Received from %s: %s\n", msg.Topic, string(msg.Body))

    case err := <-errChan:
        log.Printf("Error: %v\n", err)
        return
    }
}

Subscribe to Topic Patterns

Use wildcard patterns to match multiple topics:

// Subscribe to all order-related topics (orders.new, orders.updated, etc.)
opts := togomq.NewSubscribeOptions("orders.*")
msgChan, errChan, err := client.Sub(ctx, opts)

// Subscribe to all user events
opts := togomq.NewSubscribeOptions("users.*")
msgChan, errChan, err := client.Sub(ctx, opts)

Pattern Examples:

  • "orders.*" - Matches orders.new, orders.updated, orders.cancelled
  • "*.critical" - Matches alerts.critical, events.critical
  • "*" - Matches all topics

Context Cancellation

Use contexts to control the subscription lifecycle:

Timeout-Based Subscription

// Subscribe for maximum 30 seconds
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

opts := togomq.NewSubscribeOptions("events")
msgChan, errChan, err := client.Sub(ctx, opts)
if err != nil {
    log.Fatal(err)
}

// Process messages until timeout
for {
    select {
    case msg := <-msgChan:
        if msg == nil {
            return
        }
        processMessage(msg)

    case err := <-errChan:
        log.Printf("Error: %v\n", err)
        return

    case <-ctx.Done():
        log.Println("Subscription timeout")
        return
    }
}

Manual Cancellation

// Create cancellable context
ctx, cancel := context.WithCancel(context.Background())
// Release the context on every return path, not only after 100 messages;
// calling cancel more than once is safe.
defer cancel()

opts := togomq.NewSubscribeOptions("orders")
msgChan, errChan, err := client.Sub(ctx, opts)
if err != nil {
    log.Fatal(err)
}

// Cancel subscription after processing 100 messages
count := 0
for {
    select {
    case msg := <-msgChan:
        if msg == nil {
            // A nil message means the message channel was closed
            return
        }
        processMessage(msg)
        count++

        if count >= 100 {
            cancel() // Stop subscription
            return
        }

    case err := <-errChan:
        log.Printf("Error: %v\n", err)
        return

    case <-ctx.Done():
        log.Println("Subscription cancelled")
        return
    }
}

Processing Messages

Basic Message Processing

// processMessage logs the UUID, topic, and payload of a received message,
// plus the "user_id" variable when the message carries one.
// msg must be non-nil; callers check for nil before calling.
func processMessage(msg *togomq.Message) {
    log.Printf("Processing message %s from topic %s\n", msg.UUID, msg.Topic)

    // Parse message body
    data := string(msg.Body)
    log.Printf("Payload: %s\n", data)

    // Access metadata (the key may be absent)
    if userID, ok := msg.Variables["user_id"]; ok {
        log.Printf("User ID: %s\n", userID)
    }

    // Your business logic here
    // ...
}

Concurrent Processing

For high-throughput scenarios, process messages concurrently:

// main fans received messages out to a fixed pool of workers and, on
// shutdown, waits for the workers to finish the messages already queued.
func main() {
    // ... create client and subscription ...

    // Create worker pool
    workers := 10
    workerChan := make(chan *togomq.Message, 100)

    // Start workers; each one signals on done once workerChan has been
    // closed and it has processed everything it received.
    done := make(chan struct{})
    for i := 0; i < workers; i++ {
        go func() {
            worker(workerChan)
            done <- struct{}{}
        }()
    }

    // On return, stop accepting work and wait for every worker to finish.
    // Without this, main would exit while messages are still buffered in
    // workerChan and they would be dropped.
    defer func() {
        close(workerChan)
        for i := 0; i < workers; i++ {
            <-done
        }
    }()

    // Distribute messages to workers
    for {
        select {
        case msg := <-msgChan:
            if msg == nil {
                // A nil message means the message channel was closed
                return
            }
            workerChan <- msg

        case err := <-errChan:
            log.Printf("Error: %v\n", err)
            return
        }
    }
}

// worker processes messages from msgChan one at a time and returns once
// the channel has been closed and drained.
func worker(msgChan <-chan *togomq.Message) {
    for {
        next, open := <-msgChan
        if !open {
            return
        }
        processMessage(next)
    }
}

Best Practices

1. Always Monitor Error Channel

Never ignore the error channel; read it in the same select loop as the message channel:

// Good ✅
for {
    select {
    case msg := <-msgChan:
        if msg == nil {
            // Channel closed: stop instead of processing a nil message
            return
        }
        processMessage(msg)
    case err := <-errChan:
        log.Printf("Error: %v\n", err)
        return
    }
}

// Bad ❌ - Ignores errors
for msg := range msgChan {
    processMessage(msg)
}

2. Handle Nil Messages

A nil message means the message channel has been closed, so check for it before processing:

case msg := <-msgChan:
    if msg == nil {
        log.Println("Subscription ended")
        return
    }
    processMessage(msg)

3. Use Appropriate Batch Sizes

Choose batch sizes based on your processing capacity:

// For real-time processing
opts := togomq.NewSubscribeOptions("events").
    WithBatch(1)

// For batch processing
opts := togomq.NewSubscribeOptions("reports").
    WithBatch(100)

4. Implement Graceful Shutdown

Use contexts for a clean shutdown:

// Listen for interrupt signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
    <-sigChan
    log.Println("Shutting down...")
    cancel()
}()

// Subscription will stop when context is cancelled
msgChan, errChan, err := client.Sub(ctx, opts)

5. Use Rate Limiting for Resource Protection

Protect downstream services with rate limiting:

// Limit to 50 messages/second to protect database
opts := togomq.NewSubscribeOptions("database-updates").
    WithSpeedPerSec(50)

6. Log Message UUIDs

Always log message UUIDs for troubleshooting:

log.Printf("Processing message UUID: %s from topic: %s\n", msg.UUID, msg.Topic)

Complete Example

Here's a production-ready subscription example:

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/TogoMQ/togomq-sdk-go"
)

// main creates a client from the TOGOMQ_TOKEN environment variable,
// subscribes to "orders.*" with a batch size and rate limit, and processes
// messages until the subscription ends, reports an error, or the process
// receives an interrupt or SIGTERM signal.
func main() {
    // Create client
    config := togomq.NewConfig(
        togomq.WithToken(os.Getenv("TOGOMQ_TOKEN")),
        togomq.WithLogLevel("info"),
    )

    client, err := togomq.NewClient(config)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    defer client.Close()

    // Setup graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

    // Cancel the subscription context when a shutdown signal arrives
    go func() {
        <-sigChan
        log.Println("Received shutdown signal")
        cancel()
    }()

    // Subscribe with options
    opts := togomq.NewSubscribeOptions("orders.*").
        WithBatch(10).
        WithSpeedPerSec(100)

    msgChan, errChan, err := client.Sub(ctx, opts)
    if err != nil {
        // log.Fatalf exits the process without running deferred calls,
        // so release the context and close the client explicitly first.
        cancel()
        client.Close()
        log.Fatalf("Failed to subscribe: %v", err)
    }

    log.Println("Subscribed successfully, waiting for messages...")

    // Process messages
    for {
        select {
        case msg, ok := <-msgChan:
            if !ok {
                log.Println("Message channel closed")
                return
            }
            if msg == nil {
                log.Println("Subscription ended")
                return
            }

            processMessage(msg)

        case err := <-errChan:
            log.Printf("Subscription error: %v\n", err)
            return

        case <-ctx.Done():
            log.Println("Shutting down gracefully...")
            // Brief grace period before exit. processMessage runs
            // synchronously in this loop, so no handler is still running here.
            time.Sleep(1 * time.Second)
            return
        }
    }
}

// processMessage logs the UUID, topic, and payload of a received message.
// Replace the placeholder below with real handling.
func processMessage(msg *togomq.Message) {
    payload := string(msg.Body)
    log.Printf("[%s] Topic: %s, Body: %s\n", msg.UUID, msg.Topic, payload)

    // Your business logic here
    // ...
}

{success} Next: Learn about Error Handling to properly handle errors in your application.