Csaba Bolyos
initial commit
a31294b
raw
history blame
11.4 kB
"""
Laban Movement Analysis (LMA) inspired notation engine.
Computes movement metrics like direction, intensity, and speed from pose keypoints.
"""
import numpy as np
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass
from enum import Enum
from .pose_estimation import PoseResult, Keypoint
class Direction(Enum):
"""Movement direction categories."""
UP = "up"
DOWN = "down"
LEFT = "left"
RIGHT = "right"
FORWARD = "forward"
BACKWARD = "backward"
STATIONARY = "stationary"
class Intensity(Enum):
"""Movement intensity levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
class Speed(Enum):
"""Movement speed categories."""
SLOW = "slow"
MODERATE = "moderate"
FAST = "fast"
@dataclass
class MovementMetrics:
"""LMA-inspired movement metrics for a frame or segment."""
frame_index: int
timestamp: Optional[float] = None
# Primary metrics
direction: Direction = Direction.STATIONARY
intensity: Intensity = Intensity.LOW
speed: Speed = Speed.SLOW
# Numeric values
velocity: float = 0.0 # pixels/second or normalized units
acceleration: float = 0.0
# Additional qualities
fluidity: float = 0.0 # 0-1, smoothness of movement
expansion: float = 0.0 # 0-1, how spread out the pose is
# Raw displacement data
center_displacement: Optional[Tuple[float, float]] = None
total_displacement: float = 0.0
class MovementAnalyzer:
"""Analyzes pose sequences to extract LMA-style movement metrics."""
def __init__(self, fps: float = 30.0,
velocity_threshold_slow: float = 0.01,
velocity_threshold_fast: float = 0.1,
intensity_accel_threshold: float = 0.05):
"""
Initialize movement analyzer.
Args:
fps: Frames per second of the video
velocity_threshold_slow: Threshold for slow movement (normalized)
velocity_threshold_fast: Threshold for fast movement (normalized)
intensity_accel_threshold: Acceleration threshold for intensity
"""
self.fps = fps
self.frame_duration = 1.0 / fps
self.velocity_threshold_slow = velocity_threshold_slow
self.velocity_threshold_fast = velocity_threshold_fast
self.intensity_accel_threshold = intensity_accel_threshold
def analyze_movement(self, pose_sequence: List[List[PoseResult]]) -> List[MovementMetrics]:
"""
Analyze a sequence of poses to compute movement metrics.
Args:
pose_sequence: List of pose results per frame
Returns:
List of movement metrics per frame
"""
if not pose_sequence:
return []
metrics = []
prev_centers = None
prev_velocity = None
for frame_idx, frame_poses in enumerate(pose_sequence):
if not frame_poses:
# No pose detected in this frame
metrics.append(MovementMetrics(
frame_index=frame_idx,
timestamp=frame_idx * self.frame_duration
))
continue
# For now, analyze first person only
# TODO: Extend to multi-person analysis
pose = frame_poses[0]
# Compute body center and limb positions
center = self._compute_body_center(pose.keypoints)
limb_positions = self._get_limb_positions(pose.keypoints)
# Initialize metrics for this frame
frame_metrics = MovementMetrics(
frame_index=frame_idx,
timestamp=frame_idx * self.frame_duration
)
if prev_centers is not None and frame_idx > 0:
# Compute displacement and velocity
displacement = (
center[0] - prev_centers[0],
center[1] - prev_centers[1]
)
frame_metrics.center_displacement = displacement
frame_metrics.total_displacement = np.sqrt(
displacement[0]**2 + displacement[1]**2
)
# Velocity (normalized units per second)
frame_metrics.velocity = frame_metrics.total_displacement * self.fps
# Direction
frame_metrics.direction = self._compute_direction(displacement)
# Speed category
frame_metrics.speed = self._categorize_speed(frame_metrics.velocity)
# Acceleration and intensity
if prev_velocity is not None:
frame_metrics.acceleration = abs(
frame_metrics.velocity - prev_velocity
) * self.fps
frame_metrics.intensity = self._compute_intensity(
frame_metrics.acceleration,
frame_metrics.velocity
)
# Fluidity (based on acceleration smoothness)
frame_metrics.fluidity = self._compute_fluidity(
frame_metrics.acceleration
)
# Expansion (how spread out the pose is)
frame_metrics.expansion = self._compute_expansion(pose.keypoints)
metrics.append(frame_metrics)
# Update previous values
prev_centers = center
prev_velocity = frame_metrics.velocity
# Post-process to smooth metrics if needed
metrics = self._smooth_metrics(metrics)
return metrics
def _compute_body_center(self, keypoints: List[Keypoint]) -> Tuple[float, float]:
"""Compute the center of mass of the body."""
# Use major body joints for center calculation
major_joints = ["left_hip", "right_hip", "left_shoulder", "right_shoulder"]
x_coords = []
y_coords = []
for kp in keypoints:
if kp.name in major_joints and kp.confidence > 0.5:
x_coords.append(kp.x)
y_coords.append(kp.y)
if not x_coords:
# Fallback to all keypoints
x_coords = [kp.x for kp in keypoints if kp.confidence > 0.3]
y_coords = [kp.y for kp in keypoints if kp.confidence > 0.3]
if x_coords:
return (np.mean(x_coords), np.mean(y_coords))
return (0.5, 0.5) # Default center
def _get_limb_positions(self, keypoints: List[Keypoint]) -> Dict[str, Tuple[float, float]]:
"""Get positions of major limbs."""
positions = {}
for kp in keypoints:
if kp.confidence > 0.3:
positions[kp.name] = (kp.x, kp.y)
return positions
def _compute_direction(self, displacement: Tuple[float, float]) -> Direction:
"""Compute movement direction from displacement vector."""
dx, dy = displacement
# Threshold for considering movement
threshold = 0.005
if abs(dx) < threshold and abs(dy) < threshold:
return Direction.STATIONARY
# Determine primary direction
if abs(dx) > abs(dy):
return Direction.RIGHT if dx > 0 else Direction.LEFT
else:
return Direction.DOWN if dy > 0 else Direction.UP
def _categorize_speed(self, velocity: float) -> Speed:
"""Categorize velocity into speed levels."""
if velocity < self.velocity_threshold_slow:
return Speed.SLOW
elif velocity < self.velocity_threshold_fast:
return Speed.FAST
else:
return Speed.FAST
def _compute_intensity(self, acceleration: float, velocity: float) -> Intensity:
"""Compute movement intensity based on acceleration and velocity."""
# High acceleration or high velocity indicates high intensity
if acceleration > self.intensity_accel_threshold * 2 or velocity > self.velocity_threshold_fast:
return Intensity.HIGH
elif acceleration > self.intensity_accel_threshold or velocity > self.velocity_threshold_slow:
return Intensity.MEDIUM
else:
return Intensity.LOW
def _compute_fluidity(self, acceleration: float) -> float:
"""
Compute fluidity score (0-1) based on acceleration.
Lower acceleration = higher fluidity (smoother movement).
"""
# Normalize acceleration to 0-1 range
max_accel = 0.2 # Expected maximum acceleration
norm_accel = min(acceleration / max_accel, 1.0)
# Invert so low acceleration = high fluidity
return 1.0 - norm_accel
def _compute_expansion(self, keypoints: List[Keypoint]) -> float:
"""
Compute how expanded/contracted the pose is.
Returns 0-1 where 1 is fully expanded.
"""
# Calculate distances between opposite limbs
limb_pairs = [
("left_wrist", "right_wrist"),
("left_ankle", "right_ankle"),
("left_wrist", "left_ankle"),
("right_wrist", "right_ankle")
]
kp_dict = {kp.name: kp for kp in keypoints if kp.confidence > 0.3}
distances = []
for limb1, limb2 in limb_pairs:
if limb1 in kp_dict and limb2 in kp_dict:
kp1 = kp_dict[limb1]
kp2 = kp_dict[limb2]
dist = np.sqrt((kp1.x - kp2.x)**2 + (kp1.y - kp2.y)**2)
distances.append(dist)
if distances:
# Normalize by expected maximum distance
avg_dist = np.mean(distances)
max_expected = 1.4 # Diagonal of normalized space
return min(avg_dist / max_expected, 1.0)
return 0.5 # Default neutral expansion
def _smooth_metrics(self, metrics: List[MovementMetrics]) -> List[MovementMetrics]:
"""Apply smoothing to reduce noise in metrics."""
# Simple moving average for numeric values
window_size = 3
if len(metrics) <= window_size:
return metrics
# Smooth velocity and acceleration
for i in range(window_size, len(metrics)):
velocities = [m.velocity for m in metrics[i-window_size:i+1]]
metrics[i].velocity = np.mean(velocities)
accels = [m.acceleration for m in metrics[i-window_size:i+1]]
metrics[i].acceleration = np.mean(accels)
fluidities = [m.fluidity for m in metrics[i-window_size:i+1]]
metrics[i].fluidity = np.mean(fluidities)
return metrics
def analyze_pose_sequence(pose_sequence: List[List[PoseResult]],
fps: float = 30.0) -> List[MovementMetrics]:
"""
Convenience function to analyze a pose sequence.
Args:
pose_sequence: List of pose results per frame
fps: Video frame rate
Returns:
List of movement metrics
"""
analyzer = MovementAnalyzer(fps=fps)
return analyzer.analyze_movement(pose_sequence)