Module funsies.types

Object types.

Expand source code
"""Object types."""
# module
from ._constants import Encoding, hash_t
from ._funsies import Funsie, FunsieHow
from ._graph import Artefact, ArtefactStatus, Operation
from ._run import RunStatus
from ._shell import ShellOutput
from .config import Options
from .errors import Error, ErrorKind, Result, UnwrapError

# Re-export `Result` at module level so it can be referenced as
# `funsies.types.Result` (a simple mypy result type).
Result = Result
"""see `funsies.errors.Result`."""

# Explicit public API for `from funsies.types import *`.
__all__ = [
    "Funsie",
    "FunsieHow",
    "Artefact",
    "ArtefactStatus",
    "Encoding",
    "Operation",
    "ShellOutput",
    "hash_t",
    "Error",
    "ErrorKind",
    "Result",
    "UnwrapError",
    "RunStatus",
    "Options",
]

Global variables

var Result

See `funsies.errors.Result`.

Functions

def hash_t(x)
Expand source code
def new_type(x):
    """Return *x* unchanged (runtime stand-in for a NewType constructor)."""
    value = x
    return value

Classes

class Funsie (how: FunsieHow, what: str, inp: dict[str, Encoding], out: dict[str, Encoding], extra: dict[str, bytes], error_tolerant: int = 0)

A funsie is a wrapped command that can be backed up to the KV store.

A Funsie has a how, a what (a string that identifies the funsie, such as a function name or shell commands) and input and output artefact names. All of these are used to generate the hash of the Funsie instance.

Funsies also have an extra field that include auxiliary data that is not to be used in hashing the funsie, but is useful for executing it, such as a cloudpickled python function.

Expand source code
@dataclass
class Funsie:
    """A funsie is a wrapped command that can be backed up to the KV store.

    A Funsie has a `how`, a `what` (a string that identifies the funsie, such
    as a function name or shell commands) and input and output artefact names.
    All of these are used to generate the hash of the Funsie instance.

    Funsies also have an `extra` field that include auxiliary data that is not
    to be used in hashing the funsie, but is useful for executing it, such as
    a cloudpickled python function.

    """

    how: FunsieHow  # kind of funsie: python, shell or subdag
    what: str  # identifier (function name, shell commands, ...)
    inp: dict[str, Encoding]  # input artefact name -> encoding
    out: dict[str, Encoding]  # output artefact name -> encoding
    extra: dict[str, bytes]  # auxiliary data, deliberately excluded from hash
    error_tolerant: int = 0  # nonzero: decode() does not raise on Error inputs
    hash: hash_t = field(init=False)  # derived value, set in __post_init__

    def decode(
        self: Funsie, input_data: Mapping[str, Result[bytes]]
    ) -> dict[str, object]:
        """Decode input data according to `inp`.

        Raises:
            RuntimeError: if an input decodes to an `Error` and
                `error_tolerant` is 0.
        """
        out = {}
        for key, enc in self.inp.items():
            element = input_data[key]
            out[key] = _serdes.decode(enc, element)
            if self.error_tolerant == 0 and isinstance(out[key], Error):
                raise RuntimeError(
                    f"Decoding of input data {key} failed:\n{out[key].details}"
                )
        return out

    def put(self: Funsie, db: Redis[bytes]) -> None:
        """Save a Funsie to Redis."""
        # Core metadata in one Redis hash; inp/out/extra each get their own
        # key and are only written when non-empty.
        db.hset(  # type:ignore
            join(FUNSIES, self.hash),
            mapping={
                "hash": self.hash,
                "how": int(self.how),
                "what": self.what,
                "error_tolerant": self.error_tolerant,
            },
        )
        if self.inp:
            db.hset(  # type:ignore
                join(FUNSIES, self.hash, "inp"),
                mapping=dict([(k, v.value) for k, v in self.inp.items()]),
            )
        if self.out:
            db.hset(  # type:ignore
                join(FUNSIES, self.hash, "out"),
                mapping=dict([(k, v.value) for k, v in self.out.items()]),
            )
        if self.extra:
            db.hset(  # type:ignore
                join(FUNSIES, self.hash, "extra"), mapping=self.extra  # type:ignore
            )

        # Save the hash in the quickhash db
        hash_save(db, self.hash)

    @classmethod
    def grab(cls: Type[Funsie], db: Redis[bytes], hash: hash_t) -> "Funsie":
        """Grab a Funsie from the Redis store.

        Raises:
            RuntimeError: if no funsie is stored at `hash`.
        """
        # Existence check plus all four hashes fetched in one round-trip.
        pipe: Pipeline = db.pipeline(transaction=False)
        pipe.exists(join(FUNSIES, hash))
        pipe.hgetall(join(FUNSIES, hash))
        pipe.hgetall(join(FUNSIES, hash, "inp"))
        pipe.hgetall(join(FUNSIES, hash, "out"))
        pipe.hgetall(join(FUNSIES, hash, "extra"))
        exists, metadata, inp, out, extra = pipe.execute()

        if not exists:
            raise RuntimeError(f"No funsie at {hash}")

        return Funsie(
            how=FunsieHow(int(metadata[b"how"].decode())),
            what=metadata[b"what"].decode(),
            error_tolerant=int(metadata[b"error_tolerant"].decode()),
            inp=_artefacts(inp),
            out=_artefacts(out),
            extra=dict([(k.decode(), v) for k, v in extra.items()]),
        )

    def __str__(self: Funsie) -> str:
        """Get the string representation of a funsie."""
        # ==============================================================
        #     ALERT: DO NOT TOUCH THIS CODE WITHOUT CAREFUL THOUGHT
        # --------------------------------------------------------------
        # When hashes change, previous databases become deprecated. This
        # (will) require a change in version number!
        # NOTE: keys are sorted so the string (and thus the hash) does not
        # depend on dict insertion order.
        out = f"how={self.how}\n" + f"what={self.what}\n"
        for key in sorted(self.inp.keys()):
            out += f"input:{key} -> {self.inp[key].value}\n"
        for key in sorted(self.out.keys()):
            out += f"output:{key} -> {self.out[key].value}\n"
        out += f"error tolerant:{self.error_tolerant}\n"
        # ==============================================================
        return out

    def __post_init__(self: Funsie) -> None:
        """Calculate the hash."""
        # ==============================================================
        #     ALERT: DO NOT TOUCH THIS CODE WITHOUT CAREFUL THOUGHT
        # --------------------------------------------------------------
        # When hashes change, previous databases become deprecated. This
        # (will) require a change in version number!
        m = hashlib.sha1()
        # header
        m.update(b"funsie")
        # funsie
        m.update(str(self).encode())
        self.hash = hash_t(m.hexdigest())
        # ==============================================================
