asimpy
A simple discrete event simulation framework in Python using async/await.
Thanks to the creators of SimPy for inspiration.
Core Concepts
Discrete event simulation (DES) simulates systems in which events occur at discrete points in time. The simulation maintains a virtual clock and executes events in chronological order. Unlike real-time systems, the simulation jumps directly from one event time to the next, skipping empty intervals. (Time steps are often referred to as "ticks".)
Async/Await
Python's async/await syntax enables cooperative multitasking without threads.
Functions defined as async def return coroutine objects when called.
These coroutines can be paused at await points and later resumed.
More specifically,
when a coroutine executes value = await expr, it:
- yields the awaited object expr to its caller;
- suspends execution at that point;
- resumes later when send(value) is called on it; and then
- returns the value passed to send() as the result of the await expression inside the resumed coroutine.
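This exchange can be demonstrated with plain Python, independently of asimpy. The Signal class and worker coroutine below are purely illustrative; driving the coroutine by hand with send() walks through the four steps above:

class Signal:
    def __await__(self):
        # Yield this object to whoever is driving the coroutine,
        # then return whatever value is later sent back in.
        value = yield self
        return value

async def worker():
    result = await Signal()
    print("got", result)       # prints "got 42"

coro = worker()
awaited = coro.send(None)      # runs up to the yield; awaited is the Signal object
try:
    coro.send(42)              # resume: 42 becomes the value of the await
except StopIteration:
    pass                       # the coroutine has run to completion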
asimpy uses this mechanism to pause and resume coroutines in order to simulate simultaneous execution.
This is similar to the yield-based mechanism used in SimPy.
Environment: Process and Event Management
The Environment class maintains the simulation state:
- _now is the current simulated time.
- _pending is a priority queue of callbacks waiting to be run in order of increasing time (so that the next one to run is at the front of the queue).
Environment.schedule(time, callback) adds a callback to the queue.
The _Pending dataclass used to store each scheduled callback includes a serial number
to ensure deterministic ordering when multiple events occur at the same time.
Environment.run() implements the main simulation loop:
- Extract the next pending event from the priority queue.
- If an until parameter is specified and the event time exceeds it, stop.
- Execute the callback.
- If the callback doesn't return NO_TIME and the event time is greater than the current simulated time, advance the clock.
The NO_TIME sentinel prevents time from advancing mistakenly when events are canceled.
This is explained in detail later.
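A condensed sketch of this machinery, assuming a heapq-based queue, might look like the following; the attribute and method names follow the text above, but everything else is an approximation rather than asimpy's actual code:

import heapq
import itertools
from dataclasses import dataclass, field
from typing import Any, Callable

NO_TIME = object()            # sentinel: "this callback must not advance the clock"
_serial = itertools.count()   # tie-breaker for callbacks scheduled at the same time

@dataclass(order=True)
class _Pending:
    time: float
    serial: int = field(default_factory=lambda: next(_serial))
    callback: Callable[[], Any] = field(compare=False, default=None)

class Environment:
    def __init__(self):
        self._now = 0
        self._pending = []    # heap of _Pending records, earliest first

    def schedule(self, time, callback):
        heapq.heappush(self._pending, _Pending(time, callback=callback))

    def run(self, until=None):
        while self._pending:
            pending = heapq.heappop(self._pending)
            if until is not None and pending.time > until:
                break
            result = pending.callback()
            # Cancelled events return NO_TIME and must not move the clock.
            if result is not NO_TIME and pending.time > self._now:
                self._now = pending.time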
Event: the Synchronization Primitive
The Event class represents an action that will complete in the future.
It has four members:
- _triggered indicates whether the event has completed.
- _cancelled indicates whether the event was cancelled.
- _value is the event's result value.
- _waiters is a list of processes waiting for this event to occur.
When Event.succeed(value) is called, it:
- sets _triggered to True to show that the event has completed;
- stores the value for later retrieval;
- calls resume(value) on all waiting processes; and
- clears the list of waiting processes.
The internal Event._add_waiter(proc) method handles three cases:
- If the event has already completed (i.e., if _triggered is True), it immediately calls proc.resume(value).
- If the event has been canceled, it does nothing.
- Otherwise, it adds proc to the list of waiting processes.
Finally,
Event implements __await__(),
which Python calls automatically when it executes await evt.
Event.__await__ yields self so that the awaiting process gets the event back.
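Putting those pieces together, a minimal sketch of Event might look like this; the attribute and method names follow the text, and the rest is an assumption rather than the library's exact code:

class Event:
    def __init__(self, env):
        self._env = env
        self._triggered = False
        self._cancelled = False
        self._value = None
        self._waiters = []

    def succeed(self, value=None):
        self._triggered = True
        self._value = value
        for proc in self._waiters:    # notify before clearing the list
            proc.resume(value)
        self._waiters = []

    def _add_waiter(self, proc):
        if self._triggered:
            proc.resume(self._value)  # already done: resume immediately
        elif self._cancelled:
            pass                      # cancelled events never resume anyone
        else:
            self._waiters.append(proc)

    def __await__(self):
        value = yield self            # hand the event to Process._loop()
        return value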
Process: Active Entities
Process is the base class for simulation processes.
(Unlike SimPy, asimpy uses a class rather than bare coroutines.)
When a Process is constructed, it:
- stores a reference to the simulation environment;
- calls init() for subclass-specific setup (the default implementation of this method does nothing);
- creates a coroutine by calling run(); and
- schedules immediate execution of Process._loop().
The _loop() method drives coroutine execution:
- If an interrupt is pending, throw it into the coroutine via throw().
- Otherwise, send the value into the coroutine via send().
- Receive the yielded event.
- Register this process as a waiter on that event.
When StopIteration is raised by the coroutine,
the process is marked as done.
If any other exception occurs,
the process is marked as done and the exception is re-raised.
Note: The word "process" can be confusing. These are not operating system processes with their own memory and permissions.
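A sketch of Process along these lines, combining the construction steps and the _loop() behaviour described above, could look like the following; the exact signatures in asimpy may differ:

class Process:
    def __init__(self, env):
        self._env = env
        self._done = False
        self._interrupt = None
        self.init()                          # subclass-specific setup
        self._coro = self.run()              # creates, but does not start, the coroutine
        env.schedule(env._now, self._loop)   # schedule immediate execution

    def init(self):
        pass                                 # default: do nothing

    async def run(self):
        raise NotImplementedError

    def resume(self, value=None):
        # Called by an Event when it completes: schedule another _loop() iteration.
        self._env.schedule(self._env._now, lambda: self._loop(value))

    def _loop(self, value=None):
        try:
            if self._interrupt is not None:
                exc, self._interrupt = self._interrupt, None
                yielded = self._coro.throw(exc)    # deliver a pending interrupt
            else:
                yielded = self._coro.send(value)   # resume past the await
            yielded._add_waiter(self)              # wait on whatever event was yielded
        except StopIteration:
            self._done = True                      # the coroutine finished normally
        except BaseException:
            self._done = True
            raise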
A Note on Scheduling
When an event completes, it calls proc.resume(value) to schedule another iteration of _loop()
with the provided value.
This continues the coroutine past its await point.
A Note on Interrupts
The interrupt mechanism sets _interrupt and schedules immediate execution of the process.
The next _loop() iteration throws the interrupt into the coroutine,
where it can be caught with try/except.
This is a bit clumsy,
but is the only way to inject exceptions into running coroutines.
Note:
A process can only be interrupted at an await point.
Exceptions cannot be raised from the outside at arbitrary points.
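In practice this means a process catches an interrupt around the await at which it might be suspended. The sketch below is illustrative only; in particular, the Interrupt exception class name is an assumption, not necessarily what asimpy throws:

class Sleeper(Process):
    async def run(self):
        try:
            await self.timeout(100)   # can only be interrupted at this await
        except Interrupt:             # Interrupt stands in for the actual exception type
            print("woken up early")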
Timeout: Waiting Until
A Timeout object schedules a callback at a future time.
The Timeout._fire() method returns NO_TIME if the timeout has been canceled,
which prevents canceled timeouts from accidentally advancing the simulation time.
Otherwise,
Timeout._fire() calls succeed() to trigger the event.
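A sketch of Timeout built on Event, following this description (the constructor arguments are assumed):

class Timeout(Event):
    def __init__(self, env, delay):
        super().__init__(env)
        env.schedule(env._now + delay, self._fire)

    def _fire(self):
        if self._cancelled:
            return NO_TIME    # keep a cancelled timeout from advancing the clock
        self.succeed()        # otherwise trigger the event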
Queue and PriorityQueue: Exchanging Data
Queue enables processes to exchange data.
It has two members:
- _items is a list of items being passed between processes.
- _getters is a list of processes waiting for items.
The invariant for Queue is that one or the other list must be empty,
i.e.,
if there are processes waiting then there aren't any items to take,
while if there are items waiting to be taken there aren't any waiting processes.
If a process is waiting, Queue.put(item) immediately calls evt.succeed(item)
to pass that item to the waiting process
(which is stored in the event).
Otherwise,
the item is appended to queue._items.
Queue.get() is a bit more complicated.
If the queue has items,
queue.get() creates an event that immediately succeeds with the first item.
If the queue is empty,
the call creates an event and adds the caller to the list of processes waiting to get items.
The complication is that if there is an item to get,
queue.get() sets the _on_cancel callback of the event to handle cancellation
by returning the item taken to the front of the queue.
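A simplified sketch of this behaviour is shown below; the _on_cancel wiring and the exact method signatures are assumptions based on the description above:

class Queue:
    def __init__(self, env):
        self._env = env
        self._items = []      # items waiting to be taken
        self._getters = []    # events for processes waiting for an item

    def put(self, item):
        if self._getters:
            evt = self._getters.pop(0)
            evt.succeed(item)                   # hand the item straight to a waiter
        else:
            self._items.append(item)

    async def get(self):
        evt = Event(self._env)
        if self._items:
            item = self._items.pop(0)
            # If the get is later cancelled, put the item back at the front.
            evt._on_cancel = lambda: self._items.insert(0, item)
            evt.succeed(item)
        else:
            self._getters.append(evt)
            evt._on_cancel = lambda: self._getters.remove(evt)
        return await evt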
PriorityQueue uses heapq operations to maintain ordering,
which means items must be comparable (i.e., must implement __lt__).
get() pops the minimum element;
put() pushes onto the heap and potentially satisfies a waiting getter.
Resource: Capacity-Limited Sharing
The Resource class simulates a shared resource with limited capacity.
It has three members:
- capacity is the maximum number of concurrent users.
- _count is the current number of users.
- _waiters is a list of processes waiting for the resource to be available.
If the resource is below capacity when res.acquire() is called,
it increments the internal count and immediately succeeds.
Otherwise,
it adds the caller to the list of waiting processes.
Similarly,
res.release() decrements the count and then checks the list of waiting processes.
If there are any,
it calls evt.succeed() for the event representing the first waiting process.
Resource.acquire depends on internal methods
Resource._acquire_available and Resource._acquire_unavailable,
both of which set the _on_cancel callback of the event they create
to restore the counter to its original state
or remove the event marking a waiting process.
Finally,
the context manager protocol methods __aenter__ and __aexit__
allow processes to use async with res
to acquire and release a resource in a block.
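A hypothetical usage sketch of that context-manager form; the Machine class, the capacity argument, and the timeout duration are illustrative only:

class Machine(Process):
    async def run(self):
        res = Resource(self._env, capacity=2)   # capacity kwarg is assumed
        async with res:                         # acquires, waiting if at capacity
            await self.timeout(3)               # hold the resource for 3 ticks
        # the resource is released automatically when the block exits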
Barrier: Synchronizing Multiple Processes
A Barrier holds multiple processes until they are explicitly released,
i.e.,
it allows the simulation to synchronize multiple processes.
- wait() creates an event and adds it to the list of waiters.
- release() calls succeed() on all waiting events and clears the list.
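A compact sketch of Barrier along these lines (the constructor signature is assumed):

class Barrier:
    def __init__(self, env):
        self._env = env
        self._waiters = []

    def wait(self):
        evt = Event(self._env)      # a process does: await barrier.wait()
        self._waiters.append(evt)
        return evt

    def release(self):
        for evt in self._waiters:   # wake everyone who is waiting
            evt.succeed()
        self._waiters = []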
AllOf: Waiting for Multiple Events
AllOf and FirstOf are the most complicated parts of asimpy,
and the reason that features such as cancellation management exist.
AllOf succeeds when all provided events complete.
It:
- converts each input to an event (discussed later);
- registers an _AllOfWatcher on each of those events;
- accumulates results in a _results dictionary; and
- succeeds when all results have been collected.
Each watcher calls _child_done(key, value) when its event completes.
This stores the result and checks if all events are done.
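A rough sketch of that bookkeeping is below; the _AllOfWatcher constructor signature is assumed, and ensure_event() is discussed later under coroutine adaptation:

class AllOf(Event):
    def __init__(self, env, **events):
        super().__init__(env)
        self._results = {}
        self._expected = len(events)
        for key, thing in events.items():
            evt = ensure_event(env, thing)       # accepts events or coroutines
            _AllOfWatcher(env, self, key, evt)   # reports back via _child_done()

    def _child_done(self, key, value):
        self._results[key] = value
        if len(self._results) == self._expected:   # all children have finished
            self.succeed(self._results)            # result is the dict of values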
A Note on Interface
A process calls AllOf like this:
await AllOf(self._env, a=self.timeout(5), b=self.timeout(10))
The eventual result is a dictionary in which
the names of the events are keys and the results of the events are values;
in this case,
the keys will be "a" and "b".
This gives callers an easy way to keep track of events,
though it doesn't support waiting on all events in a list.
AllOf's interface would be tidier
if it didn't require the simulation environment as its first argument.
However,
removing it made the implementation significantly more complicated.
FirstOf: Racing Multiple Events
FirstOf succeeds as soon as any of the provided events succeeds,
and then cancels all of the other events.
To do this, it:
- converts each input to an event;
- registers a _FirstOfWatcher on each;
- on first completion, cancels all other events; and
- succeeds with a (key, value) pair identifying the winning event.
FirstOf's _done flag prevents multiple completions.
When _child_done() is called,
it checks this flag,
cancels other waiters,
and succeeds.
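A hypothetical usage sketch: racing a queue get against a timeout. The Consumer class and the Queue construction are illustrative only; the (key, value) result shape follows the description above:

class Consumer(Process):
    def init(self):
        self.queue = Queue(self._env)     # illustrative setup

    async def run(self):
        key, value = await FirstOf(self._env,
                                   item=self.queue.get(),
                                   deadline=self.timeout(10))
        if key == "item":
            print("got", value)           # an item arrived within 10 ticks
        else:
            print("gave up")              # the pending get was cancelled by FirstOf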
Control Flow Example
Consider a process that waits 5 ticks:
class Waiter(Process):
    async def run(self):
        await self.timeout(5)
        print("done")
When it executes:
- Construction calls __init__(), which creates a coroutine by calling run() and immediately schedules _loop().
- The first _loop() calls send(None) on the coroutine, which executes to the await and yields a Timeout event.
- _loop() registers this process as a waiter on the timeout event.
- The timeout schedules a callback to run at time 5.
- The environment takes the event from its _pending queue and updates the simulated time to 5.
- The environment runs the callback, which calls succeed() on the timeout.
- The timeout calls resume() on the process.
- resume() schedules an immediate call to _loop() with the value None.
- _loop() calls send(None) on the coroutine, causing it to advance past the await.
- The process prints "done" and raises a StopIteration exception.
- The process is marked as done.
- Since there are no other events in the pending queue, the environment ends the simulation.
A Note on Coroutine Adaptation
The ensure_event() function handles both Event objects and bare coroutines.
For coroutines, it creates a _Runner process that awaits the coroutine
and then calls succeed() on an event with the result.
This allows AllOf and FirstOf to accept both events and coroutines.
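A plausible sketch of this adapter, based on the description above and the _Runner calls shown below (the exact code inside asimpy may differ):

def ensure_event(env, thing):
    if isinstance(thing, Event):
        return thing                  # already an event: use it directly
    evt = Event(env)
    _Runner(env, evt, thing)          # a helper process drives the coroutine
    return evt

class _Runner(Process):
    def __init__(self, env, evt, coro):
        self._evt = evt
        self._target = coro
        super().__init__(env)         # schedules run() immediately

    async def run(self):
        result = await self._target   # run the wrapped coroutine to completion
        self._evt.succeed(result)     # then trigger the event with its result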
AllOf and FirstOf must accept coroutines in addition to events
because of the way Python's async/await syntax works
and what users naturally write.
In the statement:
await AllOf(env, a=queue.get(), b=resource.acquire())
the expressions queue.get() and resource.acquire() are calls to async def functions.
In Python,
calling an async function does not execute it.
Instead, it returns a coroutine object.
If AllOf couldn't accept coroutines directly,
this code would fail because it expects Events.
If AllOf only accepted events, users would need to write:
# Manually create events
evt_a = Event(env)
evt_b = Event(env)
# Manually create runners
_Runner(env, evt_a, queue.get())
_Runner(env, evt_b, resource.acquire())
# Now use the events
await AllOf(env, a=evt_a, b=evt_b)
This is verbose and exposes internal implementation details.
Things I Learned the Hard Way
Requirements for Correctness
Event waiter notification must occur before clearing the list.
: If the list were cleared first, waiters couldn't be resumed.
The _Pending serial number is necessary.
: Heap operations require total ordering.
Without this value,
events occurring at the same time wouldn't be deterministically ordered,
which would make simulations irreproducible.
Cancelled events must not advance time.
: The NO_TIME sentinel prevents this.
Without it,
cancelled timeouts create gaps in the simulation timeline.
Process interrupt checking must occur before coroutine sends.
: This ensures interrupts are handled immediately rather than being delayed until the next event.
Queue cancellation handlers must remove items or waiters.
: Without this,
cancelled gets leave processes in the waiters list indefinitely,
and cancelled items disappear from the queue.
Resource cancellation handlers must adjust state.
: Without them,
cancelled acquires permanently reduce available capacity or leave ghost waiters.
AllOf must track completion.
: Without checking if all events are done, it succeeds prematurely.
FirstOf must cancel losing events.
: Otherwise,
those events remain active and can run later.
Why Not Just Use Coroutines?
SimPy uses bare coroutines.
asimpy uses Event as the internal primitive for several reasons.
Events can be triggered externally.
: A Timeout schedules a callback that later calls succeed().
A coroutine cannot be "succeeded" from outside: it must run to completion.
Events support multiple waiters.
: Multiple processes can await the same event.
A coroutine can only be awaited once.
Events decouple triggering from waiting.
: The thing that creates an event (like Timeout.__init__())
is separate from the thing that waits for it.
With coroutines, creation and execution are more tightly coupled.
Event.__await__
Event.__await__ is defined as:
def __await__(self):
    value = yield self
    return value
This appears redundant but each part serves a specific purpose in the coroutine protocol.
When a coroutine executes await event,
Python calls event.__await__(),
which must return an iterator.
The yield self statement:
- makes __await__() a generator function, so it returns a generator (which is a kind of iterator); and
- yields the Event object itself up to the Process's _loop() method.
The Process needs the Event object so it can call _add_waiter() on it:
def _loop(self, value=None):
    # ...
    yielded = self._coro.send(value)  # This receives the Event
    yielded._add_waiter(self)         # Register as waiter
Without yield self, the Process wouldn't know which event to register on.
The value = yield self statement captures what gets sent back into the generator.
When the event completes:
- Event calls proc.resume(value).
- Process calls self._loop(value).
- _loop calls self._coro.send(value).
- This resumes the generator, making yield self return value.
The assignment therefore captures the event's result value.
Why Return Value
The return value statement makes that result available to the code that wrote await event.
When a generator returns (via return or falling off the end),
Python raises StopIteration with the return value as an attribute.
The async/await machinery extracts this and provides it as the result of the await expression.
So when a user writes:
result = await queue.get()
the flow is:
- queue.get() creates and returns an Event.
- await calls Event.__await__(), which yields the Event object.
- Process._loop() receives the Event and registers itself as a waiter.
- Later, the queue calls event.succeed(item).
- Event calls process.resume(item).
- Process calls coro.send(item).
- The generator resumes, and yield self evaluates to item.
- The generator executes return item.
- StopIteration(item) is raised.
- The async machinery catches this and makes await evaluate to item.
None of the simpler alternatives would work:
- yield self alone (no return): the await expression would evaluate to None.
- return self (no yield): not a generator, so it violates the iterator protocol.
- yield value then return value: the first yield wouldn't provide the Event object to the Process.