#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
from tqdm import tqdm
from collections import deque
from plasticity.utils import _check_activation
from plasticity.utils.activations import Linear
from plasticity.model.optimizer import Optimizer
from plasticity.model.weights import BaseWeights
from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.utils import check_array
from sklearn.utils import check_X_y
from sklearn.utils.validation import check_is_fitted
__author__ = ['Nico Curti', 'SimoneGasperini']
__email__ = ['nico.curti2@unibo.it', 'simone.gasperini2@studio.unibo.it']

class BasePlasticity (BaseEstimator, TransformerMixin):

  '''
  Abstract base class for plasticity models.

  Parameters
  ----------
    outputs : int (default=100)
      Number of hidden units.

    num_epochs : int (default=100)
      Maximum number of epochs for model convergence.

    batch_size : int (default=100)
      Size of the minibatch.

    weights_init : BaseWeights object
      Weights initialization strategy.

    activation : str (default="Linear")
      Name of the activation function.

    optimizer : Optimizer (default=Optimizer)
      Optimizer object (derived from the base class Optimizer).

    precision : float (default=1e-30)
      Parameter that controls the numerical precision of the weight updates.

    epochs_for_convergency : int (default=None)
      Number of stable epochs requested for convergence.
      If None the training proceeds up to the maximum number of epochs (num_epochs).

    convergency_atol : float (default=0.01)
      Absolute tolerance requested for convergence.

    random_state : int (default=None)
      Random seed for the batch subdivisions.

    verbose : bool (default=True)
      Turn on/off the verbosity.
  '''

  def __init__ (self, outputs=100, num_epochs=100,
      activation='Linear', optimizer=Optimizer,
      batch_size=100, weights_init=BaseWeights,
      precision=1e-30,
      epochs_for_convergency=None,
      convergency_atol=0.01,
      random_state=None,
      verbose=True):

    _, activation = _check_activation(self, activation)

    self.outputs = outputs
    self.num_epochs = num_epochs
    self.batch_size = batch_size
    self.activation = activation
    self.optimizer = optimizer
    self.weights_init = weights_init
    self.precision = precision
    # fall back to num_epochs when no convergence window is requested,
    # and guarantee a window of at least one epoch
    self.epochs_for_convergency = epochs_for_convergency if epochs_for_convergency is not None else num_epochs
    self.epochs_for_convergency = max(self.epochs_for_convergency, 1)
    self.convergency_atol = convergency_atol
    self.random_state = random_state
    self.verbose = verbose

  def _weights_update (self, X, output):
    '''
    Compute the weights update using the given learning rule.

    Parameters
    ----------
      X : array-like (2D)
        Input array of data.

      output : array-like (2D)
        Output of the model estimated by the predict function.

    Returns
    -------
      weight_update : array-like (2D)
        Weight updates matrix to apply.

      theta : array-like
        Array of learning progress.
    '''
    raise NotImplementedError

  def _lebesgue_norm (self):
    '''
    Apply the Lebesgue norm to the weights, i.e. re-weight each synapse
    according to w <- sign(w) * |w|**(p - 1), where p is the norm order
    set by the derived class. For p=2 the transformation is the identity
    and can be skipped.
    '''
    if self.p != 2:
      sign = np.sign(self.weights)
      self.weights = sign * np.absolute(self.weights)**(self.p - 1)
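
  # Re-weighting sketch (comment-only, since executable statements at class
  # scope would run at import time): for a norm order p=3,
  #
  #   w = np.array([0.5, -2.0])
  #   np.sign(w) * np.absolute(w) ** (3 - 1)   # -> array([ 0.25, -4. ])
  #
  # i.e. each weight keeps its sign while its magnitude is raised to p - 1.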

  def _fit_step (self, X, norm=False):
    '''
    Core function of the fit step (forward + backward + updates).
    The step is isolated in its own function to allow an easier
    visualization of the weight matrix (if necessary).

    Parameters
    ----------
      X : array-like (2D)
        Input array of data.

      norm : bool (default=False)
        Switch on/off the weights normalization using the Lebesgue norm.

    Returns
    -------
      theta : array-like
        Array of learning progress.
    '''
    if norm:
      self._lebesgue_norm()

    # predict the encoded values
    output = self._predict(X)

    # update the weights; the sign of the update is flipped
    # for compatibility with gradient-descent optimizers
    w_update, theta = self._weights_update(X, output)
    self.weights, = self.optimizer.update(params=[self.weights], gradients=[-w_update])

    return theta

  @property
  def _check_convergency (self):
    '''
    Check if the current training has reached convergence.

    Returns
    -------
      check : bool
        True if the learning history of the model is stable.

    Notes
    -----
    .. note::
      Convergence is estimated from the stability of the learning
      parameter over a fixed number (epochs_for_convergency) of epochs
      for all the outputs.
    '''
    if len(self.history) < self.epochs_for_convergency:
      return False

    # compare every stored theta against the most recent one
    last = np.full_like(self.history, fill_value=self.history[-1])
    return np.allclose(self.history, last, atol=self.convergency_atol)
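
  # Illustrative sketch of the stability check above (comment-only): with
  # epochs_for_convergency=3 and convergency_atol=0.01, a history like
  #
  #   history = deque([1.000, 1.004, 0.998], maxlen=3)
  #
  # is flagged as converged because every entry lies within 0.01 of the
  # last one, while deque([0.8, 0.9, 1.0], maxlen=3) is not.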

  def _join_input_label (self, X, y):
    '''
    Join the input data matrix to the labels.
    In this way the labels array/matrix is considered as a new
    set of inputs for the model and the plasticity model can
    perform classification tasks without any extra supervised learning.

    Parameters
    ----------
      X : array-like (2D)
        Input array of data.

      y : array-like (1D or 2D)
        Labels array/matrix.

    Returns
    -------
      join : array-like (2D)
        Matrix of the merged data in which the first n_features columns
        are occupied by the original data and the remaining ones store
        the labels.

    Notes
    -----
    .. note::
      The labels can be a 1D array or a multi-dimensional array: the given
      shape is internally reshaped according to the required dimensions.
    '''
    X, y = check_X_y(X, y, multi_output=True, y_numeric=True)
    # reshape the labels if it is a single array
    y = y.reshape(-1, 1) if len(y.shape) == 1 else y
    # concatenate the labels as new inputs for the neurons
    X = np.concatenate((X, y), axis=1)

    return X
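
  # Shape sketch for the merge above (comment-only illustration): with
  # X of shape (n_samples, n_features) and a 1D y of length n_samples,
  #
  #   X = np.ones(shape=(4, 3))
  #   y = np.arange(4)
  #   self._join_input_label(X, y).shape   # -> (4, 4)
  #
  # i.e. the labels become one extra input column per sample.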

  def _fit (self, X, norm=False):
    '''
    Core function for the fit member.

    Parameters
    ----------
      X : array-like (2D)
        Input array of data.

      norm : bool (default=False)
        Switch on/off the weights normalization using the Lebesgue norm.

    Returns
    -------
    self
    '''
    num_samples, _ = X.shape
    indices = np.arange(0, num_samples).astype('int64')
    # note: only num_samples // batch_size batches are used, so up to
    # batch_size - 1 trailing samples are dropped at each epoch
    num_batches = num_samples // self.batch_size

    for epoch in range(self.num_epochs):

      if self.verbose:
        print('Epoch {:d}/{:d}'.format(epoch + 1, self.num_epochs))

      # random shuffle of the input
      np.random.shuffle(indices)

      # build a (num_batches, batch_size) view over the shuffled indices;
      # using the array itemsize avoids hard-coding the int64 byte width
      batches = np.lib.stride_tricks.as_strided(indices, shape=(num_batches, self.batch_size),
                                                strides=(self.batch_size * indices.itemsize, indices.itemsize))

      for batch in tqdm(batches, disable=(not self.verbose)):
        batch_data = X[batch, ...]
        theta = self._fit_step(X=batch_data, norm=norm)

      # append only the theta value of the last batch
      # for the convergence evaluation, since the weight matrix
      # changes (it is updated) at every batch
      self.history.append(theta)

      # check if the model has reached convergence (early stopping criterion)
      if self._check_convergency:
        if self.verbose:
          print('Early stopping: the training has reached the convergence criteria')
        break

      ##### WEIGHTS SYMMETRIC ORTHOGONALIZATION (once at the end of each epoch)
      # the two methods are actually exactly equivalent
      # (provable by simple linear algebra in the real domain)
      # but the first one, using the numpy svd function, is faster
      # (1)
      #U, _, Vt = np.linalg.svd(self.weights, full_matrices=False)
      #self.weights = np.einsum('ij, jk -> ik', U, Vt, optimize=True)
      # (2)
      #from scipy.linalg import sqrtm
      #self.weights = np.real(self.weights @ np.linalg.inv(sqrtm(self.weights.T @ self.weights)))

    return self
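
  # Sketch of the strided batching used in _fit (comment-only illustration):
  # with 10 samples and batch_size=3, indices is first shuffled and then
  # viewed as a (3, 3) matrix of batch indices, dropping the 10th sample:
  #
  #   indices = np.arange(0, 10).astype('int64')
  #   batches = np.lib.stride_tricks.as_strided(indices, shape=(3, 3),
  #                                             strides=(3 * indices.itemsize, indices.itemsize))
  #   # batches[0] -> first 3 shuffled indices, batches[1] -> next 3, ...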

  def fit (self, X, y=None):
    '''
    Fit the Plasticity model weights.

    Parameters
    ----------
      X : array-like of shape (n_samples, n_features)
        The training input samples.

      y : array-like, default=None
        The array of labels.

    Returns
    -------
      self : object
        Return self.

    Notes
    -----
    .. note::
      The model tries to memorize the given input producing a valid encoding.

    .. warning::
      If the array of labels is provided, it will be considered as a set of new inputs
      for the neurons. The labels can be a 1D array or a multi-dimensional array: the given
      shape is internally reshaped according to the required dimensions.
    '''
    if y is not None:
      X = self._join_input_label(X=X, y=y)

    X = check_array(X)

    np.random.seed(self.random_state)
    num_samples, num_features = X.shape

    if self.batch_size > num_samples:
      raise ValueError('Incorrect batch_size found. The batch_size must be less than or equal to the number of samples. '
                       'Given {:d} for {:d} samples'.format(self.batch_size, num_samples))

    self.weights = self.weights_init.get(size=(self.outputs, num_features))
    self.history = deque(maxlen=self.epochs_for_convergency)

    self._fit(X)

    return self
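
  # Typical usage sketch (comment-only; BCM is one of the concrete models
  # exported by plasticity.model and is assumed here for illustration):
  #
  #   from plasticity.model import BCM
  #   model = BCM(outputs=10, num_epochs=5, batch_size=25, random_state=42)
  #   model.fit(X)           # X with shape (n_samples, n_features)
  #   model.weights.shape    # -> (10, n_features)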

  def _predict (self, X):
    '''
    Core function for the predict member.
    '''
    raise NotImplementedError

  def predict (self, X, y=None):
    '''
    Reduce X applying the Plasticity encoding.

    Parameters
    ----------
      X : array of shape (n_samples, n_features)
        The input samples.

      y : array-like, default=None
        The array of labels.

    Returns
    -------
      Xnew : array of shape (n_samples, outputs)
        The encoded features.

    Notes
    -----
    .. warning::
      If the array of labels is provided, it will be considered as a set of new inputs
      for the neurons. The labels can be a 1D array or a multi-dimensional array: the given
      shape is internally reshaped according to the required dimensions.
    '''
    check_is_fitted(self, 'weights')

    if y is not None:
      X = self._join_input_label(X=X, y=y)

    X = check_array(X)

    # evaluate the raw projection of the input onto the weights,
    # temporarily replacing the activation with the identity
    old_activation = self.activation
    self.activation = Linear()

    result = self._predict(X).transpose()

    self.activation = old_activation

    return result
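
  # Encoding sketch (comment-only; continues the fit example above):
  #
  #   encoding = model.predict(X)
  #   encoding.shape   # -> (n_samples, outputs), one linear projection
  #                    #    of each sample onto the learned weights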

  def save_weights (self, filename):
    '''
    Save the current weights to a binary file.

    Parameters
    ----------
      filename : str
        Filename or path.

    Returns
    -------
      True if everything is ok.
    '''
    check_is_fitted(self, 'weights')

    with open(filename, 'wb') as fp:
      self.weights.tofile(fp, sep='')

    return True

  def load_weights (self, filename):
    '''
    Load the weight matrix from a binary file.

    Parameters
    ----------
      filename : str
        Filename or path.

    Returns
    -------
      self : object
        Return self.
    '''
    with open(filename, 'rb') as fp:
      # np.fromfile reads the raw buffer as a 1D array;
      # the dtype must match the one used by save_weights
      self.weights = np.fromfile(fp, dtype=np.float64, count=-1)

    # reshape the loaded weights since the numpy function
    # loads them only in ravel format
    self.weights = self.weights.reshape(self.outputs, -1)

    return self
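
  # Round-trip sketch for the two members above (comment-only):
  #
  #   model.save_weights('weights.bin')    # raw float64 buffer, no header
  #   model.load_weights('weights.bin')    # restores an (outputs, n_features) matrix
  #
  # since the file stores no shape information, the model must be built
  # with the same number of outputs used at save time.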

  def __repr__ (self):
    '''
    Object representation.
    '''
    class_name = self.__class__.__qualname__
    # keep only the named arguments of __init__: co_varnames also lists
    # the local variables of the function, which are not model attributes
    init_code = self.__init__.__code__
    params = init_code.co_varnames[1 : init_code.co_argcount]

    args = ', '.join(['{0}={1}'.format(k, str(getattr(self, k)))
                      if not isinstance(getattr(self, k), str) else '{0}="{1}"'.format(k, str(getattr(self, k)))
                      for k in params])

    return '{0}({1})'.format(class_name, args)
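

# Minimal end-to-end sketch of the estimator interface defined above,
# guarded so that importing this module stays side-effect free.
# The concrete model (BCM) and the hyper-parameters used here are
# assumptions for illustration: any subclass implementing
# _weights_update and _predict would do.
if __name__ == '__main__':

  from plasticity.model import BCM

  rng = np.random.default_rng(42)
  X = rng.random(size=(100, 16))

  model = BCM(outputs=10, num_epochs=5, batch_size=20, random_state=42)
  model.fit(X)

  encoding = model.predict(X)
  print('encoding shape: {}'.format(encoding.shape))  # (100, 10)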