Source code for sigfeat.feature.spectral

import numpy as np
from scipy.fftpack import rfft, rfftfreq

from ..base import Feature
from ..base import HiddenFeature
from ..base import Parameter

from .common import WindowedSignal
from .common import crest_factor
from .common import flatness
from .common import flux
from .common import rolloff


class Rfft(HiddenFeature):
    """Rfft Spectrum feature (hidden per default)

    Parameters
    ----------
    nfft : int
    axis : int
    window : bool
        Whether to use a window or not. If you need a special window,
        create a WindowedSignal instance.

    """
    nfft = Parameter()
    axis = Parameter(default=0)
    window = Parameter(default=True)

    def requires(self):
        if self.window:
            yield WindowedSignal(size=self.nfft)
        else:
            return []

    def on_start(self, source, *args, **kwargs):
        if not self.nfft:
            self.nfft = source.blocksize
        self.frequencies = rfftfreq(self.nfft, 1.0/source.samplerate)
        self.add_metadata(
            'frequencies', self.frequencies)
        self.add_metadata('nfft', self.nfft)

    def process(self, data, featuredata):
        if self.window:
            s = featuredata['WindowedSignal']
        else:
            s = data[0]
        return rfft(
            s,
            n=self.nfft,
            axis=self.axis)


class AbsRfft(Rfft):
    """Absolute Rfft Spectrum feature (hidden per default)


    Parameters
    ----------
    nfft : int
    axis : int
    window : bool
        Whether to use a window or not. If you need a special window,
        create a WindowedSignal instance.

    """
    def requires(self):
        return [Rfft(
            nfft=self.nfft,
            axis=self.axis,
            window=self.window
        )]

    def process(self, data, featuredata):
        return np.abs(featuredata['Rfft'])


class SumAbsRfft(Rfft):
    axis = Parameter(0)

    def requires(self):
        yield AbsRfft

    def process(self, data, resd):
        return np.sum(resd['AbsRfft'], axis=self.axis)


