Module funsies.debug
Helpful function for debugging workflows.
Expand source code
"""Helpful function for debugging workflows."""
from __future__ import annotations
# std
from dataclasses import asdict
import json
import os
import os.path
from typing import Any, Optional, Union
# external
from redis import Redis
# module
from ._constants import _AnyPath
from ._context import get_db
from ._funsies import Funsie, FunsieHow
from ._graph import Artefact, get_data, Operation
from ._shell import ShellOutput
from .errors import UnwrapError
from .ui import takeout
# ----------------------------------------------------------------------
# Debugging functions
def shell( # noqa:C901
shell_output: ShellOutput,
directory: _AnyPath,
connection: Optional[Redis[bytes]] = None,
) -> None:
"""Extract all the files and outputs of a shell function to a directory."""
os.makedirs(directory, exist_ok=True)
inp = os.path.join(directory, "input_files")
out = os.path.join(directory, "output_files")
db = get_db(connection)
errors = {}
for key, val in shell_output.inp.items():
try:
p = os.path.join(inp, key)
os.makedirs(os.path.dirname(p), exist_ok=True)
takeout(val, p, connection=db)
except UnwrapError:
errors[f"input:{key}"] = asdict(get_data(db, val))
for key, val in shell_output.out.items():
try:
p = os.path.join(out, key)
os.makedirs(os.path.dirname(p), exist_ok=True)
takeout(val, p, connection=db)
except UnwrapError:
errors[f"output:{key}"] = asdict(get_data(db, val))
for i in range(len(shell_output.stdouts)):
try:
takeout(
shell_output.stdouts[i],
os.path.join(directory, f"stdout{i}"),
connection=db,
)
except UnwrapError:
errors[f"stdout:{key}"] = asdict(get_data(db, val))
try:
takeout(
shell_output.stderrs[i],
os.path.join(directory, f"stderr{i}"),
connection=db,
)
except UnwrapError:
errors[f"stderr:{key}"] = asdict(get_data(db, val))
with open(os.path.join(directory, "errors.json"), "w") as f:
f.write(
json.dumps(
errors,
sort_keys=True,
indent=2,
)
)
with open(os.path.join(directory, "operation.json"), "w") as f:
f.write(json.dumps(asdict(shell_output.op), sort_keys=True, indent=2))
extra = Funsie.grab(db, shell_output.op.funsie).extra
cmds = json.loads(extra["cmds"].decode())
env = json.loads(extra["env"].decode())
with open(os.path.join(directory, "op.sh"), "w") as f:
f.write("\n".join(cmds))
f.write("\n")
if env is not None:
with open(os.path.join(directory, "op.env"), "w") as f:
for key, val in env.items():
f.write(f"{key}={val}\n")
# --------------
# Debug artefact
def artefact(
target: Artefact, directory: _AnyPath, connection: Optional[Redis[bytes]] = None
) -> None:
"""Output content of any hash object to a file."""
db = get_db(connection)
os.makedirs(directory, exist_ok=True)
with open(os.path.join(directory, "metadata.json"), "w") as f:
f.write(json.dumps(asdict(target), sort_keys=True, indent=2))
try:
takeout(
target,
os.path.join(directory, "data"),
connection=db,
)
except UnwrapError:
# dump error to json file
with open(os.path.join(directory, "error.json"), "w") as f:
f.write(
json.dumps(
asdict(get_data(db, target)),
sort_keys=True,
indent=2,
)
)
def python(
target: Union[Operation, Artefact],
directory: _AnyPath,
connection: Optional[Redis[bytes]] = None,
) -> None:
"""Output content of any hash object to a file."""
db = get_db(connection)
if isinstance(target, Artefact):
# Get the corresponding operation
target = Operation.grab(db, target.parent)
if target is None:
raise RuntimeError(f"Operation not found at {target.parent}")
os.makedirs(directory, exist_ok=True)
funsie = Funsie.grab(db, target.funsie)
inp = os.path.join(directory, "inputs")
out = os.path.join(directory, "outputs")
errors = {}
if funsie.how != FunsieHow.python:
raise RuntimeError(f"Operation is of type {funsie.how}, not a python function.")
for key, v in target.inp.items():
val = Artefact[Any].grab(db, v)
try:
p = os.path.join(inp, key)
os.makedirs(os.path.dirname(p), exist_ok=True)
takeout(val, p, connection=db)
except UnwrapError:
errors[f"input:{key}"] = asdict(get_data(db, val))
for key, v in target.out.items():
val = Artefact.grab(db, v)
try:
p = os.path.join(out, key)
os.makedirs(os.path.dirname(p), exist_ok=True)
takeout(val, p, connection=db)
except UnwrapError:
errors[f"output:{key}"] = asdict(get_data(db, val))
with open(os.path.join(directory, "errors.json"), "w") as f:
f.write(
json.dumps(
errors,
sort_keys=True,
indent=2,
)
)
meta = {
"what": funsie.what,
"inp": funsie.inp,
"out": funsie.out,
"error_tolerant": funsie.error_tolerant,
}
with open(os.path.join(directory, "funsie.json"), "w") as f:
f.write(json.dumps(meta, sort_keys=True, indent=2))
with open(os.path.join(directory, "operation.json"), "w") as f:
f.write(json.dumps(asdict(target), sort_keys=True, indent=2))
# TODO
# with open(os.path.join(directory, "function.pkl"), "wb") as f:
# assert funsie.aux is not None
# f.write(funsie.aux)
# --------------
# Debug anything
def anything(
obj: Union[Artefact, Funsie, Operation, ShellOutput],
output: _AnyPath,
connection: Optional[Redis[bytes]] = None,
) -> None:
"""Debug anything really."""
db = get_db(connection)
if isinstance(obj, Operation):
funsie = Funsie.grab(db, obj.funsie)
if funsie.how == FunsieHow.shell:
shell_output = ShellOutput(db, obj)
shell(shell_output, output, db)
elif funsie.how == FunsieHow.python:
python(obj, output, db)
else:
raise RuntimeError()
elif isinstance(obj, Artefact):
artefact(obj, output, db)
elif isinstance(obj, ShellOutput):
shell(obj, output, db)
else:
raise NotImplementedError(f"Object of type {obj} cannot be debugged.")
Functions
def shell(shell_output: ShellOutput, directory: _AnyPath, connection: Optional[Redis[bytes]] = None) ‑> None
-
Extract all the files and outputs of a shell function to a directory.
Expand source code
def shell( # noqa:C901 shell_output: ShellOutput, directory: _AnyPath, connection: Optional[Redis[bytes]] = None, ) -> None: """Extract all the files and outputs of a shell function to a directory.""" os.makedirs(directory, exist_ok=True) inp = os.path.join(directory, "input_files") out = os.path.join(directory, "output_files") db = get_db(connection) errors = {} for key, val in shell_output.inp.items(): try: p = os.path.join(inp, key) os.makedirs(os.path.dirname(p), exist_ok=True) takeout(val, p, connection=db) except UnwrapError: errors[f"input:{key}"] = asdict(get_data(db, val)) for key, val in shell_output.out.items(): try: p = os.path.join(out, key) os.makedirs(os.path.dirname(p), exist_ok=True) takeout(val, p, connection=db) except UnwrapError: errors[f"output:{key}"] = asdict(get_data(db, val)) for i in range(len(shell_output.stdouts)): try: takeout( shell_output.stdouts[i], os.path.join(directory, f"stdout{i}"), connection=db, ) except UnwrapError: errors[f"stdout:{key}"] = asdict(get_data(db, val)) try: takeout( shell_output.stderrs[i], os.path.join(directory, f"stderr{i}"), connection=db, ) except UnwrapError: errors[f"stderr:{key}"] = asdict(get_data(db, val)) with open(os.path.join(directory, "errors.json"), "w") as f: f.write( json.dumps( errors, sort_keys=True, indent=2, ) ) with open(os.path.join(directory, "operation.json"), "w") as f: f.write(json.dumps(asdict(shell_output.op), sort_keys=True, indent=2)) extra = Funsie.grab(db, shell_output.op.funsie).extra cmds = json.loads(extra["cmds"].decode()) env = json.loads(extra["env"].decode()) with open(os.path.join(directory, "op.sh"), "w") as f: f.write("\n".join(cmds)) f.write("\n") if env is not None: with open(os.path.join(directory, "op.env"), "w") as f: for key, val in env.items(): f.write(f"{key}={val}\n")
def artefact(target: Artefact, directory: _AnyPath, connection: Optional[Redis[bytes]] = None) ‑> None
-
Output content of any hash object to a file.
Expand source code
def artefact( target: Artefact, directory: _AnyPath, connection: Optional[Redis[bytes]] = None ) -> None: """Output content of any hash object to a file.""" db = get_db(connection) os.makedirs(directory, exist_ok=True) with open(os.path.join(directory, "metadata.json"), "w") as f: f.write(json.dumps(asdict(target), sort_keys=True, indent=2)) try: takeout( target, os.path.join(directory, "data"), connection=db, ) except UnwrapError: # dump error to json file with open(os.path.join(directory, "error.json"), "w") as f: f.write( json.dumps( asdict(get_data(db, target)), sort_keys=True, indent=2, ) )
def python(target: Union[Operation, Artefact], directory: _AnyPath, connection: Optional[Redis[bytes]] = None) ‑> None
-
Output content of any hash object to a file.
Expand source code
def python( target: Union[Operation, Artefact], directory: _AnyPath, connection: Optional[Redis[bytes]] = None, ) -> None: """Output content of any hash object to a file.""" db = get_db(connection) if isinstance(target, Artefact): # Get the corresponding operation target = Operation.grab(db, target.parent) if target is None: raise RuntimeError(f"Operation not found at {target.parent}") os.makedirs(directory, exist_ok=True) funsie = Funsie.grab(db, target.funsie) inp = os.path.join(directory, "inputs") out = os.path.join(directory, "outputs") errors = {} if funsie.how != FunsieHow.python: raise RuntimeError(f"Operation is of type {funsie.how}, not a python function.") for key, v in target.inp.items(): val = Artefact[Any].grab(db, v) try: p = os.path.join(inp, key) os.makedirs(os.path.dirname(p), exist_ok=True) takeout(val, p, connection=db) except UnwrapError: errors[f"input:{key}"] = asdict(get_data(db, val)) for key, v in target.out.items(): val = Artefact.grab(db, v) try: p = os.path.join(out, key) os.makedirs(os.path.dirname(p), exist_ok=True) takeout(val, p, connection=db) except UnwrapError: errors[f"output:{key}"] = asdict(get_data(db, val)) with open(os.path.join(directory, "errors.json"), "w") as f: f.write( json.dumps( errors, sort_keys=True, indent=2, ) ) meta = { "what": funsie.what, "inp": funsie.inp, "out": funsie.out, "error_tolerant": funsie.error_tolerant, } with open(os.path.join(directory, "funsie.json"), "w") as f: f.write(json.dumps(meta, sort_keys=True, indent=2)) with open(os.path.join(directory, "operation.json"), "w") as f: f.write(json.dumps(asdict(target), sort_keys=True, indent=2)) # TODO # with open(os.path.join(directory, "function.pkl"), "wb") as f: # assert funsie.aux is not None # f.write(funsie.aux)
def anything(obj: Union[Artefact, Funsie, Operation, ShellOutput], output: _AnyPath, connection: Optional[Redis[bytes]] = None) ‑> None
-
Debug anything really.
Expand source code
def anything( obj: Union[Artefact, Funsie, Operation, ShellOutput], output: _AnyPath, connection: Optional[Redis[bytes]] = None, ) -> None: """Debug anything really.""" db = get_db(connection) if isinstance(obj, Operation): funsie = Funsie.grab(db, obj.funsie) if funsie.how == FunsieHow.shell: shell_output = ShellOutput(db, obj) shell(shell_output, output, db) elif funsie.how == FunsieHow.python: python(obj, output, db) else: raise RuntimeError() elif isinstance(obj, Artefact): artefact(obj, output, db) elif isinstance(obj, ShellOutput): shell(obj, output, db) else: raise NotImplementedError(f"Object of type {obj} cannot be debugged.")