logo

FastAPI 定时任务实战指南:从基础到进阶的完整方案

作者:热心市民鹿先生2025.10.12 11:41浏览量:28

简介:本文详解 FastAPI 中设置定时任务的多种方法,涵盖 APScheduler、Celery、HTTP 轮询等方案,提供完整代码示例与生产环境建议,帮助开发者高效实现任务调度。

FastAPI 定时任务实战指南:从基础到进阶的完整方案

在 FastAPI 应用开发中,定时任务是处理周期性任务的常见需求,如数据同步、日志清理、通知发送等。本文将系统介绍 FastAPI 中实现定时任务的四种主流方案,涵盖从简单到复杂的实现方式,并提供生产环境实践建议。

一、APScheduler:轻量级定时任务方案

APScheduler 是 Python 中最流行的定时任务库之一,支持 cron 表达式、固定间隔等多种触发方式,与 FastAPI 集成简单。

1.1 基础集成方案

  1. from fastapi import FastAPI
  2. from apscheduler.schedulers.background import BackgroundScheduler
  3. import logging
  4. app = FastAPI()
  5. logger = logging.getLogger(__name__)
  6. def job_function():
  7. logger.info("定时任务执行中...")
  8. scheduler = BackgroundScheduler()
  9. scheduler.add_job(job_function, "interval", minutes=1) # 每分钟执行
  10. scheduler.start()
  11. @app.on_event("shutdown")
  12. def shutdown_event():
  13. scheduler.shutdown()

关键点说明

  • 使用 BackgroundScheduler 避免阻塞主线程
  • 必须实现 shutdown 钩子防止资源泄漏
  • 适合单机环境下的简单定时任务

1.2 动态任务管理

  1. from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  2. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  3. # 配置持久化存储
  4. jobstores = {
  5. "default": SQLAlchemyJobStore(url="sqlite:///jobs.db")
  6. }
  7. scheduler = AsyncIOScheduler(jobstores=jobstores)
  8. @app.post("/add-job")
  9. async def add_job(interval: int):
  10. scheduler.add_job(
  11. job_function,
  12. "interval",
  13. minutes=interval,
  14. id=f"job_{interval}"
  15. )
  16. return {"status": "job added"}

生产建议

  • 使用 SQLAlchemyJobStore 实现任务持久化
  • 推荐 AsyncIOScheduler 用于异步应用
  • 添加任务锁防止并发执行

二、Celery:分布式任务队列方案

对于需要分布式处理的复杂场景,Celery 是更专业的选择。

2.1 基础配置

  1. # celery_app.py
  2. from celery import Celery
  3. celery = Celery(
  4. "fastapi_celery",
  5. broker="redis://localhost:6379/0",
  6. backend="redis://localhost:6379/1"
  7. )
  8. @celery.on_after_configure.connect
  9. def setup_periodic_tasks(sender, **kwargs):
  10. sender.add_periodic_task(
  11. 30.0, # 每30秒
  12. send_notifications.s(),
  13. name="每30分钟发送通知"
  14. )
  15. @celery.task
  16. def send_notifications():
  17. print("发送通知中...")

2.2 FastAPI 集成

  1. from fastapi import FastAPI
  2. from celery_app import celery as celery_app
  3. app = FastAPI()
  4. @app.get("/trigger-task")
  5. async def trigger_task():
  6. celery_app.send_task("celery_app.send_notifications")
  7. return {"status": "task triggered"}

优势分析

  • 支持分布式任务执行
  • 完善的失败重试机制
  • 任务结果持久化

三、HTTP 轮询方案:无服务器环境适用

在无服务器架构中,可通过外部调度器触发 HTTP 端点。

3.1 基础实现

  1. from fastapi import FastAPI, Request
  2. import time
  3. app = FastAPI()
  4. last_run = 0
  5. @app.post("/scheduled-task")
  6. async def scheduled_task(request: Request):
  7. current_time = time.time()
  8. # 防止重复执行(实际应通过认证和锁机制)
  9. if current_time - last_run > 60: # 至少间隔60秒
  10. last_run = current_time
  11. # 执行任务逻辑
  12. return {"status": "task executed"}
  13. return {"status": "too frequent"}

3.2 云服务集成示例

  1. # AWS CloudWatch Events 触发 Lambda 调用此端点
  2. # 或使用 GitHub Actions 定时触发
  3. @app.post("/data-sync")
  4. async def data_sync():
  5. # 实现数据同步逻辑
  6. return {"status": "sync completed"}

