Module funsies.dynamic
Dynamic DAG generation.
Expand source code
"""Dynamic DAG generation."""
from __future__ import annotations
# std
from typing import Any, Callable, Optional, Sequence, TypeVar
# external
from redis import Redis
# module
from ._constants import _Data, Encoding
from ._context import get_db, get_options
from ._graph import Artefact, constant_artefact, make_op
from ._subdag import subdag_funsie
from .config import Options
from .ui import _Target, put
# --------------------------------------------------------------------------------
Tin = TypeVar("Tin", bound=_Data)
T1 = TypeVar("T1", bound=_Data)
T2 = TypeVar("T2", bound=_Data)
T3 = TypeVar("T3", bound=_Data)
def sac(
split_fun: Callable[..., Sequence[T1]],
apply_fun: Callable[[Artefact[T1]], Artefact[T2]],
combine_fun: Callable[[Sequence[Artefact[T2]]], Artefact[T3]],
*inp: _Target,
out: Encoding,
name: Optional[str] = None,
strict: bool = True,
opt: Optional[Options] = None,
connection: Optional[Redis[bytes]] = None,
) -> Artefact[T3]:
"""Perform a split/apply/combine dynamic DAG."""
opt = get_options(opt)
db = get_db(connection)
inputs: dict[str, Artefact] = {}
# Parse input -------------------------------------
inputs = {}
arg_names = []
for k, arg in enumerate(inp):
arg_names += [f"in{k}"]
if isinstance(arg, Artefact):
inputs[arg_names[-1]] = arg
else:
inputs[arg_names[-1]] = put(arg, connection=db)
inp_types = dict([(k, val.kind) for k, val in inputs.items()])
if name is not None:
fun_name = name
else:
fun_name = (
f"SAC|{split_fun.__qualname__}|{apply_fun.__qualname__}"
+ f"|{combine_fun.__qualname__}"
)
def __sac(inpd: dict[str, Any]) -> dict[str, Artefact[T3]]:
"""Perform the map reduce."""
db = get_db()
args = [inpd[k] for k in arg_names]
split_data = [constant_artefact(db, d) for d in split_fun(*args)]
apply_data = [apply_fun(d) for d in split_data]
combine_data = combine_fun(apply_data)
return dict(out=combine_data)
# Generate the subdag operations
cmd = subdag_funsie(__sac, inp_types, {"out": out}, name=fun_name, strict=strict)
operation = make_op(db, cmd, inputs, opt)
return Artefact.grab(db, operation.out["out"])
Functions
def sac(split_fun: Callable[..., Sequence[T1]], apply_fun: Callable[[Artefact[T1]], Artefact[T2]], combine_fun: Callable[[Sequence[Artefact[T2]]], Artefact[T3]], *inp: _Target, out: Encoding, name: Optional[str] = None, strict: bool = True, opt: Optional[Options] = None, connection: Optional[Redis[bytes]] = None) ‑> Artefact[T3]
-
Perform a split/apply/combine dynamic DAG.
Expand source code
def sac( split_fun: Callable[..., Sequence[T1]], apply_fun: Callable[[Artefact[T1]], Artefact[T2]], combine_fun: Callable[[Sequence[Artefact[T2]]], Artefact[T3]], *inp: _Target, out: Encoding, name: Optional[str] = None, strict: bool = True, opt: Optional[Options] = None, connection: Optional[Redis[bytes]] = None, ) -> Artefact[T3]: """Perform a split/apply/combine dynamic DAG.""" opt = get_options(opt) db = get_db(connection) inputs: dict[str, Artefact] = {} # Parse input ------------------------------------- inputs = {} arg_names = [] for k, arg in enumerate(inp): arg_names += [f"in{k}"] if isinstance(arg, Artefact): inputs[arg_names[-1]] = arg else: inputs[arg_names[-1]] = put(arg, connection=db) inp_types = dict([(k, val.kind) for k, val in inputs.items()]) if name is not None: fun_name = name else: fun_name = ( f"SAC|{split_fun.__qualname__}|{apply_fun.__qualname__}" + f"|{combine_fun.__qualname__}" ) def __sac(inpd: dict[str, Any]) -> dict[str, Artefact[T3]]: """Perform the map reduce.""" db = get_db() args = [inpd[k] for k in arg_names] split_data = [constant_artefact(db, d) for d in split_fun(*args)] apply_data = [apply_fun(d) for d in split_data] combine_data = combine_fun(apply_data) return dict(out=combine_data) # Generate the subdag operations cmd = subdag_funsie(__sac, inp_types, {"out": out}, name=fun_name, strict=strict) operation = make_op(db, cmd, inputs, opt) return Artefact.grab(db, operation.out["out"])