Class variables

var how : funsies._funsies.FunsieHow
var what : str
var inp : dict
var out : dict
var extra : dict
var hash : hash_t
var error_tolerant : int

Static methods

def grab(db: Redis[bytes], hash: hash_t) ‑> 'Funsie'

Grab a Funsie from the Redis store.

Expand source code
@classmethod
def grab(cls: Type[Funsie], db: Redis[bytes], hash: hash_t) -> "Funsie":
    """Grab a Funsie from the Redis store.

    Raises:
        RuntimeError: if no funsie is stored at `hash`.
    """
    # Existence check plus all four hashes fetched in one round-trip.
    pipe: Pipeline = db.pipeline(transaction=False)
    pipe.exists(join(FUNSIES, hash))
    pipe.hgetall(join(FUNSIES, hash))
    pipe.hgetall(join(FUNSIES, hash, "inp"))
    pipe.hgetall(join(FUNSIES, hash, "out"))
    pipe.hgetall(join(FUNSIES, hash, "extra"))
    exists, metadata, inp, out, extra = pipe.execute()

    if not exists:
        raise RuntimeError(f"No funsie at {hash}")

    return Funsie(
        how=FunsieHow(int(metadata[b"how"].decode())),
        what=metadata[b"what"].decode(),
        error_tolerant=int(metadata[b"error_tolerant"].decode()),
        inp=_artefacts(inp),
        out=_artefacts(out),
        extra=dict([(k.decode(), v) for k, v in extra.items()]),
    )

Methods

def decode(self: Funsie, input_data: Mapping[str, Result[bytes]]) ‑> dict

Decode input data according to inp.

Expand source code
def decode(
    self: Funsie, input_data: Mapping[str, Result[bytes]]
) -> dict[str, object]:
    """Decode input data according to `inp`.

    Raises:
        RuntimeError: if an input decodes to an `Error` and
            `error_tolerant` is 0.
    """
    out = {}
    for key, enc in self.inp.items():
        element = input_data[key]
        out[key] = _serdes.decode(enc, element)
        if self.error_tolerant == 0 and isinstance(out[key], Error):
            raise RuntimeError(
                f"Decoding of input data {key} failed:\n{out[key].details}"
            )
    return out
