Source code for xtp_job_control.workflows.workflow_components

"""Contains the components to create a workflow."""
import logging
import os
import re
import shutil
from collections import defaultdict
from functools import wraps
from pathlib import Path
from subprocess import PIPE, Popen
from typing import Callable, Dict, List

from noodles import schedule
from noodles.interface import PromisedObject

from ..xml_editor import (add_absolute_path_to_options, create_job_file,
                          edit_xml_file, edit_xml_job_file, edit_xml_options,
                          read_available_jobs)

# Starting logger
logger = logging.getLogger(__name__)


[docs]@schedule def call_xtp_cmd( cmd: str, workdir: str, expected_output: dict = None): """Run a bash `cmd` in the `workdir` folder. It searches for a list of `expected_output` files. """ print("running: ", cmd) if not workdir.exists(): workdir.mkdir() return run_command(cmd, workdir, expected_output)
def run_command(cmd: str, workdir: str, expected_output: dict = None): """Run a bash command using subprocess.""" with Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True, cwd=workdir.as_posix()) as p: rs = p.communicate() logger.info("RUNNING COMMAND: {}".format(cmd)) logger.info("COMMAND OUTPUT: {}".format(rs[0].decode())) logger.error("COMMAND ERROR: {}".format(rs[1].decode())) if expected_output is None: return None else: return {key: retrieve_ouput(workdir, file_name) for key, file_name in expected_output.items()} def retrieve_ouput(workdir: str, expected_file: str) -> str: """ Search for `expected_file` files in the `workdir`. """ path = workdir / expected_file if path.exists(): return path.as_posix() else: return list(p.as_posix() for p in workdir.rglob(expected_file)) def wait_dependencies(fun: Callable): """ Add an implicit dependency from `job` to the caller of this function """ @wraps(fun) def wrapper(*args, **kwargs): dependencies = kwargs.get('dependencies') if dependencies is not None: for d in dependencies: assert isinstance(d, PromisedObject) return fun(*args) return wrapper
[docs]@schedule def edit_options(options: Dict, names_xml_files: List, path_optionfiles: str) -> Dict: """ Edit a list of XML files `names_xml_files` that are located in the `path_optionfiles` using a set of user-defined `options`. """ # Replace relative path with absolute ones in the calculators options sections_to_edit = {name: options[name] for name in names_xml_files} return edit_xml_options(sections_to_edit, path_optionfiles)
@schedule def edit_path_options(path_xml: str, path_optionfiles: Path) -> str: """ replace relative path by absolute ones at `path_xml` """ return add_absolute_path_to_options(path_xml, path_optionfiles)
[docs]@schedule def create_promise_command( string: str, *args) -> str: """Use a `string` as template command and fill in the options using possible promised `args` """ return string.format(*args)
@schedule def edit_jobs_file(path: str, jobs_to_run: List): """ Run only the jobs listed in jobs_to_run """ return {path: edit_xml_job_file(path, jobs_to_run)} @schedule def run_parallel_jobs(dict_jobs: dict, dict_input: dict) -> dict: """ Run a set of jobs defined in `dict_jobs` using the options specified in dict_input. """ state = dict_input['state'] # Name of the job to run name = dict_input['name'] cmd_options = dict_input['cmd_options'] # Add command to run results = dict_jobs.copy() for key, job_info in dict_jobs.items(): input_xml = job_info[name] cmd_parallel = "xtp_parallel -e {} -f {} -o {} ".format( name, state, input_xml) # Call subprocess output = run_command( cmd_parallel + cmd_options, job_info['workdir'], expected_output=dict_input['expected_output']) # Also store the path to the workdir results[key]['job_workdir'] = job_info['workdir'] for k, val in output.items(): results[key][k] = val # Pack the state in the ouput results.update({'state': dict_input['scratch_dir'] / 'state.sql'}) return results @schedule def split_xqmultipole_calculations(input_dict: dict) -> dict: """ Split the jobs specified in xqmultipole into independent jobs. """ results = split_calculations(input_dict, 'xqmultipole_jobs') for idx, config in results.items(): workdir = config['workdir'] # Replace path to MP_FILES mp_files = input_dict['mp_files'].absolute().as_posix() # replace references inside xqmultipole.xml and job.xml options = { 'xqmultipole': {'multipoles': input_dict['system'], 'control': {'job_file': config['job'].name, 'emp_file': input_dict['mps_tab']}}, 'job': {'input': { 'replace_regex': ('MP_FILES', mp_files)}} } edited_files = edit_xml_options(options, workdir) results[idx]['xqmultipole'] = edited_files['xqmultipole'] results[idx]['job'] = edited_files['job'] return {k: v for k, v in results.items()} @schedule def split_eqm_calculations(input_dict: dict) -> dict: """ Split the jobs specified in eqm.jobs into independent jobs. """ results = split_calculations(input_dict, 'eqm_jobs') # path_optionfiles = input_dict['path_optionfiles'] for idx, config in results.items(): workdir = config['workdir'] sections = {'job_file': config['job'].as_posix()} path_file = workdir / 'eqm.xml' results[idx]['eqm'] = edit_xml_file( path_file.as_posix(), 'eqm', sections) return {k: v for k, v in results.items()} @schedule def split_iqm_calculations(input_dict: dict) -> dict: """ Split the jobs specified in iqm.jobs into independent jobs. """ results = split_calculations(input_dict, 'iqm_jobs') for idx, config in results.items(): workdir = config['workdir'] options = { 'iqm': { 'job_file': config['job'].name, } } # Make a symbolic link to the or_files os.symlink(input_dict['scratch_dir'] / "OR_FILES", workdir / "OR_FILES") edited_files = edit_xml_options(options, workdir) results[idx]['iqm'] = edited_files['iqm'] return {k: v for k, v in results.items()} @schedule def split_qmmm_calculations(input_dict: dict) -> dict: """ """ pass def split_calculations(input_dict: dict, jobs_name: str) -> dict: """ Split the jobs specified in a xml file in independent jobs that run independently. """ tmp_dir = create_workdir(input_dict['scratch_dir'], jobs_name) # Copy job dependencies to a new folder results = defaultdict(dict) for job in read_available_jobs(input_dict[jobs_name]): # identifier idx = job.find('id').text name = "{}_{}".format('job', idx) workdir = create_workdir(tmp_dir, name) results[idx]['workdir'] = workdir # Job files results[idx]['job'] = create_xml_job_file(job, workdir) # Move input option file to workdir name = input_dict['name'] shutil.copy(input_dict[name], workdir.as_posix()) return results def create_workdir(tmp_dir: Path, name: str): """ Create temporal workdir """ # create workdir for each job workdir = tmp_dir / name workdir.mkdir() return workdir def create_xml_job_file(job: object, workdir: Path) -> Path: """ Create an xml file containing a single job """ job_file = workdir / 'job.xml' create_job_file(job, job_file.as_posix()) return job_file @schedule def rename_map_file(path_file: Path, expression: str, new_val: str): """ Replace the regex `expression` with `new_val` in the `file_path`. """ regex = re.compile("MP_FILES") with open(path_file, 'r+') as f: xs = f.read() # overwrite the file with open(path_file, 'w') as f: f.write(re.sub(regex, new_val, xs)) return path_file @schedule def move_results_to_workdir(jobs: dict, names: list, workdir: Path) -> dict: """ Move all the resulting or_files to the same central location """ def collect_new_files(job, name): new_files = [] for path in (Path(x) for x in job[name]): relative = path.relative_to(job['job_workdir']) folder_dest = workdir / relative.parent os.makedirs(folder_dest.as_posix(), exist_ok=True) dst = folder_dest / path.name shutil.move(path.as_posix(), dst.as_posix()) new_files.append(dst / path.name) return new_files for k, job in jobs.items(): for name in names: if isinstance(job, dict): jobs[k][name] = collect_new_files(job, name) return jobs