jobman.dao package

Submodules

jobman.dao.engine_sqlite_dao module

class jobman.dao.engine_sqlite_dao.EngineSqliteDAO(table_prefix=None, extra_job_fields=None, **kwargs)[source]

Bases: jobman.dao.jobs_dao_mixin.JobsDaoMixin, jobman.dao.sqlite_dao.SqliteDAO

class JobSchema[source]

Bases: jobman.dao.utils.TimestampedSchemaMixin, jobman.dao.utils.Schema

Fields for job records.

key = {'type': 'TEXT', 'primary_key': True, 'default': <function generate_key>}
status = {'type': 'TEXT'}

jobman.dao.jobman_sqlite_dao module

class jobman.dao.jobman_sqlite_dao.JobmanSqliteDAO(lock_timeout=30, debug=None, **kwargs)[source]

Bases: jobman.dao.jobs_dao_mixin.JobsDaoMixin, jobman.dao.sqlite_dao.SqliteDAO

DAO for JobMan

class JobSchema[source]

Bases: jobman.dao.utils.TimestampedSchemaMixin, jobman.dao.utils.Schema

Fields for job records.

batchable = {'type': 'INTEGER'}

Flag to indicate whether a job can be included in a batch.

errors = {'type': 'JSON'}
job_spec = {'type': 'JSON'}

dict of job metadata/parameters.

A job_spec often includes these items:

batchable
Whether a job can be included in a batch.
dir
Absolute path to job_dir.
cfg
Extra cfg values to provide to cfg resolvers.
resources
Resources job requires.
key = {'type': 'TEXT', 'primary_key': True, 'default': <function generate_key>}
purgeable = {'type': 'INTEGER'}

Whether a job can be purged from the db.

source_key = {'type': 'TEXT'}

Key for source that provided this job.

source_meta = {'type': 'JSON'}

Metadata from source, to allow source to correlate jobman records with its own records.

source_tag = {'type': 'TEXT'}

A tag that the source can set. This is often useful for sources which want to do some sort of post-processing before marking a job as complete.

status = {'type': 'TEXT'}
worker_key = {'type': 'TEXT'}

Key for worker handling the job.

worker_meta = {'type': 'JSON'}

Metadata to retrieve related state from worker.

generate_source_key_filter(source_key=None)[source]

jobman.dao.jobs_dao_mixin module

class jobman.dao.jobs_dao_mixin.JobsDaoMixin[source]

Bases: object

Common methods used by daos that work with jobs.

class JOB_STATUSES

Bases: object

CLAIMED = 'CLAIMED'
COMPLETED = 'COMPLETED'
EXECUTED = 'EXECUTED'
FAILED = 'FAILED'
PENDING = 'PENDING'
RUNNING = 'RUNNING'
UNKNOWN = 'UNKNOWN'
claim_jobs(query=None)[source]

Claim jobs which match a query.

Sets status to JOB_STATUSES.CLAIMED.

Parameters:query – A query dict per query_jobs().
Returns:a list of claimed jobs.
create_job(job=None)[source]

Create a job record.

Parameters:job – a job dict. Format of dict depends on schema for DAO.
Returns:job record from DAO.
generate_status_filter(status=None)[source]

Generate a status filter.

Parameters:status – job status
Returns:status_filter_dict
get_job(key=None)[source]

Get a job that has the given key.

Parameters:key – job key
Returns:job_record
get_jobs_for_status(status=None)[source]

Get jobs that have a given status.

Parameters:status – job status
Returns:job_records
query_jobs(query=None)[source]

Query DAO for jobs.

Parameters:query – A query dict per jobman.dao.sqlite_dao.SqliteDAO.query_ents().
Returns:a list of job records matching query.
save_jobs(jobs=None, replace=True)[source]

Save jobs to DAO.

Parameters:
  • jobs – A list of job dicts.
  • replace – If True will overwrite existing records. If False will raise errors for key collisions.
Returns:

saved jobs.

update_jobs(updates=None, query=None)[source]

Update jobs which match a query.

Parameters:query – A query dict per query_jobs().
Returns:
a dict with a ‘rowcount’ item indicating how many records were
updated.

jobman.dao.orm module

class jobman.dao.orm.ORM(name=None, fields=None, json=<module 'json' from '/Users/adorsk/anaconda/envs/jobman/lib/python3.6/json/__init__.py'>, table_prefix=None, logger=None)[source]

