#!/usr/bin/env python
# -*- coding: utf-8 -*-

import numpy as np
from tqdm import tqdm
from collections import deque

from plasticity.utils import _check_activation
from plasticity.utils.activations import Linear
from plasticity.model.optimizer import Optimizer
from plasticity.model.weights import BaseWeights

from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.utils import check_array
from sklearn.utils import check_X_y
from sklearn.utils.validation import check_is_fitted

__author__  = ['Nico Curti', 'SimoneGasperini']
__email__ = ['', '']

[docs]class BasePlasticity (BaseEstimator, TransformerMixin): ''' Abstract base class for plasticity models Parameters ---------- outputs : int (default=100) Number of hidden units num_epochs : int (default=100) Maximum number of epochs for model convergency batch_size : int (default=100) Size of the minibatch weights_init : BaseWeights object Weights initialization strategy. activation : str (default="Linear") Name of the activation function optimizer : Optimizer (default=SGD) Optimizer object (derived by the base class Optimizer) precision : float (default=1e-30) Parameter that controls numerical precision of the weight updates epochs_for_convergency : int (default=None) Number of stable epochs requested for the convergency. If None the training proceeds up to the maximum number of epochs (num_epochs). convergency_atol : float (default=0.01) Absolute tolerance requested for the convergency random_state : int (default=None) Random seed for batch subdivisions verbose : bool (default=True) Turn on/off the verbosity ''' def __init__ (self, outputs=100, num_epochs=100, activation='Linear', optimizer=Optimizer, batch_size=100, weights_init=BaseWeights, precision=1e-30, epochs_for_convergency=None, convergency_atol=0.01, random_state=None, verbose=True): _, activation = _check_activation(self, activation) self.outputs = outputs self.num_epochs = num_epochs self.batch_size = batch_size self.activation = activation self.optimizer = optimizer self.weights_init = weights_init self.precision = precision self.epochs_for_convergency = epochs_for_convergency if epochs_for_convergency is not None else num_epochs self.epochs_for_convergency = max(self.epochs_for_convergency, 1) self.convergency_atol = convergency_atol self.random_state = random_state self.verbose = verbose def _weights_update (self, X, output): ''' Compute the weights update using the given learning rule. Parameters ---------- X : array-like (2D) Input array of data output : array-like (2D) Output of the model estimated by the predict function Returns ------- weight_update : array-like (2D) Weight updates matrix to apply theta : array-like Array of learning progress ''' raise NotImplementedError def _lebesgue_norm (self): ''' Apply the Lebesgue norm to the weights. ''' if self.p != 2: sign = np.sign(self.weights) self.weights = sign * np.absolute(self.weights)**(self.p - 1) def _fit_step (self, X, norm=False): ''' Core function of fit step (forward + backward + updates). We divide the step into a function to allow an easier visualization of the weight matrix (if necessary). Parameters ---------- X : array-like (2D) Input array of data norm : bool (default=False) Switch on/off the weights normalization using Lebesque norm Returns ------- theta : array-like Array of learning progress ''' if norm: self._lebesgue_norm() # predict the encoded values output = self._predict(X) # update weights w_update, theta = self._weights_update(X, output) #self.weights[:] += epsilon * w_update self.weights, = self.optimizer.update(params=[self.weights], gradients=[-w_update]) # -update for compatibility with optimizers return theta @property def _check_convergency (self): ''' Check if the current training has reached the convergency. Returns ------- check : bool Check if the learning history of the model is stable. Notes ----- .. note:: The convergency is estimated by the stability or not of the learning parameter in a fixed (epochs_for_convergency) number of epochs for all the outputs. ''' if len(self.history) < self.epochs_for_convergency: return False last = np.full_like(self.history, fill_value=self.history[-1]) return np.allclose(self.history, last, atol=self.convergency_atol)#, rtol=self.convergency_atol) def _join_input_label (self, X, y): ''' Join the input data matrix to the labels. In this way the labels array/matrix is considered as a new set of inputs for the model and the plasticity model can perform classification tasks without any extra supervised learning. Parameters ---------- X : array-like (2D) Input array of data y : array-like (1D or 2D) Labels array/matrix Returns ------- join : array-like (2D) Matrix of the merged data in which the first n_sample columns are occupied by the original data and the remaining ones store the labels. Notes ----- .. note:: The labels can be a 1D array or multi-dimensional array: the given shape is internally reshaped according to the required dimensions. ''' X, y = check_X_y(X, y, multi_output=True, y_numeric=True) # reshape the labels if it is a single array y = y.reshape(-1, 1) if len(y.shape) == 1 else y # concatenate the labels as new inputs for neurons X = np.concatenate((X, y), axis=1) return X def _fit (self, X, norm=False): ''' Core function for the fit member Parameters ---------- X : array-like (2D) Input array of data norm : bool (default=False) Switch on/off the weights normalization using Lebesque norm Returns ------- self ''' num_samples, _ = X.shape indices = np.arange(0, num_samples).astype('int64') num_batches = num_samples // self.batch_size for epoch in range(self.num_epochs): if self.verbose: print('Epoch {:d}/{:d}'.format(epoch + 1, self.num_epochs)) # random shuffle the input np.random.shuffle(indices) batches = np.lib.stride_tricks.as_strided(indices, shape=(num_batches, self.batch_size), strides=(self.batch_size * 8, 8)) for batch in tqdm(batches, disable=(not self.verbose)): batch_data = X[batch, ...] theta = self._fit_step(X=batch_data, norm=norm) # append only the last theta value of the batch # for the convergency evaluation since the weight matrix # changes (it is updated) at every batch self.history.append(theta) # check if the model has reached the convergency (early stopping criteria) if self._check_convergency: if self.verbose: print('Early stopping: the training has reached the convergency criteria') break ##### WEIGHTS SYMMETRIC ORTHOGONALIZATION (once at the end of each epoch) # the two methods are actually exactly equivalent # (provable by simple linear algebra in real domain) # but the first using the numpy svd function is faster # (1) #U, _, Vt = np.linalg.svd(self.weights, full_matrices=False) #self.weights = np.einsum('ij, jk -> ik', U, Vt, optimize=True) # (2) #from scipy.linalg import sqrtm #self.weights = np.real(self.weights @ np.linalg.inv(sqrtm(self.weights.T @ self.weights))) return self
[docs] def fit (self, X, y=None): ''' Fit the Plasticity model weights. Parameters ---------- X : array-like of shape (n_samples, n_features) The training input samples y : array-like, default=None The array of labels Returns ------- self : object Return self Notes ----- .. note:: The model tries to memorize the given input producing a valid encoding. .. warning:: If the array of labels is provided, it will be considered as a set of new inputs for the neurons. The labels can be 1D array or multi-dimensional array: the given shape is internally reshaped according to the required dimensions. ''' if y is not None: X = self._join_input_label(X=X, y=y) X = check_array(X) np.random.seed(self.random_state) num_samples, num_features = X.shape if self.batch_size > num_samples: raise ValueError('Incorrect batch_size found. The batch_size must be less or equal to the number of samples. ' 'Given {:d} for {:d} samples'.format(self.batch_size, num_samples)) #self.weights = np.random.normal(, scale=self.sigma, size=(self.outputs, num_features)) self.weights = self.weights_init.get(size=(self.outputs, num_features)) self.history = deque(maxlen=self.epochs_for_convergency) self._fit(X) return self
def _predict (self, X): ''' Core function for the predict member ''' raise NotImplementedError
[docs] def predict (self, X, y=None): ''' Reduce X applying the Plasticity encoding. Parameters ---------- X : array of shape (n_samples, n_features) The input samples y : array-like, default=None The array of labels Returns ------- Xnew : array of shape (n_values, n_samples) The encoded features Notes ----- .. warning:: If the array of labels is provided, it will be considered as a set of new inputs for the neurons. The labels can be 1D array or multi-dimensional array: the given shape is internally reshaped according to the required dimensions. ''' check_is_fitted(self, 'weights') if y is not None: X = self._join_input_label(X=X, y=y) # return (self.weights @ X).transpose() old_activation = self.activation self.activation = Linear() result = self._predict(X).transpose()#np.einsum('ij, kj -> ik', self.weights, X, optimize=True).transpose() # without activation self.activation = old_activation return result X = check_array(X) return self._predict(X)
[docs] def transform (self, X): ''' Apply the data reduction according to the features in the best signature found. Parameters ---------- X : array-like of shape (n_samples, n_features) The input samples Returns ------- Xnew : array-like of shape (n_samples, encoded_features) The data encoded according to the model weights. ''' check_is_fitted(self, 'weights') Xnew = self._predict(X) return Xnew.transpose()
[docs] def fit_transform (self, X, y=None): ''' Fit the model model meta-transformer and apply the data encoding transformation. Parameters ---------- X : array-like of shape (n_samples, n_features) The training input samples y : array-like, shape (n_samples,) The target values Returns ------- Xnew : array-like of shape (n_samples, encoded_features) The data encoded according to the model weights. Notes ----- .. warning:: If the array of labels is provided, it will be considered as a set of new inputs for the neurons. The labels can be 1D array or multi-dimensional array: the given shape is internally reshaped according to the required dimensions. ''', y=y) Xnew = self.transform(X) return Xnew
[docs] def save_weights (self, filename): ''' Save the current weights to a binary file. Parameters ---------- filename : str Filename or path Returns ------- True if everything is ok ''' check_is_fitted(self, 'weights') with open(filename, 'wb') as fp: self.weights.tofile(fp, sep='') return True
[docs] def load_weights (self, filename): ''' Load the weight matrix from a binary file. Parameters ---------- filename : str Filename or path Returns ------- self : object Return self ''' with open(filename, 'rb') as fp: self.weights = np.fromfile(fp, dtype=np.float, count=-1) # reshape the loaded weights since the numpy function loads # only in ravel format!! self.weights = self.weights.reshape(self.outputs, -1) return self
def __repr__(self): ''' Object representation ''' class_name = self.__class__.__qualname__ params = self.__init__.__code__.co_varnames params = set(params) - {'self'} args = ', '.join(['{0}={1}'.format(k, str(getattr(self, k))) if not isinstance(getattr(self, k), str) else '{0}="{1}"'.format(k, str(getattr(self, k))) for k in params]) return '{0}({1})'.format(class_name, args)