CareerBot/app/routers/chat.py
ln0422 96997daed0 Initial commit: CareerBot full-stack career showcase with AI chatbot
- FastAPI backend with SQLAlchemy ORM and SQLite
- AI chatbot with OpenAI-compatible LLM integration (SSE streaming)
- Admin panel for content management, LLM config, token management
- Anonymous access with 3-question limit, token-based access control
- Recruiter intent detection with admin notification
- Resume generator (JD-based, Markdown to Word export)
- Chinese localized public interface

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-07 20:36:38 +08:00

188 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import re

from fastapi import APIRouter, Depends, Form, UploadFile, File, Request
from sqlalchemy.orm import Session
from sse_starlette.sse import EventSourceResponse

from app.database import get_db
from app.models import ChatHistory, AccessToken, RecruiterMessage
from app.routers.auth import get_current_visitor
from app.services.chat_service import process_message
router = APIRouter()
# Question limit for visitors who present no access token.
ANONYMOUS_MAX_QUESTIONS = 3
# Session-cookie key reserved for anonymous usage tracking.
# NOTE(review): not referenced anywhere in this module's visible code — confirm
# whether a cookie-based counter was planned or this is dead.
ANONYMOUS_SESSION_KEY = "anon_used_questions"
@router.get("/api/chat/config")
def get_chat_config(
    request: Request,
    db: Session = Depends(get_db),
    token_record: AccessToken | None = Depends(get_current_visitor),
):
    """Report the question quota for the current visitor.

    Visitors holding an access token get that token's quota and usage;
    everyone else falls back to the fixed anonymous limit.
    """
    if token_record is None:
        # Anonymous visitor: usage is tracked client-side here and
        # re-verified server-side when /api/chat is actually called.
        return {
            "max_questions": ANONYMOUS_MAX_QUESTIONS,
            "used_questions": 0,
            "anonymous": True,
        }
    return {
        "max_questions": token_record.max_questions or 9,
        "used_questions": token_record.used_questions or 0,
        "anonymous": False,
    }
@router.post("/api/chat")
async def chat(
    request: Request,
    session_id: str = Form(...),
    message: str = Form(""),
    file: UploadFile | None = File(None),
    db: Session = Depends(get_db),
    token_record: AccessToken | None = Depends(get_current_visitor),
):
    """Stream an assistant reply over SSE, enforcing per-visitor question quotas.

    Token holders consume their token's quota; anonymous visitors are limited
    by counting their prior "user" rows in ChatHistory for this session_id.
    After the SSE stream closes, a Starlette background task runs recruiter
    intent detection on the completed exchange.
    """
    if token_record:
        # Authenticated visitor
        max_q = token_record.max_questions or 9
        used_q = token_record.used_questions or 0
        if used_q >= max_q:
            # Quota exhausted: return a short SSE stream carrying the limit
            # notice instead of invoking the LLM at all.
            async def limit_gen():
                yield {"data": json.dumps({"content": f"您的提问次数已达上限({max_q}次),无法继续提问。如需更多次数,请联系管理员。", "limit_reached": True})}
                yield {"data": json.dumps({"content": "", "done": True})}
            return EventSourceResponse(limit_gen())
        # NOTE(review): usage is charged and committed BEFORE the stream runs,
        # so a failed LLM call still consumes one question — confirm intended.
        token_record.used_questions = used_q + 1
        db.commit()
    else:
        # Anonymous visitor - check count from chat history
        used_q = db.query(ChatHistory).filter(
            ChatHistory.session_id == session_id,
            ChatHistory.role == "user",
        ).count()
        if used_q >= ANONYMOUS_MAX_QUESTIONS:
            async def limit_gen():
                yield {"data": json.dumps({"content": f"匿名访问提问次数已达上限({ANONYMOUS_MAX_QUESTIONS}次)。如需更多提问次数,请向管理员获取访问令牌(Access Token)。", "limit_reached": True})}
                yield {"data": json.dumps({"content": "", "done": True})}
            return EventSourceResponse(limit_gen())
    # Collect response for intent detection
    collected_response = []
    async def event_generator():
        # Stream chunks to the client while accumulating the full reply for
        # the post-stream intent check.
        async for chunk in process_message(session_id, message, db, file):
            collected_response.append(chunk)
            yield {"data": json.dumps({"content": chunk})}
        yield {"data": json.dumps({"content": "", "done": True})}
    # Save token info before session closes
    visitor_label = "匿名访问者"
    if token_record and token_record.note:
        visitor_label = token_record.note
    elif token_record:
        visitor_label = f"Token: {token_record.token[:8]}..."
    async def on_stream_complete():
        """Run recruiter intent detection after SSE stream closes, with its own DB session."""
        assistant_text = "".join(collected_response)
        # Skip detection when nothing was produced or the upstream LLM failed
        # (failure replies contain this marker string).
        if assistant_text and "AI服务调用失败" not in assistant_text:
            try:
                await _check_recruiter_intent(session_id, message, assistant_text, visitor_label)
            except Exception:
                # Best-effort: detection failures must never surface here.
                pass
    from starlette.background import BackgroundTask
    return EventSourceResponse(event_generator(), background=BackgroundTask(on_stream_complete))
