Source code for phileas.mock_instruments

"""
This module defines simulated instruments drivers and loaders. They can be used
for testing, or to demonstrate features, among others.
"""

from __future__ import annotations

from abc import abstractmethod
from dataclasses import dataclass, field
from math import log10
from pathlib import Path
from typing import Any, ClassVar

import numpy as np

from phileas import Loader, logger
from phileas.factory import register_default_loader
from phileas.iteration.base import DataTree

logger = logger.getChild(__name__)


@dataclass
class SimulatedPRESENTImplementation:
    """
    Simulation of an embedded `PRESENT
    <https://iacr.org/archive/ches2007/47270450/47270450.pdf>`_ implementation.
    It can be plugged to a probe, in order to simulate SCA.
    """

    #: Path of the device used for the serial connection.
    serial_device: Path

    #: Serial connection baudrate.
    baudrate: int

    #: Current probe used to perform simulated SCA. If you want to use it,
    #: simply set this attribute.
    probe: CurrentProbe | None = None

    #: Encryption key, must be :py:attr:`key_size` bits wide.
    _key: int | None = None

    #: Number of PRESENT rounds.
    rounds: int = 32

    #: Block size, in bits.
    block_size: ClassVar[int] = 64

    #: Key size, in bits.
    key_size: ClassVar[int] = 80

    #: Evaluation of the S-box.
    s_box: ClassVar[list[int]] = [
        0xC,
        0x5,
        0x6,
        0xB,
        0x9,
        0x0,
        0xA,
        0xD,
        0x3,
        0xE,
        0xF,
        0x8,
        0x4,
        0x7,
        0x1,
        0x2,
    ]

    #: SNR of the hamming distance of the S-box, as seen from the measured
    #: side-channel.
    snr: float = 1

    #: Number of sampling points per PRESENT round.
    points_per_round: int = 20

    def __post_init__(self) -> None:
        logger.info(
            f"Connected to the PRESENT DUT on {self.serial_device} "
            f"with baudrate {self.baudrate}."
        )

    @property
    def key(self) -> int | None:
        """
        Encryption key, must be :py:attr:`key_size` bits wide.
        """
        return self._key

    @key.setter
    def key(self, value: int):
        value = int(value)
        if value.bit_length() > self.key_size:
            raise ValueError(f"Key size must be {self.key_size} bits wide.")

        self._key = value
        logger.info(f"PRESENT DUT key set to {value}.")

    def encrypt(self, block: int) -> int:
        """
        Encrypt a 64-bit block. The :py:attr:`key` must be set before encryption.
        """
        if self.key is None:
            raise ValueError("The key must be set before encryption.")

        hamming_distance = np.empty(self.rounds, dtype=np.float64)

        # Encryption
        key = self.key
        for round_counter in range(1, self.rounds):
            round_key = key >> (self.key_size - self.block_size)
            block = self.add_round_key(round_key, block)
            before_s_box = block
            block = self.s_box_layer(block)
            after_s_box = block
            block = self.p_layer(block)
            key = self.key_schedule(round_counter, key)

            hamming_distance[round_counter - 1] = (
                after_s_box ^ before_s_box
            ).bit_count()

        block = self.add_round_key(key >> 16, block)

        # SCA simulations
        time = np.linspace(0, 1, self.points_per_round + 20)[1:]
        leakage_kernel = np.power(time[0] / time, 3)
        leakages = (
            leakage_kernel[np.newaxis, :]
            * hamming_distance[:, np.newaxis]
            / self.block_size
        )

        trace = leakages.flatten()
        noise = np.random.normal(0, 1, trace.shape)

        if self.probe is not None:
            self.probe.last_measurement = trace + noise / self.snr

        return block

    # PRESENT internals

    @classmethod
    def add_round_key(cls, key: int, block: int) -> int:
        return key ^ block

    def s_box_layer(self, block: int) -> int:
        s_block = 0
        for nibble in range((self.block_size + 3) // 4):
            s_block |= self.s_box[(block >> (nibble * 4)) & 0xF] << (nibble * 4)

        return s_block

    def p_layer(self, block: int) -> int:
        p_block = 0
        for position in range(self.block_size):
            destination = (position % 4) * 16 + (position // 4)
            bit_value = (block >> position) & 1
            p_block |= bit_value << destination

        return p_block

    def key_schedule(self, round_counter: int, current_key: int) -> int:
        # Rotate 61 bits left
        key = ((current_key << 61) & ((1 << self.key_size) - 1)) + (
            current_key >> (self.key_size - 61)
        )

        # Pass most significant nibble through the S-box
        ms_pos = self.key_size - 4
        ms_nibble = self.s_box[key >> ms_pos]
        key = key & ((1 << ms_pos) - 1) | (ms_nibble << ms_pos)

        # Add round counter
        key ^= round_counter << 15

        return key


@register_default_loader
class SimulatedPRESENTImplementationLoader(Loader):
    """
    Loader of a simulated PRESENT implementation, used for SCA.
    """

    name = "phileas-mock_present-phileas"
    interfaces = {"present-encryption"}

    def initiate_connection(self, configuration: dict) -> Any:
        """
        Parameters:
          - device (required): Path of the device used for the serial connection.
          - baudrate (required): Serial connection baudrate.
          - probe (optional): Name of the current probe to plug to the device,
            for monitoring its current consumption.
          - snr (optional): SNR of the hamming distance of the S-box, as seen
            from the measured side-channel.
          - points_per_round (optional): Number of sampling points per PRESENT
            round.
        """
        present = SimulatedPRESENTImplementation(
            configuration["device"], configuration["baudrate"]
        )

        if "probe" in configuration:
            probe = self.instruments_factory.get_bench_instrument(
                configuration["probe"]
            )
            if not isinstance(probe, CurrentProbe):
                raise TypeError("The simulated PRESENT only supports current probes.")

            present.probe = probe

        if "snr" in configuration:
            present.snr = configuration["snr"]

        if "points_per_round" in configuration:
            present.points_per_round = configuration["points_per_round"]

        return present

    def configure(self, instrument: Any, configuration: dict):
        """
        Parameters:
          - key: Encryption key, a 80-bit integer.
        """
        if "key" in configuration:
            instrument.key = configuration["key"]


@dataclass
class Motors:
    """
    Simulated 2D motors driver.
    """

    #: X position of the motor.
    x: float = 0.0

    #: Y position of the motor.
    y: float = 0.0

    #: Unique identifier of the motors.
    id: str = "mock-motors-driver:1"

    def __post_init__(self):
        logger.info("[Motors] Connection initiated.")

    def set_position(self, **kwargs: dict[str, float]):
        """
        Set the position of the motors. Expect arguments x and y, which are both
        optional.
        """
        for axis in "x", "y":
            if axis in kwargs:
                setattr(self, axis, kwargs[axis])


@register_default_loader
class MotorsLoader(Loader):
    """
    Loader of simulated 2D motors.
    """

    name = "phileas-mock_motors-phileas"
    interfaces = {"2d-motors"}

    def initiate_connection(self, configuration: dict) -> Motors:
        return Motors()

    def configure(self, instrument: Motors, configuration: dict):
        """
        Parameters:
         - x, y (optional): Required position of the motors.
        """
        instrument.set_position(**configuration)
        self.logger.info(f"Position set to {configuration}.")

    def get_effective_configuration(
        self, instrument: Motors, configuration: None | dict = None
    ) -> dict[str, DataTree]:
        if configuration is None:
            return self.dump_state(instrument)  # type: ignore[return-value]

        for name in configuration.keys():
            configuration[name] = getattr(instrument, name)

        return configuration

    def get_id(self, instrument: Motors) -> str:
        return instrument.id

    def dump_state(self, instrument: Motors) -> DataTree:
        return {"x": instrument.x, "y": instrument.y}

    def restore_state(self, instrument: Motors, state: dict[str, float]):  # type: ignore[override]
        instrument.x = state["x"]
        instrument.y = state["y"]


class Probe:
    """
    Measurement probe interface.
    """

    @abstractmethod
    def get_amplitude(self) -> np.ndarray:
        """Amplitude of the measured quantity."""
        raise NotImplementedError()


@dataclass
class RandomProbe(Probe):
    """
    Probe which returns random values in [0, 1], using
    {py:func}`numpy.random.rand`.
    """

    #: Shape of the output.
    shape: tuple[int, ...] = (1,)

    def get_amplitude(self) -> np.ndarray:
        return np.random.rand(*self.shape)


@dataclass
class ElectricFieldProbe(Probe):
    """
    Simulation of the electric field radiated by an electric dipole.
    """

    #: Motors used to position the probe.
    motor: Motors

    #: Gain of the probe.
    gain: ClassVar[float] = 1

    def get_amplitude(self) -> np.ndarray:
        p = np.array([1, 1]) / 10
        d = np.array([self.motor.x - 0.3, self.motor.y - 0.5])
        d_norm = np.linalg.norm(d)
        dn = d / d_norm
        electric_field = (3 * np.dot(p, dn) * dn - p) / np.power(d_norm, 3)
        multiplicative_noise = np.random.normal(1, 0.05)

        return self.gain * np.dot(electric_field, electric_field) * multiplicative_noise


@dataclass
class CurrentProbe(Probe):
    last_measurement: np.ndarray | None = None
    noise_level: float = 1.0
    gain: float = 1.0

    def get_amplitude(self) -> np.ndarray:
        if self.last_measurement is None:
            raise ValueError("The probe has no new measurement ready.")

        shape = self.last_measurement.shape
        noise = np.random.normal(0, self.noise_level, size=shape)
        noisy_measurement = self.gain * self.last_measurement + noise
        self.last_measurement = None

        return noisy_measurement


@register_default_loader
class CurrentProbeLoader(Loader):
    name = "phileas-current_probe-phileas"
    interfaces = set()

    def initiate_connection(self, configuration: dict) -> Any:
        probe = CurrentProbe()
        if "noise_level" in configuration:
            probe.noise_level = configuration["noise_level"]

        if "gain" in configuration:
            probe.gain = configuration["gain"]

        return probe

    def configure(self, instrument: Any, configuration: dict):
        raise NotImplementedError()


[docs] @dataclass class Oscilloscope: """ Simulated 8-bit oscilloscope driver. It uses a {py:class}`~phileas.mock_instruments.Probe` and quantifies its output. """ #: Probe which is connected to the oscilloscope probe: Probe #: Actual location of the :py:attr:`amplitude` field. _amplitude: float = field(init=False, repr=False, default=1.0) #: Unique identifier of the oscilloscope. id: str = "mock-oscilloscope-driver:1" #: Bit width of the ADC. width: int = 8 #: Version of the oscilloscope firmware. fw_version: ClassVar[str] = "12.3" def __post_init__(self): logger.info("[Oscilloscope] Connection initiated.")
[docs] def get_measurement(self) -> np.ndarray: """Sampled and quantified value of the probe.""" value = self.probe.get_amplitude() trimmed = np.clip(value, -self.amplitude / 2, self.amplitude / 2) quantized = np.round(trimmed * (1 << self.width)) / (1 << self.width) return quantized
@property def amplitude(self) -> float: #: Amplitude of the measurements, which have a null offset. return self._amplitude @amplitude.setter def amplitude(self, value: float): value = 10 ** round(log10(value)) self._amplitude = value
@register_default_loader class OscilloscopeLoader(Loader): """ Loader of a simulated oscilloscope. """ name = "phileas-mock_oscilloscope-phileas" interfaces = {"oscilloscope"} def initiate_connection(self, configuration: dict) -> Oscilloscope: """ Parameters: - probe (required): Name of the simulated probe to use. - width (optional): Bit width of the ADC, defaults to 8. Supported probes, and parameters: - electric-field-probe: motors: name of the motors that position the probe - random-probe: shape: tuple with the required probe output shape - generic: probe-name: Name of the probe bench instrument """ scope = None probe = configuration["probe"] if probe == "electric-field-probe": scope = Oscilloscope( probe=ElectricFieldProbe( self.instruments_factory.get_bench_instrument( configuration["motors"] ) ), ) elif probe == "random-probe": scope = Oscilloscope(probe=RandomProbe(shape=configuration["shape"])) elif probe == "generic": probe_name = configuration["probe-name"] probe_instrument = self.instruments_factory.get_bench_instrument(probe_name) if not isinstance(probe_instrument, Probe): raise TypeError( "Oscilloscope generic expects a Probe, whereas " f"{probe_name} is a {type(probe_instrument).__name__}." ) scope = Oscilloscope(probe=probe_instrument) else: raise ValueError(f"Unsupported probe type: {probe}") if "width" in configuration: scope.width = configuration["width"] return scope def configure(self, instrument: Oscilloscope, configuration: dict): """ Parameters: - amplitude (optional): Amplitude of the oscilloscope. """ if "amplitude" in configuration: amplitude = configuration["amplitude"] instrument.amplitude = amplitude self.logger.info(f"Amplitude set to {amplitude}.") def get_effective_configuration( self, instrument: Oscilloscope, configuration: None | dict = None ) -> dict: eff_conf = {} if configuration is None or "amplitude" in configuration: eff_conf["amplitude"] = instrument.amplitude return eff_conf def get_id(self, instrument: Oscilloscope) -> str: return instrument.id def dump_state(self, instrument: Oscilloscope) -> DataTree: return { "probe": f"{type(instrument.probe).__name__}", "amplitude": instrument.amplitude, "bit_width": instrument.width, "fw_version": instrument.fw_version, } def restore_state(self, instrument: Oscilloscope, state: dict[str, Any]): # type: ignore[override] if instrument.fw_version != state["fw_version"]: raise ValueError( f"Dumped FW version {state['fw_version']} is not compatible with" f" instrument FW version {instrument.fw_version}." ) if instrument.width != state["bit_width"]: raise ValueError( f"Dumped bit width {state['bit_width']} is not compatible with " f"instrument bit width {instrument.width}." ) instrument.amplitude = state["amplitude"] if type(instrument.probe).__name__ != state["probe"]: self.logger.warning("Cannot change the oscilloscope probe.")