Coverage for physiodsp / activity / time_above_thr.py: 83%
29 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-26 21:14 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-02-26 21:14 +0000
1from numpy import abs, concatenate
2from pandas import DataFrame
3from pydantic import BaseModel, Field, PositiveInt, PositiveFloat
5from physiodsp.base import BaseAlgorithm
6from physiodsp.sensors.imu.base import IMUData
9class TimeAboveThrSettings(BaseModel):
11 window_len: PositiveInt = Field(default=1, description="processing window length in seconds")
13 aggregation_window: PositiveInt = Field(default=60, description="aggregation window length in seconds")
15 threshold: PositiveFloat = Field(default=0.1, description="threshold in g")
18class TimeAboveThr(BaseAlgorithm):
19 """Time Above Threshold Algorithm"""
21 _algorithm_name = "TimeAboveThrAlgorithm"
22 _version = "v0.1.0"
24 def __init__(self,
25 settings: TimeAboveThrSettings = TimeAboveThrSettings()
26 ) -> None:
27 self.settings = settings
28 self._window_len = settings.window_len
29 self._aggregation_window = settings.aggregation_window
30 return None
32 def run(self, data: IMUData):
34 imu_matrix = data.to_matrix()
35 above_thr = (abs(imu_matrix) >= self.settings.threshold).astype(int)
36 # Add timestamp column
37 above_thr = concatenate([data.timestamps.reshape(-1, 1), above_thr], axis=1)
39 self.values = DataFrame({"timestamps": above_thr[:, 0], "x": above_thr[:, 1], "y": above_thr[:, 2], "z": above_thr[:, 3]}).rolling(
40 window=int(self._window_len * data.fs),
41 step=int(self._window_len * data.fs),
42 min_periods=int(self._window_len * data.fs),
43 closed="left"
44 ).agg({"timestamps": "max", "x": "sum", "y": "sum", "z": "sum"})[1:]
46 return self
48 def aggregate(self,
49 method: str = 'sum'
50 ):
52 df = DataFrame(
53 list(zip(self.data.timestamps, self.values_x, self.values_y, self.values_z)),
54 columns=['timestamps', 'x', 'y', 'z']
55 )
57 df['timestamp'] = df[
58 'timestamps'].apply(lambda x: x // self._aggregation_window)
60 df_agg = df.groupby('timestamp')[["x", "y", "z"]].agg(method).reset_index(drop=False)
62 self.biomarker_agg = df_agg
64 return self