"""
Visualizer for creating annotated videos with pose overlays and movement indicators.
"""

import cv2
import numpy as np
from typing import List, Tuple, Optional, Dict, Any
from collections import deque
import colorsys

from .pose_estimation import PoseResult, Keypoint
from .notation_engine import MovementMetrics, Direction, Intensity, Speed


class PoseVisualizer:
    """Creates visual overlays for pose and movement analysis."""
    
    # COCO skeleton connections for visualization
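    # Each pair indexes into the standard 17-keypoint COCO ordering
    # (nose, eyes, ears, shoulders, elbows, wrists, hips, knees, ankles).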
    COCO_SKELETON = [
        # Face
        (0, 1), (0, 2), (1, 3), (2, 4),  # nose to eyes, eyes to ears
        # Upper body
        (5, 6),  # shoulders
        (5, 7), (7, 9),  # left arm
        (6, 8), (8, 10),  # right arm
        (5, 11), (6, 12),  # shoulders to hips
        # Lower body
        (11, 12),  # hips
        (11, 13), (13, 15),  # left leg
        (12, 14), (14, 16),  # right leg
    ]
    
    # MediaPipe skeleton connections (33 landmarks)
    MEDIAPIPE_SKELETON = [
        # Face connections
        (0, 1), (1, 2), (2, 3), (3, 7),  # left eye region
        (0, 4), (4, 5), (5, 6), (6, 8),  # right eye region
        (9, 10),  # mouth
        # Upper body
        (11, 12),  # shoulders
        (11, 13), (13, 15),  # left arm
        (12, 14), (14, 16),  # right arm
        (11, 23), (12, 24),  # shoulders to hips
        (23, 24),  # hips
        # Lower body
        (23, 25), (25, 27), (27, 29), (27, 31),  # left leg
        (24, 26), (26, 28), (28, 30), (28, 32),  # right leg
        # Hands
        (15, 17), (15, 19), (15, 21),  # left hand
        (16, 18), (16, 20), (16, 22),  # right hand
    ]
    
    def __init__(self, 
                 trail_length: int = 10,
                 show_skeleton: bool = True,
                 show_trails: bool = True,
                 show_direction_arrows: bool = True,
                 show_metrics: bool = True):
        """
        Initialize visualizer.
        
        Args:
            trail_length: Number of previous frames to show in motion trail
            show_skeleton: Whether to draw pose skeleton
            show_trails: Whether to draw motion trails
            show_direction_arrows: Whether to show movement direction arrows
            show_metrics: Whether to display text metrics on frame
        """
        self.trail_length = trail_length
        self.show_skeleton = show_skeleton
        self.show_trails = show_trails
        self.show_direction_arrows = show_direction_arrows
        self.show_metrics = show_metrics
        
        # Trail history for each keypoint
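        # Structure: {person_id: {joint_name: deque of (x, y) in normalized coords}}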
        self.trails = {}
        
        # Color mapping for intensity
        self.intensity_colors = {
            Intensity.LOW: (0, 255, 0),      # Green
            Intensity.MEDIUM: (0, 165, 255),  # Orange
            Intensity.HIGH: (0, 0, 255)       # Red
        }
    
    def visualize_frame(self,
                       frame: np.ndarray,
                       pose_results: List[PoseResult],
                       movement_metrics: Optional[MovementMetrics] = None,
                       frame_index: int = 0) -> np.ndarray:
        """
        Add visual annotations to a single frame.
        
        Args:
            frame: Input frame
            pose_results: Pose detection results for this frame
            movement_metrics: Movement analysis metrics for this frame
            frame_index: Current frame index (currently unused in rendering)
            
        Returns:
            Annotated frame
        """
        # Create a copy to avoid modifying original
        vis_frame = frame.copy()
        
        # Draw for each detected person
        for person_idx, pose in enumerate(pose_results):
            # Update trails
            if self.show_trails:
                self._update_trails(pose, person_idx)
                self._draw_trails(vis_frame, person_idx)
            
            # Draw skeleton
            if self.show_skeleton:
                color = self._get_color_for_metrics(movement_metrics)
                self._draw_skeleton(vis_frame, pose, color)
            
            # Draw keypoints
            self._draw_keypoints(vis_frame, pose, movement_metrics)
            
            # Draw direction arrow
            if self.show_direction_arrows and movement_metrics:
                self._draw_direction_arrow(vis_frame, pose, movement_metrics)
        
        # Draw metrics overlay
        if self.show_metrics and movement_metrics:
            self._draw_metrics_overlay(vis_frame, movement_metrics)
        
        return vis_frame
    
    def generate_overlay_video(self,
                              frames: List[np.ndarray],
                              all_pose_results: List[List[PoseResult]],
                              all_movement_metrics: List[MovementMetrics],
                              output_path: str,
                              fps: float) -> str:
        """
        Generate complete video with overlays.
        
        Args:
            frames: List of video frames
            all_pose_results: Pose results for each frame
            all_movement_metrics: Movement metrics for each frame
            output_path: Path for output video
            fps: Frames per second
            
        Returns:
            Path to created video
        """
        if len(frames) != len(all_pose_results) or len(frames) != len(all_movement_metrics):
            raise ValueError("Mismatched lengths between frames, poses, and metrics")
        
        # Reset trails
        self.trails = {}
        
        # Process each frame
        annotated_frames = []
        for i, (frame, poses, metrics) in enumerate(
            zip(frames, all_pose_results, all_movement_metrics)
        ):
            annotated_frame = self.visualize_frame(frame, poses, metrics, i)
            annotated_frames.append(annotated_frame)
        
        # Import video_utils locally to avoid circular import
        from . import video_utils
        return video_utils.assemble_video(annotated_frames, output_path, fps)
    
    def _update_trails(self, pose: PoseResult, person_id: int):
        """Update motion trails for a person."""
        if person_id not in self.trails:
            self.trails[person_id] = {}
        
        for kp in pose.keypoints:
            if kp.confidence < 0.3:
                continue
                
            if kp.name not in self.trails[person_id]:
                self.trails[person_id][kp.name] = deque(maxlen=self.trail_length)
            
            # Store normalized coordinates; they are scaled to pixel
            # coordinates at draw time
            self.trails[person_id][kp.name].append((kp.x, kp.y))
    
    def _draw_trails(self, frame: np.ndarray, person_id: int):
        """Draw motion trails for a person."""
        if person_id not in self.trails:
            return
        
        h, w = frame.shape[:2]
        
        for joint_name, trail in self.trails[person_id].items():
            if len(trail) < 2:
                continue
            
            # Draw trail with fading effect
            for i in range(1, len(trail)):
                # Calculate opacity based on position in trail
                alpha = i / len(trail)
                color = tuple(int(c * alpha) for c in (255, 255, 255))
                
                # Convert normalized to pixel coordinates
                pt1 = (int(trail[i-1][0] * w), int(trail[i-1][1] * h))
                pt2 = (int(trail[i][0] * w), int(trail[i][1] * h))
                
                # Draw trail segment
                cv2.line(frame, pt1, pt2, color, thickness=max(1, int(3 * alpha)))
    
    def _draw_skeleton(self, frame: np.ndarray, pose: PoseResult, color: Tuple[int, int, int]):
        """Draw pose skeleton."""
        h, w = frame.shape[:2]
        
        # Create keypoint lookup
        kp_dict = {kp.name: kp for kp in pose.keypoints if kp.confidence > 0.3}
        
        # Determine which skeleton to use based on available keypoints
        skeleton = self._get_skeleton_for_model(pose.keypoints)
        
        # Ordered keypoint names, used to map skeleton index pairs to names
        keypoint_names = self._get_keypoint_names_for_model(pose.keypoints)
        
        # Draw skeleton connections
        for connection in skeleton:
            idx1, idx2 = connection
            if idx1 < len(keypoint_names) and idx2 < len(keypoint_names):
                name1 = keypoint_names[idx1]
                name2 = keypoint_names[idx2]
                
                if name1 in kp_dict and name2 in kp_dict:
                    kp1 = kp_dict[name1]
                    kp2 = kp_dict[name2]
                    
                    # Convert to pixel coordinates
                    pt1 = (int(kp1.x * w), int(kp1.y * h))
                    pt2 = (int(kp2.x * w), int(kp2.y * h))
                    
                    # Draw line
                    cv2.line(frame, pt1, pt2, color, thickness=2)
    
    def _draw_keypoints(self, frame: np.ndarray, pose: PoseResult, 
                       metrics: Optional[MovementMetrics] = None):
        """Draw individual keypoints."""
        h, w = frame.shape[:2]
        
        for kp in pose.keypoints:
            if kp.confidence < 0.3:
                continue
            
            # Convert to pixel coordinates
            pt = (int(kp.x * w), int(kp.y * h))
            
            # Color based on confidence
            color = self._confidence_to_color(kp.confidence)
            
            # Draw keypoint
            cv2.circle(frame, pt, 4, color, -1)
            cv2.circle(frame, pt, 5, (255, 255, 255), 1)  # White border
    
    def _draw_direction_arrow(self, frame: np.ndarray, pose: PoseResult,
                             metrics: MovementMetrics):
        """Draw arrow indicating movement direction."""
        if metrics.direction == Direction.STATIONARY:
            return
        
        h, w = frame.shape[:2]
        
        # Get body center from confidently detected keypoints
        visible = [kp for kp in pose.keypoints if kp.confidence > 0.3]
        if not visible:
            return
        center_x = np.mean([kp.x for kp in visible])
        center_y = np.mean([kp.y for kp in visible])
        
        # Convert to pixel coordinates
        center = (int(center_x * w), int(center_y * h))
        
        # Calculate arrow endpoint based on direction
        arrow_length = 50
        direction_vectors = {
            Direction.UP: (0, -1),
            Direction.DOWN: (0, 1),
            Direction.LEFT: (-1, 0),
            Direction.RIGHT: (1, 0),
        }
        
        if metrics.direction in direction_vectors:
            dx, dy = direction_vectors[metrics.direction]
            end_point = (
                center[0] + int(dx * arrow_length),
                center[1] + int(dy * arrow_length)
            )
            
            # Color based on speed
            color = self._get_color_for_metrics(metrics)
            
            # Draw arrow
            cv2.arrowedLine(frame, center, end_point, color, thickness=3, tipLength=0.3)
    
    def _draw_metrics_overlay(self, frame: np.ndarray, metrics: MovementMetrics):
        """Draw text overlay with movement metrics."""
        # Define text properties
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.6
        thickness = 2
        
        # Create text lines
        lines = [
            f"Direction: {metrics.direction.value}",
            f"Speed: {metrics.speed.value} ({metrics.velocity:.2f})",
            f"Intensity: {metrics.intensity.value}",
            f"Fluidity: {metrics.fluidity:.2f}",
            f"Expansion: {metrics.expansion:.2f}"
        ]
        
        # Draw background rectangle
        y_offset = 30
        max_width = max([cv2.getTextSize(line, font, font_scale, thickness)[0][0] 
                        for line in lines])
        bg_height = len(lines) * 25 + 10
        
        cv2.rectangle(frame, (10, 10), (20 + max_width, 10 + bg_height),
                     (0, 0, 0), -1)
        cv2.rectangle(frame, (10, 10), (20 + max_width, 10 + bg_height),
                     (255, 255, 255), 1)
        
        # Draw text
        for i, line in enumerate(lines):
            color = (255, 255, 255)
            if i == 2:  # Intensity line
                color = self.intensity_colors.get(metrics.intensity, (255, 255, 255))
            
            cv2.putText(frame, line, (15, y_offset + i * 25),
                       font, font_scale, color, thickness)
    
    def _get_color_for_metrics(self, metrics: Optional[MovementMetrics]) -> Tuple[int, int, int]:
        """Get color based on movement metrics."""
        if metrics is None:
            return (255, 255, 255)  # White default
        
        return self.intensity_colors.get(metrics.intensity, (255, 255, 255))
    
    def _confidence_to_color(self, confidence: float) -> Tuple[int, int, int]:
        """Convert confidence score to color (green=high, red=low)."""
        # Use HSV color space for smooth gradient
        hue = confidence * 120  # 0=red, 120=green
        rgb = colorsys.hsv_to_rgb(hue / 360, 1.0, 1.0)
        return tuple(int(c * 255) for c in reversed(rgb))  # BGR for OpenCV
    
    def _get_skeleton_for_model(self, keypoints: List[Keypoint]) -> List[Tuple[int, int]]:
        """Determine which skeleton definition to use based on keypoints."""
        # Simple heuristic: if we have more than 20 keypoints, use MediaPipe skeleton
        if len(keypoints) > 20:
            return self.MEDIAPIPE_SKELETON
        return self.COCO_SKELETON
    
    def _get_keypoint_names_for_model(self, keypoints: List[Keypoint]) -> List[str]:
        """Get ordered list of keypoint names for the model."""
        # If keypoints have names, use them
        if keypoints and keypoints[0].name:
            return [kp.name for kp in keypoints]
        
        # Otherwise, use default COCO names
        from .pose_estimation import MoveNetPoseEstimator
        return MoveNetPoseEstimator.KEYPOINT_NAMES


def create_visualization(
    video_path: str,
    pose_results: List[List[PoseResult]],
    movement_metrics: List[MovementMetrics],
    output_path: str,
    show_trails: bool = True,
    show_metrics: bool = True
) -> str:
    """
    Convenience function to create a visualization from a video file.
    
    Args:
        video_path: Path to input video
        pose_results: Pose detection results
        movement_metrics: Movement analysis results
        output_path: Path for output video
        show_trails: Whether to show motion trails
        show_metrics: Whether to show metrics overlay
        
    Returns:
        Path to created video
    """
    from . import video_utils
    
    # Extract frames
    frames = list(video_utils.extract_frames(video_path))
    
    # Get video info
    _, fps, _ = video_utils.get_video_info(video_path)
    
    # Create visualizer
    visualizer = PoseVisualizer(
        show_trails=show_trails,
        show_metrics=show_metrics
    )
    
    # Generate overlay video
    return visualizer.generate_overlay_video(
        frames, pose_results, movement_metrics, output_path, fps
    )
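

# --- Usage sketch (not part of the original module API) ----------------------
# A minimal smoke test, assuming this file lives in a package alongside the
# pose_estimation, notation_engine, and video_utils modules imported above.
# Because of the relative imports, run it as a module, e.g.
# `python -m <your_package>.visualization` (package name depends on your layout).
# It renders a blank frame with no detections, which exercises visualize_frame
# without requiring real pose data or a video file.
if __name__ == "__main__":
    demo_frame = np.zeros((480, 640, 3), dtype=np.uint8)
    demo_visualizer = PoseVisualizer(show_trails=True, show_metrics=False)
    # With an empty pose list and no metrics, the frame is returned as a plain
    # copy, confirming the overlay pipeline runs end to end.
    annotated = demo_visualizer.visualize_frame(
        demo_frame, pose_results=[], movement_metrics=None
    )
    print(f"Annotated frame shape: {annotated.shape}")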