Blog

Pub/Sub & Queues erklärt mit Python, Go und Redis

RedisPythonGoDockerPublishSubscribeMessageQueueAsynchronousMessagingMessageBroker

In moderner Software, besonders mit dem Aufkommen von Microservices, ist es zu einer kritischen Herausforderung geworden, verschiedene Teile Ihrer Anwendung zuverlässig miteinander kommunizieren zu lassen. Sie könnten sie mit direkten API-Aufrufen verketten, aber das schafft ein eng gekoppeltes System, das fragil und schwer skalierbar ist. Wenn ein Service ausfällt, bricht die ganze Kette zusammen.

Ein viel besserer Ansatz ist die Verwendung eines Message Brokers: einer zentralen Drehscheibe, die Ihren Services ermöglicht, asynchron zu kommunizieren und sie voneinander zu entkoppeln. Heute werden wir ein komplettes, containerisiertes Messaging-System von Grund auf erstellen, um zwei fundamentale Muster zu erforschen: Publish/Subscribe und Message Queues.

Wir verwenden einen mächtigen, beliebten Stack:

  • Redis: unser leichtgewichtiger, hochperformanter Message Broker
  • Python: um einen Service zu schreiben, der Nachrichten veröffentlicht (sendet)
  • Go: um einen Service zu schreiben, der Nachrichten abonniert (empfängt)
  • Docker: um unser gesamtes System zu verpacken und mit einem einzigen Befehl auszuführen

Am Ende dieses Posts werden Sie nicht nur die Theorie hinter diesen Mustern verstehen, sondern auch ein funktionierendes, polyglottisches (ich meine mit mehreren Programmiersprachen) System haben, das Sie auf Ihrem eigenen Computer ausführen und damit experimentieren können.

Sie finden das Projekt-Repository hier.

Warum brauchen wir Messaging-Muster?

Stellen Sie sich vor, Sie haben eine E-Commerce-Anwendung. Wenn ein Benutzer eine Bestellung aufgibt, müssen mehrere Dinge passieren:

  • Die Zahlung muss verarbeitet werden.
  • Eine Rechnung muss erstellt werden.
  • Die Versandabteilung muss benachrichtigt werden.
  • Eine Bestellbestätigungs-E-Mail muss gesendet werden.

Wenn Ihr Bestellservice jeden dieser anderen Services direkt aufruft, muss er warten, bis alle fertig sind. Wenn der E-Mail-Service langsam ist, wartet der Benutzer. Wenn der Rechnungsservice nicht verfügbar ist, könnte die gesamte Bestellung fehlschlagen. Durch die Verwendung eines Message Brokers veröffentlicht der Bestellservice einfach ein "OrderPlaced"-Event. Andere Services können dann auf dieses Event hören und ihre Arbeit unabhängig erledigen, ohne dass der Bestellservice überhaupt weiß, dass sie existieren. Das ist die Macht der Entkopplung.

Die zwei Konkurrenten: Pub/Sub vs. Queues

In jedem verteilten System kann die Entscheidung, wie Ihre Services miteinander kommunizieren, die Zuverlässigkeit und Skalierbarkeit Ihrer Anwendung bestimmen. In diesem Projekt habe ich zwei komplementäre Messaging-Muster verwendet: Publish/Subscribe und Message Queues, jedes mit seinen eigenen Stärken und Kompromissen.

Publish/Subscriber Pattern Beispiel Publish/Subscriber Pattern Beispiel. Quelle: https://www.geeksforgeeks.org/system-design/redis-publish-subscribe/

Mit dem Publish/Subscribe-Muster (Pub/Sub) haben Sie das, was ich gerne das "Stadtschreier"-Modell nenne. Ihr Service ruft einfach ein Event in einen benannten Kanal und kümmert sich nicht darum, wer zuhört. Hinter den Kulissen behandelt Redis das, wenn Sie PUBLISH auf einem Kanal aufrufen und andere Services SUBSCRIBE auf demselben Kanal aufrufen. Da jeder Abonnent eine Kopie der Nachricht erhält, ist es ideal für Live-Features wie Chat-Nachrichten, Benachrichtigungen oder Echtzeit-Dashboards. Der Haken? Es ist wirklich "fire-and-forget". Wenn ein Abonnent nicht gerade zuhört, wenn die Nachricht ausgeht, geht diese Information für immer verloren, genau wie wenn Sie Ihr Lieblingslied im Radio verpassen, weil Sie nicht zugehört haben, als es gespielt wurde.

