Module funsies.debug
Helpful function for debugging workflows.
Expand source code
"""Helpful function for debugging workflows."""
from __future__ import annotations
# std
from dataclasses import asdict
import json
import os
import os.path
from typing import Any, Optional, Union
# external
from redis import Redis
# module
from ._constants import _AnyPath
from ._context import get_db
from ._funsies import Funsie, FunsieHow
from ._graph import Artefact, get_data, Operation
from ._shell import ShellOutput
from .errors import UnwrapError
from .ui import takeout
# ----------------------------------------------------------------------
# Debugging functions
def shell(  # noqa:C901
    shell_output: ShellOutput,
    directory: _AnyPath,
    connection: Optional[Redis[bytes]] = None,
) -> None:
    """Extract all the files and outputs of a shell function to a directory.

    The following entries are written under ``directory`` (which is created
    if it does not exist):

    - ``input_files/<key>`` for every entry of ``shell_output.inp``,
    - ``output_files/<key>`` for every entry of ``shell_output.out``,
    - ``stdout<i>`` / ``stderr<i>`` for every index of
      ``shell_output.stdouts``,
    - ``errors.json``: for each artefact whose extraction raised
      ``UnwrapError``, the result of ``get_data`` for that artefact, keyed
      by ``input:<key>``, ``output:<key>``, ``stdout:<i>`` or ``stderr:<i>``,
    - ``operation.json``: the operation dataclass as JSON,
    - ``op.sh``: the stored commands, one per line,
    - ``op.env``: ``KEY=VALUE`` lines, only when the stored environment is
      not null.

    Args:
        shell_output: Shell operation output to extract.
        directory: Destination directory.
        connection: Optional Redis connection, forwarded to ``get_db``.
    """
    os.makedirs(directory, exist_ok=True)
    inp = os.path.join(directory, "input_files")
    out = os.path.join(directory, "output_files")
    db = get_db(connection)
    errors = {}

    for key, val in shell_output.inp.items():
        try:
            p = os.path.join(inp, key)
            # keys may contain sub-directories, so create parents per file
            os.makedirs(os.path.dirname(p), exist_ok=True)
            takeout(val, p, connection=db)
        except UnwrapError:
            errors[f"input:{key}"] = asdict(get_data(db, val))

    for key, val in shell_output.out.items():
        try:
            p = os.path.join(out, key)
            os.makedirs(os.path.dirname(p), exist_ok=True)
            takeout(val, p, connection=db)
        except UnwrapError:
            errors[f"output:{key}"] = asdict(get_data(db, val))

    for i, stdout in enumerate(shell_output.stdouts):
        try:
            takeout(
                stdout,
                os.path.join(directory, f"stdout{i}"),
                connection=db,
            )
        except UnwrapError:
            # Record the error of the stream that actually failed, keyed by
            # its index (not by leftovers of the loops above).
            errors[f"stdout:{i}"] = asdict(get_data(db, stdout))

        stderr = shell_output.stderrs[i]
        try:
            takeout(
                stderr,
                os.path.join(directory, f"stderr{i}"),
                connection=db,
            )
        except UnwrapError:
            errors[f"stderr:{i}"] = asdict(get_data(db, stderr))

    with open(os.path.join(directory, "errors.json"), "w") as f:
        f.write(
            json.dumps(
                errors,
                sort_keys=True,
                indent=2,
            )
        )

    with open(os.path.join(directory, "operation.json"), "w") as f:
        f.write(json.dumps(asdict(shell_output.op), sort_keys=True, indent=2))

    # Commands and environment are stored JSON-encoded in the funsie extras.
    extra = Funsie.grab(db, shell_output.op.funsie).extra
    cmds = json.loads(extra["cmds"].decode())
    env = json.loads(extra["env"].decode())

    with open(os.path.join(directory, "op.sh"), "w") as f:
        f.write("\n".join(cmds))
        f.write("\n")

    if env is not None:
        with open(os.path.join(directory, "op.env"), "w") as f:
            for name, value in env.items():
                f.write(f"{name}={value}\n")
