# Source code for spinn_front_end_common.interface.interface_functions.load_executable_images
# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable
import logging
from spinn_utilities.progress_bar import ProgressBar
from spinn_utilities.log import FormatAdapter
from spinnman.messages.scp.enums import Signal
from spinnman.model import ExecutableTargets
from spinnman.model.enums import CPUState, ExecutableType
from spinnman.exceptions import (
SpiNNManCoresNotInStateException, SpinnmanException)
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.helpful_functions import (
flood_fill_binary_to_spinnaker)
from spinn_front_end_common.utilities.emergency_recovery import (
emergency_recover_states_from_failure)
from spinn_front_end_common.utilities.iobuf_extractor import IOBufExtractor
# Timeout, in seconds, used when waiting for the application cores to reach
# the READY state before the start signal is sent.
# 10 seconds is lots of time to wait for the application to become ready!
_APP_READY_TIMEOUT = 10.0
# The set of CPU states the system cores are expected to be in once the
# system binaries have been started.
_running_state = frozenset([CPUState.RUNNING])
# Module-level logger, wrapped in the spinn_utilities FormatAdapter.
logger = FormatAdapter(logging.getLogger(__name__))
[docs]
def load_app_images() -> None:
    """
    Load the application (non-system) binaries and start them.

    Every executable type other than :py:attr:`ExecutableType.SYSTEM` is
    selected; each selected binary is loaded onto the machine and then a
    start request is sent to the cores that actually use it.
    """
    def _is_application(exe_type: ExecutableType) -> bool:
        # Anything that is not a system binary counts as an application
        return exe_type is not ExecutableType.SYSTEM

    _load_images(
        _is_application, label="Loading executables onto the machine")
[docs]
def load_sys_images() -> None:
    """
    Load the system binaries, start them, and wait for them to be running.

    Only executables of type :py:attr:`ExecutableType.SYSTEM` are selected.
    Each one is loaded onto the machine and a start request is sent to the
    cores that use it; this function then waits for those cores to enter
    the running state.

    :raises SpiNNManCoresNotInStateException:
        If the system cores are not running before the wait times out.
        Emergency state recovery is triggered before the exception is
        re-raised.
    """
    def _is_system(exe_type: ExecutableType) -> bool:
        return exe_type is ExecutableType.SYSTEM

    _load_images(_is_system, "Loading system executables onto the machine")

    # Selecting the targets is kept outside the try block so that the
    # handler below only guards the wait it is meant to recover from.
    cores = filter_targets(_is_system)
    try:
        FecDataView.get_transceiver().wait_for_cores_to_be_in_state(
            cores.all_core_subsets, FecDataView.get_app_id(),
            _running_state, timeout=10)
    except SpiNNManCoresNotInStateException:
        emergency_recover_states_from_failure()
        # Bare raise: propagate the same exception with its traceback intact
        raise
def _load_images(filter_predicate: Callable[[ExecutableType], bool],
                 label: str) -> None:
    """
    Flood fill the selected binaries onto the machine and start them.

    If any step fails, a best-effort diagnosis is attempted before the
    original exception is re-raised: a core status report is logged as an
    error, the IOBUF is extracted and the application is stopped.

    :param filter_predicate:
        Called with an executable type; binaries of that type are loaded
        only when it returns a true value.
    :param label: The label to show on the progress bar.
    """
    # Compute what work is to be done here
    cores = filter_targets(filter_predicate)

    try:
        # The extra step in the total is the start request sent after all
        # the binaries have been loaded.
        with ProgressBar(cores.total_processors + 1, label) as progress:
            for binary in cores.binaries:
                progress.update(flood_fill_binary_to_spinnaker(binary))

            _start_simulation(cores, FecDataView.get_app_id())
            progress.update()
    except Exception:
        try:
            transceiver = FecDataView.get_transceiver()
            # NOTE(review): presumably the cores that are NOT in the READY
            # state (the final False argument) - confirm against spinnman.
            unsuccessful_cores = transceiver.get_cpu_infos(
                cores.all_core_subsets, [CPUState.READY], False)
            logger.error(unsuccessful_cores.get_status_string())
            IOBufExtractor().extract_iobuf()
            transceiver.stop_application(FecDataView.get_app_id())
        except SpinnmanException:
            # Ignore this, this was just an attempt at recovery
            pass
        # Bare raise: re-raise the original failure, not a recovery error,
        # and keep its traceback intact.
        raise
def filter_targets(
        filter_predicate: Callable[[ExecutableType], bool]
        ) -> ExecutableTargets:
    """
    Build the subset of the executable targets accepted by a filter.

    :param filter_predicate:
        Called with each executable type in the current targets; the
        binaries of a type are kept only when it returns a true value.
    :returns: Executable targets holding only those that pass the filter
    """
    selected = ExecutableTargets()
    all_targets = FecDataView.get_executable_targets()
    for kind in all_targets.executable_types_in_binary_set():
        if not filter_predicate(kind):
            continue
        # Copy across every binary of this type with the cores it runs on
        for binary in all_targets.get_binaries_of_executable_type(kind):
            subsets = all_targets.get_cores_for_binary(binary)
            selected.add_subsets(binary, subsets, kind)
    return selected
def _start_simulation(cores: ExecutableTargets, app_id: int) -> None:
    """
    Wait for the given cores to be READY, then send the start signal.

    :param cores: Possible subset of all ExecutableTargets to start
    :param app_id:
        The application ID used both for the wait and for the start signal
    """
    transceiver = FecDataView.get_transceiver()
    ready_state = frozenset([CPUState.READY])
    # Do not signal a start until every requested core reports READY
    transceiver.wait_for_cores_to_be_in_state(
        cores.all_core_subsets, app_id, ready_state,
        timeout=_APP_READY_TIMEOUT)
    transceiver.send_signal(app_id, Signal.START)