Source code for mcalf.utils.smooth

import numpy as np
import scipy.ndimage

__all__ = ['moving_average', 'gaussian_kern_3d', 'smooth_cube', 'mask_classifications']


[docs]def moving_average(array, width): """Boxcar moving average. Calculate the moving average of an array with a boxcar of defined width. An odd width is recommended. Parameters ---------- array : numpy.ndarray, ndim=1 Array to find the moving average of. width : int Width of the boxcar. Odd integer recommended. Less than or equal to length of `array`. Returns ------- averaged : numpy.ndarray, shape=`array` Averaged array. Notes ----- The moving average is calculated at each point of the `array` by finding the (unweighted) mean of the subarrays of length given by `width`. These subarrays are centred at the point in the `array` that the current average is currently being calculated for. If an odd `width` is chosen, the sub array will include the current point plus an equal number of points on either side. However, if an even `width` is chosen, the sub array will bias including the extra point to the left of the current index. If the subarray spans past the boundaries, the values beyond the boundary is ignored and the mean is calculated by dividing by the number of points that are inside the boundaries. Examples -------- >>> x = np.array([1, 2, 3, 4, 5]) >>> moving_average(x, 3) array([1.5, 2. , 3. , 4. , 4.5]) >>> moving_average(x, 2) array([1. , 1.5, 2.5, 3.5, 4.5]) """ if not isinstance(width, (int, np.integer)) or width <= 0 or width > len(array): raise ValueError("`width` must be a positive integer less than the length of `array`, got %s." % width) kernel = np.ones(width) / width averaged = np.convolve(array, kernel, mode='same') # Apply corrections at edge to_correct_left = int(width/2) to_correct_right = to_correct_left to_correct_right -= 0 if width % 2 else 1 # Adjust weighting on left edge n_overlap = width - 1 for i in np.arange(to_correct_left-1, -1, -1): averaged[i] *= width / n_overlap n_overlap -= 1 # Adjust weighting on right edge n_overlap = width - 1 for i in range(len(array) - to_correct_right, len(array)): averaged[i] *= width / n_overlap n_overlap -= 1 return averaged
[docs]def gaussian_kern_3d(width=5, sigma=(1, 1, 1)): """3D Gaussian kernel. Create a Gaussian kernel of shape `width`*`width`*`width`. Parameters ---------- width : int, optional, default=5 Length of all three dimensions of the Gaussian kernel. sigma : array_like, tuple, optional, default=(1, 1, 1) Sigma values for the time, horizontal and vertical dimensions. Returns ------- kernel : numpy.ndarray, shape=(`width`, `width`, `width`) The generated kernel. Examples -------- >>> gaussian_kern_3d(width=3, sigma=(2, 1, 1.5)) array([[[0.42860385, 0.53526143, 0.42860385], [0.48567179, 0.60653066, 0.48567179], [0.42860385, 0.53526143, 0.42860385]], <BLANKLINE> [[0.70664828, 0.8824969 , 0.70664828], [0.8007374 , 1. , 0.8007374 ], [0.70664828, 0.8824969 , 0.70664828]], <BLANKLINE> [[0.42860385, 0.53526143, 0.42860385], [0.48567179, 0.60653066, 0.48567179], [0.42860385, 0.53526143, 0.42860385]]]) """ s = np.linspace(-1, 1, width) x, y, z = np.meshgrid(s, s, s) kernel = np.exp(-x**2/(2*sigma[0]**2) - y**2/(2*sigma[1]**2) - z**2/(2*sigma[2]**2)) return kernel
[docs]def smooth_cube(cube, mask, **kwargs): """Apply Gaussian smoothing to velocities. Smooth the cube of velocities with a Gaussian kernel, applying weights at boundaries. Parameters ---------- cube : numpy.ndarray, ndim=3 Cube of velocities with dimensions [time, row, column]. mask : numpy.ndarray, ndim=2 The mask to apply to the [row, column] at every time. Points that are 0 or false will be removed. **kwargs Keyword arguments to pass to :func:`gaussian_kern_3d`. Returns ------- cube_ : numpy.ndarray, shape=`cube` The smoothed cube. """ # Masking cube_ = cube.copy() mask3d = np.empty_like(cube_) # Mask for `cube_` for timestep in range(len(mask3d)): # Copy mask throughout time mask3d[timestep] = mask.copy() cube_[np.isnan(cube_)] = 0 # Remove NaN cube_[mask3d == 0] = 0 # Remove masked points kernel = gaussian_kern_3d(**kwargs) # Get the kernel # Count the number of neighbours (weighting) at each pixel neighbour_count = scipy.ndimage.convolve(mask3d, kernel, mode="constant") m = neighbour_count > 0 # Record the points that do have neighbours cube_ = scipy.ndimage.convolve(cube_, kernel, mode="constant") # Apply smoothing cube_[m] /= neighbour_count[m] # Normalise depending on neighbours cube_[mask3d == 0] = np.nan # Restore masked pixels cube_[np.isnan(cube)] = np.nan # Restore NaN values return cube_
[docs]def mask_classifications(class_map, vmin=None, vmax=None, reduce=True): """Mask 2D and 3D arrays of classifications. If 3D, also reduces to 2D by selecting the most common classification along the first dimension. Parameters ---------- class_map : numpy.ndarray[int], ndim=2 or 3 Array of classifications. If `reduce` is True (default) and the array is three-dimensional, it is assumed that the first dimension is time, and a time average classification will be calculated. The time average is the most common positive (valid) classification at each pixel. vmin : int, optional, default=None Minimum classification integer to include. Must be greater or equal to zero. Defaults to min positive integer in `class_map`. Classifications below this value will be set to -1. vmax : int, optional, default=None Maximum classification integer to include. Must be greater than zero. Defaults to max positive integer in `class_map`. Classifications above this value will be set to -1. reduce : bool, optional, default=True Whether to perform the time average described in `class_map` info. Returns ------- class_map : numpy.ndarray[int], ndim=2 `class_map` with values between `vmin` and `vmax` averaged along the first dimension. vmin : int Updated `vmin` value. vmax : int Updated `vmax` value. See Also -------- mcalf.visualisation.plot_class_map : Plot a map of the classifications. """ # Validate the `class_map` parameter if not isinstance(class_map, np.ndarray): raise TypeError(f'`class_map` must be a numpy.ndarray, got {type(class_map)}.') if class_map.ndim not in (2, 3): raise ValueError(f'`class_map` must have either 2 or 3 dimensions, got {class_map.ndim}.') if not issubclass(class_map.dtype.type, np.integer): raise TypeError(f'`class_map` must be an array of integers, got {class_map.dtype}.') # Short-circuit if all classifications negative if len(class_map[class_map >= 0]) == 0: class_map = class_map.copy() if reduce and class_map.ndim == 3: class_map = class_map[0] class_map[:] = -1 # Set all invalid to -1 for consistency with main code if vmin is None: vmin = 0 if vmax is None: vmax = 0 return class_map, vmin, vmax # Validate or set `vmin` and `vmax` if vmax is None: vmax = np.max(class_map[class_map >= 0]) elif not isinstance(vmax, (int, np.integer)): raise TypeError(f'`vmax` must be an integer, got {type(vmax)}.') elif vmax < 0: raise ValueError('`vmax` must not be less than zero.') if vmin is None: vmin = np.min(class_map[class_map >= 0]) elif not isinstance(vmin, (int, np.integer)): raise TypeError(f'`vmin` must be an integer, got {type(vmin)}.') elif vmin < 0: raise ValueError('`vmin` must not be less than zero.') if vmin > vmax: raise ValueError(f'`vmin` must be less than `vmax`, got {vmin} to {vmax}.') # Ignore classifications outside of range class_map = class_map.copy() class_map[class_map < vmin] = -1 class_map[class_map > vmax] = -1 # If 3D, choose the most common classification along the first dimension if reduce and class_map.ndim == 3: c = np.arange(vmin, vmax + 1) # classes bins = np.arange(vmin - 0.5, vmax + 1) # len(bins)=len(c)+1 ave = np.empty(class_map.shape[1:], dtype=int) # init for i in range(len(class_map[0])): # spatial rows for j in range(len(class_map[0, 0])): # spatial columns counts = np.histogram(class_map[:, i, j], bins)[0] if counts.max() == 0: ave[i, j] = -1 # no valid classifications else: ave[i, j] = c[counts.argmax()] class_map = ave # Replace 3D array with 2D average return class_map, vmin, vmax