Coverage for physiodsp / balance_tests / sway.py: 100%

61 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-04-12 11:20 +0000

1import numpy as np 

2from pandas import DataFrame 

3from pydantic import BaseModel, Field, PositiveInt, PositiveFloat 

4from scipy.signal import butter, sosfiltfilt 

5 

6from physiodsp.base import BaseAlgorithm 

7from physiodsp.sensors.imu.accelerometer import AccelerometerData 

8 

9CHI_SQUARED = 5.991465 # 95% chi-square quantile for 2 degrees of freedom 

10ROUND_DIGITS = 6 

11 

12 

13class SwaySettings(BaseModel): 

14 

15 filter_order: PositiveInt = Field(default=4, description="Butterworth filter order for low-pass filtering") 

16 

17 filter_high_freq: PositiveFloat = Field(default=2.5, description="Cutoff frequency in Hz") 

18 

19 

20class Sway(BaseAlgorithm): 

21 

22 _algorithm_name = "Sway" 

23 _version = "0.1.0" 

24 

25 def __init__(self, settings: SwaySettings = SwaySettings()) -> None: 

26 self.settings = settings 

27 return None 

28 

29 def run(self, accelerometer: AccelerometerData, sensor_height: float): 

30 """Run the sway analysis on accelerometer data recorded during a balance test. 

31 

32 Medio-lateral (ML) displacement is estimated from the X axis of the accelerometer, 

33 while antero-posterior (AP) displacement is estimated from the Z axis. Both 

34 displacements are approximated by multiplying the respective axis signal by the 

35 sensor height (small-angle approximation). The signals are then low-pass filtered, 

36 mean-centred, and used to extract stabilometric indices and a 95% confidence ellipse. 

37 

38 Args: 

39 accelerometer (AccelerometerData): Triaxial accelerometer data. The X axis must 

40 be aligned with the medio-lateral direction and the Z axis with the 

41 antero-posterior direction. 

42 sensor_height (float): Height of the sensor above the ground in meters, used to 

43 convert acceleration to linear displacement. 

44 

45 Returns: 

46 Sway: The instance itself with the `biomarker` attribute set to a DataFrame 

47 containing stabilometric indices (average distance, RMS distance, total 

48 distance, average velocity) for the full, ML, and AP paths, joined with 

49 the 95% confidence ellipse metrics (area, angle, semi-major and semi-minor 

50 axes), and metadata columns (timestamp_start, timestamp_end, duration_s). 

51 """ 

52 

53 # Medio-lateral and Antero-posterior displacements in meters 

54 ml_disp = np.dot(accelerometer.x, sensor_height) 

55 ap_disp = np.dot(accelerometer.z, sensor_height) 

56 

57 sos = butter(self.settings.filter_order, 

58 self.settings.filter_high_freq, 

59 btype='low', 

60 fs=accelerometer.fs, 

61 output='sos') 

62 

63 ml_disp_filt = sosfiltfilt(sos, ml_disp) 

64 ap_disp_filt = sosfiltfilt(sos, ap_disp) 

65 

66 ml_offset = np.round(np.mean(ml_disp_filt), ROUND_DIGITS) 

67 ap_offset = np.round(np.mean(ap_disp_filt), ROUND_DIGITS) 

68 

69 ml_disp_no_mean = ml_disp_filt - ml_offset 

70 

71 ap_disp_no_mean = ap_disp_filt - ap_offset 

72 

73 metrics_df = self.__index_extraction(ml_disp_no_mean, ap_disp_no_mean, accelerometer.fs) 

74 

75 ellipse_metrics_df = self.__get_ellipse_area(ml_disp_no_mean, ap_disp_no_mean) 

76 

77 metrics_df = metrics_df.join(ellipse_metrics_df) 

78 

79 metrics_df.insert(1, "timestamp_start", accelerometer.timestamps[0]) 

80 metrics_df.insert(2, "timestamp_end", accelerometer.timestamps[-1]) 

81 metrics_df.insert(3, "duration_s", (accelerometer.timestamps[-1] - accelerometer.timestamps[0])) 

82 

83 self.biomarker = metrics_df.copy() 

