Skip to content

Process

Base class for active process.

Process

Bases: ABC

Abstract base class for active process.

Source code in src/asimpy/process.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class Process(ABC):
    """Abstract base class for active process."""

    def __init__(self, env: "Environment", *args: Any, **kwargs: Any):
        """
        Construct new process.

        Args:
            env: simulation environment.
            args: extra constructor arguments passed to `init()`.
        """
        self._env = env
        self._done = False
        self._started = False   # True after the first coro.send(None) call
        self._interrupt = None
        self._current_event = None

        self.init(*args, **kwargs)

        self._coro = self.run()
        self._env.immediate(self._loop)

    def init(self, *args: Any, **kwargs: Any):
        """
        Extra construction after generic setup but before coroutine created.

        Args:
            args: extra constructor arguments passed to `init()`.
            kwargs: extra construct arguments passed to `init()`.
        """
        pass

    @abstractmethod
    def run(self):
        """Implementation of process behavior."""
        pass

    @property
    def now(self):
        """Shortcut to access simulation time."""
        return self._env.now

    def timeout(self, delay: int | float):
        """
        Delay this process for a specified time.

        Args:
            delay: how long to wait.
        """
        return self._env.timeout(delay)

    def interrupt(self, cause: Any):
        """
        Interrupt this process

        Args:
            cause: reason for interrupt.
        """
        if not self._done:
            self._interrupt = Interrupt(cause)
            self._env.immediate(self._loop)

    def _loop(self, value=None) -> None:
        if self._done:
            return

        try:
            self._env.active_process = self

            while True:
                # ── Advance the coroutine ─────────────────────────────────
                if self._interrupt is not None and self._started:
                    # An interrupt is pending and the coroutine has been
                    # started, so it is safe to throw into it.
                    # Cancel the event we are currently awaiting so it
                    # won't fire and call resume() later.
                    if self._current_event is not None:
                        self._current_event.cancel()
                        self._current_event = None
                    exc = self._interrupt
                    self._interrupt = None
                    yielded = self._coro.throw(exc)
                else:
                    # Either no interrupt is pending, or the coroutine has
                    # not been started yet (throwing into an unstarted
                    # coroutine bypasses its try/except blocks).
                    self._started = True
                    yielded = self._coro.send(value)

                # ── Tight loop: skip the heap for already-triggered events ─
                self._current_event = yielded
                v = yielded._value
                if v is not _PENDING and v is not _CANCELLED:
                    if self._interrupt is not None:
                        # Interrupt arrived while in the tight loop.
                        # Loop again: the interrupt branch above will deliver
                        # it via throw into the coroutine's current yield.
                        continue
                    value = v
                    self._current_event = None
                    continue    # resume immediately without a heap round-trip

                # ── Event not yet done: park until it fires ───────────────
                yielded._add_waiter(self.resume)
                break

        except StopIteration:
            self._done = True
            self._current_event = None

        except Exception as exc:
            self._done = True
            self._current_event = None
            raise exc

        finally:
            self._env.active_process = None

    def resume(self, value=None) -> None:
        if not self._done:
            # partial is a C-level callable — cheaper than a Python closure.
            self._env.immediate(partial(self._loop, value))

now property

Shortcut to access simulation time.

__init__(env, *args, **kwargs)

Construct new process.

Parameters:

Name Type Description Default
env Environment

simulation environment.

required
args Any

extra constructor arguments passed to init().

()
Source code in src/asimpy/process.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def __init__(self, env: "Environment", *args: Any, **kwargs: Any):
    """
    Construct new process.

    Args:
        env: simulation environment.
        args: extra constructor arguments passed to `init()`.
    """
    self._env = env
    self._done = False
    self._started = False   # True after the first coro.send(None) call
    self._interrupt = None
    self._current_event = None

    self.init(*args, **kwargs)

    self._coro = self.run()
    self._env.immediate(self._loop)

init(*args, **kwargs)

Extra construction after generic setup but before coroutine created.

Parameters:

Name Type Description Default
args Any

extra constructor arguments passed to init().

()
kwargs Any

extra construct arguments passed to init().

{}
Source code in src/asimpy/process.py
36
37
38
39
40
41
42
43
44
def init(self, *args: Any, **kwargs: Any):
    """
    Extra construction after generic setup but before coroutine created.

    Args:
        args: extra constructor arguments passed to `init()`.
        kwargs: extra construct arguments passed to `init()`.
    """
    pass

interrupt(cause)

Interrupt this process

Parameters:

Name Type Description Default
cause Any

reason for interrupt.

required
Source code in src/asimpy/process.py
65
66
67
68
69
70
71
72
73
74
def interrupt(self, cause: Any):
    """
    Interrupt this process

    Args:
        cause: reason for interrupt.
    """
    if not self._done:
        self._interrupt = Interrupt(cause)
        self._env.immediate(self._loop)

run() abstractmethod

Implementation of process behavior.

Source code in src/asimpy/process.py
46
47
48
49
@abstractmethod
def run(self):
    """Implementation of process behavior."""
    pass

timeout(delay)

Delay this process for a specified time.

Parameters:

Name Type Description Default
delay int | float

how long to wait.

required
Source code in src/asimpy/process.py
56
57
58
59
60
61
62
63
def timeout(self, delay: int | float):
    """
    Delay this process for a specified time.

    Args:
        delay: how long to wait.
    """
    return self._env.timeout(delay)