"""Authentication routes and dependencies for admin users and site visitors.

Admins log in with email/password and receive a JWT bearer token.
Visitors are identified by a ``visitor_token`` cookie that holds either an
active access token or a fixed marker value meaning "anonymous entry".
"""

from datetime import datetime, timedelta, timezone
from functools import lru_cache
from typing import Optional

import bcrypt
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.config import settings
from app.database import get_db
from app.models import AccessToken, AdminUser

router = APIRouter()
# auto_error=False: a missing bearer token reaches get_current_admin as a
# falsy value, which then raises its own 401.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl=f"{settings.BASE_PATH}/api/admin/login", auto_error=False
)

# Name of the cookie that identifies a visitor.
VISITOR_COOKIE_NAME = "visitor_token"
# Cookie value written by /api/anonymous-entry; never looked up in the DB.
ANONYMOUS_VISITOR_TOKEN = "__anonymous__"
# Visitor cookie lifetime in seconds (24 hours).
VISITOR_COOKIE_MAX_AGE = 86400


def hash_password(password: str) -> str:
    """Return a bcrypt hash of *password* (freshly salted) as a UTF-8 string."""
    digest = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
    return digest.decode("utf-8")


def verify_password(password: str, hashed: str) -> bool:
    """Return True if *password* matches the bcrypt hash *hashed*.

    Returns False (rather than raising) when the stored hash is empty or
    when bcrypt rejects its input with ``ValueError`` -- e.g. a malformed
    hash -- so that a bad row or odd input yields a failed login instead of
    an unhandled server error.
    """
    if not hashed:
        return False
    try:
        return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))
    except ValueError:
        return False


@lru_cache(maxsize=1)
def _dummy_password_hash() -> str:
    """Return a throwaway bcrypt hash, computed once on first use.

    Used by admin_login so that a bcrypt comparison is performed even when
    the email is unknown.
    """
    return hash_password("placeholder-for-unknown-accounts")


class AdminLoginRequest(BaseModel):
    """Request body for POST /api/admin/login."""

    email: str
    password: str


class TokenVerifyRequest(BaseModel):
    """Request body for POST /api/verify-token."""

    token: str


def create_jwt_token(data: dict) -> str:
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    The expiry is ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` from now. *data*
    itself is not modified; the claims are built on a copy.
    """
    claims = data.copy()
    # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12.
    claims["exp"] = datetime.now(timezone.utc) + timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)


def get_current_admin(
    token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
) -> AdminUser:
    """FastAPI dependency: resolve the bearer token to an AdminUser.

    Raises:
        HTTPException: 401 when the token is missing, cannot be decoded,
            has no ``sub`` claim, or names an email with no AdminUser row.
    """
    if not token:
        raise HTTPException(status_code=401, detail="Not authenticated")

    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except JWTError as exc:
        raise HTTPException(status_code=401, detail="Invalid token") from exc

    email = payload.get("sub")
    if email is None:
        raise HTTPException(status_code=401, detail="Invalid token")

    user = db.query(AdminUser).filter(AdminUser.email == email).first()
    if user is None:
        raise HTTPException(status_code=401, detail="User not found")
    return user


def get_current_visitor(
    request: Request, db: Session = Depends(get_db)
) -> Optional[AccessToken]:
    """Returns AccessToken record or None for anonymous visitors.

    None is returned when the visitor cookie is absent or empty, holds the
    anonymous marker, or does not match an active AccessToken row.
    """
    token = request.cookies.get(VISITOR_COOKIE_NAME)
    if not token or token == ANONYMOUS_VISITOR_TOKEN:
        return None
    return (
        db.query(AccessToken)
        .filter(
            AccessToken.token == token,
            AccessToken.is_active == True,  # noqa: E712 - SQL expression
        )
        .first()
    )


def _set_visitor_cookie(response: Response, value: str) -> None:
    """Write the visitor cookie with the attributes shared by all entry routes."""
    # NOTE(review): no `secure` flag is set, so the cookie is also sent over
    # plain HTTP -- confirm whether that is intended for this deployment.
    response.set_cookie(
        key=VISITOR_COOKIE_NAME,
        value=value,
        httponly=True,
        max_age=VISITOR_COOKIE_MAX_AGE,
        samesite="lax",
    )


@router.post("/api/admin/login")
def admin_login(req: AdminLoginRequest, db: Session = Depends(get_db)):
    """Check admin credentials and return a bearer JWT.

    Raises:
        HTTPException: 401 with the same message whether the email is
            unknown or the password is wrong.
    """
    user = db.query(AdminUser).filter(AdminUser.email == req.email).first()
    # Always run one bcrypt comparison so an unknown email does not respond
    # noticeably faster than a known one.
    stored_hash = (
        user.password_hash if user is not None else _dummy_password_hash()
    )
    password_ok = verify_password(req.password, stored_hash)
    if user is None or not password_ok:
        raise HTTPException(status_code=401, detail="Invalid email or password")

    token = create_jwt_token({"sub": user.email})
    return {"access_token": token, "token_type": "bearer"}


@router.post("/api/verify-token")
def verify_token(
    req: TokenVerifyRequest, response: Response, db: Session = Depends(get_db)
):
    """Validate a visitor access token and store it in the visitor cookie.

    Raises:
        HTTPException: 401 when no active AccessToken row matches the token.
    """
    record = (
        db.query(AccessToken)
        .filter(
            AccessToken.token == req.token,
            AccessToken.is_active == True,  # noqa: E712 - SQL expression
        )
        .first()
    )
    if not record:
        raise HTTPException(status_code=401, detail="Invalid access token")

    _set_visitor_cookie(response, req.token)
    return {"message": "ok"}


@router.post("/api/anonymous-entry")
def anonymous_entry(response: Response):
    """Allow anonymous access with a special cookie marker."""
    _set_visitor_cookie(response, ANONYMOUS_VISITOR_TOKEN)
    return {"message": "ok", "anonymous": True}


# NOTE(review): this state-changing endpoint is exposed as GET; kept as-is
# because changing the method would break existing clients.
@router.get("/api/logout")
def logout(response: Response):
    """Clear the visitor cookie."""
    response.delete_cookie(VISITOR_COOKIE_NAME)
    return {"message": "ok"}