worked backup
@@ -8,4 +8,5 @@ jsonschema
 requests
 keyring
 httpx
-PyYAML
+PyYAML
+websockets
@@ -50,25 +50,41 @@ app.include_router(settings.router, prefix="/api/settings", tags=["Settings"])

 # @SEMANTICS: websocket, logs, streaming, real-time
 # @PURPOSE: Provides a WebSocket endpoint for clients to connect to and receive real-time log entries for a specific task.
 @app.websocket("/ws/logs/{task_id}")
-async def websocket_endpoint(websocket: WebSocket, task_id: str, task_manager=Depends(get_task_manager)):
+async def websocket_endpoint(websocket: WebSocket, task_id: str):
     await websocket.accept()
-    logger.info(f"WebSocket connection established for task {task_id}")
+    logger.info(f"WebSocket connection accepted for task {task_id}")
+    task_manager = get_task_manager()
+    queue = await task_manager.subscribe_logs(task_id)
     try:
         # Send initial logs if any
         initial_logs = task_manager.get_task_logs(task_id)
         for log_entry in initial_logs:
-            await websocket.send_json(log_entry.dict())
+            # Convert datetime to string for JSON serialization
+            log_dict = log_entry.dict()
+            log_dict['timestamp'] = log_dict['timestamp'].isoformat()
+            await websocket.send_json(log_dict)

-        # Keep connection alive, ideally stream new logs as they come
-        # This part requires a more sophisticated log streaming mechanism (e.g., queues, pub/sub)
-        # For now, it will just keep the connection open and send initial logs.
+        # Stream new logs
+        logger.info(f"Starting log stream for task {task_id}")
         while True:
-            await asyncio.sleep(1) # Keep connection alive, send heartbeat or check for new logs
-            # In a real system, new logs would be pushed here
+            log_entry = await queue.get()
+            log_dict = log_entry.dict()
+            log_dict['timestamp'] = log_dict['timestamp'].isoformat()
+            await websocket.send_json(log_dict)
+
             # If task is finished, we could potentially close the connection
             # but let's keep it open for a bit or until the client disconnects
+            if "Task completed successfully" in log_entry.message or "Task failed" in log_entry.message:
+                # Wait a bit to ensure client receives the last message
+                await asyncio.sleep(2)
+                break
+
     except WebSocketDisconnect:
         logger.info(f"WebSocket connection disconnected for task {task_id}")
     except Exception as e:
         logger.error(f"WebSocket error for task {task_id}: {e}")
+    finally:
+        task_manager.unsubscribe_logs(task_id, queue)
 # [/DEF]
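For reference, a minimal client sketch for this endpoint (not part of the commit). It uses the `websockets` package added to the requirements above; the host, port, and task id are placeholders.

# Hypothetical client sketch, not part of this commit. Assumes the API runs on
# localhost:8000; adjust the URI and task id for your deployment.
import asyncio
import json

import websockets  # pip install websockets

async def follow_logs(task_id: str) -> None:
    uri = f"ws://localhost:8000/ws/logs/{task_id}"  # host/port are assumptions
    async with websockets.connect(uri) as ws:
        # The server sends one JSON object per log entry until the task finishes
        # or the connection is closed.
        async for raw in ws:
            entry = json.loads(raw)
            print(entry.get("timestamp"), entry.get("level"), entry.get("message"))

if __name__ == "__main__":
    asyncio.run(follow_logs("your-task-id"))  # placeholder task id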
@@ -61,6 +61,7 @@ class TaskManager:
     def __init__(self, plugin_loader):
         self.plugin_loader = plugin_loader
         self.tasks: Dict[str, Task] = {}
+        self.subscribers: Dict[str, List[asyncio.Queue]] = {}
         self.executor = ThreadPoolExecutor(max_workers=5)  # For CPU-bound plugin execution
         self.loop = asyncio.get_event_loop()
     # [/DEF]
@@ -92,7 +93,7 @@ class TaskManager:

         task.status = TaskStatus.RUNNING
         task.started_at = datetime.utcnow()
-        task.logs.append(LogEntry(level="INFO", message=f"Task started for plugin '{plugin.name}'"))
+        self._add_log(task_id, "INFO", f"Task started for plugin '{plugin.name}'")

         try:
             # Execute plugin in a separate thread to avoid blocking the event loop
@@ -103,10 +104,10 @@ class TaskManager:
                 lambda: asyncio.run(plugin.execute(task.params)) if asyncio.iscoroutinefunction(plugin.execute) else plugin.execute(task.params)
             )
             task.status = TaskStatus.SUCCESS
-            task.logs.append(LogEntry(level="INFO", message=f"Task completed successfully for plugin '{plugin.name}'"))
+            self._add_log(task_id, "INFO", f"Task completed successfully for plugin '{plugin.name}'")
         except Exception as e:
             task.status = TaskStatus.FAILED
-            task.logs.append(LogEntry(level="ERROR", message=f"Task failed: {e}", context={"error_type": type(e).__name__}))
+            self._add_log(task_id, "ERROR", f"Task failed: {e}", {"error_type": type(e).__name__})
         finally:
             task.finished_at = datetime.utcnow()
             # In a real system, you might notify clients via WebSocket here
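The hunks above construct LogEntry objects that the WebSocket handler later serializes with .dict() and an isoformat() timestamp. The models themselves are not part of this diff, so the following is only a sketch of their likely shape: the field names come from the calls in the diff, while pydantic, the default timestamp, and the enum values are assumptions made for illustration.

# Hypothetical models, not part of this commit.
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field

class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"

class LogEntry(BaseModel):
    level: str
    message: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    context: Optional[Dict[str, Any]] = None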
@@ -129,3 +130,38 @@ class TaskManager:
         """
         task = self.tasks.get(task_id)
         return task.logs if task else []
+
+    def _add_log(self, task_id: str, level: str, message: str, context: Optional[Dict[str, Any]] = None):
+        """
+        Adds a log entry to a task and notifies subscribers.
+        """
+        task = self.tasks.get(task_id)
+        if not task:
+            return
+
+        log_entry = LogEntry(level=level, message=message, context=context)
+        task.logs.append(log_entry)
+
+        # Notify subscribers
+        if task_id in self.subscribers:
+            for queue in self.subscribers[task_id]:
+                self.loop.call_soon_threadsafe(queue.put_nowait, log_entry)
+
+    async def subscribe_logs(self, task_id: str) -> asyncio.Queue:
+        """
+        Subscribes to real-time logs for a task.
+        """
+        queue = asyncio.Queue()
+        if task_id not in self.subscribers:
+            self.subscribers[task_id] = []
+        self.subscribers[task_id].append(queue)
+        return queue
+
+    def unsubscribe_logs(self, task_id: str, queue: asyncio.Queue):
+        """
+        Unsubscribes from real-time logs for a task.
+        """
+        if task_id in self.subscribers:
+            self.subscribers[task_id].remove(queue)
+            if not self.subscribers[task_id]:
+                del self.subscribers[task_id]
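The subscribe_logs / _add_log / unsubscribe_logs trio above is a small in-process pub/sub: _add_log may run from an executor thread, so it hands each entry to the subscriber queues via loop.call_soon_threadsafe instead of touching them directly. Below is a self-contained sketch of that pattern, independent of TaskManager; all names in it are illustrative.

# Self-contained sketch of the queue-based pub/sub used above.
import asyncio
import threading
import time

async def main() -> None:
    loop = asyncio.get_running_loop()
    queue: "asyncio.Queue[str]" = asyncio.Queue()

    def worker() -> None:
        # Runs in a plain thread (like the ThreadPoolExecutor plugin call):
        # it must not await on the queue, so it schedules each put on the
        # event loop thread instead.
        for i in range(3):
            time.sleep(0.2)
            loop.call_soon_threadsafe(queue.put_nowait, f"log line {i}")
        loop.call_soon_threadsafe(queue.put_nowait, "Task completed successfully")

    threading.Thread(target=worker, daemon=True).start()

    # Consumer side, analogous to the WebSocket handler draining its queue.
    while True:
        message = await queue.get()
        print(message)
        if "Task completed successfully" in message:
            break

if __name__ == "__main__":
    asyncio.run(main())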