|
|
"""Task service for KGraph-MCP API.""" |
|
|
|
|
|
import json |
|
|
import logging |
|
|
from pathlib import Path |
|
|
|
|
|
from ..models.responses import TaskResponse |
|
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class TaskService:
    """Service for handling task-related operations.

    Tasks are stored as a JSON array of dictionaries in ``tasks_file``.
    Every public operation re-reads the file; nothing is cached on the
    instance.
    """

    def __init__(self, tasks_file: str = "tasks.json"):
        """Initialize the task service.

        Args:
            tasks_file: Path of the JSON file that holds the tasks.
        """
        self.tasks_file = Path(tasks_file)

    @staticmethod
    def _to_response(task: dict) -> TaskResponse:
        """Build a ``TaskResponse`` from one stored task dictionary.

        Raises:
            KeyError: If ``id``, ``title``, ``description`` or ``status``
                is missing. ``dependencies`` defaults to an empty list.
        """
        return TaskResponse(
            id=task["id"],
            title=task["title"],
            description=task["description"],
            status=task["status"],
            dependencies=task.get("dependencies", []),
        )

    def _load_tasks(self) -> list[dict]:
        """Load tasks from the JSON file.

        Returns:
            The parsed file content. An empty list is returned when the
            file does not exist, and also (after logging the error) when
            it cannot be read or parsed.
        """
        try:
            # Explicit encoding: the platform default is not always UTF-8.
            with open(self.tasks_file, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            # A missing file simply means that no task was stored yet.
            return []
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.error("Error loading tasks: %s", e)
            return []

    def _save_tasks(self, tasks: list[dict]) -> bool:
        """Save tasks to the JSON file.

        Args:
            tasks: Task dictionaries to write; they replace the file content.

        Returns:
            True on success, False when serialization or writing failed
            (the error is logged).
        """
        try:
            # Serialize before touching the file so that a serialization
            # error cannot leave a truncated task file behind.
            payload = json.dumps(tasks, indent=2)
            self.tasks_file.write_text(payload, encoding="utf-8")
        except (OSError, TypeError, ValueError) as e:
            logger.error("Error saving tasks: %s", e)
            return False
        return True

    def get_all_tasks(self) -> list[TaskResponse]:
        """Get all tasks.

        Returns:
            One ``TaskResponse`` per stored task, in file order.
        """
        return [self._to_response(task) for task in self._load_tasks()]

    def get_task_by_id(self, task_id: int) -> TaskResponse | None:
        """Get a task by ID.

        Args:
            task_id: Identifier compared against each stored task's ``id``.

        Returns:
            The first matching task, or None when no task matches.
        """
        match = next(
            (task for task in self._load_tasks() if task["id"] == task_id),
            None,
        )
        return None if match is None else self._to_response(match)

    def create_task(
        self, title: str, description: str, dependencies: list[int] | None = None
    ) -> TaskResponse:
        """Create a new task.

        The new task gets the ID ``max(existing ids) + 1`` (1 when there are
        no tasks) and the status ``"Todo"``.

        Args:
            title: Title of the task.
            description: Description of the task.
            dependencies: IDs of the tasks this one depends on; None means
                no dependencies.

        Returns:
            The created task. It is returned even when saving failed; the
            failure is logged.
        """
        # Copy so that the stored record does not alias the caller's list.
        dependencies = [] if dependencies is None else list(dependencies)

        tasks_data = self._load_tasks()
        new_id = max((task["id"] for task in tasks_data), default=0) + 1

        new_task = {
            "id": new_id,
            "title": title,
            "description": description,
            "status": "Todo",
            "dependencies": dependencies,
            "acceptance_criteria": [],
            # NOTE(review): timestamps are stored empty; the expected format
            # is not visible here, so they are left unchanged.
            "created_at": "",
            "updated_at": "",
            "branch_name": None,
            "pr_url": None,
        }

        tasks_data.append(new_task)
        if not self._save_tasks(tasks_data):
            # Keep the best-effort contract, but do not hide the failure.
            logger.error(
                "Task %s was not persisted to %s", new_id, self.tasks_file
            )

        return TaskResponse(
            id=new_id,
            title=title,
            description=description,
            status="Todo",
            dependencies=dependencies,
        )
|
|
|