# --------------
# Debug artefact
def artefact(
    target: Artefact, directory: _AnyPath, connection: Optional[Redis[bytes]] = None
) -> None:
    """Dump an artefact's metadata and data into a directory.

    ``directory`` is created if needed. The artefact dataclass is written as
    JSON to ``metadata.json`` and its content is extracted to ``data``. If
    extraction raises ``UnwrapError``, the result of ``get_data`` for the
    artefact is written as JSON to ``error.json`` instead.

    Args:
        target: Artefact to dump.
        directory: Destination directory.
        connection: Optional Redis connection, forwarded to ``get_db``.
    """
    db = get_db(connection)
    os.makedirs(directory, exist_ok=True)

    metadata_path = os.path.join(directory, "metadata.json")
    with open(metadata_path, "w") as meta_file:
        serialized = json.dumps(asdict(target), sort_keys=True, indent=2)
        meta_file.write(serialized)

    data_path = os.path.join(directory, "data")
    try:
        takeout(target, data_path, connection=db)
    except UnwrapError:
        # The artefact holds an error rather than data: save it as JSON.
        error_path = os.path.join(directory, "error.json")
        with open(error_path, "w") as error_file:
            failure = asdict(get_data(db, target))
            error_file.write(json.dumps(failure, sort_keys=True, indent=2))
def python(
    target: Union[Operation, Artefact],
    directory: _AnyPath,
    connection: Optional[Redis[bytes]] = None,
) -> None:
    """Extract the inputs and outputs of a python operation to a directory.

    When ``target`` is an artefact, the operation stored at ``target.parent``
    is debugged. The following entries are written under ``directory``
    (which is created if it does not exist):

    - ``inputs/<key>`` and ``outputs/<key>`` for every input and output
      artefact of the operation,
    - ``errors.json``: for each artefact whose extraction raised
      ``UnwrapError``, the result of ``get_data`` for that artefact, keyed
      by ``input:<key>`` or ``output:<key>``,
    - ``funsie.json``: the ``what``, ``inp``, ``out`` and ``error_tolerant``
      fields of the funsie,
    - ``operation.json``: the operation dataclass as JSON.

    Args:
        target: Operation to debug, or an artefact produced by it.
        directory: Destination directory.
        connection: Optional Redis connection, forwarded to ``get_db``.

    Raises:
        RuntimeError: If no operation is found for the artefact's parent, or
            if the operation is not a python function.
    """
    db = get_db(connection)
    if isinstance(target, Artefact):
        # Get the corresponding operation. The parent hash is kept in its own
        # variable so that it is still available for the error message when
        # the lookup yields None.
        parent = target.parent
        operation = Operation.grab(db, parent)
        if operation is None:
            raise RuntimeError(f"Operation not found at {parent}")
        target = operation

    # Validate before touching the filesystem so that a rejected operation
    # does not leave an empty directory behind.
    funsie = Funsie.grab(db, target.funsie)
    if funsie.how != FunsieHow.python:
        raise RuntimeError(f"Operation is of type {funsie.how}, not a python function.")

    os.makedirs(directory, exist_ok=True)
    inp = os.path.join(directory, "inputs")
    out = os.path.join(directory, "outputs")
    errors = {}

    for key, v in target.inp.items():
        val = Artefact[Any].grab(db, v)
        try:
            p = os.path.join(inp, key)
            # keys may contain sub-directories, so create parents per file
            os.makedirs(os.path.dirname(p), exist_ok=True)
            takeout(val, p, connection=db)
        except UnwrapError:
            errors[f"input:{key}"] = asdict(get_data(db, val))

    for key, v in target.out.items():
        val = Artefact[Any].grab(db, v)
        try:
            p = os.path.join(out, key)
            os.makedirs(os.path.dirname(p), exist_ok=True)
            takeout(val, p, connection=db)
        except UnwrapError:
            errors[f"output:{key}"] = asdict(get_data(db, val))

    with open(os.path.join(directory, "errors.json"), "w") as f:
        f.write(
            json.dumps(
                errors,
                sort_keys=True,
                indent=2,
            )
        )

    meta = {
        "what": funsie.what,
        "inp": funsie.inp,
        "out": funsie.out,
        "error_tolerant": funsie.error_tolerant,
    }
    with open(os.path.join(directory, "funsie.json"), "w") as f:
        f.write(json.dumps(meta, sort_keys=True, indent=2))

    with open(os.path.join(directory, "operation.json"), "w") as f:
        f.write(json.dumps(asdict(target), sort_keys=True, indent=2))

    # TODO: also dump the serialized function itself (funsie.aux), e.g. to
    # "function.pkl"; not implemented yet.