def put(self: Funsie, db: Redis[bytes]) ‑> None

Save a Funsie to Redis.

Expand source code
def put(self: Funsie, db: Redis[bytes]) -> None:
    """Save a Funsie to Redis."""
    # Core metadata in one Redis hash; inp/out/extra each get their own
    # key and are only written when non-empty.
    db.hset(  # type:ignore
        join(FUNSIES, self.hash),
        mapping={
            "hash": self.hash,
            "how": int(self.how),
            "what": self.what,
            "error_tolerant": self.error_tolerant,
        },
    )
    if self.inp:
        db.hset(  # type:ignore
            join(FUNSIES, self.hash, "inp"),
            mapping=dict([(k, v.value) for k, v in self.inp.items()]),
        )
    if self.out:
        db.hset(  # type:ignore
            join(FUNSIES, self.hash, "out"),
            mapping=dict([(k, v.value) for k, v in self.out.items()]),
        )
    if self.extra:
        db.hset(  # type:ignore
            join(FUNSIES, self.hash, "extra"), mapping=self.extra  # type:ignore
        )

    # Save the hash in the quickhash db
    hash_save(db, self.hash)
class FunsieHow (value, names=None, *, module=None, qualname=None, type=None, start=1)

Kinds of funsies.

This enum contains the various kinds of funsies: python code, shell code, etc.

Expand source code
class FunsieHow(IntEnum):
    """Kinds of funsies.

    This enum contains the various kinds of funsies: python code, shell code,
    etc.
    """

    python = 0  # a wrapped python function
    shell = 1  # shell command(s)
    subdag = 2  # a funsie that generates a sub-graph (see RunStatus.subdag_ready)

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var python
var shell
var subdag
class Artefact (hash: hash_t, parent: hash_t, kind: Encoding)

Artefacts are the main data structure.

Expand source code
@dataclass(frozen=True)
class Artefact(Generic[T]):
    """Artefacts are the main data structure."""

    hash: hash_t  # hash identifying this artefact
    parent: hash_t  # hash of the node that produced this artefact
    kind: Encoding  # how the underlying data is encoded (json or blob)

    def put(self: Artefact[Any], db: Redis[bytes]) -> None:
        """Save an artefact to Redis."""
        data = dict(hash=self.hash, parent=self.parent, kind=self.kind.value)
        db.hset(  # type:ignore
            join(ARTEFACTS, self.hash),
            mapping=data,  # type:ignore
        )
        # Save the hash in the quickhash db
        hash_save(db, self.hash)

    @classmethod
    def grab(cls: Type[Artefact[T]], db: Redis[bytes], hash: hash_t) -> Artefact[T]:
        """Grab an artefact from the Redis store.

        Raises:
            RuntimeError: if no artefact is stored at `hash`.
        """
        # Existence check and fetch in a single round-trip.
        pipe: Pipeline = db.pipeline(transaction=False)
        pipe.exists(join(ARTEFACTS, hash))
        pipe.hgetall(join(ARTEFACTS, hash))
        exists, data = pipe.execute()

        if not exists:
            raise RuntimeError(f"No artefact at {hash}")

        return Artefact[T](
            hash=hash_t(data[b"hash"].decode()),
            parent=hash_t(data[b"parent"].decode()),
            kind=Encoding(data[b"kind"].decode()),
        )

Ancestors

  • typing.Generic

Class variables

var hash : hash_t
var parent : hash_t
var kind : funsies._constants.Encoding

Static methods

def grab(db: Redis[bytes], hash: hash_t) ‑> Artefact[T]

Grab an artefact from the Redis store.

Expand source code
@classmethod
def grab(cls: Type[Artefact[T]], db: Redis[bytes], hash: hash_t) -> Artefact[T]:
    """Grab an artefact from the Redis store.

    Raises:
        RuntimeError: if no artefact is stored at `hash`.
    """
    # Existence check and fetch in a single round-trip.
    pipe: Pipeline = db.pipeline(transaction=False)
    pipe.exists(join(ARTEFACTS, hash))
    pipe.hgetall(join(ARTEFACTS, hash))
    exists, data = pipe.execute()

    if not exists:
        raise RuntimeError(f"No artefact at {hash}")

    return Artefact[T](
        hash=hash_t(data[b"hash"].decode()),
        parent=hash_t(data[b"parent"].decode()),
        kind=Encoding(data[b"kind"].decode()),
    )

Methods

