Publishing Messages


Overview

Togo MQ provides flexible methods for publishing messages. You can publish individual messages, batches, or use streaming channels for high-throughput scenarios.

{danger} Important: The topic name is required for all published messages. Each message must specify a topic.

Message Structure

A message in Togo MQ consists of:

type Message struct {
    Topic     string            // Message topic (required)
    Body      []byte            // Message payload
    Variables map[string]string // Custom key-value metadata
    Postpone  int64             // Delay in seconds before message is available
    Retention int64             // How long to keep message (seconds)
}

Creating a message:

import "github.com/TogoMQ/togomq-sdk-go"

// Simple message
msg := togomq.NewMessage("orders", []byte("order-data-here"))

// Message with options
msg := togomq.NewMessage("orders", []byte("order-data")).
    WithVariables(map[string]string{
        "priority": "high",
        "customer": "12345",
    }).
    WithPostpone(60).      // Delay 60 seconds
    WithRetention(3600)    // Keep for 1 hour

Publishing a Single Message

To publish a single message, use PubBatch with a slice containing one message:

package main

import (
    "context"
    "log"

    "github.com/TogoMQ/togomq-sdk-go"
)

func main() {
    // Create client
    config := togomq.NewConfig(togomq.WithToken("your-token"))
    client, err := togomq.NewClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create a message
    msg := togomq.NewMessage("user-events", []byte("User logged in"))

    // Publish
    resp, err := client.PubBatch(context.Background(), []*togomq.Message{msg})
    if err != nil {
        log.Fatalf("Failed to publish: %v", err)
    }

    log.Printf("Published %d messages\n", resp.MessagesReceived)
}

Publishing a Batch of Messages

For better performance when publishing multiple messages, use batch publishing:

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/TogoMQ/togomq-sdk-go"
)

func main() {
    // Create client
    config := togomq.NewConfig(togomq.WithToken("your-token"))
    client, err := togomq.NewClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create multiple messages
    messages := []*togomq.Message{
        togomq.NewMessage("orders", []byte("order-1")),
        togomq.NewMessage("orders", []byte("order-2")).
            WithVariables(map[string]string{
                "priority": "high",
                "customer": "12345",
            }),
        togomq.NewMessage("orders", []byte("order-3")).
            WithPostpone(60).      // Delay 60 seconds
            WithRetention(3600),   // Keep for 1 hour
    }

    // Publish all messages at once
    resp, err := client.PubBatch(context.Background(), messages)
    if err != nil {
        log.Fatalf("Failed to publish: %v", err)
    }

    log.Printf("Published %d messages successfully!\n", resp.MessagesReceived)
}

Publishing via Channel (Streaming)

For high-throughput scenarios or when messages are generated over time, use channel-based streaming:

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/TogoMQ/togomq-sdk-go"
)

func main() {
    // Create client
    config := togomq.NewConfig(togomq.WithToken("your-token"))
    client, err := togomq.NewClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    ctx := context.Background()

    // Create a channel for messages
    msgChan := make(chan *togomq.Message, 100)

    // Start publishing in background
    go func() {
        resp, err := client.Pub(ctx, msgChan)
        if err != nil {
            log.Printf("Publish error: %v", err)
            return
        }
        log.Printf("Published %d messages\n", resp.MessagesReceived)
    }()

    // Send messages over time
    for i := 0; i < 1000; i++ {
        msg := togomq.NewMessage("events", []byte(fmt.Sprintf("event-%d", i)))
        msgChan <- msg

        // Simulate work between messages
        time.Sleep(10 * time.Millisecond)
    }

    // Close channel to signal end of stream
    close(msgChan)

    // Wait for completion
    time.Sleep(2 * time.Second)
}

Message Options

Custom Metadata (Variables)

Add custom key-value pairs to your messages:

msg := togomq.NewMessage("orders", []byte("order-data")).
    WithVariables(map[string]string{
        "order_id":    "12345",
        "customer_id": "98765",
        "priority":    "high",
        "region":      "us-east",
    })

Subscribers can access these variables to filter or route messages.

Delayed Messages (Postpone)

Schedule a message to be delivered after a delay:

// Delay message by 60 seconds
msg := togomq.NewMessage("reminders", []byte("Send email")).
    WithPostpone(60)

// Delay by 1 hour (3600 seconds)
msg := togomq.NewMessage("scheduled-tasks", []byte("Run backup")).
    WithPostpone(3600)

Use cases:

  • Retry logic with exponential backoff
  • Scheduled tasks
  • Delayed notifications
  • Rate limiting

Message Retention

Control how long a message is kept:

// Keep message for 1 hour
msg := togomq.NewMessage("temporary-events", []byte("data")).
    WithRetention(3600)

// Keep message for 24 hours
msg := togomq.NewMessage("daily-reports", []byte("report")).
    WithRetention(86400)

Best Practices

1. Reuse Clients

Create one client per application and reuse it:

// Good ✅
client, _ := togomq.NewClient(config)
defer client.Close()

for i := 0; i < 1000; i++ {
    client.PubBatch(ctx, messages)
}

// Bad ❌ - Creates unnecessary connections
for i := 0; i < 1000; i++ {
    client, _ := togomq.NewClient(config)
    client.PubBatch(ctx, messages)
    client.Close()
}

2. Use Batch Publishing

For multiple messages, always use batching:

// Good ✅ - Single network call
messages := []*togomq.Message{msg1, msg2, msg3}
client.PubBatch(ctx, messages)

// Bad ❌ - Multiple network calls
client.PubBatch(ctx, []*togomq.Message{msg1})
client.PubBatch(ctx, []*togomq.Message{msg2})
client.PubBatch(ctx, []*togomq.Message{msg3})

3. Handle Errors Properly

Always check and handle errors:

resp, err := client.PubBatch(ctx, messages)
if err != nil {
    // Check error type for specific handling
    if togomqErr, ok := err.(*togomq.TogoMQError); ok {
        switch togomqErr.Code {
        case togomq.ErrCodeAuth:
            log.Println("Authentication failed - check your token")
        case togomq.ErrCodeConnection:
            log.Println("Connection error - retry with backoff")
        default:
            log.Printf("Error: %v", togomqErr)
        }
    }
    return
}

4. Use Context for Timeouts

Implement timeouts to prevent hanging:

// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

resp, err := client.PubBatch(ctx, messages)
if err != nil {
    log.Printf("Publish failed or timed out: %v", err)
}

5. Set Appropriate Message Sizes

Messages can be up to 50 MB, but smaller is better for performance:

// Good ✅ - Reasonable message size
msg := togomq.NewMessage("events", []byte("small payload"))

// Be careful ⚠️ - Large messages impact performance
largePayload := make([]byte, 10*1024*1024) // 10 MB
msg := togomq.NewMessage("files", largePayload)

{success} Next: Learn about Subscribing to Messages to receive and process messages.