# [DEF:TaskManagerModels:Module]
# @SEMANTICS: task, models, pydantic, enum, state
# @PURPOSE: Defines the data models and enumerations used by the Task Manager.
# @LAYER: Core
# @RELATION: Used by TaskManager and API routes.
# @INVARIANT: Task IDs are immutable once created.
# @CONSTRAINT: Must use Pydantic for data validation.
|
|
|
|
# [SECTION: IMPORTS]
|
|
import uuid
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
from pydantic import BaseModel, Field
|
|
# [/SECTION]
|
|
|
|
# [DEF:TaskStatus:Enum]
|
|
# @SEMANTICS: task, status, state, enum
|
|
# @PURPOSE: Defines the possible states a task can be in during its lifecycle.
|
|
class TaskStatus(str, Enum):
    """Enumeration of the states a task can be in during its lifecycle.

    The ``str`` mixin makes every member an instance of ``str`` that compares
    equal to its plain string value; each member's value is identical to its
    own name.
    """

    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILED = "FAILED"
    AWAITING_MAPPING = "AWAITING_MAPPING"
    AWAITING_INPUT = "AWAITING_INPUT"
|
|
# [/DEF:TaskStatus:Enum]
|
|
|
|
# [DEF:LogEntry:Class]
|
|
# @SEMANTICS: log, entry, record, pydantic
|
|
# @PURPOSE: A Pydantic model representing a single, structured log entry associated with a task.
|
|
class LogEntry(BaseModel):
    """A single structured log record associated with a task.

    Fields:
        timestamp: Produced by calling ``datetime.utcnow`` when the caller
            does not supply a value.
        level: Required string; this model does not restrict which level
            names are accepted.
        message: Required string holding the log text.
        context: Optional string-keyed dictionary of extra data; ``None``
            when omitted.
    """

    timestamp: datetime = Field(default_factory=datetime.utcnow)
    level: str
    message: str
    context: Optional[Dict[str, Any]] = Field(default=None)
|
|
# [/DEF:LogEntry:Class]
|
|
|
|
# [DEF:Task:Class]
|
|
# @SEMANTICS: task, job, execution, state, pydantic
|
|
# @PURPOSE: A Pydantic model representing a single execution instance of a plugin, including its status, parameters, and logs.
|
|
class Task(BaseModel):
    """A single execution instance of a plugin: status, parameters and logs.

    Fields:
        id: String form of a freshly generated UUID4 unless supplied.
        plugin_id: Required string.
        status: A ``TaskStatus`` member; defaults to ``TaskStatus.PENDING``.
        started_at / finished_at: Optional datetimes, ``None`` by default.
        user_id: Optional string, ``None`` by default.
        logs: List of ``LogEntry``; each instance gets its own new list.
        params: String-keyed dictionary; each instance gets its own new dict.
        input_required: Boolean flag, ``False`` by default.
        input_request: Optional string-keyed dictionary, ``None`` by default.
    """

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    plugin_id: str
    status: TaskStatus = TaskStatus.PENDING
    started_at: Optional[datetime] = None
    finished_at: Optional[datetime] = None
    user_id: Optional[str] = None
    logs: List[LogEntry] = Field(default_factory=list)
    params: Dict[str, Any] = Field(default_factory=dict)
    input_required: bool = False
    input_request: Optional[Dict[str, Any]] = None

    # [DEF:Task.__init__:Function]
    # @PURPOSE: Initializes the Task model and validates input_request for AWAITING_INPUT status.
    # @PRE: If status is AWAITING_INPUT, input_request must be provided.
    # @POST: Task instance is created or ValueError is raised.
    # @PARAM: **data - Keyword arguments for model initialization.
    def __init__(self, **data):
        """Build the model from keyword data, then check the input_request rule.

        Raises:
            ValueError: if ``status`` is ``AWAITING_INPUT`` and ``input_request``
                is falsy (``None`` or an empty dict).
        """
        # Field validation runs first; the cross-field rule is checked afterwards.
        super().__init__(**data)

        # Any status other than AWAITING_INPUT has no extra requirement.
        if self.status != TaskStatus.AWAITING_INPUT:
            return

        # A non-empty request satisfies the precondition.
        if self.input_request:
            return

        # NOTE(review): construction paths that do not go through __init__
        # would skip this check - confirm against the Pydantic version in use.
        raise ValueError("input_request is required when status is AWAITING_INPUT")
|
|
# [/DEF:Task.__init__:Function]
|
|
# [/DEF:Task:Class]
|
|
|
|
# [/DEF:TaskManagerModels:Module] |