def put(self: Artefact[Any], db: Redis[bytes]) ‑> None

Save an artefact to Redis.

Expand source code
def put(self: Artefact[Any], db: Redis[bytes]) -> None:
    """Save an artefact to Redis."""
    # Enum value (not the Enum member) is stored so the entry is a plain string.
    data = dict(hash=self.hash, parent=self.parent, kind=self.kind.value)
    db.hset(  # type:ignore
        join(ARTEFACTS, self.hash),
        mapping=data,  # type:ignore
    )
    # Save the hash in the quickhash db
    hash_save(db, self.hash)
class ArtefactStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)

Status of data associated with an artefact.

Expand source code
class ArtefactStatus(IntEnum):
    """Status of data associated with an artefact."""

    # <= 0 -> no usable data is available for the artefact.
    deleted = -2
    not_found = -1
    no_data = 0
    # > 0 -> the artefact has data (computed, constant, error or linked).
    done = 1
    const = 2
    error = 3
    linked = 4

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var deleted
var not_found
var no_data
var done
var const
var error
var linked
class Encoding (value, names=None, *, module=None, qualname=None, type=None, start=1)

Types for data objects.

Funsies does not support a full-blown type system. For this, we defer to json's encoding of various data structures.

Expand source code
class Encoding(str, Enum):
    """Types for data objects.

    Funsies does not support a full-blown type system. For this, we defer to
    json's encoding of various data structures.
    """

    json = "json"  # JSON-encoded data
    blob = "blob"  # raw binary data, stored as-is

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var json
var blob
class Operation (hash: hash_t, funsie: hash_t, inp: dict[str, hash_t], out: dict[str, hash_t], options: Optional[Options] = None)

An operation on data in the graph.

Expand source code
@dataclass(frozen=True)
class Operation:
    """An operation on data in the graph."""

    hash: hash_t  # hash identifying this operation
    funsie: hash_t  # hash of the Funsie that performs the work
    inp: dict[str, hash_t]  # input name -> artefact hash
    out: dict[str, hash_t]  # output name -> artefact hash
    options: Optional[Options] = None  # runtime options, if any

    # pipeline-able
    def put(self: "Operation", db: Redis[bytes]) -> None:
        """Save an operation to Redis."""
        # inp/out/options are only written when non-empty / set.
        if self.inp:
            db.hset(join(OPERATIONS, self.hash, "inp"), mapping=self.inp)  # type:ignore
        if self.out:
            db.hset(join(OPERATIONS, self.hash, "out"), mapping=self.out)  # type:ignore
        if self.options:
            db.set(join(OPERATIONS, self.hash, "options"), self.options.pack())
        db.hset(  # type:ignore
            join(OPERATIONS, self.hash),
            mapping={"funsie": self.funsie, "hash": self.hash},
        )

        # Save the hash in the quickhash db
        hash_save(db, self.hash)

    # pipelined
    @classmethod
    def grab(cls: Type["Operation"], db: Redis[bytes], hash: hash_t) -> "Operation":
        """Grab an operation from the Redis store.

        Raises:
            RuntimeError: if no operation is stored at `hash`.
        """
        if not db.exists(join(OPERATIONS, hash)):
            raise RuntimeError(f"No operation at {hash}")

        # Metadata, inputs, outputs and options fetched in one round-trip.
        pipe: Pipeline = db.pipeline(transaction=False)
        pipe.hgetall(join(OPERATIONS, hash))
        pipe.hgetall(join(OPERATIONS, hash, "inp"))
        pipe.hgetall(join(OPERATIONS, hash, "out"))
        pipe.get(join(OPERATIONS, hash, "options"))
        metadata, inp, out, tmp = pipe.execute()

        if tmp is not None:
            options: Optional[Options] = Options.unpack(tmp.decode())
        else:
            options = None

        return Operation(
            hash=hash_t(metadata[b"hash"].decode()),
            funsie=hash_t(metadata[b"funsie"].decode()),
            inp=dict([(k.decode(), hash_t(v.decode())) for k, v in inp.items()]),
            out=dict([(k.decode(), hash_t(v.decode())) for k, v in out.items()]),
            options=options,
        )

Class variables

var hash : hash_t
var funsie : hash_t
var inp : dict
var out : dict
var options : Optional[Options]

Static methods

def grab(db: Redis[bytes], hash: hash_t) ‑> 'Operation'

Grab an operation from the Redis store.

