Source code for exporters

from typing import Union

import pathlib
import time
import operator

import yaml
from yaml import YAMLObject

import pandas as pd

# Pandas serializing handlers
import jsonpickle
import jsonpickle.ext.pandas as jsonpickle_pandas

import dask

from yaml_helper import proto_constructor

import logging
from common.logging_facilities import logi, loge, logd, logw, get_logging_level

from extractors import BaseExtractor

from utility.filesystem import check_file_access_permissions, check_directory_access_permissions


class FileResultProcessor(YAMLObject):
    r"""
    Export the given dataset as [feather/arrow](https://arrow.apache.org/docs/python/feather.html) or JSON

    Parameters
    ----------
    output_filename: str
        the name of the output file
    output_directory: str
        the path of the output directory
    dataset_name: str
        the name of the dataset to export
    format: str
        the output file format, either `feather` or `json`
    concatenate: bool
        Whether to concatenate the input data before exporting it. If false, the name of the
        output files will be derived from the input file names and the aliases in the data.
    raw: bool
        whether to save the raw input or convert the columns of the input `pandas.DataFrame`
        to categories before saving
    """
    yaml_tag = '!FileResultProcessor'

    def __init__(self, dataset_name:str
                 , output_filename = None
                 , output_directory = None
                 , format:str = 'feather'
                 , concatenate:bool = False
                 , raw:bool = False
                 , categorical_columns:set[str] = set()
                 , numerical_columns:Union[dict[str, str], set[str]] = set()
                 , *args, **kwargs):
        if (not output_filename) and concatenate:
            raise ValueError('When concatenating a dataset into a single file, the `output_filename` must be specified')
        if (not output_directory) and (not concatenate):
            raise ValueError('When not concatenating a dataset into a single file, the `output_directory` must be specified')

        if output_filename and concatenate:
            check_file_access_permissions(output_filename)
        if output_directory and (not concatenate):
            check_directory_access_permissions(output_directory)

        self.dataset_name = dataset_name
        self.output_filename = output_filename
        self.output_directory = output_directory
        self.format = format
        self.concatenate = concatenate
        self.raw = raw

        # categorical_columns and numerical_columns (if appropriate) are explicitly converted
        # to a set to alleviate the need for an explicit tag in the YAML recipe, since pyyaml
        # always interprets values in curly braces as dictionaries
        self.categorical_columns:set[str] = set(categorical_columns)
        if not isinstance(numerical_columns, dict):
            self.numerical_columns:set[str] = set(numerical_columns)
        else:
            self.numerical_columns:dict[str, str] = numerical_columns

    def save_to_disk(self, df, filename, file_format='feather', compression='lz4', hdf_key='data'):
        start = time.time()
        logi(f'Saving "{filename}" ...')

        if df is None:
            logw('>>>> save_to_disk: input DataFrame is None')
            return
        if not self.raw and df.empty:
            logw('>>>> save_to_disk: input DataFrame is empty')
            return

        check_file_access_permissions(filename)

        if file_format == 'feather':
            try:
                df.reset_index().to_feather(filename, compression=compression)
            except Exception as e:
                loge(f'An exception occurred while trying to save "{filename}":\n{e}')
                loge(f'df:\n{df}')
                return
        elif file_format == 'hdf':
            df.reset_index().to_hdf(filename
                                    , format='table'
                                    , key=hdf_key
                                    )
        elif file_format == 'json':
            try:
                f = open(filename, 'w')
                f.write(jsonpickle.encode(df, unpicklable=False, make_refs=False, keys=True))
                f.close()
            except Exception as e:
                loge(f'An exception occurred while trying to save "{filename}":\n{e}')
                loge(f'df:\n{df}')
                return
        else:
            raise Exception('Unknown file format')

        stop = time.time()
        logi(f'>>>> save_to_disk: it took {stop - start}s to save {filename}')
        if not self.raw:
            if (get_logging_level() == logging.DEBUG):
                logd(f'>>>> save_to_disk: {df.memory_usage(deep=True)=}')

    def set_data_repo(self, data_repo):
        self.data_repo = data_repo

    def get_data(self, dataset_name:str):
        if dataset_name not in self.data_repo:
            raise Exception(f'"{dataset_name}" not found in data repo')

        data = self.data_repo[dataset_name]
        if data is None:
            raise Exception(f'data for "{dataset_name}" is None')

        return data

    def prepare_concatenated(self, data_list, job_list):
        if self.raw:
            job = dask.delayed(self.save_to_disk)(tuple(map(operator.itemgetter(0), data_list)), self.output_filename, self.format)
        else:
            concat_result = dask.delayed(pd.concat)(tuple(map(operator.itemgetter(0), data_list)), ignore_index=True)
            if len(self.categorical_columns) != 0 or len(self.numerical_columns) != 0:
                convert_columns_result = dask.delayed(BaseExtractor.convert_columns_dtype)(concat_result, self.categorical_columns, self.numerical_columns)
                job = dask.delayed(self.save_to_disk)(convert_columns_result, self.output_filename, self.format)
            else:
                job = dask.delayed(self.save_to_disk)(concat_result, self.output_filename, self.format)

        job_list.append(job)
        return job_list

    def prepare_separated(self, data_list, job_list):
        for data, attributes in data_list:
            logd(f">>>>>>\n{attributes=}")
            if len(attributes.source_files) == 1:
                source_file_str = list(attributes.source_files)[0]
                source_file = str(pathlib.PurePath(source_file_str).stem)
            else:
                logd(">>>>>> Multiple source files")
                source_file = '_'.join(list(attributes.get_source_files()))

            if len(attributes.aliases) == 1:
                aliases = list(attributes.get_aliases())[0]
            else:
                aliases = '_'.join(list(attributes.get_aliases()))

            relative_file = list(attributes.source_files)[0].split(str(attributes.common_root))[1]

            output_filename = self.output_directory + '/' \
                              + relative_file \
                              + '_' \
                              + aliases \
                              + '.' + self.format

            logd(f'{output_filename=}')

            if len(self.categorical_columns) != 0 or len(self.numerical_columns) != 0:
                convert_columns_result = dask.delayed(BaseExtractor.convert_columns_dtype)(data, self.categorical_columns, self.numerical_columns)
                job = dask.delayed(self.save_to_disk)(convert_columns_result, output_filename, self.format)
            else:
                job = dask.delayed(self.save_to_disk)(data, output_filename, self.format)

            job_list.append(job)

        return job_list

    def prepare(self):
        data_list = self.get_data(self.dataset_name)

        job_list = []
        if self.concatenate:
            job_list = self.prepare_concatenated(data_list, job_list)
        else:
            job_list = self.prepare_separated(data_list, job_list)

        logd(f'FileResultProcessor: prepare: {job_list=}')
        return job_list

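# --- Illustrative sketch (not part of the original module) -------------------
# A minimal example of how a FileResultProcessor could be driven directly from
# Python, bypassing a YAML recipe. The data-repository layout (a mapping from
# dataset name to a list of (DataFrame, attributes) tuples) is inferred from
# prepare_concatenated()/prepare_separated() above and is an assumption here,
# as is the helper name _example_export.
def _example_export(output_filename='dataset_a.feather'):
    processor = FileResultProcessor('dataset_a'
                                    , output_filename=output_filename
                                    , concatenate=True)
    # the attributes object is only consulted on the non-concatenated path,
    # so None is sufficient for this concatenated example
    processor.set_data_repo({'dataset_a': [(pd.DataFrame({'x': [1, 2]}), None)]})
    jobs = processor.prepare()
    # prepare() only builds dask.delayed objects; dask.compute() runs the export
    dask.compute(*jobs)
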
def register_jsonpickle_handlers():
    r"""
    Register the jsonpickle handlers for pickling pandas objects to JSON.
    """
    jsonpickle_pandas.register_handlers()

def register_constructors():
    r"""
    Register YAML constructors for all exporters
    """
    yaml.add_constructor('!FileResultProcessor', proto_constructor(FileResultProcessor))
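
# --- Illustrative sketch (not part of the original module) -------------------
# How the tag registered above might appear in a YAML recipe. The surrounding
# recipe structure (the top-level `exporter` key) and the behaviour of
# proto_constructor (building the object from the mapping's key/value pairs)
# are assumptions based on its use here, not facts established by this module.
def _example_recipe_load():
    register_constructors()
    recipe = r'''
exporter: !FileResultProcessor
  dataset_name: dataset_a
  output_filename: dataset_a.feather
  format: feather
  concatenate: true
'''
    # yaml.add_constructor() without an explicit Loader argument registers the
    # tag on the default loaders, so full_load() can resolve the custom tag
    config = yaml.full_load(recipe)
    return config['exporter']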