Bases: object

exception InsertError[source]

Bases: Exception

exception IntegrityError[source]

Bases: Exception

exception UpdateError[source]

Bases: Exception

create_table(connection=None)[source]
execute_insert_or_replace(fields=None, values=None, replace=None, connection=None)[source]
query_objects(query=None, connection=None)[source]
save_object(obj=None, connection=None, replace=True)[source]
table
update_objects(updates=None, query=None, connection=None)[source]

jobman.dao.sqlite_dao module

class jobman.dao.sqlite_dao.SqliteDAO(db_uri=':memory:', orm_specs=None, table_prefix=None, initialize=True, logger=None, include_kvp_orm=True, debug=None, sqlite=<module 'sqlite3' from '/Users/adorsk/anaconda/envs/jobman/lib/python3.6/sqlite3/__init__.py'>, orm=<module 'jobman.dao.orm' from '/Users/adorsk/projects/jobman/jobman/dao/orm.py'>)[source]

Bases: object

exception InsertError[source]

Bases: Exception

exception IntegrityError[source]

Bases: Exception

exception UpdateError[source]

Bases: Exception

connection
create_connection()[source]
create_ent(ent_type=None, ent=None)[source]
create_kvp(kvp=None)[source]
ensure_tables()[source]
flush()[source]
generate_key()[source]
get_ent(ent_type=None, key=None)[source]
get_kvp(key=None)[source]
initialize()[source]
query_ents(ent_type=None, query=None)[source]

Query ents.

Parameters:
  • ent_type
  • query

    a query dict of this form:

    {
        'filters': [
            {'field': 'some_field', 'op': '=', 'arg': 'some_arg'}
            # other filters...
        ],
        'limit': 100,
        'order_by': [
            {'field': 'some_field', 'direction': 'ASC'},
            {'field': 'some_other_field', 'direction': 'DESC'},
        ]
    }
    
query_kvps(query=None)[source]
save_ents(ent_type=None, ents=None, replace=True)[source]
save_kvps(kvps=None, replace=True)[source]
update_ents(ent_type=None, updates=None, query=None)[source]
update_kvp(key=None, new_value=None, where_prev_value=Ellipsis)[source]

jobman.dao.utils module

class jobman.dao.utils.Schema[source]

Bases: object

Base class for defining ORM schemas.

classmethod get_field_infos()[source]
class jobman.dao.utils.SchemaMeta[source]

Bases: type

Metaclass to propagate doc strings for Schema subclasses.

class jobman.dao.utils.TimestampedSchemaMixin[source]

Bases: jobman.dao.utils.Schema

Provides created and modified timestamp fields.

created = {'type': 'FLOAT', 'default': <function generate_timestamp>}

created

modified = {'type': 'FLOAT', 'auto_update': <function generate_timestamp>}

modified

jobman.dao.utils.generate_key(*args, **kwargs)[source]
jobman.dao.utils.generate_timestamp(*args, **kwargs)[source]
jobman.dao.utils.generate_timestamp_fields()[source]
jobman.dao.utils.generate_uuid(*args, **kwargs)[source]

jobman.dao.worker_sqlite_dao module

class jobman.dao.worker_sqlite_dao.WorkerSqliteDAO(table_prefix=None, extra_job_fields=None, **kwargs)[source]

Bases: jobman.dao.jobs_dao_mixin.JobsDaoMixin, jobman.dao.sqlite_dao.SqliteDAO

class JobSchema[source]

Bases: jobman.dao.utils.TimestampedSchemaMixin, jobman.dao.utils.Schema

Fields for job records.

batch_meta = {'type': 'JSON'}

Metadata for batch jobs; often includes a list of subjobs.

batchable = {'type': 'INTEGER'}

Flag to indicate whether a job can be included in a batch.

engine_key = {'type': 'TEXT'}

Key for engine handling the job.

engine_meta = {'type': 'JSON'}

Metadata to retrieve related state from engine.

is_batch = {'type': 'INTEGER'}

Flag to indicate whether a job is a batch job.

key = {'type': 'TEXT', 'primary_key': True, 'default': <function generate_key>}
parent_batch_key = {'type': 'Text'}

If job is part of batch, this is the parent batch job’s key.

status = {'type': 'TEXT'}