Package funsies

Funsies is a lightweight workflow engine 🔧.

In funsies, workflows written in pure python are saved to redis, a distributed, in-memory data store. Workflow execution is performed using the minimal distributed queuing library RQ. Workflows are automatically parallelized and computed incrementally. Command-line tools are provided that allow funsies to be easily integrated in pre-existing shell workflows.

Incremental computation and caching are achieved by hashing the steps required to generate all file objects. Workflows are encoded using a Merkle tree data structure, similar to what is used by ccache for incremental computation or by distributed version control systems such as Mercurial.
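
The hashing scheme described above can be sketched with the standard library alone. This is not the funsies implementation: `step_hash` and the strings being hashed are hypothetical, chosen only to show how an identifier built from a step and the identifiers of its inputs changes whenever anything upstream changes.

```python
import hashlib


def step_hash(operation: str, *input_hashes: str) -> str:
    """Hash a step from its description and the hashes of its inputs."""
    h = hashlib.sha1()
    h.update(operation.encode())
    for ih in input_hashes:
        h.update(b"\x00" + ih.encode())
    return h.hexdigest()


# Leaf: a constant input, identified by its content.
data = step_hash("const:hello world")

# Interior nodes: each step's hash depends on the hashes of its inputs.
upper = step_hash("shell:tr a-z A-Z", data)
count = step_hash("shell:wc -c", upper)

# Editing an early step changes the hash of everything downstream of it.
upper_edited = step_hash("shell:tr a-z A-Z | rev", data)
count_edited = step_hash("shell:wc -c", upper_edited)

# A cache keyed on these hashes only misses for the affected steps.
cache = {data: b"hello world", upper: b"HELLO WORLD", count: b"11"}
needs_recompute = [h for h in (data, upper_edited, count_edited) if h not in cache]
```

Because the unchanged input keeps its hash, only the edited step and its dependent end up in `needs_recompute`.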

Workflows are written in pure python using a set of primitives such as shell commands (using the shell() function) and python glue code (encoded with py()). Errors are handled using a functional programming approach (a Result monad) inspired by Rust. In practice, python code can raise exceptions and shell commands can fail in specific branches without compromising the execution of the whole workflow, and the origin of errors is readily traced.
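
As a rough illustration of this error model, the sketch below propagates error values through independent branches in plain Python. `Error`, `Result` and `run_step` are hypothetical stand-ins written for this example, not the funsies types of the same name.

```python
from dataclasses import dataclass
from typing import Callable, TypeVar, Union

T = TypeVar("T")


@dataclass(frozen=True)
class Error:
    """Stand-in error value that records where the failure happened."""

    source: str
    details: str


Result = Union[T, Error]


def run_step(name: str, fun: Callable[..., T], *inputs: Result) -> Result:
    """Run one step, turning exceptions into Error values."""
    for value in inputs:
        if isinstance(value, Error):
            return value  # propagate the upstream error unchanged
    try:
        return fun(*inputs)
    except Exception as exc:
        return Error(source=name, details=repr(exc))


# Two independent branches; the second one fails.
good = run_step("parse_a", int, "42")
bad = run_step("parse_b", int, "not a number")

# Dependents of the failed branch receive the Error; the other branch still runs.
good_next = run_step("double_a", lambda x: 2 * x, good)
bad_next = run_step("double_b", lambda x: 2 * x, bad)
```

The `source` field of `bad_next` still names the step that originally failed, which is what makes the origin of an error traceable.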

"""Funsies is a lightweight workflow engine 🔧.

.. include:: documentation.md
"""
# module
from . import debug, dynamic, parametric, types, utils
from ._context import Fun, ManagedFun, options
from ._getter import get
from .errors import unwrap
from .fp import morph, py, reduce
from .template import template
from .ui import execute, put, reset, shell, take, takeout, wait_for

__all__ = [
    # shell
    "shell",
    # fp
    "py",
    "reduce",
    "morph",
    # template
    "template",
    # artefact manipulation
    "take",
    "takeout",
    "put",
    "execute",
    "wait_for",
    "reset",
    "get",
    # contexts
    "Fun",
    "ManagedFun",
    "options",
    # Error handling and types
    "unwrap",
    "types",
    "debug",
    # utility
    "utils",
    "dynamic",
    "parametric",
]


# Version information
# We grab it from setup.py so that we don't have to bump versions in multiple
# places.
try:
    # std
    from importlib import metadata

    __version__ = metadata.version("funsies")
except ImportError:
    # Running on pre-3.8 Python; use importlib-metadata package
    # external
    import importlib_metadata

    __version__ = importlib_metadata.version("funsies")

Sub-modules

funsies.config

Configuration dictionaries for jobs.

funsies.debug

Helpful function for debugging workflows.

funsies.dynamic

Dynamic DAG generation.

funsies.errors

Error for artefact results.

funsies.fp

User-friendly interfaces to funsies functionality.

funsies.parametric

User-facing functions for parametric DAGs.

funsies.types

Object types.

funsies.ui

User-friendly interfaces to funsies functionality.

funsies.utils

Some useful functions for workflows.

Functions

def shell(*args: str, inp: _INP_FILES = None, out: _OUT_FILES = None, env: Optional[dict[str, str]] = None, strict: bool = True, opt: Optional[Options] = None, connection: Optional[Redis[bytes]] = None) ‑> ShellOutput

Add a shell command to the workflow.

shell() puts a shell command in the workflow and returns a ShellOutput instance that provides a convenient wrapper to stdout, stderr and output files.

Input and output files need to be explicitly given as arguments inp and out. Input and output files containing path separators (/) are assumed to belong to the corresponding directory tree structures, which will be automatically generated for input files.

The strict flag determines how to interpret errors in input files. If True (the default), errors are propagated down: shell commands will not be executed if any input values currently hold Error. Instead, all output values will also be replaced by Error.

When strict=False, input files with errors will simply (and silently) be excluded from the shell script.

Shell commands are run in a temporary directory, which provides some measure of encapsulation. This encapsulation is quite weak, however, so the caller should make sure that commands only use relative paths etc. to ensure proper cleanup and function purity. The temporary directory is created using python's tempfile module, and its location can be set using the $TMPDIR environment variable.
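
The behaviour described so far (nested input paths, a throwaway working directory, and only declared outputs surviving) can be approximated with the standard library. This is a sketch under stated assumptions, not the funsies runner: the file names are made up, and a small Python child process stands in for the shell command so the example runs on any platform.

```python
import subprocess
import sys
import tempfile
from pathlib import Path

# Hypothetical inputs and declared outputs.
inputs = {"data/raw/points.csv": b"x,y\n0,1\n2,3\n"}
declared_outputs = ["results/count.txt"]

# Stand-in for the shell command: counts input lines and also writes a
# scratch file that is not declared as an output.
script = (
    "from pathlib import Path\n"
    "n = len(Path('data/raw/points.csv').read_text().splitlines())\n"
    "Path('results').mkdir()\n"
    "Path('results/count.txt').write_text(str(n))\n"
    "Path('scratch.tmp').write_text('temporary')\n"
)

collected = {}
with tempfile.TemporaryDirectory() as workdir:
    for name, content in inputs.items():
        target = Path(workdir) / name
        # Create intermediate directories so nested input names resolve.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(content)

    # Relative paths inside the command resolve against the temporary directory.
    subprocess.run([sys.executable, "-c", script], cwd=workdir, check=True)

    for name in declared_outputs:
        collected[name] = (Path(workdir) / name).read_bytes()
# The directory, including scratch.tmp, is deleted when the block exits.
```

`tempfile` honours `$TMPDIR` when choosing where to create the directory, which is why setting that variable relocates the working directories.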

Environment variables can be passed to the executed command with the env= keyword. In contrast with subprocess.Popen(), the environment of worker processes will be updated with those values, not replaced by them. Environment variables are not hashed as part of the operation's id and thus changing them will not result in workflow re-execution.
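
The difference between updating and replacing the environment can be seen with `subprocess` directly. The sketch below shows the update semantics described above; the variable names are invented for the demonstration and a Python child process stands in for the shell command.

```python
import os
import subprocess
import sys

# A variable already present in the parent ("worker") environment.
os.environ["DEMO_INHERITED"] = "from-worker"

# Extra variables, as they might be given through an env= mapping.
extra = {"DEMO_EXTRA": "hello"}

# Update semantics: overlay the extra variables on the current environment.
# Passing env=extra alone would replace the environment instead.
merged_env = {**os.environ, **extra}

child_code = (
    "import os; "
    "print(os.environ.get('DEMO_EXTRA'), os.environ.get('DEMO_INHERITED'))"
)
result = subprocess.run(
    [sys.executable, "-c", child_code],
    env=merged_env,
    capture_output=True,
    text=True,
    check=True,
)
output = result.stdout.strip()
```

The child process sees both the inherited and the extra variable.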

Args

*args
Lines of shell script to be evaluated.
inp
Input files to pass to the shell command. This should be a Mapping from filenames (str, path etc.) to values. Values can either be Artefact instances or of type bytes, in which case they will be automatically converted using put().
out
Filenames of output files that will be used to populate the return ShellOutput object. Note that any file not included in this list will be deleted when the shell command terminates.
env
Environment variables to be set before calling the shell command.
strict
If False, error handling will be deferred to the shell command by not populating input files of type Error.
connection
An explicit Redis connection. Not required if called within a Fun() context.
opt
An Options instance as returned by options(). Not required if called within a Fun() context.

Returns

A ShellOutput object, populated with the generated Artefact instances.

Raises

TypeError
when types of arguments are wrong.
def shell(
    *args: str,
    inp: _INP_FILES = None,
    out: _OUT_FILES = None,
    env: Optional[dict[str, str]] = None,
    strict: bool = True,
    opt: Optional[Options] = None,
    connection: Optional[Redis[bytes]] = None,
) -> ShellOutput:
    """Add a shell command to the workflow.

    `shell()` puts a shell command in the workflow and returns a `types.ShellOutput`
    instance that provides a convenient wrapper to stdout, stderr and output
    files.

    Input and output files need to be explicitly given as arguments `inp` and
    `out`. Input and output files containing path separators (`/`) are assumed
    to belong to the corresponding directory tree structures, which will be
    automatically generated for input files.

    The `strict` flag determines how to interpret errors in input files. If
    `True` (the default), errors are propagated down: shell commands will not
    be executed if any input values currently hold `errors.Error`. Instead, all
    output values will also be replaced by `errors.Error`.

    When `strict=False`, input files with errors will simply (and silently) be
    excluded from the shell script.

    Shell commands are run in a temporary directory which conveys some measure
    of encapsulation, but it is quite weak, so the caller should make sure
    that commands only use relative paths etc. to ensure proper cleanup and
    function purity. This is done using python's `tempfile` module: the temporary
    directory can be set using the $TMPDIR environment variable.

    Environment variables can be passed to the executed command with the
    `env=` keyword. In contrast with `subprocess.Popen()`, the environment of
    worker processes will be updated with those values, *not* replaced by
    them. Environment variables are not hashed as part of the operation's id
    and thus changing them will not result in workflow re-execution.

    Args:
        *args: Lines of shell script to be evaluated.
        inp: Input files to pass to the shell command. This should be a Mapping
            from filenames (str, path etc.) to values. Values can either be
            `types.Artefact` instances or of type `bytes`, in which case they
            will be automatically converted using `put()`.
        out: Filenames of output files that will be used to populate the return
            `types.ShellOutput` object. Note that any file not included in
            this list will be deleted when the shell command terminates.
        env: Environment variables to be set before calling the shell command.
        strict: If `False`, error handling will be deferred to the shell command
            by not populating input files of type `Error`.
        connection: An explicit Redis connection. Not required if called within a
            `Fun()` context.
        opt: An `types.Options` instance as returned by `options()`. Not
            required if called within a `Fun()` context.

    Returns:
        A `types.ShellOutput` object, populated with the generated
        `types.Artefact` instances.

    Raises:
        TypeError: when types of arguments are wrong.

    """
    opt = get_options(opt)
    db = get_db(connection)

    # Parse args --------------------------------------------
    cmds: list[str] = []
    inputs: dict[str, Artefact] = {}

    for arg in args:
        if isinstance(arg, str):
            cmds += [arg]
        else:
            raise TypeError(f"argument {arg} not str.")

    # Parse input files -------------------------------------
    if inp is None:
        pass
    # multiple input files as a mapping
    elif isinstance(inp, Mapping):
        for key, val in inp.items():
            if isinstance(val, str):
                logger.warning(
                    f"{key} passed to shell as a string.\nif you don't want it to be"
                    + ' converted to json (and wrapped with "), \nyou NEED to pass it'
                    + " as bytes (by .encode()-ing it first)"
                )
            inputs[str(key)] = _artefact(db, val)
    else:
        raise TypeError(f"{inp} not a valid file input")

    if out is None:
        outputs = []
    else:
        outputs = [str(o) for o in out]

    inputs_types = dict([(k, v.kind) for k, v in inputs.items()])
    funsie = shell_funsie(cmds, inputs_types, outputs, env, strict=strict)
    operation = make_op(db, funsie, inputs, opt)
    return ShellOutput(db, operation)
def py(fun: Callable[..., Any], *inp: _Target, out: Optional[Sequence[Encoding]] = None, name: Optional[str] = None, strict: bool = True, opt: Optional[Options] = None, connection: Optional[Redis[bytes]] = None) ‑> Union[Artefact, tuple[Artefact, ...]]

Add a python function to the workflow.

py(fun, *inp) puts a python function fun on the workflow and returns its output artefact.

As many arguments will be passed to fun() as there are input Artefact instances in *inp and fun() should return as many outputs as there are data types in out=. By default, out= will be inferred from annotations.
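
One way such inference from annotations could work is sketched below. This is an assumption-laden illustration, not the funsies implementation: `infer_encodings` is a hypothetical helper, the encodings are represented as plain strings, and treating a tuple annotation as several outputs is a choice made for this example.

```python
from typing import Any, Callable, Tuple, get_args, get_origin, get_type_hints


def infer_encodings(fun: Callable[..., Any]) -> Tuple[str, ...]:
    """Guess one encoding name per return value from the return annotation."""
    hints = get_type_hints(fun)
    if "return" not in hints:
        raise TypeError(f"cannot infer outputs of {fun.__qualname__}: no annotation")
    ret = hints["return"]
    # Assumption: a tuple annotation stands for several outputs.
    parts = get_args(ret) if get_origin(ret) is tuple else (ret,)
    # bytes maps to a binary encoding, anything else to JSON.
    return tuple("blob" if part is bytes else "json" for part in parts)


def to_upper(x: bytes) -> bytes:
    return x.upper()


def stats(xs: list) -> Tuple[bytes, int]:
    return (repr(xs).encode(), len(xs))


def untyped(x):
    return x


single = infer_encodings(to_upper)
double = infer_encodings(stats)
```

Calling `infer_encodings(untyped)` raises `TypeError`, mirroring the documented failure mode when output types are neither annotated nor given.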

If strict=False, the function is taken to do its own error handling and arguments will be of type Result[T] instead of T. See match_results() for a convenient way to process these values.

Python function hashes are generated based on their names (as given by fun.__qualname__) and functions are distributed to workers using cloudpickle. This is important because it means that:

  • Workers must have access to the function if it is imported, and must have access to any imported libraries.

  • Changing a function without modifying its name (or the name= argument) will not recompute the graph.

It is therefore the caller's responsibility to reset() one of the return values of py() if the function is modified, to ensure re-execution of its dependents.
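
The consequence of hashing by name can be demonstrated without funsies. In the sketch below, `name_based_id` is a hypothetical helper that derives an identifier from `__qualname__` (or an explicit name) only, which is the property the text above describes; the real hash includes more than this.

```python
import hashlib
from typing import Callable, Optional


def name_based_id(fun: Callable[..., object], name: Optional[str] = None) -> str:
    """Identifier derived only from a function's name, never from its body."""
    label = name if name is not None else fun.__qualname__
    return hashlib.sha1(label.encode()).hexdigest()


def scale(x: float) -> float:
    return 2.0 * x


before = name_based_id(scale)


def scale(x: float) -> float:  # same name, different body
    return 3.0 * x


after = name_based_id(scale)

# An explicit name gives a different identifier for the edited function.
renamed = name_based_id(scale, name="scale_v2")
```

Since `before` and `after` are identical, a cache keyed on them cannot notice the edit; changing the name (or resetting the outputs) is what forces recomputation.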

Args

fun
Python function that operates on input artefacts and produces a single output artefact.
*inp
Input artefacts.
out
List of Encoding, one for each output of fun. These are the kind of serialization-deserialization used for the output variables. If None, out= is inferred using the type hint of fun(). It is Encoding.blob for all bytes outputs and Encoding.json for anything else.
name
Override the name of fun() used in hash generation.
strict
If False, error handling will be deferred to fun() by passing it argument of type Result[bytes] instead of bytes.
connection
An explicit Redis connection. Not required if called within a Fun() context.
opt
An Options instance generated from options(). Not required if called within a Fun() context.

Returns

A single Artefact instance if out= contains only one element or a tuple of Artefact otherwise.

Raises

TypeError
when the output types could not be determined and were not given.

def py(  # noqa:C901
    fun: Callable[..., Any],
    *inp: _Target,
    out: Optional[Sequence[Encoding]] = None,
    name: Optional[str] = None,
    strict: bool = True,
    opt: Optional[Options] = None,
    connection: Optional[Redis[bytes]] = None,
) -> Union[Artefact, tuple[Artefact, ...]]:
    """Add a python function to the workflow.

    `py(fun, *inp)` puts a python function `fun` on the workflow and returns
    its output artefact.

    As many arguments will be passed to `fun()` as there are input
    `types.Artefact` instances in `*inp` and `fun()` should return as many
    outputs as there are data types in `out=`. By default, `out=` will be
    inferred from annotations.

    If `strict=False`, the function is taken to do its own error handling and
    arguments will be of type `errors.Result[T]` instead of `T`. See
    `utils.match_results()` for a convenient way to process these values.

    Python function hashes are generated based on their names (as given by
    `fun.__qualname__`) and functions are distributed to workers using
    `cloudpickle`. This is important because it means that:

    - Workers must have access to the function if it is imported, and must
        have access to any imported libraries.

    - Changing a function without modifying its name (or the
        `name=` argument) will not recompute the graph.

    It is therefore the caller's responsibility to `reset()` one of the
    return values of `py()` if the function is modified, to ensure
    re-execution of its dependents.

    Args:
        fun: Python function that operates on input artefacts and produces a
            single output artefact.
        *inp: Input artefacts.
        out: List of Encoding, one for each output of fun. These are the kind
            of serialization-deserialization used for the output variables. If
            None, `out=` is inferred using the type hint of `fun()`. It is
            `types.Encoding.blob` for all `bytes` outputs and
            `types.Encoding.json` for anything else.
        name: Override the name of `fun()` used in hash generation.
        strict: If `False`, error handling will be deferred to `fun()` by
            passing it argument of type `errors.Result[bytes]` instead of
            `bytes`.
        connection: An explicit Redis connection. Not required if called
            within a `Fun()` context.
        opt: An `types.Options` instance generated from `options()`. Not
            required if called within a `Fun()` context.

    Returns:
        A single `types.Artefact` instance if `out=` contains only one element
        or a tuple of `types.Artefact` otherwise.

    Raises:
        TypeError:
            The output types could not be determined and were not given.

    """
    # Attempt to infer output
    if out is None:
        out = output_types(fun)

    opt = get_options(opt)
    db = get_db(connection)
    inputs = {}
    for k, arg in enumerate(inp):
        inputs[f"in{k}"] = _artefact(db, arg)

    in_types = dict([(k, val.kind) for k, val in inputs.items()])

    noutputs = len(out)
    out_type = dict([(f"out{k}", out[k]) for k in range(noutputs)])
    out_keys = list(out_type.keys())
    in_keys = list(in_types.keys())

    if name is not None:
        fun_name = name
    else:
        fun_name = f"mapping_{len(inp)}:{fun.__qualname__}"

    def __map(inpd: Mapping[str, _Data]) -> dict[str, _Data]:
        """Perform a reduction."""
        args = [inpd[key] for key in in_keys]
        out = fun(*args)
        if noutputs == 1:
            out = (out,)
        return dict(zip(out_keys, out))

    funsie = python_funsie(__map, in_types, out_type, name=fun_name, strict=strict)
    operation = make_op(db, funsie, inputs, opt)
    returnval = tuple(
        [Artefact.grab(db, operation.out[o]) for o in out_keys]  # type:ignore
    )
    if len(returnval) == 1:
        return returnval[0]
    else:
        return returnval
def reduce(fun: Callable[..., Tout1], *inp: _Target, out: Optional[Encoding] = None, name: Optional[str] = None, strict: bool = True, opt: Optional[Options] = None, connection: Optional[Redis[bytes]] = None) ‑> Artefact[Tout1]

Add to workflow a many-to-one python function y = f(*x).

This is syntactic sugar around py(). By default, the output encoding is inferred, and if this fails, is set to match the encoding of the arguments if they are all the same. Output encoding can also be explicitly set to a given Encoding using the out= keyword.
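
The fallback rule for the output encoding can be summarised in a few lines. `fallback_encoding` is a hypothetical helper operating on encoding names as strings; it only mirrors the decision described above and is not part of funsies.

```python
from typing import Optional, Sequence


def fallback_encoding(input_kinds: Sequence[str], inferred: Optional[str] = None) -> str:
    """Pick an output encoding: the inferred one, else the inputs' shared one."""
    if inferred is not None:
        return inferred
    kinds = set(input_kinds)
    if len(kinds) != 1:
        raise TypeError(
            f"cannot choose an output encoding from inputs {sorted(kinds)}"
        )
    return kinds.pop()


same = fallback_encoding(["json", "json", "json"])
explicit = fallback_encoding(["json", "blob"], inferred="blob")
```

With mixed input encodings and nothing inferred, the helper raises `TypeError`, which corresponds to the case where `out=` has to be given explicitly.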

def reduce(
    fun: Callable[..., Tout1],
    *inp: _Target,  # noqa:DAR101,DAR201
    out: Optional[Encoding] = None,
    name: Optional[str] = None,
    strict: bool = True,
    opt: Optional[Options] = None,
    connection: Optional[Redis[bytes]] = None,
) -> Artefact[Tout1]:
    """Add to workflow a many-to-one python function `y = f(*x)`.

    This is syntactic sugar around `py()`. By default, the output encoding is
    inferred, and if this fails, is set to match the encoding of the
    arguments if they are all the same. Output encoding can also be
    explicitly set to a given `types.Encoding` using the `out=` keyword.

    """
    inps = list(inp)
    db = get_db(connection)
    inps2 = [_artefact(db, inp) for inp in inps]
    if out is None:
        try:
            typ = output_types(fun)
        except TypeError:
            typ = tuple(set(el.kind for el in inps2))
            if len(typ) > 1:
                raise TypeError(
                    "Inference failed for function reduce(): more than one input type was"
                    + " passed but no out= encoding.\n"
                    + "Either explicitly set return with out= or ensures all inputs "
                    + "have the same encoding.\n"
                    + f"args: {list(el.kind for el in inps2)}\n"
                    + f"inferred possible return values: {typ}"
                )

        if len(typ) > 1:
            raise TypeError(
                "Attempted to use reduce but the function has more than one output.\n"
                + f"inferred return value: {typ}"
            )
        else:
            out = typ[0]

    if name is not None:
        morpher_name = name
    else:
        morpher_name = f"reduce:{fun.__qualname__}"
    out_type = [out]
    return py(
        fun,
        *inps2,
        out=out_type,
        name=morpher_name,
        strict=strict,
        opt=opt,
        connection=db,
    )
def morph(fun: Callable[[Tin1], Tout1], inp: Union[Tin1, Artefact[Tin1]], *, out: Optional[Encoding] = None, name: Optional[str] = None, strict: bool = True, opt: Optional[Options] = None, connection: Optional[Redis[bytes]] = None) ‑> Artefact[Tout1]

Add to workflow a one-to-one python function y = f(x).

This is syntactic sugar around py(). By default, the output type will match the input type if it can't be inferred, but it can be set to a given Encoding using the out= keyword.

def morph(
    fun: Callable[[Tin1], Tout1],
    inp: Union[Tin1, Artefact[Tin1]],
    *,  # noqa:DAR101,DAR201
    out: Optional[Encoding] = None,
    name: Optional[str] = None,
    strict: bool = True,
    opt: Optional[Options] = None,
    connection: Optional[Redis[bytes]] = None,
) -> Artefact[Tout1]:
    """Add to workflow a one-to-one python function `y = f(x)`.

    This is syntactic sugar around `py()`. By default, the output type will
    match the input type if it can't be inferred, but it can be set to a given
    `types.Encoding` using the `out=` keyword.
    """
    db = get_db(connection)
    inp2 = _artefact(db, inp)
    if out is None:
        try:
            typ = output_types(fun)
        except TypeError:
            typ = (inp2.kind,)

        if len(typ) > 1:
            raise TypeError(
                "Attempted to use morph but the function has more than one output.\n"
                + f"inferred return value: {typ}"
            )
        else:
            out = typ[0]

    if name is not None:
        morpher_name = name
    else:
        morpher_name = f"morph:{fun.__qualname__}"
    out_type = [out]
    return py(
        fun,
        inp2,
        out=out_type,
        name=morpher_name,
        strict=strict,
        opt=opt,
        connection=db,
    )
def template(template: _Template, data: Mapping[str, _Value], strip: bool = True, *, env: Optional[Mapping[str, str]] = None, name: Optional[str] = None, opt: Optional[Options] = None, connection: Optional[Redis[bytes]] = None) ‑> Artefact[bytes]

Fill in a template using data from artefacts.

This function takes a chevron template and fills it with the dictionary data,

funsies = f.template(template, data)
# corresponds basically to running
normal = chevron.render(template, data)

template() is a full-featured funsies function: both the template and the data can come from the database and are (as usual) lazily evaluated. Substitutions provided in the env= dictionary are expanded using environment variables.
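
How the data and env= mappings combine before rendering can be sketched as follows. `build_substitutions` is a hypothetical name; the logic follows the source listing for this function (decode bytes, optionally strip strings, then look up each env= entry in the process environment), with the rendering call itself left out.

```python
import os
from typing import Any, Dict, Mapping, Optional


def build_substitutions(
    data: Mapping[str, Any],
    env: Optional[Mapping[str, str]] = None,
    strip: bool = True,
) -> Dict[str, Any]:
    """Assemble the values that would be handed to the template renderer."""
    subs: Dict[str, Any] = {}
    for key, value in data.items():
        if isinstance(value, bytes):
            value = value.decode()
        if isinstance(value, str) and strip:
            value = value.strip()
        subs[key] = value
    if env is not None:
        for key, variable in env.items():
            # A variable that is not set yields None rather than an exception.
            subs[key] = os.environ.get(variable)
    return subs


os.environ["OMP_NUM_THREADS"] = "4"  # set here only for the demonstration
subs = build_substitutions(
    {"functional": " b3lyp\n", "spin": 1},
    env={"nthreads": "OMP_NUM_THREADS"},
)
```

Note that the env= mapping goes from template key to environment variable name, so `{{nthreads}}` in a template would receive the value of `OMP_NUM_THREADS` on the worker.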

The primary intended use of template() is the generation of input files for simulation software. For example,

# funsies
import funsies as f

g16_template = """%NProcShared={{nthreads}}
# {{functional}}/{{basis}} Symm=None {{type}}

Gaussian calculation

{{charge}} {{spin}}
{{structure}}

"""

with f.Fun():
    inp = f.template(
        g16_template,
        {
            "functional": "b3lyp",
            "basis": "6-31g",
            "type": "sp",
            "spin": 1,
            # the next two could be obtained eg from conformer generation.
            "charge": charge,
            "structure": coords,
        },
        env={"nthreads": "OMP_NUM_THREADS"},
    )
    dft_job = f.shell(
        "g16 input.com", inp={"input.com": inp}, out=["input.log", "data.chk"]
    )

Args

template
The template, either as a string or as an Artefact[str].
data
A dict[key, value] of substitutions to perform on the template. value can be any type accepted by chevron (str but also int, bytes etc.) and/or Artefact objects containing those types.
strip
If True, substitutions will be stripped (using .strip()) before templating.
env
A dict[str,str] of substitutions to fill in from the environment variables of the worker process.
name
Provide an explicit name to the template.
connection
An explicit Redis connection. Not required if called within a Fun() context.
opt
An Options instance as returned by options(). Not required if called within a Fun() context.

Returns

An Artefact[bytes] object, populated with the generated template as a bytestring.

def template(
    template: _Template,
    data: Mapping[str, _Value],
    strip: bool = True,
    *,
    env: Optional[Mapping[str, str]] = None,
    name: Optional[str] = None,
    opt: Optional[Options] = None,
    connection: Optional[Redis[bytes]] = None,
) -> Artefact[bytes]:
    """Fill in a template using data from artefacts.

    This function takes a [chevron template](https://mustache.github.io/) and
    fills it with the dictionary ``data``,

    ```python
    funsies = f.template(template, data)
    # corresponds basically to running
    normal = chevron.render(template, data)
    ```

    ``template()`` is a full-featured funsies function: both the template and
    the data can come from the database and are (as usual) lazily evaluated.
    Substitutions provided in the ``env=`` dictionary are expanded using
    environment variables.

    The primary intended use of ``template()`` is the generation of input
    files for simulation software. For example,

    ```python
    # funsies
    import funsies as f

    g16_template = \"""%NProcShared={{nthreads}}
    # {{functional}}/{{basis}} Symm=None {{type}}

    Gaussian calculation

    {{charge}} {{spin}}
    {{structure}}

    \"""

    with f.Fun():
        inp = f.template(
            g16_template,
            {
                "functional": "b3lyp",
                "basis": "6-31g",
                "type": "sp",
                "spin": 1,
                # the next two could be obtained eg from conformer generation.
                "charge": charge,
                "structure": coords,
            },
            env={"nthreads": "OMP_NUM_THREADS"},
        )
        dft_job = f.shell(
            "g16 input.com", inp={"input.com": inp}, out=["input.log", "data.chk"]
        )
    ```

    Args:
        template: The template, either as a string or as an ``types.Artefact[str]``.
        data: A ``dict[key, value]`` of substitutions to perform on the
            template. ``value`` can be any type accepted by ``chevron``
            (``str`` but also ``int``, ``bytes`` etc.) and/or ``types.Artefact``
            objects containing those types.
        strip: If `True`, substitutions will be stripped (using ``.strip()``) before templating.
        env: A ``dict[str,str]`` of substitutions to fill in from the
            environment variables of the worker process.
        name: Provide an explicit name to the template.
        connection: An explicit Redis connection. Not required if called within a
            `Fun()` context.
        opt: An `types.Options` instance as returned by `options()`. Not
            required if called within a `Fun()` context.

    Returns:
        An `types.Artefact[bytes]` object, populated with the generated
        template as a bytestring.
    """  # noqa:D300,D301
    # Get context elements
    opt = get_options(opt)
    db = get_db(connection)

    # Make sure template is a string in the db
    if isinstance(template, bytes):
        tmp = _artefact(db, template.decode())
    else:
        tmp = _artefact(db, template)  # type:ignore

    # Make sure all substitutions are strings in the db
    args = dict()
    for key, value in data.items():
        if isinstance(value, bytes):
            value = value.decode()
        args[key] = _artefact(db, value)

    template_key = "template"
    while template_key in args:
        template_key += "_"  # append more _ until template_key is unique

    env_key = "env"
    while env_key in args:
        env_key += "_"  # append more _ until env_key is unique

    args[template_key] = tmp
    args[env_key] = _artefact(db, env)

    in_types = dict([(k, val.kind) for k, val in args.items()])

    if name is not None:
        fun_name = name
    else:
        do_strip = ""
        if strip:
            do_strip = ", stripped"
        fun_name = f"template (chevrons{do_strip})"

    def __exec(inpd: Mapping[str, Any]) -> dict[str, bytes]:
        """Substitute into template."""
        args = {}
        for key, val in inpd.items():
            if isinstance(val, bytes):
                val = val.decode()
            if isinstance(val, str) and strip and key != template_key:
                val = val.strip()
            args[key] = val

        # read template
        template = args[template_key]

        # read env variables
        if args[env_key] is not None:
            env = args[env_key]
            for key, val in env.items():
                args[key] = os.environ.get(val)

        del args[template_key]
        del args[env_key]

        return {"out": chevron.render(template, args).encode()}

    funsie = python_funsie(
        __exec, in_types, {"out": Encoding.blob}, name=fun_name, strict=True
    )
    operation = make_op(db, funsie, args, opt)
    return Artefact.grab(db, operation.out["out"])
def take(where: Artefact[T], *, strict: bool = True, connection: Optional[Redis[bytes]] = None) ‑> Union[T, Result[T]]

Take data corresponding to a given artefact from Redis.

take() returns the value currently pointed to by the Artefact instance where, as bytes.

If strict=True (the default) and where points to an Error value, this function will raise UnwrapError. This is equivalent to running unwrap() on the return value.

However if strict=False, the return value of take() is a Result[bytes] variable, that is, either an instance of bytes or whatever Error is currently held by where.

Finally, if where does not point to a valid Redis-backed Artefact, an Error of kind ErrorKind.Mismatch is returned.
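
The three cases above (strict, non-strict, and invalid pointer) are sketched below with a dictionary standing in for Redis. All names here are local stand-ins written for the example; in particular the error kind "ExceptionRaised" is an assumption, and only "Mismatch" is taken from the text.

```python
from dataclasses import dataclass
from typing import Dict, Union


@dataclass(frozen=True)
class Error:
    kind: str


class UnwrapError(Exception):
    pass


Result = Union[bytes, Error]


def unwrap(value: Result) -> bytes:
    """Return the value, or raise if it is an Error."""
    if isinstance(value, Error):
        raise UnwrapError(f"attempted to unwrap an error of kind {value.kind}")
    return value


def take(store: Dict[str, Result], key: str, *, strict: bool = True) -> Result:
    # Unknown keys yield an Error value instead of raising KeyError.
    value = store.get(key, Error(kind="Mismatch"))
    return unwrap(value) if strict else value


store: Dict[str, Result] = {"ok": b"data", "failed": Error(kind="ExceptionRaised")}

strict_ok = take(store, "ok")
lenient_failed = take(store, "failed", strict=False)
lenient_missing = take(store, "missing", strict=False)

try:
    take(store, "failed")
    raised = False
except UnwrapError:
    raised = True
```

In strict mode the failure surfaces as an exception at the call site; in non-strict mode the caller receives the error as a value and decides what to do with it.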

Args

where
Artefact pointer to data taken from the database.
strict
If False, return a value of type Result[bytes].
connection
An explicit Redis connection. Not required if called within a Fun() context.

Returns

Either bytes or Result[bytes] depending on strictness.

Raises

errors.UnwrapError
if where contains an Error and strict=True.

def take(
    where: Artefact[T],
    *,
    strict: bool = True,
    connection: Optional[Redis[bytes]] = None,
) -> Union[T, Result[T]]:
    """Take data corresponding to a given artefact from Redis.

    `take()` returns the value currently pointed to by the `types.Artefact`
    instance `where`, as `bytes`.

    If `strict=True` (the default) and `where` points to an `types.Error`
    value, this function will raise `errors.UnwrapError`. This is equivalent
    to running `unwrap()` on the return value.

    However if `strict=False`, the return value of `take()` is a
    `errors.Result[bytes]` variable, that is, either an instance of `bytes` or
    whatever `types.Error` is currently held by `where`.

    Finally, if `where` does not point to a valid redis-backed
    `types.Artefact` an `errors.Error` is returned of kind
    `errors.ErrorKind.Mismatch`.

    Args:
        where: `types.Artefact` pointer to data taken from the database.
        strict: If `False`, return a value of type `errors.Result[bytes]`.
        connection: An explicit Redis connection. Not required if called
            within a `Fun()` context.

    Returns:
        Either `bytes` or `errors.Result[bytes]` depending on strictness.

    Raises:
        errors.UnwrapError:
            if `where` contains an `errors.Error` and `strict=True`.

    """
    db = get_db(connection)
    dat = get_data(db, where)
    __log_error(where.hash, dat)
    if strict:
        return unwrap(dat)
    else:
        return dat
def takeout(where: Artefact, filename: _AnyPath, *, connection: Optional[Redis[bytes]] = None) ‑> None

take() an artefact and save it to filename.

This is syntactic sugar around take(). This function is always strict.

def takeout(
    where: Artefact,
    filename: _AnyPath,
    *,
    connection: Optional[Redis[bytes]] = None,
) -> None:  # noqa:DAR101,DAR201
    """`take()` an artefact and save it to `filename`.

    This is syntactic sugar around `take()`. This function is always strict.
    """
    db = get_db(connection)
    dat = get_bytes(db, where)
    __log_error(where.hash, dat)
    dat = unwrap(dat)
    with open(filename, "wb") as f:
        f.write(dat)
def put(value: T, *, connection: Optional[Redis[bytes]] = None) ‑> Artefact[T]

Save data to Redis and return an Artefact.

put() explicitly saves value, a bytes or string value, to the database and returns an Artefact pointing to this value.

The returned artefact's status is ArtefactStatus.const and its parent hash is root. This means that:

  • The artefact is populated before any workflow operation is executed.
  • It has no dependencies.
  • It is hashed according to content, not history.

Thus, put() is used to set input values to workflows.

Args

value
Data to be held in database. str data is encoded to bytes.
connection : optional
An explicit Redis connection. Not required if called within a Fun() context.

Returns

An Artefact instance with status const.

def put(
    value: T,
    *,
    connection: Optional[Redis[bytes]] = None,
) -> Artefact[T]:
    """Save data to Redis and return an Artefact.

    `put()` explicitly saves `value`, a bytes or string value, to the database
    and returns an `types.Artefact` pointing to this value.

    The returned artefact's status is `types.ArtefactStatus.const` and its
    parent hash is `root`. This means that:

    - The artefact is populated before any workflow operation is executed.
    - It has no dependencies.
    - It is hashed according to content, not history.

    Thus, `put()` is used to set input values to workflows.

    Args:
        value: Data to be held in database. `str` data is encoded to `bytes`.
        connection (optional): An explicit Redis connection. Not required if
            called within a `Fun()` context.

    Returns:
        An `types.Artefact` instance with status `const`.
    """
    db = get_db(connection)
    return _artefact(db, value)
def execute(*outputs: Union[Operation, Artefact, ShellOutput], connection: Optional[Redis[bytes]] = None) ‑> None

Trigger execution of a workflow to obtain a given output.

Args

*outputs
Final artefacts or operations to be evaluated in the workflow. These objects and all of their dependencies will be executed by workers.
connection
An explicit Redis connection. Not required if called within a Fun() context.
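
The phrase "these objects and all of their dependencies" describes a dependency closure over the workflow graph. The sketch below illustrates that idea only; in funsies the work is dispatched to RQ workers, and the graph layout, step names and `required_steps` helper here are hypothetical.

```python
def required_steps(graph, outputs):
    """Return the outputs and all of their dependencies, dependencies first."""
    order = []
    seen = set()

    def visit(node):
        if node in seen:
            return
        seen.add(node)
        for dep in graph.get(node, ()):
            visit(dep)
        order.append(node)

    for out in outputs:
        visit(out)
    return order


# Hypothetical workflow: each key depends on the steps listed as its value.
graph = {
    "report": ["stats", "plot"],
    "stats": ["clean"],
    "plot": ["clean"],
    "clean": ["raw"],
    "unrelated": ["raw"],
}

steps = required_steps(graph, ["report"])
```

Steps that the requested outputs do not depend on (here `unrelated`) are never scheduled.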
def execute(
    *outputs: Union[Operation, Artefact, ShellOutput],
    connection: Optional[Redis[bytes]] = None,
) -> None:
    """Trigger execution of a workflow to obtain a given output.

    Args:
        *outputs: Final artefacts or operations to be evaluated in the
            workflow. These objects and all of their dependencies will be
            executed by workers.
        connection: An explicit Redis connection. Not required if called
            within a `Fun()` context.
    """
    # get redis
    db = get_db(connection)

    # run dag
    for el in outputs:
        start_dag_execution(db, el.hash)
def wait_for(thing: Union[ShellOutput, Artefact, Operation], timeout: Optional[float] = None, *, connection: Optional[Redis[bytes]] = None) ‑> None

Block execution until an artefact is generated or an operation is executed.

Args

thing
Artefact or operation to wait on.
timeout : optional
Number of seconds to wait before raising an exception. If unspecified, the timeout is infinite.
connection : optional
An explicit Redis connection. Not required if called within a Fun() context.

Raises

TimeoutError
if timeout is exceeded.
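
The blocking behaviour amounts to a polling loop with an optional deadline. The following is a minimal stand-alone sketch of that pattern; `poll_until` and its `interval` parameter are hypothetical names, and the condition is a plain callable rather than a database query.

```python
import time


def poll_until(is_done, timeout=None, interval=0.05):
    """Block until is_done() returns True, or raise TimeoutError."""
    start = time.monotonic()
    while True:
        if is_done():
            return
        elapsed = time.monotonic() - start
        if timeout is not None and elapsed > timeout:
            raise TimeoutError(f"waited for {elapsed:.2f} seconds.")
        # Sleep between checks so the condition is not evaluated too often.
        time.sleep(interval)


# A condition that becomes true after a short delay.
ready_at = time.monotonic() + 0.1
poll_until(lambda: time.monotonic() >= ready_at, timeout=2.0)

# A condition that never becomes true hits the timeout.
try:
    poll_until(lambda: False, timeout=0.1)
    timed_out = False
except TimeoutError:
    timed_out = True
```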
def wait_for(
    thing: Union[ShellOutput, Artefact, Operation],
    timeout: Optional[float] = None,
    *,
    connection: Optional[Redis[bytes]] = None,
) -> None:
    """Block execution until an artefact is generated or an operation is executed.

    Args:
        thing: `types.Artefact` or operation to wait on.
        timeout (optional): Number of seconds to wait before raising an
            exception. If unspecified, the timeout is infinite.
        connection (optional): An explicit Redis connection. Not required if
            called within a `Fun()` context.

    Raises:
        TimeoutError: if timeout is exceeded.
    """
    db = get_db(connection)
    if isinstance(thing, Artefact):

        def __stat() -> bool:
            return get_status(db, resolve_link(db, thing.hash)) > 0

    else:
        if isinstance(thing, Operation):
            op = thing
        else:
            op = thing.op

        def __stat() -> bool:
            return is_it_cached(db, op)

    t0 = time.time()
    while True:
        t1 = time.time()

        if __stat():
            return

        if timeout is not None:
            if t1 - t0 > timeout:
                raise TimeoutError(
                    f"waited on {shorten_hash(thing.hash)} " + f"for {t1-t0} seconds."
                )

        # avoids hitting the DB way too often
        time.sleep(0.3)
def reset(thing: Union[ShellOutput, Operation, Artefact], *, recursive: bool = True, connection: Optional[Redis[bytes]] = None) ‑> None

Reset data associated with an operation and its dependents.

This function deletes data associated with an operation or the operation generating a given artefact without actually removing it from the workflow. This is useful if an operation failed due to circumstances outside of the control of funsies, such as a non-reproducible step or worker setup error. When the workflow is executed again, all the reset() steps will be re-computed.

By default, reset() is applied recursively to all dependents of an operation.

Args

thing
Operation to reset. If an Artefact is given, its parent operation is reset().
recursive
If False, only this operation is reset; its dependents are untouched. Note that this is dangerous, as it can make workflows non-reproducible.
connection
An explicit Redis connection. Not required if called within a Fun() context.

Raises

AttributeError
when an Artefact is reset that has status ArtefactStatus.const.
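
Why a non-recursive reset is dangerous can be seen in a small model of a workflow cache. This is a sketch under stated assumptions: the dict-based cache, the `children` mapping and both helper functions are hypothetical and stand in for data that funsies keeps in Redis.

```python
def descendants_of(children, start):
    """Collect every operation downstream of start (excluding start itself)."""
    found = set()
    stack = list(children.get(start, ()))
    while stack:
        node = stack.pop()
        if node not in found:
            found.add(node)
            stack.extend(children.get(node, ()))
    return found


def reset_sketch(cache, children, op, recursive=True):
    """Drop cached results for op, and for its dependents when recursive."""
    targets = {op}
    if recursive:
        targets |= descendants_of(children, op)
    for name in targets:
        cache.pop(name, None)


# Hypothetical workflow: each key lists the operations that consume its output.
children = {"clean": ["stats", "plot"], "stats": ["report"], "plot": ["report"]}
full_cache = {"clean": b"c", "stats": b"s", "plot": b"p", "report": b"r"}

cache_recursive = dict(full_cache)
reset_sketch(cache_recursive, children, "stats")

cache_shallow = dict(full_cache)
reset_sketch(cache_shallow, children, "stats", recursive=False)
```

In the shallow case the cached `report` survives even though it was computed from the `stats` result that has just been discarded, which is how a workflow can end up non-reproducible.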

def reset(
    thing: Union[ShellOutput, Operation, Artefact],
    *,
    recursive: bool = True,
    connection: Optional[Redis[bytes]] = None,
) -> None:
    """Reset data associated with an operation and its dependents.

    This function deletes data associated with an operation or the operation
    generating a given artefact without actually removing it from the
    workflow. This is useful if an operation failed due to circumstances
    outside of the control of `funsies`, such as a non-reproducible step or
    worker setup error. When the workflow is executed again, all the `reset()`
    steps will be re-computed.

    By default, `reset()` is applied recursively to all dependents of an
    operation.

    Args:
        thing: Operation to reset. If an `types.Artefact` is given, its parent
            operation is `reset()`.
        recursive: If False, only this operation is reset; its dependents are
            untouched. Note that this is dangerous, as it can make
            workflows non-reproducible.
        connection: An explicit Redis connection. Not required if called
            within a `Fun()` context.

    Raises:
        AttributeError:
            when an `types.Artefact` is reset that has status
            `types.ArtefactStatus.const`.
    """
    db = get_db(connection)
    if isinstance(thing, Artefact):
        h = thing.parent
        if h == "root":
            raise AttributeError("attempted to delete a const artefact.")
    else:
        h = thing.hash

    # Delete everything from the operation
    op = Operation.grab(db, h)
    for art in op.out.values():
        delete_artefact(db, art)

    if recursive:
        # and its dependents
        for el in descendants(db, h):
            op = Operation.grab(db, el)
            for art in op.out.values():
                delete_artefact(db, art)
def get(target: str, connection: Optional[Redis[bytes]] = None) ‑> list[Union[Artefact, Funsie, Operation]]

Get object or objects that correspond to a given hash value.

get() returns a list of objects (Artefact, Operation and Funsie instances) currently on the active Redis connection that have a hash address starting with target. This function allows programmatically retrieving hashes like the funsies cat command does.

Args

target
A hash or truncated hash value.
connection : optional
An explicit Redis connection. Not required if called within a Fun() context.

Returns

A list of objects with ids that start with target. Empty if no such objects exist.
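
Lookup by truncated hash is a prefix match over the stored addresses. The sketch below shows the matching rule on an in-memory dict; the hash values, the stored descriptions and `find_by_prefix` are hypothetical and do not reflect how funsies indexes hashes in Redis.

```python
def find_by_prefix(store, target):
    """Return every stored object whose hash starts with target."""
    return [obj for h, obj in sorted(store.items()) if h.startswith(target)]


# Hypothetical store mapping (shortened) hashes to objects.
store = {
    "a1b2c3": "artefact: raw input",
    "a1f9e0": "operation: clean",
    "7c4d22": "funsie: cat",
}

ambiguous = find_by_prefix(store, "a1")  # a short prefix may match several objects
unique = find_by_prefix(store, "7c")
missing = find_by_prefix(store, "ff")
```

This is why the return type is a list: a short prefix can match several objects, and an unknown prefix matches none.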

def get(
    target: str,
    connection: Optional[Redis[bytes]] = None,
) -> list[Union[Artefact, Funsie, Operation]]:
    """Get object or objects that correspond to a given hash value.

    `get()` returns a list of objects (`Artefact`, `Operation` and `Funsie`
    instances) currently on the active Redis connection that have a hash
    address starting with `target`. This function allows programmatically
    retrieving hashes like the `funsies cat` command does.

    Args:
        target: A hash or truncated hash value.
        connection (optional): An explicit Redis connection. Not required if
            called within a `Fun()` context.

    Returns:
        A list of objects with ids that start with `target`. Empty if no such
        objects exist.
    """
    db = get_db(connection)
    hashes = hash_load(db, target)
    out: list[Union[Artefact, Funsie, Operation]] = []
    for h in hashes:
        if db.exists(c.join(c.ARTEFACTS, h)):
            logger.debug(f"{h} is Artefact")
            out += [Artefact.grab(db, h)]

        elif db.exists(c.join(c.FUNSIES, h)):
            logger.debug(f"{h} is Funsie")
            out += [Funsie.grab(db, h)]

        elif db.exists(c.join(c.OPERATIONS, h)):
            logger.debug(f"{h} is Operation")
            out += [Operation.grab(db, h)]

        else:
            logger.debug(f"{h} does not exist")
    return out
def Fun(connection: Optional[Redis[bytes]] = None, defaults: Optional[Options] = None, cleanup: bool = False) ‑> Iterator[Redis[bytes]]

Context manager for redis connections.

@contextmanager
def Fun(
    connection: Optional[Redis[bytes]] = None,
    defaults: Optional[Options] = None,
    cleanup: bool = False,
) -> Iterator[Redis[bytes]]:
    """Context manager for redis connections."""
    if connection is None:
        logger.warning("Opening new redis connection with default settings...")
        url = _get_funsies_url()
        hn = _extract_hostname(url)
        connection = Redis.from_url(url, decode_responses=False)
        logger.success(f"connected to {hn}")

    if defaults is None:
        defaults = Options()

    if cleanup:
        cleanup_funsies(connection)

    _connect_stack.push(connection)
    _options_stack.push(defaults)

    # also push on rq
    # TODO maybe just use the RQ version of this?
    rq.connections.push_connection(connection)
    try:
        yield _connect_stack.top
    finally:
        popped = _connect_stack.pop()
        assert popped == connection, (
            "Unexpected Redis connection was popped off the stack. "
            "Check your Redis connection setup."
        )
        rq.connections.pop_connection()
        _ = _options_stack.pop()
def ManagedFun(nworkers: int = 1, worker_args: Optional[Sequence[str]] = None, redis_args: Optional[Sequence[str]] = None, defaults: Optional[Options] = None, directory: Optional[_AnyPath] = None) ‑> Iterator[Redis[bytes]]

Make a fully managed funsies db.

@contextmanager
def ManagedFun(
    nworkers: int = 1,
    worker_args: Optional[Sequence[str]] = None,
    redis_args: Optional[Sequence[str]] = None,
    defaults: Optional[Options] = None,
    directory: Optional[_AnyPath] = None,
) -> Iterator[Redis[bytes]]:
    """Make a fully managed funsies db."""
    if directory is None:
        dir = tempfile.mkdtemp()
    else:
        dir = str(directory)

    logger.debug(f"running redis-server in {dir}")

    if worker_args is not None:
        wargs = [el for el in worker_args]
    else:
        wargs = []

    if redis_args is not None:
        rargs = [el for el in redis_args]
    else:
        rargs = []

    # Start redis
    port = 16379
    url = f"redis://localhost:{port}"
    cmdline = ["redis-server"] + rargs + ["--port", f"{port}"]

    redis_server = subprocess.Popen(
        cmdline,
        cwd=dir,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # TODO:CHECK that server started successfully
    time.sleep(0.1)
    logger.debug(f"redis running at {url}")

    # spawn workers
    logger.debug(f"spawning {nworkers} funsies workers")
    worker_pool = [
        subprocess.Popen(["funsies", "--url", url, "worker"] + wargs, cwd=dir)
        for i in range(nworkers)
    ]

    try:
        logger.success(f"{nworkers} workers connected to {url}")
        with Fun(Redis.from_url(url), defaults=defaults) as db:
            yield db
    finally:
        logger.debug("terminating worker pool and server")
        for w in worker_pool:
            w.kill()
            w.wait()
        # stop db
        db.shutdown()  # type:ignore
        db.connection_pool.disconnect()
        redis_server.wait()
        if directory is None:
            shutil.rmtree(dir)
        logger.success("stopping managed fun")
def options(**kwargs: Any) ‑> Options

Set operation and workflow options.

This function sets specific configuration options for an operation or a workflow that do not change hash values or cause re-execution, but do change runtime behaviour, such as job timeouts, queue selection, etc. Available options and their names are described in the entry for Options.

This function wraps the Options with layering of default values. The value of each attribute of Options is set based on:

  1. The values set by the **kwargs dictionary.

  2. The values set in the **kwargs dictionary of the enclosing Fun() context, if defaults=options(**kwargs) is passed to Fun().

  3. The default values in Options.

This allows layering of fairly complex runtime behaviour.
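
The three layers listed above can be sketched with a dataclass and `dataclasses.replace`. `OptionsSketch` and its fields (`timeout`, `queue`, `retries`) are hypothetical and chosen for illustration; they are not the attributes of the real Options class.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class OptionsSketch:
    # Hypothetical fields with their built-in defaults (layer 3).
    timeout: int = 60
    queue: str = "default"
    retries: int = 0


def layered(context_defaults: Optional[OptionsSketch] = None, **kwargs) -> OptionsSketch:
    """Apply kwargs (layer 1) over context defaults (layer 2), if any."""
    if context_defaults is None:
        return OptionsSketch(**kwargs)
    return replace(context_defaults, **kwargs)


# Layer 2: defaults attached to the enclosing context.
context = OptionsSketch(timeout=300)

inside = layered(context, queue="gpu")  # kwargs override, context fills the rest
outside = layered(None, queue="gpu")  # no context: built-in defaults fill the rest
```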

def options(**kwargs: Any) -> Options:
    """Set operation and workflow options.

    This function sets specific configuration options for an operation or a
    workflow that do not change hash values or cause re-execution, but do
    change runtime behaviour, such as job timeouts, queue selection, etc.
    Available options and their names are described in the entry for
    `config.Options`.

    This function wraps the `config.Options` with layering of default values.
    The value of each attribute of `config.Options` is set based on:

    1. The values set by the `**kwargs` dictionary.

    2. The values set in the `**kwargs` dictionary of the enclosing
    `funsies.Fun()` context, if `defaults=options(**kwargs)` is passed to
    `funsies.Fun()`.

    3. The default values in `config.Options`.

    This allows layering of fairly complex runtime behaviour.

    """
    if _options_stack.top is None:
        return Options(**kwargs)
    else:
        defaults: Options = _options_stack.top
        return replace(defaults, **kwargs)
def unwrap(it: Result[T]) ‑> ~T

Unwrap a Result type.

Unwrap Result[T] and return T. If Result[T] is of type Error, this function raises UnwrapError.

Args

it
An object of type Result[T].

Returns

The value of it with type T.

Raises

UnwrapError
Result[T] is an Error instance.
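
The Result pattern lets a failed branch travel through a workflow as a value and only become an exception at the point where the data is needed. The following is a minimal sketch of that idea; `ErrorSketch`, `UnwrapErrorSketch` and the `kind` string used below are hypothetical stand-ins rather than the classes defined in funsies.

```python
from dataclasses import dataclass


@dataclass
class ErrorSketch:
    """Hypothetical stand-in for an error value carried in place of data."""

    kind: str
    source: str = ""
    details: str = ""


class UnwrapErrorSketch(Exception):
    """Raised when an error value is unwrapped."""


def unwrap_sketch(it):
    """Return it unchanged unless it is an error value."""
    if isinstance(it, ErrorSketch):
        raise UnwrapErrorSketch(f"data is errored: kind={it.kind}")
    return it


# One branch succeeded, one failed; the failure is an ordinary value until unwrapped.
results = [b"42", ErrorSketch(kind="ExceptionRaised", details="division by zero")]

values = []
failures = []
for r in results:
    try:
        values.append(unwrap_sketch(r))
    except UnwrapErrorSketch as e:
        failures.append(str(e))
```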
def unwrap(it: Result[T]) -> T:
    """Unwrap a `errors.Result` type.

    Unwrap `errors.Result[T]` and return `T`. If `errors.Result[T]` is of type
    `Error`, this function raises `errors.UnwrapError`.

    Args:
        it: An object of type `errors.Result[T]`.

    Returns:
        The value of it with type `T`.

    Raises:
        UnwrapError: `errors.Result[T]` is an `errors.Error` instance.

    """
    if isinstance(it, Error):
        raise UnwrapError(
            f"data is errored: kind={it.kind}"
            + f"\nsource={it.source}"
            + f"\ndetails={it.details}"
        )
    else:
        return it