Skip to content

Simqueue

FIFO and priority queues.

PriorityQueue

Bases: Queue

Priority queue: items are dequeued in sorted (ascending) order.

Source code in src/asimpy/simqueue.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class PriorityQueue(Queue):
    """
    Priority queue: items are dequeued in sorted (ascending) order.

    Items are stored in a list that is kept sorted on every insertion, so
    they must be mutually comparable. Items that compare equal are dequeued
    in the order in which they were added.
    """

    def __init__(
        self,
        env: "Environment",
        max_capacity: int | None = None,
    ):
        """
        Construct priority queue.

        Args:
            env: simulation environment.
            max_capacity: maximum queue capacity (None for unlimited).

        Raises:
            ValueError: for invalid `max_capacity`.
        """
        super().__init__(env, max_capacity)
        # Override the deque with a sorted list.
        self._items: list = []  # type: ignore[assignment]

    def _add(self, item) -> None:
        """Insert a new item in sorted position, after any equal items."""
        bisect.insort(self._items, item)

    def _pop(self):
        """Remove and return the smallest item (the head of the sorted list)."""
        return self._items.pop(0)

    def _put_back(self, item) -> None:
        """Return a previously popped item to the store (used on interrupt)."""
        # The item was popped from the head, so it was ahead of every item
        # equal to it. insort_left re-inserts it *before* equal items and so
        # restores its original position; plain insort (insort_right) would
        # silently move it behind them.
        bisect.insort_left(self._items, item)

    def is_empty(self) -> bool:
        """Is the queue empty?"""
        return not self._items

    def is_full(self) -> bool:
        """Has the queue reached capacity?"""
        return self._max_capacity is not None and len(self._items) >= self._max_capacity

__init__(env, max_capacity=None)

Construct priority queue.

Parameters:

Name Type Description Default
env Environment

simulation environment.

required
max_capacity int | None

maximum queue capacity (None for unlimited).

None

Raises:

Type Description
ValueError

for invalid max_capacity.

Source code in src/asimpy/simqueue.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def __init__(
    self,
    env: "Environment",
    max_capacity: int | None = None,
):
    """
    Create an empty priority queue.

    Args:
        env: simulation environment.
        max_capacity: maximum queue capacity (None for unlimited).

    Raises:
        ValueError: for invalid `max_capacity`.
    """
    # The base constructor runs first; the item store it creates is then
    # swapped for a plain list that is kept in sorted order.
    super().__init__(env, max_capacity)
    self._items: list = list()  # type: ignore[assignment]

Queue

FIFO queue backed by a deque for O(1) enqueue and dequeue.

Source code in src/asimpy/simqueue.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class Queue:
    """FIFO queue backed by a deque for O(1) enqueue and dequeue."""

    def __init__(
        self,
        env: "Environment",
        max_capacity: int | None = None,
    ):
        """
        Construct queue.

        Args:
            env: simulation environment.
            max_capacity: maximum queue capacity (None for unlimited).

        Raises:
            ValueError: for invalid `max_capacity`.
        """
        if max_capacity is not None and max_capacity <= 0:
            raise ValueError(
                f"queue max_capacity must be a positive integer, got {max_capacity}"
            )
        self._env = env
        self._max_capacity = max_capacity
        # _items, _getters, and _putters are all deques for O(1) front access.
        self._items: deque = deque()
        self._getters: deque = deque()
        self._putters: deque = deque()

    # ------------------------------------------------------------------
    # Overridable item-storage primitives (used by PriorityQueue)
    # ------------------------------------------------------------------

    def _add(self, item) -> None:
        self._items.append(item)

    def _pop(self):
        return self._items.popleft()

    def _put_back(self, item) -> None:
        """Return an item to the front of the store (used on interrupt)."""
        self._items.appendleft(item)

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------

    def is_empty(self) -> bool:
        """Is the queue empty?"""
        return not self._items

    def is_full(self) -> bool:
        """Has the queue reached capacity?"""
        return self._max_capacity is not None and len(self._items) >= self._max_capacity

    async def get(self):
        """Get one item from the queue."""
        if self._items:
            item = self._pop()

            # Promote one blocked putter if present, skipping cancelled ones.
            while self._putters:
                putter_evt, putter_item = self._putters[0]
                if putter_evt._value is _CANCELLED:
                    self._putters.popleft()
                    continue
                self._putters.popleft()
                self._add(putter_item)
                # Pre-trigger so the tight loop resumes the putter immediately.
                putter_evt.succeed(True)
                break

            # Pre-trigger the getter event so the tight loop resumes this
            # coroutine without a heap round-trip.
            evt = Event(self._env)
            evt.succeed(item)
            try:
                return await evt
            except Interrupt:
                # Pre-triggered events cannot really be interrupted here, but
                # restore the item defensively in case semantics change.
                self._put_back(item)
                raise
        else:
            evt = Event(self._env)
            # No _on_cancel: lazy deletion in put() skips cancelled getters.
            self._getters.append(evt)
            try:
                return await evt
            except Interrupt:
                # Don't remove from _getters; lazy deletion handles it.
                raise

    async def put(self, item: Any) -> bool:
        """
        Add one item to the queue.

        If a getter is waiting, the item is delivered directly.
        Otherwise, if the queue is not full, the item is added.
        If the queue is full, the operation blocks until space
        is available.

        Args:
            item: to add to the queue.

        Returns:
            `True` when the item has been added.
        """
        # Deliver directly to a waiting getter, skipping cancelled ones.
        while self._getters:
            evt = self._getters[0]
            if evt._value is _CANCELLED:
                self._getters.popleft()
                continue
            self._getters.popleft()
            evt.succeed(item)
            return True

        if not self.is_full():
            self._add(item)
            return True

        evt = Event(self._env)
        entry = (evt, item)
        self._putters.append(entry)
        # No _on_cancel: lazy deletion in get() skips cancelled putters.
        return await evt

