Coverage for physiodsp / balance_tests / sway.py: 100%
61 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-12 11:20 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-04-12 11:20 +0000
1import numpy as np
2from pandas import DataFrame
3from pydantic import BaseModel, Field, PositiveInt, PositiveFloat
4from scipy.signal import butter, sosfiltfilt
6from physiodsp.base import BaseAlgorithm
7from physiodsp.sensors.imu.accelerometer import AccelerometerData
9CHI_SQUARED = 5.991465 # 95% chi-square quantile for 2 degrees of freedom
10ROUND_DIGITS = 6
13class SwaySettings(BaseModel):
15 filter_order: PositiveInt = Field(default=4, description="Butterworth filter order for low-pass filtering")
17 filter_high_freq: PositiveFloat = Field(default=2.5, description="Cutoff frequency in Hz")
20class Sway(BaseAlgorithm):
22 _algorithm_name = "Sway"
23 _version = "0.1.0"
25 def __init__(self, settings: SwaySettings = SwaySettings()) -> None:
26 self.settings = settings
27 return None
29 def run(self, accelerometer: AccelerometerData, sensor_height: float):
30 """Run the sway analysis on accelerometer data recorded during a balance test.
32 Medio-lateral (ML) displacement is estimated from the X axis of the accelerometer,
33 while antero-posterior (AP) displacement is estimated from the Z axis. Both
34 displacements are approximated by multiplying the respective axis signal by the
35 sensor height (small-angle approximation). The signals are then low-pass filtered,
36 mean-centred, and used to extract stabilometric indices and a 95% confidence ellipse.
38 Args:
39 accelerometer (AccelerometerData): Triaxial accelerometer data. The X axis must
40 be aligned with the medio-lateral direction and the Z axis with the
41 antero-posterior direction.
42 sensor_height (float): Height of the sensor above the ground in meters, used to
43 convert acceleration to linear displacement.
45 Returns:
46 Sway: The instance itself with the `biomarker` attribute set to a DataFrame
47 containing stabilometric indices (average distance, RMS distance, total
48 distance, average velocity) for the full, ML, and AP paths, joined with
49 the 95% confidence ellipse metrics (area, angle, semi-major and semi-minor
50 axes), and metadata columns (timestamp_start, timestamp_end, duration_s).
51 """
53 # Medio-lateral and Antero-posterior displacements in meters
54 ml_disp = np.dot(accelerometer.x, sensor_height)
55 ap_disp = np.dot(accelerometer.z, sensor_height)
57 sos = butter(self.settings.filter_order,
58 self.settings.filter_high_freq,
59 btype='low',
60 fs=accelerometer.fs,
61 output='sos')
63 ml_disp_filt = sosfiltfilt(sos, ml_disp)
64 ap_disp_filt = sosfiltfilt(sos, ap_disp)
66 ml_offset = np.round(np.mean(ml_disp_filt), ROUND_DIGITS)
67 ap_offset = np.round(np.mean(ap_disp_filt), ROUND_DIGITS)
69 ml_disp_no_mean = ml_disp_filt - ml_offset
71 ap_disp_no_mean = ap_disp_filt - ap_offset
73 metrics_df = self.__index_extraction(ml_disp_no_mean, ap_disp_no_mean, accelerometer.fs)
75 ellipse_metrics_df = self.__get_ellipse_area(ml_disp_no_mean, ap_disp_no_mean)
77 metrics_df = metrics_df.join(ellipse_metrics_df)
79 metrics_df.insert(1, "timestamp_start", accelerometer.timestamps[0])
80 metrics_df.insert(2, "timestamp_end", accelerometer.timestamps[-1])
81 metrics_df.insert(3, "duration_s", (accelerometer.timestamps[-1] - accelerometer.timestamps[0]))
83 self.biomarker = metrics_df.copy()
85 return self
87 def __index_extraction(self, ml: np.ndarray, ap: np.ndarray, fs: int):
88 """Extract sway metrics from medio-lateral and antero-posterior displacement signals.
90 Args:
91 ml (np.ndarray): Medio-lateral displacement signal in meters.
92 ap (np.ndarray): Antero-posterior displacement signal in meters.
93 fs (int): Sampling frequency in Hz.
95 Returns:
96 DataFrame: Table of sway indices (average distance, RMS distance, total distance,
97 average velocity) for the full path, ML path, and AP path.
98 """
100 metrics = {
101 "index": ["full_path", "ml_path", "ap_path"],
102 "average_distance_m": [],
103 "rms_distance_m": [],
104 "total_distance_m": [],
105 "average_velocity_m_s": []
106 }
108 rd = np.sqrt(ml**2 + ap**2)
110 metrics["average_distance_m"].append(np.round(np.mean(rd), ROUND_DIGITS))
111 metrics["average_distance_m"].append(np.round(np.mean(np.abs(ml)), ROUND_DIGITS))
112 metrics["average_distance_m"].append(np.round(np.mean(np.abs(ap)), ROUND_DIGITS))
114 metrics["rms_distance_m"].append(np.round(np.sqrt(np.sum(rd**2) / len(ml)), ROUND_DIGITS))
115 metrics["rms_distance_m"].append(np.round(np.sqrt(np.sum(ml**2) / len(ml)), ROUND_DIGITS))
116 metrics["rms_distance_m"].append(np.round(np.sqrt(np.sum(ap**2) / len(ap)), ROUND_DIGITS))
118 metrics["total_distance_m"].append(np.round(np.sum(np.sqrt((np.diff(ml))**2 + (np.diff(ap))**2)), ROUND_DIGITS))
119 metrics["total_distance_m"].append(np.round(np.sum(np.abs(np.diff(ml))), ROUND_DIGITS))
120 metrics["total_distance_m"].append(np.round(np.sum(np.abs(np.diff(ap))), ROUND_DIGITS))
122 metrics["average_velocity_m_s"].append(np.round(metrics["total_distance_m"][0] / (len(ml) / fs), ROUND_DIGITS))
123 metrics["average_velocity_m_s"].append(np.round(metrics["total_distance_m"][1] / (len(ml) / fs), ROUND_DIGITS))
124 metrics["average_velocity_m_s"].append(np.round(metrics["total_distance_m"][2] / (len(ap) / fs), ROUND_DIGITS))
126 return DataFrame(metrics)
128 def __get_ellipse_area(self, ml: np.ndarray, ap: np.ndarray):
129 """Calculate the area of the ellipse representing the sway.
131 Args:
132 ml (np.ndarray): Medio-lateral displacement signal in meters.
133 ap (np.ndarray): Antero-posterior displacement signal in meters.
135 Returns:
136 DataFrame: Table of ellipse metrics (area, angle, semi-major axis, semi-minor axis).
137 """
139 m = np.vstack((ml, ap))
141 sigma = np.cov(m)
143 U, S, V = np.linalg.svd(sigma)
145 ellipse_area = np.round(np.pi * CHI_SQUARED * np.sqrt(S[0] * S[1]), ROUND_DIGITS)
146 theta = np.round(np.arctan(U[1][0] / U[0][0]), ROUND_DIGITS)
147 a = np.round(np.sqrt(S[0] * CHI_SQUARED), ROUND_DIGITS)
148 b = np.round(np.sqrt(S[1] * CHI_SQUARED), ROUND_DIGITS)
150 metrics = {
151 "ellipse_area_m2": [ellipse_area],
152 "theta_rad": [theta],
153 "a_m": [a],
154 "b_m": [b]
155 }
157 return DataFrame(metrics)