Source code for aihwkit.simulator.tiles.base

# -*- coding: utf-8 -*-

# (C) Copyright 2020, 2021 IBM. All Rights Reserved.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""High level analog tiles (base)."""

from collections import OrderedDict
from typing import Dict, Generic, List, Optional, Tuple, TypeVar, Union

from numpy import concatenate, expand_dims
from numpy import abs as numpy_abs
from torch import Tensor, stack, zeros
from torch import device as torch_device
from torch.nn import Parameter
from torch.autograd import no_grad

from aihwkit.exceptions import TileError
from aihwkit.simulator.rpu_base import tiles
from aihwkit.optim.context import AnalogContext

RPUConfigGeneric = TypeVar('RPUConfigGeneric')


[docs]class BaseTile(Generic[RPUConfigGeneric]): """Base class for tiles. Args: out_size: output size in_size: input size rpu_config: resistive processing unit configuration. bias: whether to add a bias column to the tile. in_trans: Whether to assume an transposed input (batch first) out_trans: Whether to assume an transposed output (batch first) """ # pylint: disable=too-many-instance-attributes,too-many-public-methods def __init__( self, out_size: int, in_size: int, rpu_config: RPUConfigGeneric, bias: bool = True, in_trans: bool = False, out_trans: bool = False ): self.out_size = out_size self.in_size = in_size self.rpu_config = rpu_config self.bias = bias self.in_trans = in_trans self.out_trans = out_trans self.shared_weights = None # type: Parameter # Only used for indexed. self.image_sizes = [] # type: List[int] x_size = in_size + 1 if self.bias else in_size d_size = out_size self.tile = self._create_simulator_tile(x_size, d_size, rpu_config) self.tile.set_learning_rate(0.01) self.tile.set_weights_uniform_random(-0.01, 0.01) self.device = torch_device('cpu') self.is_cuda = False # create analog context self.analog_ctx = AnalogContext(self)
[docs] @no_grad() def get_analog_ctx(self) -> AnalogContext: """Return the analog context of the tile to be used in ``AnalogFunction``.""" return self.analog_ctx
[docs] @no_grad() def ensure_shared_weights(self, shared_weights: Optional[Tensor] = None) -> None: """Ensure that the shared_weights is set properly. Caution: This is only called from analog function. No-op if shared weights is not used. """ if shared_weights is not None: self.shared_weights.data = shared_weights.data # type: ignore if self.shared_weights is not None: self.tile.set_shared_weights(self.shared_weights.data)
[docs] @no_grad() def set_delta_weights(self, delta_weights: Optional[Tensor] = None) -> None: """Set the weight grad tensor and set the update to. No-op if shared weights is not used. """ if self.shared_weights is not None and delta_weights is not None: self.tile.set_delta_weights(delta_weights)
[docs] @no_grad() def reset_delta_weights(self) -> None: """Reset the weight grad tensor to default update behavior (i.e. adding the update directly to the weight). No-op if shared weights is not used. """ if self.shared_weights is not None: self.tile.reset_delta_weights()
[docs] @no_grad() def get_brief_info(self) -> str: """Return short info about the underlying C++ tile.""" return self.tile.get_brief_info().rstrip()
def __getstate__(self) -> Dict: """Get the state for pickling. This method removes the ``tile`` member, as the binding Tiles are not serializable. """ current_dict = self.__dict__.copy() current_dict['analog_tile_weights'] = self.tile.get_weights() # Store the hidden parameters as a numpy array, as storing it as # Tensor causes issues in PyTorch 1.5. current_dict['analog_tile_hidden_parameters'] \ = self.tile.get_hidden_parameters().data.numpy() current_dict['analog_tile_hidden_parameter_names'] \ = self.tile.get_hidden_parameter_names() current_dict['analog_tile_class'] = self.__class__.__name__ current_dict['analog_alpha_scale'] = self.tile.get_alpha_scale() current_dict['analog_lr'] = self.tile.get_learning_rate() current_dict['shared_weights'] = self.shared_weights current_dict.pop('tile', None) # don't save device. Will be determined by loading object current_dict.pop('stream', None) current_dict.pop('is_cuda', None) current_dict.pop('device', None) return current_dict def __setstate__(self, state: Dict) -> None: """Set the state after unpickling. This method recreates the ``tile`` member, creating a new one from scratch, as the binding Tiles are not serializable. Caution: RPU configs are overwritten by loading the state. Raises: TileError: if tile class does not match or hidden parameters do not match """ # pylint: disable=too-many-locals # Note: self here is NOT initialized! So we need to recreate # attributes that were not saved in getstate current_dict = state.copy() weights = current_dict.pop('analog_tile_weights') hidden_parameters = current_dict.pop('analog_tile_hidden_parameters') hidden_parameters_names = current_dict.pop('analog_tile_hidden_parameter_names', []) alpha_scale = current_dict.pop('analog_alpha_scale') tile_class = current_dict.pop('analog_tile_class', self.__class__.__name__) analog_lr = current_dict.pop('analog_lr', 0.01) analog_ctx = current_dict.pop('analog_ctx') shared_weights = current_dict.pop('shared_weights') shared_weights_if = shared_weights is not None self.__dict__.update(current_dict) self.device = torch_device('cpu') self.is_cuda = False # get the current map location from analog_ctx (which is restored) to_device = analog_ctx.device # recreate attributes not saved # always first create on CPU x_size = self.in_size + 1 if self.bias else self.in_size d_size = self.out_size # Recreate the tile. # Check for tile mismatch if tile_class != self.__class__.__name__: raise TileError( 'Mismatch of tile class: {} versus {}. Can only load analog ' 'state from the same tile class.'.format(self.__class__.__name__, tile_class)) self.tile = self._create_simulator_tile(x_size, d_size, self.rpu_config) names = self.tile.get_hidden_parameter_names() if len(hidden_parameters_names) > 0 and names != hidden_parameters_names: # Check whether names match raise TileError('Mismatch with loaded analog state: ' 'Hidden parameter structure is unexpected.') self.tile.set_hidden_parameters(Tensor(hidden_parameters)) self.tile.set_weights(weights) if alpha_scale is not None: self.tile.set_alpha_scale(alpha_scale) self.tile.set_learning_rate(analog_lr) # re-generate shared weights (CPU) if shared_weights_if: if not hasattr(self, 'shared_weights'): # this is needed when pkl loading self.shared_weights = shared_weights with no_grad(): # always new will be populated with set weights. self.shared_weights.data = zeros(d_size, x_size, requires_grad=True) self.ensure_shared_weights() else: self.shared_weights = None # Regenerate context but keep the object ID if not hasattr(self, 'analog_ctx'): # when loading self.analog_ctx = AnalogContext(self, parameter=analog_ctx) self.analog_ctx.reset(self) self.analog_ctx.set_data(analog_ctx.data) if to_device.type.startswith('cuda'): self.cuda(to_device) def _create_simulator_tile( self, x_size: int, d_size: int, rpu_config: RPUConfigGeneric ) -> Union[tiles.FloatingPointTile, tiles.AnalogTile]: """Create a simulator tile. Args: x_size: input size d_size: output size rpu_config: resistive processing unit configuration Returns: a simulator tile based on the specified configuration. """ raise NotImplementedError
[docs] def set_weights( self, weights: Tensor, biases: Optional[Tensor] = None, realistic: bool = False, n_loops: int = 10 ) -> None: """Set the tile weights (and biases). Sets the internal tile weights to the specified values, and also the internal tile biases if the tile was set to use bias (via ``self.bias``). Note: By default this is **not** hardware realistic. You can set the ``realistic`` parameter to ``True`` for a realistic transfer. Args: weights: ``[out_size, in_size]`` weight matrix. biases: ``[out_size]`` bias vector. This parameter is required if ``self.bias`` is ``True``, and ignored otherwise. realistic: whether to use the forward and update pass to program the weights iteratively, using :meth:`set_weights_realistic`. n_loops: number of times the columns of the weights are set in a closed-loop manner. A value of ``1`` means that all columns in principle receive enough pulses to change from ``w_min`` to ``w_max``. Returns: None. Raises: ValueError: if the tile has bias but ``bias`` has not been specified. """ # Prepare the array expected by the pybind function, appending the # biases row if needed. weights_numpy = weights.clone().detach().cpu().numpy() if self.bias: # Create a ``[out_size, in_size (+ 1)]`` matrix. if biases is None: raise ValueError('Analog tile has a bias, but no bias given') biases_numpy = expand_dims(biases.clone().detach().cpu().numpy(), 1) combined_weights = concatenate([weights_numpy, biases_numpy], axis=1) else: # Use only the ``[out_size, in_size]`` matrix. combined_weights = weights_numpy if realistic: return self.tile.set_weights_realistic(combined_weights, n_loops) return self.tile.set_weights(combined_weights)
[docs] def get_weights( self, realistic: bool = False ) -> Tuple[Tensor, Optional[Tensor]]: """Get the tile weights (and biases). Gets the tile weights and extracts the mathematical weight matrix and biases (if present, by determined by the ``self.bias`` parameter). Note: By default this is **not** hardware realistic. Use set ``realistic`` to True for a realistic transfer. Args: realistic: Whether to use the forward pass to read out the tile weights iteratively, using :meth:`get_weights_realistic`. Returns: a tuple where the first item is the ``[out_size, in_size]`` weight matrix; and the second item is either the ``[out_size]`` bias vector or ``None`` if the tile is set not to use bias. """ # Retrieve the internal weights (and potentially biases) matrix. if realistic: combined_weights = self.tile.get_weights_realistic() else: combined_weights = self.tile.get_weights() # Split the internal weights (and potentially biases) matrix. if self.bias: # combined_weights is [out_size, in_size (+ 1)]. weights = Tensor(combined_weights[:, :-1]) biases = Tensor(combined_weights[:, -1]) else: # combined_weights is [out_size, in_size]. weights = Tensor(combined_weights) biases = None return weights, biases if self.bias else None
[docs] def set_weights_scaled( self, weights: Tensor, biases: Optional[Tensor] = None, realistic: bool = False, n_loops: int = 10, omega: float = 1.0 ) -> None: r"""Set the tile weights (and biases) in a scaled fashion. Similar to :meth:`set_weights`, however, additionally scales the weights by a global scale :math:`\alpha`, that is then applied in digital at the output of forward and backward pass, and the learning rate for this tile is adjusted accordingly. The weights are scaled by :math:`\omega/\max_{ij} |w_{ij}|` and the global digital factor :math:`alpha` is set to :math:`\max_{ij} |w_{ij}|/\omega`. It can be shown that such a constant factor greatly improves the SNR and training accuracy as the full weight range of the analog devices are used. See also `Rasch, Gokmen & Haensch (2019)`_ for more details. Caution: Using ``get_weights`` will now retrieve the true analog weights *without* applying the global factor. To get the true weights, use ``get_weights`` and scale it by the :math:`\alpha` of this layer which can be retrieved by ``get_alpha_scale()``. Args: weights: ``[out_size, in_size]`` weight matrix. biases: ``[out_size]`` bias vector. This parameter is required if ``self.bias`` is ``True``, and ignored otherwise. realistic: whether to use the forward and update pass to program the weights iteratively, using :meth:`set_weights_realistic`. n_loops: number of times the columns of the weights are set in a closed-loop manner. A value of ``1`` means that all columns in principle receive enough pulses to change from ``w_min`` to ``w_max``. omega: where the weight max should be mapped in terms of the weight range. Note that for ``omega`` larger than the maximal weight of the device, weights will get clipped for most devices. Returns: None. Raises: ValueError: if the tile has bias but ``bias`` has not been specified. .. _`Rasch, Gokmen & Haensch (2019)`: https://arxiv.org/abs/1906.02698 """ # Prepare the array expected by the pybind function, appending the # biases row if needed. weights_numpy = weights.clone().detach().cpu().numpy() if self.bias: # Create a ``[out_size, in_size (+ 1)]`` matrix. if biases is None: raise ValueError('Analog tile has a bias, but no bias given') biases_numpy = expand_dims(biases.clone().detach().cpu().numpy(), 1) combined_weights = concatenate([weights_numpy, biases_numpy], axis=1) else: # Use only the ``[out_size, in_size]`` matrix. combined_weights = weights_numpy # Scale the weights. weight_max = numpy_abs(combined_weights).max() combined_weights = combined_weights/weight_max*omega alpha = weight_max/omega self.tile.set_alpha_scale(alpha) if realistic: return self.tile.set_weights_realistic(combined_weights, n_loops) return self.tile.set_weights(combined_weights)
[docs] def get_weights_scaled( self, realistic: bool = False ) -> Tuple[Tensor, Optional[Tensor]]: """Get the tile weights (and biases) and applies the current alpha scale to it. Gets the tile weights and extracts the mathematical weight matrix and biases (if present, by determined by the ``self.bias`` parameter). Note: By default this is **not** hardware realistic. Use set ``realistic`` to True for a realistic transfer. Args: realistic: Whether to use the forward pass to read out the tile weights iteratively, using :meth:`get_weights_realistic`. Returns: a tuple where the first item is the ``[out_size, in_size]`` weight matrix; and the second item is either the ``[out_size]`` bias vector or ``None`` if the tile is set not to use bias. Both have the alpha scale applied. """ weights, biases = self.get_weights(realistic=realistic) alpha = self.tile.get_alpha_scale() if self.bias: return weights*alpha, biases*alpha return weights*alpha, None
[docs] def set_learning_rate(self, learning_rate: float) -> None: """Set the tile learning rate. Set the tile learning rate to ``-learning_rate``. Note that the learning rate is always taken to be negative (because of the meaning in gradient descent) and positive learning rates are not supported. Args: learning_rate: the desired learning rate. Returns: None. """ return self.tile.set_learning_rate(learning_rate)
[docs] def get_learning_rate(self) -> float: """Return the tile learning rate. Returns: float: the tile learning rate. """ return self.tile.get_learning_rate()
[docs] @no_grad() def decay_weights(self, alpha: float = 1.0) -> None: """Decays the weights once according to the decay parameters of the tile. Args: alpha: additional decay scale (such as LR). The base decay rate is set during tile init. Returns: None. """ return self.tile.decay_weights(alpha)
[docs] @no_grad() def drift_weights(self, delta_t: float = 1.0) -> None: """Drifts the weights once according to the drift parameters of the tile. See also :class:`~aihwkit.simulator.configs.utils.DriftParameter`. Args: delta_t: Time since last drift call. Returns: None. """ return self.tile.drift_weights(delta_t)
[docs] @no_grad() def diffuse_weights(self) -> None: """Diffuses the weights once according to the diffusion parameters of the tile. The base diffusion rate is set during tile init. Returns: None """ return self.tile.diffuse_weights()
[docs] @no_grad() def reset_columns( self, start_column_idx: int = 0, num_columns: int = 1, reset_prob: float = 1.0 ) -> None: r"""Reset (a number of) columns according to the reset parameters of the tile. Resets the weights with device-to-device and cycle-to-cycle variability (depending on device type), typically: .. math:: W_{ij} = \xi*\sigma_\text{reset} + b^\text{reset}_{ij} The reset parameters are set during tile init. Args: start_column_idx: a start index of columns (0..x_size-1) num_columns: how many consecutive columns to reset (with circular warping) reset_prob: individual probability of reset. Returns: None """ return self.tile.reset_columns(start_column_idx, num_columns, reset_prob)
[docs] def cpu(self) -> 'BaseTile': """Return a copy of this tile in CPU memory.""" raise NotImplementedError
[docs] def cuda( self, device: Optional[Union[torch_device, str, int]] = None ) -> 'BaseTile': """Return a copy of this tile in CUDA memory.""" raise NotImplementedError
[docs] @no_grad() def forward(self, x_input: Tensor, is_test: bool = False) -> Tensor: """Perform the forward pass. Args: x_input: ``[N, in_size]`` tensor. If ``in_trans`` is set, transposed. is_test: whether to assume testing mode. Returns: torch.Tensor: ``[N, out_size]`` tensor. If ``out_trans`` is set, transposed. """ # We use no-grad as we do it explicitly in the optimizer. return self.tile.forward(x_input, self.bias, self.in_trans, self.out_trans, is_test)
[docs] def backward(self, d_input: Tensor) -> Tensor: """Perform the backward pass. Args: d_input: ``[N, out_size]`` tensor. If ``out_trans`` is set, transposed. Returns: torch.Tensor: ``[N, in_size]`` tensor. If ``in_trans`` is set, transposed. """ return self.tile.backward(d_input, self.bias, self.out_trans, self.in_trans)
[docs] def update(self, x_input: Tensor, d_input: Tensor) -> None: """Perform the update pass. Args: x_input: ``[N, in_size]`` tensor. If ``in_trans`` is set, transposed. d_input: ``[N, out_size]`` tensor. If ``out_trans`` is set, transposed. Returns: None """ return self.tile.update(x_input, d_input, self.bias, self.in_trans, self.out_trans)
[docs] def get_hidden_parameters(self) -> OrderedDict: """Get the hidden parameters of the tile. Returns: Ordered dictionary of hidden parameter tensors. """ names = self.tile.get_hidden_parameter_names() hidden_parameters = self.tile.get_hidden_parameters().detach_() ordered_parameters = OrderedDict() for idx, name in enumerate(names): ordered_parameters[name] = hidden_parameters[idx].clone() return ordered_parameters
[docs] def set_hidden_parameters(self, ordered_parameters: OrderedDict) -> None: """Set the hidden parameters of the tile. Caution: Usually the hidden parameters are drawn according to the parameter definitions (those given in the RPU config). If the hidden parameters are arbitrary set by the user, then this correspondence might be broken. This might cause problems in the learning, in particular, the `weight granularity` (usually ``dw_min``, depending on the device) is needed for the dynamic adjustment of the bit length (``update_bl_management``, see :class:`~aihwkit.simulator.configs.utils.UpdateParameters`). Currently, the new ``dw_min`` parameter is tried to be estimated from the average of hidden parameters if the discrepancy with the ``dw_min`` from the definition is too large. Args: ordered_parameters: Ordered dictionary of hidden parameter tensors. Raises: TileError: In case the ordered dict keys do not conform with the current rpu config tile structure of the hidden parameters """ if len(ordered_parameters) == 0: return hidden_parameters = stack(list(ordered_parameters.values()), dim=0) names = self.tile.get_hidden_parameter_names() if names != list(ordered_parameters.keys()): raise TileError('Mismatch with loaded analog state:' 'Hidden parameter structure is unexpected.') self.tile.set_hidden_parameters(hidden_parameters)
[docs] def get_hidden_update_index(self) -> int: """Get the current updated device index of the hidden devices. Usually this is 0 as only one device is present per cross-point for many tile RPU configs. However, some RPU configs maintain internally multiple devices per cross-point (e.g. :class:`~aihwkit.simulator.config.devices.VectorUnitCell`). Returns: The next mini-batch updated device index. Note: Depending on the update and learning policy implemented in the tile, updated devices might switch internally as well. """ return self.tile.get_hidden_update_index()
[docs] def set_hidden_update_index(self, index: int) -> None: """Set the current updated hidden device index. Usually this is ignored and fixed to 0 as only one device is present per cross-point. Other devices, might not allow explicit setting as it would interfere with the implemented learning rule. However, some tiles have internally multiple devices per cross-point (eg. unit cell) that can be chosen depending on the update policy. Args: index: device index to be updated in the next mini-batch Note: Depending on the update and learning policy implemented in the tile, updated devices might switch internally as well. """ self.tile.set_hidden_update_index(index)
[docs] def set_indexed(self, indices: Tensor, image_sizes: List) -> None: """Set the index matrix for convolutions ans switches to indexed forward/backward/update versions. Args: indices : torch.tensor with int indices image_sizes: [C_in, H_in, W_in, H_out, W_out] sizes Raises: ValueError: if ``image_sizes`` does not have valid dimensions. TileError: if the tile uses transposition. """ if len(image_sizes) not in (3, 5, 7): raise ValueError('image_sizes expects 3, 5 or 7 sizes ' '[C_in, (D_in), H_in, (W_in), (D_out), H_out, (W_out)]') if self.in_trans or self.out_trans: raise TileError('Transposed indexed versions not supported (assumes NC(D)HW)') self.image_sizes = image_sizes self.tile.set_matrix_indices(indices)
[docs] @no_grad() def forward_indexed(self, x_input: Tensor, is_test: bool = False) -> Tensor: """Perform the forward pass for convolutions. Depending on the input tensor size it performs the forward pass for a 2D image or a 3D one. Args: x_input: ``[N, in_size]`` tensor. If ``in_trans`` is set, transposed. is_test: whether to assume testing mode. Returns: torch.Tensor: ``[N, out_size]`` tensor. If ``out_trans`` is set, transposed. Raises: TileError: if the indexed tile has not been initialized, or if ``self.images_sizes`` does not have a valid dimennion. """ if not self.image_sizes: raise TileError('self.image_sizes is not initialized. Please use ' 'set_indexed()') n_batch = x_input.size(0) channel_out = self.out_size if len(self.image_sizes) == 3: _, _, height_out = self.image_sizes d_tensor = x_input.new_empty((n_batch, channel_out, height_out)) elif len(self.image_sizes) == 5: _, _, _, height_out, width_out = self.image_sizes d_tensor = x_input.new_empty((n_batch, channel_out, height_out, width_out)) elif len(self.image_sizes) == 7: _, _, _, _, depth_out, height_out, width_out = self.image_sizes d_tensor = x_input.new_empty((n_batch, channel_out, depth_out, height_out, width_out)) else: raise TileError('self.image_sizes length is not 3, 5 or 7') return self.tile.forward_indexed(x_input, d_tensor, is_test)
[docs] def backward_indexed(self, d_input: Tensor) -> Tensor: """Perform the backward pass for convolutions. Depending on the input tensor size it performs the backward pass for a 2D image or a 3D one. Args: d_input: ``[N, out_size]`` tensor. If ``out_trans`` is set, transposed. Returns: torch.Tensor: ``[N, in_size]`` tensor. If ``in_trans`` is set, transposed. Raises: TileError: if the indexed tile has not been initialized, or if ``self.images_sizes`` does not have a valid dimennion. """ if not self.image_sizes: raise TileError('self.image_sizes is not initialized. Please use ' 'set_indexed()') n_batch = d_input.size(0) if len(self.image_sizes) == 3: channel_in, height_in, _ = self.image_sizes x_tensor = d_input.new_empty((n_batch, channel_in, height_in)) elif len(self.image_sizes) == 5: channel_in, height_in, width_in, _, _ = self.image_sizes x_tensor = d_input.new_empty((n_batch, channel_in, height_in, width_in)) elif len(self.image_sizes) == 7: channel_in, depth_in, height_in, width_in, _, _, _ \ = self.image_sizes x_tensor = d_input.new_empty((n_batch, channel_in, depth_in, height_in, width_in)) else: raise TileError('self.image_sizes length is not 3, 5 or 7') return self.tile.backward_indexed(d_input, x_tensor)
[docs] def update_indexed(self, x_input: Tensor, d_input: Tensor) -> None: """Perform the update pass for convolutions. Args: x_input: ``[N, in_size]`` tensor. If ``in_trans`` is set, transposed. d_input: ``[N, out_size]`` tensor. If ``out_trans`` is set, transposed. Returns: None """ return self.tile.update_indexed(x_input, d_input)
[docs] @no_grad() def post_update_step(self) -> None: """Operators that need to be called once per mini-batch.""" if self.rpu_config.device.requires_diffusion(): # type: ignore self.diffuse_weights() if self.rpu_config.device.requires_decay(): # type: ignore self.decay_weights()