Source code for run_recipe

#!/usr/bin/python3

import pprint
import sys
import os
import shutil
import argparse
import subprocess
import traceback
import threading

from typing import Callable

# ---

import logging


# ---

import yaml

from yaml import dump
try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

import pandas as pd

import numexpr

import dask
import dask.distributed
from dask.distributed import Client, LocalCluster
from dask_jobqueue import SLURMCluster


# ---

from common.logging_facilities import logi, loge, logd, setup_logging_defaults, set_logging_level

from recipe import Recipe

import extractors
import transforms
import exporters
import plots
import yaml_helper

# ---

import tag_regular_expressions as tag_regex

# import debug helper for usage in function definitions
from common.debug import start_debug  # noqa: F401

from utility.code import compile_and_evaluate_function_definition

_debug = False


def evaluate_function_definitions(recipe: Recipe):
    r"""
    Evaluate the functions defined in `recipe.function_definitions` and make the
    compiled code available in a copy of the global environment of the interpreter.
    A copy of the global environment is used so as to not pollute the global
    namespace itself. All the defined functions share the same copy.

    Parameters
    ----------
    recipe: Recipe
        The recipe in which the functions are to be made available.

    Returns
    -------
    A shallow copy of the global environment with the compiled functions as
    members, i.e. a dictionary with the function names as keys and the code as
    associated values.
    """
    # Create a copy of the global environment for evaluating the extra code
    # fragments so as to not pollute the global namespace itself.
    #
    # NOTE: This is a shallow copy and thus a rather simple method to prevent
    # accidental overwrites, *not* a defense against deliberately malicious
    # modifications.
    global_env = globals().copy()

    for function_name in recipe.function_definitions:
        # Get the string with the source code.
        function_code = recipe.function_definitions[function_name]
        # Actually evaluate the code within the given namespace to allow
        # access to all the defined symbols, such as helper functions that are
        # not defined inline.
        function, global_env = compile_and_evaluate_function_definition(function_code, function_name, global_env)
        # Bind the function to the specified name.
        recipe.function_definitions[function_name] = function

    # Return the environment with the compiled functions.
    return global_env
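
# Illustrative sketch (the recipe contents and the helper name `to_mbit` are
# hypothetical, for illustration only): a recipe may carry a helper defined as a
# source string, which becomes callable after `evaluate_function_definitions`:
#
#     recipe.function_definitions = {
#         'to_mbit': 'def to_mbit(bits_per_second):\n    return bits_per_second / 1e6\n'
#     }
#     env = evaluate_function_definitions(recipe)
#     assert recipe.function_definitions['to_mbit'](2e6) == 2.0
#     # `env` also contains `to_mbit`, so code fragments `eval`ed in `env`
#     # later on can call it.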

def eval_recipe_tag_definitions(recipe: Recipe
                                , attributes_regex_map, iterationvars_regex_map, parameters_regex_map
                                , function_definitions_global_env: dict):
    def eval_and_add_tags(tag_set_name, regex_map):
        for tag_name in recipe.evaluation.tags[tag_set_name]:
            # The `eval` is necessary here since the `transform` function of the tag
            # can be an arbitrary function and has to be parsed into a `Callable`.
            tag_list = eval(recipe.evaluation.tags[tag_set_name][tag_name], function_definitions_global_env)  # pylint: disable=W0123:eval-used
            # Check that the transform is indeed a `Callable`.
            for tag in tag_list:
                if not isinstance(tag['transform'], Callable):
                    raise RuntimeError(f'transform for {tag=} is not a Callable!')
            regex_map[tag_name] = tag_list

    if 'attributes' in recipe.evaluation.tags:
        eval_and_add_tags('attributes', attributes_regex_map)
    if 'iterationvars' in recipe.evaluation.tags:
        eval_and_add_tags('iterationvars', iterationvars_regex_map)
    if 'parameters' in recipe.evaluation.tags:
        eval_and_add_tags('parameters', parameters_regex_map)

    return attributes_regex_map, iterationvars_regex_map, parameters_regex_map
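
# Illustrative sketch of the expected tag format (an assumption for illustration,
# not taken from a recipe schema): a tag definition is a string that `eval`s to a
# list of dicts, where at least the `transform` entry must be a `Callable`, e.g.
#
#     tags:
#       iterationvars:
#         repetition: "[{'transform': lambda value: int(value)}]"
#
# After evaluation, `regex_map['repetition'][0]['transform']('3')` would
# return `3`.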

def prepare_evaluation_phase(recipe: Recipe, options, data_repo, function_definitions_global_env: dict):
    logi(f'prepare_evaluation_phase: {recipe} {recipe.name}')

    if hasattr(recipe.evaluation, 'tags'):
        attributes_regex_map, iterationvars_regex_map, parameters_regex_map = eval_recipe_tag_definitions(recipe
                , tag_regex.attributes_regex_map, tag_regex.iterationvars_regex_map, tag_regex.parameters_regex_map
                , function_definitions_global_env)
    else:
        attributes_regex_map, iterationvars_regex_map, parameters_regex_map = \
                tag_regex.attributes_regex_map, tag_regex.iterationvars_regex_map, tag_regex.parameters_regex_map

    if not hasattr(recipe.evaluation, 'extractors'):
        logi('prepare_evaluation_phase: no `extractors` in recipe.Evaluation')
        return data_repo, []

    for extractor_tuple in recipe.evaluation.extractors:
        extractor_name = list(extractor_tuple.keys())[0]
        extractor = list(extractor_tuple.values())[0]

        if options.run_tree and extractor_name not in options.run_tree['evaluation']['extractors']:
            logi(f'skipping extractor {extractor_name}')
            continue

        if extractor_name in options.extraction_overrides:
            extractor.input_files = [ options.extraction_overrides[extractor_name] ]
            logi(f'overriding {extractor_name} with {extractor.input_files}')

        extractor.set_tag_maps(attributes_regex_map, iterationvars_regex_map, parameters_regex_map)
        delayed_data = extractor.prepare()

        data_repo[extractor_name] = delayed_data
        logi(f'added extractor {extractor_name}')

    if not hasattr(recipe.evaluation, 'transforms'):
        logi('prepare_evaluation_phase: no `transforms` in recipe.Evaluation')
    else:
        for transform_tuple in recipe.evaluation.transforms:
            transform_name = list(transform_tuple.keys())[0]
            transform = list(transform_tuple.values())[0]

            if options.run_tree and transform_name not in options.run_tree['evaluation']['transforms']:
                logi(f'skipping transform {transform_name}')
                continue

            logi(f'preparing transform {transform_name}')
            transform.set_name(transform_name)
            transform.set_data_repo(data_repo)
            transform.prepare()
            logi(f'added transform {transform_name}')

    jobs = []
    if recipe.evaluation.exporter is None:
        logi('prepare_evaluation_phase: no `exporter` in recipe.Evaluation')
    else:
        for exporter_tuple in recipe.evaluation.exporter:
            exporter_name = list(exporter_tuple.keys())[0]
            exporter = list(exporter_tuple.values())[0]

            if options.run_tree and exporter_name not in options.run_tree['evaluation']['exporter']:
                logi(f'skipping exporter {exporter_name}')
                continue

            if exporter_name in options.export_overrides:
                exporter.output_filename = options.export_overrides[exporter_name]
                logi(f'overriding {exporter_name} with {exporter.output_filename}')

            exporter.set_data_repo(data_repo)
            job = exporter.prepare()
            jobs.extend(job)
            logi(f'added exporter {exporter_name}')

    logi(f'{jobs=}')

    if options.plot_task_graphs:
        for i in range(0, len(jobs)):
            graph_output_file = f'{options.tmpdir}/dask_task_graph_evaluation_job-{i}.png'
            jobs[i].visualize(graph_output_file)
            logi(f'saved the evaluation phase task graph for job {i}: {jobs[i]} to: {graph_output_file}')

    return data_repo, jobs

def prepare_plotting_phase(recipe: Recipe, options, data_repo):
    logi(f'prepare_plotting_phase: {recipe} {recipe.name}')

    if not hasattr(recipe.plot, 'reader'):
        logi('prepare_plotting_phase: no `reader` in recipe.Plot')
    else:
        for dataset_tuple in recipe.plot.reader:
            dataset_name = list(dataset_tuple.keys())[0]
            reader = list(dataset_tuple.values())[0]

            if options.run_tree and dataset_name not in options.run_tree['plot']['reader']:
                logi(f'skipping reader {dataset_name}')
                continue

            logi(f'plot: loading dataset: "{dataset_name=}"')

            if dataset_name in options.reader_overrides:
                reader.input_files = options.reader_overrides[dataset_name]
                logi(f'plot: prepare_plotting_phase overriding input files for "{dataset_name}": "{reader.input_files=}"')

            data = reader.prepare()
            data_repo[dataset_name] = data
            logi(f'added reader {dataset_name}')

    logd(f'plot: {data_repo=}')

    if not hasattr(recipe.plot, 'transforms'):
        logi('prepare_plotting_phase: no `transforms` in recipe.Plot')
    else:
        for transform_tuple in recipe.plot.transforms:
            transform_name = list(transform_tuple.keys())[0]
            transform = list(transform_tuple.values())[0]

            if options.run_tree and transform_name not in options.run_tree['plot']['transforms']:
                logi(f'skipping transform {transform_name}')
                continue

            transform.set_name(transform_name)
            transform.set_data_repo(data_repo)
            transform.prepare()
            logi(f'added transform {transform_name}')

    jobs = []
    for task_tuple in recipe.plot.tasks:
        task_name = list(task_tuple.keys())[0]
        task = list(task_tuple.values())[0]

        if options.run_tree and task_name not in options.run_tree['plot']['tasks']:
            logi(f'skipping task {task_name}')
            continue

        if task_name in options.plot_overrides:
            task.output_file = options.plot_overrides[task_name]
            logi(f'overriding {task_name} with {task.output_file}')

        task.set_data_repo(data_repo)
        logi(f'plot: preparing plotting task {task_name}')
        job = task.prepare()
        jobs.append(job)
        logi(f'added task {task_name}')

    if options.plot_task_graphs:
        for i in range(0, len(jobs)):
            graph_output_file = f'{options.tmpdir}/dask_task_graph_plotting_job-{i}.png'
            jobs[i].visualize(graph_output_file)
            logi(f'saved the plot phase task graph for job {i}: {jobs[i]} to: {graph_output_file}')

    return data_repo, jobs

def process_recipe(options):
    with open(options.recipe, mode='r') as f:
        recipe = yaml.unsafe_load(f.read())

    if options.dump_recipe or options.dump_recipe_only:
        output = dump(recipe, Dumper=Dumper)
        terminal_size = shutil.get_terminal_size()
        logd(pprint.pformat(output, width=terminal_size.columns))
        if options.dump_recipe_only:
            exit()

    data_repo = {}
    job_list = []

    # Compile all the functions defined in `function_definitions` and make them
    # available in a shallow copy of the runtime environment.
    if hasattr(recipe, 'function_definitions'):
        function_definitions_global_env = evaluate_function_definitions(recipe)
    else:
        # Or just use the default environment.
        function_definitions_global_env = globals()

    if not options.plot_only:
        if not hasattr(recipe, 'evaluation'):
            logi('process_recipe: no Evaluation in recipe')
            return data_repo, []
        data_repo, jobs = prepare_evaluation_phase(recipe, options, data_repo, function_definitions_global_env)
        job_list.extend(jobs)

    if options.eval_only:
        return data_repo, job_list

    if not hasattr(recipe, 'plot'):
        logi('process_recipe: no Plot in recipe')
        return data_repo, job_list

    data_repo, jobs = prepare_plotting_phase(recipe, options, data_repo)
    job_list.extend(jobs)

    return data_repo, job_list

def extract_dict_from_string(string):
    return dict([ token.strip(',').split(':') for token in string.strip('{}').split(',') ])
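
# For example (note that keys and values are not whitespace-stripped, so avoid
# spaces around the separators):
#
#     >>> extract_dict_from_string('{e1:out.csv,e2:out2.csv}')
#     {'e1': 'out.csv', 'e2': 'out2.csv'}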

def parse_arguments(arguments):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument('recipe', help='Input recipe')
    parser.add_argument('--override-extractor', type=str, help='Override extractor parameters')
    parser.add_argument('--override-exporter', type=str, help='Override exporter parameters')
    parser.add_argument('--override-reader', type=str, help='Override reader parameters')
    parser.add_argument('--override-plot', type=str, help='Override plot parameters')
    parser.add_argument('--eval-only', action='store_true', default=False, help='Run evaluation phase only')
    parser.add_argument('--plot-only', action='store_true', default=False, help='Run plot phase only')
    parser.add_argument('--run', type=str, default='all',
                        help='Run selected tasks only, in the format [evaluation_phase_tasks]:[plot_phase_tasks]'
                             ' where each (optional) phase consists of a comma-separated list of qualified task names,'
                             ' i.e. the name of the sub-phase and the task name,'
                             ' e.g. `extractors.e1,transforms.t1,exporter.e1:reader.r1,transforms.t1,tasks.plot1`')
    parser.add_argument('--worker', type=int, default=4, help='The number of worker processes')
    parser.add_argument('--mem', type=int, default=1,
                        help='The memory, in GB, to limit each worker process to. This is only obeyed if the cluster is not already running.')
    parser.add_argument('--cluster', type=str, help='The address of an already running cluster')
    parser.add_argument('--single-threaded', action='store_true', default=False,
                        help='Run in single-threaded mode; this overrides the value of the `--worker` flag')
    parser.add_argument('--dashboard-port', type=int, default=8787,
                        help='The port for the dashboard of the cluster; only used for the default localhost cluster')
    parser.add_argument('--slurm', action='store_true', default=False, help='Use a SLURM cluster')
    parser.add_argument('--partition', type=str, help='The partition for the SLURM cluster')
    parser.add_argument('--nodelist', type=str, help='The nodelist for the SLURM cluster')
    parser.add_argument('--tmpdir', type=str, default='/opt/tmpssd/tmp', help='The directory for temporary files')
    parser.add_argument('--plot-task-graphs', action='store_true', default=False,
                        help='Plot the evaluation and plotting phase task graphs')
    parser.add_argument('--verbose', '-v', action='count', default=0, help='Increase logging verbosity')
    parser.add_argument('--dump-recipe', action='store_true', default=False,
                        help='Dump the loaded recipe; useful for finding errors in the recipe')
    parser.add_argument('--dump-recipe-only', action='store_true', default=False,
                        help='Dump the loaded recipe and exit; useful for finding errors in the recipe')
    parser.add_argument('--lint-recipe', action='store_true', default=False,
                        help='Run yamllint over the specified recipe; useful for finding errors in the recipe')
    parser.add_argument('--debug', action='store_true', default=False,
                        help='Enable debug mode. If an exception is encountered, drop into an ipdb debugger session')

    args = parser.parse_args(arguments)

    if args.debug:
        global _debug
        _debug = True

    if args.single_threaded:
        args.worker = 1

    if args.slurm:
        if not args.nodelist:
            raise Exception('A nodelist is required when using SLURM.')
        if not args.partition:
            raise Exception('A partition is required when using SLURM.')

    def set_dict_arg_from_string(option, arg_name):
        if option:
            try:
                option_dict = extract_dict_from_string(option)
            except Exception as e:
                loge(f'>>>> ERROR: extracting parameters for `{arg_name}` from `{option}` failed:\n>>>> {e}')
                return
            logd(f'{option_dict=}')
            setattr(args, arg_name, option_dict)
        else:
            setattr(args, arg_name, dict())

    set_dict_arg_from_string(args.override_extractor, 'extraction_overrides')
    set_dict_arg_from_string(args.override_exporter, 'export_overrides')
    set_dict_arg_from_string(args.override_reader, 'reader_overrides')
    set_dict_arg_from_string(args.override_plot, 'plot_overrides')

    def get_run_tree(option, phase_names):
        d = dict()
        d[phase_names[0]] = set()
        d[phase_names[1]] = set()
        d[phase_names[2]] = set()

        for qualified_task_name in option.split(','):
            if len(qualified_task_name) == 0:
                continue
            if len(r := qualified_task_name.split('.')) == 2:
                top_level, task_name = r
            else:
                loge(f'>>>> ERROR: bad task name: {qualified_task_name}')
                exit(1)
            if top_level in d:
                d[top_level] = d[top_level].union(set([task_name]))
            else:
                loge(f'>>>> ERROR: not a valid phase name: {top_level}')
                exit(1)

        return d

    def process_run_tree(option):
        eval_phases = ['extractors', 'transforms', 'exporter']
        plot_phases = ['reader', 'transforms', 'tasks']

        p = option.split(':')
        if len(p) == 1 or (len(p) == 2 and len(p[1]) == 0):
            # evaluation phase only
            eval_str = p[0]
            plot_str = ''
        else:
            eval_str = p[0]
            plot_str = p[1]

        run_tree = { 'evaluation': get_run_tree(eval_str, eval_phases)
                   , 'plot': get_run_tree(plot_str, plot_phases)
                   }
        return run_tree

    if args.run != 'all':
        run_tree = process_run_tree(args.run)
    else:
        run_tree = None
    setattr(args, 'run_tree', run_tree)
    logd(f'{run_tree=}')

    def map_verbosity_level_to_log_level(verbosity):
        match verbosity:
            case 0:
                log_level = logging.WARNING
            case 1:
                log_level = logging.INFO
            case 2:
                log_level = logging.DEBUG
            case 3:
                log_level = logging.NOTSET
            case _:
                log_level = logging.NOTSET
        return log_level

    log_level = map_verbosity_level_to_log_level(args.verbose)
    setattr(args, 'log_level', log_level)

    return args
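
# For example, `--run 'extractors.e1,exporter.x1:tasks.plot1'` (with hypothetical
# task names) produces the following run tree:
#
#     {'evaluation': {'extractors': {'e1'}, 'transforms': set(), 'exporter': {'x1'}},
#      'plot': {'reader': set(), 'transforms': set(), 'tasks': {'plot1'}}}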

def setup_pandas():
    # verbose printing of DataFrames
    pd.set_option('display.max_columns', None)
    pd.set_option('display.max_colwidth', None)

    # print the numexpr thread pool config to the log
    numexp_max_threads = os.getenv('NUMEXPR_MAX_THREADS')
    numexp_num_threads = os.getenv('NUMEXPR_NUM_THREADS')

    numexpr_threads_log_msg = f'thread_id={threading.get_native_id()}'
    if numexp_max_threads:
        numexpr_threads_log_msg += f' NUMEXPR_MAX_THREADS={numexp_max_threads}'
    if numexp_num_threads:
        numexpr_threads_log_msg += f' NUMEXPR_NUM_THREADS={numexp_num_threads}'
    numexpr_threads_log_msg += f' {numexpr.ncores=} {numexpr.nthreads=} {numexpr.MAX_THREADS=}'
    logd(numexpr_threads_log_msg)

class WorkerPlugin(dask.distributed.WorkerPlugin):
    r"""
    A dask worker plugin for setting defaults in the worker process.

    Parameters
    ----------
    options : dict
        The dictionary containing the configuration for the worker, usually the
        same as for the launcher.
    """
    def __init__(self, options, *args, **kwargs):
        self.options = options

    def setup(self, worker: dask.distributed.Worker):
        # append the current path to the PYTHONPATH of the worker
        sys.path.append('.')
        setup_logging_defaults(level=self.options.log_level)
        set_logging_level(self.options.log_level)
        setup_pandas()
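
# Note: dask invokes `WorkerPlugin.setup` once on every worker the plugin is
# registered with via `client.register_worker_plugin`, so the logging and pandas
# defaults above are applied in each worker process, not only in the launcher.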

def setup_dask(options):
    r"""
    Setup and configure the dask cluster and its workers.

    Parameters
    ----------
    options : dict
        The dictionary containing the configuration for the launcher
    """
    plugin = WorkerPlugin(options)

    dask.config.set({'distributed.scheduler.worker-ttl': None})

    # single-threaded mode for debugging
    if options.single_threaded:
        logi('using local single-threaded process cluster')
        dask.config.set(scheduler='synchronous')
        # no client is returned; creating a client here leads to sqlite
        # connection objects being transported between threads
        return None

    if options.slurm:
        logi('using SLURM cluster')
        cluster = SLURMCluster(cores = 1
                              , n_workers = options.worker
                              , memory = str(options.mem) + 'GB'
                              , job_extra_directives = [ f'--nodelist={options.nodelist} --partition={options.partition}' ]
                              , interface = 'lo'
                              , shared_temp_directory = options.tmpdir
                              )
        client = Client(cluster)
        client.register_worker_plugin(plugin)
        return client
    elif options.cluster:
        if options.cluster == 'local':
            logi('using local process cluster')
            cluster = LocalCluster(n_workers=options.worker
                                  , host='localhost'
                                  # , interface='lo'
                                  , memory_limit = str(options.mem) + 'GB'
                                  , local_directory = options.tmpdir
                                  )
            cluster.scale(options.worker)
            client = Client(cluster)
            client.register_worker_plugin(plugin)
            return client
        else:
            logi(f'using distributed cluster at {options.cluster}')
            client = Client(options.cluster)
            client.register_worker_plugin(plugin)
            return client
    else:
        dashboard_address = f'localhost:{options.dashboard_port}'
        logi(f'using local cluster with dashboard at {dashboard_address}')
        client = Client(dashboard_address=dashboard_address
                       , n_workers=options.worker
                       , memory_limit = str(options.mem) + 'GB'
                       , local_directory = options.tmpdir
                       )
        client.register_worker_plugin(plugin)
        return client

def compute_graph(jobs):
    r"""
    Compute the task graph.

    Parameters
    ----------
    jobs : List[dask.Delayed]
        The list of jobs/tasks to compute
    """
    logi('=-!!'*40)
    logi('recombobulating splines...')
    logi(f'compute_graph: {jobs=}')
    result = dask.compute(*jobs)
    logi('=-!!'*40)
    return result
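
# Note: passing all jobs to a single `dask.compute` call merges their task
# graphs, so intermediate results shared between jobs (e.g. one extractor
# feeding several exporters) are computed only once rather than once per job.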

def main():
    setup_logging_defaults(logging.WARNING)

    options = parse_arguments(sys.argv[1:])

    # setup the logging level again, now from the parsed options
    set_logging_level(options.log_level)
    logd(f'{options=}')

    if options.lint_recipe:
        # Just run yamllint in a subprocess for now. This also provides a nicely colored output.
        subprocess.run(['yamllint', options.recipe])
        exit()

    setup_pandas()

    client = setup_dask(options)

    # This is needed for proper pickling of DataFrames to JSON.
    exporters.register_jsonpickle_handlers()

    # register constructors for all YAML objects
    yaml_helper.register_constructors()
    extractors.register_constructors()
    transforms.register_constructors()
    exporters.register_constructors()
    plots.register_constructors()

    data_repo, job_list = process_recipe(options)

    if len(job_list) == 0:
        loge('No tasks to run')
        return

    # now actually compute the constructed computation graph
    result = compute_graph(job_list)
    # ...

    return

if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        loge(f'{e=}')
        loge(''.join(traceback.format_exception(e)))
        if sys.stdin.isatty() and _debug:
            loge('dropping into an interactive debugging environment')
            # drop into the debugger in the context of the exception thrown
            import ipdb
            ipdb.post_mortem(e.__traceback__)
        else:
            loge('not dropping into an interactive debugging environment since the executing interpreter is not interactive')