Source code for pacman.operations.multi_cast_router_check_functionality.valid_routes_checker

# Copyright (c) 2014 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Collection of functions which together validate routes.
"""
from collections import defaultdict
import logging
from typing import Dict, NamedTuple, Iterable, List, Set
from spinn_utilities.ordered_set import OrderedSet
from spinn_utilities.progress_bar import ProgressBar
from spinn_utilities.log import FormatAdapter
from spinn_machine import Chip, MulticastRoutingEntry
from pacman.data import PacmanDataView
from pacman.exceptions import (
    PacmanConfigurationException, PacmanRoutingException)
from pacman.model.graphs.application import ApplicationVertex
from pacman.model.graphs import AbstractVirtual
from pacman.utilities.constants import FULL_MASK
from pacman.utilities.algorithm_utilities.routing_algorithm_utilities import (
    get_app_partitions)
from pacman.model.graphs.machine import MachineVertex
from pacman.model.placements import Placement
from pacman.model.routing_info import BaseKeyAndMask
from pacman.model.routing_tables import (
    AbstractMulticastRoutingTable, MulticastRoutingTables)

logger = FormatAdapter(logging.getLogger(__name__))
range_masks = {FULL_MASK - ((2 ** i) - 1) for i in range(33)}


class PlacementTuple(NamedTuple):
    """
    A particular location in placement.
    """
    #: The X coordinate of the chip
    x: int
    #: The Y coordinate of the chip
    y: int
    #: The ID of the CPU core on the chip
    p: int


class _Failure(NamedTuple):
    router_x: int
    router_y: int
    keys: List[int]
    source_mask: int


[docs] def validate_routes(routing_tables: MulticastRoutingTables) -> None: """ Go through the app partitions and check that the routing entries within the routing tables support reach the correction destinations as well as not producing any cycles. :param routing_tables: the routing tables generated by the routing algorithm :raises PacmanRoutingException: when either no routing table entry is found by the search on a given router, or a cycle is detected """ # Find all partitions that need to be dealt with partitions = get_app_partitions() routing_infos = PacmanDataView.get_routing_infos() # Now go through the app edges and route app vertex by app vertex progress = ProgressBar(len(partitions), "Checking Routes") for partition in progress.over(partitions): source = partition.pre_vertex # Destination cores by source machine vertices destinations: Dict[MachineVertex, OrderedSet[PlacementTuple]] = \ defaultdict(OrderedSet) for edge in partition.edges: target = edge.post_vertex target_vertices = \ target.splitter.get_source_specific_in_coming_vertices( source, partition.identifier) for tgt, srcs in target_vertices: place = PacmanDataView.get_placement_of_vertex(tgt) for src in srcs: if isinstance(src, ApplicationVertex): for s in src.splitter.get_out_going_vertices( partition.identifier): destinations[s].add(PlacementTuple( x=place.x, y=place.y, p=place.p)) else: destinations[src].add(PlacementTuple( x=place.x, y=place.y, p=place.p)) outgoing: OrderedSet[MachineVertex] = OrderedSet( source.splitter.get_out_going_vertices(partition.identifier)) internal = source.splitter.get_internal_multicast_partitions() for in_part in internal: if in_part.identifier == partition.identifier: outgoing.add(in_part.pre_vertex) for edge in in_part.edges: place = PacmanDataView.get_placement_of_vertex( edge.post_vertex) destinations[in_part.pre_vertex].add(PlacementTuple( x=place.x, y=place.y, p=place.p)) # locate all placements to which this placement/vertex will # communicate with for a given key_and_mask and search its # determined destinations for m_vertex in outgoing: if isinstance(m_vertex, AbstractVirtual): continue placement = PacmanDataView.get_placement_of_vertex(m_vertex) r_info = routing_infos.get_info_from( m_vertex, partition.identifier) # search for these destinations _search_route( placement, destinations[m_vertex], r_info.key_and_mask, routing_tables, m_vertex.vertex_slice.n_atoms)
def _search_route( source_placement: Placement, dest_placements: Iterable[PlacementTuple], key_and_mask: BaseKeyAndMask, routing_tables: MulticastRoutingTables, n_atoms: int) -> None: """ Locate if the routing tables work for the source to desks as defined. :param source_placement: the placement from which the search started :param dest_placements: the placements to which this trace should visit only once :param key_and_mask: the key and mask associated with this set of edges :param routing_tables: :param n_atoms: the number of atoms going through this path :raise PacmanRoutingException: when the trace completes and there are still destinations not visited """ if logger.isEnabledFor(logging.DEBUG): logger.debug(f"{source_placement=}") for dest in dest_placements: logger.debug("[{}:{}:{}]", dest.x, dest.y, dest.p) located_destinations: Set[PlacementTuple] = set() failed_to_cover_all_keys_routers: List[_Failure] = list() _start_trace_via_routing_tables( source_placement, key_and_mask, located_destinations, routing_tables, n_atoms, failed_to_cover_all_keys_routers) # start removing from located_destinations and check if destinations not # reached failed_to_reach_destinations = list() for dest in dest_placements: if dest in located_destinations: located_destinations.remove(dest) else: failed_to_reach_destinations.append(dest) # check for error if trace didn't reach a destination it was meant to error_message = "" if failed_to_reach_destinations: failures = ", ".join( f"[{dest.x}:{dest.y}:{dest.p}]" for dest in failed_to_reach_destinations) error_message += ( "failed to locate all destinations with vertex " f"{source_placement.vertex.label} on processor " f"[{source_placement.x}:{source_placement.y}:{source_placement.p}]" f" with keys {key_and_mask} as it did not reach destinations " f"{failures}") # check for error if the trace went to a destination it shouldn't have if located_destinations: failures = ", ".join( f"[{dest.x}:{dest.y}:{dest.p}]" for dest in located_destinations) error_message += ( "trace went to more failed to locate all destinations with " f"vertex {source_placement.vertex.label} on processor " f"[{source_placement.x}:{source_placement.y}:{source_placement.p}]" f" with keys {key_and_mask} as it didn't reach destinations " f"{failures}") if failed_to_cover_all_keys_routers: failures = ", ".join( f"[{router.router_x}, {router.router_y}, " f"{router.keys}, {router.source_mask}]" for router in failed_to_cover_all_keys_routers) error_message += ( "trace detected that there were atoms which the routing entries " "won't cover and therefore packets will fly off to unknown places." f" These keys came from the vertex {source_placement.vertex.label}" f" on processor [{source_placement.x}:{source_placement.y}:" f"{source_placement.p}] and the failed routers are {failures}") # raise error if required if error_message != "": raise PacmanRoutingException(error_message) logger.debug(f"successful test between {source_placement.vertex.label} " f"and {dest_placements}") def _start_trace_via_routing_tables( source_placement: Placement, key_and_mask: BaseKeyAndMask, reached_placements: Set[PlacementTuple], routing_tables: MulticastRoutingTables, n_atoms: int, failed_to_cover_all_keys_routers: List[_Failure]) -> None: """ Start the trace, by using the source placement's router and tracing from the route. :param source_placement: the source placement used by the trace :param key_and_mask: the key being used by the vertex which resides on the source placement :param reached_placements: the placements reached during the trace :param routing_tables: :param n_atoms: the number of atoms going through this path :param failed_to_cover_all_keys_routers: list of failed routers for all keys """ current_router_table = routing_tables.get_routing_table_for_chip( source_placement.x, source_placement.y) if current_router_table is None: return visited_routers: Set[Chip] = set() visited_routers.add(current_router_table.chip) # get src router entry = _locate_routing_entry( current_router_table, key_and_mask.key, n_atoms) _recursive_trace_to_destinations( entry, current_router_table, source_placement.x, source_placement.y, key_and_mask, visited_routers, reached_placements, routing_tables, n_atoms, failed_to_cover_all_keys_routers) def _check_all_keys_hit_entry( entry: MulticastRoutingEntry, n_atoms: int, base_key: int) -> List[int]: """ :param entry: routing entry discovered :param n_atoms: the number of atoms this partition covers :param base_key: the base key of the partition :return: the list of keys which this entry doesn't cover which it should """ bad_entries = list() for atom_id in range(0, n_atoms): key = base_key + atom_id if entry.mask & key != entry.key: bad_entries.append(key) return bad_entries # locates the next dest position to check def _recursive_trace_to_destinations( entry: MulticastRoutingEntry, current_router: AbstractMulticastRoutingTable, chip_x: int, chip_y: int, key_and_mask: BaseKeyAndMask, visited_routers: Set[Chip], reached_placements: Set[PlacementTuple], routing_tables: MulticastRoutingTables, n_atoms: int, failed_to_cover_all_keys_routers: List[_Failure]) -> None: """ Recursively search though routing tables until no more entries are registered with this key. :param entry: the original entry used by the first router which resides on the source placement chip. :param current_router: the router currently being visited during the trace :param chip_x: the x coordinate of the chip being considered :param chip_y: the y coordinate of the chip being considered :param key_and_mask: the key and mask being used by the vertex which resides on the source placement :param visited_routers: the list of routers which have been visited during this trace so far :param reached_placements: the placements reached during the trace :param routing_tables: :param n_atoms: the number of atoms going through this path :param failed_to_cover_all_keys_routers: list of failed routers for all keys """ # determine where the route takes us chip_links = entry.link_ids processor_values = entry.processor_ids # if goes down a chip link if chip_links: # also goes to a processor if processor_values: _is_dest(processor_values, current_router, reached_placements) # only goes to new chip for link_id in chip_links: # locate next chips router machine_router = PacmanDataView.get_chip_at(chip_x, chip_y).router link = machine_router.get_link(link_id) if link is None: continue next_router = routing_tables.get_routing_table_for_chip( link.destination_x, link.destination_y) if next_router is None: continue # check that we've not visited this router before _check_visited_routers(next_router.chip, visited_routers) # locate next entry entry = _locate_routing_entry( next_router, key_and_mask.key, n_atoms) bad_entries = _check_all_keys_hit_entry( entry, n_atoms, key_and_mask.key) if bad_entries: failed_to_cover_all_keys_routers.append( _Failure(next_router.x, next_router.y, bad_entries, key_and_mask.mask)) # get next route value from the new router _recursive_trace_to_destinations( entry, next_router, link.destination_x, link.destination_y, key_and_mask, visited_routers, reached_placements, routing_tables, n_atoms, failed_to_cover_all_keys_routers) # only goes to a processor elif processor_values: _is_dest(processor_values, current_router, reached_placements) def _check_visited_routers(chip: Chip, visited_routers: Set[Chip]) -> None: """ Check if the trace has visited this router already. :param chip: the chip being checked :param visited_routers: routers already visited :raise PacmanRoutingException: when a router has been visited twice. """ if chip in visited_routers: raise PacmanRoutingException( "visited this router before, there is a cycle here. " f"The routers I've currently visited are {visited_routers} and " f"the router i'm visiting is ({chip.x},{chip.y})") visited_routers.add(chip) def _is_dest(processor_ids: Iterable[int], current_router: AbstractMulticastRoutingTable, reached_placements: Set[PlacementTuple]) -> None: """ Collect processors to be removed. :param processor_ids: the processor IDs which the last router entry said the trace should visit :param current_router: the current router being used in the trace :param reached_placements: the placements to which the trace visited """ dest_x, dest_y = current_router.x, current_router.y for processor_id in processor_ids: reached_placements.add(PlacementTuple(dest_x, dest_y, processor_id)) def _locate_routing_entry( current_router: AbstractMulticastRoutingTable, key: int, n_atoms: int) -> MulticastRoutingEntry: """ Locate the entry from the router based off the edge. :param current_router: the current router being used in the trace :param key: the key being used by the source placement :param n_atoms: the number of atoms :raise PacmanRoutingException: when there is no entry located on this router """ found_entry = None for entry in current_router.multicast_routing_entries: key_combo = entry.mask & key e_key = entry.key if key_combo == e_key: if found_entry is None: found_entry = entry else: logger.warning( "Found more than one entry for key {}. This could be " "an error, as currently no router supports overloading" " of entries.", hex(key)) if entry.mask in range_masks: last_atom = key + n_atoms - 1 last_key = e_key + (~entry.mask & FULL_MASK) if last_key < last_atom: raise PacmanRoutingException( f"Full key range not covered: key:0x{key:x} " f"key_combo:0x{key_combo:x} mask:0x{entry.mask:x}, " f"last_key:0x{last_key:x}, e_key:0x{e_key:x}") elif entry.mask in range_masks: last_atom = key + n_atoms last_key = e_key + (~entry.mask & FULL_MASK) if min(last_key, last_atom) - max(e_key, key) + 1 > 0: raise PacmanConfigurationException( f"Key range partially covered: key:0x{key:x}, " f"key_combo:0x{key_combo:x} mask:0x{entry.mask:x}, " f"last_key:0x{last_key:x}, e_key:0x{e_key:x}") if found_entry is None: raise PacmanRoutingException("no entry located") return found_entry