Source code for jobman.engines.base_bash_engine

from pathlib import Path
import textwrap

from .base_engine import BaseEngine


[docs]class BaseBashEngine(BaseEngine): ENGINE_ENTRYPOINT_TPL = 'JOBMAN.ENTRYPOINT.sh' DEFAULT_STD_LOG_FILE_NAMES = { log_name: log_name for log_name in ['stdout', 'stderr'] } def _write_engine_entrypoint(self, job=None, extra_cfgs=None): entrypoint_content = self._generate_engine_entrypoint_content( job=job, extra_cfgs=extra_cfgs) entrypoint_path = Path( job['job_spec']['dir'], self.ENGINE_ENTRYPOINT_TPL).absolute() with entrypoint_path.open('w') as f: f.write(entrypoint_content) entrypoint_path.chmod(0o755) return str(entrypoint_path) def _generate_engine_entrypoint_content(self, job=None, extra_cfgs=None): return textwrap.dedent( ''' #!/bin/bash {preamble} pushd "{jobdir}" > /dev/null && {job_entrypoint} RESULT=$? if [ $RESULT -eq 0 ]; then touch {completed_checkpoint} else touch {failed_checkpoint} fi popd > /dev/null exit $RESULT ''' ).lstrip().format( preamble=self._generate_engine_entrypoint_preamble( job=job, extra_cfgs=extra_cfgs), jobdir=job['job_spec']['dir'], job_entrypoint=job['job_spec']['entrypoint'], completed_checkpoint=self.CHECKPOINT_FILE_NAMES['completed'], failed_checkpoint=self.CHECKPOINT_FILE_NAMES['failed'], ) def _generate_engine_entrypoint_preamble(self, job=None, extra_cfgs=None): preamble = textwrap.dedent( ''' # start preamble # engine_preamble {engine_preamble} # env_vars_for_cfg_specs {env_vars_for_cfg_specs} # end preamble ''' ).lstrip().format( engine_preamble=( self.resolve_cfg_item( key='ENGINE_PREAMBLE', spec={'default': ''}) or '' ), env_vars_for_cfg_specs=( self._generate_env_vars_for_cfg_specs( job=job, extra_cfgs=extra_cfgs) ) ) return preamble def _generate_env_vars_for_cfg_specs(self, job=None, extra_cfgs=None): resolved_cfgs = self.resolve_job_cfg_specs( job=job, extra_cfgs=extra_cfgs) env_var_blocks = [ self._kvp_to_env_var_block(kvp={'key': k, 'value': v}) for k, v in resolved_cfgs.items() if v is not None ] return "\n".join([block for block in env_var_blocks if block.strip()]) def _kvp_to_env_var_block(self, kvp=None): return textwrap.dedent( ''' read -d '' {key} << EOF {value} EOF export {key}=${key} ''' ).lstrip().format( key=kvp['key'], value=kvp['value'].lstrip() ) def _get_std_log_redirects(self, job=None): std_log_paths = self._get_std_log_paths(job=job) stdout_redirect = '' if 'stdout' in std_log_paths: stdout_redirect = '>> %s' % std_log_paths['stdout'] stderr_redirect = '' if 'stderr' in std_log_paths: stderr_redirect = '2>> %s' % std_log_paths['stderr'] return {'stdout_redirect': stdout_redirect, 'stderr_redirect': stderr_redirect} def _get_std_log_paths(self, job=None): log_file_names_w_defaults = { **self.DEFAULT_STD_LOG_FILE_NAMES, **(job['job_spec'].get('std_log_file_names') or {}) } return { log_key: ( str(Path(job['job_spec']['dir'], log_file_name).absolute()) ) for log_key, log_file_name in log_file_names_w_defaults.items() } def _get_std_log_contents(self, job=None): std_log_contents = {} for log_name, log_path in self._get_std_log_paths(job=job).items(): log_content = '' try: with open(log_path) as f: log_content = f.read() except Exception as exc: log_content = "COULD NOT READ LOG '{log_name}': {exc}'".format( log_name=log_name, exc=exc) std_log_contents[log_name] = log_content return std_log_contents