Module funsies.config
Configuration dictionaries for jobs.
Expand source code
"""Configuration dictionaries for jobs."""
# std
from dataclasses import asdict, dataclass
import json
import os
from typing import Any, Mapping, Optional, Type
# Constants. Durations are expressed in seconds, the unit used by the
# timeout/ttl fields of `Options`.
INFINITE = -1  # "no limit" sentinel; the default `Options.timeout`
ONE_MINUTE = 60
ONE_DAY = 24 * 60 * ONE_MINUTE
def _get_funsies_url(url: Optional[str] = None) -> str:
"""Get the default funsies URL."""
if url is not None:
return url
else:
try:
default = os.environ["FUNSIES_URL"]
except KeyError:
default = "redis://localhost:6379"
return default
def _extract_hostname(url: str) -> str:
"""Get the hostname part of the url."""
if "@" in url:
hn = url.split("@")[-1]
else:
hn = url.split("//")[-1]
return hn
@dataclass
class Options:
    """Runtime options for an Operation.

    This is a class that basically contains all the random options that may
    need to be set when building a workflow, such as timeouts, heterogeneous
    compute etc. It should generally be instantiated using the
    `funsies.options()` function.

    Instances serialize to a JSON string with `pack()` and are rebuilt with
    `unpack()`. The `job_args`, `task_args` and `queue_args` properties expose
    the subsets of options forwarded to rq job enqueueing, `dag.task()` and
    `rq.Queue`, respectively.
    """

    timeout: int = INFINITE
    """Max execution time for this operation, in seconds, or -1 for an
    operation that never times out. Defaults to -1."""

    queue: str = "default"
    """Defines which queue this operation should be executed by. For example,
    if a complex workflow requires GPUs for certain jobs, those jobs would be
    set up with option `queue="gpu"` and workers on the nodes with available
    GPUs would be instantiated with `funsies worker gpu`. Then, only worker
    processes in the GPU queue would execute the GPU jobs."""

    distributed: bool = True
    """If False, jobs are executed by the local enqueuing process. Used to
    test workflows without having to start workers."""

    reset: bool = False
    """If `True`, this operation is reset (as by `funsies.reset()`) when
    generated."""

    evaluate: bool = True
    """If False, calling `funsies.execute()` on this job or its dependencies
    will fail. Can be used to ensure a specific branch is never executed."""

    ttl: int = ONE_DAY
    """Time to live (ttl) in queue for the operation. Defaults to 24h.
    Equivalent to the [rq keyword with the same
    name](https://python-rq.org/docs/)."""

    result_ttl: int = ONE_MINUTE
    """Time to live (ttl) in queue for the rq result objects. Defaults to one
    minute. Equivalent to the [rq keyword with the same
    name](https://python-rq.org/docs/). (Note that this has nothing to do with
    the actual data results.)"""

    failure_ttl: int = ONE_DAY
    """Time to live (ttl) in queue for the rq result objects of failing jobs.
    Defaults to one day. Equivalent to the [rq keyword with the same
    name](https://python-rq.org/docs/). (Note that this has nothing to do with
    the actual data results.)"""

    # TODO: make meaningfully adjustable
    # Dotted path of the rq serializer; forwarded to rq.Queue by `queue_args`.
    serializer: str = "rq.serializers.JSONSerializer"

    @property
    def job_args(self: "Options") -> Mapping[str, Any]:
        """Return a dictionary of arguments for rq.enqueue's job_args."""
        return dict(
            timeout=self.timeout,
            ttl=self.ttl,
            result_ttl=self.result_ttl,
            failure_ttl=self.failure_ttl,
        )

    @property
    def task_args(self: "Options") -> Mapping[str, Any]:
        """Return a dictionary of arguments for dag.task()."""
        return dict(
            evaluate=self.evaluate,
        )

    @property
    def queue_args(self: "Options") -> Mapping[str, Any]:
        """Return a dictionary of arguments for rq.Queue."""
        # `distributed=False` maps to rq's synchronous (is_async=False) mode.
        return dict(is_async=self.distributed, serializer=self.serializer)

    def pack(self: "Options") -> str:
        """Serialize this Options instance to a JSON string.

        Every dataclass field is included (via `dataclasses.asdict`), so the
        result can be turned back into an equal instance with `unpack()`.
        """
        return json.dumps(asdict(self))

    @classmethod
    def unpack(cls: Type["Options"], data: str) -> "Options":
        """Rebuild an instance from a JSON string produced by `pack()`.

        ``cls`` is used instead of a hard-coded ``Options`` so that calling
        ``unpack`` on a subclass returns an instance of that subclass.

        Raises:
            json.JSONDecodeError: if ``data`` is not valid JSON.
            TypeError: if the decoded value is not a JSON object whose keys
                are all field names of ``cls``.
        """
        return cls(**json.loads(data))