# --------------
# Debug anything
def anything(
    obj: Union[Artefact, Funsie, Operation, ShellOutput],
    output: _AnyPath,
    connection: Optional[Redis[bytes]] = None,
) -> None:
    """Debug anything really.

    Dispatches on the type of ``obj``:

    - ``Operation``: its funsie is looked up and the operation is handed to
      ``shell`` (wrapped in a ``ShellOutput``) or ``python`` depending on
      ``funsie.how``,
    - ``Artefact``: handed to ``artefact``,
    - ``ShellOutput``: handed to ``shell``.

    Args:
        obj: Object to debug.
        output: Destination directory, passed on to the selected function.
        connection: Optional Redis connection, forwarded to ``get_db``.

    Raises:
        RuntimeError: If ``obj`` is an operation whose funsie is neither a
            shell nor a python function.
        NotImplementedError: If ``obj`` is of any other type. This currently
            includes ``Funsie``, although the signature accepts it.
    """
    db = get_db(connection)

    if isinstance(obj, Operation):
        funsie = Funsie.grab(db, obj.funsie)
        if funsie.how == FunsieHow.shell:
            shell(ShellOutput(db, obj), output, db)
        elif funsie.how == FunsieHow.python:
            python(obj, output, db)
        else:
            raise RuntimeError(f"Operation of type {funsie.how} cannot be debugged.")
    elif isinstance(obj, Artefact):
        artefact(obj, output, db)
    elif isinstance(obj, ShellOutput):
        shell(obj, output, db)
    else:
        # Report the type, as the message says, rather than the object itself.
        raise NotImplementedError(
            f"Object of type {type(obj).__name__} cannot be debugged."
        )
