Allocators from literature

Balanced

This allocator, introduced in [NettiGKSB18], considers a set of critical, scarce resource types (such as GPUs or MICs) and tries to balance allocations across the nodes that possess them, in order to avoid fragmentation and waste.

The algorithm collects the nodes possessing the different critical resource types into distinct sets. It then builds the list of nodes to be used in the allocation process: nodes having no critical resource types are placed in front, followed by the nodes having critical resources; the latter are interleaved in order to balance their usage and avoid favoring a specific resource type. The resource types to be balanced can be given as input. If a node has more than one type of critical resource available, it is considered for the type for which it has the greatest availability.
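A minimal sketch of the resulting node ordering is given below (illustrative only, not the AccaSim implementation; avl is an assumed dictionary mapping node IDs to their available resources, and critical is the list of critical resource types). The full source follows.

def balanced_order(avl, critical, none_id='None'):
    # 1. Classify each node under the critical resource it holds in greatest
    #    quantity, or under none_id if it holds none of them
    buckets = {none_id: []}
    for r in critical:
        buckets[r] = []
    for node, res in avl.items():
        key, qty = none_id, 0
        for r in critical:
            if res.get(r, 0) > qty:
                key, qty = r, res[r]
        buckets[key].append(node)
    # 2. Within each critical bucket, nodes with least availability come first
    for r in critical:
        buckets[r].sort(key=lambda n, r=r: avl[n][r])
    # 3. Non-critical nodes act as a buffer in front; the critical buckets are
    #    then interleaved, always drawing from the one with most nodes left
    order, heads = sorted(buckets[none_id]), {r: 0 for r in critical}
    while any(len(buckets[r]) > heads[r] for r in critical):
        r = max(critical, key=lambda k: len(buckets[k]) - heads[k])
        order.append(buckets[r][heads[r]])
        heads[r] += 1
    return order

# Example: the plain node comes first, then GPU and MIC nodes interleaved
avl = {'n1': {'gpu': 2, 'mic': 0}, 'n2': {'gpu': 4, 'mic': 0},
       'n3': {'gpu': 0, 'mic': 2}, 'n4': {'gpu': 0, 'mic': 0}}
print(balanced_order(avl, ['gpu', 'mic']))  # ['n4', 'n1', 'n2', 'n3']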

from accasim.base.allocator_class import FirstFit
from copy import copy


class allocator_balanced(FirstFit):
    """
    An allocator which considers a set of critical and scarce resource types (like GPUs or MICs), and tries to balance 
    the allocation to the respective nodes in order to avoid fragmentation and waste.
    
    The algorithm will collect the nodes having different critical resource types in distinct sets. It will then build 
    a list of nodes to be used in the allocation process: the nodes having no critical resource types are placed in 
    front, followed by the nodes having critical resources; these nodes are interleaved, in order to balance their usage
    and avoid favoring a specific resource type.
    The resource types to be balanced can be given as input. If a node has more than one type of critical resource
    available, it will be considered for the type for which it has the greatest availability.
    """

    name = 'Balanced'

    def __init__(self, seed, resource_manager, **kwargs):
        """
        Constructor for the class.
        :param seed: seed for random events (not used)
        :param resource_manager: reference to the system resource manager
        :param kwargs: critical_res = defines the set of resource types to be balanced (default mic,gpu); 
        """
        FirstFit.__init__(self, seed)
        # The ID to associate to nodes/jobs that possess no critical resources
        self._noneID = 'None'
        
        res_key = 'critical_res'
        # If the user doesn't supply the set of resources to balance, mic and gpu are used by default
        if res_key in kwargs.keys():
            self._critical_resources = kwargs[res_key]
            assert all(res in resource_manager.resource_types for res in
                       self._critical_resources), 'Selected resource types for interleaving are not correct'
        else:
            self._critical_resources = [r for r in ('mic', 'gpu') if r in resource_manager.resource_types]
    
        # The lists containing the single node IDs, per resource type
        self._res_lists = None
        

    def _sort_resources(self):
        """
        This method sorts the keys of the available resources dictionary, based on the ranking policy.
        It is called after the resources are set in the allocator.
        :return: the list of sorted keys (node ids) for the resources
        """
        assert self.avl_resources is not None, 'The dictionary of available resources must be non-empty.'

        # res_lists is a dictionary containing, for each resource type, the list of nodes that have them. The lists
        # do not overlap, and each node falls in the list whose resource it has in the greatest quantity.
        # If a node has none of the critical resource, it will fall in a special 'none' list
        self._res_lists = {self._noneID: []}
        for k in self._critical_resources:
            self._res_lists[k] = []

        # All the nodes in the avl_resources dictionary are classified, according to the critical resources
        # they possess
        for node, res in self.avl_resources.items():
            self._res_lists[self._critical_list_select(res)].append(node)

        return self._convert_to_final_list(self._res_lists)

    def _adjust_resources(self, sorted_keys=None):
        """
        Adjusts the sorting of the resources after a successful allocation. 
        In order to improve efficiency, this method uses the self._res_lists dictionary stored after a set_resources
        call. The lists are not built from scratch, and a minor adjustment is performed to restore sorting.
        :param sorted_keys: the list of keys, almost sorted, that needs to be adjusted
        """
        assert self.avl_resources is not None, 'The dictionary of available resources must be non-empty.'
        # assert sorted_keys is not None, 'The list of keys must be non-empty'
        # assert self._res_lists is not None, 'Cannot adjust resources if they have not been initialized'
        if sorted_keys:
            self.sorted_keys.clear()
        else:
            self._sort_resources()
        self.sorted_keys = self._convert_to_final_list(self._res_lists)

    def _update_resources(self, reserved_nodes, requested_resources):
        """
        Updates the internal avl_resources list after a successful allocation.

        Also, this method updates the internal self._res_lists dictionary, moving nodes whose resources have changed
        to the appropriate list in the dictionary. Such lists will then be sorted again at the next adjust_resources
        call.
        :param reserved_nodes: the list of nodes assigned to the allocated job
        :param requested_resources: the list of resources requested by the job per each node
        """
        # Nodes involved in the update are removed from the respective lists
        temp_node_list = []
        for node in set(reserved_nodes):
            # Small trick: if a node belongs to the None list before the update, we do not consider it for the
            # resource adjustment, as it will be placed again in the None list, which is not sorted
            rr_res = self._critical_list_select(self.avl_resources[node])
            if rr_res != self._noneID:
                self._res_lists[rr_res].remove(node)
                temp_node_list.append(node)
        FirstFit._update_resources(self, reserved_nodes, requested_resources)
        # Again, nodes that are involved in the update are re-added to the lists, according to their new resources
        for node in temp_node_list:
            self._res_lists[self._critical_list_select(self.avl_resources[node])].append(node)

    def _convert_to_final_list(self, res_lists):
        """
        A method which considers a dictionary of node lists for each critical resource, and builds a final sorted
        list of nodes from it.

        :param res_lists: the dictionary of node lists, for each critical resource
        :return: the final sorted list to be used by the allocator
        """
        # Each separate list is sorted, so that nodes with the most available critical resources will be placed
        # at the end
        res_lists[self._noneID] = self._trim_nodes(res_lists[self._noneID])
        res_lists[self._noneID].sort()
        for key in self._critical_resources:
            res_lists[key] = self._trim_nodes(res_lists[key])
            res_lists[key].sort(key=lambda x: (self.avl_resources.get(x).get(key), x))

        # The lists are then combined, by placing in front the 'none' list, which is a buffer for the critical res
        final_list = copy(res_lists[self._noneID])
        remaining_nodes = len(self.avl_resources) - len(final_list)

        # The algorithm would 'pop' elements from the lists' heads in succession. To avoid this, as it is expensive,
        # we use a dictionary of starting indexes for each list
        start_indexes = dict.fromkeys(self._critical_resources, 0)
        # After the 'none' list, the remaining lists are interleaved, and at each step an element is picked from
        # the longest list in res_lists, in order to balance their usage.
        for i in range(remaining_nodes):
            rr_res = self._get_longest_list(res_lists, start_indexes)
            if rr_res != self._noneID:
                final_list.append(res_lists[rr_res][start_indexes[rr_res]])
                start_indexes[rr_res] += 1

        return final_list

    def _critical_list_select(self, noderes):
        """
        A simple method which, given the resources of a node, returns the key of the critical resource for which
        the node has the greatest availability. If there are no critical resources available, a 'none' key is returned.

        :param noderes: the resources dictionary of a node
        :return: the key of the critical resource to which assign the node
        """
        maxval = 0
        maxkey = self._noneID
        for cres in self._critical_resources:
            if noderes[cres] > maxval:
                maxval = noderes[cres]
                maxkey = cres
        return maxkey

    def _get_longest_list(self, res_lists, start_indexes):
        """
        Given the dictionary of critical resources lists, the method returns the key of the next list from which
        an element has to be picked in order to build the final nodes list.

        :param res_lists: the critical resources' lists dictionary
        :param start_indexes: the dictionary of starting indexes for each list (to simulate a pop operation)
        :return: the key of the next list to be picked
        """
        maxkey = self._noneID
        maxval = 0
        for k in self._critical_resources:
            modifier = len(res_lists[k]) - start_indexes[k]
            if modifier > maxval:
                maxval = modifier
                maxkey = k
        return maxkey


    def _trim_nodes(self, nodes):
        """
        Method which removes from a list of node IDs those elements that correspond to nodes that are full, i.e. they
        have no available Memory or CPU resources and are thus useless for allocation.
        
        :param nodes: A list of node IDs
        :return: The trimmed list of nodes
        """
        trimNodes = [n for n in nodes if all(self.avl_resources[n][r] > 0 for r in self.nec_res_types)]
        return trimNodes
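
A hypothetical instantiation sketch (resource_manager is assumed to come from an existing AccaSim simulation setup; the keyword names follow the constructor above):

# hypothetical wiring inside an AccaSim experiment
alloc = allocator_balanced(seed=0, resource_manager=resource_manager,
                           critical_res=['gpu', 'mic'])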


Weighted

This allocator, introduced in [NettiGKSB18], tries to optimize the allocation of the queued jobs. Each job is allocated on the nodes that are left with the minimum amount of weighted resources after the allocation. The weights quantify the level of criticality of each resource type using three parameters: the average amount requested by the queued jobs, the current load ratio, and the total capacity of the system.
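
The weight computation can be condensed as follows (an illustrative sketch mirroring the formula in _update_weights below; the function and variable names are assumptions, not AccaSim API):

def resource_weight(avg_requested, used, capacity, modifier=1.0):
    # avg_requested: mean per-time-unit request for this resource type over
    #   the analysis window (resource counters / cumulative job length)
    # (used + 1) / capacity: the current load ratio of the resource;
    #   the extra 1 / capacity factor makes scarcer resource types weigh more
    # modifier: the Priority-Weighted heuristic factor (1 when disabled)
    return avg_requested * (used + 1) / (capacity * capacity) * modifier

# Example: a small, loaded GPU pool outweighs an abundant core pool
print(resource_weight(avg_requested=2, used=10, capacity=16))      # ~0.086
print(resource_weight(avg_requested=40, used=500, capacity=4096))  # ~0.0012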

from accasim.base.allocator_class import FirstFit
from sortedcontainers.sortedset import SortedSet
from re import split
from functools import reduce


class allocator_weighted(FirstFit):
    """
    An allocator which tries to perform optimization on the allocation of all events in the queue.

    When the user supplies the queue of events to be allocated, the average number of resources required per-type
    is computed, together with the current load rate. The average is weighted according to the expected length of
    each job. These values are used as weights to indicate the value of a resource. The allocator additionally
    factors the base availability in the system into the weights, assigning higher values to scarcer
    resources, to assess which are actually more precious and need to be preserved in a global manner. Each job is
    then allocated on the nodes on which the minimum number of weighted resources is left (after the allocation), 
    in a similar fashion to the best-fit allocator. 
    
    The user can also supply a "critical resources" list: for such resources, an additional heuristic is used, and in
    this case the allocator goes under the name of Priority-Weighted; this consists in a priority value, per resource 
    type, which will increase as allocations fail for jobs that require critical resources, and will decrease for each 
    successful allocation. If a job requires multiple critical resource types, all of their priority values will be
    affected by the allocation. This tends to assign greater weight to critical resources actually requested by jobs, 
    and will make the allocator statistically preserve such resources.
    """

    name = 'Weighted'

    def __init__(self, seed, resource_manager, **kwargs):
        """
        Constructor for the class.

        :param seed: seed for random events (not used)
        :param resource_manager: reference to the system resource manager
        :param kwargs: critical_res = defines the set of resource types to be preserved (default none); 
                       critical_bounds = the lower and upper bounds for the allocation fail heuristic (default [1,10]);
                       critical_steps = the number of steps in the scale between the critical_bounds (default 9);
                       window_size = defines the window size for job resource analysis (default 100);
        """
        FirstFit.__init__(self, seed)

        win_key = 'window_size'
        res_key = 'critical_res'
        steps_key = 'critical_steps'
        bound_key = 'critical_bounds'

        # If the user doesn't supply the set of critical resources, the Priority-Weighted heuristic stays disabled
        if res_key in kwargs.keys():
            self._critical_resources = kwargs[res_key]
            assert all(res in resource_manager.resource_types for res in self._critical_resources), 'Selected resource types for interleaving are not correct'
        else:
            self._critical_resources = []  # Priority-Weighted heuristic is disabled by default
        # The default window size to be used for job analysis is 100
        self._windowsize = (kwargs[win_key] if win_key in kwargs.keys() and kwargs[win_key] >= 0 else 100)
        # The lower and upper bounds for the weight modifier of critical resources
        if bound_key in kwargs.keys() and len(kwargs[bound_key]) == 2 and 1 <= kwargs[bound_key][0] <= kwargs[bound_key][1]:
            self._modifierbounds = kwargs[bound_key]
        else:
            self._modifierbounds = [1, 10]
        # The number of steps in the scale between the lower and upper bound for the allocation fail heuristic
        self._numsteps = (kwargs[steps_key] if steps_key in kwargs.keys() and kwargs[steps_key] > 0 else 9)

        # Parameters relative to the allocation fail metric for jobs requiring critical resources
        # The step for increasing and decreasing the modifiers
        self._modifierstep = (self._modifierbounds[1] - self._modifierbounds[0]) / self._numsteps
        # The dictionary containing the current modifier values
        self._critical_modifier = {}
        for k in self._critical_resources:
            self._critical_modifier[k] = self._modifierbounds[0]

        # The resource types in the system; stored for efficiency
        self._types = resource_manager.resource_types
        # The scheduling plan computed by the scheduler, if present
        self._schedule = None
        # The event dictionary used to retrieve job information from the schedule
        self._event_dictionary = None
        # The number of jobs currently considered for analysis (max is window_size)
        self._jobstoallocate = 0
        # The counters for resources required by jobs in the analysis window
        self._rescounters = {}
        for k in self._types:
            self._rescounters[k] = 1
        # Same as rescounters, but used for the overlapping events in the schedule, and not in the queue
        self._jobstoschedule = 0
        self._schedulecounters = {}
        for k in self._types:
            self._schedulecounters[k] = 1
        # The weights associated to the resource types
        self._weights = {}
        for k in self._types:
            self._weights[k] = 0

        # Two variables used to compute the weighted average of job resources' requests; they store the total
        # length of jobs in the analysis window
        self._cumulative_length = 0
        # Same as above, but for jobs overlapping the current one in the schedule (for CP)
        self._cumulative_schedule = 0

    def set_attr(self, **kwargs):
        """
        Method used to set internal parameters and meta-data for the allocator.

        Its behavior depends on the specific allocator that is being used, and some arguments may be discarded.
        In this optimization-based allocator, the method is used to set the internal variable related to the 
        schedule plan, if present. It is useful for the CP scheduler, which is the only one that computes an actual
        scheduling plan.

        :param kwargs: schedule = the scheduling plan, in a tuple of the type (es_dict,dict(job_id: start_time))
        """
        schedule_key = 'schedule'
        if schedule_key in kwargs.keys():
            self._event_dictionary = kwargs[schedule_key][0]
            self._schedule = kwargs[schedule_key][1]
        elif 'dict' in kwargs.keys():
            self._event_dictionary = kwargs['dict']

    def allocating_method(self, es, cur_time, skip=False, reserved_time=None, reserved_nodes=None, debug=False):
        """
        Given a job list es, this method searches for a suitable allocation for as many jobs as possible.

        In normal allocation, the method stops as soon as an event in the list cannot be allocated. In this case,
        the list of already allocated jobs is returned. This is done to be coherent with the scheduler's rules.
        As an alternative, the skip parameter can be supplied to allow the scheduler to skip unallocated jobs.
        This method also supports backfilling schedules. In this case, the backfilling parameters are supplied,
        and the allocator tries to fit jobs without delaying the reserved job. In this second case,
        the method does not stop when a job cannot be allocated, but simply skips it.

        es can be a list or a single event object. The return type (list or single tuple) changes accordingly.

        :param es: the event(s) to be allocated
        :param cur_time: current time, needed to build the schedule list
        :param skip: determines if the allocator can skip jobs
        :param reserved_time: beginning of the next reservation slot (used for backfilling)
        :param reserved_nodes: nodes already reserved (used for backfilling)
        :param debug: enables verbose output during allocation

        :return: a list of assigned nodes of length e.requested_nodes, for all events that could be allocated. The
        list is in the format (time,event,nodes) where time can be either cur_time or None.
        """
        if not isinstance(es, (list, tuple)):
            listAsInput = False
            es = [es]
        else:
            listAsInput = True

        allocation = []
        success_counter = 0
        event_counter = 0
        # Create aux resources for this allocation
        self._set_aux_resources()

        # The required resources' counters are initialized according to the event queue given as input
        self._initialize_counters(es)
        if debug:
            print('Queue length %i - Window size %i' % (len(es), self._jobstoallocate))
            print('Distribution: ')
            print(self._rescounters.values())

        for e in es:
            requested_nodes = e.requested_nodes
            requested_resources = e.requested_resources

            # If the input arguments relative to backfilling are not supplied, the method operates in regular mode.
            # Otherwise, backfilling mode is enabled, allowing the allocator to skip jobs and consider the reservation.
            nodes_to_discard = self._compute_reservation_overlaps(e, cur_time, reserved_time, reserved_nodes)
            backfilling_overlap = len(nodes_to_discard) > 0

            self._find_overlapping_jobs(e, cur_time)
            # After each allocation, the weights are updated according to the new analysis window
            self._update_weights()
            # After computing the weights, the sorted list of available nodes for allocation is computed
            nodekeys = self._get_sorted_nodes(e)

            assigned_nodes = []
            nodes_left = requested_nodes
            for node in nodekeys:
                # The algorithm checks whether the given node belongs to the list of reserved nodes, in backfilling.
                # If it does, the node is discarded.
                resources = self.avl_resources[node]
                if backfilling_overlap and node in nodes_to_discard:
                    continue
                # We compute the number of job units fitting in the current node, and update the assignment
                fits = self._event_fits_node(resources, requested_resources)
                if nodes_left <= fits:
                    assigned_nodes += [node] * nodes_left
                    nodes_left = 0
                else:
                    assigned_nodes += [node] * fits
                    nodes_left -= fits
                if nodes_left <= 0:
                    break

            # If, after analyzing all nodes, the allocation is still not complete, the partial allocation
            # is discarded.
            if nodes_left > 0:
                assigned_nodes = []
            assert not assigned_nodes or requested_nodes == len(assigned_nodes), \
                'Requested ' + str(requested_nodes) + ', got ' + str(len(assigned_nodes))

            # If a correct allocation was found, we update the resources of the system, sort them again, and
            # add the allocation to the output list.
            self._update_modifiers(e, assigned_nodes)
            if assigned_nodes:
                allocation.append((cur_time, e.id, assigned_nodes))
                self._update_resources(assigned_nodes, requested_resources)
                success_counter += 1
                if debug:
                    print('Allocation successful for event %s' % (e.id))
            # If no correct allocation could be found, two scenarios are possible: 1) normally, the allocator stops
            # here and returns the jobs allocated so far 2) if the skip parameter is enabled, the job is just
            # skipped, and we proceed with the remaining ones.
            else:
                if debug:
                    print('Allocation failed for event %s with %s nodes left' % (e.id, nodes_left))
                allocation.append((None, e.id, []))
                if not skip:
                    # if jobs cannot be skipped, at the first allocation fail all subsequent jobs fail too
                    for ev in es[(success_counter + 1):]:
                        allocation.append((None, ev.id, []))
                    if debug:
                        print('Cannot skip jobs, %s additional pending allocations failed' % (len(es) - success_counter - 1))
                    break
            # After trying to allocate the current event, the analysis window is shifted by one: the previous first
            # event is removed from the counters, and the next one falling in the window is added to them
            event_counter += 1
            self._update_counters(es, event_counter)
        if debug:
            print('There were %s successful allocations out of %s events' % (success_counter, len(es)))
        self._reset_schedule()
        return allocation if listAsInput else allocation[0]

    def _initialize_counters(self, es):
        """
        A method which initializes the required resources counters for the job queue. 
        
        It is executed only once for each queue allocation request, and then the counters are dynamically 
        and efficiently updated through the update_counters method.
        
        :param es: the list of events to be allocated
        """
        self._cumulative_length = 0
        self._cumulative_schedule = 0
        # The counters are initialized to 1, to avoid having weights equal to 0
        for k in self._types:
            self._rescounters[k] = 1
        # The size of the analysis window is computed
        self._jobstoallocate = len(es) - 1 if len(es) - 1 <= self._windowsize else self._windowsize
        # For each job in the window, we then add to the counters its total number of resources required per-type
        for i in range(self._jobstoallocate):
            req_res = es[i + 1].requested_resources
            req_nodes = es[i + 1].requested_nodes
            self._cumulative_length += es[i + 1].expected_duration + 1
            for k in self._types:
                self._rescounters[k] += req_res[k] * req_nodes * (es[i + 1].expected_duration + 1)

    def _update_counters(self, es, startindex):
        """
        A method which updates the counters of resources requested by jobs.
        
        It operates in O(1) time, since we just need to remove the old job from the counters and add the new one,
        if present.
        
        :param es: The list of jobs to be allocated
        :param startindex: The starting index of the new window (including the job that is to be allocated now)
        """
        # Adding new event to counters
        if startindex + self._windowsize < len(es):
            e = es[startindex + self._windowsize]
            req_res = e.requested_resources
            req_nodes = e.requested_nodes
            self._cumulative_length += e.expected_duration + 1
            for k in self._types:
                self._rescounters[k] += req_res[k] * req_nodes * (e.expected_duration + 1)
        else:
            self._jobstoallocate -= 1
        # Removing old event from counters
        if 0 <= startindex < len(es):
            e = es[startindex]
            req_res = e.requested_resources
            req_nodes = e.requested_nodes
            self._cumulative_length -= e.expected_duration + 1
            for k in self._types:
                self._rescounters[k] -= req_res[k] * req_nodes * (e.expected_duration + 1)
                assert self._rescounters[k] >= 1, 'A resource is going below zero during counter update'

    def _update_weights(self):
        """
        Computes the weights associated to each resource type, based on the current resource counters.
        
        The weights consider the average number of resources required by jobs in the window, weighted according to
        the job's expected duration. This average per-resource request is then normalized by the
        base availability of each resource. Then the weights are multiplied by the current load factor for each
        resource, to preserve resources that are becoming scarce in the system. Finally, critical resources that are
        to be preserved at all times in the system (supplied by the user) see their weights multiplied by a constant
        modifier.
        """
        # The amount of current used resources in the system. Used to compute the load rate
        used_resources = {}
        base = self.resource_manager.system_capacity('total')
        for t in self._types:
            # base = self.resource_manager.get_total_resources(t)
            avl = self.resource_manager.current_availability
            qt = base[t] - sum([avl[node][t] for node in avl.keys()])
            used_resources[t] = qt
        # used_resources = self.resource_manager.get_used_resources()
        for k in self._types:
            # self._weights[k] = (self._rescounters[k] + self._schedulecounters[k]) / (self._jobstoallocate + self._jobstoschedule + 1)
            self._weights[k] = (self._rescounters[k] + self._schedulecounters[k]) / (self._cumulative_length + self._cumulative_schedule + 1)
            # Might be useful to smooth out and compress average values
            # self._weights[k] = sqrt(self._weights[k])
            # self._weights[k] *= (used_resources[k] + 1) / (self._base_availability[k] * self._base_availability[k])
            self._weights[k] *= (used_resources[k] + 1) / (base[k] * base[k])
            # Alternative weighting strategy, considers only the load factor: simpler, but with worse results
            # self._weights[k] *= (used_resources[k] + 1) / (self._base_availability[k])
            # The weights related to critical resources are multiplied by the allocation fail heuristic
            if k in self._critical_resources:
                self._weights[k] *= self._critical_modifier[k]

    def _find_overlapping_jobs(self, e, cur_time):
        """
        A method which searches for overlapping jobs in the schedule with the one to be allocated now.
        
        The schedule is to be set by the user via the set_attr method before allocation. The metric used to
        determine overlap is the expected duration of the job. All the data computed from the resources needed
        by the overlapping jobs is stored in the schedulecounters and jobstoschedule, identical in function to
        rescounters and jobstoallocate.
        
        :param e: the event to be allocated now
        :param cur_time: the current time, in which allocation will be performed
        """
        # In any case, the max number of jobs we want to consider is windowsize
        maxjobs = self._windowsize - self._jobstoallocate
        if maxjobs == 0 or self._schedule is None or self._event_dictionary is None:
            return
        # The resource counters are reset
        for k in self._types:
            self._schedulecounters[k] = 0
        self._jobstoschedule = 0
        self._cumulative_schedule = 0
        # The expected end time of the current job is used to estimate overlap with future jobs
        end_time = cur_time + e.expected_duration
        for jobid, jobtime in self._schedule.items():
            if cur_time < jobtime < end_time:
                # For overlapping jobs we update the resource counters like in regular allocation
                req_res = self._event_dictionary[jobid].requested_resources
                req_nodes = self._event_dictionary[jobid].requested_nodes
                self._cumulative_schedule += self._event_dictionary[jobid].expected_duration
                for k in self._types:
                    self._schedulecounters[k] += req_res[k] * req_nodes * self._event_dictionary[jobid].expected_duration
                self._jobstoschedule += 1
                # If the maximum number of jobs in the window has been reached, we break from the cycle
                if self._jobstoschedule >= maxjobs:
                    break

    def _update_modifiers(self, e, success):
        """
        This method updates the modifiers used for critical resources (if active) upon a failed 
        or successful allocation.
        
        :param e: the event subject to allocation
        :param success: boolean value; True if the allocation succeeded, False otherwise
        """
        noderes = e.requested_resources
        for cres in self._critical_resources:
            if noderes[cres] > 0:
                if not success:
                    self._critical_modifier[cres] += self._modifierstep
                    if self._critical_modifier[cres] > self._modifierbounds[1]:
                        self._critical_modifier[cres] = self._modifierbounds[1]
                else:
                    self._critical_modifier[cres] -= self._modifierstep
                    if self._critical_modifier[cres] < self._modifierbounds[0]:
                        self._critical_modifier[cres] = self._modifierbounds[0]

    def _reset_schedule(self):
        """
        A method which resets all internal variables related to scheduling, after allocation of all jobs is performed.
        
        The use of this method implies that the schedule must be set again, via set_attr, before each call to the
        allocating_method.
        """
        self._jobstoschedule = 0
        self._schedule = None
        self._event_dictionary = None
        for k in self._types:
            self._schedulecounters[k] = 0

    def _get_sorted_nodes(self, e):
        """
        Given an event e to be allocated, the method returns the sorted list of nodes which best fit the job.
        
        :param e: The event to be allocated
        :return: The sorted list of nodes that best fit the job
        """
        assert self.avl_resources is not None, 'The dictionary of available resources must be non-empty.'
        nodelist = []
        s_nodes = self._find_sat_nodes(e.requested_resources)
        # For each node in the system, the job "fit", which is the number of job units fitting the node, is computed
        for node in s_nodes:  # self.avl_resources.keys():
            fits = self._event_fits_node(self.avl_resources[node], e.requested_resources)
            # If the node has not enough resources to fit the job, it is simply discarded
            if fits == 0:
                continue
            elif fits > e.requested_nodes:
                fits = e.requested_nodes
            # The nodes are ranked by the amount of weighted resources left after allocating the job
            rank = sum((self.avl_resources.get(node).get(k) - e.requested_resources[k] * fits) * self._weights[k] for k in self._types)
            # Alternative ranking, similar to a weighted consolidate; usually performs worse than the above
            # rank = sum((self.avl_resources.get(node).get(k)) * self._weights[k] for k in self._types)
            # We use a temporary list to store the node ID and its ranking
            nodelist.append((node, rank))
        # Lastly, sorting is performed. Note that sorting is performed only on nodes that actually fit the job, thus
        # resulting in smaller instances and lower times compared to, for example, the consolidate allocator
        nodelist.sort(key=lambda x: x[1])
        # The list of sorted node IDs is returned
        return [x[0] for x in nodelist]

    def _trim_nodes(self, nodes):
        """
        Method which removes from a list of node IDs those elements that correspond to nodes that are full, i.e. they
        have no available Memory or CPU resources and are thus useless for allocation.
        
        :param nodes: A list of node IDs
        :return: The trimmed list of nodes
        """
        trimNodes = [n for n in nodes if all(self.avl_resources[n][r] > 0 for r in self.nec_res_types)]
        return trimNodes
    
    def _set_aux_resources(self):
        """
        @todo: Check how to improve
        """
        # Generate an aux structure to speedup the allocation process
        resource_types = self.resource_manager.resource_types        
        self.aux_resources = {}
        for res_type in resource_types:
            if not (res_type in self.aux_resources):
                self.aux_resources[res_type] = {}
            for node in self.sorted_keys:
                n_res = self.avl_resources[node][res_type]
                if n_res == 0:  # This works similar to trim_nodes
                    continue
                if not (n_res in self.aux_resources[res_type]):
                    self.aux_resources[res_type][n_res] = SortedSet()
                self.aux_resources[res_type][n_res].add(node)
                
    def _find_sat_nodes(self, req_resources):
        sat_nodes = {}
        # fitting_nodes = {}
        for t_res, n_res in req_resources.items():
            if n_res == 0:
                continue
            if not(t_res in sat_nodes):
                sat_nodes[t_res] = SortedSet(key=self._natural_keys)
            for n, nodes in self.aux_resources[t_res].items():
                if n >= n_res:
                    sat_nodes[t_res].update(nodes)
        # nodes = list(reduce(set.intersection, (set(val) for val in sat_nodes.values())))
        # tot_fitting_reqs = sum([min(fitting_nodes[n].values()) for n in nodes])
        nodes = reduce(SortedSet.intersection, sat_nodes.values())
        return nodes  # , tot_fitting_reqs

    def _atoi(self, text):
        return int(text) if text.isdigit() else text

    def _natural_keys(self, text):
        '''
        alist.sort(key=natural_keys) sorts in human order
        http://nedbatchelder.com/blog/200712/human_sorting.html
        (See Toothy's implementation in the comments)
        '''
        return [self._atoi(c) for c in split(r'(\d+)', text)]
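
A hypothetical driving sketch (job_queue, resource_manager and now are assumed to come from the surrounding AccaSim simulation; the call follows the allocating_method signature above):

alloc = allocator_weighted(seed=0, resource_manager=resource_manager,
                           critical_res=['gpu'], window_size=50)
# Optionally pass a scheduling plan computed beforehand, e.g. by a CP scheduler:
# alloc.set_attr(schedule=(event_dictionary, start_times))
results = alloc.allocating_method(job_queue, now, skip=True)
for start_time, job_id, nodes in results:
    if start_time is not None:
        print('job %s allocated on nodes %s' % (job_id, nodes))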


Hybrid

This allocator, introduced in [NettiGKSB18], is a combination of the Weighted and Balanced allocators.

Intuitively, it separates and interleaves the nodes according to the critical resources they possess, like the Balanced allocator; each of those lists, however, is sorted individually like in the Weighted allocator.
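
A compressed sketch of the composition (illustrative only; weighted_rank stands in for the per-node ranking computed in _get_sorted_node_sublist below):

def hybrid_order(avl, critical, weighted_rank, none_id='None'):
    # Balanced step: bucket nodes by their dominant critical resource
    buckets = {none_id: []}
    for r in critical:
        buckets[r] = []
    for node, res in avl.items():
        key, qty = none_id, 0
        for r in critical:
            if res.get(r, 0) > qty:
                key, qty = r, res[r]
        buckets[key].append(node)
    # Weighted step: sort every bucket by its weighted leftover resources
    for nodes in buckets.values():
        nodes.sort(key=weighted_rank)
    # Balanced step again: 'none' bucket in front, critical buckets interleaved,
    # always drawing from the bucket with most nodes left
    order, heads = list(buckets[none_id]), {r: 0 for r in critical}
    while any(len(buckets[r]) > heads[r] for r in critical):
        r = max(critical, key=lambda k: len(buckets[k]) - heads[k])
        order.append(buckets[r][heads[r]])
        heads[r] += 1
    return order

# Example with a trivial rank (fewer total resources left ranks better)
avl = {'n1': {'gpu': 2, 'mic': 0}, 'n2': {'gpu': 0, 'mic': 4}, 'n3': {'gpu': 0, 'mic': 0}}
print(hybrid_order(avl, ['gpu', 'mic'], lambda n: sum(avl[n].values())))  # ['n3', 'n1', 'n2']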

# from extra.allocators.allocator_weighted import allocator_weighted
from allocator_weighted import allocator_weighted
from accasim.base.allocator_class import FirstFit


class allocator_hybrid(allocator_weighted):
    """
    This allocator is a combination of the Weighted and Balanced allocators.
    
    Intuitively, it will separate and interleave the nodes according to the critical resources they possess like in
    the Balanced allocator; however, each of those lists is sorted individually like in the Weighted allocator.
    """

    name = 'Hybrid'

    def __init__(self, seed, resource_manager, **kwargs):
        """
        Constructor for the class.

        :param seed: seed for random events (not used)
        :param resource_manager: reference to the system resource manager
        :param kwargs: critical_res = defines the set of resource types to be preserved (default mic,gpu); 
                       window_size = defines the window size for job resource analysis (default 100);
        """
        FirstFit.__init__(self, seed)

        win_key = 'window_size'
        res_key = 'critical_res'

        # If the user doesn't supply the set of resources to balance, mic and gpu are used by default
        if res_key in kwargs.keys():
            self._critical_resources = kwargs[res_key]
            assert all(res in resource_manager.resource_types for res in self._critical_resources), 'Selected resource types for interleaving are not correct'
        else:
            self._critical_resources = [r for r in ('mic', 'gpu') if r in resource_manager.resource_types]
        # The default window size to be used for job analysis is 100
        self._windowsize = (kwargs[win_key] if win_key in kwargs.keys() and kwargs[win_key] >= 0 else 100)

        # Dummy values for the parameters of the failed allocation heuristic, not used here
        self._modifierbounds = [1, 1]
        self._numsteps = 1
        self._modifierstep = 0
        self._critical_modifier = {}
        for k in self._critical_resources:
            self._critical_modifier[k] = self._modifierbounds[0]

        # The resource types in the system; stored for efficiency
        self._types = resource_manager.resource_types
        # The scheduling plan computed by the scheduler, if present
        self._schedule = None
        # The event dictionary used to retrieve job information from the schedule
        self._event_dictionary = None
        # The number of jobs currently considered for analysis (max is window_size)
        self._jobstoallocate = 0
        # The counters for resources required by jobs in the analysis window
        self._rescounters = {}
        for k in self._types:
            self._rescounters[k] = 1
        # Same as rescounters, but used for the overlapping events in the schedule, and not in the queue
        self._jobstoschedule = 0
        self._schedulecounters = {}
        for k in self._types:
            self._schedulecounters[k] = 1
        # The weights associated to the resource types
        self._weights = {}
        for k in self._types:
            self._weights[k] = 0
        # The ID to associate to nodes/jobs that possess no critical resources
        self._noneID = 'None'

    def _update_weights(self):
        """
        Computes the weights associated to each resource type, based on the current resource counters.

        The weights consider the average number of resources required by jobs in the window, normalized by the
        base availability of each resource. Then the weights are multiplied by the current load factor for each
        resource, to preserve resources that are becoming scarce in the system. Unlike in the Weighted allocator,
        no critical-resource modifier is applied here, as criticality is handled by the Balanced-style interleaving.
        """
        # The amount of current used resources in the system. Used to compute the load rate
        used_resources = {}
        base = self.resource_manager.system_capacity('total')
        for t in self._types:
            avl = self.resource_manager.current_availability
            qt = base[t] - sum([avl[node][t] for node in avl.keys()])
            used_resources[t] = qt
        # used_resources = self.resource_manager.get_used_resources()
        for k in self._types:
            self._weights[k] = ((self._rescounters[k] + self._schedulecounters[k])
                                / (self._jobstoallocate + self._jobstoschedule + 1))
            # Might be useful to smooth out and compress average values
            # self._weights[k] = sqrt(self._weights[k])
            self._weights[k] *= (used_resources[k] + 1) / (base[k] * base[k])
            # Alternative weighting strategy, considers only the load factor: simpler, but with worse results
            # self._weights[k] *= (used_resources[k] + 1) / (self._base_availability[k])

    def _get_sorted_nodes(self, e):
        """
        Given an event e to be allocated, the method returns the sorted list of nodes which best fit the job,
        adjusted like in the balanced allocator.

        :param e: The event to be allocated
        :return: The sorted list of nodes that best fit the job
        """
        assert self.avl_resources is not None, 'The dictionary of available resources must be non-empty.'

        # res_lists is a dictionary containing, for each resource type, the list of nodes that have them. The lists
        # do not overlap, and each node falls in the list whose resource it has in the greatest quantity.
        # If a node has none of the critical resource, it will fall in a special 'none' list
        res_lists = {self._noneID: []}
        for k in self._critical_resources:
            res_lists[k] = []

        # All the nodes in the avl_resources dictionary are classified, according to the critical resources
        # they possess
        s_nodes = self._find_sat_nodes(e.requested_resources)
        for node in s_nodes:  # self.avl_resources.items():
            res_lists[self._critical_list_select(self.avl_resources[node])].append(node)

        res_lists[self._noneID] = self._get_sorted_node_sublist(e, res_lists[self._noneID])
        for key in self._critical_resources:
            res_lists[key] = self._get_sorted_node_sublist(e, res_lists[key])

        # The lists are then combined, by placing in front the 'none' list, which is a buffer for the critical res
        final_list = res_lists[self._noneID]
        remaining_nodes = sum([len(l) for l in res_lists.values()]) - len(final_list)

        # The algorithm would 'pop' elements from the lists' heads in succession. To avoid this, as it is expensive,
        # we use a dictionary of starting indexes for each list
        start_indexes = dict.fromkeys(self._critical_resources, 0)
        # After the 'none' list, the remaining lists are interleaved, and at each step an element is picked from
        # the longest list in res_lists, in order to balance their usage.
        for i in range(remaining_nodes):
            rr_res = self._get_longest_list(res_lists, start_indexes)
            if rr_res != self._noneID:
                final_list.append(res_lists[rr_res][start_indexes[rr_res]])
                start_indexes[rr_res] += 1

        return final_list

    def _get_sorted_node_sublist(self, e, nodes):
        """
        Given a sublist of nodes and an event e, the method returns the sorted version of the sublist, like in the
        weighted allocator. It is used to sort the single node lists, relative to the various resource types.
        
        :param e: the event to be allocated
        :param nodes: the nodes' sublist to be sorted
        :return: the sorted nodes' sublist
        """
        nodelist = []
        # For each node in the system, the job "fit", which is the number of job units fitting the node, is computed
        for node in nodes:
            fits = self._event_fits_node(self.avl_resources[node], e.requested_resources)
            # If the node has not enough resources to fit the job, it is simply discarded
            if fits == 0:
                continue
            elif fits > e.requested_nodes:
                fits = e.requested_nodes
            # The nodes are ranked by the amount of weighted resources left after allocating the job
            rank = sum((self.avl_resources.get(node).get(k) - e.requested_resources[k] * fits) * self._weights[k]
                       for k in self._types)
            # Alternative ranking, similar to a weighted consolidate; usually performs worse than the above
            # rank = sum((self.avl_resources.get(node).get(k)) * self._weights[k] for k in self._types)
            # We use a temporary list to store the node ID and its ranking
            nodelist.append((node, rank))
        # Lastly, sorting is performed. Note that sorting is performed only on nodes that actually fit the job, thus
        # resulting in smaller instances and lower times compared to, for example, the consolidate allocator
        nodelist.sort(key=lambda x: x[1])
        # The list of sorted node IDs is returned
        return [x[0] for x in nodelist]

    def _critical_list_select(self, noderes):
        """
        A simple method which, given the resources of a node, returns the key of the critical resource for which
        the node has the greatest availability. If there are no critical resources available, a 'none' key is returned.

        :param noderes: the resources dictionary of a node
        :return: the key of the critical resource to which assign the node
        """
        maxval = 0
        maxkey = self._noneID
        for cres in self._critical_resources:
            if noderes[cres] > maxval:
                maxval = noderes[cres]
                maxkey = cres
        return maxkey

    def _get_longest_list(self, res_lists, start_indexes):
        """
        Given the dictionary of critical resources lists, the method returns the key of the next list from which
        an element has to be picked in order to build the final nodes list.

        :param res_lists: the critical resources' lists dictionary
        :param start_indexes: the dictionary of starting indexes for each list (to simulate a pop operation)
        :return: the key of the next list to be picked
        """
        maxkey = self._noneID
        maxval = 0
        for k in self._critical_resources:
            modifier = len(res_lists[k]) - start_indexes[k]
            if modifier > maxval:
                maxval = modifier
                maxkey = k
        return maxkey


Citations

[NettiGKSB18] Alessio Netti, Cristian Galleguillos, Zeynep Kiziltan, Alina Sirbu, Ozalp Babaoglu. Heterogeneity-Aware Resource Allocation in HPC Systems. In Proc. of ISC 2018.