__init__(env, max_capacity=None)

Construct queue.

Parameters:

Name Type Description Default
env Environment

simulation environment.

required
max_capacity int | None

maximum queue capacity (None for unlimited).

None

Raises:

Type Description
ValueError

for invalid max_capacity.

Source code in src/asimpy/simqueue.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def __init__(
    self,
    env: "Environment",
    max_capacity: int | None = None,
):
    """
    Construct queue.

    Args:
        env: simulation environment.
        max_capacity: maximum queue capacity (None for unlimited).

    Raises:
        ValueError: for invalid `max_capacity`.
    """
    if max_capacity is not None and max_capacity <= 0:
        raise ValueError(
            f"queue max_capacity must be a positive integer, got {max_capacity}"
        )
    self._env = env
    self._max_capacity = max_capacity
    # _items, _getters, and _putters are all deques for O(1) front access.
    self._items: deque = deque()
    self._getters: deque = deque()
    self._putters: deque = deque()

get() async

Get one item from the queue.

Source code in src/asimpy/simqueue.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
async def get(self):
    """Get one item from the queue, waiting for one if none is stored."""
    if not self._items:
        # Nothing stored: register as a waiting getter; put() delivers an
        # item by succeeding this event. An Interrupt raised while waiting
        # simply propagates, and the entry stays in _getters, where put()
        # skips events that are marked cancelled.
        pending = Event(self._env)
        self._getters.append(pending)
        return await pending

    item = self._pop()

    # A slot was just freed: admit the first blocked putter whose event is
    # not marked cancelled, discarding cancelled entries on the way.
    while self._putters:
        blocked_evt, blocked_item = self._putters.popleft()
        if blocked_evt._value is _CANCELLED:
            continue
        self._add(blocked_item)
        # Succeed the putter's event before it is awaited again so the
        # putter can resume immediately.
        blocked_evt.succeed(True)
        break

    # Hand the item over through an event that has already succeeded, so
    # this coroutine resumes without a heap round-trip.
    ready = Event(self._env)
    ready.succeed(item)
    try:
        return await ready
    except Interrupt:
        # An already-succeeded event is not expected to be interrupted; the
        # item is restored defensively in case semantics change.
        self._put_back(item)
        raise

is_empty()

Is the queue empty?

Source code in src/asimpy/simqueue.py
61
62
63
def is_empty(self) -> bool:
    """Return True when no items are stored."""
    return len(self._items) == 0

is_full()

Has the queue reached capacity?

Source code in src/asimpy/simqueue.py
65
66
67
def is_full(self) -> bool:
    """Return True when a capacity is set and has been reached."""
    if self._max_capacity is None:
        # An unlimited queue can never fill up.
        return False
    return len(self._items) >= self._max_capacity

put(item) async

Add one item to the queue.

If a getter is waiting, the item is delivered directly. Otherwise, if the queue is not full, the item is added. If the queue is full, the operation blocks until space is available.

Parameters:

Name Type Description Default
item Any

to add to the queue.

required

Returns:

Type Description
bool

True when the item has been added.

Source code in src/asimpy/simqueue.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def put(self, item: Any) -> bool:
    """
    Add one item to the queue.

    A waiting getter, if any, receives the item directly. Failing
    that, the item is stored when there is room. When the queue is
    full the call blocks until a `get()` frees a slot.

    Args:
        item: to add to the queue.

    Returns:
        `True` when the item has been added.
    """
    # Give the item to the first waiting getter whose event is not marked
    # cancelled; cancelled entries are dropped as they are met.
    while self._getters:
        waiting = self._getters.popleft()
        if waiting._value is _CANCELLED:
            continue
        waiting.succeed(item)
        return True

    if self.is_full():
        # No room: queue up as a blocked putter. get() skips this entry
        # later if its event has been marked cancelled.
        blocked = Event(self._env)
        self._putters.append((blocked, item))
        return await blocked

    self._add(item)
    return True