Advanced dispatchers

This page shows the advanced dispatchers implemented using AccaSim.

To our knowledge, the simulator has been used for experimentation in [GalleguillosMOD17]. Several dispatching methods were used in that paper, most of which are available in the AccaSim library:

Priority Rules Based (PRB)

The PRB scheduling method relies on the Estimated Waiting Time (EWT). Both the method and the EWT were introduced in [BorghesiCLMB15].

Show/Hide Code
PRB
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from accasim.base.scheduler_class import scheduler_base

class prb_scheduler(scheduler_base):
    """
    Priority Rules Based (PRB) scheduler.

    Queued events are ordered by their accumulated waiting time, weighted by the
    expected waiting time (EWT) of their queue, and then handed to the allocator.

    The allocator is invoked with ``skip=True``: per the original description of this
    scheduler, jobs can be skipped, i.e. if one fails, allocation is still tried on the
    following jobs.
    """
    name = 'PRB'

    def __init__(self, _allocator, _resource_manager=None, _seed=0, _ewt=None, **kwargs):
        """
        :param _allocator: allocator forwarded to ``scheduler_base``
        :param _resource_manager: resource manager forwarded to ``scheduler_base``
        :param _seed: seed forwarded to ``scheduler_base``
        :param _ewt: dictionary mapping a queue name to its expected waiting time. The
            ``'default'`` entry is used for queues that are not listed. When omitted,
            ``{'default': 1800}`` is used.
        :param kwargs: accepted for signature compatibility; not used by this class
        """
        scheduler_base.__init__(self, _seed, _resource_manager, _allocator)
        # A fresh dictionary is built per instance so the default is never shared
        # between schedulers (mutable default argument pitfall).
        self.ewt = {'default': 1800} if _ewt is None else _ewt

    def get_id(self):
        """
        Returns the full ID of the scheduler, including policy and allocator.

        :return: the scheduler's id.
        """
        return '-'.join([self.__class__.__name__, self.name, self.allocator.get_id()])

    def scheduling_method(self, cur_time, es_dict, es, _debug=False):
        """
        This function must map the queued events to available nodes at the current time.

        :param cur_time: current time
        :param es_dict: dictionary with full data of the events
        :param es: events to be scheduled
        :param _debug: Flag to debug

        :return: the result of the allocator's ``allocate`` call; according to the
            original documentation, tuples of (time to schedule, event id, list of assigned nodes)
        """
        avl_resources = self.resource_manager.availability()
        self.allocator.set_resources(avl_resources)

        # Longest (weighted) time in queue first; ties are broken in favour of the
        # simplest requests.
        sorted_es = self._sort_events(cur_time, es_dict, es)

        event_list = [es_dict[e] for e in sorted_es]
        return self.allocator.allocate(event_list, cur_time, skip=True, debug=_debug)

    def _get_ewt(self, queue_type):
        """
        Returns the expected waiting time for the selected queue.

        :param queue_type: the queue type
        :return: the expected waiting time of the queue, or the ``'default'`` entry
            when the queue is not listed in ``self.ewt``
        """
        if queue_type in self.ewt:
            return self.ewt[queue_type]
        return self.ewt['default']

    def _sort_events(self, cur_time, events_dict, events):
        """
        Method which sorts the events depending on their waiting times in the queue.

        The primary key is ``-(max_ewt * qtime) / ewt``, where ``qtime`` is
        ``cur_time - queued_time + 1`` and ``max_ewt`` is the largest EWT among the given
        events, so events with a larger weighted waiting time come first. Ties are broken
        by ``expected_duration * req`` in ascending order, where ``req`` is the sum over
        the requested resources of ``requested_nodes * amount``.

        :param cur_time: the current time
        :param events_dict: the events dictionary
        :param events: the list of events to be scheduled
        :return: the sorted list of events (the input itself when it has at most one element)
        """
        if len(events) <= 1:
            return events

        sort_helper = {}
        for e in events:
            event = events_dict[e]
            sort_helper[e] = {
                'qtime': cur_time - event.queued_time + 1,
                'ewt': self._get_ewt(event.queue),
                'dur': event.expected_duration,
                'req': sum(event.requested_nodes * val
                           for val in event.requested_resources.values()),
            }
        # Kept in a local variable instead of inside sort_helper, so it can never
        # collide with an event id used as a key of that dictionary.
        max_ewt = max(info['ewt'] for info in sort_helper.values())

        def priority(e):
            info = sort_helper[e]
            return (-(max_ewt * info['qtime']) / info['ewt'], info['dur'] * info['req'])

        return sorted(events, key=priority)

Download source code