Expand source code
@classmethod
def grab(cls: Type["Operation"], db: Redis[bytes], hash: hash_t) -> "Operation":
    """Grab an operation from the Redis store.

    Raises:
        RuntimeError: if no operation is stored at `hash`.
    """
    if not db.exists(join(OPERATIONS, hash)):
        raise RuntimeError(f"No operation at {hash}")

    # Metadata, inputs, outputs and options fetched in one round-trip.
    pipe: Pipeline = db.pipeline(transaction=False)
    pipe.hgetall(join(OPERATIONS, hash))
    pipe.hgetall(join(OPERATIONS, hash, "inp"))
    pipe.hgetall(join(OPERATIONS, hash, "out"))
    pipe.get(join(OPERATIONS, hash, "options"))
    metadata, inp, out, tmp = pipe.execute()

    if tmp is not None:
        options: Optional[Options] = Options.unpack(tmp.decode())
    else:
        options = None

    return Operation(
        hash=hash_t(metadata[b"hash"].decode()),
        funsie=hash_t(metadata[b"funsie"].decode()),
        inp=dict([(k.decode(), hash_t(v.decode())) for k, v in inp.items()]),
        out=dict([(k.decode(), hash_t(v.decode())) for k, v in out.items()]),
        options=options,
    )

Methods

def put(self: 'Operation', db: Redis[bytes]) ‑> None

Save an operation to Redis.

Expand source code
def put(self: "Operation", db: Redis[bytes]) -> None:
    """Save an operation to Redis."""
    # inp/out/options are only written when non-empty / set.
    if self.inp:
        db.hset(join(OPERATIONS, self.hash, "inp"), mapping=self.inp)  # type:ignore
    if self.out:
        db.hset(join(OPERATIONS, self.hash, "out"), mapping=self.out)  # type:ignore
    if self.options:
        db.set(join(OPERATIONS, self.hash, "options"), self.options.pack())
    db.hset(  # type:ignore
        join(OPERATIONS, self.hash),
        mapping={"funsie": self.funsie, "hash": self.hash},
    )

    # Save the hash in the quickhash db
    hash_save(db, self.hash)
class ShellOutput (store: Redis[bytes], op: Operation)

A convenience wrapper for a shell operation.

Generate a ShellOutput wrapper around a shell operation.

Expand source code
class ShellOutput:
    """A convenience wrapper for a shell operation."""

    op: Operation  # the wrapped shell operation
    hash: hash_t  # hash of the wrapped operation
    out: dict[str, Artefact[bytes]]  # named (non-special) output artefacts
    inp: dict[str, Artefact[Any]]  # input artefacts

    def __init__(self: "ShellOutput", store: Redis[bytes], op: Operation) -> None:
        """Generate a ShellOutput wrapper around a shell operation."""
        # import the constants
        # module
        from ._shell import RETURNCODE, SPECIAL, STDERR, STDOUT

        # stuff that is the same
        self.op = op
        self.hash = op.hash

        # Split op outputs: SPECIAL keys (stdout/stderr/returncode) are
        # handled below; everything else is a named output file.
        self.out = {}
        self.n = 0
        for key, val in op.out.items():
            if SPECIAL in key:
                if RETURNCODE in key:
                    self.n += 1  # count the number of commands
            else:
                self.out[key] = Artefact[bytes].grab(store, val)

        self.inp = {}
        for key, val in op.inp.items():
            self.inp[key] = Artefact[Any].grab(store, val)

        # One stdout/stderr/returncode artefact per shell command.
        self.stdouts = []
        self.stderrs = []
        self.returncodes = []
        for i in range(self.n):
            self.stdouts += [Artefact[bytes].grab(store, op.out[f"{STDOUT}{i}"])]
            self.stderrs += [Artefact[bytes].grab(store, op.out[f"{STDERR}{i}"])]
            self.returncodes += [Artefact[int].grab(store, op.out[f"{RETURNCODE}{i}"])]

    def __check_len(self: "ShellOutput") -> None:
        # Guard for the singular accessors below: they are only meaningful
        # when the operation ran exactly one command.
        if self.n > 1:
            raise AttributeError(
                "More than one shell command are included in this run."
            )

    @property
    def returncode(self: "ShellOutput") -> Artefact[int]:
        """Return code of a shell command."""
        self.__check_len()
        return self.returncodes[0]

    @property
    def stdout(self: "ShellOutput") -> Artefact[bytes]:
        """Stdout of a shell command."""
        self.__check_len()
        return self.stdouts[0]

    @property
    def stderr(self: "ShellOutput") -> Artefact[bytes]:
        """Stderr of a shell command."""
        self.__check_len()
        return self.stderrs[0]