[docs]class SpectralCentroid(Feature): """Centroid of AbsRfft. .. math:: SC = \\frac{\\sum_k f[k]|X[k]|}{ \\frac{1}{K}\\sum_k |X[k]|} Parameters ---------- axis : int Axis along the centroid will be calculated, default=0. """ axis = Parameter(0)
[docs] def requires(self): yield AbsRfft yield SumAbsRfft
[docs] def on_start(self, source, featureset, sink): self.channels = source.channels self.frequencies = featureset['Rfft'].frequencies if self.channels > 1: self.frequencies = np.tile( self.frequencies, (self.channels, 1)).T
@staticmethod def centroid(freqs, absrfft, sumabsrfft, axis): return np.sum(freqs * absrfft, axis=axis) / sumabsrfft
[docs] def process(self, data, resd): result = self.centroid( self.frequencies, resd['AbsRfft'], resd['SumAbsRfft'], self.axis) return result
[docs]class SpectralSpread(SpectralCentroid): # TODO: Test """Spread of AbsRfft. .. math:: SSP_m = \\frac{\\sum_k (f[k] SC_m[k])^2 |X_m[k]|}{ \\sum_k |X_m[k]|} Parameters ---------- axis : int Axis along the centroid will be calculated, default=0. """ axis = Parameter(0)
[docs] def requires(self): yield SpectralCentroid()
@staticmethod def spread(freqs, absrfft, sumabsrfft, centroid, axis): return np.sum((freqs-centroid)**2 * absrfft, axis=axis) / sumabsrfft
[docs] def process(self, data, resd): result = self.spread( self.frequencies, resd['AbsRfft'], resd['SumAbsRfft'], resd['SpectralCentroid'], self.axis) return result
[docs]class SpectralSkewness(SpectralCentroid): # TODO: Test """Skewness of AbsRfft. .. math:: SSK_m = \\frac{\\sum_k (f[k]-SC_m[k])^3 |X_m[k]|}{ \\sqrt{SSP_m}^3\\sum_k |X_m[k]|} Parameters ---------- axis : int Axis along the centroid will be calculated, default=0. """ axis = Parameter(0)
[docs] def requires(self): yield SpectralCentroid() yield SpectralSpread()
@staticmethod def skewness(freqs, absrfft, sumabsrfft, centroid, spread, axis): m3 = np.sum((freqs-centroid)**3 * absrfft, axis=axis) / sumabsrfft return m3 / np.sqrt(spread)**3
[docs] def process(self, data, resd): result = self.skewness( self.frequencies, resd['AbsRfft'], resd['SumAbsRfft'], resd['SpectralCentroid'], resd['SpectralSpread'], self.axis) return result
[docs]class SpectralKurtosis(SpectralCentroid): # TODO: Test """Kurtosis of AbsRfft. .. math:: SK_m = \\frac{\\sum_k (f[k]-SC_m[k])^4 |X_m[k]|}{ SSP_m^2 \\sum_k |X_m[k]|} Parameters ---------- axis : int Axis along the centroid will be calculated, default=0. """ axis = Parameter(0)
[docs] def requires(self): yield SpectralCentroid() yield SpectralSpread()
@staticmethod def kurtosis(freqs, absrfft, sumabsrfft, centroid, spread, axis): m4 = np.sum((freqs-centroid)**4 * absrfft, axis=axis) / sumabsrfft return m4 / spread**2
[docs] def process(self, data, resd): result = self.kurtosis( self.frequencies, resd['AbsRfft'], resd['SumAbsRfft'], resd['SpectralCentroid'], resd['SpectralSpread'], self.axis) return result
[docs]class SpectralFlatness(Feature): """Flatness of AbsRfft. .. math:: SF_m = \\frac{\\left(\\Pi_k |X_m[k]| \\right)^{1/K}}{ \\frac{1}{K}\\sum_k |X_m[k]|} Parameters ---------- axis : int Axis along the flatness will be calculated, default=0. """ axis = Parameter(0)
[docs] def requires(self): yield AbsRfft
[docs] def process(self, data, featuredata): return flatness(featuredata['AbsRfft'], self.axis)
[docs]class SpectralFlux(Feature): """Flux of AbsRfft. .. math:: D_m[k] = \\frac{|X_m[k]|}{\\max(|X_m[k]|)} - \\frac{|X_{m-1}[k]|}{\\max(|X_{m-1}[k]|)} SFX_m = \\frac{1}{2}\\sum_k D[k]|D[k]| Parameters ---------- axis : int Axis along the flux will be calculated, default=0. """ axis = Parameter(0)
[docs] def requires(self): yield AbsRfft
[docs] def on_start(self, source, featureset, sink): nfft = featureset['AbsRfft'].nfft if source.channels > 1: self._lastspec = np.ones((nfft, source.channels)) else: self._lastspec = np.ones(nfft)
[docs] def process(self, data, featuredata): curspec = featuredata['AbsRfft'] specflux = flux(self._lastspec, curspec, self.axis) self._lastspec = curspec return specflux
[docs]class SpectralCrestFactor(Feature): """Crest Factor of AbsRfft .. math:: SCF_m = \\frac{\max{(|X_m|)}}{X_{m, RMS}} """
[docs] def requires(self): yield AbsRfft
[docs] def process(self, data, result): return crest_factor(result['AbsRfft']) # TODO decompose
[docs]class SpectralRolloff(Feature): """Rolloff from AbsRfft. The spectral rolloff is the frequency where the kappa percentage of energy is below and the 1-kappa percentage of energy is above. # TODO formula Parameters ---------- kappa : scalar {0...1} Default 0.95 """ kappa = Parameter(0.95)
[docs] def requires(self): yield AbsRfft
[docs] def on_start(self, source, featureset, sink): self.samplerate = source.samplerate
[docs] def process(self, data, result): return rolloff(result['AbsRfft'], self.samplerate, self.kappa)
[docs]class SpectralSlope(Feature): # TODO: Test, doc, formula
[docs] def requires(self): yield AbsRfft
[docs] def on_start(self, source, features, sink): self.frequencies = np.array([features['AbsRfft'].frequencies]).T
[docs] def process(self, data, resd): absrfft = np.array(resd['AbsRfft']) absrfft -= np.mean(absrfft) w = np.linalg.lstsq(self.frequencies, absrfft)[0] return w[0]