138 lines
4.7 KiB
Python
138 lines
4.7 KiB
Python
"""
|
|
Session Management System for DockMon
|
|
Provides secure session tokens with IP validation and automatic cleanup
|
|
"""
|
|
|
|
import logging
|
|
import secrets
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Optional
|
|
|
|
from fastapi import Request
|
|
|
|
from security.audit import security_audit
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SessionManager:
|
|
"""
|
|
Custom session management for frontend authentication
|
|
Provides secure session tokens with configurable expiry
|
|
"""
|
|
def __init__(self):
|
|
self.sessions: Dict[str, dict] = {}
|
|
self.session_timeout = timedelta(hours=24) # 24 hour sessions
|
|
self._sessions_lock = threading.Lock()
|
|
self._shutdown_event = threading.Event()
|
|
self._cleanup_thread = threading.Thread(target=self._periodic_cleanup, daemon=True)
|
|
self._cleanup_thread.start()
|
|
|
|
def _periodic_cleanup(self):
|
|
"""Run cleanup every hour"""
|
|
while not self._shutdown_event.wait(timeout=3600):
|
|
try:
|
|
deleted = self.cleanup_expired_sessions()
|
|
if deleted > 0:
|
|
logger.info(f"Cleaned up {deleted} expired sessions")
|
|
except Exception as e:
|
|
logger.error(f"Session cleanup failed: {e}", exc_info=True)
|
|
|
|
def create_session(self, request: Request, username: str = None) -> str:
|
|
"""Create a new session token"""
|
|
session_id = secrets.token_urlsafe(32)
|
|
client_ip = request.client.host
|
|
user_agent = request.headers.get("user-agent", "Unknown")
|
|
|
|
with self._sessions_lock:
|
|
self.sessions[session_id] = {
|
|
"created_at": datetime.utcnow(),
|
|
"last_accessed": datetime.utcnow(),
|
|
"client_ip": client_ip,
|
|
"user_agent": user_agent,
|
|
"authenticated": True,
|
|
"username": username
|
|
}
|
|
|
|
# Security audit log
|
|
security_audit.log_login_success(client_ip, user_agent, session_id)
|
|
|
|
return session_id
|
|
|
|
def validate_session(self, session_id: Optional[str], request: Request) -> bool:
|
|
"""Validate session token and update last accessed time"""
|
|
if not session_id:
|
|
return False
|
|
|
|
with self._sessions_lock:
|
|
if session_id not in self.sessions:
|
|
return False
|
|
|
|
session = self.sessions[session_id]
|
|
current_time = datetime.utcnow()
|
|
client_ip = request.client.host
|
|
|
|
# Check if session has expired
|
|
if current_time - session["created_at"] > self.session_timeout:
|
|
del self.sessions[session_id]
|
|
security_audit.log_session_expired(client_ip, session_id)
|
|
return False
|
|
|
|
# Validate IP consistency for security
|
|
if session["client_ip"] != client_ip:
|
|
security_audit.log_session_hijack_attempt(
|
|
original_ip=session["client_ip"],
|
|
attempted_ip=client_ip,
|
|
session_id=session_id
|
|
)
|
|
del self.sessions[session_id]
|
|
return False
|
|
|
|
# Update last accessed time
|
|
session["last_accessed"] = current_time
|
|
return True
|
|
|
|
def delete_session(self, session_id: str):
|
|
"""Delete a session (logout)"""
|
|
with self._sessions_lock:
|
|
if session_id in self.sessions:
|
|
del self.sessions[session_id]
|
|
|
|
def get_session_username(self, session_id: str) -> Optional[str]:
|
|
"""Get username from session"""
|
|
with self._sessions_lock:
|
|
if session_id in self.sessions:
|
|
return self.sessions[session_id].get("username")
|
|
return None
|
|
|
|
def update_session_username(self, session_id: str, new_username: str):
|
|
"""Update username in session"""
|
|
with self._sessions_lock:
|
|
if session_id in self.sessions:
|
|
self.sessions[session_id]["username"] = new_username
|
|
|
|
def cleanup_expired_sessions(self):
|
|
"""Clean up expired sessions periodically"""
|
|
current_time = datetime.utcnow()
|
|
expired_sessions = []
|
|
|
|
with self._sessions_lock:
|
|
for session_id, session_data in self.sessions.items():
|
|
if current_time - session_data["created_at"] > self.session_timeout:
|
|
expired_sessions.append(session_id)
|
|
|
|
for session_id in expired_sessions:
|
|
self.delete_session(session_id)
|
|
|
|
return len(expired_sessions)
|
|
|
|
def shutdown(self):
|
|
"""Shutdown the session manager and cleanup thread"""
|
|
self._shutdown_event.set()
|
|
self._cleanup_thread.join(timeout=5)
|
|
|
|
|
|
# Global session manager instance
|
|
session_manager = SessionManager() |