Handling Interrupts

Terms defined: lifecycle method, state machine

Throwing Work Away

class Params:
    # …as before…
    t_interrupt_arrival: float = 5.0

class Simulation(Environment):
    # …as before…
    def rand_interrupt_arrival(self):
        return random.expovariate(1.0 / self.params.t_interrupt_arrival)
class Simulation(Environment):
    def simulate(self):
        # …queue, manager, and monitor as before…

        self.process(Interrupter(self).run())

        self.coders = []
        for _ in range(self.params.n_coder):
            coder = Coder(self)
            coder.proc = self.process(coder.run())
            self.coders.append(coder)

        self.run(until=self.params.t_sim)
class Interrupter(Recorder):
    def run(self):
        while True:
            yield self.sim.timeout(self.sim.rand_interrupt_arrival())
            coder = random.choice(self.sim.coders)
            coder.proc.interrupt()
from simpy import Interrupt

class Coder(Recorder):
    def run(self):
        while True:
            try:
                job = yield self.sim.queue.get()
                job.t_start = self.sim.now
                yield self.sim.timeout(job.duration)
                job.t_end = self.sim.now
                self.t_work += job.t_end - job.t_start
            except Interrupt:
                self.n_interrupt += 1
                job.t_end = self.sim.now
                job.discarded = True
            self.t_work += job.t_end - job.t_start

Resuming Work

class Params:
    # …as before…
    t_interrupt_arrival: float = 5.0
    t_interrupt_mean: float = 0.2
    t_interrupt_std: float = 0.1

class JobRegular(Job):
    def __init__(self, sim):
        super().__init__(sim)
        self.duration = self.sim.rand_job_duration()


class JobInterrupt(Job):
    def __init__(self, sim):
        super().__init__(sim)
        self.duration = self.sim.rand_interrupt_duration()
class Interrupter(Recorder):
    def run(self):
        while True:
            yield self.sim.timeout(self.sim.rand_interrupt_arrival())
            coder = random.choice(self.sim.coders)
            coder.proc.interrupt(JobInterrupt(self.sim))
class Coder(Recorder):
    def __init__(self, sim):
        super().__init__(sim)
        self.proc = None
        self.stack = []

    def run(self):
        while True:
            started = None
            try:
                # No work in hand, so get a new job.
                if len(self.stack) == 0:
                    job = yield self.sim.code_queue.get()
                    job.start()
                    self.stack.append(job)
                # Current job is incomplete, so try to finish it.
                elif self.stack[-1].done < self.stack[-1].duration:
                    job = self.stack[-1]
                    started = self.sim.now
                    yield self.sim.timeout(job.duration - job.done)
                    job.done = job.duration
                # Current job is complete.
                else:
                    job = self.stack.pop()
                    job.complete()
            except Interrupt as exc:
                # Some work has been done on the current job, so save it.
                if (len(self.stack) > 0) and (started is not None):
                    now = self.sim.now
                    job = self.stack[-1]
                    job.interrupt()
                    job.done += now - started
                # Put the interrupting job on the stack.
                job = exc.cause
                job.start()
                self.stack.append(job)

Decomposing Jobs

class Priority:
    HIGH = 0
    MEDIUM = 1
    LOW = 2
class Job(Recorder):
    def __init__(self, sim, priority):
        super().__init__(sim)
        self.priority = priority
        self.t_create = self.sim.now
        self.t_start = None
        self.t_complete = None

    def start(self):
        self.t_start = self.sim.now

    def complete(self):
        self.t_complete = self.sim.now
        return self.sim.do_nothing()

    def is_complete(self):
        return self.t_complete is not None

    def needs_decomp(self):
        return False

    def __lt__(self, other):
        if self.priority == other.priority:
            return self.t_create < other.t_create
        return self.priority < other.priority
class JobInterrupt(Job):
    def __init__(self, sim):
        super().__init__(sim, Priority.HIGH)
        self.duration = self.sim.rand_interrupt_duration()
class JobRegular(Job):
    def __init__(self, sim):
        super().__init__(sim, Priority.LOW)
        self.duration = self.sim.rand_job_duration()

    def needs_decomp(self):
        return (not self.is_complete()) and (self.duration > self.sim.params.t_decomposition)
class JobFragment(Job):
    def __init__(self, coder, placeholder, duration):
        super().__init__(coder.sim, Priority.MEDIUM)
        self.coder = coder
        self.placeholder = placeholder
        self.duration = duration

    def complete(self):
        super().complete()
        self.placeholder.count -= 1
        if self.placeholder.count == 0:
            self.placeholder.job.complete()
            self.placeholder.job.priority = Priority.MEDIUM
            return self.coder.queue.put(self.placeholder.job)
        else:
            return self.sim.do_nothing()
class Simulation(Environment):
    def do_nothing(self):
        return self.timeout(0)
    def run(self):
        while True:
            job = yield from self.get()
            job.start()
            if job.needs_decomp():
                yield from self.decompose(job)
            elif not job.is_complete():
                yield self.sim.timeout(job.duration)
                yield job.complete()
    def decompose(self, job):
        size = self.sim.params.t_decomposition
        num = int(job.duration / size)
        extra = job.duration - (num * size)
        durations = [extra, *[size for _ in range(num)]]
        placeholder = Placeholder(job=job, count=len(durations))
        for d in durations:
            yield self.queue.put(JobFragment(self, placeholder, d))