|
|
""" |
|
|
Laban Movement Analysis (LMA) inspired notation engine. |
|
|
Computes movement metrics like direction, intensity, and speed from pose keypoints. |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from typing import List, Dict, Optional, Tuple, Any |
|
|
from dataclasses import dataclass |
|
|
from enum import Enum |
|
|
|
|
|
from .pose_estimation import PoseResult, Keypoint |
|
|
|
|
|
|
|
|
class Direction(Enum): |
|
|
"""Movement direction categories.""" |
|
|
UP = "up" |
|
|
DOWN = "down" |
|
|
LEFT = "left" |
|
|
RIGHT = "right" |
|
|
FORWARD = "forward" |
|
|
BACKWARD = "backward" |
|
|
STATIONARY = "stationary" |
|
|
|
|
|
|
|
|
class Intensity(Enum): |
|
|
"""Movement intensity levels.""" |
|
|
LOW = "low" |
|
|
MEDIUM = "medium" |
|
|
HIGH = "high" |
|
|
|
|
|
|
|
|
class Speed(Enum): |
|
|
"""Movement speed categories.""" |
|
|
SLOW = "slow" |
|
|
MODERATE = "moderate" |
|
|
FAST = "fast" |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class MovementMetrics: |
|
|
"""LMA-inspired movement metrics for a frame or segment.""" |
|
|
frame_index: int |
|
|
timestamp: Optional[float] = None |
|
|
|
|
|
|
|
|
direction: Direction = Direction.STATIONARY |
|
|
intensity: Intensity = Intensity.LOW |
|
|
speed: Speed = Speed.SLOW |
|
|
|
|
|
|
|
|
velocity: float = 0.0 |
|
|
acceleration: float = 0.0 |
|
|
|
|
|
|
|
|
fluidity: float = 0.0 |
|
|
expansion: float = 0.0 |
|
|
|
|
|
|
|
|
center_displacement: Optional[Tuple[float, float]] = None |
|
|
total_displacement: float = 0.0 |
|
|
|
|
|
|
|
|
class MovementAnalyzer: |
|
|
"""Analyzes pose sequences to extract LMA-style movement metrics.""" |
|
|
|
|
|
def __init__(self, fps: float = 30.0, |
|
|
velocity_threshold_slow: float = 0.01, |
|
|
velocity_threshold_fast: float = 0.1, |
|
|
intensity_accel_threshold: float = 0.05): |
|
|
""" |
|
|
Initialize movement analyzer. |
|
|
|
|
|
Args: |
|
|
fps: Frames per second of the video |
|
|
velocity_threshold_slow: Threshold for slow movement (normalized) |
|
|
velocity_threshold_fast: Threshold for fast movement (normalized) |
|
|
intensity_accel_threshold: Acceleration threshold for intensity |
|
|
""" |
|
|
self.fps = fps |
|
|
self.frame_duration = 1.0 / fps |
|
|
self.velocity_threshold_slow = velocity_threshold_slow |
|
|
self.velocity_threshold_fast = velocity_threshold_fast |
|
|
self.intensity_accel_threshold = intensity_accel_threshold |
|
|
|
|
|
def analyze_movement(self, pose_sequence: List[List[PoseResult]]) -> List[MovementMetrics]: |
|
|
""" |
|
|
Analyze a sequence of poses to compute movement metrics. |
|
|
|
|
|
Args: |
|
|
pose_sequence: List of pose results per frame |
|
|
|
|
|
Returns: |
|
|
List of movement metrics per frame |
|
|
""" |
|
|
if not pose_sequence: |
|
|
return [] |
|
|
|
|
|
metrics = [] |
|
|
prev_centers = None |
|
|
prev_velocity = None |
|
|
|
|
|
for frame_idx, frame_poses in enumerate(pose_sequence): |
|
|
if not frame_poses: |
|
|
|
|
|
metrics.append(MovementMetrics( |
|
|
frame_index=frame_idx, |
|
|
timestamp=frame_idx * self.frame_duration |
|
|
)) |
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
pose = frame_poses[0] |
|
|
|
|
|
|
|
|
center = self._compute_body_center(pose.keypoints) |
|
|
limb_positions = self._get_limb_positions(pose.keypoints) |
|
|
|
|
|
|
|
|
frame_metrics = MovementMetrics( |
|
|
frame_index=frame_idx, |
|
|
timestamp=frame_idx * self.frame_duration |
|
|
) |
|
|
|
|
|
if prev_centers is not None and frame_idx > 0: |
|
|
|
|
|
displacement = ( |
|
|
center[0] - prev_centers[0], |
|
|
center[1] - prev_centers[1] |
|
|
) |
|
|
frame_metrics.center_displacement = displacement |
|
|
frame_metrics.total_displacement = np.sqrt( |
|
|
displacement[0]**2 + displacement[1]**2 |
|
|
) |
|
|
|
|
|
|
|
|
frame_metrics.velocity = frame_metrics.total_displacement * self.fps |
|
|
|
|
|
|
|
|
frame_metrics.direction = self._compute_direction(displacement) |
|
|
|
|
|
|
|
|
frame_metrics.speed = self._categorize_speed(frame_metrics.velocity) |
|
|
|
|
|
|
|
|
if prev_velocity is not None: |
|
|
frame_metrics.acceleration = abs( |
|
|
frame_metrics.velocity - prev_velocity |
|
|
) * self.fps |
|
|
frame_metrics.intensity = self._compute_intensity( |
|
|
frame_metrics.acceleration, |
|
|
frame_metrics.velocity |
|
|
) |
|
|
|
|
|
|
|
|
frame_metrics.fluidity = self._compute_fluidity( |
|
|
frame_metrics.acceleration |
|
|
) |
|
|
|
|
|
|
|
|
frame_metrics.expansion = self._compute_expansion(pose.keypoints) |
|
|
|
|
|
metrics.append(frame_metrics) |
|
|
|
|
|
|
|
|
prev_centers = center |
|
|
prev_velocity = frame_metrics.velocity |
|
|
|
|
|
|
|
|
metrics = self._smooth_metrics(metrics) |
|
|
|
|
|
return metrics |
|
|
|
|
|
def _compute_body_center(self, keypoints: List[Keypoint]) -> Tuple[float, float]: |
|
|
"""Compute the center of mass of the body.""" |
|
|
|
|
|
major_joints = ["left_hip", "right_hip", "left_shoulder", "right_shoulder"] |
|
|
|
|
|
x_coords = [] |
|
|
y_coords = [] |
|
|
|
|
|
for kp in keypoints: |
|
|
if kp.name in major_joints and kp.confidence > 0.5: |
|
|
x_coords.append(kp.x) |
|
|
y_coords.append(kp.y) |
|
|
|
|
|
if not x_coords: |
|
|
|
|
|
x_coords = [kp.x for kp in keypoints if kp.confidence > 0.3] |
|
|
y_coords = [kp.y for kp in keypoints if kp.confidence > 0.3] |
|
|
|
|
|
if x_coords: |
|
|
return (np.mean(x_coords), np.mean(y_coords)) |
|
|
return (0.5, 0.5) |
|
|
|
|
|
def _get_limb_positions(self, keypoints: List[Keypoint]) -> Dict[str, Tuple[float, float]]: |
|
|
"""Get positions of major limbs.""" |
|
|
positions = {} |
|
|
for kp in keypoints: |
|
|
if kp.confidence > 0.3: |
|
|
positions[kp.name] = (kp.x, kp.y) |
|
|
return positions |
|
|
|
|
|
def _compute_direction(self, displacement: Tuple[float, float]) -> Direction: |
|
|
"""Compute movement direction from displacement vector.""" |
|
|
dx, dy = displacement |
|
|
|
|
|
|
|
|
threshold = 0.005 |
|
|
|
|
|
if abs(dx) < threshold and abs(dy) < threshold: |
|
|
return Direction.STATIONARY |
|
|
|
|
|
|
|
|
if abs(dx) > abs(dy): |
|
|
return Direction.RIGHT if dx > 0 else Direction.LEFT |
|
|
else: |
|
|
return Direction.DOWN if dy > 0 else Direction.UP |
|
|
|
|
|
def _categorize_speed(self, velocity: float) -> Speed: |
|
|
"""Categorize velocity into speed levels.""" |
|
|
if velocity < self.velocity_threshold_slow: |
|
|
return Speed.SLOW |
|
|
elif velocity < self.velocity_threshold_fast: |
|
|
return Speed.FAST |
|
|
else: |
|
|
return Speed.FAST |
|
|
|
|
|
def _compute_intensity(self, acceleration: float, velocity: float) -> Intensity: |
|
|
"""Compute movement intensity based on acceleration and velocity.""" |
|
|
|
|
|
if acceleration > self.intensity_accel_threshold * 2 or velocity > self.velocity_threshold_fast: |
|
|
return Intensity.HIGH |
|
|
elif acceleration > self.intensity_accel_threshold or velocity > self.velocity_threshold_slow: |
|
|
return Intensity.MEDIUM |
|
|
else: |
|
|
return Intensity.LOW |
|
|
|
|
|
def _compute_fluidity(self, acceleration: float) -> float: |
|
|
""" |
|
|
Compute fluidity score (0-1) based on acceleration. |
|
|
Lower acceleration = higher fluidity (smoother movement). |
|
|
""" |
|
|
|
|
|
max_accel = 0.2 |
|
|
norm_accel = min(acceleration / max_accel, 1.0) |
|
|
|
|
|
|
|
|
return 1.0 - norm_accel |
|
|
|
|
|
def _compute_expansion(self, keypoints: List[Keypoint]) -> float: |
|
|
""" |
|
|
Compute how expanded/contracted the pose is. |
|
|
Returns 0-1 where 1 is fully expanded. |
|
|
""" |
|
|
|
|
|
limb_pairs = [ |
|
|
("left_wrist", "right_wrist"), |
|
|
("left_ankle", "right_ankle"), |
|
|
("left_wrist", "left_ankle"), |
|
|
("right_wrist", "right_ankle") |
|
|
] |
|
|
|
|
|
kp_dict = {kp.name: kp for kp in keypoints if kp.confidence > 0.3} |
|
|
|
|
|
distances = [] |
|
|
for limb1, limb2 in limb_pairs: |
|
|
if limb1 in kp_dict and limb2 in kp_dict: |
|
|
kp1 = kp_dict[limb1] |
|
|
kp2 = kp_dict[limb2] |
|
|
dist = np.sqrt((kp1.x - kp2.x)**2 + (kp1.y - kp2.y)**2) |
|
|
distances.append(dist) |
|
|
|
|
|
if distances: |
|
|
|
|
|
avg_dist = np.mean(distances) |
|
|
max_expected = 1.4 |
|
|
return min(avg_dist / max_expected, 1.0) |
|
|
|
|
|
return 0.5 |
|
|
|
|
|
def _smooth_metrics(self, metrics: List[MovementMetrics]) -> List[MovementMetrics]: |
|
|
"""Apply smoothing to reduce noise in metrics.""" |
|
|
|
|
|
window_size = 3 |
|
|
|
|
|
if len(metrics) <= window_size: |
|
|
return metrics |
|
|
|
|
|
|
|
|
for i in range(window_size, len(metrics)): |
|
|
velocities = [m.velocity for m in metrics[i-window_size:i+1]] |
|
|
metrics[i].velocity = np.mean(velocities) |
|
|
|
|
|
accels = [m.acceleration for m in metrics[i-window_size:i+1]] |
|
|
metrics[i].acceleration = np.mean(accels) |
|
|
|
|
|
fluidities = [m.fluidity for m in metrics[i-window_size:i+1]] |
|
|
metrics[i].fluidity = np.mean(fluidities) |
|
|
|
|
|
return metrics |
|
|
|
|
|
|
|
|
def analyze_pose_sequence(pose_sequence: List[List[PoseResult]], |
|
|
fps: float = 30.0) -> List[MovementMetrics]: |
|
|
""" |
|
|
Convenience function to analyze a pose sequence. |
|
|
|
|
|
Args: |
|
|
pose_sequence: List of pose results per frame |
|
|
fps: Video frame rate |
|
|
|
|
|
Returns: |
|
|
List of movement metrics |
|
|
""" |
|
|
analyzer = MovementAnalyzer(fps=fps) |
|
|
return analyzer.analyze_movement(pose_sequence) |