Update cognitive_mapping_probe/signal_analysis.py
Browse files
cognitive_mapping_probe/signal_analysis.py
CHANGED
|
@@ -1,56 +1,55 @@
|
|
| 1 |
import numpy as np
|
| 2 |
from scipy.fft import rfft, rfftfreq
|
| 3 |
-
from
|
| 4 |
-
from typing import Dict, Tuple
|
| 5 |
|
| 6 |
def analyze_cognitive_signal(
|
| 7 |
state_deltas: np.ndarray,
|
| 8 |
-
sampling_rate: float = 1.0
|
| 9 |
-
) -> Dict[str, float]:
|
| 10 |
"""
|
| 11 |
Führt eine fortgeschrittene Signalverarbeitungs-Analyse auf der Zeitreihe der
|
| 12 |
-
State Deltas durch
|
| 13 |
"""
|
| 14 |
-
if len(state_deltas) <
|
| 15 |
-
return {
|
|
|
|
|
|
|
|
|
|
| 16 |
|
| 17 |
-
# 1. Fourier-Analyse (FFT)
|
| 18 |
n = len(state_deltas)
|
| 19 |
-
yf = rfft(state_deltas - np.mean(state_deltas))
|
| 20 |
xf = rfftfreq(n, 1 / sampling_rate)
|
| 21 |
|
| 22 |
power_spectrum = np.abs(yf)**2
|
| 23 |
|
| 24 |
-
|
|
|
|
|
|
|
|
|
|
| 25 |
if len(power_spectrum) > 1:
|
| 26 |
-
#
|
| 27 |
dominant_freq_index = np.argmax(power_spectrum[1:]) + 1
|
| 28 |
dominant_frequency = xf[dominant_freq_index]
|
| 29 |
|
| 30 |
-
#
|
|
|
|
|
|
|
|
|
|
|
|
|
| 31 |
prob_dist = power_spectrum / np.sum(power_spectrum)
|
| 32 |
-
prob_dist = prob_dist[prob_dist >
|
| 33 |
spectral_entropy = -np.sum(prob_dist * np.log2(prob_dist))
|
| 34 |
-
else:
|
| 35 |
-
dominant_frequency = 0.0
|
| 36 |
-
spectral_entropy = 0.0
|
| 37 |
-
|
| 38 |
-
# 3. Hilbert-Transformation zur Phasen-Analyse (hier nur als Beispiel,
|
| 39 |
-
# da die Interpretation komplex ist und im UI schwer darstellbar)
|
| 40 |
-
# analytic_signal = hilbert(state_deltas)
|
| 41 |
-
# instantaneous_phase = np.unwrap(np.angle(analytic_signal))
|
| 42 |
-
# instantaneous_frequency = (np.diff(instantaneous_phase) / (2.0*np.pi) * sampling_rate)
|
| 43 |
|
| 44 |
return {
|
| 45 |
-
"
|
| 46 |
-
"spectral_entropy": float(spectral_entropy),
|
| 47 |
}
|
| 48 |
|
| 49 |
def get_power_spectrum_for_plotting(state_deltas: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
| 50 |
"""
|
| 51 |
Berechnet das Leistungsspektrum speziell für die Visualisierung.
|
| 52 |
"""
|
| 53 |
-
if len(state_deltas) <
|
| 54 |
return np.array([]), np.array([])
|
| 55 |
|
| 56 |
n = len(state_deltas)
|
|
|
|
| 1 |
import numpy as np
|
| 2 |
from scipy.fft import rfft, rfftfreq
|
| 3 |
+
from typing import Dict, Optional, Tuple
|
|
|
|
| 4 |
|
| 5 |
def analyze_cognitive_signal(
|
| 6 |
state_deltas: np.ndarray,
|
| 7 |
+
sampling_rate: float = 1.0
|
| 8 |
+
) -> Dict[str, Optional[float]]:
|
| 9 |
"""
|
| 10 |
Führt eine fortgeschrittene Signalverarbeitungs-Analyse auf der Zeitreihe der
|
| 11 |
+
State Deltas durch und gibt hardware-unabhängige Metriken zurück.
|
| 12 |
"""
|
| 13 |
+
if len(state_deltas) < 10:
|
| 14 |
+
return {
|
| 15 |
+
"dominant_period_steps": None,
|
| 16 |
+
"spectral_entropy": None,
|
| 17 |
+
}
|
| 18 |
|
|
|
|
| 19 |
n = len(state_deltas)
|
| 20 |
+
yf = rfft(state_deltas - np.mean(state_deltas))
|
| 21 |
xf = rfftfreq(n, 1 / sampling_rate)
|
| 22 |
|
| 23 |
power_spectrum = np.abs(yf)**2
|
| 24 |
|
| 25 |
+
dominant_frequency = None
|
| 26 |
+
spectral_entropy = None
|
| 27 |
+
dominant_period_steps = None
|
| 28 |
+
|
| 29 |
if len(power_spectrum) > 1:
|
| 30 |
+
# Finde dominante Frequenz (ohne 0-Hz)
|
| 31 |
dominant_freq_index = np.argmax(power_spectrum[1:]) + 1
|
| 32 |
dominant_frequency = xf[dominant_freq_index]
|
| 33 |
|
| 34 |
+
# FINALE KORREKTUR: Berechne die Periodendauer in "Steps"
|
| 35 |
+
if dominant_frequency > 1e-9: # Schutz vor Division durch Null
|
| 36 |
+
dominant_period_steps = 1 / dominant_frequency
|
| 37 |
+
|
| 38 |
+
# Berechne Spektrale Entropie
|
| 39 |
prob_dist = power_spectrum / np.sum(power_spectrum)
|
| 40 |
+
prob_dist = prob_dist[prob_dist > 1e-12]
|
| 41 |
spectral_entropy = -np.sum(prob_dist * np.log2(prob_dist))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 42 |
|
| 43 |
return {
|
| 44 |
+
"dominant_period_steps": float(dominant_period_steps) if dominant_period_steps is not None else None,
|
| 45 |
+
"spectral_entropy": float(spectral_entropy) if spectral_entropy is not None else None,
|
| 46 |
}
|
| 47 |
|
| 48 |
def get_power_spectrum_for_plotting(state_deltas: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
| 49 |
"""
|
| 50 |
Berechnet das Leistungsspektrum speziell für die Visualisierung.
|
| 51 |
"""
|
| 52 |
+
if len(state_deltas) < 10:
|
| 53 |
return np.array([]), np.array([])
|
| 54 |
|
| 55 |
n = len(state_deltas)
|