Handling Interrupts
Terms defined: lifecycle method, state machine
Throwing Work Away
- Jobs don't have priorities
- The manager interrupts
- Any work done on the current job is lost
- We'll fix this later
- Parameters and simulation class
class Params:
    # …as before…
    t_interrupt_arrival: float = 5.0

class Simulation(Environment):
    # …as before…
    def rand_interrupt_arrival(self):
        return random.expovariate(1.0 / self.params.t_interrupt_arrival)
- But we need a way to get at the coder's process (i.e., the generator) in order to interrupt it
- The `Coder` instance is something we've built around the generator
- `Environment.process(…)` returns the generator, so we can store that
- The updated `Simulation` saves each coder's process:
class Simulation(Environment):
    def simulate(self):
        # …queue, manager, and monitor as before…
        self.process(Interrupter(self).run())
        self.coders = []
        for _ in range(self.params.n_coder):
            coder = Coder(self)
            coder.proc = self.process(coder.run())
            self.coders.append(coder)
        self.run(until=self.params.t_sim)
- Our new `Interrupter` reaches inside a coder to get its process:
class Interrupter(Recorder):
    def run(self):
        while True:
            yield self.sim.timeout(self.sim.rand_interrupt_arrival())
            coder = random.choice(self.sim.coders)
            coder.proc.interrupt()
- When we call `proc.interrupt()`, SimPy raises an `Interrupt` exception inside the generator
- But it can only do this while the framework is running, not while the process is running
- Because only one thing runs at a time
- So the exception is only raised when the process interacts with the environment
- I.e., at `queue.get()`, `timeout()`, or other `yield` points (see the sketch below)
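- A minimal standalone sketch (not part of our simulation) showing that the `Interrupt` lands at the `yield`, and that the argument to `.interrupt()` becomes the exception's `.cause`:

from simpy import Environment, Interrupt

def worker(env):
    try:
        yield env.timeout(10)          # the interrupt lands here, mid-wait
    except Interrupt as exc:
        print(f"interrupted at t={env.now}, cause={exc.cause}")

def interrupter(env, proc):
    yield env.timeout(3)
    proc.interrupt("meeting")          # delivered at the worker's yield

env = Environment()
proc = env.process(worker(env))
env.process(interrupter(env, proc))
env.run()                              # prints: interrupted at t=3, cause=meeting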
- Write a `Coder` that throws away whatever job it's doing when it is interrupted
- Not realistic, but it gives us a chance to learn about interrupts
from simpy import Interrupt

class Coder(Recorder):
    def run(self):
        while True:
            job = None
            try:
                job = yield self.sim.queue.get()
                job.t_start = self.sim.now
                yield self.sim.timeout(job.duration)
                job.t_end = self.sim.now
                self.t_work += job.t_end - job.t_start
            except Interrupt:
                self.n_interrupt += 1
                # The interrupt may land while we are still waiting on the
                # queue, before we have a job in hand.
                if job is not None:
                    job.t_end = self.sim.now
                    job.discarded = True
                    self.t_work += job.t_end - job.t_start
- Exercise: how does the percentage of discarded jobs change as the interrupt rate changes?
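- One hedged way to run the exercise, assuming `Simulation` takes a `Params` instance as in earlier sections and collects every job it creates in a hypothetical `all_jobs` list (adapt the names to your code):

for t_arrival in [1.0, 2.0, 5.0, 10.0, 20.0]:
    sim = Simulation(Params(t_interrupt_arrival=t_arrival))
    sim.simulate()
    # Jobs that reached an end, whether finished or discarded.
    done = [j for j in sim.all_jobs if getattr(j, "t_end", None) is not None]
    pct = 100 * sum(getattr(j, "discarded", False) for j in done) / len(done)
    print(f"t_interrupt_arrival={t_arrival}: {pct:.1f}% discarded")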
Resuming Work
- General idea: coders have a stack of work
  - At most one regular job
  - And zero or more interrupts stacked on top of it
- When an interrupt arrives, it goes on the top of the stack
- Do the simple bits first
- Subclass `Job` so that we can call different methods for defining duration
- Notice that we're piling up a bunch of parameters whose values we probably don't know
class Params:
    # …as before…
    t_interrupt_arrival: float = 5.0
    t_interrupt_mean: float = 0.2
    t_interrupt_std: float = 0.1

class JobRegular(Job):
    def __init__(self, sim):
        super().__init__(sim)
        self.duration = self.sim.rand_job_duration()

class JobInterrupt(Job):
    def __init__(self, sim):
        super().__init__(sim)
        self.duration = self.sim.rand_interrupt_duration()
- `Manager` creates `JobRegular`, `Interrupter` creates `JobInterrupt`
- Note that `Interrupter` passes the new job to `.interrupt()` so that it becomes the exception's `.cause`
class Interrupter(Recorder):
    def run(self):
        while True:
            yield self.sim.timeout(self.sim.rand_interrupt_arrival())
            coder = random.choice(self.sim.coders)
            coder.proc.interrupt(JobInterrupt(self.sim))
- It took several tries to get the `Coder` right
- The problem is that interrupts can occur whenever the coder interacts with SimPy
- So if the coder does anything with SimPy in the `except` block, we can have an interrupt while we're handling an interrupt
- Solution is to implement a state machine:
  - No work, so get a new job from the coding queue
  - Job on top of the stack is incomplete, so do some work
  - Job on top of the stack is complete, so pop it
- If an interrupt occurs:
  - Add some time to the current job if we actually started it
  - Push the new job on the stack
- Note: the new job arrives as the `Interrupt` exception's cause
class Coder(Recorder):
    def __init__(self, sim):
        super().__init__(sim)
        self.proc = None
        self.stack = []

    def run(self):
        while True:
            started = None
            try:
                # No work in hand, so get a new job.
                if len(self.stack) == 0:
                    job = yield self.sim.code_queue.get()
                    job.start()
                    self.stack.append(job)
                # Current job is incomplete, so try to finish it.
                elif self.stack[-1].done < self.stack[-1].duration:
                    job = self.stack[-1]
                    started = self.sim.now
                    yield self.sim.timeout(job.duration - job.done)
                    job.done = job.duration
                # Current job is complete.
                else:
                    job = self.stack.pop()
                    job.complete()
            except Interrupt as exc:
                # Some work has been done on the current job, so save it.
                if (len(self.stack) > 0) and (started is not None):
                    now = self.sim.now
                    job = self.stack[-1]
                    job.interrupt()
                    job.done += now - started
                # Put the interrupting job on the stack.
                job = exc.cause
                job.start()
                self.stack.append(job)
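- A hypothetical trace (made-up numbers) showing how one coder's stack evolves:

t=0.0  stack empty, get JobRegular R (duration 3.0)      stack: [R]
t=0.0  R incomplete, start working on it (3.0 left)      stack: [R]
t=1.0  Interrupt carrying JobInterrupt I (duration 0.5)  R.done = 1.0, stack: [R, I]
t=1.0  I incomplete, start working on it                 stack: [R, I]
t=1.5  I complete, pop it                                stack: [R]
t=1.5  R incomplete, resume (2.0 left)                   stack: [R]
t=3.5  R complete, pop it                                stack: []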
- This works, but the code is hard to understand, debug, and extend
Decomposing Jobs
- Design has two parts:
  - Treat interrupts as high-priority jobs
  - Break regular jobs into short fragments so that interrupts are handled promptly (but not immediately)
- Define three priorities:
  - High: interrupts
  - Medium: fragments of regular jobs
  - Low: regular jobs
class Priority:
    HIGH = 0
    MEDIUM = 1
    LOW = 2
- Generic `Job` has a few lifecycle methods for child classes to override:
  - `Job.start` is called when work starts on a job
  - `Job.complete` is called when the job is completed
  - `Job.is_complete` tells us whether the job has been completed or not
  - `Job.needs_decomp` tells us whether the job needs to be decomposed
- We will explain `sim.do_nothing()` shortly
class Job(Recorder):
    def __init__(self, sim, priority):
        super().__init__(sim)
        self.priority = priority
        self.t_create = self.sim.now
        self.t_start = None
        self.t_complete = None

    def start(self):
        self.t_start = self.sim.now

    def complete(self):
        self.t_complete = self.sim.now
        return self.sim.do_nothing()

    def is_complete(self):
        return self.t_complete is not None

    def needs_decomp(self):
        return False

    def __lt__(self, other):
        if self.priority == other.priority:
            return self.t_create < other.t_create
        return self.priority < other.priority
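- `__lt__` is what lets a priority queue (e.g., `simpy.PriorityStore` or a plain heap) order jobs by priority, breaking ties by creation time
- A self-contained check of that ordering rule, using a stand-in class (not the notes' `Job`) and the `Priority` constants above:

import heapq

class MiniJob:
    """Stand-in with the same ordering rule as Job."""
    def __init__(self, priority, t_create):
        self.priority = priority
        self.t_create = t_create
    def __lt__(self, other):
        if self.priority == other.priority:
            return self.t_create < other.t_create
        return self.priority < other.priority

jobs = [MiniJob(Priority.LOW, 0.0), MiniJob(Priority.HIGH, 2.0), MiniJob(Priority.MEDIUM, 1.0)]
heapq.heapify(jobs)
print([heapq.heappop(jobs).priority for _ in range(3)])  # [0, 1, 2]: high, medium, low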
- `JobInterrupt` is the simplest child class:
class JobInterrupt(Job):
    def __init__(self, sim):
        super().__init__(sim, Priority.HIGH)
        self.duration = self.sim.rand_interrupt_duration()
- `JobRegular` overrides `.needs_decomp()`
  - A job needs decomposition if it isn't complete and the time required is greater than the decomposition threshold
  - The threshold parameter is another completely arbitrary number
class JobRegular(Job):
    def __init__(self, sim):
        super().__init__(sim, Priority.LOW)
        self.duration = self.sim.rand_job_duration()

    def needs_decomp(self):
        return (not self.is_complete()) and (self.duration > self.sim.params.t_decomposition)
- `JobFragment` is the most complex
  - Duration is specified by its creator (part of the total time required by a regular job)
  - And it has a reference to a placeholder that keeps track of undone fragments (a sketch of `Placeholder` follows the code below)
- When the fragment is completed, it checks to see if it is the last one in its group
  - If so, it bumps the priority of the completed job to medium and puts it back in the coder's queue
  - If not, it does nothing
class JobFragment(Job):
    def __init__(self, coder, placeholder, duration):
        super().__init__(coder.sim, Priority.MEDIUM)
        self.coder = coder
        self.placeholder = placeholder
        self.duration = duration

    def complete(self):
        super().complete()
        self.placeholder.count -= 1
        if self.placeholder.count == 0:
            self.placeholder.job.complete()
            self.placeholder.job.priority = Priority.MEDIUM
            return self.coder.queue.put(self.placeholder.job)
        else:
            return self.sim.do_nothing()
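- The notes never show `Placeholder`; here is a minimal sketch consistent with how `JobFragment` and `decompose` (below) use it:

from dataclasses import dataclass

@dataclass
class Placeholder:
    job: Job    # the original regular job
    count: int  # number of fragments not yet completed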
- When `.complete` wants to put the original (regular) job back in the coder's queue, it would be natural to call `yield self.coder.queue.put(…)`
- But what if it has nothing to put there?
- Solution: `.complete` always returns something that can be yielded
  - Either "put this job in the queue" or "wait for 0 ticks"
class Simulation(Environment):
    def do_nothing(self):
        return self.timeout(0)
- `Coder.run` gets a job from the general "new work" queue or from its own priority queue (see the sketch of `get` after the code below)
- It gives preference to the latter so that interrupts and fragments are done before regular work
- It always yields the result of `job.complete()`
def run(self):
    while True:
        job = yield from self.get()
        job.start()
        if job.needs_decomp():
            yield from self.decompose(job)
        elif not job.is_complete():
            yield self.sim.timeout(job.duration)
        yield job.complete()
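- `Coder.get` isn't shown in the notes; one hedged sketch of the preference logic, assuming the coder's own `self.queue` is a `simpy.PriorityStore` (whose pending `.items` we can inspect) and the shared queue lives on the simulation:

def get(self):
    # Drain our own priority queue (interrupts, fragments, bumped-back
    # jobs) before accepting new work from the shared queue.
    if len(self.queue.items) > 0:
        job = yield self.queue.get()
    else:
        job = yield self.sim.queue.get()
    return job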
- To decompose a job:
  - Figure out the durations of the fragments
  - Create a placeholder to keep track of them and the original job
  - Put the fragments in the coder's priority queue
def decompose(self, job):
    size = self.sim.params.t_decomposition
    num = int(job.duration / size)
    extra = job.duration - (num * size)
    durations = [extra, *[size for _ in range(num)]]
    placeholder = Placeholder(job=job, count=len(durations))
    for d in durations:
        yield self.queue.put(JobFragment(self, placeholder, d))
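- For example, with `t_decomposition = 1.0`, a job of duration 2.5 becomes three fragments:

size = 1.0
num = int(2.5 / size)               # 2 full-size fragments
extra = 2.5 - (num * size)          # 0.5 left over
durations = [extra, *[size for _ in range(num)]]   # [0.5, 1.0, 1.0]
# Note: the short leftover fragment is scheduled first.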
- So is this better than using interrupts?
  - 250 lines for decomposition vs. 212 for interrupts
  - But the decomposition approach was a lot simpler to debug