FastAPI 定时任务实战指南:从基础到进阶的完整方案
2025.10.12 11:41浏览量:28简介:本文详解 FastAPI 中设置定时任务的多种方法,涵盖 APScheduler、Celery、HTTP 轮询等方案,提供完整代码示例与生产环境建议,帮助开发者高效实现任务调度。
FastAPI 定时任务实战指南:从基础到进阶的完整方案
在 FastAPI 应用开发中,定时任务是处理周期性任务的常见需求,如数据同步、日志清理、通知发送等。本文将系统介绍 FastAPI 中实现定时任务的四种主流方案,涵盖从简单到复杂的实现方式,并提供生产环境实践建议。
一、APScheduler:轻量级定时任务方案
APScheduler 是 Python 中最流行的定时任务库之一,支持 cron 表达式、固定间隔等多种触发方式,与 FastAPI 集成简单。
1.1 基础集成方案
from fastapi import FastAPIfrom apscheduler.schedulers.background import BackgroundSchedulerimport loggingapp = FastAPI()logger = logging.getLogger(__name__)def job_function():logger.info("定时任务执行中...")scheduler = BackgroundScheduler()scheduler.add_job(job_function, "interval", minutes=1) # 每分钟执行scheduler.start()@app.on_event("shutdown")def shutdown_event():scheduler.shutdown()
关键点说明:
- 使用
BackgroundScheduler避免阻塞主线程 - 必须实现 shutdown 钩子防止资源泄漏
- 适合单机环境下的简单定时任务
1.2 动态任务管理
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.schedulers.asyncio import AsyncIOScheduler# 配置持久化存储jobstores = {"default": SQLAlchemyJobStore(url="sqlite:///jobs.db")}scheduler = AsyncIOScheduler(jobstores=jobstores)@app.post("/add-job")async def add_job(interval: int):scheduler.add_job(job_function,"interval",minutes=interval,id=f"job_{interval}")return {"status": "job added"}
生产建议:
- 使用 SQLAlchemyJobStore 实现任务持久化
- 推荐 AsyncIOScheduler 用于异步应用
- 添加任务锁防止并发执行
二、Celery:分布式任务队列方案
对于需要分布式处理的复杂场景,Celery 是更专业的选择。
2.1 基础配置
# celery_app.pyfrom celery import Celerycelery = Celery("fastapi_celery",broker="redis://localhost:6379/0",backend="redis://localhost:6379/1")@celery.on_after_configure.connectdef setup_periodic_tasks(sender, **kwargs):sender.add_periodic_task(30.0, # 每30秒send_notifications.s(),name="每30分钟发送通知")@celery.taskdef send_notifications():print("发送通知中...")
2.2 FastAPI 集成
from fastapi import FastAPIfrom celery_app import celery as celery_appapp = FastAPI()@app.get("/trigger-task")async def trigger_task():celery_app.send_task("celery_app.send_notifications")return {"status": "task triggered"}
优势分析:
- 支持分布式任务执行
- 完善的失败重试机制
- 任务结果持久化
三、HTTP 轮询方案:无服务器环境适用
在无服务器架构中,可通过外部调度器触发 HTTP 端点。
3.1 基础实现
from fastapi import FastAPI, Requestimport timeapp = FastAPI()last_run = 0@app.post("/scheduled-task")async def scheduled_task(request: Request):current_time = time.time()# 防止重复执行(实际应通过认证和锁机制)if current_time - last_run > 60: # 至少间隔60秒last_run = current_time# 执行任务逻辑return {"status": "task executed"}return {"status": "too frequent"}
3.2 云服务集成示例
# AWS CloudWatch Events 触发 Lambda 调用此端点# 或使用 GitHub Actions 定时触发@app.post("/data-sync")async def data_sync():# 实现数据同步逻辑return {"status": "sync completed"}
适用场景:
- 云函数/无服务器环境
- 跨服务调度需求
- 需要审计日志的场景
四、高级方案:结合数据库的任务调度
对于需要持久化和复杂调度的场景,可自建调度系统。
4.1 数据库模型设计
from sqlmodel import SQLModel, Field, DateTime, Integerclass ScheduledTask(SQLModel, table=True):id: int = Field(default=None, primary_key=True)task_name: strschedule_type: str # "interval"/"cron"schedule_config: str # JSON 格式配置next_run: DateTimeis_active: bool = Field(default=True)
4.2 调度器实现
from datetime import datetime, timedeltaimport jsonfrom typing import Optionalclass TaskScheduler:def __init__(self, session):self.session = sessiondef get_due_tasks(self) -> list[ScheduledTask]:now = datetime.utcnow()return self.session.exec(select(ScheduledTask).where(ScheduledTask.next_run <= now).where(ScheduledTask.is_active == True)).all()def update_next_run(self, task: ScheduledTask):config = json.loads(task.schedule_config)if task.schedule_type == "interval":seconds = config.get("seconds", 0)task.next_run = datetime.utcnow() + timedelta(seconds=seconds)# 其他调度类型处理...self.session.add(task)self.session.commit()
五、生产环境最佳实践
5.1 监控与告警
# 使用 Prometheus 监控任务执行from prometheus_client import CounterTASK_SUCCESS = Counter("task_success", "Successful task executions")TASK_FAILURE = Counter("task_failure", "Failed task executions")def monitored_task():try:# 任务逻辑TASK_SUCCESS.inc()except Exception:TASK_FAILURE.inc()raise
5.2 错误处理策略
from tenacity import retry, stop_after_attempt, wait_exponential@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1))def reliable_task():# 可能失败的任务pass
5.3 部署架构建议
- 单机方案:APScheduler + SQLite
- 微服务方案:Celery + Redis/RabbitMQ
- 无服务器方案:HTTP 端点 + 云调度服务
- 高可用方案:分布式锁 + 任务队列
六、性能优化技巧
- 任务拆分:将大任务拆分为多个小任务并行处理
- 批处理:对数据库操作使用批量更新
- 异步IO:在任务中使用异步HTTP客户端
- 资源限制:设置任务的最大运行时间和内存限制
# 示例:带超时的异步任务import asyncioasync def bounded_task():try:await asyncio.wait_for(long_running_task(), timeout=30)except asyncio.TimeoutError:print("任务超时")
七、常见问题解决方案
任务重复执行:
- 使用分布式锁(如 Redis 锁)
- 实现任务幂等性
时区问题:
from pytz import timezonescheduler.add_job(task,"cron",hour=8,minute=0,timezone=timezone("Asia/Shanghai"))
任务依赖:
- 使用 Celery 的 chord/chain 模式
- 或实现自定义的任务依赖检查
八、完整示例项目结构
/fastapi_scheduler├── app/│ ├── __init__.py│ ├── main.py # FastAPI 入口│ ├── scheduler.py # 调度器实现│ ├── models.py # 数据模型│ └── tasks.py # 任务定义├── requirements.txt└── docker-compose.yml
总结:FastAPI 中的定时任务实现有多种方案,开发者应根据具体场景选择:
- 简单任务:APScheduler
- 分布式任务:Celery
- 无服务器环境:HTTP 轮询
- 复杂调度:自建调度系统
所有方案都应注意资源清理、错误处理和监控告警,以确保系统的稳定性和可靠性。

发表评论
登录后可评论,请前往 登录 或 注册