Class variables

var op : funsies._graph.Operation
var hash : hash_t
var out : dict
var inp : dict

Instance variables

var returncode : funsies._graph.Artefact[int]

Return code of a shell command.

Expand source code
@property
def returncode(self: "ShellOutput") -> Artefact[int]:
    """Return code of a shell command."""
    # Raises AttributeError if the operation ran more than one command.
    self.__check_len()
    return self.returncodes[0]
var stdout : funsies._graph.Artefact[bytes]

Stdout of a shell command.

Expand source code
@property
def stdout(self: "ShellOutput") -> Artefact[bytes]:
    """Stdout of a shell command."""
    # Raises AttributeError if the operation ran more than one command.
    self.__check_len()
    return self.stdouts[0]
var stderr : funsies._graph.Artefact[bytes]

Stderr of a shell command.

Expand source code
@property
def stderr(self: "ShellOutput") -> Artefact[bytes]:
    """Stderr of a shell command."""
    # Raises AttributeError if the operation ran more than one command.
    self.__check_len()
    return self.stderrs[0]
class Error (kind: ErrorKind, source: Optional[hash_t] = None, details: Optional[str] = None)

An Error value for artefacts.

Expand source code
@dataclass
class Error:
    """An Error value for artefacts.

    Attributes:
        kind: category of error (see `ErrorKind`).
        source: hash of the node that caused the error, if known.
        details: free-form human-readable description, if any.
    """

    kind: ErrorKind
    source: Optional[hash_t] = None
    details: Optional[str] = None

    def put(self: "Error", db: Redis[bytes], hash: hash_t) -> None:
        """Save an Error to Redis."""
        # kind is always stored; source/details only when present.
        data = dict(kind=self.kind.name)
        if self.source:
            data["source"] = str(self.source)
        if self.details:
            data["details"] = str(self.details)
        db.hset(  # type:ignore
            join(ARTEFACTS, hash, "error"),
            mapping=data,  # type:ignore
        )

    @classmethod
    def grab(cls: Type["Error"], db: Redis[bytes], hash: hash_t) -> "Error":
        """Grab an Error from the Redis store.

        Raises:
            RuntimeError: if no error is stored for the artefact at `hash`.
        """
        # Bugfix: previously this tested the key *string* (always truthy), so
        # the missing-error case fell through and raised KeyError on b"kind"
        # instead of the intended RuntimeError. Check the db, as in
        # Operation.grab.
        if not db.exists(join(ARTEFACTS, hash, "error")):
            raise RuntimeError(f"No error for artefact at {hash}")

        data = db.hgetall(join(ARTEFACTS, hash, "error"))
        kind = ErrorKind(data[b"kind"].decode())

        # source and details are optional fields.
        tmp = data.get(b"source", None)
        if tmp is not None:
            source: Optional[hash_t] = hash_t(tmp.decode())
        else:
            source = None
        tmp = data.get(b"details", None)
        if tmp is not None:
            details: Optional[str] = tmp.decode()
        else:
            details = None

        return Error(kind, source=source, details=details)

Class variables

var kind : ErrorKind
var source : Optional[hash_t]
var details : Optional[str]

Static methods

def grab(db: Redis[bytes], hash: hash_t) ‑> 'Error'

Grab an Error from the Redis store.

Expand source code
@classmethod
def grab(cls: Type["Error"], db: Redis[bytes], hash: hash_t) -> "Error":
    """Grab an Error from the Redis store.

    Raises:
        RuntimeError: if no error is stored for the artefact at `hash`.
    """
    # Bugfix: previously this tested the key *string* (always truthy), so
    # the missing-error case fell through and raised KeyError on b"kind"
    # instead of the intended RuntimeError. Check the db, as in
    # Operation.grab.
    if not db.exists(join(ARTEFACTS, hash, "error")):
        raise RuntimeError(f"No error for artefact at {hash}")

    data = db.hgetall(join(ARTEFACTS, hash, "error"))
    kind = ErrorKind(data[b"kind"].decode())

    # source and details are optional fields.
    tmp = data.get(b"source", None)
    if tmp is not None:
        source: Optional[hash_t] = hash_t(tmp.decode())
    else:
        source = None
    tmp = data.get(b"details", None)
    if tmp is not None:
        details: Optional[str] = tmp.decode()
    else:
        details = None

    return Error(kind, source=source, details=details)