Classes
class Options (timeout: int = -1, queue: str = 'default', distributed: bool = True, reset: bool = False, evaluate: bool = True, ttl: int = 86400, result_ttl: int = 60, failure_ttl: int = 86400, serializer: str = 'rq.serializers.JSONSerializer')
-
Runtime options for an Operation.
This is a class that basically contains all the random options that may need to be set when building a workflow, such as timeouts, heterogeneous compute etc. It should generally be instantiated using the `funsies.options()` function.
Expand source code
@dataclass
class Options:
    """Runtime options for an Operation.

    This is a class that basically contains all the random options that may
    need to be set when building a workflow, such as timeouts, heterogeneous
    compute etc. It should generally be instantiated using the
    `funsies.options()` function.

    Instances serialize to a JSON string with `pack()` and are rebuilt with
    `unpack()`. The `job_args`, `task_args` and `queue_args` properties expose
    the subsets of options forwarded to rq job enqueueing, `dag.task()` and
    `rq.Queue`, respectively.
    """

    timeout: int = INFINITE
    """Max execution time for this operation, in seconds, or -1 for an
    operation that never times out. Defaults to -1."""

    queue: str = "default"
    """Defines which queue this operation should be executed by. For example,
    if a complex workflow requires GPUs for certain jobs, those jobs would be
    set up with option `queue="gpu"` and workers on the nodes with available
    GPUs would be instantiated with `funsies worker gpu`. Then, only worker
    processes in the GPU queue would execute the GPU jobs."""

    distributed: bool = True
    """If False, jobs are executed by the local enqueuing process. Used to
    test workflows without having to start workers."""

    reset: bool = False
    """If `True`, this operation is reset (as by `funsies.reset()`) when
    generated."""

    evaluate: bool = True
    """If False, calling `funsies.execute()` on this job or its dependencies
    will fail. Can be used to ensure a specific branch is never executed."""

    ttl: int = ONE_DAY
    """Time to live (ttl) in queue for the operation. Defaults to 24h.
    Equivalent to the [rq keyword with the same
    name](https://python-rq.org/docs/)."""

    result_ttl: int = ONE_MINUTE
    """Time to live (ttl) in queue for the rq result objects. Defaults to one
    minute. Equivalent to the [rq keyword with the same
    name](https://python-rq.org/docs/). (Note that this has nothing to do with
    the actual data results.)"""

    failure_ttl: int = ONE_DAY
    """Time to live (ttl) in queue for the rq result objects of failing jobs.
    Defaults to one day. Equivalent to the [rq keyword with the same
    name](https://python-rq.org/docs/). (Note that this has nothing to do with
    the actual data results.)"""

    # TODO: make meaningfully adjustable
    # Dotted path of the rq serializer; forwarded to rq.Queue by `queue_args`.
    serializer: str = "rq.serializers.JSONSerializer"

    @property
    def job_args(self: "Options") -> Mapping[str, Any]:
        """Return a dictionary of arguments for rq.enqueue's job_args."""
        return dict(
            timeout=self.timeout,
            ttl=self.ttl,
            result_ttl=self.result_ttl,
            failure_ttl=self.failure_ttl,
        )

    @property
    def task_args(self: "Options") -> Mapping[str, Any]:
        """Return a dictionary of arguments for dag.task()."""
        return dict(
            evaluate=self.evaluate,
        )

    @property
    def queue_args(self: "Options") -> Mapping[str, Any]:
        """Return a dictionary of arguments for rq.Queue."""
        # `distributed=False` maps to rq's synchronous (is_async=False) mode.
        return dict(is_async=self.distributed, serializer=self.serializer)

    def pack(self: "Options") -> str:
        """Serialize this Options instance to a JSON string.

        Every dataclass field is included (via `dataclasses.asdict`), so the
        result can be turned back into an equal instance with `unpack()`.
        """
        return json.dumps(asdict(self))

    @classmethod
    def unpack(cls: Type["Options"], data: str) -> "Options":
        """Rebuild an instance from a JSON string produced by `pack()`.

        ``cls`` is used instead of a hard-coded ``Options`` so that calling
        ``unpack`` on a subclass returns an instance of that subclass.

        Raises:
            json.JSONDecodeError: if ``data`` is not valid JSON.
            TypeError: if the decoded value is not a JSON object whose keys
                are all field names of ``cls``.
        """
        return cls(**json.loads(data))
