Coverage for physiodsp / activity / enmo.py: 100%
25 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-12 11:20 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-12 11:20 +0000
1from pandas import Series, DataFrame
2from pydantic import BaseModel, Field, PositiveInt
4from physiodsp.base import BaseAlgorithm
5from physiodsp.sensors.imu.accelerometer import AccelerometerData
8class ENMOSettings(BaseModel):
10 window_len: PositiveInt = Field(default=1, description="processing window length in seconds")
12 aggregation_window: PositiveInt = Field(default=60, description="aggregation window length in seconds")
15class ENMO(BaseAlgorithm):
16 """Euclidean Norm Minus One"""
18 _algorithm_name = "ENMO"
19 _version = "0.1.0"
21 def __init__(self,
22 settings: ENMOSettings = ENMOSettings()
23 ) -> None:
24 self.settings = settings
25 self._window_len = settings.window_len
26 self._aggregation_window = settings.aggregation_window
27 return None
29 def run(self, accelerometer: AccelerometerData):
30 """Run the ENMO algorithm on accelerometer data.
31 ENMO is calculated as the vector magnitude of the three accelerometer
32 axes minus 1g, with negative values set to zero. The algorithm computes
33 the average ENMO over non-overlapping windows of a specified length.
35 Args:
36 accelerometer (AccelerometerData): Triaxial accelerometer data.
38 Returns:
39 ENMO: Instance of the ENMO algorithm with computed values.
40 """
42 enmo = accelerometer.magnitude - 1
43 enmo[enmo < 0] = 0
45 self.timestamps = Series(accelerometer.timestamps).rolling(
46 window=int(self._window_len * accelerometer.fs),
47 step=int(self._window_len * accelerometer.fs),
48 min_periods=int(self._window_len * accelerometer.fs),
49 closed="left"
50 ).max()[1:]
52 self.values = Series(enmo).rolling(
53 window=int(self._window_len * accelerometer.fs),
54 step=int(self._window_len * accelerometer.fs),
55 min_periods=int(self._window_len * accelerometer.fs),
56 closed="left"
57 ).mean()[1:]
59 self.biomarker = DataFrame(
60 list(zip(self.timestamps, self.values)),
61 columns=['timestamps', 'values']
62 )
64 return self
66 def aggregate(self,
67 method: str = 'mean'
68 ):
69 super().aggregate(
70 self.timestamps,
71 self.values,
72 method
73 )
74 return self