import os
import warnings
import numpy as np
from scipy.optimize import curve_fit
from sklearn.exceptions import NotFittedError
from sklearn.metrics import classification_report
from mcalf.utils.spec import reinterpolate_spectrum
from mcalf.utils.misc import make_iter
__all__ = ['ModelBase']
[docs]class ModelBase:
"""Base class for spectral line model fitting.
Warning: This class should not be used directly.
Use derived classes instead.
Attributes
----------
array : ndarray, dimensions are ['time', 'row', 'column', 'spectra']
Array holding spectra.
background : ndarray, dimensions are ['time', 'row', 'column']
Array holding spectral backgrounds.
"""
def __init__(self):
self.array = None # Array that will hold any loaded spectra
self.background = None # Array holding constant background values for `self.array`
def _load_data(self, array, names=None, target=None):
"""Load a specified array into the model object.
Load `array` with dimension names `names` into the parameter specified by `target`.
Parameters
----------
array : ndarray
The array to load.
names : list of str, length = `array.ndims`
List of dimension names for `array`. Valid dimension names depend on `target`.
target : {'array', 'background'}
See Also
--------
load_array : Load and array of spectra
load_background : Load an array of spectral backgrounds
"""
# Validate specified `target`
if target not in ['array', 'background']:
raise ValueError("array target must be either 'array' or 'background', got '%s'" % target)
# Verify `array` has data on more than one spectrum
if (target == 'array' and np.ndim(array) == 1) or (target == 'background' and np.ndim(array) == 0):
raise ValueError("cannot load an array containing one spectrum, use the spectrum directly instead")
# Validate the dimension names given
if names is None:
raise ValueError("dimension names must be specified for each dimension")
if np.ndim(array) != len(np.atleast_1d(names)): # All dimensions must be labelled
raise ValueError("number of dimension names do not match number of columns")
if len(np.atleast_1d(names)) != len(np.unique(np.atleast_1d(names))):
raise ValueError("duplicate dimension names found")
if target == 'array' and 'wavelength' not in names:
raise ValueError("array must contain a wavelength dimension")
# # Pad input array to include all possible input dimensions and
# # then transpose it such that order of dimensions is consistent
# e.g. ['time', 'wavelength'] -> ['time', 'row', 'column', 'wavelength']
# ['wavelength', 'column', 'row'] -> ['time', 'row', 'column', 'wavelength']
# Define the transposition on the input array to get the dimensions in order
# ['time', 'row', 'column'(, 'wavelength')]
n_dims = 4 if target == 'array' else 3 # ndims of (padded) array to load data into
transposition = [None] * n_dims
for i in range(len(names)):
if names[i] == 'time':
transposition[0] = i
elif names[i] == 'row':
transposition[1] = i
elif names[i] == 'column':
transposition[2] = i
elif names[i] == 'wavelength' and target == 'array': # Only `target` 'array' should contain wavelengths
transposition[3] = i
else: # Name given is not recognised
raise ValueError("name '{}' is not a valid dimension name".format(names[i]))
# Define where the new padded dimensions will be placed within the transposition
array_n_dims = np.ndim(array) # Number of dimensions in input array
new_axis_id = array_n_dims # Start labelling new axes after last dimension of input array
for axis in range(n_dims): # Loop through each axis of padded array to be created
if transposition[axis] is None: # Axis hasn't been labelled yet (not part of input array)
transposition[axis] = new_axis_id # Locate it within the transposition definition
new_axis_id += 1
# Add missing dimensions to the end
n_dims_to_add = n_dims - array_n_dims
ordered_array = array
for i in range(n_dims_to_add): # Add required number of dimensions to the end of the input array
ordered_array = ordered_array[..., np.newaxis]
# Reorder the dimensions of the padded array such that they can be later assumed
ordered_array = np.transpose(ordered_array, transposition)
# If spectral array, verify that length of each spectrum is equal to the number of corresponding wavelengths
if target == 'array' and np.shape(ordered_array)[-1] != len(self.original_wavelengths):
raise ValueError("length of wavelength dimension not equal length of original_wavelengths")
# Load the processed array into the specified object parameter
if target == 'array':
self.array = ordered_array
elif target == 'background':
self.background = ordered_array
else: # This should not happen
raise ValueError("unknown target (impossible error)")
# If "the other array" has been loaded, warn the user if their dimensions are not compatible
if self.array is not None and self.background is not None and \
np.sum(np.shape(self.array)[:-1] != np.shape(self.background)) != 0:
warnings.simplefilter('always', UserWarning)
warnings.warn("shape of spectrum array indices does not match shape of background array")
[docs] def load_array(self, array, names=None):
"""Load an array of spectra.
Load `array` with dimension names `names` into the `array` parameter of the model object.
Parameters
----------
array : ndarray of ndims > 1
An array containing at least two spectra.
names : list of str, length = `array.ndims`
List of dimension names for `array`. Valid dimension names are 'time', 'row', 'column' and 'wavelength'.
'wavelength' is a required dimension.
See Also
--------
load_background : Load an array of spectral backgrounds
"""
self._load_data(array, names=names, target='array')
[docs] def load_background(self, array, names=None):
"""Load an array of spectral backgrounds.
Load `array` with dimension names `names` into `background` parameter of the model object.
Parameters
----------
array : ndarray of ndim>0
An array containing at least two backgrounds.
names : list of str, length = `array.ndims`
List of dimension names for `array`. Valid dimension names are 'time', 'row' and 'column'.
See Also
--------
load_array : Load and array of spectra
"""
self._load_data(array, names=names, target='background')
[docs] def train(self, X, y):
"""Fit the neural network model to spectra matrix X and spectra labels y.
Calls the `fit` method on the `neural_network` parameter of the model object.
Parameters
----------
X : ndarray or sparse matrix of shape (n_spectra, n_wavelengths)
The input spectra.
y : ndarray of shape (n_spectra,) or (n_spectra, n_outputs)
The target class labels.
See Also
--------
test : Test how well the neural network has been trained
"""
self.neural_network.fit(X, y)
[docs] def test(self, X, y):
"""Test the accuracy of the trained neural network.
Prints a table of results showing:
1) the percentage of predictions that equal the target labels;
2) the average classification deviation and standard deviation from the ground truth classification
for each labelled classification;
3) the average classification deviation and standard deviation overall.
If the model object has an output parameter, it will create a CSV file (`self.output`/neural_network/test.csv)
listing the predictions and ground truth data.
Parameters
----------
X : ndarray or sparse matrix of shape (n_spectra, n_wavelengths)
The input spectra.
y : ndarray of shape (n_spectra,) or (n_spectra, n_outputs)
The target class labels.
See Also
--------
train : Train the neural network
"""
# Predict the labels
try:
predictions = self.neural_network.predict(X)
except NotFittedError:
raise NotFittedError("Neural network has not been trained yet. Call 'train' on the model class first.")
if len(predictions) != len(y): # Must be a corresponding ground truth label to compare to
raise ValueError("number of samples X does not equal number of labels y")
# Score is the percentage of predictions that are equal to their ground truth label
score = np.sum(predictions == y) / len(predictions) * 100
# Generate the table
info = "+---------------------------------------------+\n"
info += "| Neural Network Testing Statistics |\n"
info += "+---------------------------------------------+\n"
info += "| Percentage predictions==labels :: {: 6.2f}% |\n".format(score)
info += "+---------------------------------------------+\n"
info += "| Average deviation for each classification |\n"
info += "+---------------------------------------------+\n"
# Generate the average deviation for each classification
classifications = np.sort(np.unique([predictions, y]))
for classification in classifications:
dev = predictions[np.where(y == classification)[0]] - classification
info += "| class {: 2d} :: {: 6.2f} ± {: 4.2f} |\n".format(classification, np.mean(dev),
np.std(dev))
info += "+---------------------------------------------+\n"
# Generate the average deviation over all classifications
dev = predictions - y
info += "| Average deviation overall :: {: 6.2f} ± {: 4.2f} |\n".format(np.mean(dev), np.std(dev))
info += "+---------------------------------------------+"
# Print the table
print(info)
# Print scikit-learn classification report
print(classification_report(y, predictions))
# Output to file columns: 'ground_truth', 'predictions'
if self.output is not None:
try:
os.makedirs(os.path.join(self.output, "neural_network"))
except FileExistsError:
pass
np.savetxt(os.path.join(self.output, "neural_network", "test.csv"), np.asarray([y, predictions]).T,
fmt='%3d', delimiter=',', header='ground_truth, prediction')
def _get_time_row_column(self, time=None, row=None, column=None):
"""Validate and infer the time, row and column index
Takes any time, row and column index given and if any are not specified, they are returned as 0 if the
spectral array only has one value at its dimension. If there are multiple and no index is specified,
an error is raised due to the ambiguity.
Parameters
----------
time : optional, default=None
The time index.
row : optional, default=None
The row index.
column : optional, default=None
The column index.
Returns
-------
time
The corrected time index.
row
The corrected row index.
column
The corrected column index.
See Also
--------
utils.make_iter : Make a variable iterable
Notes
-----
No type checking is done on the input indices so it can be anything but in most cases will need to be
either an integer or iterable. The `utils.make_iter` function can be used to make indices iterable.
"""
array_shape = np.shape(self.array)
if time is None:
# No index was specified but we can infer that it would be 0 as there is only 1 value at this dimension
if array_shape[0] == 1:
time = 0
else:
raise ValueError("time index must be specified as multiple indices exist")
if row is None:
if array_shape[1] == 1:
row = 0
else:
raise ValueError("row index must be specified as multiple indices exist")
if column is None:
if array_shape[2] == 1:
column = 0
else:
raise ValueError("column index must be specified as multiple indices exist")
return time, row, column
[docs] def get_spectra(self, time=None, row=None, column=None, spectrum=None, correct=True, background=False):
"""Gets corrected spectra from the spectral array
Takes either a set of indices or an explicit spectrum and optionally applied corrections and background
removal.
Parameters
----------
time : int or iterable, optional, default=None
The time index. The index can be either a single integer index or an iterable. E.g. a list, a NumPy
array, a Python range, etc. can be used.
row : int or iterable, optional, default=None
The row index. See comment for `time` parameter.
column : int or iterable, optional, default=None
The column index. See comment for `time` parameter.
spectrum : ndarray of ndim=1, optional, default=None
The explicit spectrum.
correct : bool, optional, default=True
Whether to reinterpolate the spectrum and apply the prefilter correction (if exists).
background : bool, optional, default=False
Whether to include the background in the outputted spectra. Only removes the background if the
relevant background array has been loaded. Does not remove background is processing an explicit spectrum.
"""
# Locate the spectra
if spectrum is None: # No explicit spectrum so use specified indices
time, row, column = make_iter(*self._get_time_row_column(time=time, row=row, column=column))
spectra = self.array[time][:, row][:, :, column] # Iterable indices are needed to crop array correctly
else:
if np.ndim(spectrum) != 1: # Keep things simple (for now)
raise ValueError("explicit spectrum must have one dimension, got %s" % np.ndim(spectrum))
spectra = spectrum
# Apply option corrections to spectra
if correct:
spectra_list = spectra.reshape(-1, spectra.shape[-1]) # To shape: (n_spectra, n_wavelengths)
corrected_spectra = np.array([
reinterpolate_spectrum(spectrum, self.original_wavelengths, self.constant_wavelengths)
for spectrum in spectra_list]) # Reinterpolate each spectrum
if self.prefilter_response is not None: # Apply prefilter correction if one was initialised
corrected_spectra /= self.prefilter_response
spectra = corrected_spectra.reshape(*spectra.shape[:-1], -1) # Revert to the original shape
# Apply optional background removal to spectra
if spectrum is None and not background and self.background is not None: # Remove the background
# Calculate transpositions to get the array in the right shape for a quick background subtraction
forward_transpose = list(np.roll(np.arange(spectra.ndim), 1))
backwards_transpose = list(np.roll(np.arange(spectra.ndim), -1))
spectra = np.transpose(spectra, axes=forward_transpose) - self.background[time][:, row][:, :, column]
spectra = np.transpose(spectra, axes=backwards_transpose) # Revert to original shape
return spectra
[docs] def fit_spectrum(self, spectrum, **kwargs):
"""Fits the specified spectrum array
Passes the spectrum argument to the fit method. For easily iterating over a list of spectra.
Parameters
----------
spectrum : ndarray of ndim=1
The explicit spectrum.
**kwargs : dictionary, optional
Extra keyword arguments to pass to fit.
Returns
-------
result : FitResult
Result of the fit.
See Also
--------
fit : General fitting method
"""
return self.fit(spectrum=spectrum, **kwargs)
def _curve_fit(self, model, spectrum, guess, sigma, bounds, x_scale, time=None, row=None, column=None):
"""scipy.optimize.curve_fit wrapper with error handling
Passes a certain set of parameters to the scipy.optimize.curve_fit function and catches some typical
errors, presenting a more specific warning message.
Parameters
----------
model : callable
The model function, f(x, …). It must take the `self.constant_wavelenghts` as the first argument and the
parameters to fit as separate remaining arguments.
spectrum : array_like
The dependent data, with length equal to that of `self.constant_wavelengths`.
guess : array_like, optional
Initial guess for the parameters to fit.
sigma : array_like
Determines the uncertainty in the `spectrum`. Used to weight certain regions of the spectrum.
bounds : 2-tuple of array_like
Lower and upper bounds on each parameter.
x_scale : array_like
Characteristic scale of each parameter.
time : optional, default=None
The time index for error handling.
row : optional, default=None
The row index for error handling.
column : optional, default=None
The column index for error handling.
Returns
-------
fitted_parameters : ndarray, length=n_parameters
The parameters that recreate the model fitted to the spectrum.
success : bool
Whether the fit was successful or an error had to be handled.
See Also
--------
fit : General fitting method
fit_spectrum : Explicit spectrum fitting method
Notes
-----
More details can be found in the documentation for scipy.optimize.curve_fit and scipy.optimize.least_squares.
"""
try: # TODO Investigate if there is a performance gain to setting `check_finite` to False
fitted_parameters = curve_fit(model, self.constant_wavelengths, spectrum,
p0=guess, sigma=sigma, bounds=bounds, x_scale=x_scale)[0]
success = True
except RuntimeError:
success = False
location_text = " at ({}, {}, {})".format(time, row, column) if time is not None else ''
warnings.warn("RuntimeError{}".format(location_text))
fitted_parameters = np.full_like(guess, np.nan)
except ValueError as e:
success = False
if str(e) != "`x0` violates bound constraints.":
raise
else:
location_text = " at ({}, {}, {})".format(time, row, column) if time is not None else ''
warnings.warn("`x0` violates bound constraints{}".format(location_text))
fitted_parameters = np.full_like(guess, np.nan)
except np.linalg.LinAlgError as e:
success = False
if str(e) != "SVD did not converge in Linear Least Squares":
raise
else:
location_text = " at ({}, {}, {})".format(time, row, column) if time is not None else ''
warnings.warn("SVD did not converge in Linear Least Squares{}.".format(location_text))
fitted_parameters = np.full_like(guess, np.nan)
return fitted_parameters, success