260 lines
8.1 KiB
Python
260 lines
8.1 KiB
Python
"""
Authentication Routes for DockMon
Handles login, logout, API key access, and session management endpoints
"""
|
|
|
|
import os
|
|
import secrets
|
|
import logging
|
|
from typing import Optional
|
|
|
|
from fastapi import APIRouter, Request, Response, HTTPException, Depends
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from models.auth_models import LoginRequest, ChangePasswordRequest
|
|
from security.rate_limiting import rate_limit_auth
|
|
from security.audit import security_audit
|
|
from auth.session_manager import session_manager
|
|
from database import DatabaseManager
|
|
|
|
|
|
# Logger named after this module.
logger = logging.getLogger(__name__)

# Router for the authentication endpoints; every route declared on it is
# served under the /api/auth prefix and tagged "authentication".
router = APIRouter(tags=["authentication"], prefix="/api/auth")
|
|
|
|
|
|
def _is_localhost_or_internal(client_ip: str) -> bool:
    """Return True when ``client_ip`` is a loopback or private-network address.

    IPv4-mapped IPv6 addresses (``::ffff:a.b.c.d``) are unwrapped and judged
    by their embedded IPv4 address. Without this, some Python versions treat
    the whole ``::ffff:0:0/96`` range as private, so a public address written
    as ``::ffff:8.8.8.8`` would be accepted as internal.

    Args:
        client_ip: Textual IPv4 or IPv6 address of the client.

    Returns:
        True for loopback addresses and for addresses that the ``ipaddress``
        module reports as private (this includes the RFC 1918 ranges used by
        Docker networks and internal deployments). False otherwise, including
        when ``client_ip`` cannot be parsed as an IP address.
    """
    import ipaddress

    try:
        addr = ipaddress.ip_address(client_ip)
    except ValueError:
        # Invalid IP format
        return False

    # Only IPv6Address exposes ``ipv4_mapped``; it is None unless the address
    # lies in ::ffff:0:0/96. Classify the embedded IPv4 address instead.
    embedded_v4 = getattr(addr, "ipv4_mapped", None)
    if embedded_v4 is not None:
        addr = embedded_v4

    return addr.is_loopback or addr.is_private
|
|
|
|
|
|
from config.paths import DATABASE_PATH
|
|
|
|
# Database manager used for user management, opened at the centrally
# configured DATABASE_PATH.
db = DatabaseManager(DATABASE_PATH)

# Runs at import time: ensure the default user exists on startup.
db.get_or_create_default_user()
|
|
|
|
|
|
def _get_session_from_cookie(request: Request) -> Optional[str]:
    """Return the value of the ``dockmon_session`` cookie, or None if absent."""
    cookie_jar = request.cookies
    return cookie_jar.get("dockmon_session")
|
|
|
|
|
|
async def verify_frontend_session(request: Request) -> bool:
    """Dependency that admits only requests carrying a valid frontend session.

    Returns:
        True when the session manager accepts the session cookie.

    Raises:
        HTTPException: 401 "Authentication required" when validation fails.
    """
    cookie_session = _get_session_from_cookie(request)
    session_ok = session_manager.validate_session(cookie_session, request)

    if session_ok:
        return True

    raise HTTPException(status_code=401, detail="Authentication required")
|
|
|
|
|
|
# Note: the GET "/key" endpoint is not declared on this router; it is
# implemented directly in main.py because it needs the monitor instance.
|
|
|
|
|
|
@router.post("/login")
async def login(login_data: LoginRequest, request: Request, response: Response, rate_limit_check: bool = rate_limit_auth):
    """Frontend login endpoint.

    Verifies the submitted credentials against the user database, creates a
    session for the user and stores the session ID in the ``dockmon_session``
    cookie.

    Args:
        login_data: Submitted username and password.
        request: Incoming request; supplies client address, headers and scheme.
        response: Outgoing response on which the session cookie is set.
        rate_limit_check: Receives ``rate_limit_auth`` from
            ``security.rate_limiting`` (presumably a rate-limiting dependency;
            confirm against that module).

    Returns:
        A dict with ``success``, ``message``, ``username``,
        ``must_change_password`` and ``is_first_login``.

    Raises:
        HTTPException: 401 when the credentials are rejected.
    """
    # request.client is None when the server provides no peer address; use a
    # placeholder rather than failing the login with an AttributeError.
    client_ip = request.client.host if request.client else "unknown"
    user_agent = request.headers.get("user-agent", "Unknown")

    # Verify credentials using database
    user_info = db.verify_user_credentials(login_data.username, login_data.password)
    if not user_info:
        security_audit.log_login_failure(client_ip, user_agent, "Invalid credentials")
        raise HTTPException(
            status_code=401,
            detail="Invalid username or password"
        )

    # Create session with username
    session_id = session_manager.create_session(request, user_info["username"])

    # Treat the request as HTTPS when either the URL scheme or the
    # X-Forwarded-Proto header says so.
    is_https = request.url.scheme == "https" or request.headers.get("x-forwarded-proto") == "https"

    response.set_cookie(
        key="dockmon_session",
        value=session_id,
        httponly=True,  # Prevent XSS access to cookie
        secure=is_https,  # Use secure flag when on HTTPS
        samesite="lax",  # CSRF protection
        max_age=24 * 60 * 60,  # 24 hours
    )

    return {
        "success": True,
        "message": "Login successful",
        "username": user_info["username"],
        "must_change_password": user_info["must_change_password"],
        "is_first_login": user_info["is_first_login"],
    }
