Allocators from literature¶
Balanced¶
This allocator, introduced in [NettiGKSB18], considers a set of critical and scarce resource types (like GPUs or MICs), and tries to balance the allocation across the respective nodes in order to avoid fragmentation and waste.
The algorithm will collect the nodes having different critical resource types in distinct sets. It will then build a list of nodes to be used in the allocation process: the nodes having no critical resource types are placed in front, followed by the nodes having critical resources; these nodes are interleaved, in order to balance their usage and avoid favoring a specific resource type. The resource types to be balanced can be given as input. If a node has more than one type of critical resource available, it will be considered for the type for which it has the greatest availability.
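The ordering described above can be sketched in a few lines. This is a minimal illustration of the interleaving idea only, not the AccASim implementation (function and variable names are ours):

```python
# Sketch of the Balanced ordering: nodes without critical resources go first,
# then the per-resource lists are interleaved by always drawing from the
# list with the most unconsumed nodes.
def balanced_order(none_nodes, res_lists):
    order = list(none_nodes)
    # A start index per list simulates cheap pops from the head
    idx = {k: 0 for k in res_lists}
    remaining = sum(len(v) for v in res_lists.values())
    for _ in range(remaining):
        # Pick the list with the most unconsumed nodes, to balance usage
        k = max(res_lists, key=lambda r: len(res_lists[r]) - idx[r])
        order.append(res_lists[k][idx[k]])
        idx[k] += 1
    return order

# Two GPU nodes and two MIC nodes end up interleaved after the plain node
print(balanced_order(['n1'], {'gpu': ['g1', 'g2'], 'mic': ['m1', 'm2']}))
# → ['n1', 'g1', 'm1', 'g2', 'm2']
```

In the real allocator each per-resource list is additionally sorted by availability and trimmed of full nodes before interleaving, as the code below shows.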
```python
from accasim.base.allocator_class import FirstFit
from copy import copy


class allocator_balanced(FirstFit):
    """
    An allocator which considers a set of critical and scarce resource types (like GPUs or MICs), and tries
    to balance the allocation across the respective nodes in order to avoid fragmentation and waste.

    The algorithm collects the nodes having different critical resource types in distinct sets. It then
    builds a list of nodes to be used in the allocation process: the nodes having no critical resource types
    are placed in front, followed by the nodes having critical resources; these nodes are interleaved, in
    order to balance their usage and avoid favoring a specific resource type.

    The resource types to be balanced can be given as input. If a node has more than one type of critical
    resource available, it will be considered for the type for which it has the greatest availability.
    """

    name = 'Balanced'

    def __init__(self, seed, resource_manager, **kwargs):
        """
        Constructor for the class.

        :param seed: seed for random events (not used)
        :param resource_manager: reference to the system resource manager
        :param kwargs: critical_res = defines the set of resource types to be balanced (default mic, gpu)
        """
        FirstFit.__init__(self, seed)
        # The ID associated to nodes/jobs that possess no critical resources
        self._noneID = 'None'
        res_key = 'critical_res'
        # If the user does not supply the set of resources to balance, mic and gpu are used by default
        if res_key in kwargs:
            self._critical_resources = kwargs[res_key]
            assert all(res in resource_manager.resource_types for res in
                       self._critical_resources), 'Selected resource types for interleaving are not correct'
        else:
            self._critical_resources = [r for r in ('mic', 'gpu') if r in resource_manager.resource_types]
        # The lists containing the single node IDs, per resource type
        self._res_lists = None

    def _sort_resources(self):
        """
        Sorts the keys of the available resources dictionary, based on the ranking policy.
        It is called after the resources are set in the allocator.

        :return: the list of sorted keys (node ids) for the resources
        """
        assert self.avl_resources is not None, 'The dictionary of available resources must be non-empty.'
        # res_lists is a dictionary containing, for each resource type, the list of nodes that have it.
        # The lists do not overlap, and each node falls in the list whose resource it has in the greatest
        # quantity. If a node has none of the critical resources, it falls in a special 'none' list.
        self._res_lists = {self._noneID: []}
        for k in self._critical_resources:
            self._res_lists[k] = []
        # All the nodes in the avl_resources dictionary are classified according to the critical resources
        # they possess
        for node, res in self.avl_resources.items():
            self._res_lists[self._critical_list_select(res)].append(node)
        return self._convert_to_final_list(self._res_lists)

    def _adjust_resources(self, sorted_keys=None):
        """
        Adjusts the sorting of the resources after a successful allocation.
        To improve efficiency, this method uses the self._res_lists dictionary stored after a set_resources
        call. The lists are not built from scratch; a minor adjustment is performed to restore sorting.

        :param sorted_keys: the list of keys, almost sorted, that needs to be adjusted
        """
        assert self.avl_resources is not None, 'The dictionary of available resources must be non-empty.'
        if sorted_keys:
            self.sorted_keys.clear()
        else:
            self._sort_resources()
        self.sorted_keys = self._convert_to_final_list(self._res_lists)

    def _update_resources(self, reserved_nodes, requested_resources):
        """
        Updates the internal avl_resources list after a successful allocation.
        This method also updates the internal self._res_lists dictionary, moving nodes whose resources have
        changed to the appropriate list in the dictionary. Such lists will then be sorted again at the next
        _adjust_resources call.

        :param reserved_nodes: the list of nodes assigned to the allocated job
        :param requested_resources: the list of resources requested by the job per each node
        """
        # Nodes involved in the update are removed from the respective lists
        temp_node_list = []
        for node in set(reserved_nodes):
            # Small trick: if a node belongs to the None list before the update, we do not consider it for
            # the resource adjustment, as it will be placed again in the None list, which is not sorted
            rr_res = self._critical_list_select(self.avl_resources[node])
            if rr_res != self._noneID:
                self._res_lists[rr_res].remove(node)
                temp_node_list.append(node)
        FirstFit._update_resources(self, reserved_nodes, requested_resources)
        # Nodes involved in the update are then re-added to the lists, according to their new resources
        for node in temp_node_list:
            self._res_lists[self._critical_list_select(self.avl_resources[node])].append(node)

    def _convert_to_final_list(self, res_lists):
        """
        Considers a dictionary of node lists, one per critical resource, and builds the final sorted list
        of nodes from it.

        :param res_lists: the dictionary of node lists, for each critical resource
        :return: the final sorted list to be used by the allocator
        """
        # Each separate list is sorted, so that nodes with the most available critical resources are placed
        # at the end
        res_lists[self._noneID] = self._trim_nodes(res_lists[self._noneID])
        res_lists[self._noneID].sort()
        for key in self._critical_resources:
            res_lists[key] = self._trim_nodes(res_lists[key])
            res_lists[key].sort(key=lambda x: (self.avl_resources[x][key], x))
        # The lists are then combined, by placing in front the 'none' list, which acts as a buffer for the
        # critical resources
        final_list = copy(res_lists[self._noneID])
        remaining_nodes = len(self.avl_resources) - len(final_list)
        # The algorithm would 'pop' elements from the lists' heads in succession. To avoid this, as it is
        # expensive, we keep a dictionary of starting indexes for each list
        start_indexes = dict.fromkeys(self._critical_resources, 0)
        # After the 'none' list, the remaining lists are interleaved; at each step an element is picked
        # from the longest list in res_lists, in order to balance their usage
        for _ in range(remaining_nodes):
            rr_res = self._get_longest_list(res_lists, start_indexes)
            if rr_res != self._noneID:
                final_list.append(res_lists[rr_res][start_indexes[rr_res]])
                start_indexes[rr_res] += 1
        return final_list

    def _critical_list_select(self, noderes):
        """
        Given the resources of a node, returns the key of the critical resource for which the node has the
        greatest availability. If no critical resources are available, the 'none' key is returned.

        :param noderes: the resources dictionary of a node
        :return: the key of the critical resource to which the node is assigned
        """
        maxval = 0
        maxkey = self._noneID
        for cres in self._critical_resources:
            if noderes[cres] > maxval:
                maxval = noderes[cres]
                maxkey = cres
        return maxkey

    def _get_longest_list(self, res_lists, start_indexes):
        """
        Given the dictionary of critical resource lists, returns the key of the next list from which an
        element has to be picked in order to build the final node list.

        :param res_lists: the critical resources' lists dictionary
        :param start_indexes: the dictionary of starting indexes for each list (to simulate a pop operation)
        :return: the key of the next list to be picked
        """
        maxkey = self._noneID
        maxval = 0
        for k in self._critical_resources:
            remaining = len(res_lists[k]) - start_indexes[k]
            if remaining > maxval:
                maxval = remaining
                maxkey = k
        return maxkey

    def _trim_nodes(self, nodes):
        """
        Removes from a list of node IDs those elements that correspond to nodes that are full, i.e. they
        have no available memory or CPU resources and are thus useless for allocation.

        :param nodes: a list of node IDs
        :return: the trimmed list of nodes
        """
        return [n for n in nodes if all(self.avl_resources[n][r] > 0 for r in self.nec_res_types)]
```
Weighted¶
This allocator, introduced in [NettiGKSB18], tries to perform optimization on the allocation of the queued jobs. Each job is allocated on the node that, after the allocation, is left with the minimum amount of weighted resources. The weights quantify the level of criticality of each resource type using three parameters: the average amount requested by jobs, the current load ratio, and the total capacity.
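The weighting and ranking idea can be sketched as follows. This is a simplified illustration under our own naming, not the AccASim API; the real allocator additionally weights requests by expected job duration and applies the Priority-Weighted modifiers:

```python
# Sketch: weight each resource type by average demand, load ratio and
# scarcity, then pick the node left with the fewest weighted resources.
def resource_weights(avg_request, used, capacity):
    # Heavily requested, heavily loaded or scarce types weigh more;
    # dividing by capacity**2 penalizes scarce resources twice
    return {r: avg_request[r] * (used[r] + 1) / (capacity[r] ** 2)
            for r in capacity}

def best_node(avail, request, weights):
    # Candidates are only the nodes that can actually host the request
    fits = [n for n in avail
            if all(avail[n][r] >= request[r] for r in weights)]
    # Best-fit style: minimize the weighted resources left after allocation
    return min(fits, key=lambda n: sum((avail[n][r] - request[r]) * weights[r]
                                       for r in weights))

avail = {'n1': {'cpu': 8, 'gpu': 1}, 'n2': {'cpu': 8, 'gpu': 0}}
weights = resource_weights({'cpu': 4, 'gpu': 1}, {'cpu': 8, 'gpu': 1},
                           {'cpu': 16, 'gpu': 2})
print(best_node(avail, {'cpu': 2, 'gpu': 0}, weights))  # → n2
```

A CPU-only job is steered to the GPU-less node `n2`, because leaving the GPU node free costs less in weighted resources; this is exactly the preservation effect the allocator aims for.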
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 | from accasim.base.allocator_class import FirstFit
from sortedcontainers.sortedset import SortedSet
from re import split
from _functools import reduce
class allocator_weighted(FirstFit):
"""
An allocator which tries to perform optimization on the allocation of all events in the queue.
When the user supplies the queue of events to be allocated, the average number of resources required per-type
is computed, together with the current load rate. The average is weighted according to the expected length of
each job. These values are used as weights to indicate the value of a resource. Also, the allocator computes
the resource weights considering also the base availability in the system, assigning higher values to scarcer
resources, to assess which are actually more precious and need to be preserved in a global manner. Each job is
then allocated on the nodes on which the minimum number of weighted resources is left (after the allocation),
in a similar fashion to the best-fit allocator.
The user can also supply a "critical resources" list: for such resources, an additional heuristic is used, and in
this case the allocator goes under the name of Priority-Weighted; this consists in a priority value, per resource
type, which will increase as allocations fail for jobs that require critical resources, and will decrease for each
successful allocation. If a jobs requires multiple critical resource types, all of their priority values will be
affected by the allocation. This tends to assign greater weight to critical resources actually requested by jobs,
and will make the allocator statistically preserve such resources.
"""
name = 'Weighted'
def __init__(self, seed, resource_manager, **kwargs):
"""
Constructor for the class.
:param seed: seed for random events (not used)
:param resource_manager: reference to the system resource manager
:param kwargs: critical_res = defines the set of resource types to be preserved (default none);
critical_bounds = the lower and upper bounds for the allocation fail heuristic (default [1,10]);
critical_steps = the number of steps in the scale between the critical_bounds (default 9);
window_size = defines the window size for job resource analysis(default 100);
"""
FirstFit.__init__(self, seed)
win_key = 'window_size'
res_key = 'critical_res'
steps_key = 'critical_steps'
bound_key = 'critical_bounds'
# If the user doesn't supply the set of resources to balance, mic and gpu are used by default
if res_key in kwargs.keys():
self._critical_resources = kwargs[res_key]
assert all(res in resource_manager.resource_types for res in self._critical_resources), 'Selected resource types for interleaving are not correct'
else:
self._critical_resources = [] # Priority-Weighted heuristic is disabled by default
# The default window size to be used for job analysis is 100
self._windowsize = (kwargs[win_key] if win_key in kwargs.keys() and kwargs[win_key] >= 0 else 100)
# The lower and upper bounds for the weight modifier of critical resources
if bound_key in kwargs.keys() and len(kwargs[bound_key]) == 2 and 1 <= kwargs[bound_key][0] <= kwargs[bound_key][1]:
self._modifierbounds = kwargs[bound_key]
else:
self._modifierbounds = [1, 10]
# The number of steps in the scale between the lower and upper bound for the allocation fail heuristic
self._numsteps = (kwargs[steps_key] if steps_key in kwargs.keys() and kwargs[steps_key] > 0 else 9)
# Parameters relative to the allocation fail metric for jobs requiring critical resources
# The step for increasing and decreasing the modifiers
self._modifierstep = (self._modifierbounds[1] - self._modifierbounds[0]) / self._numsteps
# The dictionary containing the current modifier values
self._critical_modifier = {}
for k in self._critical_resources:
self._critical_modifier[k] = self._modifierbounds[0]
# The resource types in the system; stored for efficiency
self._types = resource_manager.resource_types
# The scheduling plan computed by the scheduler, if present
self._schedule = None
# The event dictionary used to retrieve job information from the schedule
self._event_dictionary = None
# The number of jobs currently considered for analysis (max is window_size)
self._jobstoallocate = 0
# The counters for resources required by jobs in the analysis window
self._rescounters = {}
for k in self._types:
self._rescounters[k] = 1
# Same as rescounters, but used for the overlapping events in the schedule, and not in the queue
self._jobstoschedule = 0
self._schedulecounters = {}
for k in self._types:
self._schedulecounters[k] = 1
# The weights associated to the resource types
self._weights = {}
for k in self._types:
self._weights[k] = 0
# Two variables used to compute the weighted average of job resources' requests; they store the total
# length of jobs in the analysis window
self._cumulative_length = 0
# Same as above, but for jobs overlapping the current one in the schedule (for CP)
self._cumulative_schedule = 0
def set_attr(self, **kwargs):
"""
Method used to set internal parameters and meta-data for the allocator.
Its behavior depends on the specific allocator that is being used, and some arguments may be discarded.
In this optimization-based allocator, the method is used to set the internal variable related to the
schedule plan, if present. It is useful for the CP scheduler, which is the only one that computes an actual
scheduling plan.
:param kwargs: schedule = the scheduling plan, in a tuple of the type (es_dict,dict(job_id: start_time))
"""
schedule_key = 'schedule'
if schedule_key in kwargs.keys():
self._event_dictionary = kwargs[schedule_key][0]
self._schedule = kwargs[schedule_key][1]
elif 'dict' in kwargs.keys():
self._event_dictionary = kwargs['dict']
def allocating_method(self, es, cur_time, skip=False, reserved_time=None, reserved_nodes=None, debug=False):
"""
Given a job list es, this method searches for a suitable allocation for as many jobs as possible.
In normal allocation, the method stops as soon as an event in the list cannot be allocated. In this case,
ths list of already allocated jobs is returned. This is done to be coherent with the scheduler's rules.
As an alternative, the skip parameter can be supplied to allow the scheduler to skip unallocated jobs.
This method also support backfilling schedule. In this case, the backfilling parameters are supplied,
and the allocator tries to fit jobs without delaying the reserved job. In this second case,
the method does not stop when a job cannot be allocated, but simply skips it.
es can be a list or a single event object. The return type (list or single tuple) changes accordingly.
:param es: the event(s) to be allocated
:param cur_time: current time, needed to build the schedule list
:param skip: determines if the allocator can skip jobs
:param reserved_time: beginning of the next reservation slot (used for backfilling)
:param reserved_nodes: nodes already reserved (used for backfilling)
:return: a list of assigned nodes of length e.requested_nodes, for all events that could be allocated. The
list is in the format (time,event,nodes) where time can be either cur_time or None.
"""
if not isinstance(es, (list, tuple)):
listAsInput = False
es = [es]
else:
listAsInput = True
allocation = []
success_counter = 0
event_counter = 0
# Create aux resources for this allocation
self._set_aux_resources()
# The required resources' counters are initialized according to the event queue given as input
self._initialize_counters(es)
if debug:
print('Queue length %i - Window size %i' % (len(es), self._jobstoallocate))
print('Distribution: ')
print(self._rescounters.values())
for e in es:
requested_nodes = e.requested_nodes
requested_resources = e.requested_resources
# If the input arguments relative to backfilling are not supplied, the method operates in regular mode.
# Otherwise, backfilling mode is enabled, allowing the allocator to skip jobs and consider the reservation.
nodes_to_discard = self._compute_reservation_overlaps(e, cur_time, reserved_time, reserved_nodes)
backfilling_overlap = False if len(nodes_to_discard) == 0 else True
self._find_overlapping_jobs(e, cur_time)
# After each allocation, the weights are updated according to the new analysis window
self._update_weights()
# After computing the weights, the sorted list of available nodes for allocation is computed
nodekeys = self._get_sorted_nodes(e)
assigned_nodes = []
nodes_left = requested_nodes
for node in nodekeys:
# The algorithm check whether the given node belongs to the list of reserved nodes, in backfilling.
# If it does, the node is discarded.
resources = self.avl_resources[node]
if backfilling_overlap and node in nodes_to_discard:
continue
# We compute the number of job units fitting in the current node, and update the assignment
fits = self._event_fits_node(resources, requested_resources)
if nodes_left <= fits:
assigned_nodes += [node] * nodes_left
nodes_left = 0
else:
assigned_nodes += [node] * fits
nodes_left -= fits
if nodes_left <= 0:
break
# If, after analyzing all nodes, the allocation is still not complete, the partial allocation
# is discarded.
if nodes_left > 0:
assigned_nodes = []
assert not assigned_nodes or requested_nodes == len(assigned_nodes), 'Requested' + str(
requested_nodes) + ' got ' + str(len(assigned_nodes))
# If a correct allocation was found, we update the resources of the system, sort them again, and
# add the allocation to the output list.
self._update_modifiers(e, assigned_nodes)
if assigned_nodes:
allocation.append((cur_time, e.id, assigned_nodes))
self._update_resources(assigned_nodes, requested_resources)
success_counter += 1
if debug:
print('Allocation successful for event %s' % (e.id))
# If no correct allocation could be found, two scenarios are possible: 1) normally, the allocator stops
# here and returns the jobs allocated so far 2) if the skip parameter is enabled, the job is just
# skipped, and we proceed with the remaining ones.
else:
if debug:
print('Allocation failed for event %s with %s nodes left' % (e.id, nodes_left))
allocation.append((None, e.id, []))
if not skip:
# if jobs cannot be skipped, at the first allocation fail all subsequent jobs fail too
for ev in es[(success_counter + 1):]:
allocation.append((None, ev.id, []))
if debug:
print('Cannot skip jobs, %s additional pending allocations failed' % (len(es) - success_counter - 1))
break
# After trying to allocate the current event, the analysis window is shifted by one: the previous first
# event is removed from the counters, and the next one falling in the window is added to them
event_counter += 1
self._update_counters(es, event_counter)
if debug:
print('There were %s successful allocations out of %s events' % (success_counter, len(es)))
self._reset_schedule()
return allocation if listAsInput else allocation[0]
def _initialize_counters(self, es):
"""
A method which initializes the required resources counters for the job queue.
It is executed only once for each queue allocation request, and then the counters are dynamically
and efficiently updated through the update_counters method.
:param es: the list of events to be allocated
"""
self._cumulative_length = 0
self._cumulative_schedule = 0
# The counters are initialized to 1, to avoid having weights equal to 0
for k in self._types:
self._rescounters[k] = 1
# The size of the analysis window is computed
self._jobstoallocate = len(es) - 1 if len(es) - 1 <= self._windowsize else self._windowsize
# For each job in the window, we then add to the counters its total number of resources required per-type
for i in range(self._jobstoallocate):
req_res = es[i + 1].requested_resources
req_nodes = es[i + 1].requested_nodes
self._cumulative_length += es[i + 1].expected_duration + 1
for k in self._types:
self._rescounters[k] += req_res[k] * req_nodes * (es[i + 1].expected_duration + 1)
def _update_counters(self, es, startindex):
"""
A method which updates the counters of resources requested by jobs.
It operates in O(1) time, since we just need to remove the old job from the counters and add the new one,
if present.
:param es: The list of jobs to be allocated
:param startindex: The starting index of the new window (including the job that is to be allocated now)
"""
# Adding new event to counters
if startindex + self._windowsize < len(es):
e = es[startindex + self._windowsize]
req_res = e.requested_resources
req_nodes = e.requested_nodes
self._cumulative_length += e.expected_duration + 1
for k in self._types:
self._rescounters[k] += req_res[k] * req_nodes * (e.expected_duration + 1)
else:
self._jobstoallocate -= 1
# Removing old event from counters
if 0 <= startindex < len(es):
e = es[startindex]
req_res = e.requested_resources
req_nodes = e.requested_nodes
self._cumulative_length -= e.expected_duration + 1
for k in self._types:
self._rescounters[k] -= req_res[k] * req_nodes * (e.expected_duration + 1)
assert self._rescounters[k] >= 1, 'A resource is going below zero during counter update'
def _update_weights(self):
"""
Computes the weights associated to each resource type, basing on the current resource counters.
The weights consider the average number of resources required by jobs in the window, weighted according to
the job's expected duration. This average per-resource request is then normalized by the
base availability of each resource. Then the weights are multiplied by the current load factor for each
resource, to preserve resources that are becoming scarce in the system. Finally, critical resources that are
to be preserved at all times in the system (supplied by the user) see their weights multiplied by a constant
modifier.
"""
# The amount of current used resources in the system. Used to compute the load rate
used_resources = {}
base = self.resource_manager.system_capacity('total')
for t in self._types:
# base = self.resource_manager.get_total_resources(t)
avl = self.resource_manager.current_availability
qt = base[t] - sum([avl[node][t] for node in avl.keys()])
used_resources[t] = qt
# used_resources = self.resource_manager.get_used_resources()
for k in self._types:
# self._weights[k] = (self._rescounters[k] + self._schedulecounters[k]) / (self._jobstoallocate + self._jobstoschedule + 1)
self._weights[k] = (self._rescounters[k] + self._schedulecounters[k]) / (self._cumulative_length + self._cumulative_schedule + 1)
# Might be useful to smooth out and compress average values
# self._weights[k] = sqrt(self._weights[k])
# self._weights[k] *= (used_resources[k] + 1) / (self._base_availability[k] * self._base_availability[k])
self._weights[k] *= (used_resources[k] + 1) / (base[k] * base[k])
# Alternative weighting strategy, considers only the load factor: simpler, but with worse results
# self._weights[k] *= (used_resources[k] + 1) / (self._base_availability[k])
# The weights related to critical resources are multiplied by the allocation fail heuristic
if k in self._critical_resources:
self._weights[k] *= self._critical_modifier[k]
def _find_overlapping_jobs(self, e, cur_time):
"""
A method which searches for overlapping jobs in the schedule with the one to be allocated now.
The schedule is to be set by the user via the set_attr method before allocation. The metric used to
determine overlap is the expected duration of the job. All the data computed from the resources needed
by the overlapping jobs is stored in the schedulecounters and jobstoschedule, identical in function to
rescounters and jobstoallocate.
:param e: the event to be allocated now
:param cur_time: the current time, in which allocation will be performed
"""
# In any case, the max number of jobs we want to consider is windowsize
maxjobs = self._windowsize - self._jobstoallocate
if maxjobs == 0 or self._schedule is None or self._event_dictionary is None:
return
# The resource counters are reset
for k in self._types:
self._schedulecounters[k] = 0
self._jobstoschedule = 0
self._cumulative_schedule = 0
# The expected end time of the current job is used to estimate overlap with future jobs
end_time = cur_time + e.expected_duration
for jobid, jobtime in self._schedule.items():
if cur_time < jobtime < end_time:
# For overlapping jobs we update the resource counters like in regular allocation
req_res = self._event_dictionary[jobid].requested_resources
req_nodes = self._event_dictionary[jobid].requested_nodes
self._cumulative_schedule += self._event_dictionary[jobid].expected_duration
for k in self._types:
self._schedulecounters[k] += req_res[k] * req_nodes * self._event_dictionary[jobid].expected_duration
self._jobstoschedule += 1
# If the maximum number of jobs in the window has been reached, we break from the cycle
if self._jobstoschedule >= maxjobs:
break
def _update_modifiers(self, e, success):
"""
This method updates the modifiers used for critical resources (if active) upon a failed
or successful allocation.
:param e: the event subject to allocation
:param success: boolean value; True if the allocation succeeded, False otherwise
"""
noderes = e.requested_resources
for cres in self._critical_resources:
if noderes[cres] > 0:
if not success:
self._critical_modifier[cres] += self._modifierstep
if self._critical_modifier[cres] > self._modifierbounds[1]:
self._critical_modifier[cres] = self._modifierbounds[1]
else:
self._critical_modifier[cres] -= self._modifierstep
if self._critical_modifier[cres] < self._modifierbounds[0]:
self._critical_modifier[cres] = self._modifierbounds[0]
def _reset_schedule(self):
"""
A method which resets all internal variables related to scheduling, after allocation of all jobs is performed.
The use of this method implies that the schedule must be set every time after calling the search_allocation
method.
"""
self._jobstoschedule = 0
self._schedule = None
self._event_dictionary = None
for k in self._types:
self._schedulecounters[k] = 0
def _get_sorted_nodes(self, e):
"""
Given an event e to be allocated, the method returns the sorted list of nodes which best fit the job.
:param e: The event to be allocated
:return: The sorted list of nodes that best fit the job
"""
assert self.avl_resources is not None, 'The dictionary of available resources must be non-empty.'
nodelist = []
s_nodes = self._find_sat_nodes(e.requested_resources)
# For each node in the system, the job "fit", which is the number of job units fitting the node, is computed
for node in s_nodes: # self.avl_resources.keys():
fits = self._event_fits_node(self.avl_resources[node], e.requested_resources)
# If the node has not enough resources to fit the job, it is simply discarded
if fits == 0:
continue
elif fits > e.requested_nodes:
fits = e.requested_nodes
# The nodes are ranked by the amount of weighted resources left after allocating the job
rank = sum((self.avl_resources.get(node).get(k) - e.requested_resources[k] * fits) * self._weights[k] for k in self._types)
# Alternative ranking, similar to a weighted consolidate; usually performs worse than the above
# rank = sum((self.avl_resources.get(node).get(k)) * self._weights[k] for k in self._types)
# We use a temporary list to store the node ID and its ranking
nodelist.append((node, rank))
# Lastly, sorting is performed. Note that sorting is performed only on nodes that actually fit the job, thus
# resulting in smaller instances and lower times compared to, for example, the consolidate allocator
nodelist.sort(key=lambda x: x[1])
# The list of sorted node IDs is returned
return [x[0] for x in nodelist]
def _trim_nodes(self, nodes):
"""
Method which removes from a list of node IDs those elements that correspond to nodes that are full, i.e. they
have no available Memory or CPU resources and are thus useless for allocation.
:param nodes: A list of node IDs
:return: The trimmed list of nodes
"""
trimNodes = [n for n in nodes if all(self.avl_resources[n][r] > 0 for r in self.nec_res_types)]
return trimNodes
def _set_aux_resources(self):
"""
@todo: Check how to improve
"""
# Generate an aux structure to speedup the allocation process
resource_types = self.resource_manager.resource_types
self.aux_resources = {}
for res_type in resource_types:
if not (res_type in self.aux_resources):
self.aux_resources[res_type] = {}
for node in self.sorted_keys:
n_res = self.avl_resources[node][res_type]
if n_res == 0: # This works similar to trim_nodes
continue
if not (n_res in self.aux_resources[res_type]):
self.aux_resources[res_type][n_res] = SortedSet()
self.aux_resources[res_type][n_res].add(node)
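# Example of the resulting structure (hypothetical nodes and quantities):
# aux_resources = {'gpu': {2: SortedSet(['node3']), 4: SortedSet(['node1', 'node2'])}, ...}
# i.e. for each resource type, nodes are grouped by their current availability.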
def _find_sat_nodes(self, req_resources):
sat_nodes = {}
# fitting_nodes = {}
for t_res, n_res in req_resources.items():
if n_res == 0:
continue
if not(t_res in sat_nodes):
sat_nodes[t_res] = SortedSet(key=self._natural_keys)
for n, nodes in self.aux_resources[t_res].items():
if n >= n_res:
sat_nodes[t_res].update(nodes)
#===========================================================
# for node in nodes:
# if not (node in fitting_nodes):
# fitting_nodes[node] = {}
# fitting_nodes[node][t_res] = n // n_res
#===========================================================
# nodes = list(reduce(set.intersection, (set(val) for val in sat_nodes.values())))
# tot_fitting_reqs = sum([min(fitting_nodes[n].values()) for n in nodes])
nodes = reduce(SortedSet.intersection, sat_nodes.values())
return nodes # , tot_fitting_reqs
def _atoi(self, text):
return int(text) if text.isdigit() else text
def _natural_keys(self, text):
'''
alist.sort(key=natural_keys) sorts in human order
http://nedbatchelder.com/blog/200712/human_sorting.html
(See Toothy's implementation in the comments)
'''
return [self._atoi(c) for c in split(r'(\d+)', text)]
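The `_natural_keys` helper above can be exercised on its own. A minimal sketch (node IDs are hypothetical) showing why natural sorting matters for node names:

```python
# Standalone sketch of the natural-sort helpers, so that node IDs such as
# 'node10' sort after 'node2' (plain string sorting would invert them).
from re import split

def atoi(text):
    # Convert numeric chunks to integers so they compare numerically
    return int(text) if text.isdigit() else text

def natural_keys(text):
    # Split the ID into alternating text/number chunks, e.g. 'node10' -> ['node', 10, '']
    return [atoi(c) for c in split(r'(\d+)', text)]

nodes = ['node10', 'node2', 'node1']
print(sorted(nodes))                    # ['node1', 'node10', 'node2'] (lexicographic)
print(sorted(nodes, key=natural_keys))  # ['node1', 'node2', 'node10'] (natural)
```

This is why the `SortedSet` of satisfying nodes is built with `key=self._natural_keys` rather than the default string ordering.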
Hybrid¶
This allocator, introduced in [NettiGKSB18], is a combination of the Weighted and Balanced allocators.
Intuitively, it will separate and interleave the nodes according to the critical resources they possess like in the Balanced allocator; however, each of those lists is sorted individually like in the Weighted allocator.
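The interleaving idea can be sketched in isolation before the full listing. All node IDs and resource types below are hypothetical: nodes without critical resources form a buffer at the front, and the per-resource lists are then consumed one element at a time, always drawing from the longest remaining list.

```python
# Minimal sketch of the interleaving step (hypothetical data).
res_lists = {
    'None': ['n1', 'n2'],      # nodes with no critical resources: used first
    'gpu':  ['g1', 'g2'],      # nodes classified under 'gpu'
    'mic':  ['m1', 'm2'],      # nodes classified under 'mic'
}
final_list = list(res_lists['None'])
start = {'gpu': 0, 'mic': 0}   # simulated pop: per-list read index
remaining = sum(len(l) for k, l in res_lists.items() if k != 'None')
for _ in range(remaining):
    # Pick from the longest remaining critical-resource list
    key = max(start, key=lambda k: len(res_lists[k]) - start[k])
    final_list.append(res_lists[key][start[key]])
    start[key] += 1
print(final_list)  # ['n1', 'n2', 'g1', 'm1', 'g2', 'm2']
```

Using per-list start indexes instead of popping from the heads avoids the O(n) cost of `list.pop(0)`, as noted in the comments of `_get_sorted_nodes` below.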
# from extra.allocators.allocator_weighted import allocator_weighted
from allocator_weighted import allocator_weighted
from accasim.base.allocator_class import FirstFit
class allocator_hybrid(allocator_weighted):
"""
This allocator is a combination of the Weighted and Balanced allocators.
Intuitively, it will separate and interleave the nodes according to the critical resources they possess like in
the Balanced allocator; however, each of those lists is sorted individually like in the Weighted allocator.
"""
name = 'Hybrid'
def __init__(self, seed, resource_manager, **kwargs):
"""
Constructor for the class.
:param seed: seed for random events (not used)
:param resource_manager: reference to the system resource manager
:param kwargs: critical_res = defines the set of resource types to be preserved (default mic,gpu);
window_size = defines the window size for job resource analysis(default 100);
"""
FirstFit.__init__(self, seed)
win_key = 'window_size'
res_key = 'critical_res'
# If the user doesn't supply the set of resources to balance, mic and gpu are used by default
if res_key in kwargs.keys():
self._critical_resources = kwargs[res_key]
assert all(res in resource_manager.resource_types for res in self._critical_resources), 'Selected resource types for interleaving are not correct'
else:
self._critical_resources = [r for r in ('mic', 'gpu') if r in resource_manager.resource_types]
# The default window size to be used for job analysis is 100
self._windowsize = (kwargs[win_key] if win_key in kwargs.keys() and kwargs[win_key] >= 0 else 100)
# Dummy values for the parameters of the failed allocation heuristic, not used here
self._modifierbounds = [1, 1]
self._numsteps = 1
self._modifierstep = 0
self._critical_modifier = {}
for k in self._critical_resources:
self._critical_modifier[k] = self._modifierbounds[0]
# The resource types in the system; stored for efficiency
self._types = resource_manager.resource_types
# The scheduling plan computed by the scheduler, if present
self._schedule = None
# The event dictionary used to retrieve job information from the schedule
self._event_dictionary = None
# The number of jobs currently considered for analysis (max is window_size)
self._jobstoallocate = 0
# The counters for resources required by jobs in the analysis window
self._rescounters = {}
for k in self._types:
self._rescounters[k] = 1
# Same as rescounters, but used for the overlapping events in the schedule, and not in the queue
self._jobstoschedule = 0
self._schedulecounters = {}
for k in self._types:
self._schedulecounters[k] = 1
# The weights associated to the resource types
self._weights = {}
for k in self._types:
self._weights[k] = 0
# The ID to associate to nodes/jobs that possess no critical resources
self._noneID = 'None'
def _update_weights(self):
"""
Computes the weights associated with each resource type, based on the current resource counters.
The weights consider the average number of resources required by jobs in the window, normalized by the
base availability of each resource. Then the weights are multiplied by the current load factor for each
resource, to preserve resources that are becoming scarce in the system. Finally, critical resources that are
to be preserved at all times in the system (supplied by the user) see their weights multiplied by a constant
modifier.
"""
# The amount of current used resources in the system. Used to compute the load rate
used_resources = {}
base = self.resource_manager.system_capacity('total')
for t in self._types:
avl = self.resource_manager.current_availability
qt = base[t] - sum([avl[node][t] for node in avl.keys()])
used_resources[t] = qt
# used_resources = self.resource_manager.get_used_resources()
for k in self._types:
self._weights[k] = (self._rescounters[k] + self._schedulecounters[k]) / (
self._jobstoallocate + self._jobstoschedule + 1)
# Might be useful to smooth out and compress average values
# self._weights[k] = sqrt(self._weights[k])
self._weights[k] *= (used_resources[k] + 1) / (base[k] * base[k])
# Alternative weighting strategy, considers only the load factor: simpler, but with worse results
# self._weights[k] *= (used_resources[k] + 1) / (self._base_availability[k])
def _get_sorted_nodes(self, e):
"""
Given an event e to be allocated, the method returns the sorted list of nodes which best fit the job,
adjusted like in the balanced allocator.
:param e: The event to be allocated
:return: The sorted list of nodes that best fit the job
"""
assert self.avl_resources is not None, 'The dictionary of available resources must be non-empty.'
# res_lists is a dictionary containing, for each resource type, the list of nodes that have them. The lists
# do not overlap, and each node falls in the list whose resource it has in the greatest quantity.
# If a node has none of the critical resource, it will fall in a special 'none' list
res_lists = {self._noneID: []}
for k in self._critical_resources:
res_lists[k] = []
# All the nodes in the avl_resources dictionary are classified, according to the critical resources
# they possess
s_nodes = self._find_sat_nodes(e.requested_resources)
for node in s_nodes: # self.avl_resources.items():
res_lists[self._critical_list_select(self.avl_resources[node])].append(node)
res_lists[self._noneID] = self._get_sorted_node_sublist(e, res_lists[self._noneID])
for key in self._critical_resources:
res_lists[key] = self._get_sorted_node_sublist(e, res_lists[key])
# The lists are then combined, by placing in front the 'none' list, which is a buffer for the critical res
final_list = res_lists[self._noneID]
remaining_nodes = sum([len(l) for l in res_lists.values()]) - len(final_list)
# The algorithm would 'pop' elements from the lists' heads in succession. To avoid this, as it is expensive,
# we use a dictionary of starting indexes for each list
start_indexes = dict.fromkeys(self._critical_resources, 0)
# After the 'none' list, the remaining lists are interleaved, and at each step an element is picked from
# the longest list in res_lists, in order to balance their usage.
for i in range(remaining_nodes):
rr_res = self._get_longest_list(res_lists, start_indexes)
if rr_res != self._noneID:
final_list.append(res_lists[rr_res][start_indexes[rr_res]])
start_indexes[rr_res] += 1
return final_list
def _get_sorted_node_sublist(self, e, nodes):
"""
Given a sublist of nodes and an event e, the method returns the sorted version of the sublist, like in the
weighted allocator. It is used to sort the single node lists, relative to the various resource types.
:param e: the event to be allocated
:param nodes: the nodes' sublist to be sorted
:return: the sorted nodes' sublist
"""
nodelist = []
# For each node in the system, the job "fit", which is the number of job units fitting the node, is computed
for node in nodes:
fits = self._event_fits_node(self.avl_resources[node], e.requested_resources)
# If the node does not have enough resources to fit the job, it is simply discarded
if fits == 0:
continue
elif fits > e.requested_nodes:
fits = e.requested_nodes
# The nodes are ranked by the amount of weighted resources left after allocating the job
rank = sum(
(self.avl_resources.get(node).get(k) - e.requested_resources[k] * fits) * self._weights[k] for k in
self._types)
# Alternative ranking, similar to a weighted consolidate; usually performs worse than the above
# rank = sum((self.avl_resources.get(node).get(k)) * self._weights[k] for k in self._types)
# We use a temporary list to store the node ID and its ranking
nodelist.append((node, rank))
# Lastly, sorting is performed. Note that sorting is performed only on nodes that actually fit the job, thus
# resulting in smaller instances and lower times compared to, for example, the consolidate allocator
nodelist.sort(key=lambda x: x[1])
# The list of sorted node IDs is returned
return [x[0] for x in nodelist]
def _critical_list_select(self, noderes):
"""
A simple method which, given the resources of a node, returns the key of the critical resource for which
such node has greater availability. If there are no critical resources available, a 'none' key is returned.
:param noderes: the resources dictionary of a node
:return: the key of the critical resource to which assign the node
"""
maxval = 0
maxkey = self._noneID
for cres in self._critical_resources:
if noderes[cres] > maxval:
maxval = noderes[cres]
maxkey = cres
return maxkey
def _get_longest_list(self, res_lists, start_indexes):
"""
Given the dictionary of critical resources lists, the method returns the key of the next list from which
an element has to be picked in order to build the final nodes list.
:param res_lists: the critical resources' lists dictionary
:param start_indexes: the dictionary of starting indexes for each list (to simulate a pop operation)
:return: the key of the next list to be picked
"""
maxkey = self._noneID
maxval = 0
for k in self._critical_resources:
modifier = len(res_lists[k]) - start_indexes[k]
if modifier > maxval:
maxval = modifier
maxkey = k
return maxkey
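The weight update performed in `_update_weights` can be illustrated with a small numeric sketch. All capacities and counters below are hypothetical; the point is that a scarce, heavily loaded resource type ends up with a much larger weight:

```python
# Worked sketch of the weight formula (hypothetical numbers):
# w_k = (average request per job in the window) * (load factor) / (base capacity)
base = {'core': 1000, 'gpu': 64}             # total system capacity per type
used = {'core': 600, 'gpu': 60}              # currently allocated resources
rescounters = {'core': 400, 'gpu': 30}       # requests of queued jobs in the window
schedulecounters = {'core': 100, 'gpu': 10}  # requests of overlapping scheduled jobs
jobs_to_allocate, jobs_to_schedule = 80, 19

weights = {}
for k in base:
    avg = (rescounters[k] + schedulecounters[k]) / (jobs_to_allocate + jobs_to_schedule + 1)
    weights[k] = avg * (used[k] + 1) / (base[k] * base[k])

# The GPU resource is both scarce and loaded, so its weight dominates,
# steering allocation away from nodes whose GPUs would be left stranded.
print(weights['gpu'] > weights['core'])  # True
```

With these numbers the core weight is roughly 0.003 while the GPU weight is roughly 0.006, so in the node ranking a leftover GPU counts about twice as much as a leftover core.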
Citations
[NettiGKSB18] Alessio Netti, Cristian Galleguillos, Zeynep Kiziltan, Alina Sirbu, Ozalp Babaoglu. Heterogeneity-Aware Resource Allocation in HPC Systems. In Proc. of ISC 2018.