Message Queue Pattern Beispiel Message Queue Pattern Beispiel. Quelle: https://medium.com/@anvannguyen/redis-message-queue-rpoplpush-vs-pub-sub-e8a19a3c071b/

Auf der anderen Seite steht die Message Queue, die sich mehr wie die Post anfühlt. Hier legt Ihr Publisher jede Nachricht am Ende einer Liste mit RPUSH ab, und Worker holen sie einzeln von vorne mit BLPOP ab. Sobald ein Worker eine Nachricht greift, wird sie entfernt, sodass nicht zwei Worker dieselbe Arbeit machen. Dieser Ansatz ist dauerhaft. Wenn Sie keine aktiven Worker haben, sammeln sich die Nachrichten einfach an, bis jemand kommt, um sie zu verarbeiten, und es balanciert natürlich die Arbeit über mehrere Verbraucher aus. Es ist die erste Wahl für Aufgaben, die nicht verloren gehen dürfen: denken Sie an E-Mails senden, Bilder verarbeiten oder Bestellungen im Hintergrund bearbeiten, wo Sie die Garantie brauchen, dass jede Nachricht genau einmal behandelt wird.

Durch die Kombination dieser beiden Muster erhalten wir das Beste aus beiden Welten: Pub/Sub für sofortige Broadcasts und Queues für zuverlässige, verteilte Aufgabenverarbeitung. Je nachdem, ob Sie Echtzeit-Fan-out oder dauerhafte Arbeitsverteilung benötigen, können Sie das richtige Werkzeug für den Job wählen, ohne jemals Ihre Services direkt miteinander koppeln zu müssen.

Eine geführte Tour durch das Projekt

Lassen Sie uns in den Code eintauchen. Das gesamte System wird von einer einzigen docker-compose.yml-Datei orchestriert, die als Dirigent für unsere drei Services fungiert: Redis, den Python-Publisher und den Go-Subscriber.

# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3

  python-publisher:
    build: ./python-publisher
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - PATTERN=${PATTERN:-both}
    restart: unless-stopped

  go-subscriber:
    build: ./go-subscriber
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - PATTERN=${PATTERN:-both}
    restart: unless-stopped

volumes:
  redis_data:

Diese einfache Datei definiert unsere drei Container und stellt sicher, dass Publisher und Subscriber erst starten, nachdem Redis bereit ist. Beachten Sie die PATTERN-Umgebungsvariable, die uns ermöglicht, einfach zwischen Messaging-Modellen zu wechseln.

Schritt 1: Der Ansager - Der Python Publisher

Der python-publisher Service ist dafür verantwortlich, alle zwei Sekunden Nachrichten zu erstellen und zu senden. Seine Logik ist sauber in der MessagePublisher-Klasse in main.py gekapselt. Zuerst erstellt er eine strukturierte JSON-Nachricht. Die Verwendung eines konsistenten Formats ist entscheidend für die Interoperabilität.

# python-publisher/main.py
def create_message(self) -> dict:
    """
    Generates a unique message payload.
    """
    self.message_count += 1
    return {
        "id": str(uuid.uuid4()),
        "timestamp": self._get_timestamp(),
        "message": f"Hello from Python #{self.message_count}",
        "sender": "python-publisher",
        "count": self.message_count
    }

Die Magie passiert in der publish_message-Methode. Basierend auf der PATTERN-Umgebungsvariable sendet sie dieselbe Nachricht mit einer oder beiden Methoden.

# python-publisher/main.py
def publish_message(self, message: dict, method: str = None) -> bool:
    """
    Publishes a message to Redis using the selected method.
    """
    try:
        if method is None:
            method = self.pattern

        message_json = json.dumps(message)

        if method in ['pubsub', 'both']:
            subscribers = self.redis_client.publish('messages', message_json)
            print(f"[{self._get_timestamp()}] Published to channel 'messages' ({subscribers} subscribers): {message['message']}")

        if method in ['queue', 'both']:
            list_length = self.redis_client.rpush('message_queue', message_json)
            print(f"[{self._get_timestamp()}] Pushed to queue 'message_queue' (length: {list_length}): {message['message']}")

        return True

    except redis.RedisError as e:
        print(f"[{self._get_timestamp()}] Failed to publish message: {e}")
        return False

