S-Dreamer's picture
Upload 8 files
a71264b verified
"""
osint_core.orchestrator
=======================
Orchestrator agent for coordinating passive OSINT enrichment workflows.
Design principles:
- Coordinates validation → policy → enrichment → drift → audit pipeline
- Manages skills (capabilities) and tools (external actions)
- Maintains execution context and telemetry
- Enforces security boundaries at each stage
- Pure orchestration — does not implement enrichment logic directly
The orchestrator pattern:
1. Accept user request (indicator + modules + authorization)
2. Validate input (osint_core.validators)
3. Evaluate policy (osint_core.policy)
4. Execute allowed modules via skills
5. Detect drift (osint_core.drift when implemented)
6. Choose correction verb
7. Generate audit trail
8. Return structured result
"""
from __future__ import annotations
import subprocess
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Literal
from .policy import (
PolicyEvaluation,
evaluate_modules,
enforce_correction_verb,
)
from .validators import (
IndicatorType,
ValidationResult,
validate_indicator,
)
# =============================================================================
# Agent data structures
# =============================================================================
AgentRole = Literal["orchestrator", "validator", "enricher", "analyst"]
SkillCategory = Literal["validation", "passive_lookup", "conditional_fetch", "analysis"]
ToolType = Literal["subprocess", "network", "file", "computation"]
class ExecutionStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
BLOCKED = "blocked"
@dataclass(frozen=True)
class Tool:
"""
A tool is an atomic capability that performs external actions.
Examples: DNS query, whois lookup, HTTP request, file parsing
"""
name: str
tool_type: ToolType
description: str
requires_authorization: bool = False
timeout_seconds: float = 5.0
@dataclass(frozen=True)
class Skill:
"""
A skill is a higher-level capability composed of tools.
Examples: "Resolve DNS", "Fetch WHOIS", "Parse URL"
"""
name: str
category: SkillCategory
description: str
canonical_name: str
required_indicator_types: list[IndicatorType]
tools: list[Tool]
requires_authorization: bool = False
@dataclass
class ExecutionContext:
"""
Execution context tracks the state of an enrichment workflow.
"""
run_id: str
started_at: str
indicator_type: IndicatorType
normalized_indicator: str
indicator_hash: str
requested_modules: list[str]
authorized_target: bool
passive_only: bool
policy_evaluation: PolicyEvaluation | None = None
telemetry: dict[str, Any] = field(default_factory=dict)
errors: list[str] = field(default_factory=list)
@dataclass
class SkillResult:
"""
Result from executing a skill.
"""
skill_name: str
status: ExecutionStatus
data: dict[str, Any] = field(default_factory=dict)
error: str | None = None
duration_ms: int = 0
@dataclass
class EnrichmentWorkflow:
"""
Complete enrichment workflow result.
"""
context: ExecutionContext
validation_result: ValidationResult
policy_evaluation: PolicyEvaluation
skill_results: list[SkillResult]
drift_vector: dict[str, float]
correction_verb: str
duration_ms: int
# =============================================================================
# Tool implementations
# =============================================================================
# DNS resolution tool
DNS_QUERY_TOOL = Tool(
name="dns_query",
tool_type="network",
description="Query DNS records using system resolver",
requires_authorization=False,
timeout_seconds=4.0,
)
# WHOIS lookup tool
WHOIS_TOOL = Tool(
name="whois",
tool_type="subprocess",
description="Perform WHOIS lookup via system command",
requires_authorization=False,
timeout_seconds=5.0,
)
# URL parser tool (local, no network)
URL_PARSE_TOOL = Tool(
name="url_parse",
tool_type="computation",
description="Parse URL components locally",
requires_authorization=False,
timeout_seconds=1.0,
)
# HTTP header fetcher (conditional, requires auth)
HTTP_HEADERS_TOOL = Tool(
name="http_headers",
tool_type="network",
description="Fetch HTTP headers from target",
requires_authorization=True,
timeout_seconds=5.0,
)
# Robots.txt fetcher (conditional, requires auth)
ROBOTS_TXT_TOOL = Tool(
name="robots_txt",
tool_type="network",
description="Fetch robots.txt from target",
requires_authorization=True,
timeout_seconds=5.0,
)
# =============================================================================
# Skill definitions
# =============================================================================
SKILLS_REGISTRY: dict[str, Skill] = {
"resource_links": Skill(
name="Resource Links",
canonical_name="resource_links",
category="passive_lookup",
description="Generate links to external OSINT resources",
required_indicator_types=["domain", "username", "email", "ip", "url"],
tools=[], # No external tools needed
requires_authorization=False,
),
"dns_records": Skill(
name="DNS Records",
canonical_name="dns_records",
category="passive_lookup",
description="Resolve DNS A, AAAA, MX, NS records",
required_indicator_types=["domain"],
tools=[DNS_QUERY_TOOL],
requires_authorization=False,
),
"local_url_parse": Skill(
name="Local URL Parse",
canonical_name="local_url_parse",
category="analysis",
description="Parse URL components without contacting target",
required_indicator_types=["url"],
tools=[URL_PARSE_TOOL],
requires_authorization=False,
),
"http_headers": Skill(
name="HTTP Headers",
canonical_name="http_headers",
category="conditional_fetch",
description="Fetch HTTP headers from authorized target",
required_indicator_types=["url", "domain"],
tools=[HTTP_HEADERS_TOOL],
requires_authorization=True,
),
"robots_txt": Skill(
name="Robots.txt",
canonical_name="robots_txt",
category="conditional_fetch",
description="Fetch robots.txt from authorized target",
required_indicator_types=["url", "domain"],
tools=[ROBOTS_TXT_TOOL],
requires_authorization=True,
),
}
# =============================================================================
# Orchestrator agent
# =============================================================================
class OrchestratorAgent:
"""
Orchestrator agent coordinates the full enrichment workflow.
Responsibilities:
- Create execution context
- Route requests through validation → policy → enrichment
- Execute skills based on policy decisions
- Aggregate results
- Generate telemetry
"""
def __init__(self, role: AgentRole = "orchestrator"):
self.role = role
self.skills = SKILLS_REGISTRY
def create_context(
self,
raw_indicator: str,
indicator_type_hint: str,
requested_modules: list[str],
authorized_target: bool,
passive_only: bool = True,
) -> ExecutionContext:
"""
Create execution context for a new enrichment request.
"""
run_id = f"run_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"
started_at = datetime.now(timezone.utc).isoformat()
# Validate indicator first
validation_result = validate_indicator(
raw_indicator,
forced_type=indicator_type_hint,
)
if not validation_result.ok:
# Create a minimal context for failed validation
return ExecutionContext(
run_id=run_id,
started_at=started_at,
indicator_type="unknown",
normalized_indicator="",
indicator_hash="",
requested_modules=requested_modules,
authorized_target=authorized_target,
passive_only=passive_only,
errors=[validation_result.error or "Validation failed"],
)
# For successful validation, hash the indicator
import hashlib
import hmac
import os
salt = os.getenv("OSINT_HASH_SALT", "dev-only-change-me")
indicator_hash = hmac.new(
salt.encode("utf-8"),
validation_result.normalized.encode("utf-8"),
hashlib.sha256,
).hexdigest()
return ExecutionContext(
run_id=run_id,
started_at=started_at,
indicator_type=validation_result.indicator_type,
normalized_indicator=validation_result.normalized,
indicator_hash=indicator_hash,
requested_modules=requested_modules,
authorized_target=authorized_target,
passive_only=passive_only,
)
def execute_workflow(
self,
raw_indicator: str,
indicator_type_hint: str = "Auto",
requested_modules: list[str] | None = None,
authorized_target: bool = False,
passive_only: bool = True,
) -> EnrichmentWorkflow:
"""
Execute complete enrichment workflow.
Returns a structured workflow result containing:
- Execution context
- Validation result
- Policy evaluation
- Skill results
- Drift assessment
- Correction decision
"""
started = time.perf_counter()
requested_modules = requested_modules or ["resource_links"]
# Step 1: Validate input
validation_result = validate_indicator(
raw_indicator,
forced_type=indicator_type_hint,
)
if not validation_result.ok:
# Early exit for validation failure
context = self.create_context(
raw_indicator,
indicator_type_hint,
requested_modules,
authorized_target,
passive_only,
)
return EnrichmentWorkflow(
context=context,
validation_result=validation_result,
policy_evaluation=PolicyEvaluation(
decision="BLOCK", # type: ignore
blocked_modules=requested_modules,
violations=[],
),
skill_results=[],
drift_vector={},
correction_verb="REVERT",
duration_ms=int((time.perf_counter() - started) * 1000),
)
# Step 2: Create execution context
context = self.create_context(
raw_indicator,
indicator_type_hint,
requested_modules,
authorized_target,
passive_only,
)
# Step 3: Evaluate policy
policy_eval = evaluate_modules(
requested_modules,
authorized_target=authorized_target,
passive_only=passive_only,
allow_unknown_modules=False,
)
context.policy_evaluation = policy_eval
# Step 4: Execute allowed skills
skill_results = self._execute_skills(
context,
policy_eval.allowed_modules,
)
# Step 5: Detect drift
drift_vector = self._detect_drift(
context,
skill_results,
policy_eval,
)
# Step 6: Choose correction verb
correction_verb = self._choose_correction(drift_vector, policy_eval)
duration_ms = int((time.perf_counter() - started) * 1000)
# Ensure we always return at least 1ms to indicate actual work was done
if duration_ms == 0:
duration_ms = 1
return EnrichmentWorkflow(
context=context,
validation_result=validation_result,
policy_evaluation=policy_eval,
skill_results=skill_results,
drift_vector=drift_vector,
correction_verb=correction_verb,
duration_ms=duration_ms,
)
def _execute_skills(
self,
context: ExecutionContext,
allowed_modules: list[str],
) -> list[SkillResult]:
"""
Execute allowed skills based on policy evaluation.
"""
results: list[SkillResult] = []
for module_name in allowed_modules:
skill = self.skills.get(module_name)
if not skill:
results.append(SkillResult(
skill_name=module_name,
status=ExecutionStatus.FAILED,
error=f"Skill not found: {module_name}",
))
continue
# Check if indicator type is supported by this skill
if skill.required_indicator_types and context.indicator_type not in skill.required_indicator_types:
results.append(SkillResult(
skill_name=skill.name,
status=ExecutionStatus.BLOCKED,
error=f"Skill {skill.name} requires indicator type in {skill.required_indicator_types}, got {context.indicator_type}",
))
continue
# Execute skill
result = self._execute_skill(skill, context)
results.append(result)
return results
def _execute_skill(
self,
skill: Skill,
context: ExecutionContext,
) -> SkillResult:
"""
Execute a single skill.
For now, this is a stub that returns placeholder data.
In production, this would invoke the skill's tools.
"""
started = time.perf_counter()
try:
# Placeholder: skill execution logic would go here
# Each skill would use its tools to perform enrichment
if skill.canonical_name == "resource_links":
data = {"type": "links", "generated": True}
elif skill.canonical_name == "dns_records":
data = {"A": [], "AAAA": [], "MX": [], "NS": []}
elif skill.canonical_name == "local_url_parse":
data = {"scheme": "", "hostname": "", "path": ""}
else:
data = {"status": "not_implemented"}
duration_ms = int((time.perf_counter() - started) * 1000)
return SkillResult(
skill_name=skill.name,
status=ExecutionStatus.COMPLETED,
data=data,
duration_ms=duration_ms,
)
except Exception as exc:
duration_ms = int((time.perf_counter() - started) * 1000)
return SkillResult(
skill_name=skill.name,
status=ExecutionStatus.FAILED,
error=str(exc),
duration_ms=duration_ms,
)
def _detect_drift(
self,
context: ExecutionContext,
skill_results: list[SkillResult],
policy_eval: PolicyEvaluation,
) -> dict[str, float]:
"""
Detect drift from execution telemetry.
This is a simplified version. Full drift detection
would use osint_core.drift when implemented.
"""
drift = {
"statistical": 0.0,
"behavioral": 0.0,
"structural": 0.0,
"adversarial": 0.0,
"operational": 0.0,
"policy": 0.0,
}
# Policy drift: blocked modules indicate policy boundary hit
if policy_eval.blocked_modules:
drift["policy"] = 0.4
# Operational drift: failed skills
failed_count = sum(1 for r in skill_results if r.status == ExecutionStatus.FAILED)
if failed_count > 0:
drift["operational"] = min(0.2 * failed_count, 1.0)
# Adversarial drift: check for suspicious patterns (stub)
# Full implementation would analyze normalized_indicator
return drift
def _choose_correction(
self,
drift_vector: dict[str, float],
policy_eval: PolicyEvaluation,
) -> str:
"""
Choose correction verb based on drift vector.
Priority: policy > structural > behavioral > adversarial > operational > statistical
"""
if drift_vector.get("policy", 0.0) >= 0.4:
return "CONSTRAIN"
if drift_vector.get("structural", 0.0) >= 0.5:
return "REVERT"
if drift_vector.get("behavioral", 0.0) >= 0.5:
return "REVERT"
if drift_vector.get("adversarial", 0.0) >= 0.3:
return "CONSTRAIN"
if drift_vector.get("operational", 0.0) >= 0.4:
return "CONSTRAIN"
if drift_vector.get("statistical", 0.0) >= 0.5 and drift_vector.get("adversarial", 0.0) == 0:
return "ADAPT"
return "OBSERVE"
# =============================================================================
# Public API
# =============================================================================
def create_orchestrator() -> OrchestratorAgent:
"""
Factory function to create an orchestrator agent.
"""
return OrchestratorAgent(role="orchestrator")
def list_skills() -> dict[str, Skill]:
"""
Return the skills registry.
"""
return SKILLS_REGISTRY.copy()
def get_skill(skill_name: str) -> Skill | None:
"""
Get a skill by canonical name.
"""
return SKILLS_REGISTRY.get(skill_name)