A Message Queue with Publish-Subscribe

Real-world distributed systems need a way for components to communicate without being tightly coupled. When a web server processes an order, it might need to notify the inventory system, trigger an email, update analytics, and log the transaction. If the web server called each of these services directly, a failure in any one would block the entire operation. This is where message queues come in.

Systems like RabbitMQ, Apache Kafka, and Amazon SQS act as intermediaries that decouple message producers from consumers. A publisher sends messages to a named topic without knowing who (if anyone) will receive them. Subscribers express interest in topics and receive messages asynchronously, processing them at their own pace. This pattern is fundamental to event-driven architectures used throughout the industry—from LinkedIn's data pipeline that processes billions of events daily, to Netflix's recommendation engine that reacts to viewing patterns, to real-time analytics platforms that aggregate clickstream data.

Understanding the Publish-Subscribe Pattern

In the publish-subscribe pattern, publishers send messages to topics (sometimes called channels or exchanges). The message broker maintains subscriptions—mappings from topics to interested consumers. When a message arrives, the broker delivers it to all current subscribers of that topic. This is called fan-out: one message can reach many consumers.

The pattern provides several crucial benefits. First, publishers and subscribers don't need to know about each other—they only share knowledge of topic names. Second, the system can scale independently: you can add more publishers or subscribers without modifying existing code. Third, the broker provides buffering: if consumers are slow or temporarily unavailable, messages wait in queues rather than being lost.

Our Implementation

We'll build a message queue system using asimpy, a discrete event simulation framework based on Python's async/await syntax. Asimpy lets us model concurrent systems using coroutines without dealing with actual threads or network connections. This makes the code simpler and deterministic—perfect for understanding the core concepts.

Our system has three main components: publishers that send messages, a broker that routes messages to topics, and subscribers that receive and process messages. Let's start with the message type that flows between them:

# asimpy supplies the simulation primitives used throughout this chapter;
# we assume they are importable from the top-level package.
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List

from asimpy import Environment, FirstOf, Process, Queue

@dataclass
class Message:
    """A message sent through the queue system."""
    topic: str
    content: str
    id: int
    timestamp: float

The Message class represents data flowing through our system. Each message has a topic (like "orders" or "user-activity"), content (the actual data), a unique ID, and a timestamp. In a real system, messages would contain rich structured data, but strings are sufficient for our example.
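
For instance, an order event near the start of the simulation might look like this (the values are illustrative):

msg = Message(topic="orders", content="Order #1 placed", id=1, timestamp=0.0)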

class MessageBroker:
    """A message broker that routes messages to topic subscribers."""

    def __init__(self, env: Environment, buffer_size: int = 100):
        self.env = env
        self.buffer_size = buffer_size  # advisory only; see the delivery discussion below
        # Topics map to lists of subscriber queues
        self.topics: Dict[str, List[Queue]] = defaultdict(list)
        # Statistics for observability
        self.messages_published = 0
        self.messages_delivered = 0

    def subscribe(self, topic: str) -> Queue:
        """Create a queue for a subscriber to a topic."""
        queue = Queue(self.env)
        self.topics[topic].append(queue)
        return queue

    async def publish(self, message: Message):
        """Publish a message to all subscribers of its topic."""
        self.messages_published += 1

        # Find all subscriber queues for this topic
        queues = self.topics.get(message.topic, [])

        if not queues:
            print(f"[{self.env.now:.1f}] No subscribers for topic '{message.topic}'")
            return

        # Deliver to each subscriber's queue
        for queue in queues:
            await queue.put(message)
            self.messages_delivered += 1

The broker maintains a dictionary mapping topics to lists of queues. When a message is published, the broker looks up the topic and places the message in each subscriber's queue. Using separate queues per subscriber ensures that a slow consumer doesn't block others—this is a key property of the pattern.
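
To make the fan-out concrete, here is a small sketch (a hypothetical FanOutDemo process, not part of the system we're building) in which two subscribers to the same topic each receive their own copy of one published message:

class FanOutDemo(Process):
    """Illustrates fan-out: one publish, one copy per subscriber."""

    def init(self, broker: MessageBroker):
        self.broker = broker

    async def run(self):
        q1 = self.broker.subscribe("orders")
        q2 = self.broker.subscribe("orders")
        await self.broker.publish(
            Message(topic="orders", content="Order #1", id=1, timestamp=self.now)
        )
        # Each subscriber queue holds an independent copy of the message
        print((await q1.get()).content, (await q2.get()).content)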

Note that although the broker accepts a buffer_size, it doesn't enforce it: unlike many message queue implementations, which drop messages when queues fill up, our asimpy queues grow unbounded. In a real system, you'd want to enforce limits and implement backpressure or message-dropping policies. We'll discuss delivery semantics later.

Now let's implement publishers. A publisher sends messages to topics at some rate:

class Publisher(Process):
    """Publishes messages to topics."""

    def init(self, broker: MessageBroker, name: str, topic: str, interval: float):
        self.broker = broker
        self.name = name
        self.topic = topic
        self.interval = interval
        self.message_counter = 0

    async def run(self):
        """Generate and publish messages."""
        while True:
            # Create a message
            self.message_counter += 1
            message = Message(
                topic=self.topic,
                content=f"Message {self.message_counter} from {self.name}",
                id=self.message_counter,
                timestamp=self.now
            )

            # Publish it
            print(f"[{self.now:.1f}] {self.name} publishing: {message.content}")
            await self.broker.publish(message)

            # Wait before next message
            await self.timeout(self.interval)

This publisher sends messages at regular intervals. Real publishers would react to external events (like HTTP requests or database changes), but timed generation works well for simulation. Awaiting self.timeout() pauses this process and resumes it after the specified amount of simulated time.

Notice that we inherit from Process, which is asimpy's base class for active components. The init() method is called during construction to set up our state, and run() is the coroutine that defines the publisher's behavior.

Finally, subscribers receive and process messages:

class Subscriber(Process):
    """Subscribes to topics and processes messages."""

    def init(self, broker: MessageBroker, name: str, topics: List[str], 
             processing_time: float):
        self.broker = broker
        self.name = name
        self.topics = topics
        self.processing_time = processing_time
        self.messages_received = 0

        # Subscribe to all topics and get a queue for each
        self.queues = {}
        for topic in topics:
            queue = broker.subscribe(topic)
            self.queues[topic] = queue

    async def run(self):
        """Process messages from subscribed topics."""
        while True:
            # Wait for a message from any queue
            # Build a dict of topic -> get() coroutines
            get_operations = {
                topic: queue.get() 
                for topic, queue in self.queues.items()
            }

            # Wait for the first one to complete
            topic, message = await FirstOf(self._env, **get_operations)

            self.messages_received += 1
            latency = self.now - message.timestamp

            print(f"[{self.now:.1f}] {self.name} received from '{topic}': "
                  f"{message.content} (latency: {latency:.1f})")

            # Simulate processing time
            await self.timeout(self.processing_time)

The subscriber uses asimpy's FirstOf to wait on multiple queues simultaneously—whichever queue has a message first will complete. This is more elegant than round-robin polling. Real implementations use event-driven APIs or threads, but FirstOf captures the same semantics: we wait for any subscribed topic to produce a message.

The key point is that processing happens asynchronously: the subscriber takes messages from its queues and processes them at its own pace, independently of the publishers and other subscribers.

Running a Simulation

Let's create a scenario with multiple publishers and subscribers to see the system in action:

def run_simulation():
    """Run a simulation of the message queue system."""
    env = Environment()
    broker = MessageBroker(env, buffer_size=10)

    # Create publishers
    order_pub = Publisher(env, broker, "OrderService", "orders", interval=2.0)
    user_pub = Publisher(env, broker, "UserService", "user-activity", interval=1.5)

    # Create subscribers
    # Fast subscriber handling orders
    inventory = Subscriber(env, broker, "Inventory", ["orders"], 
                          processing_time=0.5)

    # Slow subscriber handling orders
    email = Subscriber(env, broker, "Email", ["orders"], 
                      processing_time=3.0)

    # Subscriber handling multiple topics
    analytics = Subscriber(env, broker, "Analytics", 
                          ["orders", "user-activity"], 
                          processing_time=1.0)

    # Run simulation
    env.run(until=20)

    # Print statistics
    print("\n=== Statistics ===")
    print(f"Messages published: {broker.messages_published}")
    print(f"Messages delivered: {broker.messages_delivered}")
    print(f"Inventory received: {inventory.messages_received}")
    print(f"Email received: {email.messages_received}")
    print(f"Analytics received: {analytics.messages_received}")

if __name__ == "__main__":
    run_simulation()

When you run this code, you'll see messages being published and consumed asynchronously. Notice how the fast Inventory service keeps up with orders, while the slow Email service falls behind. Messages queue up waiting for processing—this is the buffering we mentioned earlier.

The Analytics service receives messages from multiple topics, demonstrating how subscribers can aggregate different event streams. This is common in real systems: a data warehouse might subscribe to dozens of topics to build a complete picture of system activity.

Delivery Guarantees

Our implementation provides unbounded queuing, which means messages are never dropped (assuming infinite memory). This is closer to "at-least-once" delivery, though we haven't implemented acknowledgments or redelivery on failure. Let's discuss the spectrum of delivery guarantees:

At-most-once delivery ensures that messages are delivered zero or one time—never duplicated, but possibly lost. This is achieved by dropping messages when queues are full or when subscribers are unavailable. It's the weakest guarantee but the simplest to implement and the fastest.
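
A minimal sketch of that policy for our broker, assuming (hypothetically) that a queue reports its current depth via len(); this finally puts buffer_size to use:

class DroppingBroker(MessageBroker):
    """At-most-once: drop messages for subscribers whose queues are full."""

    async def publish(self, message: Message):
        self.messages_published += 1
        for queue in self.topics.get(message.topic, []):
            # Hypothetical API: len(queue) returns the current queue depth
            if len(queue) >= self.buffer_size:
                continue  # queue full: silently drop for this subscriber
            await queue.put(message)
            self.messages_delivered += 1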

At-least-once delivery ensures every message is delivered, possibly multiple times. This requires acknowledgments: the broker keeps messages until subscribers confirm receipt. If a subscriber crashes before acknowledging, the broker redelivers to another subscriber or retries. Kafka and RabbitMQ support this mode.

Exactly-once delivery is the strongest guarantee: each message is processed exactly once. This is surprisingly difficult in distributed systems due to failures and network issues. Kafka achieves this through idempotent producers and transactional consumers—essentially assigning each message a unique ID and having consumers track which IDs they've processed.
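
A minimal sketch of the consumer-side half of that idea, assuming globally unique message IDs (our per-publisher counters are not); real systems also persist and compact the set of seen IDs:

# Consumer-side deduplication: process each message ID at most once
seen_ids = set()

def should_process(message: Message) -> bool:
    """Return True only the first time a given message ID is seen."""
    if message.id in seen_ids:
        return False
    seen_ids.add(message.id)
    return True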

Here's how we could extend our broker to support at-least-once delivery with acknowledgments:

@dataclass  # the decorator must be repeated so ack_id becomes an init field
class AckMessage(Message):
    """Message that requires acknowledgment."""
    ack_id: int = 0

class AckBroker(MessageBroker):
    """Broker with acknowledgment support."""

    def __init__(self, env: Environment, buffer_size: int = 100, 
                 ack_timeout: float = 10.0):
        super().__init__(env, buffer_size)
        self.ack_timeout = ack_timeout
        self.pending_acks = {}  # ack_id -> (message, timestamp, queue)
        self.next_ack_id = 0

    async def publish(self, message: Message):
        """Publish with ack tracking."""
        self.messages_published += 1
        queues = self.topics.get(message.topic, [])

        for queue in queues:
            ack_id = self.next_ack_id
            self.next_ack_id += 1

            ack_msg = AckMessage(
                topic=message.topic,
                content=message.content,
                id=message.id,
                timestamp=message.timestamp,
                ack_id=ack_id
            )

            self.pending_acks[ack_id] = (ack_msg, self.env.now, queue)
            await queue.put(ack_msg)
            self.messages_delivered += 1

            # Schedule a redelivery check, assuming env.schedule runs the
            # (async) callback at the given simulation time
            self.env.schedule(
                self.env.now + self.ack_timeout,
                lambda aid=ack_id: self._check_ack(aid)
            )

    def acknowledge(self, ack_id: int):
        """Acknowledge receipt of a message."""
        if ack_id in self.pending_acks:
            del self.pending_acks[ack_id]

    async def _check_ack(self, ack_id: int):
        """Redeliver if not acknowledged."""
        if ack_id in self.pending_acks:
            msg, original_time, queue = self.pending_acks[ack_id]
            print(f"[{self.env.now:.1f}] Redelivering {msg.content} "
                  f"(ack_id {ack_id})")
            await queue.put(msg)

A subscriber using this broker would call broker.acknowledge(message.ack_id) after successfully processing each message. Messages not acknowledged within the timeout would be redelivered.
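
A sketch of that consumer loop, paired with the AckBroker above. Acknowledging after processing yields at-least-once semantics; acknowledging before processing would yield at-most-once:

class AckSubscriber(Subscriber):
    """Subscriber that acknowledges each message after processing it."""

    async def run(self):
        while True:
            get_operations = {
                topic: queue.get()
                for topic, queue in self.queues.items()
            }
            topic, message = await FirstOf(self._env, **get_operations)

            # Process first, then acknowledge: a crash mid-processing
            # leaves the ack pending, so the broker redelivers
            await self.timeout(self.processing_time)
            self.broker.acknowledge(message.ack_id)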

Consumer Groups and Load Balancing

In production systems, we often want multiple instances of the same subscriber type to share the workload. This is called a consumer group: messages on a topic are distributed among group members rather than duplicated to each. Here's a simple implementation:

class ConsumerGroup:
    """Distribute messages among multiple consumers."""

    def __init__(self, env: Environment, broker: MessageBroker, 
                 topic: str, num_consumers: int):
        self.env = env
        self.queue = broker.subscribe(topic)
        self.consumers = []

        # Create consumer queues for load balancing
        for i in range(num_consumers):
            consumer_queue = Queue(env)
            self.consumers.append(consumer_queue)

        # Start distributor process
        self._distributor = _Distributor(env, self.queue, self.consumers)

    def get_consumer_queue(self, index: int) -> Queue:
        """Get queue for a specific consumer in the group."""
        return self.consumers[index]

class _Distributor(Process):
    """Distribute messages round-robin to consumers."""

    def init(self, source: Queue, destinations: List[Queue]):
        self.source = source
        self.destinations = destinations
        self.next_dest = 0

    async def run(self):
        """Forward messages to consumers in round-robin order."""
        while True:
            message = await self.source.get()
            dest = self.destinations[self.next_dest]
            await dest.put(message)
            self.next_dest = (self.next_dest + 1) % len(self.destinations)

This consumer group receives messages from the broker on a single queue, then distributes them round-robin to individual consumer queues. Each consumer in the group processes a subset of the messages, enabling parallel processing. Real systems use more sophisticated load balancing—weighted distribution, least-loaded routing, or partition-based assignment.
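
Wiring this up might look like the following sketch, with a hypothetical Worker process draining one queue from the group (env and broker come from a setup like run_simulation above):

class Worker(Process):
    """Drains a single consumer queue from a group."""

    def init(self, queue: Queue, name: str):
        self.queue = queue
        self.name = name

    async def run(self):
        while True:
            message = await self.queue.get()
            print(f"[{self.now:.1f}] {self.name} handled: {message.content}")

# Three workers share the "orders" topic instead of each seeing every message
group = ConsumerGroup(env, broker, "orders", num_consumers=3)
for i in range(3):
    Worker(env, group.get_consumer_queue(i), name=f"Worker-{i}")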

Conclusion

The publish-subscribe pattern decouples system components, enabling independent scaling and evolution. By routing messages through a broker, we gain buffering, fan-out, and fault tolerance. The pattern appears throughout modern architectures: microservices use it for inter-service communication, frontend applications use it for real-time updates, and data pipelines use it for stream processing.

The code we've written captures the essential ideas: topics, subscriptions, asynchronous delivery, and queuing. We've seen how asimpy's async/await syntax makes concurrent behavior natural to express, and how FirstOf enables waiting on multiple message sources simultaneously. Real systems add persistence (writing messages to disk), replication (for fault tolerance), partitioning (for parallelism), and sophisticated delivery semantics. But the core pattern remains the same: publishers and subscribers communicate through topics, with a broker managing the complexity in between.