
Pub/Sub & Queues Explained with Python, Go, and Redis

Tags: Redis, Python, Go, Docker, Publish/Subscribe, Message Queue, Asynchronous Messaging, Message Broker

In modern software, and especially with the rise of microservices, getting different parts of your application to talk to each other reliably has become a critical challenge. You could chain them together with direct API calls, but that creates a tightly coupled system that's brittle and hard to scale. If one service goes down, the whole chain breaks.

A much better approach is to use a message broker: a central hub that lets your services communicate asynchronously, decoupling them from one another. Today we'll build a complete, containerized messaging system from scratch to explore two fundamental patterns: publish/subscribe and message queues.

We'll use a powerful, popular stack:

  • Redis: our lightweight, high-performance message broker
  • Python: to write a service that publishes (sends) messages
  • Go: to write a service that subscribes to (receives) messages
  • Docker: to package and run our entire system with a single command

By the end of this post, you'll not only understand the theory behind these patterns but also have a working, polyglot (that is, multi-language) system you can run and experiment with on your own machine.

You can find the project repository here.

Why Do We Need Messaging Patterns?

Imagine you have an e-commerce application. When a user places an order, several things need to happen:

  • The payment needs to be processed.
  • An invoice needs to be generated.
  • The shipping department needs to be notified.
  • An order confirmation email needs to be sent.

If your order service calls each of these other services directly, it has to wait for all of them to finish. If the email service is slow, the user is left waiting. If the invoicing service is down, the entire order might fail. By using a message broker, the order service simply publishes an "OrderPlaced" event. Other services can then listen for this event and do their work independently, without the order service even knowing they exist. This is the power of decoupling.
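To make that concrete, here is a minimal sketch (not part of the project code) of how an order service could announce such an event with the redis-py client; the channel name and payload fields are purely illustrative:

# Hypothetical sketch: an order service publishing an "OrderPlaced" event
import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def place_order(order_id: str, total: float) -> None:
    # The order service only announces the event; it neither knows nor cares
    # which services (invoicing, shipping, email) are listening.
    event = {"type": "OrderPlaced", "order_id": order_id, "total": total}
    r.publish("orders", json.dumps(event))

place_order("order-123", 49.99)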

The Two Contenders: Pub/Sub vs. Queues

In any distributed system, deciding how your services talk to each other can make or break your application’s reliability and scalability. In this project, I’ve embraced two complementary messaging patterns: Publish/Subscribe and Message Queues, each with its own strengths and trade-offs.

Publish/Subscribe Pattern Example. Source: https://www.geeksforgeeks.org/system-design/redis-publish-subscribe/

With the Publish/Subscribe pattern (Pub/Sub), you have what I like to call the “Town Crier” model. Your service simply shouts out an event to a named channel and doesn’t care who’s listening. Behind the scenes, Redis handles this when you call PUBLISH on a channel and other services call SUBSCRIBE to that same channel. Because every subscriber gets a copy of the message, it’s ideal for live features like chat messages, notifications, or real-time dashboards. The catch? It’s truly fire-and-forget. If a subscriber isn’t tuned in at the moment the message goes out, that information is lost forever, just like missing your favorite song on the radio if you weren’t listening when it played.
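If you want to see those mechanics in isolation before we get to the project code, here is a bare-bones subscriber sketch using redis-py (illustrative only, though it uses the same messages channel the project publishes to):

# Minimal Pub/Sub subscriber sketch with redis-py (illustrative only)
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

pubsub = r.pubsub()
pubsub.subscribe("messages")

for item in pubsub.listen():
    # The first item is a subscription confirmation; real payloads have type "message".
    if item["type"] == "message":
        print(f"Received: {item['data']}")

Anything published to messages while this loop isn't running is simply gone, which is exactly the fire-and-forget behavior described above.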

Message Queue Pattern Example. Source: https://medium.com/@anvannguyen/redis-message-queue-rpoplpush-vs-pub-sub-e8a19a3c071b/

On the flip side is the Message Queue, which feels more like the post office. Here, your publisher drops each message at the end of a list with RPUSH, and workers pick them up one by one from the front with BLPOP. Once a worker grabs a message, it’s removed, so no two workers end up doing the same job. This approach is durable: if there are no active workers, messages simply accumulate until someone comes along to process them, and it naturally balances work across multiple consumers. It’s the go-to choice for tasks that must not be lost: think sending emails, processing images, or handling orders in the background, where you need the guarantee that each message is picked up by only one worker.
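The same idea in miniature with redis-py: the producer RPUSHes onto the tail of a list, and a worker BLPOPs from the head (again a sketch, not the project code):

# Minimal queue sketch with redis-py: producer RPUSHes, worker BLPOPs
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Producer side: append a job to the end of the list.
r.rpush("message_queue", "resize image #42")

# Worker side: block for up to 5 seconds waiting for the next job.
item = r.blpop("message_queue", timeout=5)
if item is not None:
    queue_name, payload = item
    print(f"Processing job from {queue_name}: {payload}")

Run the worker half with no producer and blpop simply waits; run two workers and each job is delivered to only one of them.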

By combining these two patterns, we get the best of both worlds: Pub/Sub for instant broadcasts and queues for reliable, distributed task processing. Depending on whether you need real-time fan-out or durable work distribution, you can choose the right tool for the job without ever having to couple your services directly to each other.

A Guided Tour of the Project

Let's dive into the code. The entire system is orchestrated by a single docker-compose.yml file, which acts as the conductor for our three services: Redis, the Python publisher, and the Go subscriber.

# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  python-publisher:
    build: ./python-publisher
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - PATTERN=${PATTERN:-both}
    restart: unless-stopped

  go-subscriber:
    build: ./go-subscriber
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - PATTERN=${PATTERN:-both}
    restart: unless-stopped

volumes:
  redis_data:

This simple file defines our three containers and ensures the publisher and subscriber only start after the Redis container is up (the healthcheck keeps tabs on whether Redis is actually answering PING). Notice the PATTERN environment variable, which lets us easily switch between messaging models.

Step 1: The Announcer - The Python Publisher

The python-publisher service is responsible for creating and sending messages every two seconds. Its logic is neatly encapsulated in the MessagePublisher class in main.py. First, it creates a structured JSON message. Using a consistent format is crucial for interoperability.

# python-publisher/main.py
def create_message(self) -> dict:
    """
    Generates a unique message payload.
    """
    self.message_count += 1
    return {
        "id": str(uuid.uuid4()),
        "timestamp": self._get_timestamp(),
        "message": f"Hello from Python #{self.message_count}",
        "sender": "python-publisher",
        "count": self.message_count
    }

The magic happens in the publish_message method. Based on the PATTERN environment variable, it sends the same message using one or both methods.

# python-publisher/main.py
def publish_message(self, message: dict, method: str = None) -> bool:
    """
    Publishes a message to Redis using the selected method.
    """
    try:
        if method is None:
            method = self.pattern

        message_json = json.dumps(message)

        if method in ['pubsub', 'both']:
            subscribers = self.redis_client.publish('messages', message_json)
            print(f"[{self._get_timestamp()}] Published to channel 'messages' ({subscribers} subscribers): {message['message']}")

        if method in ['queue', 'both']:
            list_length = self.redis_client.rpush('message_queue', message_json)
            print(f"[{self._get_timestamp()}] Pushed to queue 'message_queue' (length: {list_length}): {message['message']}")

        return True

    except redis.RedisError as e:
        print(f"[{self._get_timestamp()}] Failed to publish message: {e}")
        return False
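The connection setup and the send loop aren't shown in the excerpt above. Based on the environment variables from docker-compose.yml and the two-second cadence described earlier, they presumably look roughly like this; the constructor arguments are assumptions, not the repository's exact code:

# Approximate sketch of the publisher's setup and send loop (constructor signature assumed)
import os
import time
import redis

def main() -> None:
    client = redis.Redis(
        host=os.getenv("REDIS_HOST", "localhost"),
        port=int(os.getenv("REDIS_PORT", "6379")),
        decode_responses=True,
    )
    # How MessagePublisher is actually constructed may differ in the real project.
    publisher = MessagePublisher(client, pattern=os.getenv("PATTERN", "both"))
    while True:
        publisher.publish_message(publisher.create_message())
        time.sleep(2)  # one message every two seconds

if __name__ == "__main__":
    main()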

Step 2: The Listener - The Go Subscriber

The go-subscriber service is built for performance and concurrency, making Go an excellent choice for a high-throughput consumer. It's designed to listen for messages from both patterns simultaneously using goroutines (Go's lightweight threads).

Listening for Pub/Sub Messages

The subscribePubSub function subscribes to the messages channel and simply waits. pubsub.Channel() returns a Go channel that blocks until a new message arrives.

// go-subscriber/main.go
// subscribePubSub subscribes to a Redis pub-sub channel and listens for messages.
func (s *Subscriber) subscribePubSub() {
	log.Printf("[%s] Starting pub-sub subscriber on channel 'messages'", time.Now().Format(time.RFC3339))

	pubsub := s.client.Subscribe(s.ctx, "messages")
	defer pubsub.Close()

	if _, err := pubsub.Receive(s.ctx); err != nil {
		log.Printf("[%s] Failed to confirm subscription: %v", time.Now().Format(time.RFC3339), err)
		return
	}

	log.Printf("[%s] Successfully subscribed to 'messages' channel", time.Now().Format(time.RFC3339))

	ch := pubsub.Channel()
	for {
		select {
		case msg := <-ch:
			if msg != nil {
				s.parseAndDisplayMessage(msg.Payload, "pub-sub")
			}
		case <-s.ctx.Done():
			log.Printf("[%s] Pub-sub subscriber shutting down", time.Now().Format(time.RFC3339))
			return
		}
	}
}

Listening for Queue Messages

The subscribeQueue function uses a different, more robust strategy. It uses BLPOP, which stands for Blocking List Pop. This command is highly efficient: instead of constantly asking Redis "Is there a message yet?", it tells Redis, "Wake me up when a message is available in message_queue."

// go-subscriber/main.go
// subscribeQueue uses BLPOP to consume messages from a Redis list (queue).
func (s *Subscriber) subscribeQueue() {
	log.Printf("[%s] Starting queue subscriber on list 'message_queue'", time.Now().Format(time.RFC3339))

	for {
		select {
		case <-s.ctx.Done():
			log.Printf("[%s] Queue subscriber shutting down", time.Now().Format(time.RFC3339))
			return
		default:
			result, err := s.client.BLPop(s.ctx, 1*time.Second, "message_queue").Result()
			if err != nil {
				if err == redis.Nil || err == context.Canceled {
					continue
				}
				log.Printf("[%s] Error in BLPOP: %v", time.Now().Format(time.RFC3339), err)
				time.Sleep(1 * time.Second)
				continue
			}

			if len(result) == 2 {
				s.parseAndDisplayMessage(result[1], "queue")
			}
		}
	}
}

The main run function starts both of these listeners in separate goroutines, allowing them to work in parallel.

// go-subscriber/main.go
// run starts one or both message listeners and handles graceful shutdown.
func (s *Subscriber) run() error {
	log.Printf("[%s] Starting Go Subscriber...", time.Now().Format(time.RFC3339))
	log.Printf("[%s] Pattern: %s", time.Now().Format(time.RFC3339), s.pattern)

	// Retry connection attempts
	maxRetries := 5
	for attempt := 1; attempt <= maxRetries; attempt++ {
		if err := s.connect(); err != nil {
			log.Printf("[%s] Connection attempt %d/%d failed: %v",
				time.Now().Format(time.RFC3339), attempt, maxRetries, err)
			if attempt < maxRetries {
				time.Sleep(5 * time.Second)
				continue
			}
			return fmt.Errorf("failed to connect after %d attempts", maxRetries)
		}
		break
	}

	var wg sync.WaitGroup

	if s.pattern == "pubsub" || s.pattern == "both" {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.subscribePubSub()
		}()
	}

	if s.pattern == "queue" || s.pattern == "both" {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.subscribeQueue()
		}()
	}

	// Handle shutdown signal
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	log.Printf("[%s] Received shutdown signal. Shutting down gracefully...", time.Now().Format(time.RFC3339))
	s.cancel()
	wg.Wait()

	log.Printf("[%s] Subscriber stopped. Total messages received: %d", time.Now().Format(time.RFC3339), s.messageCount)

	if err := s.client.Close(); err != nil {
		log.Printf("[%s] Error closing Redis client: %v", time.Now().Format(time.RFC3339), err)
	}

	return nil
}

With Docker and Docker Compose installed, running the entire system is as simple as:

docker-compose up --build

You will see the logs from both services. The Python publisher will announce it's sending messages, and the Go subscriber will print them as they arrive. Since the default PATTERN is both, you'll see each message arrive twice—once from the real-time pub/sub channel and once from the persistent queue.

# Output from the Go Subscriber
go-subscriber-1   | [2025-03-29 08:32:00] [python-publisher] Hello from Python #1 (ID: ...) [Received #1 from pub-sub]
go-subscriber-1   | [2025-03-29 08:32:00] [python-publisher] Hello from Python #1 (ID: ...) [Received #2 from queue]
go-subscriber-1   | [2025-03-29 08:32:02] [python-publisher] Hello from Python #2 (ID: ...) [Received #3 from pub-sub]
go-subscriber-1   | [2025-03-29 08:32:02] [python-publisher] Hello from Python #2 (ID: ...) [Received #4 from queue]

We've successfully built and explored a robust, polyglot messaging system. This project does more than just show you code; it provides a hands-on playground to understand the critical differences between two essential messaging patterns.

When to Use Pub/Sub vs. Queues

  • Use Pub/Sub for real-time, one-to-many broadcasts where losing a message isn't critical.
    Examples: live score updates, chat notifications.

  • Use Queues for reliable, one-to-one task distribution where every message must be processed.
    Examples: order processing, email sending.

What's Next?

This foundation opens up exciting possibilities for building more sophisticated distributed systems. You could extend this project by adding multiple subscribers to demonstrate horizontal scaling, implementing message priorities using Redis sorted sets, or exploring error handling and retry mechanisms for failed messages.
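For instance, the message-priority idea maps nicely onto a Redis sorted set, where the score is the priority and workers always pop the lowest score first. A rough sketch with redis-py (not part of this project, and assuming a Redis version with BZPOPMIN, i.e. 5.0+):

# Rough sketch of a priority queue on a Redis sorted set (illustrative only)
import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def enqueue(task: dict, priority: int) -> None:
    # Lower score = higher priority; ZADD keeps the set ordered by score.
    r.zadd("priority_queue", {json.dumps(task): priority})

def work_one() -> None:
    # BZPOPMIN blocks until a member is available, then pops the lowest score.
    popped = r.bzpopmin("priority_queue", timeout=5)
    if popped is not None:
        _key, payload, score = popped
        print(f"Handling priority {int(score)} task: {payload}")

enqueue({"job": "send_email", "to": "user@example.com"}, priority=1)
enqueue({"job": "rebuild_report"}, priority=5)
work_one()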

The beauty of this architecture is its flexibility. Whether you're building a real-time analytics dashboard that needs instant updates or a background job processor that handles thousands of tasks per minute, you now have the building blocks to create resilient, scalable messaging systems that can grow with your application's needs.

Try experimenting with the different patterns by setting PATTERN=pubsub or PATTERN=queue when running the system. Watch how the behavior changes, and consider how you might apply these patterns to solve real problems in your own projects.