CareerBot/app/routers/auth.py
ln0422 96997daed0 Initial commit: CareerBot full-stack career showcase with AI chatbot
- FastAPI backend with SQLAlchemy ORM and SQLite
- AI chatbot with OpenAI-compatible LLM integration (SSE streaming)
- Admin panel for content management, LLM config, token management
- Anonymous access with 3-question limit, token-based access control
- Recruiter intent detection with admin notification
- Resume generator (JD-based, Markdown to Word export)
- Chinese localized public interface

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-07 20:36:38 +08:00

118 lines
3.7 KiB
Python

from datetime import datetime, timedelta, timezone

import bcrypt
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app.config import settings
from app.database import get_db
from app.models import AdminUser, AccessToken
# Router that the admin-login and visitor-token endpoints below are registered on.
router = APIRouter()
# auto_error=False: a request without bearer credentials is passed through with
# a falsy token instead of being rejected by the scheme itself, so
# get_current_admin can raise its own 401 ("Not authenticated").
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/admin/login", auto_error=False)
def hash_password(password: str) -> str:
    """Return a bcrypt hash of ``password`` as text.

    The password is UTF-8 encoded, hashed with a freshly generated salt, and
    the resulting bytes are decoded back to ``str``.
    """
    raw_password = password.encode("utf-8")
    fresh_salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(raw_password, fresh_salt)
    return digest.decode("utf-8")
def verify_password(password: str, hashed: str) -> bool:
    """Report whether ``password`` matches the bcrypt hash ``hashed``.

    Both arguments are UTF-8 encoded before being handed to ``bcrypt.checkpw``.
    """
    candidate = password.encode("utf-8")
    stored_hash = hashed.encode("utf-8")
    return bcrypt.checkpw(candidate, stored_hash)
class AdminLoginRequest(BaseModel):
    """Request body accepted by ``POST /api/admin/login``."""

    # Address that admin_login looks up against AdminUser.email.
    email: str
    # Plain-text password; admin_login checks it against the stored hash.
    password: str
class TokenVerifyRequest(BaseModel):
    """Request body accepted by ``POST /api/verify-token``."""

    # Visitor access token, matched against active AccessToken rows.
    token: str
def create_jwt_token(data: dict) -> str:
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The mapping is copied, never mutated; an
            existing ``"exp"`` entry is overwritten.

    Returns:
        The encoded token, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``. It expires
        ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes after creation.
    """
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easy to mistake for local time.
    expires_at = datetime.now(timezone.utc) + timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def get_current_admin(
    token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)
) -> AdminUser:
    """Resolve the bearer JWT of the current request to an ``AdminUser``.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``; falsy when the
            request carried no credentials.
        db: Database session used to look the admin up by email.

    Returns:
        The ``AdminUser`` whose email equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token is missing, fails to decode, has no
            ``sub`` claim, or names an email with no matching admin.
    """
    # A 401 must carry a WWW-Authenticate challenge (RFC 7235, section 3.1).
    challenge = {"WWW-Authenticate": "Bearer"}

    if not token:
        raise HTTPException(
            status_code=401, detail="Not authenticated", headers=challenge
        )

    # Keep the try body to the one call that can raise JWTError.
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError as exc:
        raise HTTPException(
            status_code=401, detail="Invalid token", headers=challenge
        ) from exc

    email = payload.get("sub")
    if email is None:
        raise HTTPException(status_code=401, detail="Invalid token", headers=challenge)

    user = db.query(AdminUser).filter(AdminUser.email == email).first()
    if user is None:
        raise HTTPException(status_code=401, detail="User not found", headers=challenge)
    return user
def get_current_visitor(request: Request, db: Session = Depends(get_db)):
    """Resolve the ``visitor_token`` cookie to an active ``AccessToken`` row.

    Returns ``None`` when the cookie is absent or empty, when it holds the
    ``"__anonymous__"`` marker, or when no active row carries that token;
    otherwise returns the matching ``AccessToken`` record.
    """
    cookie_value = request.cookies.get("visitor_token")
    if not cookie_value or cookie_value == "__anonymous__":
        return None

    active_rows = db.query(AccessToken).filter(
        AccessToken.token == cookie_value,
        AccessToken.is_active == True,  # noqa: E712 - column comparison, not a truth test
    )
    match = active_rows.first()
    return match if match else None
@router.post("/api/admin/login")
def admin_login(req: AdminLoginRequest, db: Session = Depends(get_db)):
    """Authenticate an admin by email and password and issue a bearer JWT.

    Raises ``HTTPException`` 401 with the same message whether the email is
    unknown or the password does not match. On success returns the token,
    whose ``sub`` claim is the admin's email, together with its type.
    """
    account = db.query(AdminUser).filter(AdminUser.email == req.email).first()
    if not (account and verify_password(req.password, account.password_hash)):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    return {
        "access_token": create_jwt_token({"sub": account.email}),
        "token_type": "bearer",
    }
@router.post("/api/verify-token")
def verify_token(req: TokenVerifyRequest, response: Response, db: Session = Depends(get_db)):
    """Validate a visitor access token and store it in a cookie.

    Raises ``HTTPException`` 401 when no active ``AccessToken`` row carries
    the submitted token. Otherwise sets the ``visitor_token`` cookie
    (HTTP-only, SameSite=lax, 86400-second lifetime) to that token.
    """
    active_match = (
        db.query(AccessToken)
        .filter(
            AccessToken.token == req.token,
            AccessToken.is_active == True,  # noqa: E712 - column comparison, not a truth test
        )
        .first()
    )
    if not active_match:
        raise HTTPException(status_code=401, detail="Invalid access token")

    cookie_options = {"httponly": True, "max_age": 86400, "samesite": "lax"}
    response.set_cookie(key="visitor_token", value=req.token, **cookie_options)
    return {"message": "ok"}
@router.post("/api/anonymous-entry")
def anonymous_entry(response: Response):
    """Mark the caller as an anonymous visitor.

    Sets the ``visitor_token`` cookie to the ``"__anonymous__"`` marker
    (HTTP-only, SameSite=lax, 86400-second lifetime).
    """
    cookie_options = {"httponly": True, "max_age": 86400, "samesite": "lax"}
    response.set_cookie(key="visitor_token", value="__anonymous__", **cookie_options)
    return {"message": "ok", "anonymous": True}
@router.get("/api/logout")
def logout(response: Response):
    """Remove the ``visitor_token`` cookie and acknowledge the request."""
    response.delete_cookie(key="visitor_token")
    acknowledgement = {"message": "ok"}
    return acknowledgement