Coverage for physiodsp / activity / enmo.py: 100%

25 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-12 11:20 +0000

1from pandas import Series, DataFrame 

2from pydantic import BaseModel, Field, PositiveInt 

3 

4from physiodsp.base import BaseAlgorithm 

5from physiodsp.sensors.imu.accelerometer import AccelerometerData 

6 

7 

8class ENMOSettings(BaseModel): 

9 

10 window_len: PositiveInt = Field(default=1, description="processing window length in seconds") 

11 

12 aggregation_window: PositiveInt = Field(default=60, description="aggregation window length in seconds") 

13 

14 

15class ENMO(BaseAlgorithm): 

16 """Euclidean Norm Minus One""" 

17 

18 _algorithm_name = "ENMO" 

19 _version = "0.1.0" 

20 

21 def __init__(self, 

22 settings: ENMOSettings = ENMOSettings() 

23 ) -> None: 

24 self.settings = settings 

25 self._window_len = settings.window_len 

26 self._aggregation_window = settings.aggregation_window 

27 return None 

28 

29 def run(self, accelerometer: AccelerometerData): 

30 """Run the ENMO algorithm on accelerometer data. 

31 ENMO is calculated as the vector magnitude of the three accelerometer 

32 axes minus 1g, with negative values set to zero. The algorithm computes 

33 the average ENMO over non-overlapping windows of a specified length. 

34 

35 Args: 

36 accelerometer (AccelerometerData): Triaxial accelerometer data. 

37 

38 Returns: 

39 ENMO: Instance of the ENMO algorithm with computed values. 

40 """ 

41 

42 enmo = accelerometer.magnitude - 1 

43 enmo[enmo < 0] = 0 

44 

45 self.timestamps = Series(accelerometer.timestamps).rolling( 

46 window=int(self._window_len * accelerometer.fs), 

47 step=int(self._window_len * accelerometer.fs), 

48 min_periods=int(self._window_len * accelerometer.fs), 

49 closed="left" 

50 ).max()[1:] 

51 

52 self.values = Series(enmo).rolling( 

53 window=int(self._window_len * accelerometer.fs), 

54 step=int(self._window_len * accelerometer.fs), 

55 min_periods=int(self._window_len * accelerometer.fs), 

56 closed="left" 

57 ).mean()[1:] 

58 

59 self.biomarker = DataFrame( 

60 list(zip(self.timestamps, self.values)), 

61 columns=['timestamps', 'values'] 

62 ) 

63 

64 return self 

65 

66 def aggregate(self, 

67 method: str = 'mean' 

68 ): 

69 super().aggregate( 

70 self.timestamps, 

71 self.values, 

72 method 

73 ) 

74 return self