Class variables
var timeout : int
-
Max execution time for this operation, in seconds, or -1 for an operation that never times out. Defaults to -1.
var queue : str
-
Defines which queue this operation should be executed by. For example, if a complex workflow requires GPUs for certain jobs, those jobs would be set up with option `queue="gpu"` and workers on the nodes with available GPUs would be instantiated with `funsies worker gpu`. Then, only worker processes in the GPU queue would execute the GPU jobs.
var distributed : bool
-
If False, jobs are executed by the local enqueuing process. Used to test workflows without having to start workers.
var reset : bool
-
If `True`, this operation is reset (as by `funsies.reset()`) when generated.
var evaluate : bool
-
If False, calling `funsies.execute()` on this job or its dependencies will fail. Can be used to ensure a specific branch is never executed.
var ttl : int
-
Time to live (ttl) in queue for the operation. Defaults to 24h. Equivalent to the rq keyword with the same name.
var result_ttl : int
-
Time to live (ttl) in queue for the rq result objects. Defaults to one minute. Equivalent to the rq keyword with the same name. (Note that this has nothing to do with the actual data results.)
var failure_ttl : int
-
Time to live (ttl) in queue for the rq result objects of failing jobs. Defaults to one day. Equivalent to the rq keyword with the same name. (Note that this has nothing to do with the actual data results.)
var serializer : str
Static methods
def unpack(data: str) ‑> Options
-
Unpack an Options instance from a JSON string produced by pack().
Expand source code
@classmethod
def unpack(cls: Type["Options"], data: str) -> "Options":
    """Rebuild an instance from a JSON string produced by `pack()`.

    ``cls`` is used instead of a hard-coded ``Options`` so that calling
    ``unpack`` on a subclass returns an instance of that subclass.

    Raises:
        json.JSONDecodeError: if ``data`` is not valid JSON.
        TypeError: if the decoded value is not a JSON object whose keys
            are all field names of ``cls``.
    """
    return cls(**json.loads(data))
Instance variables
var job_args : Mapping[str, Any]
-
Return a dictionary of arguments for rq.enqueue's job_args.
Expand source code
@property
def job_args(self: "Options") -> Mapping[str, Any]:
    """Build the keyword arguments handed to rq when enqueueing a job.

    Carries the execution timeout plus the three rq time-to-live settings.
    """
    return {
        "timeout": self.timeout,
        "ttl": self.ttl,
        "result_ttl": self.result_ttl,
        "failure_ttl": self.failure_ttl,
    }
var task_args : Mapping[str, Any]
-
Return a dictionary of arguments for dag.task().
Expand source code
@property
def task_args(self: "Options") -> Mapping[str, Any]:
    """Build the keyword arguments handed to dag.task()."""
    return {"evaluate": self.evaluate}
var queue_args : Mapping[str, Any]
-
Return a dictionary of arguments for rq.Queue.
Expand source code
@property
def queue_args(self: "Options") -> Mapping[str, Any]:
    """Build the keyword arguments used when constructing an rq.Queue."""
    # A non-distributed run maps onto rq's synchronous (is_async=False) mode.
    run_async = self.distributed
    return {"is_async": run_async, "serializer": self.serializer}
Methods
def pack(self: Options) ‑> str
-
Pack an Options instance into a JSON string.
Expand source code
def pack(self: "Options") -> str:
    """Serialize this Options instance to a JSON string.

    Every dataclass field is included (via `dataclasses.asdict`), so the
    result can be turned back into an instance with `unpack()`.
    """
    return json.dumps(asdict(self))