fastapi-后台任务和轮询
🔄 整体架构
┌─────────────┐ ①启动任务 ┌─────────────┐ ③后台执行 ┌─────────────┐
│ 前端 │ ──────────────────▶ │ 后端API │ ──────────────▶ │ 后台线程 │
│ (Vue.js) │ ◀────────────────── │ (FastAPI) │ │ (ThreadPool)│
└─────────────┘ ②返回task_id └─────────────┘ └─────────────┘
│ ▲ │
│ ④轮询查询状态 │ ⑤更新任务状态 │
└───────────────────────────────────┴─────────────────────────────────┘
📦 一、批量添加功能的实现
1. 后端部分(backend/main.py)
步骤1: 接收请求,立即返回
@app.post("/api/alicloud/rules/batch-add-advanced-execute")
async def alicloud_batch_add_rules_advanced_execute(request, db):
# 生成唯一任务ID
task_id = str(uuid.uuid4())
# 初始化任务状态(存入内存字典)
with batch_add_tasks_lock:
batch_add_tasks[task_id] = {
"status": "running",
"message": "批量添加任务正在执行中...",
"created_at": datetime.now().isoformat()
}
# 定义后台执行函数
def batch_add_in_background():
# ... 实际的添加逻辑 ...
# 完成后更新状态
with batch_add_tasks_lock:
batch_add_tasks[task_id] = {
"status": "success", # 或 "partial_success" / "failed"
"message": "执行完成",
"execution_stats": {"success": 10, "failed": 2}
}
# 启动后台线程(不阻塞当前请求)
loop = asyncio.get_event_loop()
loop.run_in_executor(None, batch_add_in_background)
# 立即返回 task_id(不等待执行完成)
return {"success": True, "task_id": task_id, "message": "任务已启动"}
步骤2: 任务状态查询API
@app.get("/api/batch-add/task/{task_id}")
async def get_batch_add_task_status(task_id: str):
with batch_add_tasks_lock:
task = batch_add_tasks.get(task_id)
if not task:
raise HTTPException(status_code=404, detail="任务不存在")
return {"success": True, **task}
数据存储结构:
batch_add_tasks = {
"uuid-xxx-xxx": {
"status": "running" | "success" | "partial_success" | "failed",
"message": "执行中...",
"execution_stats": {"success": 0, "failed": 0},
"execution_details": [...],
"created_at": "2026-01-23T14:00:00",
"completed_at": "2026-01-23T14:05:00"
}
}
2. 前端部分(frontend/app_alicloud.js)
步骤1: 发起请求,开始轮询
async executeBatchAddAdvanced() {
// 显示加载动画
this.batchAddAdvancedDialog.executing = true;
// 调用后端API
const response = await axios.post('/api/alicloud/rules/batch-add-advanced-execute', data);
if (response.data.success && response.data.task_id) {
// 获取到 task_id,开始轮询
await this.pollBatchAddTaskStatus(response.data.task_id);
}
}
步骤2: 轮询任务状态
async pollBatchAddTaskStatus(taskId, attempt = 0) {
const maxAttempts = 200; // 最多200次(10分钟)
const pollInterval = 3000; // 每3秒查一次
// 第一次延迟2秒再查(给后端初始化时间)
if (attempt === 0) {
await new Promise(resolve => setTimeout(resolve, 2000));
}
// 查询任务状态
const response = await axios.get(`/api/batch-add/task/${taskId}`);
const task = response.data;
if (task.status === 'running') {
// 还在执行,3秒后继续轮询
setTimeout(() => this.pollBatchAddTaskStatus(taskId, attempt + 1), pollInterval);
}
else if (task.status === 'success' || task.status === 'partial_success') {
// 执行完成!
this.batchAddAdvancedDialog.executing = false; // 关闭加载
this.batchAddAdvancedDialog.step = 3; // 显示结果页
this.batchAddAdvancedDialog.executionStats = task.execution_stats;
ElMessage.success(`成功 ${task.execution_stats.success} 条`);
// 刷新页面数据
await this.searchSecurityGroups();
await this.loadStatistics();
}
else if (task.status === 'failed') {
// 执行失败
this.batchAddAdvancedDialog.executing = false;
ElMessage.error(task.message);
}
}
🔄 二、同步功能的实现
后端部分
@app.post("/api/sync/manual/{region}")
async def sync_manual(region: str, db):
# 启动后台同步任务
def sync_in_background():
bg_db = SessionLocal() # 新建数据库连接
try:
sync_service = get_sync_service(bg_db)
result = sync_service.sync_region(region, SyncTypeEnum.MANUAL)
logger.info(f"✅ 同步完成: {result}")
finally:
bg_db.close()
loop = asyncio.get_event_loop()
loop.run_in_executor(None, sync_in_background)
# 立即返回
return {"success": True, "message": "同步任务已启动"}
前端部分
同步功能通过查询同步日志来判断任务是否完成:
async pollSyncStatus(region, syncStartTime, attempt = 0) {
// 查询该区域最新的同步日志
const response = await axios.get(`/api/sync/logs/latest?region=${region}`);
const log = response.data;
// 判断是否是当前任务的日志(通过时间戳)
const logCreatedTime = new Date(log.created_at).getTime();
const isNewLog = logCreatedTime >= syncStartTime - 2000;
if (isNewLog && log.status === 'completed') {
// 同步完成!
ElMessage.success('同步成功');
await this.searchSecurityGroups();
} else {
// 继续轮询
setTimeout(() => this.pollSyncStatus(region, syncStartTime, attempt + 1), 3000);
}
}
🔑 三、关键技术点
| 技术点 | 说明 |
|---|---|
run_in_executor | Python异步框架中,将同步阻塞代码放到线程池执行,不阻塞主线程 |
batch_add_tasks 字典 | 内存中存储任务状态,前端可随时查询 |
batch_add_tasks_lock | 线程锁,保证多线程访问字典时的数据安全 |
SessionLocal() | 后台线程需要新建数据库会话,不能用主线程的 |
syncStartTime | 前端记录任务开始时间,用于判断日志是否属于当前任务 |
| 轮询间隔 3秒 | 平衡服务器压力和用户体验 |
| 最大轮询200次 | 10分钟超时保护,防止无限轮询 |
📊 四、时序图
前端 后端 后台线程
│ │ │
│── POST /batch-add-execute ─▶│ │
│ │── 生成 task_id │
│ │── 初始化状态 "running" │
│ │── run_in_executor() ───────▶│
│◀── 返回 {task_id} ─────────│ │
│ │ │── 执行添加规则...
│ (等待2秒) │ │
│── GET /task/{task_id} ─────▶│ │
│◀── {status: "running"} ────│ │
│ │ │── 执行同步...
│ (等待3秒) │ │
│── GET /task/{task_id} ─────▶│ │
│◀── {status: "running"} ────│ │
│ │ │
│ (等待3秒) │ │── 完成!更新状态
│── GET /task/{task_id} ─────▶│◀──────────────────────────│
│◀── {status: "success"} ────│ │
│ │ │
│── 显示结果,刷新页面 │ │
❓ 为什么需要这种架构?
- 避免HTTP超时:云API操作可能需要几分钟,HTTP请求默认60秒超时
- 避免Pod重启:K8s健康检查会认为长时间无响应的Pod已死亡
- 用户体验好:用户立即得到反馈"任务已启动",不用干等
- 可追踪:任意时刻都能查询任务进度
轮询的基本概念
什么是轮询?
轮询(Polling)= 定期询问
就像你每隔一段时间问一次:
"任务完成了吗?"
"还没完成?"
"完成了吗?"
"还没完成?"
...
直到得到"完成了!"的答案
为什么需要轮询?
问题:后端任务需要时间
前端发送请求 → 后端立即返回 task_id
↓
后端在后台执行任务(可能需要几分钟)
↓
任务完成
问题:前端如何知道任务何时完成?
解决方案对比
| 方案 | 说明 | 缺点 |
|---|---|---|
| 等待响应 | 前端一直等待后端完成 | HTTP请求超时(60秒) |
| WebSocket | 后端主动推送消息 | 需要额外配置,复杂度高 |
| 轮询 | 前端定期查询状态 | 简单可靠,适合你的场景 |
轮询的工作流程
1. 启动任务
// 前端发送请求
const response = await axios.post('/api/batch-add-execute', data);
// 后端立即返回: {task_id: "abc-123"}
// 开始轮询
pollBatchAddTaskStatus("abc-123");
2. 轮询循环
async pollBatchAddTaskStatus(taskId, attempt = 0) {
// ① 等待一段时间(第一次等待2秒,之后每3秒)
if (attempt === 0) {
await new Promise(resolve => setTimeout(resolve, 2000));
}
// ② 查询任务状态
const response = await axios.get(`/api/batch-add/task/${taskId}`);
const task = response.data;
// ③ 根据状态决定下一步
if (task.status === 'running') {
// 还在运行,3秒后继续查询
setTimeout(() => {
pollBatchAddTaskStatus(taskId, attempt + 1);
}, 3000);
} else if (task.status === 'success') {
// 完成了!停止轮询,显示结果
showResults(task);
}
}
轮询的时间线
时间轴:
─────────────────────────────────────────────────────────
T+0s 前端发送请求,后端返回 task_id
↓
T+0s 开始轮询(第1次)
↓
T+2s 等待2秒后,查询状态 → "running"
↓
T+5s 等待3秒后,查询状态 → "running"
↓
T+8s 等待3秒后,查询状态 → "running"
↓
T+11s 等待3秒后,查询状态 → "running"
↓
T+14s 等待3秒后,查询状态 → "running"
↓
... 继续轮询...
↓
T+220s 等待3秒后,查询状态 → "success" ✅
↓
停止轮询,显示结果
─────────────────────────────────────────────────────────
你的代码中的轮询实现
完整流程
// 第1步:执行批量添加
async executeBatchAddAdvanced() {
// 发送请求
const response = await axios.post('/api/aws/rules/batch-add-advanced-execute', data);
// 获取 task_id,开始轮询
if (response.data.task_id) {
await this.pollBatchAddTaskStatus(response.data.task_id);
}
}
// 第2步:轮询函数
async pollBatchAddTaskStatus(taskId, attempt = 0) {
const maxAttempts = 200; // 最多200次
const pollInterval = 3000; // 每3秒一次
// ① 第一次延迟2秒
if (attempt === 0) {
await new Promise(resolve => setTimeout(resolve, 2000));
}
// ② 检查是否超时
if (attempt >= maxAttempts) {
ElMessage.error('任务执行超时');
return;
}
// ③ 查询任务状态
try {
const response = await axios.get(`/api/batch-add/task/${taskId}`);
const task = response.data;
// ④ 根据状态处理
if (task.status === 'running') {
// 还在运行,继续轮询
setTimeout(() => {
this.pollBatchAddTaskStatus(taskId, attempt + 1);
}, pollInterval);
} else if (task.status === 'success') {
// 完成!停止轮询
this.batchAddAdvancedDialog.executing = false;
this.batchAddAdvancedDialog.step = 3;
await this.searchSecurityGroups();
} else if (task.status === 'failed') {
// 失败!停止轮询
this.batchAddAdvancedDialog.executing = false;
ElMessage.error(task.message);
}
} catch (error) {
// 网络错误,继续重试
setTimeout(() => {
this.pollBatchAddTaskStatus(taskId, attempt + 1);
}, pollInterval);
}
}
轮询的关键参数
1. 轮询间隔(pollInterval)
const pollInterval = 3000; // 每3秒查询一次
为什么是3秒?
- 太短(1秒):服务器压力大,浪费资源
- 太长(10秒):用户体验差,感觉卡顿
- 3秒:平衡用户体验和服务器压力
2. 最大尝试次数(maxAttempts)
const maxAttempts = 200; // 最多200次
// 200次 × 3秒 = 600秒 = 10分钟
为什么是200次?
- 防止无限轮询
- 给任务足够时间(10分钟)
- 超时后提示用户
3. 初始延迟
if (attempt === 0) {
await new Promise(resolve => setTimeout(resolve, 2000));
}
为什么第一次延迟2秒?
- 给后端初始化任务的时间
- 避免立即查询时任务还没创建
轮询的状态判断
状态流转
┌─────────┐
│ running │ ← 任务执行中,继续轮询
└─────────┘
↓
┌─────────┐
│ success │ ← 任务成功,停止轮询,显示结果
└─────────┘
↓
┌─────────┐
│ failed │ ← 任务失败,停止轮询,显示错误
└─────────┘
代码中的判断逻辑
if (task.status === 'running') {
// 继续轮询
setTimeout(() => pollBatchAddTaskStatus(...), 3000);
} else if (task.status === 'success' || task.status === 'partial_success') {
// 停止轮询,显示结果
showResults();
} else if (task.status === 'failed') {
// 停止轮询,显示错误
showError();
}
轮询 vs 其他方案
方案对比
| 方案 | 实现难度 | 实时性 | 服务器压力 | 适用场景 |
|---|---|---|---|---|
| 轮询 | 简单 | 中等(3秒延迟) | 中等 | 你的场景 |
| WebSocket | 复杂 | 高(实时) | 低 | 聊天、实时通知 |
| Server-Sent Events | 中等 | 高(实时) | 低 | 实时推送 |
| 长轮询 | 中等 | 高(实时) | 低 | 需要实时性 |
为什么选择轮询?
- 实现简单
- 不需要额外配置
- 3秒延迟可接受
- 适合你的批量任务场景
轮询的优化技巧
1. 指数退避(你的代码没有用,但可以优化)
// 当前:固定3秒间隔
setTimeout(() => poll(...), 3000);
// 优化:逐渐增加间隔
const delay = Math.min(3000 * Math.pow(1.5, attempt), 30000);
setTimeout(() => poll(...), delay);
// 第1次:3秒
// 第2次:4.5秒
// 第3次:6.75秒
// ...
2. 错误重试(你的代码已有)
catch (error) {
// 网络错误,继续重试
setTimeout(() => {
this.pollBatchAddTaskStatus(taskId, attempt + 1);
}, pollInterval);
}
3. 超时保护(你的代码已有)
if (attempt >= maxAttempts) {
ElMessage.error('任务执行超时');
return;
}
轮询的完整示例
实际执行过程
// T+0s: 开始轮询
pollBatchAddTaskStatus("task-123", 0)
→ 等待2秒
→ GET /api/batch-add/task/task-123
→ 返回: {status: "running"}
→ setTimeout(() => poll("task-123", 1), 3000)
// T+5s: 第2次轮询
pollBatchAddTaskStatus("task-123", 1)
→ GET /api/batch-add/task/task-123
→ 返回: {status: "running"}
→ setTimeout(() => poll("task-123", 2), 3000)
// T+8s: 第3次轮询
pollBatchAddTaskStatus("task-123", 2)
→ GET /api/batch-add/task/task-123
→ 返回: {status: "running"}
→ setTimeout(() => poll("task-123", 3), 3000)
// ... 继续轮询 ...
// T+220s: 第N次轮询
pollBatchAddTaskStatus("task-123", N)
→ GET /api/batch-add/task/task-123
→ 返回: {status: "success", execution_stats: {...}}
→ 停止轮询,显示结果 ✅
总结
轮询的思路:
- 定期查询:每3秒查询一次任务状态
- 状态判断:根据状态决定继续或停止
- 超时保护:最多查询200次(10分钟)
- 错误重试:网络错误时继续重试
- 结果展示:任务完成时停止轮询并显示结果
优点:
- 实现简单
- 可靠性高
- 适合批量任务场景
缺点:
- 有延迟(最多3秒)
- 增加服务器请求数
在你的场景中,轮询是合适的选择。