Functions
def shell(shell_output: ShellOutput, directory: _AnyPath, connection: Optional[Redis[bytes]] = None) ‑> None- 
Extract all the files and outputs of a shell function to a directory.
Expand source code
def shell( # noqa:C901 shell_output: ShellOutput, directory: _AnyPath, connection: Optional[Redis[bytes]] = None, ) -> None: """Extract all the files and outputs of a shell function to a directory.""" os.makedirs(directory, exist_ok=True) inp = os.path.join(directory, "input_files") out = os.path.join(directory, "output_files") db = get_db(connection) errors = {} for key, val in shell_output.inp.items(): try: p = os.path.join(inp, key) os.makedirs(os.path.dirname(p), exist_ok=True) takeout(val, p, connection=db) except UnwrapError: errors[f"input:{key}"] = asdict(get_data(db, val)) for key, val in shell_output.out.items(): try: p = os.path.join(out, key) os.makedirs(os.path.dirname(p), exist_ok=True) takeout(val, p, connection=db) except UnwrapError: errors[f"output:{key}"] = asdict(get_data(db, val)) for i in range(len(shell_output.stdouts)): try: takeout( shell_output.stdouts[i], os.path.join(directory, f"stdout{i}"), connection=db, ) except UnwrapError: errors[f"stdout:{key}"] = asdict(get_data(db, val)) try: takeout( shell_output.stderrs[i], os.path.join(directory, f"stderr{i}"), connection=db, ) except UnwrapError: errors[f"stderr:{key}"] = asdict(get_data(db, val)) with open(os.path.join(directory, "errors.json"), "w") as f: f.write( json.dumps( errors, sort_keys=True, indent=2, ) ) with open(os.path.join(directory, "operation.json"), "w") as f: f.write(json.dumps(asdict(shell_output.op), sort_keys=True, indent=2)) extra = Funsie.grab(db, shell_output.op.funsie).extra cmds = json.loads(extra["cmds"].decode()) env = json.loads(extra["env"].decode()) with open(os.path.join(directory, "op.sh"), "w") as f: f.write("\n".join(cmds)) f.write("\n") if env is not None: with open(os.path.join(directory, "op.env"), "w") as f: for key, val in env.items(): f.write(f"{key}={val}\n") def artefact(target: Artefact, directory: _AnyPath, connection: Optional[Redis[bytes]] = None) ‑> None- 
Output content of any hash object to a file.
Expand source code
def artefact( target: Artefact, directory: _AnyPath, connection: Optional[Redis[bytes]] = None ) -> None: """Output content of any hash object to a file.""" db = get_db(connection) os.makedirs(directory, exist_ok=True) with open(os.path.join(directory, "metadata.json"), "w") as f: f.write(json.dumps(asdict(target), sort_keys=True, indent=2)) try: takeout( target, os.path.join(directory, "data"), connection=db, ) except UnwrapError: # dump error to json file with open(os.path.join(directory, "error.json"), "w") as f: f.write( json.dumps( asdict(get_data(db, target)), sort_keys=True, indent=2, ) ) def python(target: Union[Operation, Artefact], directory: _AnyPath, connection: Optional[Redis[bytes]] = None) ‑> None- 
Output content of any hash object to a file.
Expand source code
def python( target: Union[Operation, Artefact], directory: _AnyPath, connection: Optional[Redis[bytes]] = None, ) -> None: """Output content of any hash object to a file.""" db = get_db(connection) if isinstance(target, Artefact): # Get the corresponding operation target = Operation.grab(db, target.parent) if target is None: raise RuntimeError(f"Operation not found at {target.parent}") os.makedirs(directory, exist_ok=True) funsie = Funsie.grab(db, target.funsie) inp = os.path.join(directory, "inputs") out = os.path.join(directory, "outputs") errors = {} if funsie.how != FunsieHow.python: raise RuntimeError(f"Operation is of type {funsie.how}, not a python function.") for key, v in target.inp.items(): val = Artefact[Any].grab(db, v) try: p = os.path.join(inp, key) os.makedirs(os.path.dirname(p), exist_ok=True) takeout(val, p, connection=db) except UnwrapError: errors[f"input:{key}"] = asdict(get_data(db, val)) for key, v in target.out.items(): val = Artefact.grab(db, v) try: p = os.path.join(out, key) os.makedirs(os.path.dirname(p), exist_ok=True) takeout(val, p, connection=db) except UnwrapError: errors[f"output:{key}"] = asdict(get_data(db, val)) with open(os.path.join(directory, "errors.json"), "w") as f: f.write( json.dumps( errors, sort_keys=True, indent=2, ) ) meta = { "what": funsie.what, "inp": funsie.inp, "out": funsie.out, "error_tolerant": funsie.error_tolerant, } with open(os.path.join(directory, "funsie.json"), "w") as f: f.write(json.dumps(meta, sort_keys=True, indent=2)) with open(os.path.join(directory, "operation.json"), "w") as f: f.write(json.dumps(asdict(target), sort_keys=True, indent=2)) # TODO # with open(os.path.join(directory, "function.pkl"), "wb") as f: # assert funsie.aux is not None # f.write(funsie.aux) def anything(obj: Union[Artefact, Funsie, Operation, ShellOutput], output: _AnyPath, connection: Optional[Redis[bytes]] = None) ‑> None- 
Debug anything really.
Expand source code
def anything( obj: Union[Artefact, Funsie, Operation, ShellOutput], output: _AnyPath, connection: Optional[Redis[bytes]] = None, ) -> None: """Debug anything really.""" db = get_db(connection) if isinstance(obj, Operation): funsie = Funsie.grab(db, obj.funsie) if funsie.how == FunsieHow.shell: shell_output = ShellOutput(db, obj) shell(shell_output, output, db) elif funsie.how == FunsieHow.python: python(obj, output, db) else: raise RuntimeError() elif isinstance(obj, Artefact): artefact(obj, output, db) elif isinstance(obj, ShellOutput): shell(obj, output, db) else: raise NotImplementedError(f"Object of type {obj} cannot be debugged.")