Source code for pacman.operations.placer_algorithms.application_placer

# Copyright (c) 2021 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
from typing import Dict, Iterator, List, Optional, Tuple, Sequence, Set

from spinn_utilities.config_holder import get_config_bool, get_report_path
from spinn_utilities.log import FormatAdapter
from spinn_utilities.progress_bar import ProgressBar

from spinn_machine import Chip

from pacman.data import PacmanDataView
from pacman.model.placements import Placements, Placement
from pacman.model.graphs import AbstractVirtual
from pacman.model.graphs.machine import MachineVertex
from pacman.model.graphs.application import ApplicationVertex
from pacman.model.resources import AbstractSDRAM, ConstantSDRAM
from pacman.exceptions import (
    PacmanPlaceException, PacmanConfigurationException, PacmanTooBigToPlace)
from .draw_placements import draw_placements as dp

logger = FormatAdapter(logging.getLogger(__name__))


[docs] def place_application_graph(system_placements: Placements) -> Placements: """ Perform placement of an application graph on the machine. .. note:: app_graph must have been partitioned :param system_placements: The placements of cores doing system tasks. This is what we start from. :return: Placements for the application. *Includes the system placements.* """ placer = ApplicationPlacer(system_placements) return placer.do_placements(system_placements)
class ApplicationPlacer(object): """ Places the Vertices keeping ones for an ApplicationVertex together. """ __slots__ = ( # Values from PacmanDataView cached for speed # PacmanDataView.get_machine() "__machine", # PacmanDataView.get_plan_n_timestep() "__plan_n_timesteps", # Sdram available on perfect none Ethernet Chip after Monitors placed "__max_sdram", # Maximum sdram that should be used for a Chip to not be full "__cap_sdram", # N Cores free on perfect none Ethernet Chip after Monitors placed "__max_cores", # Pointer to the placements including all previous Application Vertices "__placements", # A Function to yield the Chips in a consistent order "__chips", # Chips that have been fully placed by previous Application Vertices "__full_chips", # Chips that have already been used by this ApplicationVertex "__prepared_chips", # Start Chips from previous ApplicationVertices not yet marked as full "__restored_chips", # Start Chips tried for this ApplicationVertex "__starts_tried", # Label of the current ApplicationVertex for (error) reporting "__app_vertex_label", # Data for the last Chip offered to place on # May be full after current group placed "__current_chip", # List of cores available. Included ones for current group until used "__current_cores_free", # Used sdram after the current group is placed "__current_sdram_used", # Data about the neighbouring Chips to ones used # Current board being placed on "__ethernet_x", "__ethernet_y", # List of available neighbours on the current board "__same_board_chips", # List of available neighbours not on the current board "__other_board_chips") def __init__(self, placements: Placements): """ :param placements: """ # Data cached for speed self.__machine = PacmanDataView.get_machine() self.__plan_n_timesteps = PacmanDataView.get_plan_n_timestep() version = PacmanDataView.get_machine_version() self.__max_sdram = ( version.max_sdram_per_chip - PacmanDataView.get_all_monitor_sdram().get_total_sdram( PacmanDataView.get_plan_n_timestep())) self.__max_cores = ( version.max_cores_per_chip - version.n_scamp_cores - PacmanDataView.get_all_monitor_cores()) self.__cap_sdram = self.__max_sdram - ( self.__max_sdram // self.__max_cores) self.__placements = placements self.__chips = self._chip_order() self.__full_chips: Set[Chip] = set() self.__prepared_chips: Set[Chip] = set() self.__restored_chips: List[Chip] = list() self.__starts_tried: List[Chip] = list() self.__current_chip: Optional[Chip] = None self.__current_cores_free: List[int] = list() self.__current_sdram_used: AbstractSDRAM = ConstantSDRAM(0) self.__app_vertex_label: Optional[str] = None # Set some value so no Optional needed self.__ethernet_x = -1 self.__ethernet_y = -1 self.__same_board_chips: Dict[Chip, Chip] = dict() self.__other_board_chips: Dict[Chip, Chip] = dict() def do_placements(self, system_placements: Placements) -> Placements: """ Perform placement of an application graph on the machine. .. note:: app_graph must have been partitioned :param system_placements: The placements of cores doing system tasks. :return: Placements for the application. *Includes the system placements.* :raises PacmanPlaceException: If no new start Chip is available :raises PacmanTooBigToPlace: If the requirements are too big for any chip """ # Go through the application graph by application vertex progress = ProgressBar( PacmanDataView.get_n_vertices() * 2, "Placing Vertices") try: for app_vertex in progress.over( PacmanDataView.iterate_vertices(), finish_at_end=False): if app_vertex.has_fixed_location(): self._place_fixed_vertex(app_vertex) for app_vertex in progress.over(PacmanDataView.iterate_vertices()): # as this checks if placed already not need to check if fixed self._place_vertex(app_vertex) except PacmanPlaceException as e: raise self._place_error(system_placements, e) from e if get_config_bool("Reports", "draw_placements"): report_file = get_report_path("path_placements") dp(self.__placements, system_placements, report_file) return self.__placements def _place_vertex(self, app_vertex: ApplicationVertex) -> None: """ Place the next application vertex :param app_vertex: :raises PacmanPlaceException: If no new start Chip is available :raises PacmanTooBigToPlace: If the requirements are too big for any chip """ same_chip_groups = app_vertex.splitter.get_same_chip_groups() if not same_chip_groups: # This vertex does not require placement or delegates return self.__app_vertex_label = app_vertex.label # Restore the starts tried last time. # Check if they are full comes later while len(self.__starts_tried) > 0: self.__restored_chips.append(self.__starts_tried.pop(0)) # try to make placements with a different start Chip each time while True: placements_to_make = self._prepare_placements(same_chip_groups) if placements_to_make is not None: break # Now actually add the placements having confirmed all can be done self.__placements.add_placements(placements_to_make) def _prepare_placements(self, same_chip_groups: Sequence[ Tuple[Sequence[MachineVertex], AbstractSDRAM]] ) -> Optional[List[Placement]]: """ Try to make the placements for this ApplicationVertex. This will make sure all placements are on linked Chips The next start Chip is tried. If successful a list of created but NOT added placements is returned If this start Chip fails it returns None. A start chip could fail either because it does not have enough space or because its neighbours do not have enough space :param same_chip_groups: :raises PacmanPlaceException: If no new start Chip is available :raises PacmanTooBigToPlace: If the requirements are too big for any chip """ # Clear the Chips used in the last prepare self.__prepared_chips.clear() self.__current_chip = None placements_to_make: List = list() # Go through the groups for vertices, sdram in same_chip_groups: vertices_to_place = self._filter_vertices(vertices) if len(vertices_to_place) == 0: # Either placed (fixed) or virtual so skip group continue n_cores = len(vertices_to_place) # Try to find a chip with space chip = self._get_next_chip_with_space(n_cores, sdram) if chip is None: return None # If this worked, store placements to be made for vertex in vertices: core = self.__current_cores_free.pop(0) placements_to_make.append(Placement( vertex, chip.x, chip.y, core)) return placements_to_make def _filter_vertices( self, vertices: Sequence[MachineVertex]) -> List[MachineVertex]: """ Removes an already placed or virtual vertices. Errors on groups that have both placed and unplaced vertices! :param vertices: """ # Remove any already placed vertices_to_place = [ vertex for vertex in vertices if not self.__placements.is_vertex_placed(vertex)] if len(vertices_to_place) != len(vertices) and len( vertices_to_place) > 0: # Putting the rest on a different chip is wrong # Putting them on the same chip is hard so will do if needed raise NotImplementedError( "Unexpected mix of placed and unplaced vertices") # No need to place virtual vertices return [vertex for vertex in vertices_to_place if not isinstance(vertex, AbstractVirtual)] def _place_error(self, system_placements: Placements, exception: Exception) -> PacmanPlaceException: unplaceable = list() vertex_count = 0 n_vertices = 0 for app_vertex in PacmanDataView.iterate_vertices(): same_chip_groups = app_vertex.splitter.get_same_chip_groups() app_vertex_placed = True found_placed_cores = False for vertices, _sdram in same_chip_groups: if isinstance(vertices[0], AbstractVirtual): break if self.__placements.is_vertex_placed(vertices[0]): found_placed_cores = True elif found_placed_cores: vertex_count += len(vertices) n_vertices = len(same_chip_groups) app_vertex_placed = False break else: app_vertex_placed = False break if not app_vertex_placed: unplaceable.append(app_vertex) report_file = get_report_path("path_placement_errors_report") with open(report_file, 'w', encoding="utf-8") as f: f.write(f"Could not place {len(unplaceable)} of " f"{PacmanDataView.get_n_vertices()} " f"application vertices.\n") f.write( f" Could not place {vertex_count} of {n_vertices} in the" " last app vertex\n\n") for xy in self.__placements.chips_with_placements: first = True for placement in self.__placements.placements_on_chip(xy): if system_placements.is_vertex_placed(placement.vertex): continue if first: f.write(f"Chip ({xy}):\n") first = False f.write(f" Processor {placement.p}:" f" Vertex {placement.vertex}\n") if not first: f.write("\n") f.write("\n") f.write("Not placed:\n") for app_vertex in unplaceable: f.write(f"Vertex: {app_vertex}\n") same_chip_groups = app_vertex.splitter.get_same_chip_groups() for vertices, sdram in same_chip_groups: p_sdram = sdram.get_total_sdram(self.__plan_n_timesteps) f.write(f" Group of {len(vertices)} vertices uses " f"{p_sdram} bytes of SDRAM:\n") for vertex in vertices: f.write(f" Vertex {vertex}") if self.__placements.is_vertex_placed(vertex): plce = self.__placements.get_placement_of_vertex( vertex) f.write( f" (placed at {plce.x}, {plce.y}, {plce.p})") f.write("\n") f.write("\n") f.write("Unused chips:\n") machine = PacmanDataView.get_machine() for xy in machine.chip_coordinates: n_placed = self.__placements.n_placements_on_chip(xy) system_placed = system_placements.n_placements_on_chip(xy) if n_placed - system_placed == 0: n_procs = machine[xy].n_placable_processors f.write(f" {xy} ({n_procs - system_placed}" " free cores)\n") if get_config_bool("Reports", "draw_placements_on_error"): report_file = get_report_path("path_placements_on_error") dp(self.__placements, system_placements, report_file) return PacmanPlaceException( f" {exception}." f" Report written to {report_file}.") def _place_fixed_vertex(self, app_vertex: ApplicationVertex) -> None: """ Place all vertices for this Application Vertex Checks that all MachineVertices are fixed or errors """ same_chip_groups = app_vertex.splitter.get_same_chip_groups() if not same_chip_groups: raise NotImplementedError("Unexpected mix of Fixed and no groups") for vertices, _ in same_chip_groups: vertices_to_place = self._filter_vertices(vertices) self._do_fixed_location(vertices_to_place) def _do_fixed_location(self, vertices: list[MachineVertex]) -> None: """ Do fixed placing for one group. Errors if the group does not have a fixed location :raise PacmanConfigurationException: If the requested location is not available """ for vertex in vertices: loc = vertex.get_fixed_location() if loc: x, y = loc.x, loc.y break else: # Mixing fixed and free allocations while still keeping the whole # App vertex together is hard so will only do is needed raise NotImplementedError( "Mixing fixed location and not fixed location groups " "within one vertex") chip = PacmanDataView.get_chip_at(x, y) if chip is None: raise PacmanConfigurationException( f"Constrained to chip {x, y} but no such chip") on_chip = self.__placements.placements_on_chip(chip) cores_used = {p.p for p in on_chip} cores = set(chip.placable_processors_ids) - cores_used next_cores = iter(cores) # first do the ones with a fixed p for vertex in vertices: fixed = vertex.get_fixed_location() if fixed and fixed.p is not None: if fixed.p not in next_cores: raise PacmanConfigurationException( f"Core {fixed.p} on {x}, {y} not available to " f"place {vertex} on") self.__placements.add_placement( Placement(vertex, x, y, fixed.p)) # Then do the ones without a fixed p for vertex in vertices: fixed = vertex.get_fixed_location() if not fixed or fixed.p is None: try: self.__placements.add_placement( Placement(vertex, x, y, next(next_cores))) except StopIteration: # pylint: disable=raise-missing-from raise PacmanConfigurationException( f"No more cores available on {x}, {y}: {on_chip}") def _chip_order(self) -> Iterator[Chip]: """ Iterate the Chips in a guaranteed order """ for x in range(self.__machine.width): for y in range(self.__machine.height): chip = self.__machine.get_chip_at(x, y) if chip: yield chip def _space_on_chip( self, chip: Chip, n_cores: int, sdram: AbstractSDRAM) -> bool: """ Checks if the Chip has enough space for this group, Cache if yes If the Chip has already full from other Application Vertices, the Chip is added to the full list and False is returned If the chip is not full but does not have the space, the Chip is added to the prepared_chips list and False is returned. As safety check is also done to make sure the group could fit on another Chip If there there is room on the Chip the Chip is cached and True is returned. The values Cached are the: current_chip Even if full to keep the code simpler current_cores_free Including the ones for this group current_sdram_free Excluding the sdram needed fit this group :param chip: :param n_cores: number of cores needed :param sdram: :raises PacmanTooBigToPlace: If the requirements are too big for any chip """ cores_free = list(chip.placable_processors_ids) sdram_used: AbstractSDRAM = ConstantSDRAM(0) # remove the already placed for other Application Vertices on_chip = self.__placements.placements_on_chip(chip) if len(on_chip) == len(cores_free): self.__full_chips.add(chip) return False for placement in on_chip: cores_free.remove(placement.p) sdram_used += placement.vertex.sdram_required if sdram_used.get_total_sdram( self.__plan_n_timesteps) > self.__cap_sdram: self.__full_chips.add(chip) return False # Remember this chip so it is not tried again in this preparation # This assumes all groups are the same size so even if too small self.__prepared_chips.add(chip) total_sdram = sdram_used + sdram plan_sdram = total_sdram.get_total_sdram(self.__plan_n_timesteps) if len(cores_free) < n_cores or plan_sdram > chip.sdram: self._check_could_fit(n_cores, sdram) return False # record the current Chip self.__current_chip = chip # cores are popped out later to keep them here for now self.__current_cores_free = cores_free # sdram is the whole group so can be removed now self.__current_sdram_used = total_sdram # adds the neighbours self._add_neighbours(chip) return True def _check_could_fit(self, n_cores: int, sdram: AbstractSDRAM) -> None: """ Checks that the cores/SDRAM would fit on a empty perfect Chip :param n_cores: number of cores needs :param sdram: minimum amount of SDRAM needed :raises PacmanTooBigToPlace: If the requirements are too big for any chip """ plan_sdram = sdram.get_total_sdram(self.__plan_n_timesteps) if plan_sdram <= self.__max_sdram and n_cores <= self.__max_cores: # should fit somewhere return message = \ f"{self.__app_vertex_label} will not fit on any possible Chip " if n_cores > 1: message += f" group has {n_cores=} " version = PacmanDataView.get_machine_version() if plan_sdram > self.__max_sdram: message += f"requires {plan_sdram} bytes but " if plan_sdram > version.max_sdram_per_chip: message += f"a Chip only has {version.max_sdram_per_chip} " \ f"bytes " else: message += f"after monitors only {self.__max_sdram} " \ f"bytes are available " message += "Lowering max_core_per_chip may resolve this." raise PacmanTooBigToPlace(message) if n_cores > version.max_cores_per_chip: message += " is more vertices than the number of cores on a chip." raise PacmanTooBigToPlace(message) user_cores = version.max_cores_per_chip - version.n_scamp_cores if n_cores > user_cores: message += ( f"is more vertices than the user cores ({user_cores}) " "available on a Chip") else: message += ( f"is more vertices than the {self.__max_cores} cores " f"available on a Chip once " f"{PacmanDataView.get_all_monitor_cores()} " f"are reserved for monitors") raise PacmanTooBigToPlace(message) def _get_next_start(self, n_cores: int, sdram: AbstractSDRAM) -> Chip: """ Gets the next start Chip Also sets up the current_chip and starts a new neighbourhood :param n_cores: number of cores needs :param sdram: minimum amount of SDRAM needed :raises PacmanPlaceException: If no new start Chip is available :raises PacmanTooBigToPlace: If the requirements are too big for any chip """ # reset the neighbour chip info self.__same_board_chips.clear() self.__other_board_chips.clear() # Find the next start chip while True: start = self._pop_start_chip() # Save as tried as not full even if toO small self.__starts_tried.append(start) # Set the Ethernet x and y in case space_on_chip adds neighbours self.__ethernet_x = start.nearest_ethernet_x self.__ethernet_y = start.nearest_ethernet_y if self._space_on_chip(start, n_cores, sdram): break logger.debug("Starting placement from {}", start) return start def _pop_start_chip(self) -> Chip: """ Gets the next start Chip from either restored or if none the Machine Ignores any Chip that are already full :raises PacmanPlaceException: If no new start Chip is available """ while self.__restored_chips: chip = self.__restored_chips.pop(0) if chip not in self.__full_chips: return chip try: start = next(self.__chips) while start in self.__full_chips: start = next(self.__chips) return start except StopIteration: raise PacmanPlaceException( # pylint: disable=raise-missing-from f"No more chips to start with for {self.__app_vertex_label} " f"Out of {self.__machine.n_chips} " f"{len(self.__full_chips)} already full " f"and {len(self.__starts_tried)} tried" f"{PacmanDataView.get_chips_boards_required_str()}") def _get_next_neighbour( self, n_cores: int, sdram: AbstractSDRAM) -> Optional[Chip]: """ Gets the next neighbour Chip Also changes the current_chip and updates the neighbourhood This will return None if there are no more neighbouring Chip big enough :param n_cores: number of cores needs :param sdram: minimum amount of SDRAM needed :raises PacmanTooBigToPlace: If the requirements are too big for any chip """ # Do while Chip with space not found while True: chip = self._pop_neighbour() if chip is None: # Sign to consider preparation with this start a failure return None if self._space_on_chip(chip, n_cores, sdram): return chip def _get_next_chip_with_space( self, n_cores: int, sdram: AbstractSDRAM) -> Optional[Chip]: """ Gets the next Chip with space If no start Chip is available raise an Exception If no neighbouring more Chips available returns None :param n_cores: number of cores needs :param sdram: minimum amount of SDRAM needed :raises PacmanPlaceException: If no new start Chip is available :raises PacmanTooBigToPlace: If the requirements are too big for any chip """ if self.__current_chip is None: return self._get_next_start(n_cores, sdram) total_sdram = sdram + self.__current_sdram_used plan_sdram = total_sdram.get_total_sdram( self.__plan_n_timesteps) if (len(self.__current_cores_free) >= n_cores and plan_sdram <= self.__current_chip.sdram): # Cores are popped out later self.__current_sdram_used = total_sdram return self.__current_chip else: return self._get_next_neighbour(n_cores, sdram) def _add_neighbours(self, chip: Chip) -> None: """ Adds the neighbours for this Chip to be used as the next chips. """ for link in chip.router.links: target = self.__machine[link.destination_x, link.destination_y] if (target not in self.__full_chips and target not in self.__prepared_chips): if (target.nearest_ethernet_x == self.__ethernet_x and target.nearest_ethernet_y == self.__ethernet_y): self.__same_board_chips[target] = target else: self.__other_board_chips[target] = target def _pop_neighbour(self) -> Optional[Chip]: """ Pops the next neighbour Chip with preference to ones on current board :return: A neighbour Chip or None if there are no More """ if self.__same_board_chips: k = next(iter(self.__same_board_chips)) del self.__same_board_chips[k] return k if self.__other_board_chips: next_chip = next(iter(self.__other_board_chips)) del self.__other_board_chips[next_chip] self.__ethernet_x = next_chip.nearest_ethernet_x self.__ethernet_y = next_chip.nearest_ethernet_y to_check = list(self.__other_board_chips) self.__other_board_chips.clear() for chip in to_check: if (chip.nearest_ethernet_x == self.__ethernet_x and chip.nearest_ethernet_y == self.__ethernet_y): self.__same_board_chips[chip] = chip else: self.__other_board_chips[chip] = chip return next_chip # Signal that there are no more Chips with a None return None