@router.get("/api/chat/history/{session_id}")
def get_chat_history(
    session_id: str,
    db: Session = Depends(get_db),
    token_record: AccessToken | None = Depends(get_current_visitor),
):
    """Return up to 50 messages for a session, oldest first, as plain dicts."""
    rows = (
        db.query(ChatHistory)
        .filter(ChatHistory.session_id == session_id)
        .order_by(ChatHistory.created_at)
        .limit(50)
        .all()
    )
    serialized = []
    for row in rows:
        created = row.created_at.isoformat() if row.created_at else ""
        serialized.append({
            "role": row.role,
            "content": row.content,
            "created_at": created,
        })
    return serialized
async def _check_recruiter_intent(
    session_id: str,
    user_message: str,
    assistant_response: str,
    visitor_label: str,
):
    """Use LLM to detect recruiter intent and extract info. Uses its own DB session.

    Runs after the SSE stream has closed, so the request-scoped session is
    gone; a fresh SessionLocal is opened and always closed. Any failure is
    swallowed deliberately — detection must never break the chat flow.

    Args:
        session_id: Chat session the exchange belongs to.
        user_message: Visitor's message (truncated to 500 chars for the prompt).
        assistant_response: Assistant reply (truncated to 500 chars likewise).
        visitor_label: Human-readable visitor tag (token note or anonymous).
    """
    from app.database import SessionLocal
    from app.services.llm_service import chat_completion
    # This template is .format()-ed below, hence the doubled literal braces.
    detection_prompt = """你是一个信息提取助手。分析以下对话,判断访问者是否表达了以下任何意愿:
1. 招聘意愿(想要招聘候选人)
2. 面试意愿(想要邀请候选人面试)
3. 留下了公司信息或联系方式
如果检测到以上任何一项请用以下JSON格式回复不要包含其他文字
{{"detected": true, "intent": "招聘意愿/面试意愿/留下联系方式", "company": "公司名称(如有)", "contact": "联系方式(如有)", "summary": "简要描述访问者的意图和关键信息"}}
如果没有检测到,回复:
{{"detected": false}}
访问者消息: {user_msg}
AI助手回复: {assistant_msg}"""
    db = SessionLocal()
    try:
        messages = [
            {"role": "system", "content": "你是一个精确的信息提取助手只输出JSON格式。"},
            {"role": "user", "content": detection_prompt.format(
                user_msg=user_message[:500],
                assistant_msg=assistant_response[:500],
            )},
        ]
        result = await chat_completion(messages, db)
        if not result:
            return
        # The LLM may wrap the JSON in prose; grab the outermost brace span.
        json_match = re.search(r'\{.*\}', result, re.DOTALL)
        if not json_match:
            return
        data = json.loads(json_match.group())
        if not data.get("detected"):
            return
        # `or ""` guards against explicit JSON nulls: with "company": null the
        # key exists, so .get()'s default is NOT used and None would be stored.
        msg = RecruiterMessage(
            session_id=session_id,
            visitor_label=visitor_label,
            company=data.get("company") or "",
            contact=data.get("contact") or "",
            intent=data.get("intent") or "",
            summary=data.get("summary") or "",
        )
        db.add(msg)
        db.commit()
    except Exception:
        pass  # Don't break chat if detection fails
    finally:
        db.close()