Coverage for physiodsp / activity / zero_crossing.py: 100%
42 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-26 21:14 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-26 21:14 +0000
1from numpy import diff, abs
2from pandas import DataFrame
3from pydantic import BaseModel, Field, PositiveInt, PositiveFloat
4from scipy.signal import butter, sosfilt, sosfilt_zi
6from physiodsp.base import BaseAlgorithm
7from physiodsp.sensors.imu.base import IMUData
10class ZeroCrossingSettings(BaseModel):
12 window_len: PositiveInt = Field(default=1, description="processing window length in seconds")
14 aggregation_window: PositiveInt = Field(default=60, description="aggregation window length in seconds")
16 zero_crossing_thr: PositiveFloat = Field(default=0.05, description="ero crossing threshold in g")
18 filter_order: PositiveInt = Field(default=4, description="Butterworth filter order for bandpass filtering")
20 filter_low_freq: PositiveFloat = Field(default=0.3, description="Lower cutoff frequency in Hz")
22 filter_high_freq: PositiveFloat = Field(default=3.5, description="Upper cutoff frequency in Hz")
25class ZeroCrossing(BaseAlgorithm):
26 """Zero Crossing Algorithm"""
28 _algorithm_name = "ZeroCrossingAlgorithm"
29 _version = "v0.1.0"
31 def __init__(self,
32 settings: ZeroCrossingSettings = ZeroCrossingSettings(),
33 ) -> None:
35 self.settings = settings
36 self._window_len = settings.window_len
37 self._aggregation_window = settings.aggregation_window
38 self.zero_crossing_thr = settings.zero_crossing_thr
39 return None
41 def __preprocess_imu(self, imu_matrix, fs: int):
42 """
43 Apply bandpass Butterworth filter to IMU data.
45 Reference: 0.3-3.5 Hz pass band is commonly used for human movement analysis
46 to isolate low-frequency body motion while removing drift and high-frequency noise.
47 Args:
48 imu_matrix: (N, 3) array with columns [x, y, z]
49 fs: Sampling frequency of the IMU data in Hz
51 Returns:
52 Filtered (N, 3) array
53 """
54 # Design bandpass filter using second-order sections for numerical stability
55 sos = butter(self.settings.filter_order,
56 [self.settings.filter_low_freq, self.settings.filter_high_freq],
57 btype='band',
58 fs=fs,
59 output='sos')
61 # Initialize filter states for each axis
62 zi = sosfilt_zi(sos)
64 # Apply filter to each axis using sosfilt (not filtfilt)
65 filtered_matrix = imu_matrix.copy()
66 for i in range(imu_matrix.shape[1]):
67 filtered_matrix[:, i], _ = sosfilt(sos, imu_matrix[:, i], zi=zi * imu_matrix[0, i])
69 return filtered_matrix
71 def run(self, data: IMUData):
73 # Apply bandpass filter to IMU data
74 imu_matrix = data.to_matrix()
75 imu_matrix_filtered = self.__preprocess_imu(imu_matrix, fs=data.fs)
76 # Compute zero crossings for each axis
77 zcr = abs(diff((imu_matrix_filtered >= self.zero_crossing_thr).astype(int), axis=0))
79 self.values = DataFrame({"timestamps": data.timestamps[1:], "x": zcr[:, 0], "y": zcr[:, 1], "z": zcr[:, 2]}).rolling(
80 window=int(self._window_len * data.fs),
81 step=int(self._window_len * data.fs),
82 min_periods=int(self._window_len * data.fs),
83 closed="left"
84 ).agg({"timestamps": "max", "x": "sum", "y": "sum", "z": "sum"})[1:]
86 self.biomarker = self.values.copy()
88 return self
90 def aggregate(self,
91 method: str = 'sum'
92 ):
94 df = self.values.copy()
96 df['timestamps'] = df[
97 'timestamps'].apply(lambda x: (x // self._aggregation_window) * self._aggregation_window)
99 df_agg = df.groupby('timestamps')[["x", "y", "z"]].agg(method).reset_index(drop=False)
101 self.biomarker_agg = df_agg
103 return self