| """ |
| osint_core.orchestrator |
| ======================= |
| |
| Orchestrator agent for coordinating passive OSINT enrichment workflows. |
| |
| Design principles: |
| - Coordinates validation → policy → enrichment → drift → audit pipeline |
| - Manages skills (capabilities) and tools (external actions) |
| - Maintains execution context and telemetry |
| - Enforces security boundaries at each stage |
| - Pure orchestration — does not implement enrichment logic directly |
| |
| The orchestrator pattern: |
| 1. Accept user request (indicator + modules + authorization) |
| 2. Validate input (osint_core.validators) |
| 3. Evaluate policy (osint_core.policy) |
| 4. Execute allowed modules via skills |
| 5. Detect drift (osint_core.drift when implemented) |
| 6. Choose correction verb |
| 7. Generate audit trail |
| 8. Return structured result |
| """ |
|
|
| from __future__ import annotations |
|
|
| import subprocess |
| import time |
| import uuid |
| from dataclasses import dataclass, field |
| from datetime import datetime, timezone |
| from enum import Enum |
| from typing import Any, Callable, Literal |
|
|
| from .policy import ( |
| PolicyEvaluation, |
| evaluate_modules, |
| enforce_correction_verb, |
| ) |
| from .validators import ( |
| IndicatorType, |
| ValidationResult, |
| validate_indicator, |
| ) |
|
|
|
|
| |
| |
| |
|
|
# Role label an agent instance can carry (see OrchestratorAgent.__init__).
AgentRole = Literal[
    "orchestrator",
    "validator",
    "enricher",
    "analyst",
]

# Category assigned to each Skill in the registry.
SkillCategory = Literal[
    "validation",
    "passive_lookup",
    "conditional_fetch",
    "analysis",
]

