Source code for accasim.utils.reader_class

"""
MIT License

Copyright (c) 2017 cgalleguillosm

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""

from abc import abstractmethod, ABC
from re import compile

from accasim.utils.misc import CONSTANT, DEFAULT_SWF_PARSE_CONFIG, obj_assertion
from accasim.base.resource_manager_class import Resources


[docs]class WorkloadParserBase(ABC): """ Workload Parser Abstract class """
[docs] @abstractmethod def parse_line(self, line): """ Parses a lines and retrieves the corresponding dictionary. :param line: A text line which keep all job data. :return: A dictionary of the parsed data. """ raise NotImplementedError()
[docs]class DefaultWorkloadParser(WorkloadParserBase): def __init__(self): WorkloadParserBase.__init__(self) """ workloader_paser is a general class for parsing workload files. :param reg_exp: Dictionary where the name of the group is the key and the value its regular expresion. It must contain all the regular expresions in a sorted way, for appending one at the end of the previous and then recover the value(s) for each group. :param avoid_token: List of reg_exp to avoid reading lines. The lines that are avoided won't be readed by the parser. """ self.reg_exp, self.avoid_tokens = DEFAULT_SWF_PARSE_CONFIG self.reg_exp_dict = {} self._compile_job_regexp() self._compile_infeasible_regexp()
[docs] def feasible_line(self, line): """ :param line: Line to be checked :return: True if it can be parse (it does not match to any avoid token), False otherwise. """ for p in self._compiled_infeasible_regexp: if p.fullmatch(line): return False return True
[docs] def parse_line(self, line): """ Parse a feasible line, returning a dict for all groupnames :param line: Line to be parsed :return: A dictionary of the parsed data. """ if not self.feasible_line(line): return None _matches = self._compiled_job_regexp.match(line) if not _matches: return None _dict = {k:self.reg_exp_dict[k][1](v) for k, v in _matches.groupdict().items()} if _dict['requested_number_processors'] != -1: _dict['total_processors'] = _dict.pop('requested_number_processors') elif _dict['allocated_processors'] != -1: _dict['total_processors'] = _dict.pop('allocated_processors') else: return None if _dict['requested_memory'] != -1: _dict['mem'] = _dict.pop('requested_memory') elif _dict['used_memory'] != -1: _dict['mem'] = _dict.pop('used_memory') else: pass return _dict
def _compile_job_regexp(self): reg_exp = r'' for (_key, _reg_exp) in self.reg_exp: self.reg_exp_dict[_key] = _reg_exp reg_exp += _reg_exp[0].format(_key) self._compiled_job_regexp = compile(reg_exp) def _compile_infeasible_regexp(self): self._compiled_infeasible_regexp = [] for _token in self.avoid_tokens: self._compiled_infeasible_regexp.append(compile(_token))
[docs]class Reader(ABC): """ This class is used to simulate the creation of jobs from HPC users. This is an abstract class. The main method is read, which must be implemented to return the set of next submission for the system. :Note: A default implementation is named as DefaultReader. This class read from a single file, and use a SWF parser to extract the jobs. """ def __init__(self, _job_factory): """ Reader class constructor. :Note: For a job generation at least the required attributes for a job event must be presented: *job_id*, *queued_time*, *duration*. :param _job_factory: Job Factory instance. """ self.last_time = None self.job_factory = _job_factory self.submission_enabled = True self.loaded_jobs = []
[docs] def next(self, current_time, time_points=2, stime_name='queued_time'): """ Laods the data and generates the jobs that belongs to the corresponding next `time_points`. :param current_time: Current simulated point. :param time_points: Number of submission points to be loaded :param stime_name: Name of the attribute (key dictionary) of the submit/queue time :return: A tuple composed with an array of the next time points sorted chronologically and a Dictionary with an array for each time point {time_point: [job_1, ..., job_n]} """ #======================================================================= # For each time point a list of dictionary of jobs will maintained in # the next_jobs variable. #======================================================================= time_samples = time_points next_points, next_jobs = self._reload_jobs(current_time, stime_name) while self.submission_enabled and time_samples >= 0: _dict = self._read(current_time) if not _dict: continue if self.last_time != _dict[stime_name]: self.last_time = _dict[stime_name] next_points.append(self.last_time) time_samples -= 1 if self.last_time not in next_jobs: next_jobs[self.last_time] = [] next_jobs[self.last_time].append(self.job_factory.factory(**_dict)) if len(next_jobs) > time_points: self.loaded_jobs = next_jobs.pop(self.last_time, []) next_points.pop() return (next_points, next_jobs)
def _reload_jobs(self, current_time, stime_name): """ Takes the already loaded jobs from a previous load process which exceeded the time steps requested. :param current_time: Current simulated point. :param stime_name: Name of the attribute (key dictionary) of the submit/queue time :return: A tuple of an array of the previous time points and a dictionary with jobs for each time point. """ jobs_dict = {} time_points = [] for _job in self.loaded_jobs: _subt_job = getattr(_job, stime_name) assert(_subt_job >= current_time), 'Error of the loading jobs process. The submission time belongs to the past. ({} is previous to {})'.format(_subt_job, current_time) if _subt_job not in jobs_dict: jobs_dict[_subt_job] = [] time_points.append(_subt_job) jobs_dict[_subt_job].append(_job) self.loaded_jobs = [] return (time_points, jobs_dict) @abstractmethod def _read(self, current_time): """ This method must return a dictionary with all the required keys for covering the job attributes. None must be returned If the data is not valid or incomplete. :param current_time: The current time if it's needed :return: A job's dictionary """ raise NotImplementedError()
[docs] def stop_submission(self): """ Marks as stopped the submission process. """ self.submission_enabled = False
[docs]class DefaultReader(Reader): """ A default implementation of the reader class. This implementation works reading a workload file line by line. """ def __init__(self, filepath, job_factory=None, parser=None, tweak_function=None, max_lines=None, start_time=0, equivalence={}): """ Class constructor :param filepath: Filepath to the workload file. :param job_factory: A :class:`.job_factory` object :param parser: An implementation of :class:`.WorkloadParserBase` object. By default, :class:`.DefaultWorkloadParser` is used to handle SWF files. :param tweak_function: Function that allows to tweak a dictionary. :param max_lines: Optional. Number of lines to read. None for reading the entire file. :param equivalence: Optional. Transforms from workload format a key:value to a new key with a new value in regards of the equivalence. """ Reader.__init__(self, job_factory) self.parser = None self.tweak_function = None if parser: if not isinstance(parser, WorkloadParserBase): assert(issubclass(parser, WorkloadParserBase)), 'Only :class:`.WorkloadParserBase` class can be used as parsers' self.parser = parser() else: assert(isinstance(parser, WorkloadParserBase)), 'Only :class:`.WorkloadParserBase` object can be used as parsers' self.parser = parser else: self.parser = DefaultWorkloadParser() if not tweak_function: _resources = self.job_factory.resource_manager.system_resources() self.tweak_function = DefaultTweaker(start_time, _resources, equivalence) if tweak_function: assert(isinstance(tweak_function, Tweaker)), 'The tweak_function argument must be an implementation of the :class:`.Tweaker`' self.tweak_function = tweak_function elif not self.tweak_function: self.tweak_function = None self.equivalence = equivalence self.start_time = start_time self.last_line = 0 self.max_lines = max_lines self.filepath = filepath self.file = None self.EOF = True self.open_file() def __del__(self): """ :return: """ if hasattr(self, 'file') and self.file: self.file.close() self.file = None self.EOF = True
[docs] def open_file(self): """ :return: """ if self.file is None: self.file = open(self.filepath) self.EOF = False return self.file
def _read_next_lines(self, n_lines=1): """ :param n_lines: :return: """ if not self.EOF: tmp_lines = 0 lines = [] for line in self.file: lines.append(line[:-1]) self.last_line += 1 tmp_lines += 1 if tmp_lines == n_lines or (self.max_lines and self.max_lines == self.last_line): break if tmp_lines < n_lines or (self.max_lines and self.max_lines == self.last_line): self.EOF = True self.stop_submission() if self.EOF and tmp_lines == 0: return None return lines return None def _read(self, current_time=0): """ :param current_time: :return: """ line = self._read_next_lines() # No more lines. End of File if not line: return None parsed_line = self.parser.parse_line(line[0]) if not parsed_line: return None if self.tweak_function: parsed_line = self.tweak_function.tweak_function(parsed_line) return parsed_line
[docs]class Tweaker(ABC): def __init__(self, **kwargs): """ :param kwargs: """ pass
[docs] @abstractmethod def tweak_function(self, job_dict): """ :param job_dict: :return: """ pass
[docs]class DefaultTweaker(Tweaker): def __init__(self, start_time, system_resources=None, equivalence=None): """ :param start_time: :param equivalence: """ if system_resources: obj_assertion(system_resources, Resources) self.start_time = start_time self.equivalence = equivalence if equivalence else {'processor': {'core': 1}}
[docs] def tweak_function(self, _dict): """ As in the SWF workload logs the numbers of cores are not expressed, just the number of requested processors, we have to tweak this information i.e we replace the number of processors by the number of requested cores. The equivalence from processor to core is given in the system config file. As in the example, one processor contains two cores. Then the number of cores will be processor \* core. Besides, memory is expressed in kB per processor. :Example: >>> "equivalence": { >>> "processor": { >>> "core": 2 >>> } >>> } :param _dict: Dictionary to be tweaked. :return: The tweaked dictionary. """ _processors = _dict['total_processors'] _dict['requested_resources'] = {} for k, v in self.equivalence.items(): #=================================================================== # Since SWF doesn't provide any information about nodes request # every single processor request is considered as a node request. # An specific tweak class has to be implemented to specify a conversion of this value. #=================================================================== if k == 'processor': for k2, v2 in v.items(): _dict[k2] = v2 * _processors _dict['requested_resources'][k2] = v2 _dict['requested_nodes'] = _processors if 'mem' in _dict: _dict['requested_resources']['mem'] = _dict['mem'] _dict['mem'] = _dict['mem'] * _processors _dict['queue'] = _dict.pop('queue_number') _dict['queued_time'] = _dict.pop('queued_time') + self.start_time assert ( _dict['core'] >= 0), 'Please consider to clean your data cannot exists requests with any info about core request.' if 'mem' in _dict: assert ( _dict['mem'] >= 0), 'Please consider to clean your data cannot exists requests with any info about mem request.' return _dict