Source code for aihwkit.utils.visualization

# -*- coding: utf-8 -*-

# (C) Copyright 2020, 2021 IBM. All Rights Reserved.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Visualization utilities.

This module includes plotting and visualization utilities for ``aihwkit``.
Using this module has extra dependencies that can be installed via the
extras mechanism::

    pip install aihwkit[visualization]
"""

# Allow untyped calls for `np.*`, as proper support for numpy typing requires
# 1.20+ (https://numpy.org/devdocs/reference/typing.html).
# mypy: disallow-untyped-calls=False

from copy import deepcopy
from typing import Any, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np

from matplotlib import ticker
from matplotlib.figure import Figure
from numpy import ndarray
from torch import device as torch_device
from torch import eye, from_numpy, ones

from aihwkit.exceptions import ConfigError
from aihwkit.simulator.configs import SingleRPUConfig, UnitCellRPUConfig
from aihwkit.simulator.configs.devices import PulsedDevice, UnitCell
from aihwkit.simulator.configs.utils import (
    BoundManagementType, IOParameters, NoiseManagementType, PulseType,
    UpdateParameters, WeightNoiseType
)
from aihwkit.simulator.tiles import AnalogTile, BaseTile
from aihwkit.simulator.rpu_base import cuda
from aihwkit.inference.noise.base import BaseNoiseModel
from aihwkit.inference.noise.pcm import PCMLikeNoiseModel


[docs]def compute_pulse_response( analog_tile: BaseTile, direction: ndarray, use_forward: bool = False ) -> ndarray: """Compute the pulse response of a given device configuration. Args: analog_tile: Base tile used for computing the weight traces direction: numpy vector of directions to sequentially apply (-1 or 1) use_forward: Whether to use the (noisy) forward pass to read out the weights (otherwise returns exact weight value). Returns: An numpy array ``w_trace`` of dimensions ``len(direction) x out_size x in_size`` """ out_size = analog_tile.out_size in_size = analog_tile.in_size if analog_tile.is_cuda: device = torch_device('cuda') else: device = torch_device('cpu') total_iters = len(direction) w_trace = np.zeros((total_iters, out_size, in_size)) in_vector = -ones(1, in_size, device=device) out_vector = ones(1, out_size, device=device) in_eye = eye(in_size, device=device) dir_tensor = from_numpy(direction).float().to(device) for i in range(total_iters): # Update the pulses. analog_tile.update(in_vector, out_vector * dir_tensor[i]) if use_forward: # Save weights by using the forward pass (to get the short-term read noise). w_trace[i, :, :] = analog_tile.forward(in_eye).detach().cpu().numpy().T else: # Noise free. w_trace[i, :, :] = analog_tile.get_weights()[0].detach().cpu().numpy() return w_trace
[docs]def plot_pulse_response( analog_tile: BaseTile, direction: ndarray, use_forward: bool = False ) -> ndarray: """Plot the pulse response of a direction vector for each weight of the analog tile. Args: analog_tile: Base tile used for computing the weight traces direction: vector of directions to sequentially apply (-1 or 1) use_forward: Whether to use the (noisy) forward pass to read out the weights (otherwise returns exact weight value). Returns: w_trace: from :func:`compute_pulse_response` """ w_trace = compute_pulse_response(analog_tile, direction, use_forward) plt.plot(w_trace.reshape(w_trace.shape[0], -1)) if use_forward: plt.title(analog_tile.rpu_config.device.__class__.__name__) else: plt.title('{} (without cycle/read noise)' .format(analog_tile.rpu_config.device.__class__.__name__)) plt.ylabel('Weight [conductance]') plt.xlabel('Pulse number #') return w_trace
[docs]def compute_pulse_statistics( w_nodes: ndarray, w_trace: ndarray, direction: ndarray, up_direction: bool, smoothness: float = 0.5 ) -> Tuple[ndarray, ndarray]: """Compute the statistics of the step trace from :func:`compute_pulse_response`. Args: w_nodes: weight range vector to estimate the step histogram w_trace: weight trace from :func:`compute_pulse_response` direction: direction vector used to generate the weight traces up_direction: whether and plot to compute the statistics for up or down direction smoothness: value for smoothing the estimation of the statistical step response curves Returns: Tuple of ``(dw_mean, dw_std)``. """ # pylint: disable=too-many-locals def calc_mean_and_std( node: ndarray, w_values: ndarray, delta_w: ndarray, lam: float ) -> Tuple[ndarray, ndarray]: """Calculate the mean and std of a w location (node). Note: In case there are multiple trials then it also includes device-to-device variation. """ alpha = np.exp(-0.5*(node - w_values)**2/lam**2) beta = alpha.sum(axis=0) alpha[:, beta < 0.1] = np.nan alpha /= np.expand_dims(beta, axis=0) mean = np.sum(alpha*delta_w, axis=0) std = np.sqrt(np.sum(alpha*(delta_w - np.expand_dims(mean, axis=0))**2, axis=0)) return (mean, std) # dw statistics. delta_w = np.diff(w_trace, axis=0) w_trace_s = w_trace[:-1, :, :] if up_direction: msk = np.logical_and(direction[:-1] > 0, np.diff(direction) == 0) w_values = w_trace_s[msk, :, :] delta_w_values = delta_w[msk, :, :] else: msk = np.logical_and(direction[:-1] < 0, np.diff(direction) == 0) w_values = w_trace_s[msk, :, :] delta_w_values = delta_w[msk, :, :] dw_mean = np.zeros((len(w_nodes), w_trace.shape[1], w_trace.shape[2])) dw_std = np.zeros((len(w_nodes), w_trace.shape[1], w_trace.shape[2])) lam = (w_nodes[1]-w_nodes[0])/2*smoothness for i, node in enumerate(w_nodes): dw_mean[i, :, :], dw_std[i, :, :] = calc_mean_and_std(node, w_values, delta_w_values, lam) return dw_mean, dw_std
[docs]def plot_pulse_statistics( w_trace: ndarray, direction: ndarray, up_direction: bool, num_nodes: int = 100, smoothness: float = 0.5 ) -> Tuple[ndarray, ndarray, ndarray]: """Plot the dG-G curve from a given weight trace and direction vector. Args: w_trace: weight trace from :func:`compute_pulse_response` direction: direction vector used to generate ``w_trace`` up_direction: whether and plot to compute the statistics for up or down direction num_nodes: number of nodes for estimation of the step histogram smoothness: value for smoothing the estimation of the statistical step response curves Returns: A tuple (w_nodes, dw_mean, dw_std) from :func:`compute_pulse_statistics` """ def errorbar_patch(x: ndarray, mean: ndarray, std: ndarray) -> None: """Plot a patchy error bar.""" axis = plt.plot(x, mean)[0] plt.fill_between(x, mean - std, mean + std, edgecolor=None, facecolor=axis.get_color(), alpha=0.5) # Compute statistics. w_nodes = np.linspace(w_trace.min(), w_trace.max(), num_nodes) dw_mean, dw_std = compute_pulse_statistics(w_nodes, w_trace, direction, up_direction, smoothness) n_traces = dw_mean.shape[1] * dw_mean.shape[2] for i in range(n_traces): errorbar_patch(w_nodes, dw_mean.reshape(-1, n_traces)[:, i], dw_std.reshape(-1, n_traces)[:, i]) plt.xlabel('Weight $w$') plt.ylabel('Avg. step $\\Delta w$') if up_direction: plt.title('up-direction') else: plt.title('down-direction') return w_nodes, dw_mean, dw_std
[docs]def get_tile_for_plotting( rpu_config: Union[SingleRPUConfig, UnitCellRPUConfig], n_traces: int, use_cuda: bool = False, noise_free: bool = False ) -> BaseTile: """Return an analog tile for plotting the response curve. Args: rpu_config: RPU Configuration to use for plotting n_traces: Number of traces to plot use_cuda: Whether to use the CUDA implementation (if available) noise_free: Whether to turn-off cycle-to-cycle noises (if possible) Returns: Instantiated tile. """ def set_noise_free(dev: Any) -> Any: if hasattr(dev, 'dw_min_std'): dev.dw_min_std = 0.0 # Noise free. if hasattr(dev, 'refresh_forward'): setattr(dev, 'refresh_forward', IOParameters(is_perfect=True)) if hasattr(dev, 'refresh_update'): setattr(dev, 'refresh_update', UpdateParameters(pulse_type=PulseType.NONE)) if hasattr(dev, 'transfer_forward'): setattr(dev, 'refresh_forward', IOParameters(is_perfect=True)) if hasattr(dev, 'transfer_update'): setattr(dev, 'transfer_update', UpdateParameters(pulse_type=PulseType.NONE)) if (hasattr(dev, 'write_noise_std') and getattr(dev, 'write_noise_std') > 0.0): # Just make very small to avoid hidden parameter mismatch. setattr(dev, 'write_noise_std', 1e-6) config = deepcopy(rpu_config) # Make sure we use single pulses for the overview. config.update.update_bl_management = False config.update.update_management = False config.update.desired_bl = 1 if noise_free: config.forward.is_perfect = True set_noise_free(config.device) if hasattr(config.device, 'unit_cell_devices'): for dev in getattr(config.device, 'unit_cell_devices'): set_noise_free(dev) if hasattr(config.device, 'device'): set_noise_free(getattr(config.device, 'device')) analog_tile = AnalogTile(n_traces, 1, config) # type: BaseTile analog_tile.set_learning_rate(1) w_min = getattr(config.device.as_bindings(), 'w_min', -1.0) weights = w_min * ones((n_traces, 1)) analog_tile.set_weights(weights) if use_cuda and cuda.is_compiled(): return analog_tile.cuda() return analog_tile
[docs]def estimate_n_steps(rpu_config: Union[SingleRPUConfig, UnitCellRPUConfig]) -> int: """Estimate the n_steps. Note: The estimate of the number of update pulses needed to drive from smallest to largest conductance. The estimation just assumes linear behavior, thus only be a rough estimate for non-linear response curves. Args: rpu_config: RPU Configuration to use for plotting Returns: Guessed number of steps Raises: ConfigError: If rpu_config.device does not have the w_min attribute (which is only ensured for :class:`~aihwkit.simulator.configs.devices.PulseDevice`) """ if not isinstance(rpu_config, SingleRPUConfig): return 1000 device_binding = rpu_config.device.as_bindings() if not hasattr(device_binding, 'w_min'): raise ConfigError('n_step estimation only for PulsedDevice. ' + 'Provide n_step explicitly.') weight_granularity = device_binding.calc_weight_granularity() w_min = device_binding.w_min w_max = device_binding.w_max n_steps = int(np.round((w_max - w_min) / weight_granularity)) return n_steps
[docs]def plot_response_overview( rpu_config: Union[SingleRPUConfig, UnitCellRPUConfig], n_loops: int = 5, n_steps: int = None, n_traces: int = 5, use_cuda: bool = False, smoothness: float = 0.1 ) -> None: """Plot the step response and statistics of a given device configuration. Args: rpu_config: RPU Configuration to use for plotting n_loops: How many hyper-cycles (up/down pulse sequences) to plot n_steps: Number of up/down steps per cycle. If not given, will be tried to be estimated (only for ``PulsedDevice`` possible otherwise defaults to 1000 if ``n_steps=None``). n_traces: Number of traces to plot use_cuda: Whether to use the CUDA implementation (if available) smoothness: value for smoothing the estimation of the statistical step response curves """ if n_steps is None: n_steps = estimate_n_steps(rpu_config) total_iters = min(max(n_loops*2*n_steps, 1000), max(50000, 2*n_steps)) direction = np.sign(np.sin(np.pi*(np.arange(total_iters)+1)/n_steps)) plt.clf() # 1. Noisy tile. analog_tile = get_tile_for_plotting(rpu_config, n_traces, use_cuda, noise_free=False) plt.subplot(3, 1, 1) plot_pulse_response(analog_tile, direction, use_forward=True) # 2. Noise-free tile. analog_tile_noise_free = get_tile_for_plotting(rpu_config, n_traces, use_cuda, noise_free=True) analog_tile_noise_free.set_hidden_parameters(analog_tile.get_hidden_parameters()) plt.subplot(3, 1, 2) w_trace = plot_pulse_response(analog_tile_noise_free, direction, use_forward=False) num_nodes = min(n_steps, 100) # 3. Plot up statistics. plt.subplot(3, 2, 5) plot_pulse_statistics(w_trace, direction, True, num_nodes, smoothness) # 4. Plot down statistics. plt.subplot(3, 2, 6) plot_pulse_statistics(w_trace, direction, False, num_nodes, smoothness) plt.tight_layout()
[docs]def plot_device(device: Union[PulsedDevice, UnitCell], w_noise: float = 0.0, **kwargs: Any) -> None: """Plot the step response figure for a given device (preset). Note: It will use an amount of read weight noise ``w_noise`` for reading the weights. Args: device: PulsedDevice parameters w_noise: Weight noise standard deviation during read kwargs: for other parameters, see :func:`plot_response_overview` """ plt.figure(figsize=[7, 7]) # To simulate some weight read noise. io_pars = IOParameters(out_noise=0.0, # no out noise w_noise=w_noise, # quite low inp_res=-1., # turn off DAC out_bound=100., # not limiting out_res=-1., # turn off ADC bound_management=BoundManagementType.NONE, noise_management=NoiseManagementType.NONE, w_noise_type=WeightNoiseType.ADDITIVE_CONSTANT) if isinstance(device, PulsedDevice): plot_response_overview(SingleRPUConfig(device=device, forward=io_pars), **kwargs) else: plot_response_overview(UnitCellRPUConfig(device=device, forward=io_pars), **kwargs)
[docs]def plot_device_compact( device: Union[PulsedDevice, UnitCell], w_noise: float = 0.0, n_steps: int = None, n_traces: int = 3, use_cuda: bool = False, ) -> Figure: """Plot a compact step response figure for a given device (preset). Note: It will use an amount of read weight noise ``w_noise`` for reading the weights. Args: device: ``PulsedDevice`` or ``UnitCell`` parameters w_noise: Weight noise standard deviation during read n_steps: Number of up/down steps per cycle. If not given, will be tried to be estimated (only for ``PulsedDevice`` possible otherwise defaults to 1000 if ``n_steps=None``). n_traces: Number of traces to plot (for device-to-device variation) use_cuda: Whether to use CUDA for the computation Returns: the compact step response figure. """ # pylint: disable=too-many-locals,too-many-statements def get_rpu_config(device: Union[PulsedDevice, UnitCell], io_pars: IOParameters) \ -> Union[SingleRPUConfig, UnitCellRPUConfig]: if isinstance(device, PulsedDevice): return SingleRPUConfig(device=device, forward=io_pars) return UnitCellRPUConfig(device=device, forward=io_pars) figure = plt.figure(figsize=[12, 4]) # To simulate some weight read noise. io_pars = IOParameters(out_noise=0.0, # no out noise w_noise=w_noise, # quite low inp_res=-1., # turn off DAC out_bound=100., # not limiting out_res=-1., # turn off ADC bound_management=BoundManagementType.NONE, noise_management=NoiseManagementType.NONE, w_noise_type=WeightNoiseType.ADDITIVE_CONSTANT) rpu_config = get_rpu_config(device, io_pars) if n_steps is None: n_steps = estimate_n_steps(rpu_config) # Noisy tile response curves. n_loops = 2 total_iters = n_loops*2*n_steps direction = np.sign(np.sin(np.pi*(np.arange(total_iters)+1)/n_steps)) analog_tile = get_tile_for_plotting(rpu_config, n_traces, use_cuda, noise_free=False) w_trace = compute_pulse_response(analog_tile, direction, use_forward=True)\ .reshape(-1, n_traces) axis = figure.add_subplot(1, 1, 1) axis.plot(w_trace, linewidth=1) axis.set_title(analog_tile.rpu_config.device.__class__.__name__) axis.set_xlabel('Pulse number #') limit = np.abs(w_trace).max()*1.2 axis.set_ylim(-limit, limit) axis.set_xlim(0, total_iters-1) axis.xaxis.set_major_formatter(ticker.ScalarFormatter(useMathText=True)) # Noise-free tile for statistics. n_loops = 1 total_iters = min(max(n_loops*2*n_steps, 1000), max(50000, 2*n_steps)) direction = np.sign(np.sin(np.pi*(np.arange(total_iters)+1)/n_steps)) analog_tile_noise_free = get_tile_for_plotting(rpu_config, n_traces, use_cuda, noise_free=True) analog_tile_noise_free.set_hidden_parameters(analog_tile.get_hidden_parameters()) w_trace = compute_pulse_response(analog_tile_noise_free, direction, False) # Compute statistics. num_nodes = min(n_steps, 100) w_nodes = np.linspace(w_trace.min(), w_trace.max(), num_nodes) dw_mean_up = compute_pulse_statistics(w_nodes, w_trace, direction, True)[0]\ .reshape(-1, n_traces) dw_mean_down = compute_pulse_statistics(w_nodes, w_trace, direction, False)[0]\ .reshape(-1, n_traces) # Plot mean up statistics. pos = axis.get_position().bounds space = 0.1 gap = 0.01 axis.set_position([pos[0] + gap + space, pos[1], pos[2] - 2*gap - 2*space, pos[3]]) axis.set_yticks([]) axis_left = figure.add_axes([pos[0], pos[1], space, pos[3]]) dw_mean_up = dw_mean_up.reshape(-1, n_traces) for i in range(n_traces): axis_left.plot(dw_mean_up[:, i], w_nodes) axis_left.set_position([pos[0], pos[1], space, pos[3]]) axis_left.set_xlabel('Up pulse size') axis_left.set_ylabel('Weight \n [conductance]') axis_left.set_ylim(-limit, limit) # Plot mean down statistics. axis_right = figure.add_axes([pos[0] + pos[2] - space, pos[1], space, pos[3]]) dw_mean_down = dw_mean_down.reshape(-1, n_traces) for i in range(n_traces): axis_right.plot(np.abs(dw_mean_down[:, i]), w_nodes) axis_right.set_yticks([]) axis_right.set_xlabel('Down pulse size') axis_right.set_ylim(-limit, limit) # Set xlim's. limit = np.maximum(np.nanmax(np.abs(dw_mean_down)), np.nanmax(np.abs(dw_mean_up))) * 1.2 axis_left.set_xlim(0.0, limit) axis_right.set_xlim(0.0, limit) return figure
[docs]def plot_device_symmetry( device: PulsedDevice, w_noise: float = 0.0, n_pulses: int = 10000, n_traces: int = 3, use_cuda: bool = False, w_init: float = 1.0, ) -> None: """Plot the response figure for a given device (preset). It will show the response to alternating up down pulses. Note: It will use an amount of read weight noise ``w_noise`` for reading the weights. Args: device: PulsedDevice parameters n_pulses: total number of pulses w_noise: Weight noise standard deviation during read n_traces: Number of device traces use_cuda: Whether to use CUDA, w_init: Initial value of the weights """ plt.figure(figsize=[10, 5]) io_pars = IOParameters(out_noise=0.0, # no out noise w_noise=w_noise, # quite low inp_res=-1., # turn off DAC out_bound=100., # not limiting out_res=-1., # turn off ADC bound_management=BoundManagementType.NONE, noise_management=NoiseManagementType.NONE, w_noise_type=WeightNoiseType.ADDITIVE_CONSTANT) rpu_config = SingleRPUConfig(device=device, forward=io_pars) direction = np.sign(np.cos(np.pi*np.arange(n_pulses))) plt.clf() analog_tile = get_tile_for_plotting(rpu_config, n_traces, use_cuda, noise_free=False) weights = w_init*ones((n_traces, 1)) analog_tile.set_weights(weights) plot_pulse_response(analog_tile, direction, use_forward=False) plt.ylim([-1, 1]) plt.grid(True)
[docs]def plot_weight_drift(noise_model: BaseNoiseModel = None, t_inference_list: ndarray = None, w_inits: ndarray = None, n_repeats: int = 25) -> None: """Plots the weight drift behavior of a given noise model over time. Args: noise_model: Noise model of derived from :class:`~aihwkit.simulator.noise_models.BaseNoiseModel` t_inference_list: Numpy array of times of inference after programming at time 0 (in seconds) w_inits: Numpy array of target weights to program n_repeats: How many repeats to estimate the standard deviation """ plt.figure(figsize=[10, 5]) if noise_model is None: noise_model = PCMLikeNoiseModel() if t_inference_list is None: t_inference_list = np.logspace(0., 7.0, 15) if w_inits is None: w_inits = np.linspace(-1., 1., 9) weights = w_inits.flatten() weights.sort() weights = np.tile(weights, [n_repeats, 1]) m_list = [] s_list = [] for t_inference in t_inference_list: noisy_weights = noise_model.apply_noise(from_numpy(weights), t_inference).numpy() m_list.append(noisy_weights.mean(axis=0)) s_list.append(noisy_weights.std(axis=0)) m_array = np.stack(m_list, axis=1) s_array = np.stack(s_list, axis=1) for i in range(w_inits.size): curve = plt.plot(t_inference_list, m_array[i]) plt.fill_between(t_inference_list, m_array[i] - s_array[i], m_array[i] + s_array[i], edgecolor=None, linewidth=0, alpha=0.3, antialiased=True, facecolor=curve[0].get_color()) plt.gca().set_xscale('log') plt.xlabel('Time after programming [sec]') plt.ylabel('Weight value [norm. units]')