Constraint Programming + Heuristic (CPH)

The CPH scheduling method relies on the Estimated Waiting Time (EWT). Both the method and the EWT were introduced in [BorghesiCLMB15]. This scheduler uses the OR-Tools library.

Show/Hide Code
CPH
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
from collections import namedtuple
from ortools.constraint_solver import pywrapcp
from accasim.base.scheduler_class import scheduler_base
from accasim.utils.misc import sorted_object_list 

class cph_scheduler(scheduler_base):
    """
    Constraint Programming + Heuristic (CPH) scheduler.

    A scheduler which uses constraint programming (OR-Tools) to plan a sub-optimal schedule.
    This scheduler does not use the automatic allocation of ``scheduler_base``: ``None`` is
    passed to the base class as allocator, and the given allocator is kept as
    ``manual_allocator`` and called explicitly from :meth:`allocate`.
    """
    name = 'CPH'

    def __init__(self, allocator, resource_manager=None, _seed=0, _ewt=None, **kwargs):
        """
        :param allocator: allocator stored as ``manual_allocator`` and invoked by :meth:`allocate`
        :param resource_manager: resource manager forwarded to ``scheduler_base``
        :param _seed: seed forwarded to ``scheduler_base``
        :param _ewt: dictionary mapping a queue name to its expected waiting time. The
            ``'default'`` entry is used for queues that are not listed. When omitted,
            ``{'default': 1800}`` is used.
        :param kwargs: accepted for signature compatibility; not used by this class
        """
        scheduler_base.__init__(self, _seed, resource_manager, None)
        # A fresh dictionary is built per instance so the default is never shared
        # between schedulers (mutable default argument pitfall).
        self.ewt = {'default': 1800} if _ewt is None else _ewt
        self.manual_allocator = allocator

        self.max_ewt = 0
        self.QueuedJobClass = namedtuple('Job', ['id', 'first', 'second'])
        # Priority list of job ids: 'first' is the main sorting attribute and 'second' the tie breaker.
        self.queued_jobs = sorted_object_list({'main': 'first', 'break_tie': 'second'})
        self.numberOfIterations = 1
        self.max_timelimite = 150000  # 2.5min
        self.prev_solved = None

    def scheduling_method(self, cur_time, es_dict, es, _debug=False):
        """
        This function must map the queued events to available nodes at the current time.

        The CP model is solved with a time limit that starts at 1000 and is doubled after every
        unsuccessful try, while it stays below ``self.max_timelimite``.

        :param cur_time: current time
        :param es_dict: dictionary with full data of the events
        :param es: events to be scheduled
        :param _debug: Flag to debug

        :return: the dispatching plan built by :meth:`allocate`; according to the original
            documentation, tuples of (time to schedule, event id, list of assigned nodes)
        """
        if not self.manual_allocator.resource_manager:
            self.manual_allocator.set_resource_manager(self.resource_manager)
        allocation = []
        avl_resources = self.resource_manager.availability()
        self.manual_allocator.set_resources(avl_resources)
        for _ in range(self.numberOfIterations):
            timelimit = 1000
            solve_tries = 1
            sol_found = False
            temp_sched = {}
            while not sol_found and timelimit < self.max_timelimite:
                sol_found = self.cp_model(es, es_dict, temp_sched, timelimit, cur_time,
                                          self.queued_jobs, avl_resources, _debug)
                if _debug:
                    print('#{} Try. Results {}'.format(solve_tries, temp_sched))
                solve_tries += 1
                timelimit *= 2
            # Every queued job must have an entry in the plan (possibly None when postponed).
            assert len(es) == len(temp_sched), 'Some Jobs were not scheduled.'
            allocation = self.allocate(cur_time, temp_sched, es_dict, _debug)
        return allocation

    def allocate(self, cur_time, schedule_plan, es_dict, _debug):
        """
        Prepare jobs to be allocated. Just jobs that must be started at the current time are considered.

        :param cur_time: Current simulation time
        :param schedule_plan: schedule plan, mapping a job id to its planned start time
        :param es_dict: Dictionary of the current jobs.
        :param _debug: Debug flag

        :return: dispatching plan: the tuples returned by the allocator for the jobs planned at
            ``cur_time``, followed by ``(None, id, [])`` for every job planned for another time
        """
        allocated_events = []
        to_schedule_now = []
        to_schedule_later = []
        # Building the list of jobs that can be allocated now, in the computed solution
        for _id, _time in schedule_plan.items():
            if _time == cur_time:
                to_schedule_now.append(es_dict[_id])
                if _debug:
                    print('Trying to Allocate {} job.'.format(_id))
            else:
                to_schedule_later.append(es_dict[_id])
                if _debug:
                    print('{} incorrect allocation. {}'.format(_id, 'Postponed job (Estimated time: {})'.format(_time)))
        # Trying to allocate all jobs scheduled now
        self.manual_allocator.set_attr(schedule=(es_dict, schedule_plan))
        allocation = self.manual_allocator.allocate(to_schedule_now, cur_time, skip=True, debug=_debug)

        for (time, idx, nodes) in allocation:
            if time is not None:
                if _debug:
                    print('{} correct allocation. Removing from priority job list'.format(idx))
                self.queued_jobs.remove(idx)
            else:
                if _debug:
                    print('{} incorrect allocation. {}'.format(idx, 'Insufficient resources'))

        allocated_events += allocation
        # All jobs to be scheduled later are considered as discarded by the allocator
        allocated_events += [(None, ev.id, []) for ev in to_schedule_later]
        return allocated_events

    def cp_model(self, es, es_dict, temp_sched, timelimit, cur_time, queued_jobs, avl_resources, _debug):
        """
        Implementation of the CP Model using OR-Tools to generate the schedule plan.

        :param es: Queued jobs to be scheduled.
        :param es_dict: Dictionary of the current jobs.
        :param temp_sched: Dictionary filled in place with the planned start time of each queued
            job (``None`` for postponed jobs or when no solution is found).
        :param timelimit: Limit of the search process in ms.
        :param cur_time: Current simulated time
        :param queued_jobs: Already queued jobs and sorted. NOTE: this parameter is not read by
            the body, which works on ``self.queued_jobs`` directly.
        :param avl_resources: Availability of the resources
        :param _debug: Debug flag.

        :return: True if solved (or if no search was needed), False otherwise.
        """
        search_needed = False

        solver = pywrapcp.Solver('CPSolver_relaxedResources')
        # Only the first q_length queued jobs are modelled; the remaining ones are postponed.
        q_length = 100

        running_events = self.resource_manager.actual_events
        # Upper bound for the start times: remaining time of the running jobs plus the
        # expected duration of the queued jobs that are modelled.
        max_mks = 0
        job_map = {}
        for _e in running_events:
            e = es_dict[_e]
            job_map[_e] = e
            mks = e.expected_duration - (cur_time - e.start_time)
            if mks >= 0:
                max_mks += mks

        for i, _e in enumerate(es):
            if i < q_length:
                # To be scheduled
                e = es_dict[_e]
                max_mks += e.expected_duration
                job_map[_e] = e
            else:
                # Postponed
                temp_sched[_e] = None

        self.queued_jobs.add(*self.prepare_job(job_map.values(), cur_time))
        prb_vars = []
        for _id in self.queued_jobs:
            name = _id
            duration = job_map[_id].expected_duration
            if _id in running_events:
                if _debug:
                    print('{} has an assigned start, and will not be modified.'.format(_id))
                # Running jobs are pinned at the (relative) time 0.
                start_min = 0
                start_max = start_min

                elapsed_time = (cur_time - job_map[_id].start_time)
                estimated_remaining_time = (duration - elapsed_time)

                # FIX the (-remaining time) where sometimes the job length is overestimated. Just use 1 to speed up the search process
                duration = 1 if estimated_remaining_time <= 0 else estimated_remaining_time
            else:
                # FixedDurationIntervalVar arguments:
                #   int64 start_min, int64 start_max, int64 duration, bool optional, const std::string& name
                # Creates an interval var with a fixed duration. The duration must be greater than 0.
                # If optional is true, then the interval can be performed or unperformed.
                # If optional is false, then the interval is always performed.
                if _debug:
                    print('{} is a new Interval variable, without any assignation'.format(_id))
                start_min = 0
                start_max = max_mks
                search_needed = True
            var = solver.FixedDurationIntervalVar(start_min, start_max, duration, False, name)
            prb_vars.append(var)

        if search_needed:
            solved = False
            _keys = self.resource_manager.resource_types()
            total_capacity = {}
            total_demand = {}
            for _key in _keys:
                total_capacity[_key] = sum(resources[_key] for resources in avl_resources.values())
                # The resources used by running jobs are loaded into the capacity
                total_capacity[_key] += sum(es_dict[_sjob].requested_nodes * es_dict[_sjob].requested_resources[_key] for _sjob in running_events)
                total_demand[_key] = [es_dict[_sjob].requested_nodes * es_dict[_sjob].requested_resources[_key] for _sjob in self.queued_jobs]
                if _debug:
                    print('Loading constraints related to capacity and demand of {}: Capacity {} - Demand {}'.format(_key, total_capacity[_key], total_demand[_key]))
                _name = 'cum_{}'.format(_key)
                _cum = solver.Cumulative(prb_vars, total_demand[_key], total_capacity[_key], _name)
                solver.AddConstraint(_cum)

            wt = []
            # prb_vars was built by iterating self.queued_jobs, so index i matches the job.
            for i, _sjob in enumerate(self.queued_jobs):
                _job = es_dict[_sjob]
                ewt = self.get_ewt(_job.queue)
                wgt = int(self.max_ewt / ewt)
                jstart = prb_vars[i].StartExpr()
                # Start times are relative to cur_time, so this is the total time in queue.
                qtime = jstart - (_job.queued_time - cur_time)
                prod = (qtime * wgt)
                wt.append(prod.Var())

            # Minimize the weighted queue time
            objective_var = solver.Sum(wt).Var()
            objective_monitor = solver.Minimize(objective_var, 1)
            db = solver.HeuristicSearch(prb_vars)

            # NOTE(review): restart and log are created but not passed to NewSearch below.
            restart = solver.ConstantRestart(1000)
            limit = solver.TimeLimit(timelimit)

            log = solver.SearchLog(10000)
            solver.NewSearch(db, objective_monitor, limit)

            # Start Search
            while solver.NextSolution():
                solved = True
                if _debug:
                    print('Solved during search')
                    for i, _j in enumerate(prb_vars):
                        print('{}: {} EST {}, LST {}, EET {}, LET {}'.format(i, _j, _j.StartMin(), _j.StartMax(), _j.EndMin(), _j.EndMax()))
                for _j in prb_vars:
                    if _j.Name() in running_events:
                        if _debug:
                            print('{} is already running, it not will be considered to update the actual schedule.'.format(_j.Name()))
                        continue
                    if _debug:
                        print('Loading {} into the schedule. '.format(_j.Name()))
                    # Convert the relative start time back to simulation time.
                    temp_sched[_j.Name()] = _j.StartMin() + cur_time
            if not solved:
                solver.EndSearch()
                for _e in es:
                    temp_sched[_e] = None
        else:
            solved = True
        # Running jobs were only added to model the resources in use; drop them from the
        # priority list again.
        for _j in prb_vars:
            if _j.Name() in running_events:
                if _debug:
                    print('{} is already running, it not will be considered to update the actual schedule.'.format(_j.Name()))
                self.queued_jobs.remove(_j.Name())
        return solved

    def get_id(self):
        """
        Returns the full ID of the scheduler, including policy and allocator.

        :return: the scheduler's id.
        """
        return '-'.join([self.__class__.__name__, self.name, self.manual_allocator.get_id()])

    def prepare_job(self, jobs, cur_time):
        """
        Establishes the priority of each job.

        Ids present in ``self.queued_jobs`` but not in ``jobs`` are removed from the priority
        list, and ``self.max_ewt`` is updated to the largest EWT among ``jobs``.

        :param jobs: Jobs to be prioritized (iterated more than once)
        :param cur_time: Current simulation time.

        :return: A list of ``QueuedJobClass`` tuples, one per job, where ``first`` is
            ``-(max_ewt * qtime) / ewt`` and ``second`` is ``expected_duration * req``.
        """
        sort_helper = {
            e.id: {
                'qtime': cur_time - e.queued_time + 1,
                'ewt': self.get_ewt(e.queue),
                'dur': e.expected_duration,
                'req': sum(e.requested_nodes * val for val in e.requested_resources.values()),
            }
            for e in jobs
        }
        ended = set(self.queued_jobs) - set(sort_helper.keys())
        self.queued_jobs.remove(*ended)
        self.max_ewt = max(v['ewt'] for v in sort_helper.values())
        return [
            self.QueuedJobClass(
                id=e.id,
                first=-(self.max_ewt * sort_helper[e.id]['qtime']) / sort_helper[e.id]['ewt'],
                second=sort_helper[e.id]['dur'] * sort_helper[e.id]['req'],
            )
            for e in jobs
        ]

    def get_ewt(self, queue_type):
        """
        :param queue_type: Name of the queue.

        :return: EWT for a specific queue type, or the ``'default'`` entry when the queue is
            not listed in ``self.ewt``
        """
        if queue_type in self.ewt:
            return self.ewt[queue_type]
        return self.ewt['default']

Download source code.

Citations

[BorghesiCLMB15](1, 2) Borghesi et al. Power Capping in High Performance Computing Systems in Proc. of CP 2015.
[GalleguillosMOD17] Galleguillos et al. Data-driven job dispatching in HPC systems in Proc. of MOD 2017.