Module funsies.types
Object types.
Expand source code
"""Object types."""
# module
from ._constants import Encoding, hash_t
from ._funsies import Funsie, FunsieHow
from ._graph import Artefact, ArtefactStatus, Operation
from ._run import RunStatus
from ._shell import ShellOutput
from .config import Options
from .errors import Error, ErrorKind, Result, UnwrapError
# A simple mypy result type
Result = Result
"""see `funsies.errors.Result`."""
__all__ = [
"Funsie",
"FunsieHow",
"Artefact",
"ArtefactStatus",
"Encoding",
"Operation",
"ShellOutput",
"hash_t",
"Error",
"ErrorKind",
"Result",
"UnwrapError",
"RunStatus",
"Options",
]
Global variables
var Result
-
see
Result
.
Functions
def hash_t(x)
-
Expand source code
def new_type(x): return x
Classes
class Funsie (how: FunsieHow, what: str, inp: dict[str, Encoding], out: dict[str, Encoding], extra: dict[str, bytes], error_tolerant: int = 0)
-
A funsie is a wrapped command that can be backed up to the KV store.
A Funsie has a
how
, awhat
(a string that identifies the funsie, such as a function name or shell commands) and input and output artefact names. All of these are used to generate the hash of the Funsie instance.Funsies also have an
extra
field that include auxiliary data that is not to be used in hashing the funsie, but is useful for executing it, such as a cloudpickled python function.Expand source code
@dataclass class Funsie: """A funsie is a wrapped command that can be backed up to the KV store. A Funsie has a `how`, a `what` (a string that identifies the funsie, such as a function name or shell commands) and input and output artefact names. All of these are used to generate the hash of the Funsie instance. Funsies also have an `extra` field that include auxiliary data that is not to be used in hashing the funsie, but is useful for executing it, such as a cloudpickled python function. """ how: FunsieHow what: str inp: dict[str, Encoding] out: dict[str, Encoding] extra: dict[str, bytes] error_tolerant: int = 0 hash: hash_t = field(init=False) def decode( self: Funsie, input_data: Mapping[str, Result[bytes]] ) -> dict[str, object]: """Decode input data according to `inp`.""" out = {} for key, enc in self.inp.items(): element = input_data[key] out[key] = _serdes.decode(enc, element) if self.error_tolerant == 0 and isinstance(out[key], Error): raise RuntimeError( f"Decoding of input data {key} failed:\n{out[key].details}" ) return out def put(self: Funsie, db: Redis[bytes]) -> None: """Save a Funsie to Redis.""" db.hset( # type:ignore join(FUNSIES, self.hash), mapping={ "hash": self.hash, "how": int(self.how), "what": self.what, "error_tolerant": self.error_tolerant, }, ) if self.inp: db.hset( # type:ignore join(FUNSIES, self.hash, "inp"), mapping=dict([(k, v.value) for k, v in self.inp.items()]), ) if self.out: db.hset( # type:ignore join(FUNSIES, self.hash, "out"), mapping=dict([(k, v.value) for k, v in self.out.items()]), ) if self.extra: db.hset( # type:ignore join(FUNSIES, self.hash, "extra"), mapping=self.extra # type:ignore ) # Save the hash in the quickhash db hash_save(db, self.hash) @classmethod def grab(cls: Type[Funsie], db: Redis[bytes], hash: hash_t) -> "Funsie": """Grab a Funsie from the Redis store.""" pipe: Pipeline = db.pipeline(transaction=False) pipe.exists(join(FUNSIES, hash)) pipe.hgetall(join(FUNSIES, hash)) pipe.hgetall(join(FUNSIES, hash, "inp")) pipe.hgetall(join(FUNSIES, hash, "out")) pipe.hgetall(join(FUNSIES, hash, "extra")) exists, metadata, inp, out, extra = pipe.execute() if not exists: raise RuntimeError(f"No funsie at {hash}") return Funsie( how=FunsieHow(int(metadata[b"how"].decode())), what=metadata[b"what"].decode(), error_tolerant=int(metadata[b"error_tolerant"].decode()), inp=_artefacts(inp), out=_artefacts(out), extra=dict([(k.decode(), v) for k, v in extra.items()]), ) def __str__(self: Funsie) -> str: """Get the string representation of a funsie.""" # ============================================================== # ALERT: DO NOT TOUCH THIS CODE WITHOUT CAREFUL THOUGHT # -------------------------------------------------------------- # When hashes change, previous databases become deprecated. This # (will) require a change in version number! out = f"how={self.how}\n" + f"what={self.what}\n" for key in sorted(self.inp.keys()): out += f"input:{key} -> {self.inp[key].value}\n" for key in sorted(self.out.keys()): out += f"output:{key} -> {self.out[key].value}\n" out += f"error tolerant:{self.error_tolerant}\n" # ============================================================== return out def __post_init__(self: Funsie) -> None: """Calculate the hash.""" # ============================================================== # ALERT: DO NOT TOUCH THIS CODE WITHOUT CAREFUL THOUGHT # -------------------------------------------------------------- # When hashes change, previous databases become deprecated. This # (will) require a change in version number! m = hashlib.sha1() # header m.update(b"funsie") # funsie m.update(str(self).encode()) self.hash = hash_t(m.hexdigest()) # ==============================================================
Class variables
var how : funsies._funsies.FunsieHow
var what : str
var inp : dict
var out : dict
var extra : dict
var hash : NewType.
.new_type() var error_tolerant : int
Static methods
def grab(db: Redis[bytes], hash: NewType.
.new_type() ) ‑> 'Funsie'-
Grab a Funsie from the Redis store.
Expand source code
@classmethod def grab(cls: Type[Funsie], db: Redis[bytes], hash: hash_t) -> "Funsie": """Grab a Funsie from the Redis store.""" pipe: Pipeline = db.pipeline(transaction=False) pipe.exists(join(FUNSIES, hash)) pipe.hgetall(join(FUNSIES, hash)) pipe.hgetall(join(FUNSIES, hash, "inp")) pipe.hgetall(join(FUNSIES, hash, "out")) pipe.hgetall(join(FUNSIES, hash, "extra")) exists, metadata, inp, out, extra = pipe.execute() if not exists: raise RuntimeError(f"No funsie at {hash}") return Funsie( how=FunsieHow(int(metadata[b"how"].decode())), what=metadata[b"what"].decode(), error_tolerant=int(metadata[b"error_tolerant"].decode()), inp=_artefacts(inp), out=_artefacts(out), extra=dict([(k.decode(), v) for k, v in extra.items()]), )
Methods
def decode(self: Funsie, input_data: Mapping[str, Result[bytes]]) ‑> dict
-
Decode input data according to
inp
.Expand source code
def decode( self: Funsie, input_data: Mapping[str, Result[bytes]] ) -> dict[str, object]: """Decode input data according to `inp`.""" out = {} for key, enc in self.inp.items(): element = input_data[key] out[key] = _serdes.decode(enc, element) if self.error_tolerant == 0 and isinstance(out[key], Error): raise RuntimeError( f"Decoding of input data {key} failed:\n{out[key].details}" ) return out
def put(self: Funsie, db: Redis[bytes]) ‑> None
-
Save a Funsie to Redis.
Expand source code
def put(self: Funsie, db: Redis[bytes]) -> None: """Save a Funsie to Redis.""" db.hset( # type:ignore join(FUNSIES, self.hash), mapping={ "hash": self.hash, "how": int(self.how), "what": self.what, "error_tolerant": self.error_tolerant, }, ) if self.inp: db.hset( # type:ignore join(FUNSIES, self.hash, "inp"), mapping=dict([(k, v.value) for k, v in self.inp.items()]), ) if self.out: db.hset( # type:ignore join(FUNSIES, self.hash, "out"), mapping=dict([(k, v.value) for k, v in self.out.items()]), ) if self.extra: db.hset( # type:ignore join(FUNSIES, self.hash, "extra"), mapping=self.extra # type:ignore ) # Save the hash in the quickhash db hash_save(db, self.hash)
class FunsieHow (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Kinds of funsies.
This enum contains the various kinds of funsies: python code, shell code, etc.
Expand source code
class FunsieHow(IntEnum): """Kinds of funsies. This enum contains the various kinds of funsies: python code, shell code, etc. """ python = 0 shell = 1 subdag = 2
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var python
var shell
var subdag
class Artefact (hash: NewType.
.new_type() , parent: NewType..new_type() , kind: Encoding)-
Artefacts are the main data structure.
Expand source code
@dataclass(frozen=True) class Artefact(Generic[T]): """Artefacts are the main data structure.""" hash: hash_t parent: hash_t kind: Encoding def put(self: Artefact[Any], db: Redis[bytes]) -> None: """Save an artefact to Redis.""" data = dict(hash=self.hash, parent=self.parent, kind=self.kind.value) db.hset( # type:ignore join(ARTEFACTS, self.hash), mapping=data, # type:ignore ) # Save the hash in the quickhash db hash_save(db, self.hash) @classmethod def grab(cls: Type[Artefact[T]], db: Redis[bytes], hash: hash_t) -> Artefact[T]: """Grab an artefact from the Redis store.""" pipe: Pipeline = db.pipeline(transaction=False) pipe.exists(join(ARTEFACTS, hash)) pipe.hgetall(join(ARTEFACTS, hash)) exists, data = pipe.execute() if not exists: raise RuntimeError(f"No artefact at {hash}") return Artefact[T]( hash=hash_t(data[b"hash"].decode()), parent=hash_t(data[b"parent"].decode()), kind=Encoding(data[b"kind"].decode()), )
Ancestors
- typing.Generic
Class variables
var hash : NewType.
.new_type() var parent : NewType.
.new_type() var kind : funsies._constants.Encoding
Static methods
def grab(db: Redis[bytes], hash: NewType.
.new_type() ) ‑> Artefact[T]-
Grab an artefact from the Redis store.
Expand source code
@classmethod def grab(cls: Type[Artefact[T]], db: Redis[bytes], hash: hash_t) -> Artefact[T]: """Grab an artefact from the Redis store.""" pipe: Pipeline = db.pipeline(transaction=False) pipe.exists(join(ARTEFACTS, hash)) pipe.hgetall(join(ARTEFACTS, hash)) exists, data = pipe.execute() if not exists: raise RuntimeError(f"No artefact at {hash}") return Artefact[T]( hash=hash_t(data[b"hash"].decode()), parent=hash_t(data[b"parent"].decode()), kind=Encoding(data[b"kind"].decode()), )
Methods
def put(self: Artefact[Any], db: Redis[bytes]) ‑> None
-
Save an artefact to Redis.
Expand source code
def put(self: Artefact[Any], db: Redis[bytes]) -> None: """Save an artefact to Redis.""" data = dict(hash=self.hash, parent=self.parent, kind=self.kind.value) db.hset( # type:ignore join(ARTEFACTS, self.hash), mapping=data, # type:ignore ) # Save the hash in the quickhash db hash_save(db, self.hash)
class ArtefactStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Status of data associated with an artefact.
Expand source code
class ArtefactStatus(IntEnum): """Status of data associated with an artefact.""" deleted = -2 not_found = -1 no_data = 0 # > absent = artefact has been computed done = 1 const = 2 error = 3 linked = 4
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var deleted
var not_found
var no_data
var done
var const
var error
var linked
class Encoding (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Types for data objects.
Funsies does not support a full-blown type system. For this, we defer to json's encoding of various data structures.
Expand source code
class Encoding(str, Enum): """Types for data objects. Funsies does not support a full-blown type system. For this, we defer to json's encoding of various data structures. """ json = "json" blob = "blob"
Ancestors
- builtins.str
- enum.Enum
Class variables
var json
var blob
class Operation (hash: NewType.
.new_type() , funsie: NewType..new_type() , inp: dict[str, NewType..new_type() ], out: dict[str, NewType..new_type() ], options: Optional[Options] = None)-
An operation on data in the graph.
Expand source code
@dataclass(frozen=True) class Operation: """An operation on data in the graph.""" hash: hash_t funsie: hash_t inp: dict[str, hash_t] out: dict[str, hash_t] options: Optional[Options] = None # pipeline-able def put(self: "Operation", db: Redis[bytes]) -> None: """Save an operation to Redis.""" if self.inp: db.hset(join(OPERATIONS, self.hash, "inp"), mapping=self.inp) # type:ignore if self.out: db.hset(join(OPERATIONS, self.hash, "out"), mapping=self.out) # type:ignore if self.options: db.set(join(OPERATIONS, self.hash, "options"), self.options.pack()) db.hset( # type:ignore join(OPERATIONS, self.hash), mapping={"funsie": self.funsie, "hash": self.hash}, ) # Save the hash in the quickhash db hash_save(db, self.hash) # pipelined @classmethod def grab(cls: Type["Operation"], db: Redis[bytes], hash: hash_t) -> "Operation": """Grab an operation from the Redis store.""" if not db.exists(join(OPERATIONS, hash)): raise RuntimeError(f"No operation at {hash}") pipe: Pipeline = db.pipeline(transaction=False) pipe.hgetall(join(OPERATIONS, hash)) pipe.hgetall(join(OPERATIONS, hash, "inp")) pipe.hgetall(join(OPERATIONS, hash, "out")) pipe.get(join(OPERATIONS, hash, "options")) metadata, inp, out, tmp = pipe.execute() if tmp is not None: options: Optional[Options] = Options.unpack(tmp.decode()) else: options = None return Operation( hash=hash_t(metadata[b"hash"].decode()), funsie=hash_t(metadata[b"funsie"].decode()), inp=dict([(k.decode(), hash_t(v.decode())) for k, v in inp.items()]), out=dict([(k.decode(), hash_t(v.decode())) for k, v in out.items()]), options=options, )
Class variables
var hash : NewType.
.new_type() var funsie : NewType.
.new_type() var inp : dict
var out : dict
var options : Optional[Options]
Static methods
def grab(db: Redis[bytes], hash: NewType.
.new_type() ) ‑> 'Operation'-
Grab an operation from the Redis store.
Expand source code
@classmethod def grab(cls: Type["Operation"], db: Redis[bytes], hash: hash_t) -> "Operation": """Grab an operation from the Redis store.""" if not db.exists(join(OPERATIONS, hash)): raise RuntimeError(f"No operation at {hash}") pipe: Pipeline = db.pipeline(transaction=False) pipe.hgetall(join(OPERATIONS, hash)) pipe.hgetall(join(OPERATIONS, hash, "inp")) pipe.hgetall(join(OPERATIONS, hash, "out")) pipe.get(join(OPERATIONS, hash, "options")) metadata, inp, out, tmp = pipe.execute() if tmp is not None: options: Optional[Options] = Options.unpack(tmp.decode()) else: options = None return Operation( hash=hash_t(metadata[b"hash"].decode()), funsie=hash_t(metadata[b"funsie"].decode()), inp=dict([(k.decode(), hash_t(v.decode())) for k, v in inp.items()]), out=dict([(k.decode(), hash_t(v.decode())) for k, v in out.items()]), options=options, )
Methods
def put(self: "'Operation'", db: Redis[bytes]) ‑> None
-
Save an operation to Redis.
Expand source code
def put(self: "Operation", db: Redis[bytes]) -> None: """Save an operation to Redis.""" if self.inp: db.hset(join(OPERATIONS, self.hash, "inp"), mapping=self.inp) # type:ignore if self.out: db.hset(join(OPERATIONS, self.hash, "out"), mapping=self.out) # type:ignore if self.options: db.set(join(OPERATIONS, self.hash, "options"), self.options.pack()) db.hset( # type:ignore join(OPERATIONS, self.hash), mapping={"funsie": self.funsie, "hash": self.hash}, ) # Save the hash in the quickhash db hash_save(db, self.hash)
class ShellOutput (store: Redis[bytes], op: Operation)
-
A convenience wrapper for a shell operation.
Generate a ShellOutput wrapper around a shell operation.
Expand source code
class ShellOutput: """A convenience wrapper for a shell operation.""" op: Operation hash: hash_t out: dict[str, Artefact[bytes]] inp: dict[str, Artefact[Any]] def __init__(self: "ShellOutput", store: Redis[bytes], op: Operation) -> None: """Generate a ShellOutput wrapper around a shell operation.""" # import the constants # module from ._shell import RETURNCODE, SPECIAL, STDERR, STDOUT # stuff that is the same self.op = op self.hash = op.hash self.out = {} self.n = 0 for key, val in op.out.items(): if SPECIAL in key: if RETURNCODE in key: self.n += 1 # count the number of commands else: self.out[key] = Artefact[bytes].grab(store, val) self.inp = {} for key, val in op.inp.items(): self.inp[key] = Artefact[Any].grab(store, val) self.stdouts = [] self.stderrs = [] self.returncodes = [] for i in range(self.n): self.stdouts += [Artefact[bytes].grab(store, op.out[f"{STDOUT}{i}"])] self.stderrs += [Artefact[bytes].grab(store, op.out[f"{STDERR}{i}"])] self.returncodes += [Artefact[int].grab(store, op.out[f"{RETURNCODE}{i}"])] def __check_len(self: "ShellOutput") -> None: if self.n > 1: raise AttributeError( "More than one shell command are included in this run." ) @property def returncode(self: "ShellOutput") -> Artefact[int]: """Return code of a shell command.""" self.__check_len() return self.returncodes[0] @property def stdout(self: "ShellOutput") -> Artefact[bytes]: """Stdout of a shell command.""" self.__check_len() return self.stdouts[0] @property def stderr(self: "ShellOutput") -> Artefact[bytes]: """Stderr of a shell command.""" self.__check_len() return self.stderrs[0]
Class variables
var op : funsies._graph.Operation
var hash : NewType.
.new_type() var out : dict
var inp : dict
Instance variables
var returncode : funsies._graph.Artefact[int]
-
Return code of a shell command.
Expand source code
@property def returncode(self: "ShellOutput") -> Artefact[int]: """Return code of a shell command.""" self.__check_len() return self.returncodes[0]
var stdout : funsies._graph.Artefact[bytes]
-
Stdout of a shell command.
Expand source code
@property def stdout(self: "ShellOutput") -> Artefact[bytes]: """Stdout of a shell command.""" self.__check_len() return self.stdouts[0]
var stderr : funsies._graph.Artefact[bytes]
-
Stderr of a shell command.
Expand source code
@property def stderr(self: "ShellOutput") -> Artefact[bytes]: """Stderr of a shell command.""" self.__check_len() return self.stderrs[0]
class Error (kind: ErrorKind, source: Optional[NewType.
.new_type() ] = None, details: Optional[str] = None)-
An Error value for artefacts.
Expand source code
@dataclass class Error: """An Error value for artefacts.""" kind: ErrorKind source: Optional[hash_t] = None details: Optional[str] = None def put(self: "Error", db: Redis[bytes], hash: hash_t) -> None: """Save an Error to Redis.""" data = dict(kind=self.kind.name) if self.source: data["source"] = str(self.source) if self.details: data["details"] = str(self.details) db.hset( # type:ignore join(ARTEFACTS, hash, "error"), mapping=data, # type:ignore ) @classmethod def grab(cls: Type["Error"], db: Redis[bytes], hash: hash_t) -> "Error": """Grab an Error from the Redis store.""" if not join(ARTEFACTS, hash, "error"): raise RuntimeError(f"No error for artefact at {hash}") data = db.hgetall(join(ARTEFACTS, hash, "error")) kind = ErrorKind(data[b"kind"].decode()) # Sometimes the python boilerplate is really freaking annoying... tmp = data.get(b"source", None) if tmp is not None: source: Optional[hash_t] = hash_t(tmp.decode()) else: source = None tmp = data.get(b"details", None) if tmp is not None: details: Optional[str] = tmp.decode() else: details = None return Error(kind, source=source, details=details)
Class variables
var kind : ErrorKind
var source : Optional[NewType.
.new_type() ]var details : Optional[str]
Static methods
def grab(db: Redis[bytes], hash: NewType.
.new_type() ) ‑> 'Error'-
Grab an Error from the Redis store.
Expand source code
@classmethod def grab(cls: Type["Error"], db: Redis[bytes], hash: hash_t) -> "Error": """Grab an Error from the Redis store.""" if not join(ARTEFACTS, hash, "error"): raise RuntimeError(f"No error for artefact at {hash}") data = db.hgetall(join(ARTEFACTS, hash, "error")) kind = ErrorKind(data[b"kind"].decode()) # Sometimes the python boilerplate is really freaking annoying... tmp = data.get(b"source", None) if tmp is not None: source: Optional[hash_t] = hash_t(tmp.decode()) else: source = None tmp = data.get(b"details", None) if tmp is not None: details: Optional[str] = tmp.decode() else: details = None return Error(kind, source=source, details=details)
Methods
def put(self: "'Error'", db: Redis[bytes], hash: NewType.
.new_type() ) ‑> None-
Save an Error to Redis.
Expand source code
def put(self: "Error", db: Redis[bytes], hash: hash_t) -> None: """Save an Error to Redis.""" data = dict(kind=self.kind.name) if self.source: data["source"] = str(self.source) if self.details: data["details"] = str(self.details) db.hset( # type:ignore join(ARTEFACTS, hash, "error"), mapping=data, # type:ignore )
class ErrorKind (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Kinds of errors.
Expand source code
class ErrorKind(str, Enum): """Kinds of errors.""" # db errors NotFound = "NotFound" Mismatch = "Mismatch" UnresolvedLink = "UnresolvedLink" # Type errors WrongType = "WrongType" JSONEncodingError = "JSONEncodingError" JSONDecodingError = "JSONDecodingError" UnknownEncodingError = "UnknownEncodingError" # Job error conditions MissingOutput = "MissingOutput" MissingInput = "MissingInput" ExceptionRaised = "ExceptionRaised" JobTimedOut = "JobTimedOut" NoErrorData = "NoErrorData" KilledBySignal = "KilledBySignal"
Ancestors
- builtins.str
- enum.Enum
Class variables
var NotFound
var Mismatch
var UnresolvedLink
var WrongType
var JSONEncodingError
var JSONDecodingError
var UnknownEncodingError
var MissingOutput
var MissingInput
var ExceptionRaised
var JobTimedOut
var NoErrorData
var KilledBySignal
class UnwrapError (*args, **kwargs)
-
Exception thrown when unwrapping an error.
Expand source code
class UnwrapError(Exception): """Exception thrown when unwrapping an error.""" pass
Ancestors
- builtins.Exception
- builtins.BaseException
class RunStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Possible status of running an operation.
Expand source code
class RunStatus(IntEnum): """Possible status of running an operation.""" # <= 0 -> issue prevented running job. subdag_ready = -5 delayed = -3 unmet_dependencies = -2 not_ready = -1 # > 0 -> executed, can run dependents. executed = 1 using_cached = 2 input_error = 4
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var subdag_ready
var delayed
var unmet_dependencies
var not_ready
var executed
var using_cached
var input_error
class Options (timeout: int = -1, queue: str = 'default', distributed: bool = True, reset: bool = False, evaluate: bool = True, ttl: int = 86400, result_ttl: int = 60, failure_ttl: int = 86400, serializer: str = 'rq.serializers.JSONSerializer')
-
Runtime options for an Operation.
This is a class that basically contains all the random options that may need to be set when building a workflow, such as timeouts, heterogeneous compute etc. It should generally be instantiated using the
options()
function.Expand source code
@dataclass class Options: """Runtime options for an Operation. This is a class that basically contains all the random options that may need to be set when building a workflow, such as timeouts, heterogeneous compute etc. It should generally be instantiated using the `funsies.options()` function. """ timeout: int = INFINITE """Max execution time for this operation, in seconds or -1 for an operation that never timeouts. Defaults to -1.""" queue: str = "default" """Defines which queue this operation should be executed by. For example, if a complex workflow requires GPUs for certain jobs, those jobs would be setup with option `queue="gpu"` and workers on the nodes with available GPUs would be instantiated with `funsies worker gpu`. Then, only worker processes in the GPU queue would execute the GPU jobs.""" distributed: bool = True """If False, jobs are executed by the local enqueuing process. Used to test workflows without having to start workers.""" reset: bool = False """If `True`, this operation is `funsies.reset()` when generated.""" evaluate: bool = True """If False, calling `funsies.execute()` on this job or its dependencies will fail. Can be used to ensure a specific branch is never executed.""" ttl: int = ONE_DAY """Time to live (ttl) in queue for the operation. Defaults to 24h. Equivalent to the [rq keyword with the same name](https://python-rq.org/docs/). """ result_ttl: int = ONE_MINUTE """Time to live (ttl) in queue for the rq result objects. Defaults to one minute. Equivalent to the [rq keyword with the same name](https://python-rq.org/docs/). (Note that this has nothing to do with the actual data results.) """ failure_ttl: int = ONE_DAY """Time to live (ttl) in queue for the rq result objects of failing jobs. Defaults to one day. Equivalent to the [rq keyword with the same name](https://python-rq.org/docs/). (Note that this has nothing to do with the actual data results.) """ # TODO: make meaningfully adjustable serializer: str = "rq.serializers.JSONSerializer" @property def job_args(self: "Options") -> Mapping[str, Any]: """Return a dictionary of arguments for rq.enqueue's job_args.""" return dict( timeout=self.timeout, ttl=self.ttl, result_ttl=self.result_ttl, failure_ttl=self.failure_ttl, ) @property def task_args(self: "Options") -> Mapping[str, Any]: """Return a dictionary of arguments for dag.task().""" return dict( evaluate=self.evaluate, ) @property def queue_args(self: "Options") -> Mapping[str, Any]: """Return a dictionary of arguments for rq.Queue.""" return dict(is_async=self.distributed, serializer=self.serializer) def pack(self: "Options") -> str: """Pack an Options instance to a bytestring.""" return json.dumps(asdict(self)) @classmethod def unpack(cls: Type["Options"], data: str) -> "Options": """Unpack an Options instance from a byte string.""" return Options(**json.loads(data))
Class variables
var timeout : int
-
Max execution time for this operation, in seconds or -1 for an operation that never timeouts. Defaults to -1.
var queue : str
-
Defines which queue this operation should be executed by. For example, if a complex workflow requires GPUs for certain jobs, those jobs would be setup with option
queue="gpu"
and workers on the nodes with available GPUs would be instantiated withfunsies worker gpu
. Then, only worker processes in the GPU queue would execute the GPU jobs. var distributed : bool
-
If False, jobs are executed by the local enqueuing process. Used to test workflows without having to start workers.
var reset : bool
-
If
True
, this operation isreset()
when generated. var evaluate : bool
-
If False, calling
execute()
on this job or its dependencies will fail. Can be used to ensure a specific branch is never executed. var ttl : int
-
Time to live (ttl) in queue for the operation. Defaults to 24h. Equivalent to the rq keyword with the same name.
var result_ttl : int
-
Time to live (ttl) in queue for the rq result objects. Defaults to one minute. Equivalent to the rq keyword with the same name. (Note that this has nothing to do with the actual data results.)
var failure_ttl : int
-
Time to live (ttl) in queue for the rq result objects of failing jobs. Defaults to one day. Equivalent to the rq keyword with the same name. (Note that this has nothing to do with the actual data results.)
var serializer : str
Static methods
def unpack(data: str) ‑> Options
-
Unpack an Options instance from a byte string.
Expand source code
@classmethod def unpack(cls: Type["Options"], data: str) -> "Options": """Unpack an Options instance from a byte string.""" return Options(**json.loads(data))
Instance variables
var job_args : Mapping[str, Any]
-
Return a dictionary of arguments for rq.enqueue's job_args.
Expand source code
@property def job_args(self: "Options") -> Mapping[str, Any]: """Return a dictionary of arguments for rq.enqueue's job_args.""" return dict( timeout=self.timeout, ttl=self.ttl, result_ttl=self.result_ttl, failure_ttl=self.failure_ttl, )
var task_args : Mapping[str, Any]
-
Return a dictionary of arguments for dag.task().
Expand source code
@property def task_args(self: "Options") -> Mapping[str, Any]: """Return a dictionary of arguments for dag.task().""" return dict( evaluate=self.evaluate, )
var queue_args : Mapping[str, Any]
-
Return a dictionary of arguments for rq.Queue.
Expand source code
@property def queue_args(self: "Options") -> Mapping[str, Any]: """Return a dictionary of arguments for rq.Queue.""" return dict(is_async=self.distributed, serializer=self.serializer)
Methods
def pack(self: Options) ‑> str
-
Pack an Options instance to a bytestring.
Expand source code
def pack(self: "Options") -> str: """Pack an Options instance to a bytestring.""" return json.dumps(asdict(self))