Source code for spinnman.messages.spinnaker_boot.spinnaker_boot_messages

# Copyright (c) 2014 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import array
import math
import os
import time
from typing import Any, Dict, Iterable, Optional, Tuple

from spinnman.data import SpiNNManDataView
from spinnman.exceptions import SpinnmanIOException

from .system_variable_boot_values import (
    SystemVariableBootValues, SystemVariableDefinition)
from .spinnaker_boot_message import SpinnakerBootMessage
from .spinnaker_boot_op_code import SpinnakerBootOpCode

# Number of 4-byte words of boot image data carried by one data message
_BOOT_MESSAGE_DATA_WORDS = 256
# The same payload size expressed in bytes; used to slice the boot image
# into per-message chunks
_BOOT_MESSAGE_DATA_BYTES = _BOOT_MESSAGE_DATA_WORDS * 4
# Largest boot image file that will be accepted (32KB)
_BOOT_IMAGE_MAX_BYTES = 32 * 1024
# Name of the boot image file, looked up in the "boot_data" directory that
# sits next to this module
_BOOT_DATA_FILE_NAME = "scamp.boot"
# Position (byte offset 384, expressed in words) within the boot image at
# which the boot values are written over the image contents
_BOOT_STRUCT_REPLACE_OFFSET = 384 // 4
# Number of words (128 bytes) of the boot image that are overwritten with
# the boot values
_BOOT_STRUCT_REPLACE_LENGTH = 128 // 4
# Fixed part of operand 1 of every data message: (words per message - 1)
# shifted left by 8 bits; the low 8 bits are filled in with the block id
_BOOT_DATA_OPERAND_1 = ((_BOOT_MESSAGE_DATA_BYTES // 4) - 1) << 8


class SpinnakerBootMessages(object):
    """
    A set of boot messages to be sent to boot the board.

    On construction the boot image file is read, the region reserved for
    the boot values is overwritten with the values for this run, and the
    result is stored as bytes. The :py:attr:`messages` property then
    yields a start message, one data message per block of the image and
    a final control message.
    """
    __slots__ = (
        "_boot_data",
        "_n_bytes_to_read",
        "_no_data_packets")

    def __init__(self, extra_boot_values: Optional[Dict[
            SystemVariableDefinition, Any]] = None):
        """
        :param extra_boot_values:
            Any additional or overwrite values to set during boot.
            This should only be used for values which are not standard
            based on the board version. For example this may include an
            Led_0 value.
        :raise SpinnmanInvalidParameterException:
            If the board version is not supported
        :raise SpinnmanIOException:
            If there is an error assembling the packets
        """
        version = SpiNNManDataView.get_machine_version()

        # Get the boot packet values
        spinnaker_boot_value = SystemVariableBootValues()
        spinnaker_boot_value.set_value(
            SystemVariableDefinition.hardware_version, version.number)
        # The same timestamp is used for both the time and the signature
        current_time = int(time.time())
        spinnaker_boot_value.set_value(
            SystemVariableDefinition.unix_timestamp, current_time)
        spinnaker_boot_value.set_value(
            SystemVariableDefinition.boot_signature, current_time)
        spinnaker_boot_value.set_value(
            SystemVariableDefinition.is_root_chip, 1)

        # Set any additional values; these are applied last so that they
        # can overwrite the values set above
        if extra_boot_values is not None:
            for variable, value in extra_boot_values.items():
                spinnaker_boot_value.set_value(variable, value)

        # Get the data as an array, to be used later
        spinnaker_boot_data = array.array(
            "I", spinnaker_boot_value.bytestring)

        # Find the data file and size (the size has already been validated)
        boot_data_file, boot_data_size = self._get_boot_image_file()

        # Compute how many packets to send, rounding up so that a final
        # partly filled packet is still counted
        n_words_to_read = boot_data_size // 4
        self._no_data_packets = (
            (boot_data_size + _BOOT_MESSAGE_DATA_BYTES - 1) //
            _BOOT_MESSAGE_DATA_BYTES)

        # Read the data
        boot_data = array.array("I")
        with open(boot_data_file, "rb") as f:
            boot_data.fromfile(f, n_words_to_read)

        # Replace the appropriate part with the custom boot options
        offset = _BOOT_STRUCT_REPLACE_OFFSET
        length = _BOOT_STRUCT_REPLACE_LENGTH
        boot_data[offset:offset + length] = spinnaker_boot_data[0:length]

        # Byte swap and store the data for later use
        boot_data.byteswap()
        self._boot_data = boot_data.tobytes()
        self._n_bytes_to_read = n_words_to_read * 4

    @staticmethod
    def _get_boot_image_file() -> Tuple[str, int]:
        """
        The name and size of the boot file.

        :return: The path of the boot image file and its size in bytes
        :raise SpinnmanIOException:
            If the file is larger than the maximum boot image size, is too
            small to contain the region that is overwritten with the boot
            values, or has a size that is not a whole number of words
        """
        this_dir, _ = os.path.split(__file__)
        file_name = os.path.join(this_dir, "boot_data", _BOOT_DATA_FILE_NAME)
        file_size = os.stat(file_name).st_size

        if file_size > _BOOT_IMAGE_MAX_BYTES:
            raise SpinnmanIOException(
                f"The boot file is too big at {file_size} bytes "
                "(only files up to 32KB are acceptable)")

        # The boot values are written over a fixed region of the image;
        # a shorter image would be silently resized by that write and then
        # sent with the boot values truncated or missing
        min_size = (
            _BOOT_STRUCT_REPLACE_OFFSET + _BOOT_STRUCT_REPLACE_LENGTH) * 4
        if file_size < min_size:
            raise SpinnmanIOException(
                f"The boot file is too small at {file_size} bytes "
                f"(at least {min_size} bytes are needed to hold the "
                "boot values)")

        if file_size % 4 != 0:
            raise SpinnmanIOException(
                f"The boot file size of {file_size} bytes "
                "must be divisible by 4")
        return file_name, file_size

    def _get_packet_data(self, block_id: int) -> bytes:
        """
        Read a packet of data.

        :param block_id: The index of the block of the boot image to get
        :return:
            The bytes of the block; the last block may be shorter than
            the others
        """
        offset = block_id * _BOOT_MESSAGE_DATA_BYTES
        # Never read beyond the end of the data that was loaded
        n_bytes = min(self._n_bytes_to_read - offset,
                      _BOOT_MESSAGE_DATA_BYTES)
        return self._boot_data[offset:offset + n_bytes]

    @property
    def messages(self) -> Iterable[SpinnakerBootMessage]:
        """
        An iterable of message to be sent.
        """
        # Construct and yield the start packet; operand 3 holds the index
        # of the last data block
        yield SpinnakerBootMessage(
            opcode=SpinnakerBootOpCode.FLOOD_FILL_START,
            operand_1=0, operand_2=0,
            operand_3=self._no_data_packets - 1)

        # Construct and yield the data packets
        for block_id in range(0, self._no_data_packets):
            # Low 8 bits carry the block id; the bits above carry the
            # fixed (words per message - 1) value
            operand_1 = _BOOT_DATA_OPERAND_1 | (block_id & 0xFF)
            yield SpinnakerBootMessage(
                opcode=SpinnakerBootOpCode.FLOOD_FILL_BLOCK,
                operand_1=operand_1, operand_2=0, operand_3=0,
                data=self._get_packet_data(block_id))

        # Construct and yield the end packet
        yield SpinnakerBootMessage(
            opcode=SpinnakerBootOpCode.FLOOD_FILL_CONTROL,
            operand_1=1, operand_2=0, operand_3=0)