84 

85 return self 

86 

87 def __index_extraction(self, ml: np.ndarray, ap: np.ndarray, fs: int): 

88 """Extract sway metrics from medio-lateral and antero-posterior displacement signals. 

89 

90 Args: 

91 ml (np.ndarray): Medio-lateral displacement signal in meters. 

92 ap (np.ndarray): Antero-posterior displacement signal in meters. 

93 fs (int): Sampling frequency in Hz. 

94 

95 Returns: 

96 DataFrame: Table of sway indices (average distance, RMS distance, total distance, 

97 average velocity) for the full path, ML path, and AP path. 

98 """ 

99 

100 metrics = { 

101 "index": ["full_path", "ml_path", "ap_path"], 

102 "average_distance_m": [], 

103 "rms_distance_m": [], 

104 "total_distance_m": [], 

105 "average_velocity_m_s": [] 

106 } 

107 

108 rd = np.sqrt(ml**2 + ap**2) 

109 

110 metrics["average_distance_m"].append(np.round(np.mean(rd), ROUND_DIGITS)) 

111 metrics["average_distance_m"].append(np.round(np.mean(np.abs(ml)), ROUND_DIGITS)) 

112 metrics["average_distance_m"].append(np.round(np.mean(np.abs(ap)), ROUND_DIGITS)) 

113 

114 metrics["rms_distance_m"].append(np.round(np.sqrt(np.sum(rd**2) / len(ml)), ROUND_DIGITS)) 

115 metrics["rms_distance_m"].append(np.round(np.sqrt(np.sum(ml**2) / len(ml)), ROUND_DIGITS)) 

116 metrics["rms_distance_m"].append(np.round(np.sqrt(np.sum(ap**2) / len(ap)), ROUND_DIGITS)) 

117 

118 metrics["total_distance_m"].append(np.round(np.sum(np.sqrt((np.diff(ml))**2 + (np.diff(ap))**2)), ROUND_DIGITS)) 

119 metrics["total_distance_m"].append(np.round(np.sum(np.abs(np.diff(ml))), ROUND_DIGITS)) 

120 metrics["total_distance_m"].append(np.round(np.sum(np.abs(np.diff(ap))), ROUND_DIGITS)) 

121 

122 metrics["average_velocity_m_s"].append(np.round(metrics["total_distance_m"][0] / (len(ml) / fs), ROUND_DIGITS)) 

123 metrics["average_velocity_m_s"].append(np.round(metrics["total_distance_m"][1] / (len(ml) / fs), ROUND_DIGITS)) 

124 metrics["average_velocity_m_s"].append(np.round(metrics["total_distance_m"][2] / (len(ap) / fs), ROUND_DIGITS)) 

125 

126 return DataFrame(metrics) 

127 

128 def __get_ellipse_area(self, ml: np.ndarray, ap: np.ndarray): 

129 """Calculate the area of the ellipse representing the sway. 

130 

131 Args: 

132 ml (np.ndarray): Medio-lateral displacement signal in meters. 

133 ap (np.ndarray): Antero-posterior displacement signal in meters. 

134 

135 Returns: 

136 DataFrame: Table of ellipse metrics (area, angle, semi-major axis, semi-minor axis). 

137 """ 

138 

139 m = np.vstack((ml, ap)) 

140 

141 sigma = np.cov(m) 

142 

143 U, S, V = np.linalg.svd(sigma) 

144 

145 ellipse_area = np.round(np.pi * CHI_SQUARED * np.sqrt(S[0] * S[1]), ROUND_DIGITS) 

146 theta = np.round(np.arctan(U[1][0] / U[0][0]), ROUND_DIGITS) 

147 a = np.round(np.sqrt(S[0] * CHI_SQUARED), ROUND_DIGITS) 

148 b = np.round(np.sqrt(S[1] * CHI_SQUARED), ROUND_DIGITS) 

149 

150 metrics = { 

151 "ellipse_area_m2": [ellipse_area], 

152 "theta_rad": [theta], 

153 "a_m": [a], 

154 "b_m": [b] 

155 } 

156 

157 return DataFrame(metrics)