Source code for mcalf.models.ibis

import warnings

import numpy as np
from pathos.multiprocessing import ProcessPool as Pool
from sklearn.exceptions import NotFittedError
from sklearn.model_selection import GridSearchCV
from sklearn.neural_network import MLPClassifier

from yaml import load, dump
try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

from mcalf.models.results import FitResult
from mcalf.models.base import ModelBase
from mcalf.profiles.voigt import voigt_nobg, double_voigt_nobg
from mcalf.utils.spec import reinterpolate_spectrum, generate_sigma
from mcalf.utils.misc import load_parameter, make_iter
from mcalf.visualisation.spec import plot_ibis8542


__all__ = ['IBIS8542Model']


[docs]class IBIS8542Model(ModelBase): """Class for working with IBIS 8542 Å calcium II spectral imaging observations Parameters ---------- original_wavelengths : array_like One-dimensional array of wavelengths that correspond to the uncorrected spectral data. stationary_line_core : float, optional, default = 8542.104320687517 Wavelength of the stationary line core. absorption_guess : array_like, length=4, optional, default = [-1000, stationary_line_core, 0.2, 0.1] Initial guess to take when fitting the absorption Voigt profile. emission_guess : array_like, length=4, optional, default = [1000, stationary_line_core, 0.2, 0.1] Initial guess to take when fitting the emission Voigt profile. absorption_min_bound : array_like, length=4, optional, default = [-np.inf, stationary_line_core-0.15, 1e-6, 1e-6] Minimum bounds for all the absorption Voigt profile parameters in order of the function's arguments. emission_min_bound : array_like, length=4, optional, default = [0, stationary_line_core-0.15, 1e-6, 1e-6] Minimum bounds for all the emission Voigt profile parameters in order of the function's arguments. absorption_max_bound : array_like, length=4, optional, default = [0, stationary_line_core+0.15, 1, 1] Maximum bounds for all the absorption Voigt profile parameters in order of the function's arguments. emission_max_bound : array_like, length=4, optional, default = [np.inf, stationary_line_core+0.15, 1, 1] Maximum bounds for all the emission Voigt profile parameters in order of the function's arguments. absorption_x_scale : array_like, length=4, optional, default = [1500, 0.2, 0.3, 0.5] Characteristic scale for all the absorption Voigt profile parameters in order of the function's arguments. emission_x_scale : array_like, length=4, optional, default = [1500, 0.2, 0.3, 0.5] Characteristic scale for all the emission Voigt profile parameters in order of the function's arguments. neural_network : sklearn.neural_network.MLPClassifier, optional, default = see description The MLPClassifier object that will be used to classify the spectra. Its default value is `MLPClassifier(solver='lbfgs', alpha=1e-3, hidden_layer_sizes=(10, 4), random_state=1)`. constant_wavelengths : array_like, length same as `original_wavelengths`, optional, default = see description The desired set of wavelengths that the spectral data should be rescaled to represent. It is assumed that these have constant spacing, but that may not be a requirement if you specify your own array. The default value is an array from the minimum to the maximum wavelength of `original_wavelengths` in constant steps of `delta_lambda`, overshooting the upper bound if the maximum wavelength has not been reached. delta_lambda : float, optional, default = 0.05 The step used between each value of `constant_wavelengths` when its default value has to be calculated. sigma : list of array_like or bool, length=(2, n_wavelengths), optional, default = [type1, type2] A list of different sigma that are used to weight particular wavelengths along the spectra when fitting. The fitting method will expect to be able to choose a sigma array from this list at a specific index. It's default value is `[generate_sigma(i, constant_wavelengths, stationary_line_core) for i in [1, 2]]`. See `utils.generate_sigma()` for more information. If bool, True will generate the default sigma value regardless of the value specified in `config`, and False will set `sigma` to be all ones, effectively disabling it. prefilter_response : array_like, length=n_wavelengths, optional, default = see note Each constant wavelength scaled spectrum will be corrected by dividing it by this array. If `prefilter_response` is not give, and `prefilter_ref_main` and `prefilter_ref_wvscl` are not given, `prefilter_response` will have a default value of `None`. prefilter_ref_main : array_like, optional, default = None If `prefilter_response` is not specified, this will be used along with `prefilter_ref_wvscl` to generate the default value of `prefilter_response`. prefilter_ref_wvscl : array_like, optional, default = None If `prefilter_response` is not specified, this will be used along with `prefilter_ref_main` to generate the default value of `prefilter_response`. config : str, optional, default = None Filename of a `.yml` file (relative to current directory) containing the initialising parameters for this object. Parameters provided explicitly to the object upon initialisation will override any provided in this file. All (or some) parameters that this object accepts can be specified in this file, except `neural_network` and `config`. Each line of the file should specify a different parameter and be formatted like `emission_guess: '[-inf, wl-0.15, 1e-6, 1e-6]'` or `original_wavelengths: 'original.fits'` for example. When specifying a string, use 'inf' to represent `np.inf` and 'wl' to represent `stationary_line_core` as shown. If the string matches a file, `utils.load_parameter()` is used to load the contents of the file. output : str, optional, default = None If the program wants to output data, it will place it relative to the location specified by this parameter. Some methods will only save data to a file if this parameter is not `None`. Such cases will be documented where relevant. Attributes ---------- original_wavelengths : array_like One-dimensional array of wavelengths that correspond to the uncorrected spectral data. stationary_line_core : float, optional, default = 8542.099145376844 Wavelength of the stationary line core. absorption_guess : array_like, length=4, optional, default = [-1000, stationary_line_core, 0.2, 0.1] Initial guess to take when fitting the absorption Voigt profile. emission_guess : array_like, length=4, optional, default = [1000, stationary_line_core, 0.2, 0.1] Initial guess to take when fitting the emission Voigt profile. absorption_min_bound : array_like, length=4, optional, default = [-np.inf, stationary_line_core-0.15, 1e-6, 1e-6] Minimum bounds for all the absorption Voigt profile parameters in order of the function's arguments. emission_min_bound : array_like, length=4, optional, default = [0, -np.inf, 1e-6, 1e-6] Minimum bounds for all the emission Voigt profile parameters in order of the function's arguments. absorption_max_bound : array_like, length=4, optional, default = [0, stationary_line_core+0.15, 1, 1] Maximum bounds for all the absorption Voigt profile parameters in order of the function's arguments. emission_max_bound : array_like, length=4, optional, default = [np.inf, np.inf, 1, 1] Maximum bounds for all the emission Voigt profile parameters in order of the function's arguments. absorption_x_scale : array_like, length=4, optional, default = [1500, 0.2, 0.3, 0.5] Characteristic scale for all the absorption Voigt profile parameters in order of the function's arguments. emission_x_scale : array_like, length=4, optional, default = [1500, 0.2, 0.3, 0.5] Characteristic scale for all the emission Voigt profile parameters in order of the function's arguments. neural_network : sklearn.neural_network.MLPClassifier, optional, default = see description The MLPClassifier object (or similar) that will be used to classify the spectra. Defaults to a `GridSearchCV` with `MLPClassifier(solver='lbfgs', hidden_layer_sizes=(40,), max_iter=1000)` for best `alpha` selected from `[1e-5, 2e-5, 3e-5, 4e-5, 5e-5, 6e-5, 7e-5, 8e-5, 9e-5]`. constant_wavelengths : array_like, length same as `original_wavelengths`, optional, default = see description The desired set of wavelengths that the spectral data should be rescaled to represent. It is assumed that these have constant spacing, but that may not be a requirement if you specify your own array. The default value is an array from the minimum to the maximum wavelength of `original_wavelengths` in constant steps of `delta_lambda`, overshooting the upper bound if the maximum wavelength has not been reached. sigma : list of array_like, length=(2, n_wavelengths), optional, default = [type1, type2] A list of different sigma that are used to weight particular wavelengths along the spectra when fitting. The fitting method will expect to be able to choose a sigma array from this list at a specific index. It's default value is `[generate_sigma(i, constant_wavelengths, stationary_line_core) for i in [1, 2]]`. See `utils.generate_sigma()` for more information. prefilter_response : array_like, length=n_wavelengths, optional, default = see note Each constant wavelength scaled spectrum will be corrected by dividing it by this array. If `prefilter_response` is not give, and `prefilter_ref_main` and `prefilter_ref_wvscl` are not given, `prefilter_response` will have a default value of `None`. output : str, optional, default = None If the program wants to output data, it will place it relative to the location specified by this parameter. Some methods will only save data to a file if this parameter is not `None`. Such cases will be documented where relevant. quiescent_wavelength : int, default = 1 The index within the fitted parameters of the absorption Voigt line core wavelength. active_wavelength : int, default = 5 The index within the fitted parameters of the emission Voigt line core wavelength. """ def __init__(self, stationary_line_core=None, absorption_guess=None, emission_guess=None, absorption_min_bound=None, emission_min_bound=None, absorption_max_bound=None, emission_max_bound=None, absorption_x_scale=None, emission_x_scale=None, neural_network=None, original_wavelengths=None, constant_wavelengths=None, delta_lambda=None, sigma=None, prefilter_response=None, prefilter_ref_main=None, prefilter_ref_wvscl=None, config=None, output=None): super().__init__() # Initialise the parent class `ModelBase` also if config is not None: # Process config file if one is specified with open(config, 'r') as stream: # Load YAML file parameters = load(stream, Loader=Loader) # Load each parameter if it exists in the file and is not already given if 'stationary_line_core' in parameters and stationary_line_core is None: stationary_line_core = load_parameter(parameters['stationary_line_core']) if 'absorption_guess' in parameters and absorption_guess is None: absorption_guess = load_parameter(parameters['absorption_guess'], wl=stationary_line_core) if 'emission_guess' in parameters and emission_guess is None: emission_guess = load_parameter(parameters['emission_guess'], wl=stationary_line_core) if 'absorption_min_bound' in parameters and absorption_min_bound is None: absorption_min_bound = load_parameter(parameters['absorption_min_bound'], wl=stationary_line_core) if 'emission_min_bound' in parameters and emission_min_bound is None: emission_min_bound = load_parameter(parameters['emission_min_bound'], wl=stationary_line_core) if 'absorption_max_bound' in parameters and absorption_max_bound is None: absorption_max_bound = load_parameter(parameters['absorption_max_bound'], wl=stationary_line_core) if 'emission_max_bound' in parameters and emission_max_bound is None: emission_max_bound = load_parameter(parameters['emission_max_bound'], wl=stationary_line_core) if 'absorption_x_scale' in parameters and absorption_x_scale is None: absorption_x_scale = load_parameter(parameters['absorption_x_scale']) if 'emission_x_scale' in parameters and emission_x_scale is None: emission_x_scale = load_parameter(parameters['emission_x_scale']) if 'original_wavelengths' in parameters and original_wavelengths is None: original_wavelengths = load_parameter(parameters['original_wavelengths']) if 'constant_wavelengths' in parameters and constant_wavelengths is None: constant_wavelengths = load_parameter(parameters['constant_wavelengths']) if 'delta_lambda' in parameters and delta_lambda is None: delta_lambda = load_parameter(parameters['delta_lambda']) if 'sigma' in parameters and sigma is None: sigma = load_parameter(parameters['sigma']) if 'prefilter_response' in parameters and prefilter_response is None: prefilter_response = load_parameter(parameters['prefilter_response']) if 'prefilter_ref_main' in parameters and prefilter_ref_main is None: prefilter_ref_main = load_parameter(parameters['prefilter_ref_main']) if 'prefilter_ref_wvscl' in parameters and prefilter_ref_wvscl is None: prefilter_ref_wvscl = load_parameter(parameters['prefilter_ref_wvscl']) if 'output' in parameters and output is None: output = parameters['output'] # Load default values of any parameters that haven't been given yet if stationary_line_core is None: stationary_line_core = 8542.099145376844 if absorption_guess is None: absorption_guess = [-1000, stationary_line_core, 0.2, 0.1] if emission_guess is None: emission_guess = [1000, stationary_line_core, 0.2, 0.1] if absorption_min_bound is None: absorption_min_bound = [-np.inf, stationary_line_core-0.15, 1e-6, 1e-6] if emission_min_bound is None: emission_min_bound = [0, -np.inf, 1e-6, 1e-6] if absorption_max_bound is None: absorption_max_bound = [0, stationary_line_core+0.15, 1, 1] if emission_max_bound is None: emission_max_bound = [np.inf, np.inf, 1, 1] if absorption_x_scale is None: absorption_x_scale = [1500, 0.2, 0.3, 0.5] if emission_x_scale is None: emission_x_scale = [1500, 0.2, 0.3, 0.5] if neural_network is None: mlp = MLPClassifier(solver='lbfgs', hidden_layer_sizes=(40,), max_iter=1000) parameter_space = {'alpha': [1e-5, 2e-5, 3e-5, 4e-5, 5e-5, 6e-5, 7e-5, 8e-5, 9e-5]} # Search region neural_network = GridSearchCV(mlp, parameter_space, cv=5, n_jobs=-1) # Set GridSearchCV to find best alpha if original_wavelengths is None: raise ValueError("original_wavelengths must be specified") if delta_lambda is None: delta_lambda = 0.05 if constant_wavelengths is None: constant_wavelengths = np.arange(min(original_wavelengths), max(original_wavelengths)+delta_lambda, delta_lambda) if prefilter_response is None: if prefilter_ref_main is not None and prefilter_ref_wvscl is not None: prefilter_response = reinterpolate_spectrum(prefilter_ref_main, prefilter_ref_wvscl + stationary_line_core, constant_wavelengths) else: warnings.warn("prefilter_response will not be applied to spectra") else: # Make sure it is a numpy array so that division works as expected when doing array operations prefilter_response = np.asarray(prefilter_response, dtype=np.float64) # Set the object attributes (with some type enforcing) self.stationary_line_core = stationary_line_core self.absorption_guess = list(absorption_guess) self.emission_guess = list(emission_guess) self.absorption_min_bound = list(absorption_min_bound) self.emission_min_bound = list(emission_min_bound) self.absorption_max_bound = list(absorption_max_bound) self.emission_max_bound = list(emission_max_bound) self.absorption_x_scale = list(absorption_x_scale) self.emission_x_scale = list(emission_x_scale) self.neural_network = neural_network self.original_wavelengths = np.asarray(original_wavelengths, dtype=np.float64) self.constant_wavelengths = np.asarray(constant_wavelengths, dtype=np.float64) self.prefilter_response = prefilter_response self.output = output self.__delta_lambda = delta_lambda # Index of wavelength in the fitted_parameters self.quiescent_wavelength = 1 self.active_wavelength = 5 # Run some checks to make sure the specified parameters are valid self._validate_parameters() # Generate default sigma profiles for weighting fit (validate dependent parameters first) if sigma is None or (isinstance(sigma, bool) and sigma): sigma = [generate_sigma(i, self.constant_wavelengths, self.stationary_line_core) for i in [1, 2]] elif isinstance(sigma, bool) and not sigma: sigma = [np.ones(len(constant_wavelengths)) for i in [1, 2]] self.sigma = sigma # Set the sigma parameter def _validate_parameters(self): """Validate some of the object's parameters Raises ------ ValueError To signal that a parameter is not valid. """ # Stationary line core must be a float if not isinstance(self.stationary_line_core, float): raise ValueError("stationary_line_core must be a float, got %s" % type(self.stationary_line_core)) # Arrays must be of length 4 if len(self.absorption_guess) != 4: raise ValueError("absorption_guess should be an array of length 4, got %s" % len(self.absorption_guess)) if len(self.emission_guess) != 4: raise ValueError("emission_guess should be an array of length 4, got %s" % len(self.emission_guess)) if len(self.absorption_min_bound) != 4: raise ValueError("absorption_min_bound should be an array of length 4, got %s" % len(self.absorption_min_bound)) if len(self.emission_min_bound) != 4: raise ValueError("emission_min_bound should be an array of length 4, got %s" % len(self.emission_min_bound)) if len(self.absorption_max_bound) != 4: raise ValueError("absorption_max_bound should be an array of length 4, got %s" % len(self.absorption_max_bound)) if len(self.emission_max_bound) != 4: raise ValueError("emission_max_bound should be an array of length 4, got %s" % len(self.emission_max_bound)) if len(self.absorption_x_scale) != 4: raise ValueError("absorption_x_scale should be an array of length 4, got %s" % len(self.absorption_x_scale)) if len(self.emission_x_scale) != 4: raise ValueError("emission_x_scale should be an array of length 4, got %s" % len(self.emission_x_scale)) # Absorption and emission fits must be constrained with the correct amplitude sign if self.absorption_guess[0] > 0: raise ValueError("absorption_guess amplitude should be negative, got %s" % self.absorption_guess[0]) if self.emission_guess[0] < 0: raise ValueError("emission_guess amplitude should be positive, got %s" % self.emission_guess[0]) if self.emission_min_bound[0] < 0: raise ValueError("emission_min_bound amplitude should be positive, got %s" % self.emission_min_bound[0]) if self.absorption_max_bound[0] > 0: raise ValueError("absorption_max_bound amplitude should be negative, got %s" % self.absorption_max_bound[0]) # Minimum bounds must be less that corresponding maximum bounds if np.sum(np.asarray(self.absorption_max_bound) - np.asarray(self.absorption_min_bound) > 0) != \ len(self.absorption_max_bound): raise ValueError("values of absorption_max_bound must be greater than their " "corresponding values in absorption_min_bound") if np.sum(np.asarray(self.emission_max_bound) - np.asarray(self.emission_min_bound) > 0) != \ len(self.emission_max_bound): raise ValueError("values of emission_max_bound must be greater than their " "corresponding values in emission_min_bound") # Wavelength arrays must be sorted ascending if np.sum(np.diff(self.original_wavelengths) > 0) < len(self.original_wavelengths) - 1: raise ValueError("original_wavelength array must be sorted ascending") if np.sum(np.diff(self.constant_wavelengths) > 0) < len(self.constant_wavelengths) - 1: raise ValueError("constant_wavelength array must be sorted ascending") # Warn if the constant wavelengths extrapolate the original wavelengths if min(self.constant_wavelengths) - min(self.original_wavelengths) < -1e-6: # If lower-bound of constant wavelengths is more than 1e-6 outside of the original wavelengths warnings.warn("Lower bound of `constant_wavelengths` is outside of `original_wavelengths` range.") if max(self.constant_wavelengths) - max(self.original_wavelengths) - self.__delta_lambda > 1e-6: # If upper-bound of constant wavelengths is more than 1e-6 ouside the original wavelengths warnings.warn("Upper bound of `constant_wavelengths` is outside of `original_wavelengths` range.") # Stationary wavelength must be within wavelength range original_diff = self.original_wavelengths - self.stationary_line_core constant_diff = self.constant_wavelengths - self.stationary_line_core for n, i in [['original_wavelengths', original_diff], ['constant_wavelengths', constant_diff]]: if min(i) > 1e-6 or max(i) < -1e-6: raise ValueError("`stationary_line_core` is not within `{}`".format(n)) # If a prefilter response is given it must be a compatible length if self.prefilter_response is not None: if len(self.prefilter_response) != len(self.constant_wavelengths): raise ValueError("prefilter_response array must be the same length as constant_wavelengths array") def _get_sigma(self, classification=None, sigma=None): """Infer a sigma profile from the parameters provided If no sigma is provided, use classification to take from the model object's sigma attribute. If sigma is provided and is an integer, take from the model object's sigma attribute at that index, otherwise, return sigma as a NumPy array. Parameters ---------- classification : int, optional, default = None Classification sigma profile needed to fit. sigma : int or array_like, optional, default = None Explicit sigma index or profile. Returns ------- sigma : ndarray, length = n_constant_wavelengths The sigma profile. See Also -------- utils.generate_sigma : Generate a specified sigma profile """ if sigma is None: # Decide how to weight the wavelengths if classification == 0: return self.sigma[0] else: return self.sigma[1] else: if isinstance(sigma, int): return self.sigma[sigma] else: return np.asarray(sigma, dtype=np.float64)
[docs] def classify_spectra(self, time=None, row=None, column=None, spectra=None, only_normalise=False): """Classify the specified spectra Will also normalise each spectrum such that its intensity will range from zero to one. Parameters ---------- time : int or iterable, optional, default=None The time index. The index can be either a single integer index or an iterable. E.g. a list, a NumPy array, a Python range, etc. can be used. row : int or iterable, optional, default=None The row index. See comment for `time` parameter. column : int or iterable, optional, default=None The column index. See comment for `time` parameter. spectra : ndarray, optional, default=None The explicit spectra to classify. If `only_normalise` is False, this must be 1D. only_normalise : bool, optional, default = False Whether the single spectrum given in `spectra` should not be interpolated and corrected. Returns ------- classifications : ndarray Array of classifications with the same time, row and column indices as `spectra`. See Also -------- train : Train the neural network test : Test the accuracy of the neural network get_spectra : Get processed spectra from the objects `array` attribute """ if not only_normalise: # Get the spectrum, otherwise use the provided one directly spectra = self.get_spectra(time=time, row=row, column=column, spectrum=spectra) # Vectorised normalisation for all spectra such that intensities for each spectrum is in range [0, 1] spectra_list = spectra.reshape(-1, spectra.shape[-1]).T # Reshape & transpose to (n_wavelengths, n_spectra) spectra_list = spectra_list - spectra_list.min(axis=0) # Subtract each spectrum's min value from itself (min 0) spectra_list = spectra_list / spectra_list.max(axis=0) # Divide each spectrum by its max value (min 0, max 1) spectra_list = spectra_list.T # Transpose to (n_spectra, n_wavelengths) # Create the empty classifications array classifications = np.full(len(spectra_list), -1, dtype=int) # -1 reserved for invalid valid_spectra_i = np.where(~np.isnan(spectra_list[:, 0])) # Only predict the valid spectra # Note: Spectra that have indices in the data but were masked or outside the field of view should be np.nan try: classifications[valid_spectra_i] = self.neural_network.predict(spectra_list[valid_spectra_i]) except NotFittedError: raise NotFittedError("Neural network has not been trained yet. Call 'train' on the model class first.") try: # Try to give the classifications array the same indices as the spectral array classifications = classifications.reshape(*spectra.shape[:-1]) except TypeError: # 1D spectra array given assert classifications.size == 1 # Verify return classifications
def _fit(self, spectrum, profile=None, sigma=None, classification=None, spectrum_index=None): """Fit a single spectrum for the given profile or classification Warning: Using this method directly will skip the corrections that are applied to spectra by the `get_spectra` method. Parameters ---------- spectrum : ndarray, ndim=1, length=n_constant_wavelengths The spectrum to be fitted. profile : str, optional, default = None The profile to fit. (Will infer profile from classification if omitted.) sigma : ndarray, ndim=1, length=n_constant_wavelengths The sigma array with weights for each spectral wavelength. classification : int, optional, default = None Classification to determine the fitted profile to use (if profile not explicitly given). spectrum_index : array_like or list or tuple, length=3, optional, default = None The [time, row, column] index of the `spectrum` provided. Only used for error reporting. Returns ------- result : FitResult Outcome of the fit returned in a FitResult object See Also -------- fit : The recommended method for fitting spectra FitResult : The object that the fit method returns """ if profile is None: # If profile hasn't been specified, find it # Decide which profiles to fit if classification in [2, 3, 4]: profile = 'both' elif classification in [0, 1]: profile = 'absorption' elif classification is None: raise ValueError("classification must be specified if profile is not specified") elif not isinstance(classification, int): raise TypeError("classification must be an integer") else: raise ValueError("unexpected classification, got %s" % classification) sigma = self._get_sigma(classification, sigma) if profile == 'absorption': model = voigt_nobg guess = self.absorption_guess min_bound = self.absorption_min_bound max_bound = self.absorption_max_bound x_scale = self.absorption_x_scale elif profile == 'emission': model = voigt_nobg guess = self.emission_guess min_bound = self.emission_min_bound max_bound = self.emission_max_bound x_scale = self.emission_x_scale elif profile == 'both': model = double_voigt_nobg guess = self.absorption_guess + self.emission_guess min_bound = self.absorption_min_bound + self.emission_min_bound max_bound = self.absorption_max_bound + self.emission_max_bound x_scale = self.absorption_x_scale + self.absorption_x_scale else: raise ValueError("fit profile must be either None, 'absorption', 'emission' or 'both', got %s" % profile) time, row, column = spectrum_index if spectrum_index is not None else [None]*3 fitted_parameters, success = self._curve_fit(model, spectrum, guess, sigma, (min_bound, max_bound), x_scale, time=time, row=row, column=column) chi2 = np.sum(((spectrum - model(self.constant_wavelengths, *fitted_parameters)) / sigma) ** 2) fit_info = {'chi2': chi2, 'classification': classification, 'profile': profile, 'success': success, 'index': spectrum_index} return FitResult(fitted_parameters, fit_info)
[docs] def fit(self, time=None, row=None, column=None, spectrum=None, profile=None, sigma=None, classifications=None, background=None, n_pools=None): """Fits the model to specified spectra Fits the model to an array of spectra using multiprocessing if requested. Parameters ---------- time : int or iterable, optional, default=None The time index. The index can be either a single integer index or an iterable. E.g. a list, a NumPy array, a Python range, etc. can be used. row : int or iterable, optional, default=None The row index. See comment for `time` parameter. column : int or iterable, optional, default=None The column index. See comment for `time` parameter. spectrum : ndarray, ndim=1, optional, default=None The explicit spectrum to fit the model to. profile : str, optional, default = None The profile to fit. (Will infer profile from `classifications` if omitted.) sigma : int or array_like, optional, default = None Explicit sigma index or profile. See `_get_sigma` for details. classifications : int, optional, default = None Classifications to determine the fitted profile to use (if profile not explicitly given). Will use neural network to classify them if not. background : float, optional, default = None If provided, this value will be subtracted from the explicit spectrum provided in `spectrum`. Will not be applied to spectra found from the indices, use the `load_background` method instead. n_pools : int, optional, default = None The number of processing pools to calculate the fitting over. This allocates the fitting of different spectra to `n_pools` separate worker processes. When processing a large number of spectra this will make the fitting process take less time overall. It also distributes such that each worker process has the same ratio of classifications to process. This should balance out the workload between workers. If few spectra are being fitted, performance may decrease due to the overhead associated with splitting the evaluation over separate processes. If `n_pools` is not an integer greater than zero, it will fit the spectrum with a for loop. Returns ------- result : list of FitResult, length=n_spectra Outcome of the fits returned as a list of FitResult objects """ # Specific fitting algorithm for IBIS Ca II 8542 Å # Only include the background is using an explicit spectrum; remove for indices include_background = False if spectrum is None else True spectra = self.get_spectra(time=time, row=row, column=column, spectrum=spectrum, background=include_background) # At least one valid spectrum must be given n_valid = np.sum(~np.isnan(spectra[..., 0])) if np.sum(~np.isnan(spectra[..., 0])) == 0: raise ValueError("no valid spectra given") if spectrum is not None and background is not None: # remove explicit background of explicit spectrum spectra -= background if classifications is None: # Classify if not already specified classifications = self.classify_spectra(spectra=spectra, only_normalise=True) if spectrum is None: # No explicit spectrum given # Create the array of indices that the spectra represent time, row, column = make_iter(*self._get_time_row_column(time=time, row=row, column=column)) indices = np.transpose(np.array(np.meshgrid(time, row, column, indexing='ij')), axes=[1, 2, 3, 0]) # Make shape (n_spectra, n_features) so can process in a list spectra = spectra.reshape(-1, spectra.shape[-1]) indices = indices.reshape(-1, indices.shape[-1]) classifications = classifications.reshape(-1) # Remove spectra that are invalid (this allows for masking of the loaded data to constrain a region to fit) valid_spectra_i = np.where(~np.isnan(spectra[:, 0])) # Where the first item of the spectrum is not NaN spectra = spectra[valid_spectra_i] indices = indices[valid_spectra_i] classifications = classifications[valid_spectra_i] if len(spectra) != len(indices) != len(classifications): # Postprocessing sanity check raise ValueError("number of spectra, number of recorded indices and number of classifications" "are not the same (impossible error)") if n_pools is None or (isinstance(n_pools, int) and n_pools <= 0): # Multiprocessing not required print("Processing {} spectra".format(n_valid)) results = [self._fit(spectra[i], profile=profile, sigma=sigma, classification=classifications[i], spectrum_index=indices[i]) for i in range(len(spectra))] elif isinstance(n_pools, int) and n_pools >= 1: # Use multiprocessing # Define single argument function that can be evaluated in the pools def func(data, profile=profile, sigma=sigma): spectrum, index, classification = data # Extract data and pass to `_fit` method return self._fit(spectrum, profile=profile, sigma=sigma, classification=classification, spectrum_index=list(index)) # Sort the arrays in descending classification order s = np.argsort(classifications)[::-1] # Classifications indices sorted in descending order spectra = spectra[s] indices = indices[s] classifications = classifications[s] # Distribute the sorted spectra to each of `n_pools` in turn to ensure even workload data = [] for pool in range(n_pools): data += [[spectra[i], indices[i], classifications[i]] for i in range(pool, len(spectra), n_pools)] print("Processing {} spectra over {} pools".format(n_valid, n_pools)) with Pool(n_pools) as p: # Send the job to the `n_pools` pools results = p.map(func, data) # Map each element of `data` to the first argument of `func` p.close() # Finished, so no more jobs to come p.join() # Clean up the closed pools p.clear() # Remove the server so it can be created again else: raise TypeError("n_pools must be an integer, got %s" % type(n_pools)) else: # Explicit spectrum must be 1D so no loop needed results = self._fit(spectra, profile=profile, sigma=sigma, classification=classifications, spectrum_index=None) return results
[docs] def plot(self, fit=None, time=None, row=None, column=None, spectrum=None, classification=None, background=None, sigma=None, stationary_line_core=None, output=False, **kwargs): """Plots the data and fitted parameters Parameters ---------- fit : FitResult or list or array_like, optional, default = None The fitted parameters to plot with the data. Can extract the necessary plot metadata from the fit object. Otherwise, `fit` should be the parameters to be fitted to either a Voigt or double Voigt profile depending on the number of parameters fitted. time : int or iterable, optional, default = None The time index. The index can be either a single integer index or an iterable. E.g. a list, a NumPy array, a Python range, etc. can be used. row : int or iterable, optional, default = None The row index. See comment for `time` parameter. column : int or iterable, optional, default = None The column index. See comment for `time` parameter. spectrum : ndarray of length `original_wavelengths`, ndim=1, optional, default = None The explicit spectrum to plot along with a fit (if specified). classification : int, optional, default = None Used to determine which sigma profile to use. See `_get_sigma` for more details. background : float or array_like of length `constant_wavelengths`, optional, default = see note Background to added to the fitted profiles. If a `spectrum` is given, this will default to zero, otherwise the value loaded by `load_background` will be used. sigma : int or array_like, optional, default = None Explicit sigma index or profile. See `_get_sigma` for details. stationary_line_core : float, optional, default = `self.stationary_line_core` The stationary line core wavelength to mark on the plot. output : bool or str, optional, default = False Whether to save the plot to a file. If true, a file of format `plot_<time>_<row>_<column>.eps` will be created in the current directory. If a string, that will be used as the filename. (Can change filetype like this.) If false, no file will be created. **kwargs Parameters used by matplotlib and `separate` (see `plot_separate`) and `subtraction` (see `plot_subtraction`). - `figsize` passed to `matplotlib.pyplot.figure` - `legend_position` passed to `matplotlib.pyplot.legend` - `dpi` passed to `matplotlib.pyplot.figure` and `matplotlib.pyplot.savefig` - `fontfamily` passed to `matplotlib.pyplot.rc('font', family=`fontfamily`)` if given See Also -------- plot_separate : Plot the fit parameters separately plot_subtraction : Plot the spectrum with the emission fit subtracted from it FitResult.plot : Plotting method on the fit result """ if fit.__class__ == FitResult: # If fit is a `FitResult` # Extract classification (so it can choose the correct sigma to plot) if classification is None: classification = fit.classification # Extract spectrum index (so it can load the raw data) if (time, row, column) == (None, None, None) and fit.index is not None: time, row, column = fit.index # Replace fit with just the fitted parameters fit = fit.parameters # Has a spectrum been given? (used for tidying plot later) explicit_spectrum = False if spectrum is None else True # Get the spectrum and reduce it to 1D spectrum = self.get_spectra(time=time, row=row, column=column, spectrum=spectrum, background=True) spectrum = np.squeeze(spectrum) if spectrum.ndim != 1: raise ValueError("spectrum must be 1D") # If no background to be added to the fit is given... if background is None and fit is not None: if not explicit_spectrum: # Take from loaded background if using indices time, row, column = self._get_time_row_column(time=time, row=row, column=column) if sum([isinstance(i, (int, np.int64, np.int32, np.int16, np.int8)) for i in [time, row, column]]) != 3: raise TypeError("plot only accepts integer values for time, row and column") background = self.background[time, row, column] else: # Otherwise assume to be zero background = 0 if classification is not None or sigma is not None: # Skip if neither classification or sigma are given sigma = self._get_sigma(classification, sigma) if stationary_line_core is None: stationary_line_core = self.stationary_line_core if isinstance(output, bool): if output: # Create a name output = "plot_{}_{}_{}.eps".format(time, row, column) else: output = None # Do not output a file elif not isinstance(output, str): # A valid string can pass through as the filename raise TypeError("output must be either boolean or a string, got %s" % type(output)) plot_ibis8542(self.constant_wavelengths, spectrum, fit=fit, background=background, sigma=sigma, stationary_line_core=stationary_line_core, output=output, **kwargs)
[docs] def plot_separate(self, *args, **kwargs): """Plot the fitted profiles separately If multiple profiles exist, fit them separately. See `plot` for more details. See Also -------- plot : General plotting method plot_subtraction : Plot the spectrum with the emission fit subtracted from it FitResult.plot : Plotting method on the fit result """ self.plot(*args, separate=True, **kwargs)
[docs] def plot_subtraction(self, *args, **kwargs): """Plot the spectrum with the emission fit subtracted from it If multiple profiles exist, subtract the fitted emission from the raw data. See `plot` for more details. See Also -------- plot : General plotting method plot_separate : Plot the fit parameters separately FitResult.plot : Plotting method on the fit result """ self.plot(*args, subtraction=True, **kwargs)