# Kind of action a Tool performs.
ToolType = Literal[
    "subprocess",
    "network",
    "file",
    "computation",
]
|
|
|
|
class ExecutionStatus(str, Enum):
    """Status values attached to a skill execution result.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value (for example ``ExecutionStatus.FAILED == "failed"``).
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"
|
|
|
|
@dataclass(frozen=True)
class Tool:
    """Immutable description of one atomic capability that acts externally.

    Typical examples are a DNS query, a WHOIS lookup, an HTTP request or
    file parsing.

    Attributes:
        name: Short identifier of the tool.
        tool_type: One of the ``ToolType`` literals.
        description: Human-readable summary of what the tool does.
        requires_authorization: Flag marking tools that need an authorized
            target; defaults to ``False``.
        timeout_seconds: Time budget in seconds, defaulting to ``5.0``.
            Nothing in the visible part of this module reads it yet.
    """

    name: str
    tool_type: ToolType
    description: str
    requires_authorization: bool = False
    timeout_seconds: float = 5.0
|
|
|
|
@dataclass(frozen=True)
class Skill:
    """Immutable description of a higher-level capability built from tools.

    Typical examples are "Resolve DNS", "Fetch WHOIS" or "Parse URL".

    Attributes:
        name: Display name of the skill.
        category: One of the ``SkillCategory`` literals.
        description: Human-readable summary of the skill.
        canonical_name: Machine name; the skills registry is keyed by it.
        required_indicator_types: Indicator types the skill accepts. The
            orchestrator blocks the skill for any other type.
        tools: ``Tool`` objects the skill is composed of (may be empty).
        requires_authorization: Flag marking skills that need an authorized
            target; defaults to ``False``.
    """

    name: str
    category: SkillCategory
    description: str
    canonical_name: str
    required_indicator_types: list[IndicatorType]
    tools: list[Tool]
    requires_authorization: bool = False
|
|
|
|
@dataclass
class ExecutionContext:
    """Mutable state carried through a single enrichment workflow run.

    Attributes:
        run_id: Identifier of this run.
        started_at: Start time of the run as a string.
        indicator_type: Type of the indicator being enriched.
        normalized_indicator: Indicator after normalization.
        indicator_hash: Hash of the normalized indicator.
        requested_modules: Module names the caller asked for.
        authorized_target: Whether the caller declared the target authorized.
        passive_only: Whether the run is restricted to passive modules.
        policy_evaluation: Policy decision for the run, ``None`` until set.
        telemetry: Free-form telemetry; each instance gets its own dict.
        errors: Error messages collected for the run; each instance gets
            its own list.
    """

    run_id: str
    started_at: str
    indicator_type: IndicatorType
    normalized_indicator: str
    indicator_hash: str
    requested_modules: list[str]
    authorized_target: bool
    passive_only: bool
    policy_evaluation: PolicyEvaluation | None = None
    telemetry: dict[str, Any] = field(default_factory=dict)
    errors: list[str] = field(default_factory=list)
|
|
|
|
@dataclass
class SkillResult:
    """Outcome of running (or refusing to run) one skill.

    Attributes:
        skill_name: Name the result is reported under.
        status: ``ExecutionStatus`` describing how the execution ended.
        data: Payload produced by the skill; empty dict by default, with a
            separate dict per instance.
        error: Error message, or ``None`` when there is none.
        duration_ms: Elapsed time in milliseconds; ``0`` by default.
    """

    skill_name: str
    status: ExecutionStatus
    data: dict[str, Any] = field(default_factory=dict)
    error: str | None = None
    duration_ms: int = 0
|
|
|
|
@dataclass
class EnrichmentWorkflow:
    """Everything produced by one complete enrichment workflow run.

    Attributes:
        context: Execution context of the run.
        validation_result: Result of validating the raw indicator.
        policy_evaluation: Policy decision for the requested modules.
        skill_results: One ``SkillResult`` per module that was dispatched.
        drift_vector: Drift score per dimension name.
        correction_verb: Correction chosen for the run.
        duration_ms: Total wall-clock duration in milliseconds.
    """

    context: ExecutionContext
    validation_result: ValidationResult
    policy_evaluation: PolicyEvaluation
    skill_results: list[SkillResult]
    drift_vector: dict[str, float]
    correction_verb: str
    duration_ms: int
|
|
|
|
| |
| |
| |
|
|
| |
# --- Tools that do not require authorization --------------------------------

# DNS lookups through the system resolver.
DNS_QUERY_TOOL = Tool(
    name="dns_query",
    description="Query DNS records using system resolver",
    tool_type="network",
    timeout_seconds=4.0,
    requires_authorization=False,
)

# WHOIS lookup via a system command.
WHOIS_TOOL = Tool(
    name="whois",
    description="Perform WHOIS lookup via system command",
    tool_type="subprocess",
    timeout_seconds=5.0,
    requires_authorization=False,
)

# Purely local URL parsing.
URL_PARSE_TOOL = Tool(
    name="url_parse",
    description="Parse URL components locally",
    tool_type="computation",
    timeout_seconds=1.0,
    requires_authorization=False,
)

# --- Tools that fetch from the target and therefore require authorization ---

# HTTP header fetch from the target.
HTTP_HEADERS_TOOL = Tool(
    name="http_headers",
    description="Fetch HTTP headers from target",
    tool_type="network",
    timeout_seconds=5.0,
    requires_authorization=True,
)

# robots.txt fetch from the target.
ROBOTS_TXT_TOOL = Tool(
    name="robots_txt",
    description="Fetch robots.txt from target",
    tool_type="network",
    timeout_seconds=5.0,
    requires_authorization=True,
)
|
|
|
|
| |
| |
| |
|
|
# Registry of every available skill, keyed by the skill's canonical name.
# Insertion order follows the order of the definitions below.
SKILLS_REGISTRY: dict[str, Skill] = {
    skill.canonical_name: skill
    for skill in (
        # Link generation only; accepts every listed indicator type.
        Skill(
            canonical_name="resource_links",
            name="Resource Links",
            description="Generate links to external OSINT resources",
            category="passive_lookup",
            required_indicator_types=["domain", "username", "email", "ip", "url"],
            tools=[],
            requires_authorization=False,
        ),
        Skill(
            canonical_name="dns_records",
            name="DNS Records",
            description="Resolve DNS A, AAAA, MX, NS records",
            category="passive_lookup",
            required_indicator_types=["domain"],
            tools=[DNS_QUERY_TOOL],
            requires_authorization=False,
        ),
        Skill(
            canonical_name="local_url_parse",
            name="Local URL Parse",
            description="Parse URL components without contacting target",
            category="analysis",
            required_indicator_types=["url"],
            tools=[URL_PARSE_TOOL],
            requires_authorization=False,
        ),
        # The two conditional-fetch skills below require authorization.
        Skill(
            canonical_name="http_headers",
            name="HTTP Headers",
            description="Fetch HTTP headers from authorized target",
            category="conditional_fetch",
            required_indicator_types=["url", "domain"],
            tools=[HTTP_HEADERS_TOOL],
            requires_authorization=True,
        ),
        Skill(
            canonical_name="robots_txt",
            name="Robots.txt",
            description="Fetch robots.txt from authorized target",
            category="conditional_fetch",
            required_indicator_types=["url", "domain"],
            tools=[ROBOTS_TXT_TOOL],
            requires_authorization=True,
        ),
    )
}
|
|
|
|
| |
| |
| |
|
|
class OrchestratorAgent:
    """Coordinate the validation → policy → enrichment → drift workflow.

    The agent implements no enrichment logic of its own. It validates the
    indicator, asks the policy layer which modules may run, dispatches the
    allowed modules to skills, scores drift from the outcome and picks a
    correction verb.

    Attributes:
        role: Role label of this agent instance.
        skills: Mapping of canonical skill name to ``Skill``. This is the
            module-level ``SKILLS_REGISTRY`` object itself, not a copy.
    """

    # (drift dimension, threshold, verb) in priority order. The first
    # dimension whose score reaches its threshold decides the verb.
    _CORRECTION_RULES = (
        ("policy", 0.4, "CONSTRAIN"),
        ("structural", 0.5, "REVERT"),
        ("behavioral", 0.5, "REVERT"),
        ("adversarial", 0.3, "CONSTRAIN"),
        ("operational", 0.4, "CONSTRAIN"),
    )

    def __init__(self, role: AgentRole = "orchestrator"):
        """Store the role label and attach the shared skills registry."""
        self.role = role
        self.skills = SKILLS_REGISTRY

    def create_context(
        self,
        raw_indicator: str,
        indicator_type_hint: str,
        requested_modules: list[str],
        authorized_target: bool,
        passive_only: bool = True,
    ) -> ExecutionContext:
        """Validate ``raw_indicator`` and build the execution context for it.

        Args:
            raw_indicator: Indicator exactly as supplied by the caller.
            indicator_type_hint: Passed to the validator as ``forced_type``.
            requested_modules: Module names the caller asked for.
            authorized_target: Whether the caller declared the target
                authorized.
            passive_only: Whether the run is restricted to passive modules.

        Returns:
            A context carrying the normalized indicator and its hash, or,
            when validation fails, a context with indicator type
            ``"unknown"``, empty indicator fields and the validation error
            recorded in ``errors``.
        """
        validation_result = validate_indicator(
            raw_indicator,
            forced_type=indicator_type_hint,
        )
        return self._context_from_validation(
            validation_result,
            requested_modules,
            authorized_target,
            passive_only,
        )

    def _context_from_validation(
        self,
        validation_result: ValidationResult,
        requested_modules: list[str],
        authorized_target: bool,
        passive_only: bool,
    ) -> ExecutionContext:
        """Build an execution context from an already-computed validation."""
        import hashlib
        import hmac
        import os

        # One timestamp feeds both fields so run_id and started_at agree.
        now = datetime.now(timezone.utc)
        stamp = now.strftime("%Y%m%d_%H%M%S")
        run_id = f"run_{stamp}_{uuid.uuid4().hex[:8]}"
        started_at = now.isoformat()

        if not validation_result.ok:
            return ExecutionContext(
                run_id=run_id,
                started_at=started_at,
                indicator_type="unknown",
                normalized_indicator="",
                indicator_hash="",
                requested_modules=requested_modules,
                authorized_target=authorized_target,
                passive_only=passive_only,
                errors=[validation_result.error or "Validation failed"],
            )

        # Keyed hash (HMAC-SHA256) of the normalized indicator.
        # NOTE(review): when OSINT_HASH_SALT is unset the key is a fixed
        # string from the source code — confirm deployments always set it.
        salt = os.getenv("OSINT_HASH_SALT", "dev-only-change-me")
        indicator_hash = hmac.new(
            salt.encode("utf-8"),
            validation_result.normalized.encode("utf-8"),
            hashlib.sha256,
        ).hexdigest()

        return ExecutionContext(
            run_id=run_id,
            started_at=started_at,
            indicator_type=validation_result.indicator_type,
            normalized_indicator=validation_result.normalized,
            indicator_hash=indicator_hash,
            requested_modules=requested_modules,
            authorized_target=authorized_target,
            passive_only=passive_only,
        )

    def execute_workflow(
        self,
        raw_indicator: str,
        indicator_type_hint: str = "Auto",
        requested_modules: list[str] | None = None,
        authorized_target: bool = False,
        passive_only: bool = True,
    ) -> EnrichmentWorkflow:
        """Run the complete enrichment workflow for one indicator.

        The indicator is validated exactly once; the same validation result
        is returned to the caller and used to build the execution context.

        Args:
            raw_indicator: Indicator exactly as supplied by the caller.
            indicator_type_hint: Passed to the validator as ``forced_type``.
            requested_modules: Module names to run. ``None`` or an empty
                list falls back to ``["resource_links"]``.
            authorized_target: Whether the caller declared the target
                authorized.
            passive_only: Whether the run is restricted to passive modules.

        Returns:
            An ``EnrichmentWorkflow``. When validation fails, every
            requested module is reported as blocked, no skill runs, the
            drift vector is empty and the correction verb is ``"REVERT"``.
        """
        started = time.perf_counter()
        requested_modules = requested_modules or ["resource_links"]

        validation_result = validate_indicator(
            raw_indicator,
            forced_type=indicator_type_hint,
        )
        context = self._context_from_validation(
            validation_result,
            requested_modules,
            authorized_target,
            passive_only,
        )

        if not validation_result.ok:
            return EnrichmentWorkflow(
                context=context,
                validation_result=validation_result,
                policy_evaluation=PolicyEvaluation(
                    decision="BLOCK",
                    blocked_modules=requested_modules,
                    violations=[],
                ),
                skill_results=[],
                drift_vector={},
                correction_verb="REVERT",
                duration_ms=int((time.perf_counter() - started) * 1000),
            )

        policy_eval = evaluate_modules(
            requested_modules,
            authorized_target=authorized_target,
            passive_only=passive_only,
            allow_unknown_modules=False,
        )
        context.policy_evaluation = policy_eval

        skill_results = self._execute_skills(
            context,
            policy_eval.allowed_modules,
        )
        drift_vector = self._detect_drift(context, skill_results, policy_eval)
        correction_verb = self._choose_correction(drift_vector, policy_eval)

        # A successful run never reports 0 ms; sub-millisecond runs round
        # up to 1.
        duration_ms = max(1, int((time.perf_counter() - started) * 1000))

        return EnrichmentWorkflow(
            context=context,
            validation_result=validation_result,
            policy_evaluation=policy_eval,
            skill_results=skill_results,
            drift_vector=drift_vector,
            correction_verb=correction_verb,
            duration_ms=duration_ms,
        )

    def _execute_skills(
        self,
        context: ExecutionContext,
        allowed_modules: list[str],
    ) -> list[SkillResult]:
        """Run every policy-allowed module and collect one result for each.

        A module with no registered skill yields a FAILED result reported
        under the module name. A skill that does not accept the context's
        indicator type yields a BLOCKED result and is not executed.
        """
        results: list[SkillResult] = []

        for module_name in allowed_modules:
            skill = self.skills.get(module_name)
            if not skill:
                results.append(SkillResult(
                    skill_name=module_name,
                    status=ExecutionStatus.FAILED,
                    error=f"Skill not found: {module_name}",
                ))
                continue

            accepted_types = skill.required_indicator_types
            if accepted_types and context.indicator_type not in accepted_types:
                results.append(SkillResult(
                    skill_name=skill.name,
                    status=ExecutionStatus.BLOCKED,
                    error=f"Skill {skill.name} requires indicator type in {skill.required_indicator_types}, got {context.indicator_type}",
                ))
                continue

            results.append(self._execute_skill(skill, context))

        return results

    def _execute_skill(
        self,
        skill: Skill,
        context: ExecutionContext,
    ) -> SkillResult:
        """Execute a single skill and time it.

        This is currently a stub: it returns fixed placeholder data chosen
        by the skill's canonical name and does not invoke the skill's
        tools. Any exception is converted into a FAILED result.
        """
        started = time.perf_counter()

        try:
            if skill.canonical_name == "resource_links":
                data = {"type": "links", "generated": True}
            elif skill.canonical_name == "dns_records":
                data = {"A": [], "AAAA": [], "MX": [], "NS": []}
            elif skill.canonical_name == "local_url_parse":
                data = {"scheme": "", "hostname": "", "path": ""}
            else:
                data = {"status": "not_implemented"}

            return SkillResult(
                skill_name=skill.name,
                status=ExecutionStatus.COMPLETED,
                data=data,
                duration_ms=int((time.perf_counter() - started) * 1000),
            )
        except Exception as exc:
            # Boundary handler: one failing skill must not abort the run.
            return SkillResult(
                skill_name=skill.name,
                status=ExecutionStatus.FAILED,
                error=str(exc),
                duration_ms=int((time.perf_counter() - started) * 1000),
            )

    def _detect_drift(
        self,
        context: ExecutionContext,
        skill_results: list[SkillResult],
        policy_eval: PolicyEvaluation,
    ) -> dict[str, float]:
        """Score drift per dimension from the run's outcome.

        Simplified scoring: only ``policy`` and ``operational`` are ever
        non-zero. ``policy`` becomes 0.4 when any module was blocked, and
        ``operational`` is 0.2 per FAILED skill result, capped at 1.0.
        ``context`` is accepted but not used.

        Returns:
            A dict with the keys ``statistical``, ``behavioral``,
            ``structural``, ``adversarial``, ``operational`` and ``policy``.
        """
        drift = {
            "statistical": 0.0,
            "behavioral": 0.0,
            "structural": 0.0,
            "adversarial": 0.0,
            "operational": 0.0,
            "policy": 0.0,
        }

        if policy_eval.blocked_modules:
            drift["policy"] = 0.4

        failed_count = sum(
            1 for result in skill_results
            if result.status == ExecutionStatus.FAILED
        )
        if failed_count > 0:
            drift["operational"] = min(0.2 * failed_count, 1.0)

        return drift

    def _choose_correction(
        self,
        drift_vector: dict[str, float],
        policy_eval: PolicyEvaluation,
    ) -> str:
        """Pick the correction verb for a drift vector.

        Dimensions are checked in priority order: policy > structural >
        behavioral > adversarial > operational > statistical. Missing
        dimensions count as 0.0. ``policy_eval`` is accepted but not used.

        Returns:
            ``"CONSTRAIN"``, ``"REVERT"`` or ``"ADAPT"`` for the first rule
            that matches, otherwise ``"OBSERVE"``.
        """
        # NOTE(review): enforce_correction_verb is imported at the top of
        # the file but never applied — confirm whether the verb chosen here
        # should be passed through it.
        for dimension, threshold, verb in self._CORRECTION_RULES:
            if drift_vector.get(dimension, 0.0) >= threshold:
                return verb

        # Statistical drift only leads to ADAPT when there is no
        # adversarial signal at all.
        statistical = drift_vector.get("statistical", 0.0)
        adversarial = drift_vector.get("adversarial", 0.0)
        if statistical >= 0.5 and adversarial == 0:
            return "ADAPT"

        return "OBSERVE"
|
|
|
|
| |
| |
| |
|
|
def create_orchestrator() -> OrchestratorAgent:
    """Build and return an ``OrchestratorAgent`` with the "orchestrator" role."""
    agent = OrchestratorAgent(role="orchestrator")
    return agent
|
|
|
|
def list_skills() -> dict[str, Skill]:
    """Return a shallow copy of the skills registry.

    Adding or removing keys on the returned dict leaves ``SKILLS_REGISTRY``
    untouched; the ``Skill`` values themselves are shared.
    """
    return dict(SKILLS_REGISTRY)
|
|
|
|
def get_skill(skill_name: str) -> Skill | None:
    """Look up a skill by canonical name, returning ``None`` when absent."""
    try:
        return SKILLS_REGISTRY[skill_name]
    except KeyError:
        return None
|
|