Schritt 2: Der Zuhörer - Der Go Subscriber

Der go-subscriber Service ist auf Performance und Nebenläufigkeit ausgelegt, was Go zu einer ausgezeichneten Wahl für einen hochdurchsatz-fähigen Consumer macht. Er ist darauf ausgelegt, gleichzeitig auf Nachrichten von beiden Mustern zu hören, unter Verwendung von Goroutinen (Go's leichtgewichtigen Threads).

Auf Pub/Sub-Nachrichten hören

Die subscribePubSub-Funktion verbindet sich mit dem messages-Kanal und wartet einfach. Das pubsub.Channel() stellt einen Go-Kanal zur Verfügung, der blockiert, bis eine neue Nachricht ankommt.

// go-subscriber/main.go
// subscribePubSub subscribes to a Redis pub-sub channel and listens for messages.
func (s *Subscriber) subscribePubSub() {
	log.Printf("[%s] Starting pub-sub subscriber on channel 'messages'", time.Now().Format(time.RFC3339))

	pubsub := s.client.Subscribe(s.ctx, "messages")
	defer pubsub.Close()

	if _, err := pubsub.Receive(s.ctx); err \!= nil {
		log.Printf("[%s] Failed to confirm subscription: %v", time.Now().Format(time.RFC3339), err)
		return
	}

	log.Printf("[%s] Successfully subscribed to 'messages' channel", time.Now().Format(time.RFC3339))

	ch := pubsub.Channel()
	for {
		select {
		case msg := <-ch:
			if msg \!= nil {
				s.parseAndDisplayMessage(msg.Payload, "pub-sub")
			}
		case <-s.ctx.Done():
			log.Printf("[%s] Pub-sub subscriber shutting down", time.Now().Format(time.RFC3339))
			return
		}
	}
}

Auf Queue-Nachrichten hören

Die subscribeQueue-Funktion verwendet eine andere, robustere Strategie. Sie verwendet BLPOP, was für Blocking List Pop steht. Dieser Befehl ist hocheffizient: Anstatt Redis ständig zu fragen "Ist schon eine Nachricht da?", sagt er Redis: "Weck mich auf, wenn eine Nachricht in message_queue verfügbar ist."

// go-subscriber/main.go
// subscribeQueue uses BLPOP to consume messages from a Redis list (queue).
func (s *Subscriber) subscribeQueue() {
	log.Printf("[%s] Starting queue subscriber on list 'message_queue'", time.Now().Format(time.RFC3339))

	for {
		select {
		case <-s.ctx.Done():
			log.Printf("[%s] Queue subscriber shutting down", time.Now().Format(time.RFC3339))
			return
		default:
			result, err := s.client.BLPop(s.ctx, 1*time.Second, "message_queue").Result()
			if err \!= nil {
				if err == redis.Nil || err == context.Canceled {
					continue
				}
				log.Printf("[%s] Error in BLPOP: %v", time.Now().Format(time.RFC3339), err)
				time.Sleep(1 * time.Second)
				continue
			}

			if len(result) == 2 {
				s.parseAndDisplayMessage(result[1], "queue")
			}
		}
	}
}

Die Haupt-run-Funktion startet beide Listener in separaten Goroutinen, wodurch sie parallel arbeiten können.

// go-subscriber/main.go
// run starts one or both message listeners and handles graceful shutdown.
func (s *Subscriber) run() error {
	log.Printf("[%s] Starting Go Subscriber...", time.Now().Format(time.RFC3339))
	log.Printf("[%s] Pattern: %s", time.Now().Format(time.RFC3339), s.pattern)

	// Retry connection attempts
	maxRetries := 5
	for attempt := 1; attempt <= maxRetries; attempt++ {
		if err := s.connect(); err \!= nil {
			log.Printf("[%s] Connection attempt %d/%d failed: %v",
				time.Now().Format(time.RFC3339), attempt, maxRetries, err)
			if attempt < maxRetries {
				time.Sleep(5 * time.Second)
				continue
			}
			return fmt.Errorf("failed to connect after %d attempts", maxRetries)
		}
		break
	}

	var wg sync.WaitGroup

	if s.pattern == "pubsub" || s.pattern == "both" {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.subscribePubSub()
		}()
	}

	if s.pattern == "queue" || s.pattern == "both" {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.subscribeQueue()
		}()
	}

	// Handle shutdown signal
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh

	log.Printf("[%s] Received shutdown signal. Shutting down gracefully...", time.Now().Format(time.RFC3339))
	s.cancel()
	wg.Wait()

	log.Printf("[%s] Subscriber stopped. Total messages received: %d", time.Now().Format(time.RFC3339), s.messageCount)

	if err := s.client.Close(); err \!= nil {
		log.Printf("[%s] Error closing Redis client: %v", time.Now().Format(time.RFC3339), err)
	}

	return nil
}

