Files
docker_dev/dockmon/backend/auth/session_manager.py

138 lines
4.7 KiB
Python

"""
Session Management System for DockMon
Provides secure session tokens with IP validation and automatic cleanup
"""
import logging
import secrets
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, Optional
from fastapi import Request
from security.audit import security_audit
logger = logging.getLogger(__name__)
class SessionManager:
"""
Custom session management for frontend authentication
Provides secure session tokens with configurable expiry
"""
def __init__(self):
self.sessions: Dict[str, dict] = {}
self.session_timeout = timedelta(hours=24) # 24 hour sessions
self._sessions_lock = threading.Lock()
self._shutdown_event = threading.Event()
self._cleanup_thread = threading.Thread(target=self._periodic_cleanup, daemon=True)
self._cleanup_thread.start()
def _periodic_cleanup(self):
"""Run cleanup every hour"""
while not self._shutdown_event.wait(timeout=3600):
try:
deleted = self.cleanup_expired_sessions()
if deleted > 0:
logger.info(f"Cleaned up {deleted} expired sessions")
except Exception as e:
logger.error(f"Session cleanup failed: {e}", exc_info=True)
def create_session(self, request: Request, username: str = None) -> str:
"""Create a new session token"""
session_id = secrets.token_urlsafe(32)
client_ip = request.client.host
user_agent = request.headers.get("user-agent", "Unknown")
with self._sessions_lock:
self.sessions[session_id] = {
"created_at": datetime.utcnow(),
"last_accessed": datetime.utcnow(),
"client_ip": client_ip,
"user_agent": user_agent,
"authenticated": True,
"username": username
}
# Security audit log
security_audit.log_login_success(client_ip, user_agent, session_id)
return session_id
def validate_session(self, session_id: Optional[str], request: Request) -> bool:
"""Validate session token and update last accessed time"""
if not session_id:
return False
with self._sessions_lock:
if session_id not in self.sessions:
return False
session = self.sessions[session_id]
current_time = datetime.utcnow()
client_ip = request.client.host
# Check if session has expired
if current_time - session["created_at"] > self.session_timeout:
del self.sessions[session_id]
security_audit.log_session_expired(client_ip, session_id)
return False
# Validate IP consistency for security
if session["client_ip"] != client_ip:
security_audit.log_session_hijack_attempt(
original_ip=session["client_ip"],
attempted_ip=client_ip,
session_id=session_id
)
del self.sessions[session_id]
return False
# Update last accessed time
session["last_accessed"] = current_time
return True
def delete_session(self, session_id: str):
"""Delete a session (logout)"""
with self._sessions_lock:
if session_id in self.sessions:
del self.sessions[session_id]
def get_session_username(self, session_id: str) -> Optional[str]:
"""Get username from session"""
with self._sessions_lock:
if session_id in self.sessions:
return self.sessions[session_id].get("username")
return None
def update_session_username(self, session_id: str, new_username: str):
"""Update username in session"""
with self._sessions_lock:
if session_id in self.sessions:
self.sessions[session_id]["username"] = new_username
def cleanup_expired_sessions(self):
"""Clean up expired sessions periodically"""
current_time = datetime.utcnow()
expired_sessions = []
with self._sessions_lock:
for session_id, session_data in self.sessions.items():
if current_time - session_data["created_at"] > self.session_timeout:
expired_sessions.append(session_id)
for session_id in expired_sessions:
self.delete_session(session_id)
return len(expired_sessions)
def shutdown(self):
"""Shutdown the session manager and cleanup thread"""
self._shutdown_event.set()
self._cleanup_thread.join(timeout=5)
# Global session manager instance
session_manager = SessionManager()