import os
import copy
import warnings
import collections
import numpy as np
from pathos.multiprocessing import ProcessPool as Pool
from scipy.optimize import curve_fit
from sklearn.exceptions import NotFittedError
from sklearn.metrics import classification_report
from yaml import load
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
from mcalf.utils.misc import load_parameter, make_iter, update_signature
from mcalf.utils.spec import reinterpolate_spectrum
__all__ = ['ModelBase', 'BASE_PARAMETERS', 'BASE_ATTRIBUTES']
[docs]class ModelBase:
"""Base class for spectral line model fitting.
.. warning::
This class should not be used directly.
Use derived classes instead.
Parameters
----------
${PARAMETERS}
Attributes
----------
${ATTRIBUTES}
"""
default_kwargs = default_modelbase_kwargs = collections.OrderedDict([
('stationary_line_core', None),
('original_wavelengths', None),
('constant_wavelengths', None),
('delta_lambda', 0.05),
('sigma', None),
('prefilter_response', None),
('prefilter_ref_main', None),
('prefilter_ref_wvscl', None),
('output', None),
])
def __init__(self, config=None, **kwargs):
# STAGE 1: Define dictionary of default attribute values
defaults = copy.deepcopy(self.default_modelbase_kwargs)
# STAGE 2: Update defaults with any values specified in a config file
self.config = {} # Dictionary of parameters from the config file
if config is not None: # Process config file if one is specified
with open(config, 'r') as stream: # Load YAML file
self.config = load(stream, Loader=Loader)
# Extract the BaseModel attributes from the config file
class_defaults = {k: self.config[k] for k in self.config.keys() if k in defaults.keys()}
for k in class_defaults.keys():
if k not in ['output']: # Keep string as is (don't pass through `load_parameter`)
class_defaults[k] = load_parameter(class_defaults[k])
self.config.pop(k) # Remove copied parameter
# Update the defaults with the config file
defaults.update(class_defaults)
# STAGE 3: Update defaults with the keyword arguments passed into the class initialisation
# Verify that only expected kwargs are passed
for k in kwargs.keys():
if k not in defaults.keys():
raise TypeError(f"ModelBase() got an unexpected keyword argument '{k}'")
defaults.update(kwargs) # Update the defaults
# Load default values of any parameters that haven't been given yet
if defaults['original_wavelengths'] is None:
raise ValueError("original_wavelengths must be specified")
if defaults['constant_wavelengths'] is None:
defaults['constant_wavelengths'] = np.arange(min(defaults['original_wavelengths']),
(max(defaults['original_wavelengths'])
+ defaults['delta_lambda']),
defaults['delta_lambda'])
# STAGE 4: Set the object attributes (with some type enforcing)
# values in the defaults dict
self.original_wavelengths = np.asarray(defaults['original_wavelengths'], dtype=np.float64)
self.constant_wavelengths = np.asarray(defaults['constant_wavelengths'], dtype=np.float64)
if defaults['stationary_line_core'] is None: # Allow to be None by default
self.__stationary_line_core = None # Override setter
else:
self.stationary_line_core = defaults['stationary_line_core']
self.__delta_lambda = defaults['delta_lambda']
self.sigma = defaults['sigma']
self.prefilter_response = defaults['prefilter_response']
self.__prefilter_ref_main = defaults['prefilter_ref_main']
self.__prefilter_ref_wvscl = defaults['prefilter_ref_wvscl']
self.output = defaults['output']
# attributes whose default value cannot be changed during initialisation
self.neural_network = None # Must be set in a child class initialisation or after initialisation
self.array = None # Array that will hold any loaded spectra
self.background = None # Array holding constant background values for `self.array`
# STAGE 5: Validate the loaded attributes
self._validate_base_attributes()
@property
def stationary_line_core(self):
return self.__stationary_line_core
@stationary_line_core.setter
def stationary_line_core(self, wavelength):
# Verify that stationary_line_core is a float
if not isinstance(wavelength, float):
raise ValueError("stationary_line_core must be a float, got %s" % type(wavelength))
# Stationary wavelength must be within wavelength range
original_diff = self.original_wavelengths - wavelength
constant_diff = self.constant_wavelengths - wavelength
for n, i in [['original_wavelengths', original_diff], ['constant_wavelengths', constant_diff]]:
if min(i) > 1e-6 or max(i) < -1e-6:
raise ValueError("`stationary_line_core` is not within `{}`".format(n))
# Verification passed; set value
self.__stationary_line_core = wavelength
[docs] def _set_prefilter(self):
"""Set the `prefilter_response` parameter.
.. deprecated:: 0.2
Prefilter response correction code, and `prefilter_response`, `prefilter_ref_main`
and `prefilter_ref_wvscl`, may be removed in a later release of MCALF.
Spectra should be fully processed before loading into MCALF.
This method should be called in a child class once `stationary_line_core` has been set.
"""
if self.prefilter_response is None:
if self.__prefilter_ref_main is not None and self.__prefilter_ref_wvscl is not None:
self.prefilter_response = reinterpolate_spectrum(self.__prefilter_ref_main,
self.__prefilter_ref_wvscl + self.stationary_line_core,
self.constant_wavelengths)
else:
return # None of the prefilter attributes are set, so no prefilter to apply.
else: # Make sure it is a numpy array so that division works as expected when doing array operations
self.prefilter_response = np.asarray(self.prefilter_response, dtype=np.float64)
# TODO: Remove this warning and possibly all prefilter code
warnings.warn("Spectra should be fully processed before loading into MCALF. "
"Prefilter response correction code may be removed in a later "
"release.", PendingDeprecationWarning)
# If a prefilter response is given it must be a compatible length
if self.prefilter_response is not None:
if len(self.prefilter_response) != len(self.constant_wavelengths):
raise ValueError("prefilter_response array must be the same length as constant_wavelengths array")
[docs] def _validate_base_attributes(self):
"""Validate some of the object's attributes.
Raises
------
ValueError
To signal that an attribute is not valid.
"""
# Wavelength arrays must be sorted ascending
if np.sum(np.diff(self.original_wavelengths) > 0) < len(self.original_wavelengths) - 1:
raise ValueError("original_wavelengths array must be sorted ascending")
if np.sum(np.diff(self.constant_wavelengths) > 0) < len(self.constant_wavelengths) - 1:
raise ValueError("constant_wavelengths array must be sorted ascending")
# Warn if the constant wavelengths extrapolate the original wavelengths
if min(self.constant_wavelengths) - min(self.original_wavelengths) < -1e-6:
# If lower-bound of constant wavelengths is more than 1e-6 outside of the original wavelengths
warnings.warn("Lower bound of `constant_wavelengths` is outside of `original_wavelengths` range.")
if max(self.constant_wavelengths) - max(self.original_wavelengths) - self.__delta_lambda > 1e-6:
# If upper-bound of constant wavelengths is more than 1e-6 ouside the original wavelengths
warnings.warn("Upper bound of `constant_wavelengths` is outside of `original_wavelengths` range.")
[docs] def _load_data(self, array, names=None, target=None):
"""Load a specified array into the model object.
Load `array` with dimension names `names` into the attribute specified by `target`.
Parameters
----------
array : numpy.ndarray
The array to load.
names : list of str, length=`array.ndim`
List of dimension names for `array`. Valid dimension names depend on `target`.
target : {'array', 'background'}
The attribute to load the `array` into.
See Also
--------
load_array : Load and array of spectra.
load_background : Load an array of spectral backgrounds.
"""
# Validate specified `target`
if target not in ['array', 'background']:
raise ValueError("array target must be either 'array' or 'background', got '%s'" % target)
# Verify `array` has data on more than one spectrum
if (target == 'array' and np.ndim(array) == 1) or (target == 'background' and np.ndim(array) == 0):
raise ValueError("cannot load an array containing one spectrum, use the spectrum directly instead")
# Validate the dimension names given
if names is None:
raise ValueError("dimension names must be specified for each dimension")
if np.ndim(array) != len(np.atleast_1d(names)): # All dimensions must be labelled
raise ValueError("number of dimension names do not match number of columns")
if len(np.atleast_1d(names)) != len(np.unique(np.atleast_1d(names))):
raise ValueError("duplicate dimension names found")
if target == 'array' and 'wavelength' not in names:
raise ValueError("array must contain a wavelength dimension")
# # Pad input array to include all possible input dimensions and
# # then transpose it such that order of dimensions is consistent
# e.g. ['time', 'wavelength'] -> ['time', 'row', 'column', 'wavelength']
# ['wavelength', 'column', 'row'] -> ['time', 'row', 'column', 'wavelength']
# Define the transposition on the input array to get the dimensions in order
# ['time', 'row', 'column'(, 'wavelength')]
n_dims = 4 if target == 'array' else 3 # ndim of (padded) array to load data into
transposition = [None] * n_dims
for i in range(len(names)):
if names[i] == 'time':
transposition[0] = i
elif names[i] == 'row':
transposition[1] = i
elif names[i] == 'column':
transposition[2] = i
elif names[i] == 'wavelength' and target == 'array': # Only `target` 'array' should contain wavelengths
transposition[3] = i
else: # Name given is not recognised
raise ValueError("name '{}' is not a valid dimension name".format(names[i]))
# Define where the new padded dimensions will be placed within the transposition
array_n_dims = np.ndim(array) # Number of dimensions in input array
new_axis_id = array_n_dims # Start labelling new axes after last dimension of input array
for axis in range(n_dims): # Loop through each axis of padded array to be created
if transposition[axis] is None: # Axis hasn't been labelled yet (not part of input array)
transposition[axis] = new_axis_id # Locate it within the transposition definition
new_axis_id += 1
# Add missing dimensions to the end
n_dims_to_add = n_dims - array_n_dims
ordered_array = array
for i in range(n_dims_to_add): # Add required number of dimensions to the end of the input array
ordered_array = ordered_array[..., np.newaxis]
# Reorder the dimensions of the padded array such that they can be later assumed
ordered_array = np.transpose(ordered_array, transposition)
# If spectral array, verify that length of each spectrum is equal to the number of corresponding wavelengths
if target == 'array' and np.shape(ordered_array)[-1] != len(self.original_wavelengths):
raise ValueError("length of wavelength dimension not equal length of original_wavelengths")
# Load the processed array into the specified object parameter
if target == 'array':
self.array = ordered_array
elif target == 'background':
self.background = ordered_array
# If "the other array" has been loaded, warn the user if their dimensions are not compatible
if self.array is not None and self.background is not None and \
np.sum(np.shape(self.array)[:-1] != np.shape(self.background)) != 0:
warnings.simplefilter('always', UserWarning)
warnings.warn("shape of spectrum array indices does not match shape of background array")
[docs] def load_array(self, array, names=None):
"""Load an array of spectra.
Load `array` with dimension names `names` into the `array` parameter of the model object.
Parameters
----------
array : numpy.ndarray, ndim>1
An array containing at least two spectra.
names : list of str, length=`array.ndim`
List of dimension names for `array`. Valid dimension names are 'time', 'row', 'column' and 'wavelength'.
'wavelength' is a required dimension.
See Also
--------
load_background : Load an array of spectral backgrounds.
Examples
--------
Create a basic model:
>>> import mcalf.models
>>> from astropy.io import fits
>>> wavelengths = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0]
>>> model = mcalf.models.ModelBase(original_wavelengths=wavelengths)
Load spectra from a file:
>>> spectra = fits.open('spectra_0000.fits')[0].data # doctest: +SKIP
>>> model.load_array(spectra, names=['wavelength', 'column', 'row']) # doctest: +SKIP
"""
self._load_data(array, names=names, target='array')
[docs] def load_background(self, array, names=None):
"""Load an array of spectral backgrounds.
Load `array` with dimension names `names` into `background` parameter of the model object.
Parameters
----------
array : numpy.ndarray, ndim>0
An array containing at least two backgrounds.
names : list of str, length=`array.ndim`
List of dimension names for `array`. Valid dimension names are 'time', 'row' and 'column'.
See Also
--------
load_array : Load and array of spectra.
Examples
--------
Create a basic model:
>>> import mcalf.models
>>> from astropy.io import fits
>>> wavelengths = [0.0, 10.0, 20.0, 30.0, 40.0, 50.0]
>>> model = mcalf.models.ModelBase(original_wavelengths=wavelengths)
Load background array from a file:
>>> background = fits.open('background_0000.fits')[0].data # doctest: +SKIP
>>> model.load_background(background, names=['column', 'row']) # doctest: +SKIP
"""
self._load_data(array, names=names, target='background')
[docs] def train(self, X, y):
"""Fit the neural network model to spectra matrix X and spectra labels y.
Calls the :meth:`fit` method on the `neural_network` parameter of the model object.
Parameters
----------
X : numpy.ndarray or sparse matrix, shape=(n_spectra, n_wavelengths)
The input spectra.
y : numpy.ndarray, shape= (n_spectra,) or (n_spectra, n_outputs)
The target class labels.
See Also
--------
test : Test how well the neural network has been trained.
"""
self.neural_network.fit(X, y)
[docs] def test(self, X, y):
"""Test the accuracy of the trained neural network.
Prints a table of results showing:
1) the percentage of predictions that equal the target labels;
2) the average classification deviation and standard deviation from the ground truth classification
for each labelled classification;
3) the average classification deviation and standard deviation overall.
If the model object has an output parameter, it will create a CSV file (``output``/neural_network/test.csv)
listing the predictions and ground truth data.
Parameters
----------
X : numpy.ndarray or sparse matrix, shape=(n_spectra, n_wavelengths)
The input spectra.
y : numpy.ndarray, shape= (n_spectra,) or (n_spectra, n_outputs)
The target class labels.
See Also
--------
train : Train the neural network.
"""
# Predict the labels
try:
predictions = self.neural_network.predict(X)
except NotFittedError:
raise NotFittedError("Neural network has not been trained yet. Call 'train' on the model class first.")
if len(predictions) != len(y): # Must be a corresponding ground truth label to compare to
raise ValueError("number of samples X does not equal number of labels y")
# Score is the percentage of predictions that are equal to their ground truth label
score = np.sum(predictions == y) / len(predictions) * 100
# Generate the table
info = "+---------------------------------------------+\n"
info += "| Neural Network Testing Statistics |\n"
info += "+---------------------------------------------+\n"
info += "| Percentage predictions==labels :: {: 6.2f}% |\n".format(score)
info += "+---------------------------------------------+\n"
info += "| Average deviation for each classification |\n"
info += "+---------------------------------------------+\n"
# Generate the average deviation for each classification
classifications = np.sort(np.unique([predictions, y]))
for classification in classifications:
dev = predictions[np.where(y == classification)[0]] - classification
info += "| class {: 2d} :: {: 6.2f} ± {: 4.2f} |\n".format(classification, np.mean(dev),
np.std(dev))
info += "+---------------------------------------------+\n"
# Generate the average deviation over all classifications
dev = predictions - y
info += "| Average deviation overall :: {: 6.2f} ± {: 4.2f} |\n".format(np.mean(dev), np.std(dev))
info += "+---------------------------------------------+"
# Print the table
print(info)
# Print scikit-learn classification report
print(classification_report(y, predictions))
# Output to file columns: 'ground_truth', 'predictions'
if self.output is not None:
try:
os.makedirs(os.path.join(self.output, "neural_network"))
except FileExistsError:
pass
np.savetxt(os.path.join(self.output, "neural_network", "test.csv"), np.asarray([y, predictions]).T,
fmt='%3d', delimiter=',', header='ground_truth, prediction')
[docs] def classify_spectra(self, time=None, row=None, column=None, spectra=None, only_normalise=False):
"""Classify the specified spectra.
Will also normalise each spectrum such that its intensity will range from zero to one.
Parameters
----------
time : int or iterable, optional, default=None
The time index. The index can be either a single integer index or an iterable. E.g. a list,
a :class:`numpy.ndarray`, a Python range, etc. can be used.
row : int or iterable, optional, default=None
The row index. See comment for `time` parameter.
column : int or iterable, optional, default=None
The column index. See comment for `time` parameter.
spectra : numpy.ndarray, optional, default=None
The explicit spectra to classify. If `only_normalise` is False, this must be 1D.
However, if `only_normalise` is set to true, `spectra` can be of any dimension.
It is assumed that the final dimension is wavelengths, so return shape will be the
same as `spectra`, except with no final wavelengths dimension.
only_normalise : bool, optional, default=False
Whether the single spectrum given in `spectra` should not be interpolated and corrected.
If set to true, the only processing applied to `spectra` will be a normalisation to be
in range 0 to 1.
Returns
-------
classifications : numpy.ndarray
Array of classifications with the same time, row and column indices as `spectra`.
See Also
--------
train : Train the neural network.
test : Test the accuracy of the neural network.
get_spectra : Get processed spectra from the objects `array` attribute.
Examples
--------
Create a basic model:
>>> import mcalf.models
>>> import numpy as np
>>> wavelengths = np.linspace(8542.1, 8542.2, 30)
>>> model = mcalf.models.ModelBase(original_wavelengths=wavelengths)
Load a trained neural network:
>>> import pickle
>>> pkl = open('trained_neural_network.pkl', 'rb') # doctest: +SKIP
>>> model.neural_network = pickle.load(pkl) # doctest: +SKIP
Classify an individual spectrum:
>>> spectrum = np.random.rand(30)
>>> model.classify_spectra(spectra=spectrum) # doctest: +SKIP
array([2])
When :code:`only_normalise=True`, classify an n-dimensional spectral array:
>>> spectra = np.random.rand(5, 4, 3, 2, 30)
>>> model.classify_spectra(spectra=spectra, only_normalise=True).shape # doctest: +SKIP
(5, 4, 3, 2)
Load spectra from a file and classify:
>>> from astropy.io import fits
>>> spectra = fits.open('spectra_0000.fits')[0].data # doctest: +SKIP
>>> model.load_array(spectra, names=['wavelength', 'column', 'row']) # doctest: +SKIP
>>> model.classify_spectra(column=range(10, 15), row=[7, 16]) # doctest: +SKIP
array([[[0, 2, 0, 3, 0],
[4, 0, 1, 0, 0]]])
"""
if not only_normalise: # Get the spectrum, otherwise use the provided one directly
spectra = self.get_spectra(time=time, row=row, column=column, spectrum=spectra)
# Vectorised normalisation for all spectra such that intensities for each spectrum is in range [0, 1]
spectra_list = spectra.reshape(-1, spectra.shape[-1]).T # Reshape & transpose to (n_wavelengths, n_spectra)
spectra_list = spectra_list - spectra_list.min(axis=0) # Subtract each spectrum's min value from itself (min 0)
spectra_list = spectra_list / spectra_list.max(axis=0) # Divide each spectrum by its max value (min 0, max 1)
spectra_list = spectra_list.T # Transpose to (n_spectra, n_wavelengths)
# Create the empty classifications array
classifications = np.full(len(spectra_list), -1, dtype=int) # -1 reserved for invalid
valid_spectra_i = np.where(~np.isnan(spectra_list[:, 0])) # Only predict the valid spectra
# Note: Spectra that have indices in the data but were masked or outside the field of view should be np.nan
try:
classifications[valid_spectra_i] = self.neural_network.predict(spectra_list[valid_spectra_i])
except NotFittedError:
raise NotFittedError("Neural network has not been trained yet. Call 'train' on the model class first.")
try: # Try to give the classifications array the same indices as the spectral array
classifications = classifications.reshape(*spectra.shape[:-1])
except TypeError: # 1D spectra array given
assert classifications.size == 1 # Verify
return classifications
[docs] def _get_time_row_column(self, time=None, row=None, column=None):
"""Validate and infer the time, row and column index.
Takes any time, row and column index given and if any are not specified, they are returned as 0 if the
spectral array only has one value at its dimension. If there are multiple and no index is specified,
an error is raised due to the ambiguity.
Parameters
----------
time : optional, default=None
The time index.
row : optional, default=None
The row index.
column : optional, default=None
The column index.
Returns
-------
time
The corrected time index.
row
The corrected row index.
column
The corrected column index.
See Also
--------
mcalf.utils.misc.make_iter : Make a variable iterable.
Notes
-----
No type checking is done on the input indices so it can be anything but in most cases will need to be
either an integer or iterable. The :func:`mcalf.utils.misc.make_iter` function can be used to make
indices iterable.
"""
array_shape = np.shape(self.array)
if time is None:
# No index was specified but we can infer that it would be 0 as there is only 1 value at this dimension
if array_shape[0] == 1:
time = 0
else:
raise ValueError("time index must be specified as multiple indices exist")
if row is None:
if array_shape[1] == 1:
row = 0
else:
raise ValueError("row index must be specified as multiple indices exist")
if column is None:
if array_shape[2] == 1:
column = 0
else:
raise ValueError("column index must be specified as multiple indices exist")
return time, row, column
[docs] def get_spectra(self, time=None, row=None, column=None, spectrum=None, correct=True, background=False):
"""Gets corrected spectra from the spectral array.
Takes either a set of indices or an explicit spectrum and optionally applied corrections and background
removal.
Parameters
----------
time : int or iterable, optional, default=None
The time index. The index can be either a single integer index or an iterable. E.g. a list,
a :class:`numpy.ndarray`, a Python range, etc. can be used.
row : int or iterable, optional, default=None
The row index. See comment for `time` parameter.
column : int or iterable, optional, default=None
The column index. See comment for `time` parameter.
spectrum : ndarray of ndim=1, optional, default=None
The explicit spectrum. If provided, `time`, `row`, and `column` are ignored.
correct : bool, optional, default=True
Whether to reinterpolate the spectrum and apply the prefilter correction (if exists).
background : bool, optional, default=False
Whether to include the background in the outputted spectra. Only removes the background if the
relevant background array has been loaded. Does not remove background is processing an explicit spectrum.
Returns
-------
spectra : ndarray
Examples
--------
Create a basic model:
>>> import mcalf.models
>>> import numpy as np
>>> wavelengths = np.linspace(8541.3, 8542.7, 30)
>>> model = mcalf.models.ModelBase(original_wavelengths=wavelengths)
Provide a single spectrum for processing, and notice output is 1D:
>>> spectrum = model.get_spectra(spectrum=np.random.rand(30))
>>> spectrum.ndim
1
Load an array of spectra:
>>> spectra = np.random.rand(3, 4, 30)
>>> model.load_array(spectra, names=['column', 'row', 'wavelength'])
Extract a single (unprocessed) spectrum from the loaded array, and notice output is 4D:
>>> spectrum = model.get_spectra(row=1, column=0, correct=False)
>>> spectrum.shape
(1, 1, 1, 30)
>>> (spectrum[0, 0, 0] == spectra[0, 1]).all()
True
Extract an array of spectra, and notice output is 4D, and with dimensions
time, row, column, wavelength regardless of the original dimensions and order:
>>> spectrum = model.get_spectra(row=range(4), column=range(3))
>>> spectrum.shape
(1, 4, 3, 30)
Notice that the time index can be excluded, as the loaded array only represents a single time.
However, in this case leaving out `row` or `column` results in an error as it is ambiguous:
>>> spectrum = model.get_spectra(row=range(4))
Traceback (most recent call last):
...
ValueError: column index must be specified as multiple indices exist
"""
# Locate the spectra
if spectrum is None: # No explicit spectrum so use specified indices
time, row, column = make_iter(*self._get_time_row_column(time=time, row=row, column=column))
spectra = self.array[time][:, row][:, :, column] # Iterable indices are needed to crop array correctly
else:
if np.ndim(spectrum) != 1: # Keep things simple (for now)
raise ValueError("explicit spectrum must have one dimension, got %s" % np.ndim(spectrum))
spectra = spectrum
# Apply option corrections to spectra
if correct:
spectra_list = spectra.reshape(-1, spectra.shape[-1]) # To shape: (n_spectra, n_wavelengths)
corrected_spectra = np.array([
reinterpolate_spectrum(spectrum, self.original_wavelengths, self.constant_wavelengths)
for spectrum in spectra_list]) # Reinterpolate each spectrum
if self.prefilter_response is not None: # Apply prefilter correction if one was initialised
corrected_spectra /= self.prefilter_response
spectra = corrected_spectra.reshape(*spectra.shape[:-1], -1) # Revert to the original shape
# Apply optional background removal to spectra
if spectrum is None and not background and self.background is not None: # Remove the background
# Calculate transpositions to get the array in the right shape for a quick background subtraction
forward_transpose = list(np.roll(np.arange(spectra.ndim), 1))
backwards_transpose = list(np.roll(np.arange(spectra.ndim), -1))
spectra = np.transpose(spectra, axes=forward_transpose) - self.background[time][:, row][:, :, column]
spectra = np.transpose(spectra, axes=backwards_transpose) # Revert to original shape
return spectra
[docs] def _fit(self, spectrum, classification=None, spectrum_index=None, **kwargs):
"""Fit a single spectrum for the given profile or classification.
.. warning::
This call signature and docstring specify how the `_fit` method must be implemented in
each subclass of `ModelBase`. **It is not implemented in this class.**
Parameters
----------
spectrum : numpy.ndarray, ndim=1, length=n_constant_wavelengths
The spectrum to be fitted.
classification : int, optional, default=None
Classification to determine the fitted profile to use.
spectrum_index : array_like or list or tuple, length=3, optional, default=None
The [time, row, column] index of the `spectrum` provided. Only used for error reporting.
Returns
-------
result : mcalf.models.FitResult
Outcome of the fit returned in a :class:`mcalf.models.FitResult` object.
See Also
--------
fit : The recommended method for fitting spectra.
mcalf.models.FitResult : The object that the fit method returns.
Notes
-----
This method is called for each requested spectrum by the :meth:`models.ModelBase.fit` method.
This is where most of the adjustments to the fitting method should be made. See other
subclasses of `models.ModelBase` for examples of how to implement this method in a
new subclass. See :meth:`models.ModelBase.fit` for more information on how this method is
called.
"""
raise NotImplementedError("The `_fit` method must be implemented in a subclass of `ModelBase`."
"See `models.ModelBase._fit` for more details.")
[docs] def fit(self, time=None, row=None, column=None, spectrum=None, classifications=None,
background=None, n_pools=None, **kwargs):
"""Fits the model to specified spectra.
Fits the model to an array of spectra using multiprocessing if requested.
Parameters
----------
time : int or iterable, optional, default=None
The time index. The index can be either a single integer index or an iterable. E.g. a list,
:class:`numpy.ndarray`, a Python range, etc. can be used.
row : int or iterable, optional, default=None
The row index. See comment for `time` parameter.
column : int or iterable, optional, default=None
The column index. See comment for `time` parameter.
spectrum : numpy.ndarray, ndim=1, optional, default=None
The explicit spectrum to fit the model to.
classifications : int or array_like, optional, default=None
Classifications to determine the fitted profile to use. Will use neural network to classify them if not.
If a multidimensional array, must have the same shape as [`time`, `row`, `column`].
Dimensions that would have length of 1 can be excluded.
background : float, optional, default=None
If provided, this value will be subtracted from the explicit spectrum provided in `spectrum`. Will
not be applied to spectra found from the indices, use the :meth:`~mcalf.models.ModelBase.load_background`
method instead.
n_pools : int, optional, default=None
The number of processing pools to calculate the fitting over. This allocates the fitting of different
spectra to `n_pools` separate worker processes. When processing a large number of spectra this will make
the fitting process take less time overall. It also distributes such that each worker process has the
same ratio of classifications to process. This should balance out the workload between workers.
If few spectra are being fitted, performance may decrease due to the overhead associated with splitting
the evaluation over separate processes. If `n_pools` is not an integer greater than zero, it will fit
the spectrum with a for loop.
**kwargs
Extra keyword arguments to pass to :meth:`~mcalf.models.ModelBase._fit`.
Returns
-------
result : list of :class:`~mcalf.models.FitResult`, length=n_spectra
Outcome of the fits returned as a list of :class:`~mcalf.models.FitResult` objects.
Examples
--------
Create a basic model:
>>> import mcalf.models
>>> import numpy as np
>>> wavelengths = np.linspace(8541.3, 8542.7, 30)
>>> model = mcalf.models.ModelBase(original_wavelengths=wavelengths)
Set up the neural network classifier:
>>> model.neural_network = ... # load an untrained classifier # doctest: +SKIP
>>> model.train(...) # doctest: +SKIP
>>> model.test(...) # doctest: +SKIP
Load the spectra and background array:
>>> model.load_array(...) # doctest: +SKIP
>>> model.load_background(...) # doctest: +SKIP
Fit a subset of the loaded spectra, using 5 processing pools:
>>> fits = model.fit(row=range(3, 5), column=range(200), n_pools=5) # doctest: +SKIP
>>> fits # doctest: +SKIP
['Successful FitResult with ________ profile of classification 0',
'Successful FitResult with ________ profile of classification 2',
...
'Successful FitResult with ________ profile of classification 0',
'Successful FitResult with ________ profile of classification 4']
Merge the fit results into a :class:`~mcalf.models.FitResults` object:
>>> results = mcalf.models.FitResults((500, 500), 8)
>>> for fit in fits: # doctest: +SKIP
... results.append(fit) # doctest: +SKIP
See :meth:`fit_spectrum` examples for how to manually providing a `spectrum` to fit.
"""
# Only include the background is using an explicit spectrum; remove for indices
include_background = False if spectrum is None else True
spectra = self.get_spectra(time=time, row=row, column=column, spectrum=spectrum, background=include_background)
# At least one valid spectrum must be given
n_valid = np.sum(~np.isnan(spectra[..., 0]))
if np.sum(~np.isnan(spectra[..., 0])) == 0:
raise ValueError("no valid spectra given")
if spectrum is not None and background is not None: # remove explicit background of explicit spectrum
spectra -= background
if classifications is None: # Classify if not already specified
classifications = self.classify_spectra(spectra=spectra, only_normalise=True)
if spectrum is None: # No explicit spectrum given
# Create the array of indices that the spectra represent
time, row, column = make_iter(*self._get_time_row_column(time=time, row=row, column=column))
indices = np.transpose(np.array(np.meshgrid(time, row, column, indexing='ij')), axes=[1, 2, 3, 0])
# Ensure that length of arrays match
spectra_indices_shape_mismatch = spectra.shape[:-1] != indices.shape[:-1]
spectra_class_size_mismatch = np.size(spectra[..., 0]) != np.size(classifications)
spectra_class_shape_mismatch = False # Only test this if appropriate
if isinstance(classifications, np.ndarray) and classifications.ndim != 1:
# If a multidimensional array of classifications are given, make sure it matches the indices layout
# Allow for dimensions of length 1 to be excluded
if np.squeeze(spectra[..., 0]).shape != np.squeeze(classifications).shape:
spectra_class_shape_mismatch = True
if spectra_indices_shape_mismatch or spectra_class_size_mismatch or spectra_class_shape_mismatch:
raise ValueError("number classifications do not match number of spectra and associated indices")
# Make shape (n_spectra, n_features) so can process in a list
spectra = spectra.reshape(-1, spectra.shape[-1])
indices = indices.reshape(-1, indices.shape[-1])
classifications = np.asarray(classifications) # Make sure ndarray methods are available
classifications = classifications.reshape(-1)
# Remove spectra that are invalid (this allows for masking of the loaded data to constrain a region to fit)
valid_spectra_i = np.where(~np.isnan(spectra[:, 0])) # Where the first item of the spectrum is not NaN
spectra = spectra[valid_spectra_i]
indices = indices[valid_spectra_i]
classifications = classifications[valid_spectra_i]
assert len(spectra) == len(indices) == len(classifications) # Postprocessing sanity check
# Multiprocessing not required
if n_pools is None or (isinstance(n_pools, (int, np.integer)) and n_pools <= 0):
print("Processing {} spectra".format(n_valid))
results = [
self._fit(spectra[i], classification=classifications[i], spectrum_index=indices[i], **kwargs)
for i in range(len(spectra))
]
elif isinstance(n_pools, (int, np.integer)) and n_pools >= 1: # Use multiprocessing
# Define single argument function that can be evaluated in the pools
def func(data, kwargs=kwargs):
# Extract data and pass to `_fit` method
spectrum, index, classification = data # pragma: no cover
return self._fit(spectrum, classification=classification,
spectrum_index=list(index), **kwargs) # pragma: no cover
# Sort the arrays in descending classification order
s = np.argsort(classifications)[::-1] # Classifications indices sorted in descending order
spectra = spectra[s]
indices = indices[s]
classifications = classifications[s]
# Distribute the sorted spectra to each of `n_pools` in turn to ensure even workload
data = []
for pool in range(n_pools):
data += [[spectra[i], indices[i], classifications[i]] for i in range(pool, len(spectra), n_pools)]
print("Processing {} spectra over {} pools".format(n_valid, n_pools))
with Pool(n_pools) as p: # Send the job to the `n_pools` pools
results = p.map(func, data) # Map each element of `data` to the first argument of `func`
p.close() # Finished, so no more jobs to come
p.join() # Clean up the closed pools
p.clear() # Remove the server so it can be created again
else:
raise TypeError("n_pools must be an integer, got %s" % type(n_pools))
else: # Explicit spectrum must be 1D so no loop needed
results = self._fit(spectra, classification=classifications, spectrum_index=None, **kwargs)
return results
[docs] def fit_spectrum(self, spectrum, **kwargs):
"""Fits the specified spectrum array.
Passes the spectrum argument to the :meth:`~mcalf.models.ModelBase.fit` method.
For easily iterating over a list of spectra.
Parameters
----------
spectrum : numpy.ndarray, ndim=1
The explicit spectrum.
**kwargs
Extra keyword arguments to pass to :meth:`~mcalf.models.ModelBase.fit`.
Returns
-------
result : :class:`~mcalf.models.FitResult`
Result of the fit.
See Also
--------
fit : General fitting method.
Examples
--------
Create a basic model:
>>> import mcalf.models
>>> import numpy as np
>>> wavelengths = np.linspace(8541.3, 8542.7, 30)
>>> model = mcalf.models.ModelBase(original_wavelengths=wavelengths)
**Quickly provide a spectrum and fit it.** Remember that the model must be optimised for
the spectra that it is asked to fit. In this example the neural network is not
called upon to classify the provided spectrum as a classification is provided directly:
>>> spectrum = np.random.rand(30)
>>> model.fit_spectrum(spectrum, classifications=0, background=142.2) # doctest: +SKIP
Successful FitResult with ________ profile of classification 0
As the spectrum is provided manually, any background value must also be provided manually.
Alternatively, the background can be subtracted before passing to the function, as by
default, no background is subtracted:
>>> model.fit_spectrum(spectrum - 142.2, classifications=0) # doctest: +SKIP
Successful FitResult with ________ profile of classification 0
"""
return self.fit(spectrum=spectrum, **kwargs)
[docs] def _curve_fit(self, model, spectrum, guess, sigma, bounds, x_scale, time=None, row=None, column=None, **kwargs):
""":func:`scipy.optimize.curve_fit` wrapper with error handling.
Passes a certain set of parameters to the :func:`scipy.optimize.curve_fit` function and catches some typical
errors, presenting a more specific warning message.
Parameters
----------
model : callable
The model function, f(x, …). It must take the `ModelBase.constant_wavelenghts` attribute
as the first argument and the parameters to fit as separate remaining arguments.
spectrum : array_like
The dependent data, with length equal to that of the `ModelBase.constant_wavelengths` attribute.
guess : array_like, optional
Initial guess for the parameters to fit.
sigma : array_like
Determines the uncertainty in the `spectrum`. Used to weight certain regions of the spectrum.
bounds : 2-tuple of array_like
Lower and upper bounds on each parameter.
x_scale : array_like
Characteristic scale of each parameter.
time : optional, default=None
The time index for error handling.
row : optional, default=None
The row index for error handling.
column : optional, default=None
The column index for error handling.
Returns
-------
fitted_parameters : numpy.ndarray, length=n_parameters
The parameters that recreate the model fitted to the spectrum.
success : bool
Whether the fit was successful or an error had to be handled.
See Also
--------
fit : General fitting method.
fit_spectrum : Explicit spectrum fitting method.
Notes
-----
More details can be found in the documentation for :func:`scipy.optimize.curve_fit`
and :func:`scipy.optimize.least_squares`.
"""
try: # TODO Investigate if there is a performance gain to setting `check_finite` to False
fitted_parameters = curve_fit(model, self.constant_wavelengths, spectrum,
p0=guess, sigma=sigma, bounds=bounds, x_scale=x_scale, **kwargs)[0]
success = True
except RuntimeError as e:
success = False
location_text = " at ({}, {}, {})".format(time, row, column) if time is not None else ''
warnings.warn(f"RuntimeError({e}){location_text}")
fitted_parameters = np.full_like(guess, np.nan)
except ValueError as e:
success = False
if str(e) != "`x0` violates bound constraints.":
raise
else:
location_text = " at ({}, {}, {})".format(time, row, column) if time is not None else ''
warnings.warn("`x0` violates bound constraints{}".format(location_text))
fitted_parameters = np.full_like(guess, np.nan)
except np.linalg.LinAlgError as e:
success = False
if str(e) != "SVD did not converge in Linear Least Squares":
raise
else:
location_text = " at ({}, {}, {})".format(time, row, column) if time is not None else ''
warnings.warn("SVD did not converge in Linear Least Squares{}.".format(location_text))
fitted_parameters = np.full_like(guess, np.nan)
return fitted_parameters, success
# Define each parameter and attribute in an ordered dictionary so definitions can be passed to child objects
DOCS = collections.OrderedDict()
DOCS['original_wavelengths'] = """
original_wavelengths : array_like
One-dimensional array of wavelengths that correspond to the uncorrected spectral data."""
DOCS['stationary_line_core'] = """
stationary_line_core : float, optional, default=None
Wavelength of the stationary line core."""
DOCS['neural_network'] = """
neural_network : optional, default=None
The neural network classifier object that is used to classify spectra. This attribute should be set by a
child class of :class:`~mcalf.models.ModelBase`."""
DOCS['constant_wavelengths'] = """
constant_wavelengths : array_like, ndim=1, optional, default= see description
The desired set of wavelengths that the spectral data should be rescaled to represent. It is assumed
that these have constant spacing, but that may not be a requirement if you specify your own array.
The default value is an array from the minimum to the maximum wavelength of `original_wavelengths` in
constant steps of `delta_lambda`, overshooting the upper bound if the maximum wavelength has not been
reached."""
DOCS['delta_lambda'] = """
delta_lambda : float, optional, default=0.05
The step used between each value of `constant_wavelengths` when its default value has to be calculated."""
DOCS['sigma'] = """
sigma : optional, default=None
Sigma values used to weight the fit. This attribute should be set by a child class of
:class:`~mcalf.models.ModelBase`."""
DOCS['prefilter_response'] = """
prefilter_response : array_like, length=n_wavelengths, optional, default= see note
Each constant wavelength scaled spectrum will be corrected by dividing it by this array. If `prefilter_response`
is not given, and `prefilter_ref_main` and `prefilter_ref_wvscl` are not given, `prefilter_response` will have a
default value of `None`."""
DOCS['prefilter_ref_main'] = """
prefilter_ref_main : array_like, optional, default= None
If `prefilter_response` is not specified, this will be used along with `prefilter_ref_wvscl` to generate the
default value of `prefilter_response`."""
DOCS['prefilter_ref_wvscl'] = """
prefilter_ref_wvscl : array_like, optional, default=None
If `prefilter_response` is not specified, this will be used along with `prefilter_ref_main` to generate the
default value of `prefilter_response`."""
DOCS['config'] = """
config : str, optional, default=None
Filename of a `.yml` file (relative to current directory) containing the initialising parameters for this
object. Parameters provided explicitly to the object upon initialisation will override any provided in this
file. All (or some) parameters that this object accepts can be specified in this file, except `neural_network`
and `config`. Each line of the file should specify a different parameter and be formatted like
`emission_guess: '[-inf, wl-0.15, 1e-6, 1e-6]'` or `original_wavelengths: 'original.fits'` for example.
When specifying a string, use 'inf' to represent `np.inf` and 'wl' to represent `stationary_line_core` as shown.
If the string matches a file, :func:`mcalf.utils.misc.load_parameter()` is used to load the contents
of the file."""
DOCS['output'] = """
output : str, optional, default=None
If the program wants to output data, it will place it relative to the location specified by this parameter.
Some methods will only save data to a file if this parameter is not `None`. Such cases will be documented
where relevant."""
DOCS['array'] = """
array: numpy.ndarray, dimensions are ['time', 'row', 'column', 'spectra']
Array holding spectra."""
DOCS['background'] = """
background: numpy.ndarray, dimensions are ['time', 'row', 'column']
Array holding spectral backgrounds."""
# Form the parameter list
BASE_PARAMETERS = copy.deepcopy(DOCS)
for k in [ # Remove some entries
'neural_network',
'array',
'background',
]:
del BASE_PARAMETERS[k]
BASE_PARAMETERS_STR = ''.join(BASE_PARAMETERS[i] for i in BASE_PARAMETERS)
# Form the attribute list
BASE_ATTRIBUTES = copy.deepcopy(DOCS)
for k in [ # Remove some entries
'delta_lambda',
'prefilter_ref_main',
'prefilter_ref_wvscl',
'config',
]:
del BASE_ATTRIBUTES[k]
BASE_ATTRIBUTES_STR = ''.join(BASE_ATTRIBUTES[i] for i in BASE_ATTRIBUTES)
# Update the docstring with the generated strings
ModelBase.__doc__ = ModelBase.__doc__.replace(
'${PARAMETERS}',
BASE_PARAMETERS_STR.lstrip()
)
ModelBase.__doc__ = ModelBase.__doc__.replace(
'${ATTRIBUTES}',
BASE_ATTRIBUTES_STR.lstrip()
)
update_signature(ModelBase)