Producer-Consumer

Source and Output

"""Example: producer-consumer."""

from asimpy import Environment, Process, Queue
from _util import example

PRODUCE_INTERVAL = 5  # ticks between successive items being produced
CONSUME_DURATION = 3  # ticks to process each item
NUM_ITEMS = 3  # total items the producer creates before stopping


class Producer(Process):
    def init(self, queue):
        self._queue = queue

    async def run(self):
        for i in range(NUM_ITEMS):
            await self.timeout(PRODUCE_INTERVAL)
            self._env.log("producer", f"create item {i}")
            await self._queue.put(i)


class Consumer(Process):
    def init(self, queue):
        self._queue = queue

    async def run(self):
        while True:
            self._env.log("consumer", "wait for item")
            item = await self._queue.get()
            self._env.log("consumer", f"start item {item}")
            await self.timeout(CONSUME_DURATION)
            self._env.log("consumer", f"finish item {item}")


def main():
    env = Environment()
    queue = Queue(env)
    Producer(env, queue)
    Consumer(env, queue)
    env.run()
    return env


if __name__ == "__main__":
    example(main)

time name event
0 consumer wait for item
5 producer create item 0
5 consumer start item 0
8 consumer finish item 0
8 consumer wait for item
10 producer create item 1
10 consumer start item 1
13 consumer finish item 1
13 consumer wait for item
15 producer create item 2
15 consumer start item 2
18 consumer finish item 2
18 consumer wait for item
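
The timestamps in this log follow from the two constants alone. As a rough cross-check, the sketch below recomputes them in plain Python without asimpy. The `timeline` function and its `consumer_free` variable are hypothetical names introduced here for illustration, and the calculation assumes an unbounded queue, so the producer never has to wait.

```python
PRODUCE_INTERVAL = 5  # ticks between successive items being produced
CONSUME_DURATION = 3  # ticks to process each item
NUM_ITEMS = 3  # total items the producer creates


def timeline():
    """Return (item, created, started, finished) for each item."""
    rows = []
    consumer_free = 0  # time at which the consumer is next idle
    for i in range(NUM_ITEMS):
        # The producer waits PRODUCE_INTERVAL ticks before each item.
        created = PRODUCE_INTERVAL * (i + 1)
        # The consumer starts once the item exists and it is idle.
        started = max(created, consumer_free)
        finished = started + CONSUME_DURATION
        rows.append((i, created, started, finished))
        consumer_free = finished
    return rows


for row in timeline():
    print(row)
```

Because each item takes 3 ticks to consume and a new one arrives only every 5 ticks, the consumer is always idle when an item is created, which is why every "create" and "start" pair in the log shares a timestamp.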

Key Points

  1. Classes derived from Process do initialization in init(), not __init__(), and define their behavior in run().

  2. The Queue has to be created first so that it can be passed as a constructor argument to both Producer and Consumer.

  3. The producer has to await the put() operation because put() might block if the queue has limited capacity and is full. Awaiting isn't strictly necessary for a queue with infinite capacity, since put() never has to wait for space there, but requiring await for every put() keeps the interface simple.
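
The blocking behavior described in the last point can be seen with Python's standard-library `asyncio.Queue`, which is used here only as an analogy and is not asimpy's `Queue`. In this sketch the consumer is deliberately slower than the producer (unlike the example above), and the `producer`, `consumer`, and `demo` functions are hypothetical names introduced for illustration.

```python
import asyncio


async def producer(queue, log):
    for i in range(3):
        await queue.put(i)  # suspends here while the queue is full
        log.append(f"put {i}")


async def consumer(queue, log):
    for _ in range(3):
        await asyncio.sleep(0.01)  # simulate a slow consumer
        item = await queue.get()
        log.append(f"got {item}")


async def demo(maxsize):
    # For asyncio.Queue, maxsize=0 means the queue is unbounded.
    queue = asyncio.Queue(maxsize=maxsize)
    log = []
    await asyncio.gather(producer(queue, log), consumer(queue, log))
    return log


bounded = asyncio.run(demo(maxsize=1))
unbounded = asyncio.run(demo(maxsize=0))
print("bounded:  ", bounded)
print("unbounded:", unbounded)
```

With a capacity of one, the producer's second `put()` cannot complete until the consumer has taken the first item, so puts and gets alternate in the log. With an unbounded queue, all three puts complete before the first get. The same `await queue.put(i)` line works in both cases, which is the interface simplicity the point above refers to.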

Check for Understanding

If the queue's capacity were limited to two items at a time, would the behavior of this program change?