Togo MQ provides flexible methods for publishing messages. You can publish individual messages, batches, or use streaming channels for high-throughput scenarios.
{danger} Important: The topic name is required for all published messages. Each message must specify a topic.
A message in Togo MQ consists of:
// Message is a single unit of data published to a Togo MQ topic.
type Message struct {
Topic string // Topic the message is published to (required)
Body []byte // Raw message payload
Variables map[string]string // Custom key-value metadata attached to the message
Postpone int64 // Delay, in seconds, before the message becomes available
Retention int64 // How long, in seconds, the message is kept
}
Creating a message:
import "github.com/TogoMQ/togomq-sdk-go"
// Simple message
msg := togomq.NewMessage("orders", []byte("order-data-here"))
// Message with options
msg := togomq.NewMessage("orders", []byte("order-data")).
WithVariables(map[string]string{
"priority": "high",
"customer": "12345",
}).
WithPostpone(60). // Delay 60 seconds
WithRetention(3600) // Keep for 1 hour
To publish a single message, use PubBatch with a slice containing one message:
package main
import (
"context"
"log"
"github.com/TogoMQ/togomq-sdk-go"
)
// main publishes a single message by sending a one-element batch.
func main() {
	// Build the client from a token-based configuration.
	client, err := togomq.NewClient(togomq.NewConfig(togomq.WithToken("your-token")))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// PubBatch takes a slice, so the single message is wrapped in one.
	batch := []*togomq.Message{
		togomq.NewMessage("user-events", []byte("User logged in")),
	}

	result, err := client.PubBatch(context.Background(), batch)
	if err != nil {
		log.Fatalf("Failed to publish: %v", err)
	}
	log.Printf("Published %d messages\n", result.MessagesReceived)
}
For better performance when publishing multiple messages, use batch publishing:
package main
import (
"context"
"fmt"
"log"
"github.com/TogoMQ/togomq-sdk-go"
)
// main runs the batch-publishing example and exits non-zero on failure.
//
// The work lives in run so that its deferred client.Close() executes before
// the process exits; calling log.Fatal directly after a defer would skip it.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run creates a client, publishes three messages in a single batch and logs
// how many messages the server reported as received. It returns a wrapped
// error if the client cannot be created or the batch cannot be published.
func run() error {
	// Create client
	config := togomq.NewConfig(togomq.WithToken("your-token"))
	client, err := togomq.NewClient(config)
	if err != nil {
		return fmt.Errorf("creating client: %w", err)
	}
	defer client.Close()

	// Create multiple messages
	messages := []*togomq.Message{
		togomq.NewMessage("orders", []byte("order-1")),
		togomq.NewMessage("orders", []byte("order-2")).
			WithVariables(map[string]string{
				"priority": "high",
				"customer": "12345",
			}),
		togomq.NewMessage("orders", []byte("order-3")).
			WithPostpone(60).    // Delay 60 seconds
			WithRetention(3600), // Keep for 1 hour
	}

	// Publish all messages at once
	resp, err := client.PubBatch(context.Background(), messages)
	if err != nil {
		return fmt.Errorf("publishing batch: %w", err)
	}
	log.Printf("Published %d messages successfully!\n", resp.MessagesReceived)
	return nil
}
For high-throughput scenarios or when messages are generated over time, use channel-based streaming:
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/TogoMQ/togomq-sdk-go"
)
// main streams 1000 messages to Togo MQ through a channel and waits for the
// background publisher to finish before exiting.
func main() {
	// Create client
	config := togomq.NewConfig(togomq.WithToken("your-token"))
	client, err := togomq.NewClient(config)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx := context.Background()

	// Create a channel for messages
	msgChan := make(chan *togomq.Message, 100)

	// done is closed when the publishing goroutine returns, so main can wait
	// for the real completion instead of sleeping for a guessed duration.
	done := make(chan struct{})

	// Start publishing in background
	go func() {
		defer close(done)
		resp, err := client.Pub(ctx, msgChan)
		if err != nil {
			log.Printf("Publish error: %v", err)
			return
		}
		log.Printf("Published %d messages\n", resp.MessagesReceived)
	}()

	// Send messages over time
send:
	for i := 0; i < 1000; i++ {
		msg := togomq.NewMessage("events", []byte(fmt.Sprintf("event-%d", i)))
		select {
		case msgChan <- msg:
		case <-done:
			// The publisher stopped early (for example on an error). Nobody
			// is draining msgChan any more, so stop producing rather than
			// blocking forever once the buffer fills up.
			break send
		}

		// Simulate work between messages
		time.Sleep(10 * time.Millisecond)
	}

	// Close channel to signal end of stream
	close(msgChan)

	// Wait for the publisher goroutine to report its result.
	// NOTE(review): assumes client.Pub returns once msgChan is closed — confirm
	// against the SDK documentation.
	<-done
}
Add custom key-value pairs to your messages:
msg := togomq.NewMessage("orders", []byte("order-data")).
WithVariables(map[string]string{
"order_id": "12345",
"customer_id": "98765",
"priority": "high",
"region": "us-east",
})
Subscribers can access these variables to filter or route messages.
Schedule a message to be delivered after a delay:
// Delay message by 60 seconds
msg := togomq.NewMessage("reminders", []byte("Send email")).
WithPostpone(60)
// Delay by 1 hour (3600 seconds)
msg := togomq.NewMessage("scheduled-tasks", []byte("Run backup")).
WithPostpone(3600)
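Use cases: delayed reminders (for example, sending an email later) and scheduled tasks (for example, running a backup).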
Control how long a message is kept:
// Keep message for 1 hour
msg := togomq.NewMessage("temporary-events", []byte("data")).
WithRetention(3600)
// Keep message for 24 hours
msg := togomq.NewMessage("daily-reports", []byte("report")).
WithRetention(86400)
Create one client per application and reuse it:
// Good ✅ - One client, created once and reused for every publish
client, err := togomq.NewClient(config)
if err != nil {
	// Without this check a failed NewClient would leave client unusable
	// for the Close and PubBatch calls below.
	log.Fatal(err)
}
defer client.Close()
for i := 0; i < 1000; i++ {
	if _, err := client.PubBatch(ctx, messages); err != nil {
		log.Printf("Failed to publish: %v", err)
	}
}

// Bad ❌ - Creates unnecessary connections
for i := 0; i < 1000; i++ {
	client, _ := togomq.NewClient(config)
	client.PubBatch(ctx, messages)
	client.Close()
}
For multiple messages, always use batching:
// Good ✅ - Single network call
messages := []*togomq.Message{msg1, msg2, msg3}
client.PubBatch(ctx, messages)
// Bad ❌ - Multiple network calls
client.PubBatch(ctx, []*togomq.Message{msg1})
client.PubBatch(ctx, []*togomq.Message{msg2})
client.PubBatch(ctx, []*togomq.Message{msg3})
Always check and handle errors:
resp, err := client.PubBatch(ctx, messages)
if err != nil {
	// Check error type for specific handling
	togomqErr, ok := err.(*togomq.TogoMQError)
	if !ok {
		// Not an SDK error value: still report it instead of returning
		// without any trace of the failure.
		log.Printf("Error: %v", err)
		return
	}
	switch togomqErr.Code {
	case togomq.ErrCodeAuth:
		log.Println("Authentication failed - check your token")
	case togomq.ErrCodeConnection:
		log.Println("Connection error - retry with backoff")
	default:
		log.Printf("Error: %v", togomqErr)
	}
	return
}
Implement timeouts to prevent hanging:
// Derive a context that is cancelled automatically once the timeout elapses.
const publishTimeout = 10 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), publishTimeout)
defer cancel()

resp, err := client.PubBatch(ctx, messages)
if err != nil {
	log.Printf("Publish failed or timed out: %v", err)
}
Messages can be up to 50 MB, but smaller is better for performance:
// Good ✅ - Reasonable message size
msg := togomq.NewMessage("events", []byte("small payload"))
// Be careful ⚠️ - Large messages impact performance
largePayload := make([]byte, 10*1024*1024) // 10 MB
msg := togomq.NewMessage("files", largePayload)
{success} Next: Learn about Subscribing to Messages to receive and process messages.