Skip to content

Simqueue

FIFO and priority queues.

Queue

FIFO or priority queue.

Source code in src/asimpy/simqueue.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class Queue:
    """FIFO or priority queue."""

    def __init__(self, env: "Environment", max_capacity: int | None = None,
                 priority: bool = False):
        """
        Construct queue.

        Args:
            env: simulation environment.
            max_capacity: maximum queue capacity (None for unlimited).
            priority: if `True`, maintain items in sorted order.

        Raises:
            ValueError: for invalid `max_capacity`.
        """
        _validate(
            (max_capacity is None) or (max_capacity > 0),
            "require None or positive integer for max capacity not {max_capacity}"
        )
        self._env = env
        self._priority = priority
        self._max_capacity = max_capacity
        self._items = []
        self._getters = []

    async def get(self):
        """Get one item from the queue."""
        if self._items:
            item = self._items.pop(0)
            evt = Event(self._env)
            evt._on_cancel = lambda: self._items.insert(0, item)
            self._env.immediate(lambda: evt.succeed(item))
            return await evt
        else:
            evt = Event(self._env)
            self._getters.append(evt)
            return await evt

    def is_full(self):
        """Has the queue reached capacity?"""
        return self._max_capacity is not None and len(self._items) >= self._max_capacity

    def put(self, item: Any) -> bool:
        """
        Add one item to the queue.

        If a getter is waiting, the item is delivered directly.
        Otherwise, if the queue is not full, the item is added.
        If the queue is full, the item is discarded (in priority
        mode, the lowest-priority item is discarded instead).

        Args:
            item: to add to the queue.

        Returns:
            `True` if item added, `False` otherwise.
        """
        if self._getters:
            evt = self._getters.pop(0)
            evt.succeed(item)
            return True

        if not self.is_full():
            if self._priority:
                bisect.insort(self._items, item)
            else:
                self._items.append(item)
            return True

        if not self._priority:
            return False

        bisect.insort(self._items, item)
        result = item is not self._items[-1]
        self._items = self._items[:self._max_capacity]
        return result

__init__(env, max_capacity=None, priority=False)

Construct queue.

Parameters:

Name Type Description Default
env Environment

simulation environment.

required
max_capacity int | None

maximum queue capacity (None for unlimited).

None
priority bool

if True, maintain items in sorted order.

False

Raises:

Type Description
ValueError

for invalid max_capacity.

Source code in src/asimpy/simqueue.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def __init__(self, env: "Environment", max_capacity: int | None = None,
             priority: bool = False):
    """
    Construct queue.

    Args:
        env: simulation environment.
        max_capacity: maximum queue capacity (None for unlimited).
        priority: if `True`, maintain items in sorted order.

    Raises:
        ValueError: for invalid `max_capacity`.
    """
    _validate(
        (max_capacity is None) or (max_capacity > 0),
        "require None or positive integer for max capacity not {max_capacity}"
    )
    self._env = env
    self._priority = priority
    self._max_capacity = max_capacity
    self._items = []
    self._getters = []

get() async

Get one item from the queue.

Source code in src/asimpy/simqueue.py
38
39
40
41
42
43
44
45
46
47
48
49
async def get(self):
    """Get one item from the queue."""
    if self._items:
        item = self._items.pop(0)
        evt = Event(self._env)
        evt._on_cancel = lambda: self._items.insert(0, item)
        self._env.immediate(lambda: evt.succeed(item))
        return await evt
    else:
        evt = Event(self._env)
        self._getters.append(evt)
        return await evt

is_full()

Has the queue reached capacity?

Source code in src/asimpy/simqueue.py
51
52
53
def is_full(self):
    """Has the queue reached capacity?"""
    return self._max_capacity is not None and len(self._items) >= self._max_capacity

put(item)

Add one item to the queue.

If a getter is waiting, the item is delivered directly. Otherwise, if the queue is not full, the item is added. If the queue is full, the item is discarded (in priority mode, the lowest-priority item is discarded instead).

Parameters:

Name Type Description Default
item Any

to add to the queue.

required

Returns:

Type Description
bool

True if item added, False otherwise.

Source code in src/asimpy/simqueue.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def put(self, item: Any) -> bool:
    """
    Add one item to the queue.

    If a getter is waiting, the item is delivered directly.
    Otherwise, if the queue is not full, the item is added.
    If the queue is full, the item is discarded (in priority
    mode, the lowest-priority item is discarded instead).

    Args:
        item: to add to the queue.

    Returns:
        `True` if item added, `False` otherwise.
    """
    if self._getters:
        evt = self._getters.pop(0)
        evt.succeed(item)
        return True

    if not self.is_full():
        if self._priority:
            bisect.insort(self._items, item)
        else:
            self._items.append(item)
        return True

    if not self._priority:
        return False

    bisect.insort(self._items, item)
    result = item is not self._items[-1]
    self._items = self._items[:self._max_capacity]
    return result