Methods

def put(self: 'Error', db: Redis[bytes], hash: hash_t) ‑> None

Save an Error to Redis.

Expand source code
def put(self: "Error", db: Redis[bytes], hash: hash_t) -> None:
    """Save an Error to Redis."""
    # kind is always stored; source/details only when present.
    data = dict(kind=self.kind.name)
    if self.source:
        data["source"] = str(self.source)
    if self.details:
        data["details"] = str(self.details)
    db.hset(  # type:ignore
        join(ARTEFACTS, hash, "error"),
        mapping=data,  # type:ignore
    )
class ErrorKind (value, names=None, *, module=None, qualname=None, type=None, start=1)

Kinds of errors.

Expand source code
class ErrorKind(str, Enum):
    """Kinds of errors (stored in `Error.kind`)."""

    # db errors
    NotFound = "NotFound"
    Mismatch = "Mismatch"
    UnresolvedLink = "UnresolvedLink"
    # Type errors
    WrongType = "WrongType"
    JSONEncodingError = "JSONEncodingError"
    JSONDecodingError = "JSONDecodingError"
    UnknownEncodingError = "UnknownEncodingError"
    # Job error conditions
    MissingOutput = "MissingOutput"
    MissingInput = "MissingInput"
    ExceptionRaised = "ExceptionRaised"
    JobTimedOut = "JobTimedOut"
    NoErrorData = "NoErrorData"
    KilledBySignal = "KilledBySignal"

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var NotFound
var Mismatch
var WrongType
var JSONEncodingError
var JSONDecodingError
var UnknownEncodingError
var MissingOutput
var MissingInput
var ExceptionRaised
var JobTimedOut
var NoErrorData
var KilledBySignal
class UnwrapError (*args, **kwargs)

Exception thrown when unwrapping an error.

Expand source code
class UnwrapError(Exception):
    """Exception thrown when unwrapping an error."""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class RunStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)

Possible status of running an operation.

Expand source code
class RunStatus(IntEnum):
    """Possible status of running an operation.

    Values <= 0 mean some issue prevented the job from running; values > 0
    mean the job executed and dependents may run.
    """

    # <= 0 -> issue prevented running job.
    subdag_ready = -5
    delayed = -3
    unmet_dependencies = -2
    not_ready = -1
    # > 0 -> executed, can run dependents.
    executed = 1
    using_cached = 2
    input_error = 4

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var subdag_ready
var delayed
var unmet_dependencies
var not_ready
var executed
var using_cached
var input_error
class Options (timeout: int = -1, queue: str = 'default', distributed: bool = True, reset: bool = False, evaluate: bool = True, ttl: int = 86400, result_ttl: int = 60, failure_ttl: int = 86400, serializer: str = 'rq.serializers.JSONSerializer')

Runtime options for an Operation.

This is a class that basically contains all the random options that may need to be set when building a workflow, such as timeouts, heterogeneous compute etc. It should generally be instantiated using the options() function.

Expand source code
@dataclass
class Options:
    """Runtime options for an Operation.

    This is a class that basically contains all the random options that may
    need to be set when building a workflow, such as timeouts, heterogeneous
    compute etc. It should generally be instantiated using the
    `funsies.options()` function.

    """

    timeout: int = INFINITE
    """Max execution time for this operation, in seconds or -1 for an operation
    that never timeouts. Defaults to -1."""

    queue: str = "default"
    """Defines which queue this operation should be executed by. For example,
    if a complex workflow requires GPUs for certain jobs, those jobs would be
    setup with option `queue="gpu"` and workers on the nodes with available
    GPUs would be instantiated with `funsies worker gpu`. Then, only worker
    processes in the GPU queue would execute the GPU jobs."""

    distributed: bool = True
    """If False, jobs are executed by the local enqueuing process. Used to
    test workflows without having to start workers."""

    reset: bool = False
    """If `True`, this operation is `funsies.reset()` when generated."""

    evaluate: bool = True
    """If False, calling `funsies.execute()` on this job or its dependencies will fail.
    Can be used to ensure a specific branch is never executed."""

    ttl: int = ONE_DAY
    """Time to live (ttl) in queue for the operation. Defaults to 24h. Equivalent
    to the [rq keyword with the same name](https://python-rq.org/docs/). """

    result_ttl: int = ONE_MINUTE
    """Time to live (ttl) in queue for the rq result objects. Defaults to one
    minute. Equivalent to the [rq keyword with the same
    name](https://python-rq.org/docs/). (Note that this has nothing to do with
    the actual data results.) """

    failure_ttl: int = ONE_DAY
    """Time to live (ttl) in queue for the rq result objects of failing jobs.
    Defaults to one day. Equivalent to the [rq keyword with the same
    name](https://python-rq.org/docs/). (Note that this has nothing to do with
    the actual data results.) """

    # TODO: make meaningfully adjustable
    serializer: str = "rq.serializers.JSONSerializer"

    @property
    def job_args(self: "Options") -> Mapping[str, Any]:
        """Return a dictionary of arguments for rq.enqueue's job_args."""
        return dict(
            timeout=self.timeout,
            ttl=self.ttl,
            result_ttl=self.result_ttl,
            failure_ttl=self.failure_ttl,
        )

    @property
    def task_args(self: "Options") -> Mapping[str, Any]:
        """Return a dictionary of arguments for dag.task()."""
        return dict(
            evaluate=self.evaluate,
        )

    @property
    def queue_args(self: "Options") -> Mapping[str, Any]:
        """Return a dictionary of arguments for rq.Queue."""
        return dict(is_async=self.distributed, serializer=self.serializer)

    def pack(self: "Options") -> str:
        """Pack an Options instance to a JSON string."""
        return json.dumps(asdict(self))

    @classmethod
    def unpack(cls: Type["Options"], data: str) -> "Options":
        """Unpack an Options instance from a JSON string."""
        return Options(**json.loads(data))

