Files
docker_dev/dockmon/backend/websocket/connection.py

69 lines
2.3 KiB
Python

"""
WebSocket Connection Management for DockMon
Handles WebSocket connections and message broadcasting
"""
import asyncio
import json
import logging
from typing import List
from fastapi import WebSocket
logger = logging.getLogger(__name__)
class DateTimeEncoder(json.JSONEncoder):
"""Custom JSON encoder for datetime objects"""
def default(self, obj):
from datetime import datetime
if isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
class ConnectionManager:
"""Manages WebSocket connections with thread-safe operations"""
def __init__(self):
self.active_connections: List[WebSocket] = []
self._lock = asyncio.Lock()
async def connect(self, websocket: WebSocket):
await websocket.accept()
async with self._lock:
self.active_connections.append(websocket)
logger.info(f"New WebSocket connection. Total connections: {len(self.active_connections)}")
async def disconnect(self, websocket: WebSocket):
async with self._lock:
if websocket in self.active_connections:
self.active_connections.remove(websocket)
logger.info(f"WebSocket disconnected. Total connections: {len(self.active_connections)}")
def has_active_connections(self) -> bool:
"""Check if there are any active WebSocket connections"""
return bool(self.active_connections)
async def broadcast(self, message: dict):
"""Send message to all connected clients"""
# Get snapshot of connections with lock
async with self._lock:
connections = self.active_connections.copy()
# Send messages without lock (IO can block)
dead_connections = []
for connection in connections:
try:
await connection.send_text(json.dumps(message, cls=DateTimeEncoder))
except Exception as e:
logger.error(f"Error sending message: {e}")
dead_connections.append(connection)
# Clean up dead connections with lock
if dead_connections:
async with self._lock:
for conn in dead_connections:
if conn in self.active_connections:
self.active_connections.remove(conn)