适用场景

  • 云函数/无服务器环境
  • 跨服务调度需求
  • 需要审计日志的场景

四、高级方案:结合数据库的任务调度

对于需要持久化和复杂调度的场景,可自建调度系统。

4.1 数据库模型设计

  1. from sqlmodel import SQLModel, Field, DateTime, Integer
  2. class ScheduledTask(SQLModel, table=True):
  3. id: int = Field(default=None, primary_key=True)
  4. task_name: str
  5. schedule_type: str # "interval"/"cron"
  6. schedule_config: str # JSON 格式配置
  7. next_run: DateTime
  8. is_active: bool = Field(default=True)

4.2 调度器实现

  1. from datetime import datetime, timedelta
  2. import json
  3. from typing import Optional
  4. class TaskScheduler:
  5. def __init__(self, session):
  6. self.session = session
  7. def get_due_tasks(self) -> list[ScheduledTask]:
  8. now = datetime.utcnow()
  9. return self.session.exec(
  10. select(ScheduledTask)
  11. .where(ScheduledTask.next_run <= now)
  12. .where(ScheduledTask.is_active == True)
  13. ).all()
  14. def update_next_run(self, task: ScheduledTask):
  15. config = json.loads(task.schedule_config)
  16. if task.schedule_type == "interval":
  17. seconds = config.get("seconds", 0)
  18. task.next_run = datetime.utcnow() + timedelta(seconds=seconds)
  19. # 其他调度类型处理...
  20. self.session.add(task)
  21. self.session.commit()

五、生产环境最佳实践

5.1 监控与告警

  1. # 使用 Prometheus 监控任务执行
  2. from prometheus_client import Counter
  3. TASK_SUCCESS = Counter("task_success", "Successful task executions")
  4. TASK_FAILURE = Counter("task_failure", "Failed task executions")
  5. def monitored_task():
  6. try:
  7. # 任务逻辑
  8. TASK_SUCCESS.inc()
  9. except Exception:
  10. TASK_FAILURE.inc()
  11. raise

5.2 错误处理策略

  1. from tenacity import retry, stop_after_attempt, wait_exponential
  2. @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1))
  3. def reliable_task():
  4. # 可能失败的任务
  5. pass

5.3 部署架构建议

  1. 单机方案:APScheduler + SQLite
  2. 微服务方案:Celery + Redis/RabbitMQ
  3. 无服务器方案:HTTP 端点 + 云调度服务
  4. 高可用方案:分布式锁 + 任务队列

六、性能优化技巧

  1. 任务拆分:将大任务拆分为多个小任务并行处理
  2. 批处理:对数据库操作使用批量更新
  3. 异步IO:在任务中使用异步HTTP客户端
  4. 资源限制:设置任务的最大运行时间和内存限制
  1. # 示例:带超时的异步任务
  2. import asyncio
  3. async def bounded_task():
  4. try:
  5. await asyncio.wait_for(long_running_task(), timeout=30)
  6. except asyncio.TimeoutError:
  7. print("任务超时")

七、常见问题解决方案

  1. 任务重复执行

    • 使用分布式锁(如 Redis 锁)
    • 实现任务幂等性
  2. 时区问题

    1. from pytz import timezone
    2. scheduler.add_job(
    3. task,
    4. "cron",
    5. hour=8,
    6. minute=0,
    7. timezone=timezone("Asia/Shanghai")
    8. )
  3. 任务依赖

    • 使用 Celery 的 chord/chain 模式
    • 或实现自定义的任务依赖检查

八、完整示例项目结构

  1. /fastapi_scheduler
  2. ├── app/
  3. ├── __init__.py
  4. ├── main.py # FastAPI 入口
  5. ├── scheduler.py # 调度器实现
  6. ├── models.py # 数据模型
  7. └── tasks.py # 任务定义
  8. ├── requirements.txt
  9. └── docker-compose.yml

总结:FastAPI 中的定时任务实现有多种方案,开发者应根据具体场景选择:

  • 简单任务:APScheduler
  • 分布式任务:Celery
  • 无服务器环境:HTTP 轮询
  • 复杂调度:自建调度系统

所有方案都应注意资源清理、错误处理和监控告警,以确保系统的稳定性和可靠性。

相关文章推荐

发表评论

活动