Coverage for physiodsp / activity / time_above_thr.py: 83%

29 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-26 21:14 +0000

1from numpy import abs, concatenate 

2from pandas import DataFrame 

3from pydantic import BaseModel, Field, PositiveInt, PositiveFloat 

4 

5from physiodsp.base import BaseAlgorithm 

6from physiodsp.sensors.imu.base import IMUData 

7 

8 

9class TimeAboveThrSettings(BaseModel): 

10 

11 window_len: PositiveInt = Field(default=1, description="processing window length in seconds") 

12 

13 aggregation_window: PositiveInt = Field(default=60, description="aggregation window length in seconds") 

14 

15 threshold: PositiveFloat = Field(default=0.1, description="threshold in g") 

16 

17 

18class TimeAboveThr(BaseAlgorithm): 

19 """Time Above Threshold Algorithm""" 

20 

21 _algorithm_name = "TimeAboveThrAlgorithm" 

22 _version = "v0.1.0" 

23 

24 def __init__(self, 

25 settings: TimeAboveThrSettings = TimeAboveThrSettings() 

26 ) -> None: 

27 self.settings = settings 

28 self._window_len = settings.window_len 

29 self._aggregation_window = settings.aggregation_window 

30 return None 

31 

32 def run(self, data: IMUData): 

33 

34 imu_matrix = data.to_matrix() 

35 above_thr = (abs(imu_matrix) >= self.settings.threshold).astype(int) 

36 # Add timestamp column 

37 above_thr = concatenate([data.timestamps.reshape(-1, 1), above_thr], axis=1) 

38 

39 self.values = DataFrame({"timestamps": above_thr[:, 0], "x": above_thr[:, 1], "y": above_thr[:, 2], "z": above_thr[:, 3]}).rolling( 

40 window=int(self._window_len * data.fs), 

41 step=int(self._window_len * data.fs), 

42 min_periods=int(self._window_len * data.fs), 

43 closed="left" 

44 ).agg({"timestamps": "max", "x": "sum", "y": "sum", "z": "sum"})[1:] 

45 

46 return self 

47 

48 def aggregate(self, 

49 method: str = 'sum' 

50 ): 

51 

52 df = DataFrame( 

53 list(zip(self.data.timestamps, self.values_x, self.values_y, self.values_z)), 

54 columns=['timestamps', 'x', 'y', 'z'] 

55 ) 

56 

57 df['timestamp'] = df[ 

58 'timestamps'].apply(lambda x: x // self._aggregation_window) 

59 

60 df_agg = df.groupby('timestamp')[["x", "y", "z"]].agg(method).reset_index(drop=False) 

61 

62 self.biomarker_agg = df_agg 

63 

64 return self