mathpulse-api-v3test / automation_engine.py
github-actions[bot]
🚀 Auto-deploy backend from GitHub (8a5232f)
0b521f1
"""
MathPulse AI - Event-Driven Automation Engine
Processes educational workflows based on a diagnostic-first, risk-driven
intervention model. Trigger points:
1. Diagnostic Assessment Completion (highest priority)
2. Quiz / Assessment Submission (continuous)
3. New Student Enrollment
4. External Data Import (teacher action)
5. Admin Content Updates
Each event is routed to a dedicated handler that orchestrates
classification, quiz generation, notifications and dashboard updates.
"""
import math
import logging
from typing import List, Optional, Dict, Any
from pydantic import BaseModel, Field
logger = logging.getLogger("mathpulse.automation")
# ─── Constants ──────────────────────────────────────────────────
AT_RISK_THRESHOLD = 60 # < 60 % → At Risk
WEAK_TOPIC_THRESHOLD = 0.50 # < 50 % accuracy → weak topic
HIGH_RISK_RATIO = 0.75 # 75 %+ subjects at risk
MEDIUM_RISK_RATIO = 0.50 # 50-75 %
REMEDIAL_CONFIG = {
"High": {"questions": 15, "dist": {"easy": 60, "medium": 30, "hard": 10}},
"Medium": {"questions": 12, "dist": {"easy": 50, "medium": 35, "hard": 15}},
"Low": {"questions": 10, "dist": {"easy": 40, "medium": 40, "hard": 20}},
}
# ─── Request / Response Models ──────────────────────────────────
class DiagnosticResult(BaseModel):
"""Per-subject score from diagnostic assessment."""
subject: str
score: float = Field(..., ge=0, le=100)
class DiagnosticCompletionPayload(BaseModel):
"""Payload sent when a student completes the diagnostic."""
studentId: str
results: List[DiagnosticResult]
gradeLevel: str = "Grade 10"
questionBreakdown: Optional[Dict[str, list]] = None # topic → [{correct: bool, …}]
class QuizSubmissionPayload(BaseModel):
"""Payload sent on quiz / assessment submission."""
studentId: str
quizId: str
subject: str
score: float = Field(..., ge=0, le=100)
totalQuestions: int
correctAnswers: int
timeSpentSeconds: int
answers: Optional[List[Dict[str, Any]]] = None
class StudentEnrollmentPayload(BaseModel):
"""Payload sent when a new student account is created."""
studentId: str
name: str
email: str
gradeLevel: str = "Grade 10"
teacherId: Optional[str] = None
class DataImportPayload(BaseModel):
"""Payload sent after a teacher uploads a spreadsheet."""
teacherId: str
students: List[Dict[str, Any]] # parsed student rows
columnMapping: Dict[str, str]
class ContentUpdatePayload(BaseModel):
"""Payload sent when admin performs CRUD on curriculum."""
adminId: str
action: str # create | update | delete
contentType: str # lesson | quiz | module | subject
contentId: str
subjectId: Optional[str] = None
details: Optional[str] = None
# ─── Risk classification helpers ─────────────────────────────────
class SubjectRiskClassification(BaseModel):
status: str # "At Risk" | "On Track"
score: float
confidence: float
needsIntervention: bool
class AutomationResult(BaseModel):
"""Standardised result returned by every handler."""
success: bool
event: str
studentId: Optional[str] = None
message: str
riskClassifications: Optional[Dict[str, Dict[str, Any]]] = None
overallRisk: Optional[str] = None
atRiskSubjects: Optional[List[str]] = None
weakTopics: Optional[List[Dict[str, Any]]] = None
learningPath: Optional[str] = None
remedialQuizzesCreated: int = 0
interventions: Optional[str] = None
notifications: List[str] = Field(default_factory=list)
# ─── Automation Engine ──────────────────────────────────────────
class MathPulseAutomationEngine:
"""
Stateless event-driven automation system.
Each ``handle_*`` method is an independent, self-contained handler that
receives a validated Pydantic payload and returns an ``AutomationResult``.
Firebase / Hugging Face calls are only attempted when available.
"""
# ────────────────────────────────────────────────────────────
# 1. DIAGNOSTIC COMPLETION (highest-priority)
# ────────────────────────────────────────────────────────────
async def handle_diagnostic_completion(
self, payload: DiagnosticCompletionPayload
) -> AutomationResult:
"""
Runs when a student completes the mandatory diagnostic.
Steps:
1. Classify per-subject risk
2. Identify weak topics
3. Compute overall risk
4. Generate personalised learning path (AI)
5. Create remedial quiz assignments
6. Generate teacher intervention recommendations (AI)
7. Persist everything & notify
"""
student_id = payload.studentId
logger.info(f"📊 DIAGNOSTIC COMPLETED for {student_id}")
notifications: list[str] = []
# 1 — subject-level risk
risk_classifications = self._classify_subject_risks(payload.results)
# 2 — weak topics
weak_topics = self._identify_weak_topics(payload.questionBreakdown)
# 3 — overall risk
overall_risk = self._calculate_overall_risk(risk_classifications)
at_risk_subjects = [
subj for subj, data in risk_classifications.items()
if data["status"] == "At Risk"
]
# 4 — learning path (AI call)
learning_path: Optional[str] = None
if at_risk_subjects:
learning_path = await self._generate_learning_path(
at_risk_subjects, weak_topics, payload.gradeLevel
)
# 5 — remedial quizzes
remedial_count = 0
remedial_quizzes: list[dict] = []
if at_risk_subjects:
remedial_quizzes = self._build_remedial_quiz_configs(
student_id, at_risk_subjects, overall_risk, payload.gradeLevel
)
remedial_count = len(remedial_quizzes)
# 6 — teacher interventions (AI call)
interventions: Optional[str] = None
if at_risk_subjects:
interventions = await self._generate_teacher_interventions(
risk_classifications, weak_topics
)
# 7 — notification messages
if at_risk_subjects:
notifications.append(
f"Diagnostic complete — {len(at_risk_subjects)} subject(s) flagged At Risk: "
+ ", ".join(at_risk_subjects)
)
else:
notifications.append("Diagnostic complete — all subjects On Track!")
logger.info(
f"✅ DIAGNOSTIC PROCESSING COMPLETE for {student_id} | "
f"Overall={overall_risk} | AtRisk={at_risk_subjects}"
)
return AutomationResult(
success=True,
event="diagnostic_completed",
studentId=student_id,
message=f"Diagnostic processed for {student_id}",
riskClassifications=risk_classifications,
overallRisk=overall_risk,
atRiskSubjects=at_risk_subjects,
weakTopics=weak_topics,
learningPath=learning_path,
remedialQuizzesCreated=remedial_count,
interventions=interventions,
notifications=notifications,
)
# ────────────────────────────────────────────────────────────
# 2. QUIZ SUBMISSION (continuous)
# ────────────────────────────────────────────────────────────
async def handle_quiz_submission(
self, payload: QuizSubmissionPayload
) -> AutomationResult:
"""Recalculate risk for a subject after a quiz is submitted."""
student_id = payload.studentId
logger.info(f"📝 QUIZ SUBMITTED by {student_id}{payload.subject} ({payload.score}%)")
notifications: list[str] = []
# Determine new status for this subject
new_status = "At Risk" if payload.score < AT_RISK_THRESHOLD else "On Track"
confidence = (
(AT_RISK_THRESHOLD - payload.score) / AT_RISK_THRESHOLD
if new_status == "At Risk"
else (payload.score - AT_RISK_THRESHOLD) / (100 - AT_RISK_THRESHOLD)
)
risk_classifications = {
payload.subject: {
"status": new_status,
"score": payload.score,
"confidence": round(abs(confidence), 2),
"needsIntervention": new_status == "At Risk",
}
}
at_risk = [payload.subject] if new_status == "At Risk" else []
if new_status == "At Risk":
notifications.append(
f"Quiz result: {payload.subject} scored {payload.score}% — status changed to At Risk"
)
else:
notifications.append(
f"Quiz result: {payload.subject} scored {payload.score}% — On Track"
)
return AutomationResult(
success=True,
event="quiz_submitted",
studentId=student_id,
message=f"Quiz processed for {student_id}",
riskClassifications=risk_classifications,
overallRisk=None, # single-subject update — overall recalculated on frontend
atRiskSubjects=at_risk,
notifications=notifications,
)
# ────────────────────────────────────────────────────────────
# 3. STUDENT ENROLLMENT
# ────────────────────────────────────────────────────────────
async def handle_student_enrollment(
self, payload: StudentEnrollmentPayload
) -> AutomationResult:
"""
Prepare a new student:
- Create empty progress record skeleton
- Initialise gamification (XP 0, Level 1, no streaks)
- Flag as needing diagnostic
"""
student_id = payload.studentId
logger.info(f"🆕 NEW STUDENT ENROLLED: {student_id}")
notifications: list[str] = [
f"Welcome {payload.name}! Please complete the diagnostic assessment to personalise your learning path.",
]
if payload.teacherId:
notifications.append(
f"New student {payload.name} enrolled — diagnostic pending."
)
return AutomationResult(
success=True,
event="student_enrolled",
studentId=student_id,
message=f"Student {payload.name} enrolled and initialised",
notifications=notifications,
)
# ────────────────────────────────────────────────────────────
# 4. DATA IMPORT (teacher action)
# ────────────────────────────────────────────────────────────
async def handle_data_import(
self, payload: DataImportPayload
) -> AutomationResult:
"""
After a teacher uploads a spreadsheet, recalculate risk for every
imported student and flag any status changes.
"""
logger.info(f"📂 DATA IMPORT by teacher {payload.teacherId}{len(payload.students)} students")
notifications: list[str] = []
high_risk_students: list[str] = []
medium_risk_count = 0
low_risk_count = 0
weak_topic_counts: Dict[str, int] = {}
for student_row in payload.students:
name = str(student_row.get("name") or "Unknown").strip() or "Unknown"
avg_score = self._safe_float(student_row.get("avgQuizScore"), 0.0)
attendance = self._safe_float(student_row.get("attendance"), 0.0)
engagement = self._safe_float(student_row.get("engagementScore"), 0.0)
completion_raw = student_row.get("assignmentCompletion")
completion = (
self._safe_float(completion_raw, 0.0)
if completion_raw not in (None, "")
else None
)
risk_level = self._classify_import_risk(
avg_score=avg_score,
attendance=attendance,
engagement=engagement,
completion=completion,
)
if risk_level == "High":
high_risk_students.append(name)
elif risk_level == "Medium":
medium_risk_count += 1
else:
low_risk_count += 1
topic_label = self._extract_import_topic(student_row)
if topic_label:
weak_topic_counts[topic_label] = weak_topic_counts.get(topic_label, 0) + 1
if high_risk_students:
notifications.append(
f"Data import flagged {len(high_risk_students)} high-risk student(s): "
+ ", ".join(high_risk_students[:5])
+ ("..." if len(high_risk_students) > 5 else "")
)
notifications.append(
"Risk interpretation summary — "
f"High: {len(high_risk_students)}, Medium: {medium_risk_count}, Low: {low_risk_count}."
)
if weak_topic_counts:
top_topics = sorted(
weak_topic_counts.items(),
key=lambda item: (-item[1], item[0]),
)[:3]
notifications.append(
"Most frequent weak-topic signals: "
+ ", ".join(f"{topic} ({count})" for topic, count in top_topics)
)
notifications.append(
f"Data import complete — {len(payload.students)} student records processed."
)
return AutomationResult(
success=True,
event="data_imported",
studentId=None,
message=f"Data import processed for {len(payload.students)} students",
atRiskSubjects=None,
notifications=notifications,
)
# ────────────────────────────────────────────────────────────
# 5. CONTENT UPDATE (admin action)
# ────────────────────────────────────────────────────────────
async def handle_content_update(
self, payload: ContentUpdatePayload
) -> AutomationResult:
"""
After admin CRUD on curriculum, log & notify.
"""
logger.info(
f"📚 CONTENT UPDATE by admin {payload.adminId}: "
f"{payload.action} {payload.contentType} {payload.contentId}"
)
notifications: list[str] = [
f"Curriculum update: {payload.action}d {payload.contentType} "
f"({payload.contentId}). Teachers may want to review affected quizzes.",
]
return AutomationResult(
success=True,
event="content_updated",
studentId=None,
message=f"Content {payload.action} processed for {payload.contentType}",
notifications=notifications,
)
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# INTERNAL HELPERS
# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
# --- risk classification ---
@staticmethod
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
parsed = float(value)
if math.isnan(parsed) or math.isinf(parsed):
return default
return parsed
except (TypeError, ValueError):
return default
@staticmethod
def _classify_import_risk(
*,
avg_score: float,
attendance: float,
engagement: float,
completion: Optional[float],
) -> str:
high_flags = int(avg_score < 60) + int(attendance < 75) + int(engagement < 55)
medium_flags = int(avg_score < 75) + int(attendance < 85) + int(engagement < 70)
if completion is not None:
high_flags += int(completion < 60)
medium_flags += int(completion < 75)
if high_flags >= 2 or (avg_score < 55 and (attendance < 80 or engagement < 65)):
return "High"
if medium_flags >= 2:
return "Medium"
return "Low"
@staticmethod
def _extract_import_topic(student_row: Dict[str, Any]) -> Optional[str]:
explicit_topic = str(student_row.get("weakestTopic") or "").strip()
if explicit_topic:
return explicit_topic
assessment_name = str(student_row.get("assessmentName") or "").strip()
if assessment_name and assessment_name.lower() != "general-assessment":
return assessment_name
return None
@staticmethod
def _classify_subject_risks(
results: List[DiagnosticResult],
) -> Dict[str, Dict[str, Any]]:
"""Classify each subject as 'At Risk' or 'On Track'."""
classifications: Dict[str, Dict[str, Any]] = {}
for r in results:
if r.score < AT_RISK_THRESHOLD:
status = "At Risk"
confidence = round((AT_RISK_THRESHOLD - r.score) / AT_RISK_THRESHOLD, 2)
else:
status = "On Track"
confidence = round(
(r.score - AT_RISK_THRESHOLD) / (100 - AT_RISK_THRESHOLD), 2
)
classifications[r.subject] = {
"status": status,
"score": r.score,
"confidence": confidence,
"needsIntervention": status == "At Risk",
}
return classifications
@staticmethod
def _identify_weak_topics(
question_breakdown: Optional[Dict[str, list]],
) -> List[Dict[str, Any]]:
"""
Drill into per-topic accuracy from diagnostic question-level data.
Returns topics sorted weakest-first.
"""
if not question_breakdown:
return []
weak: list[dict] = []
for topic, questions in question_breakdown.items():
if not questions:
continue
correct_count = sum(1 for q in questions if q.get("correct"))
accuracy = correct_count / len(questions)
if accuracy < WEAK_TOPIC_THRESHOLD:
weak.append({
"topic": topic,
"accuracy": round(accuracy, 2),
"questionsAttempted": len(questions),
"priority": "high" if accuracy < 0.3 else "medium",
})
weak.sort(key=lambda x: x["accuracy"])
return weak
@staticmethod
def _calculate_overall_risk(
classifications: Dict[str, Dict[str, Any]],
) -> str:
total = len(classifications)
if total == 0:
return "Low"
at_risk_count = sum(
1 for d in classifications.values() if d["status"] == "At Risk"
)
ratio = at_risk_count / total
if ratio >= HIGH_RISK_RATIO:
return "High"
elif ratio >= MEDIUM_RISK_RATIO:
return "Medium"
return "Low"
# --- remedial quiz configs ---
@staticmethod
def _build_remedial_quiz_configs(
student_id: str,
at_risk_subjects: List[str],
overall_risk: str,
grade_level: str,
) -> List[Dict[str, Any]]:
"""Return list of quiz configuration dicts ready for persistence."""
cfg = REMEDIAL_CONFIG.get(overall_risk, REMEDIAL_CONFIG["Low"])
quizzes: list[dict] = []
for subject in at_risk_subjects:
quizzes.append({
"studentId": student_id,
"subject": subject,
"quizConfig": {
"topics": [subject],
"gradeLevel": grade_level,
"numQuestions": cfg["questions"],
"questionTypes": [
"identification",
"enumeration",
"multiple_choice",
"word_problem",
],
"difficultyDistribution": cfg["dist"],
"bloomLevels": ["remember", "understand", "apply"],
"includeGraphs": False,
"excludeTopics": [],
"purpose": "remedial",
"targetStudent": student_id,
},
"status": "pending",
"autoGenerated": True,
"reason": f'Diagnostic identified "{subject}" as At Risk',
"priority": "high" if overall_risk == "High" else "medium",
"dueInDays": 7,
})
return quizzes
# --- AI helpers (Hugging Face) ---
async def _generate_learning_path(
self,
at_risk_subjects: List[str],
weak_topics: List[Dict[str, Any]],
grade_level: str,
) -> Optional[str]:
"""Generate a personalised learning path via HF Serverless Inference."""
try:
from main import call_hf_chat
weakness_lines = ", ".join(at_risk_subjects)
topic_lines = "\n".join(
f" - {t['topic']} ({t['accuracy']*100:.0f}% accuracy)"
for t in weak_topics[:5]
)
prompt = (
f"Generate a personalised math learning path for a {grade_level} student.\n\n"
f"Weak subjects: {weakness_lines}\n"
f"Weak topics:\n{topic_lines}\n\n"
"Create 5-7 specific activities. For each give:\n"
"1. Activity title\n"
"2. Brief description (1-2 sentences)\n"
"3. Estimated duration\n"
"4. Type (video, practice, quiz, reading, interactive)\n\n"
"Format as a numbered list. Be specific."
)
return call_hf_chat(
messages=[
{
"role": "system",
"content": (
"You are an educational curriculum expert specialising in "
"mathematics. Create clear, actionable learning paths."
),
},
{"role": "user", "content": prompt},
],
max_tokens=1500,
temperature=0.7,
)
except Exception as e:
logger.warning(f"Learning-path AI call failed: {e}")
return None
async def _generate_teacher_interventions(
self,
risk_classifications: Dict[str, Dict[str, Any]],
weak_topics: List[Dict[str, Any]],
) -> Optional[str]:
"""Generate teacher intervention recommendations via HF Serverless Inference."""
try:
from main import call_hf_chat
at_risk = [
subj for subj, data in risk_classifications.items()
if data["status"] == "At Risk"
]
topic_lines = "\n".join(
f"- {t['topic']} ({t['accuracy']*100:.0f}% accuracy)"
for t in weak_topics[:5]
)
prompt = (
"You are an educational intervention specialist. A student has completed "
"their diagnostic assessment with the following results:\n\n"
f"At-Risk Subjects: {', '.join(at_risk)}\n\n"
f"Weak Topics Identified:\n{topic_lines}\n\n"
"Generate a 'Remedial Path Timeline' with:\n"
"1. Prioritised list of topics to address (most critical first)\n"
"2. Suggested teaching strategies for each topic\n"
"3. Recommended one-on-one intervention activities\n"
"4. Timeline for reassessment\n"
"5. Warning signs that student needs additional support\n\n"
"Keep response under 300 words, structured with clear sections."
)
return call_hf_chat(
messages=[
{
"role": "system",
"content": (
"You are an expert educational intervention specialist. "
"Provide actionable, structured recommendations for teachers."
),
},
{"role": "user", "content": prompt},
],
max_tokens=1000,
temperature=0.5,
)
except Exception as e:
logger.warning(f"Teacher-intervention AI call failed: {e}")
return None
# Module-level singleton
automation_engine = MathPulseAutomationEngine()