Class variables

var timeout : int

Max execution time for this operation, in seconds or -1 for an operation that never timeouts. Defaults to -1.

var queue : str

Defines which queue this operation should be executed by. For example, if a complex workflow requires GPUs for certain jobs, those jobs would be setup with option queue="gpu" and workers on the nodes with available GPUs would be instantiated with funsies worker gpu. Then, only worker processes in the GPU queue would execute the GPU jobs.

var distributed : bool

If False, jobs are executed by the local enqueuing process. Used to test workflows without having to start workers.

var reset : bool

If True, this operation is reset() when generated.

var evaluate : bool

If False, calling execute() on this job or its dependencies will fail. Can be used to ensure a specific branch is never executed.

var ttl : int

Time to live (ttl) in queue for the operation. Defaults to 24h. Equivalent to the rq keyword with the same name.

var result_ttl : int

Time to live (ttl) in queue for the rq result objects. Defaults to one minute. Equivalent to the rq keyword with the same name. (Note that this has nothing to do with the actual data results.)

var failure_ttl : int

Time to live (ttl) in queue for the rq result objects of failing jobs. Defaults to one day. Equivalent to the rq keyword with the same name. (Note that this has nothing to do with the actual data results.)

var serializer : str

Static methods

def unpack(data: str) ‑> Options

Unpack an Options instance from a byte string.

Expand source code
@classmethod
def unpack(cls: Type["Options"], data: str) -> "Options":
    """Unpack an Options instance from a JSON string (inverse of `pack`)."""
    return Options(**json.loads(data))

Instance variables

var job_args : Mapping[str, Any]

Return a dictionary of arguments for rq.enqueue's job_args.

Expand source code
@property
def job_args(self: "Options") -> Mapping[str, Any]:
    """Return a dictionary of arguments for rq.enqueue's job_args."""
    # Keys mirror rq's keyword arguments of the same names.
    return dict(
        timeout=self.timeout,
        ttl=self.ttl,
        result_ttl=self.result_ttl,
        failure_ttl=self.failure_ttl,
    )
var task_args : Mapping[str, Any]

Return a dictionary of arguments for dag.task().

Expand source code
@property
def task_args(self: "Options") -> Mapping[str, Any]:
    """Return a dictionary of arguments for dag.task()."""
    # Only the evaluate flag is forwarded to the task layer.
    return dict(
        evaluate=self.evaluate,
    )
var queue_args : Mapping[str, Any]

Return a dictionary of arguments for rq.Queue.

Expand source code
@property
def queue_args(self: "Options") -> Mapping[str, Any]:
    """Return a dictionary of arguments for rq.Queue."""
    # distributed=False maps to a synchronous (is_async=False) rq queue.
    return dict(is_async=self.distributed, serializer=self.serializer)

Methods

def pack(self: Options) ‑> str

Pack an Options instance to a bytestring.

Expand source code
def pack(self: "Options") -> str:
    """Pack an Options instance to a JSON string (inverse of `unpack`)."""
    return json.dumps(asdict(self))