Coverage for physiodsp / activity / zero_crossing.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-26 21:14 +0000

1from numpy import diff, abs 

2from pandas import DataFrame 

3from pydantic import BaseModel, Field, PositiveInt, PositiveFloat 

4from scipy.signal import butter, sosfilt, sosfilt_zi 

5 

6from physiodsp.base import BaseAlgorithm 

7from physiodsp.sensors.imu.base import IMUData 

8 

9 

10class ZeroCrossingSettings(BaseModel): 

11 

12 window_len: PositiveInt = Field(default=1, description="processing window length in seconds") 

13 

14 aggregation_window: PositiveInt = Field(default=60, description="aggregation window length in seconds") 

15 

16 zero_crossing_thr: PositiveFloat = Field(default=0.05, description="ero crossing threshold in g") 

17 

18 filter_order: PositiveInt = Field(default=4, description="Butterworth filter order for bandpass filtering") 

19 

20 filter_low_freq: PositiveFloat = Field(default=0.3, description="Lower cutoff frequency in Hz") 

21 

22 filter_high_freq: PositiveFloat = Field(default=3.5, description="Upper cutoff frequency in Hz") 

23 

24 

25class ZeroCrossing(BaseAlgorithm): 

26 """Zero Crossing Algorithm""" 

27 

28 _algorithm_name = "ZeroCrossingAlgorithm" 

29 _version = "v0.1.0" 

30 

31 def __init__(self, 

32 settings: ZeroCrossingSettings = ZeroCrossingSettings(), 

33 ) -> None: 

34 

35 self.settings = settings 

36 self._window_len = settings.window_len 

37 self._aggregation_window = settings.aggregation_window 

38 self.zero_crossing_thr = settings.zero_crossing_thr 

39 return None 

40 

41 def __preprocess_imu(self, imu_matrix, fs: int): 

42 """ 

43 Apply bandpass Butterworth filter to IMU data. 

44 

45 Reference: 0.3-3.5 Hz pass band is commonly used for human movement analysis 

46 to isolate low-frequency body motion while removing drift and high-frequency noise. 

47 Args: 

48 imu_matrix: (N, 3) array with columns [x, y, z] 

49 fs: Sampling frequency of the IMU data in Hz 

50 

51 Returns: 

52 Filtered (N, 3) array 

53 """ 

54 # Design bandpass filter using second-order sections for numerical stability 

55 sos = butter(self.settings.filter_order, 

56 [self.settings.filter_low_freq, self.settings.filter_high_freq], 

57 btype='band', 

58 fs=fs, 

59 output='sos') 

60 

61 # Initialize filter states for each axis 

62 zi = sosfilt_zi(sos) 

63 

64 # Apply filter to each axis using sosfilt (not filtfilt) 

65 filtered_matrix = imu_matrix.copy() 

66 for i in range(imu_matrix.shape[1]): 

67 filtered_matrix[:, i], _ = sosfilt(sos, imu_matrix[:, i], zi=zi * imu_matrix[0, i]) 

68 

69 return filtered_matrix 

70 

71 def run(self, data: IMUData): 

72 

73 # Apply bandpass filter to IMU data 

74 imu_matrix = data.to_matrix() 

75 imu_matrix_filtered = self.__preprocess_imu(imu_matrix, fs=data.fs) 

76 # Compute zero crossings for each axis 

77 zcr = abs(diff((imu_matrix_filtered >= self.zero_crossing_thr).astype(int), axis=0)) 

78 

79 self.values = DataFrame({"timestamps": data.timestamps[1:], "x": zcr[:, 0], "y": zcr[:, 1], "z": zcr[:, 2]}).rolling( 

80 window=int(self._window_len * data.fs), 

81 step=int(self._window_len * data.fs), 

82 min_periods=int(self._window_len * data.fs), 

83 closed="left" 

84 ).agg({"timestamps": "max", "x": "sum", "y": "sum", "z": "sum"})[1:] 

85 

86 self.biomarker = self.values.copy() 

87 

88 return self 

89 

90 def aggregate(self, 

91 method: str = 'sum' 

92 ): 

93 

94 df = self.values.copy() 

95 

96 df['timestamps'] = df[ 

97 'timestamps'].apply(lambda x: (x // self._aggregation_window) * self._aggregation_window) 

98 

99 df_agg = df.groupby('timestamps')[["x", "y", "z"]].agg(method).reset_index(drop=False) 

100 

101 self.biomarker_agg = df_agg 

102 

103 return self