|
|
|
|
|
|
@router.post("/logout")
async def logout(request: Request, response: Response, authenticated: bool = Depends(verify_frontend_session)):
    """Frontend logout endpoint: drop the session and clear its cookie."""
    active_session = _get_session_from_cookie(request)

    # Only ask the session manager to delete when a cookie value is present.
    if active_session:
        session_manager.delete_session(active_session)

    # The cookie is cleared whether or not a session was deleted.
    response.delete_cookie("dockmon_session")

    result = {"success": True, "message": "Logout successful"}
    return result
|
|
|
|
|
|
@router.get("/status")
async def auth_status(request: Request):
    """Report whether the request carries a valid session.

    Returns:
        A dict with ``authenticated`` and ``session_valid``. When the session
        is valid and has a username, ``username`` is added; when a matching
        user row exists, ``must_change_password`` and ``is_first_login`` are
        added as well.
    """
    cookie_session = _get_session_from_cookie(request)

    # Without a cookie there is nothing to validate.
    is_valid = False
    if cookie_session:
        is_valid = session_manager.validate_session(cookie_session, request)

    payload = {
        "authenticated": is_valid,
        "session_valid": is_valid,
    }

    if not is_valid:
        return payload

    session_user = session_manager.get_session_username(cookie_session)
    if not session_user:
        return payload

    payload["username"] = session_user

    # Look up the password-change flags while the DB session is still open.
    with db.get_session() as db_session:
        from database import User

        record = db_session.query(User).filter(User.username == session_user).first()
        if record:
            payload["must_change_password"] = record.must_change_password
            payload["is_first_login"] = record.is_first_login

    return payload
|
|
|
|
|
|
@router.post("/change-password")
async def change_password(password_data: ChangePasswordRequest, request: Request, authenticated: bool = Depends(verify_frontend_session)):
    """Change the password of the user that owns the current session.

    Args:
        password_data: Carries ``current_password`` and ``new_password``.
        request: Incoming request; supplies the session cookie and the client
            details written to the audit log.
        authenticated: Result of the ``verify_frontend_session`` dependency.

    Returns:
        ``{"success": True, "message": "Password changed successfully"}``.

    Raises:
        HTTPException: 401 when the session has no username or the current
            password is wrong; 500 when the database reports failure.
    """
    session_id = _get_session_from_cookie(request)
    username = session_manager.get_session_username(session_id)

    if not username:
        raise HTTPException(
            status_code=401,
            detail="Session invalid"
        )

    # Verify current password
    user_info = db.verify_user_credentials(username, password_data.current_password)
    if not user_info:
        raise HTTPException(
            status_code=401,
            detail="Current password is incorrect"
        )

    # Change password
    success = db.change_user_password(username, password_data.new_password)
    if not success:
        raise HTTPException(
            status_code=500,
            detail="Failed to change password"
        )

    # Log security event. request.client is None when the server provides no
    # peer address; the password is already changed at this point, so use a
    # placeholder rather than turning a successful change into a 500.
    client_ip = request.client.host if request.client else "unknown"
    user_agent = request.headers.get("user-agent", "Unknown")
    security_audit.log_password_change(client_ip, user_agent, username)

    return {
        "success": True,
        "message": "Password changed successfully"
    }
|
|
|
|
|
|
@router.post("/change-username")
async def change_username(username_data: dict, request: Request, authenticated: bool = Depends(verify_frontend_session)):
    """Change the username of the user that owns the current session.

    Args:
        username_data: Request body; must contain string values for
            ``current_password`` and ``new_username``.
        request: Incoming request; supplies the session cookie and the client
            details written to the audit log.
        authenticated: Result of the ``verify_frontend_session`` dependency.

    Returns:
        ``{"success": True, "message": "Username changed successfully"}``.

    Raises:
        HTTPException: 401 when the session has no username or the current
            password is wrong; 400 when a field is missing, empty or not a
            string, when the new username is not 3-50 characters long, or
            when it is already taken; 500 when the database reports failure.
    """
    session_id = _get_session_from_cookie(request)
    current_username = session_manager.get_session_username(session_id)

    if not current_username:
        raise HTTPException(
            status_code=401,
            detail="Session invalid"
        )

    current_password = username_data.get("current_password")
    raw_new_username = username_data.get("new_username")

    # The body is an untyped dict, so a client may send null, a number or a
    # nested value here. Treat anything that is not a string as missing
    # instead of crashing on .strip() and returning a 500.
    new_username = raw_new_username.strip() if isinstance(raw_new_username, str) else ""

    if not isinstance(current_password, str) or not current_password or not new_username:
        raise HTTPException(
            status_code=400,
            detail="Current password and new username required"
        )

    # Verify current credentials
    user_info = db.verify_user_credentials(current_username, current_password)
    if not user_info:
        raise HTTPException(
            status_code=401,
            detail="Current password is incorrect"
        )

    # Validate new username
    if len(new_username) < 3 or len(new_username) > 50:
        raise HTTPException(
            status_code=400,
            detail="Username must be between 3 and 50 characters"
        )

    # Check if new username already exists (but allow keeping the same username)
    if new_username != current_username and db.username_exists(new_username):
        raise HTTPException(
            status_code=400,
            detail="Username already exists"
        )

    # Change username
    if not db.change_username(current_username, new_username):
        raise HTTPException(
            status_code=500,
            detail="Failed to change username"
        )

    # Update session with new username
    session_manager.update_session_username(session_id, new_username)

    # Log security event. request.client is None when the server provides no
    # peer address; the rename is already committed, so use a placeholder
    # rather than failing the request here.
    client_ip = request.client.host if request.client else "unknown"
    user_agent = request.headers.get("user-agent", "Unknown")
    security_audit.log_username_change(client_ip, user_agent, current_username, new_username)

    return {
        "success": True,
        "message": "Username changed successfully"
    }