Scale Up
- Describing workflows as directed acyclic graphs.
- Expressing DAGs in code with Metaflow.
Terms defined: workflow
- Reminder: storing parameters as a dataclass
"""Parameters for single invasion percolation sweep."""
from dataclasses import dataclass
@dataclass
class ParamsSingle:
"""A single set of invasion percolation parameters."""
width: int
height: int
depth: int
seed: int = None
- Load from JSON
{
"width": 11,
"height": 11,
"depth": 10,
"seed": 172839
}
- Build a workflow to run parameter sweeps using Metaflow
- Define a Python class with methods for workflow stages
- Use decorators to mark steps
- Parameters to methods coordinate dataflow
- Start
import csv
import json
import random
import sys
from collections import defaultdict
from pathlib import Path

from metaflow import FlowSpec, Parameter, step

from invperc import invperc
from measure import collect_density, estimate_density, measure_dimension
from params_single import ParamsSingle
from params_sweep import ParamsSweep
class InvPercFlow(FlowSpec):
"""Metaflow for invasion percolation."""
sweep = Parameter("sweep", help="sweep parameter file", type=str, required=True)
- First step loads parameters and creates parameters for each task
@step
def start(self):
    """Read the sweep file and fan out one task per parameter set."""
    params = load_params(self.sweep)
    # Each entry of self.args becomes one parallel run_job task.
    self.args = make_sweeps(params)
    self.next(self.run_job, foreach='args')
def load_params(filename):
    """Read a JSON sweep-parameter file and build a ParamsSweep from it."""
    with open(filename) as reader:
        return ParamsSweep(**json.load(reader))
def make_sweeps(sweeps):
    """Expand sweep settings into one ParamsSingle per individual run.

    Every (size, depth) pair is repeated `sweeps.runs` times, each run
    getting its own reproducible seed drawn from the sweep-level RNG.
    """
    random.seed(sweeps.seed)
    # Comprehension iterates in the same size/depth/run order as the
    # original nested loops, so the drawn seeds are identical.
    return [
        ParamsSingle(width=size, height=size, depth=depth,
                     seed=random.randrange(sys.maxsize))
        for size in sweeps.size
        for depth in sweeps.depth
        for _ in range(sweeps.runs)
    ]
- Run a single job
@step
def run_job(self):
    """Fill one grid and record its summary statistics."""
    filled = invperc(self.input)
    # Keep only the scalar summaries, not the grid itself.
    self.result = dict(
        size=filled.width(),
        depth=filled.depth(),
        density=collect_density(filled),
        dimension=measure_dimension(filled),
    )
    self.next(self.join)
- Combine results from all jobs
@step
def join(self, inputs):
    """Merge per-run results, aggregated by (size, depth) key."""
    counts = defaultdict(int)
    dimensions = defaultdict(float)
    densities = defaultdict(list)
    for branch in inputs:
        res = branch.result
        key = (res['size'], res['depth'])
        counts[key] += 1
        dimensions[key] += res['dimension']
        densities[key].extend(res['density'])
    # Replace each raw density list with its fitted estimate, in place.
    for key in list(densities):
        densities[key] = estimate_density(densities[key])
    self.results = {
        'counts': counts,
        'dimensions': dimensions,
        'densities': densities,
    }
    self.next(self.end)
- Report results
@step
def end(self):
    """Write a CSV summary of the whole sweep to standard output."""
    header = ('size', 'depth', 'count', 'dimension', 'density_x', 'density_k')
    rows = [header]
    for (size, depth), count in sorted(self.results['counts'].items()):
        # Average the accumulated dimension over the number of runs.
        mean_dim = self.results['dimensions'][(size, depth)] / count
        rows.append((size, depth, count, mean_dim,
                     *self.results['densities'][(size, depth)]))
    writer = csv.writer(sys.stdout, lineterminator='\n')
    writer.writerows(rows)
- Run standalone with the single-job JSON parameter file shown earlier
python invperc.py standalone.json
- Run full sweep
{
"size": [75, 95, 105],
"depth": [2, 10, 100],
"runs": 10,
"seed": 556677
}
python flow.py run --sweep sweep.json