Mit installiertem Docker und Docker Compose ist das Ausführen des gesamten Systems so einfach wie:

docker-compose up --build

Sie werden die Logs von beiden Services sehen. Der Python-Publisher wird ankündigen, dass er Nachrichten sendet, und der Go-Subscriber wird sie ausgeben, wenn sie ankommen. Da das Standard-PATTERN "both" ist, sehen Sie jede Nachricht zweimal ankommen - einmal vom Echtzeit-Pub/Sub-Kanal und einmal von der persistenten Queue.

# Output from the Go Subscriber
go-subscriber-1   | [2025-03-29 08:32:00] [python-publisher] Hello from Python #1 (ID: ...) [Received #1 from pub-sub]
go-subscriber-1   | [2025-03-29 08:32:00] [python-publisher] Hello from Python #1 (ID: ...) [Received #2 from queue]
go-subscriber-1   | [2025-03-29 08:32:02] [python-publisher] Hello from Python #2 (ID: ...) [Received #3 from pub-sub]
go-subscriber-1   | [2025-03-29 08:32:02] [python-publisher] Hello from Python #2 (ID: ...) [Received #4 from queue]

Wir haben erfolgreich ein robustes, polyglottisches Messaging-System gebaut und erkundet. Dieses Projekt zeigt Ihnen nicht nur Code; es bietet einen praktischen Spielplatz, um die kritischen Unterschiede zwischen zwei wesentlichen Messaging-Mustern zu verstehen.

Wann Pub/Sub vs. Queues verwenden

  • Verwenden Sie Pub/Sub für Echtzeit-, Eins-zu-Viele-Broadcasts, wo der Verlust einer Nachricht nicht kritisch ist.
    Beispiele: Live-Score-Updates, Chat-Benachrichtigungen.

  • Verwenden Sie Queues für zuverlässige, Eins-zu-Eins-Aufgabenverteilung, wo jede Nachricht verarbeitet werden muss.
    Beispiele: Bestellverarbeitung, E-Mail-Versand.

Wie geht es weiter?

Diese Grundlage eröffnet aufregende Möglichkeiten für den Aufbau anspruchsvollerer verteilter Systeme. Sie könnten dieses Projekt erweitern, indem Sie mehrere Subscriber hinzufügen, um horizontale Skalierung zu demonstrieren, Nachrichtenprioritäten mit Redis Sorted Sets implementieren oder Fehlerbehandlung und Retry-Mechanismen für fehlgeschlagene Nachrichten erforschen.

Die Schönheit dieser Architektur liegt in ihrer Flexibilität. Ob Sie ein Echtzeit-Analytics-Dashboard erstellen, das sofortige Updates benötigt, oder einen Hintergrund-Job-Prozessor, der Tausende von Aufgaben pro Minute bewältigt - Sie haben jetzt die Bausteine, um widerstandsfähige, skalierbare Messaging-Systeme zu erstellen, die mit den Anforderungen Ihrer Anwendung wachsen können.

Versuchen Sie, mit den verschiedenen Mustern zu experimentieren, indem Sie PATTERN=pubsub oder PATTERN=queue beim Ausführen des Systems setzen. Beobachten Sie, wie sich das Verhalten ändert, und überlegen Sie, wie Sie diese Muster anwenden könnten, um reale Probleme in Ihren eigenen Projekten zu lösen. EOF < /dev/null