Files

260 lines
8.1 KiB
Python

"""
Authentication Routes for DockMon
Handles login, logout, API key access, and session management endpoints
"""
import os
import secrets
import logging
from typing import Optional
from fastapi import APIRouter, Request, Response, HTTPException, Depends
from fastapi.responses import JSONResponse
from models.auth_models import LoginRequest, ChangePasswordRequest
from security.rate_limiting import rate_limit_auth
from security.audit import security_audit
from auth.session_manager import session_manager
from database import DatabaseManager
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/auth", tags=["authentication"])
def _is_localhost_or_internal(client_ip: str) -> bool:
"""Check if request is from localhost or internal network"""
import ipaddress
try:
addr = ipaddress.ip_address(client_ip)
# Allow localhost
if addr.is_loopback:
return True
# Allow private networks (RFC 1918) - for Docker networks and internal deployments
if addr.is_private:
return True
return False
except ValueError:
# Invalid IP format
return False
from config.paths import DATABASE_PATH
# Initialize database for user management - use centralized path config
db = DatabaseManager(DATABASE_PATH)
# Ensure default user exists on startup
db.get_or_create_default_user()
def _get_session_from_cookie(request: Request) -> Optional[str]:
"""Extract session ID from cookie"""
return request.cookies.get("dockmon_session")
async def verify_frontend_session(request: Request) -> bool:
"""Dependency to verify frontend session authentication"""
session_id = _get_session_from_cookie(request)
if not session_manager.validate_session(session_id, request):
raise HTTPException(
status_code=401,
detail="Authentication required"
)
return True
# Note: This endpoint will be implemented in main.py since it needs monitor instance
# @router.get("/key") - implemented directly in main.py
@router.post("/login")
async def login(login_data: LoginRequest, request: Request, response: Response, rate_limit_check: bool = rate_limit_auth):
"""Frontend login endpoint"""
client_ip = request.client.host
user_agent = request.headers.get("user-agent", "Unknown")
# Verify credentials using database
user_info = db.verify_user_credentials(login_data.username, login_data.password)
if not user_info:
security_audit.log_login_failure(client_ip, user_agent, "Invalid credentials")
raise HTTPException(
status_code=401,
detail="Invalid username or password"
)
# Create session with username
session_id = session_manager.create_session(request, user_info["username"])
# Set secure cookie
# Detect if we're using HTTPS based on common headers
is_https = request.url.scheme == "https" or request.headers.get("x-forwarded-proto") == "https"
response.set_cookie(
key="dockmon_session",
value=session_id,
httponly=True, # Prevent XSS access to cookie
secure=is_https, # Use secure flag when on HTTPS
samesite="lax", # CSRF protection
max_age=24*60*60 # 24 hours
)
return {
"success": True,
"message": "Login successful",
"username": user_info["username"],
"must_change_password": user_info["must_change_password"],
"is_first_login": user_info["is_first_login"]
}
@router.post("/logout")
async def logout(request: Request, response: Response, authenticated: bool = Depends(verify_frontend_session)):
"""Frontend logout endpoint"""
session_id = _get_session_from_cookie(request)
if session_id:
session_manager.delete_session(session_id)
# Clear cookie
response.delete_cookie("dockmon_session")
return {"success": True, "message": "Logout successful"}
@router.get("/status")
async def auth_status(request: Request):
"""Check authentication status"""
session_id = _get_session_from_cookie(request)
authenticated = session_manager.validate_session(session_id, request) if session_id else False
response = {
"authenticated": authenticated,
"session_valid": authenticated
}
# If authenticated, include username and password change requirement
if authenticated:
username = session_manager.get_session_username(session_id)
if username:
response["username"] = username
# Check if user must change password
with db.get_session() as session:
from database import User
user = session.query(User).filter(User.username == username).first()
if user:
response["must_change_password"] = user.must_change_password
response["is_first_login"] = user.is_first_login
return response
@router.post("/change-password")
async def change_password(password_data: ChangePasswordRequest, request: Request, authenticated: bool = Depends(verify_frontend_session)):
"""Change user password"""
session_id = _get_session_from_cookie(request)
username = session_manager.get_session_username(session_id)
if not username:
raise HTTPException(
status_code=401,
detail="Session invalid"
)
# Verify current password
user_info = db.verify_user_credentials(username, password_data.current_password)
if not user_info:
raise HTTPException(
status_code=401,
detail="Current password is incorrect"
)
# Change password
success = db.change_user_password(username, password_data.new_password)
if not success:
raise HTTPException(
status_code=500,
detail="Failed to change password"
)
# Log security event
client_ip = request.client.host
user_agent = request.headers.get("user-agent", "Unknown")
security_audit.log_password_change(client_ip, user_agent, username)
return {
"success": True,
"message": "Password changed successfully"
}
@router.post("/change-username")
async def change_username(username_data: dict, request: Request, authenticated: bool = Depends(verify_frontend_session)):
"""Change username"""
session_id = _get_session_from_cookie(request)
current_username = session_manager.get_session_username(session_id)
if not current_username:
raise HTTPException(
status_code=401,
detail="Session invalid"
)
# Verify current password
current_password = username_data.get("current_password")
new_username = username_data.get("new_username", "").strip()
if not current_password or not new_username:
raise HTTPException(
status_code=400,
detail="Current password and new username required"
)
# Verify current credentials
user_info = db.verify_user_credentials(current_username, current_password)
if not user_info:
raise HTTPException(
status_code=401,
detail="Current password is incorrect"
)
# Validate new username
if len(new_username) < 3 or len(new_username) > 50:
raise HTTPException(
status_code=400,
detail="Username must be between 3 and 50 characters"
)
# Check if new username already exists (but allow keeping the same username)
if new_username != current_username and db.username_exists(new_username):
raise HTTPException(
status_code=400,
detail="Username already exists"
)
# Change username
if not db.change_username(current_username, new_username):
raise HTTPException(
status_code=500,
detail="Failed to change username"
)
# Update session with new username
session_manager.update_session_username(session_id, new_username)
# Log security event
client_ip = request.client.host
user_agent = request.headers.get("user-agent", "Unknown")
security_audit.log_username_change(client_ip, user_agent, current_username, new_username)
return {
"success": True,
"message": "Username changed successfully"
}