CareerBot/app/routers/auth.py
ln0422 501f8985ec Add /careerbot base path for www.ityb.me/careerbot deployment
- Add BASE_PATH config, include all routers with prefix
- Inject {{ base }} Jinja2 global for all template URLs
- Add window.BASE_PATH for static JS files
- Update Nginx to proxy /careerbot/ path
- Add OPS_MANUAL.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-07 22:07:34 +08:00

118 lines
3.7 KiB
Python

from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.security import OAuth2PasswordBearer
import bcrypt
from jose import JWTError, jwt
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.config import settings
from app.database import get_db
from app.models import AdminUser, AccessToken
# Router that the endpoints below register on via @router.post / @router.get.
router = APIRouter()
# Bearer-token extractor used by get_current_admin. The token URL is prefixed
# with settings.BASE_PATH. auto_error=False is passed so that
# get_current_admin can handle a missing token itself (see its `if not token`
# check) — NOTE(review): confirm against the FastAPI OAuth2PasswordBearer docs.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.BASE_PATH}/api/admin/login", auto_error=False)
def hash_password(password: str) -> str:
    """Hash *password* with bcrypt using a freshly generated salt.

    The password is UTF-8 encoded before hashing and the resulting hash is
    returned decoded as a UTF-8 string.
    """
    raw_password = password.encode("utf-8")
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(raw_password, salt)
    return digest.decode("utf-8")
def verify_password(password: str, hashed: str) -> bool:
    """Return True when *password* matches the bcrypt hash *hashed*.

    Both arguments are UTF-8 encoded before being handed to
    ``bcrypt.checkpw``.

    A ``ValueError`` from bcrypt is treated as a failed verification rather
    than propagated, so a bad stored hash or unusable input produces a normal
    "wrong credentials" result for the caller instead of an unhandled error.
    """
    try:
        return bcrypt.checkpw(password.encode("utf-8"), hashed.encode("utf-8"))
    except ValueError:
        # bcrypt raises ValueError for a malformed stored hash ("Invalid
        # salt"); newer bcrypt releases reportedly also reject passwords
        # longer than 72 bytes this way. Neither can be a valid match.
        return False
class AdminLoginRequest(BaseModel):
    """Request body accepted by the admin login endpoint."""

    # E-mail that admin_login looks up in AdminUser.email.
    email: str
    # Plain-text password that admin_login checks against the stored hash.
    password: str
class TokenVerifyRequest(BaseModel):
    """Request body accepted by the visitor token verification endpoint."""

    # Access token string that verify_token matches against AccessToken.token.
    token: str
def create_jwt_token(data: dict) -> str:
    """Return a signed JWT carrying *data* plus an ``exp`` expiry claim.

    The caller's dict is not mutated; its entries are copied into a new
    payload. The expiry is ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes
    from now, and the token is signed with ``settings.SECRET_KEY`` using
    ``settings.ALGORITHM``.
    """
    # Function-local import so this block is self-contained; the module-level
    # import only brings in ``datetime`` and ``timedelta``.
    from datetime import timezone

    # Use a timezone-aware UTC "now": datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easy to misinterpret.
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def get_current_admin(
    token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
) -> AdminUser:
    """Resolve the bearer token to its ``AdminUser`` row.

    Raises:
        HTTPException: 401 "Not authenticated" when no token was supplied,
            401 "Invalid token" when decoding fails or the ``sub`` claim is
            missing, and 401 "User not found" when no admin has that e-mail.
    """
    if not token:
        raise HTTPException(status_code=401, detail="Not authenticated")

    # Only the decode step can raise JWTError, so keep the try body minimal.
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

    # The "sub" claim carries the admin's e-mail (set by admin_login).
    email = claims.get("sub")
    if email is None:
        raise HTTPException(status_code=401, detail="Invalid token")

    admin = db.query(AdminUser).filter(AdminUser.email == email).first()
    if admin is None:
        raise HTTPException(status_code=401, detail="User not found")
    return admin
def get_current_visitor(request: Request, db: Session = Depends(get_db)):
    """Return the active ``AccessToken`` row for this visitor, or ``None``.

    ``None`` is returned when the ``visitor_token`` cookie is absent or empty,
    when it holds the ``"__anonymous__"`` marker, or when no active
    ``AccessToken`` matches it.
    """
    cookie_value = request.cookies.get("visitor_token")
    # Missing cookie and the anonymous marker both mean "no token record".
    if not cookie_value or cookie_value == "__anonymous__":
        return None

    matching = (
        db.query(AccessToken)
        .filter(AccessToken.token == cookie_value, AccessToken.is_active == True)  # noqa: E712
        .first()
    )
    return matching if matching else None
@router.post("/api/admin/login")
def admin_login(req: AdminLoginRequest, db: Session = Depends(get_db)):
    """Authenticate an admin by e-mail and password and issue a bearer JWT.

    Raises:
        HTTPException: 401 when the e-mail is unknown or the password does
            not match; the same message is used for both cases.
    """
    account = db.query(AdminUser).filter(AdminUser.email == req.email).first()
    if account and verify_password(req.password, account.password_hash):
        # The e-mail goes into the "sub" claim read back by get_current_admin.
        jwt_token = create_jwt_token({"sub": account.email})
        return {"access_token": jwt_token, "token_type": "bearer"}
    raise HTTPException(status_code=401, detail="Invalid email or password")
@router.post("/api/verify-token")
def verify_token(req: TokenVerifyRequest, response: Response, db: Session = Depends(get_db)):
    """Validate a visitor access token and store it in the ``visitor_token`` cookie.

    Raises:
        HTTPException: 401 when no active ``AccessToken`` matches the token.
    """
    matching = (
        db.query(AccessToken)
        .filter(AccessToken.token == req.token, AccessToken.is_active == True)  # noqa: E712
        .first()
    )
    if not matching:
        raise HTTPException(status_code=401, detail="Invalid access token")

    # 86400 = 24 * 60 * 60.
    cookie_options = {
        "key": "visitor_token",
        "value": req.token,
        "httponly": True,
        "max_age": 86400,
        "samesite": "lax",
    }
    response.set_cookie(**cookie_options)
    return {"message": "ok"}
@router.post("/api/anonymous-entry")
def anonymous_entry(response: Response):
    """Grant anonymous access by setting the ``"__anonymous__"`` cookie marker.

    get_current_visitor treats this marker value as "no token record".
    """
    # 86400 = 24 * 60 * 60.
    cookie_options = {
        "key": "visitor_token",
        "value": "__anonymous__",
        "httponly": True,
        "max_age": 86400,
        "samesite": "lax",
    }
    response.set_cookie(**cookie_options)
    return {"message": "ok", "anonymous": True}
@router.get("/api/logout")
def logout(response: Response):
    """Delete the ``visitor_token`` cookie and acknowledge with ``{"message": "ok"}``."""
    response.delete_cookie(key="visitor_token")
    acknowledgement = {"message": "ok"}
    return acknowledgement