Celery + FastAPI + MongoDB result backend + Redis Sentinel broker: A Complete Scheduler Implementation Guide
Architecture overview
graph LR
Client(["Client\n(HTTP)"])
API["FastAPI\nAPI layer"]
Sentinel[["Redis Sentinel\nbroker / HA"]]
Beat["Celery Beat\nscheduler"]
Worker["Celery Worker\ntask executor"]
MongoDB[("MongoDB\nresult backend")]
Client -->|"POST /tasks/*\nmanual trigger"| API
API -->|".delay()\nenqueue"| Sentinel
Beat -->|"periodic task\non schedule"| Sentinel
Sentinel -->|"consume\nfetch task"| Worker
Worker -->|"write result"| MongoDB
API -->|"AsyncResult\nquery result"| MongoDB
- Redis Sentinel: the broker. Redis holds the task queue and Sentinel provides high availability (HA).
- MongoDB: the result backend. It stores the result and state of every task.
- Celery Worker: executes tasks.
- Celery Beat: triggers tasks on a schedule (the scheduler).
- FastAPI: exposes an HTTP API so external callers can trigger tasks manually or query results.
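Dependencies
# pyproject.toml / requirements.txt
celery[redis]>=5.3
fastapi>=0.111
uvicorn[standard]>=0.29
motor>=3.4 # async MongoDB driver
pymongo>=4.7 # sync MongoDB driver (used by the Celery result backend)
redis>=5.0
Note: Celery's MongoDB result backend uses the synchronous pymongo driver, not motor. FastAPI code can still use motor (async) for its own queries; the two do not conflict.
Docker Compose
Redis Sentinel HA topology
Sentinel is neither a proxy nor a Redis data node. It does three things: monitoring, leader election for failover, and notification. Before connecting, the client (Celery) asks Sentinel which node is the current master, then connects to that node directly.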
graph TD
subgraph SentinelCluster ["Sentinel Cluster (quorum = 2 of 3)"]
S1["Sentinel 1\n:26379"]
S2["Sentinel 2\n:26379"]
S3["Sentinel 3\n:26379"]
S1 <-->|"gossip\ncross-check"| S2
S2 <-->|gossip| S3
S1 <-->|gossip| S3
end
subgraph RedisNodes ["Redis master / replica"]
Master["Redis Master\n:6379"]
Replica["Redis Replica\n:6379"]
Master -->|"replication\n(asynchronous)"| Replica
end
S1 -->|"PING / monitor"| Master
S2 -->|"PING / monitor"| Master
S3 -->|"PING / monitor"| Master
Celery["Celery / FastAPI"]
Celery -->|"1. ask: who is the master?"| S1
S1 -->|"2. answer: redis-master:6379"| Celery
Celery -->|"3. connect directly to the master"| Master
Failover flow: the master goes down → each of the three Sentinels independently marks it as down once down-after-milliseconds elapses → at least quorum (2) Sentinels agree that it is down → one Sentinel is elected leader by a majority and starts the failover → the replica is promoted to the new master → clients are notified.
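The two thresholds in the failover flow are easy to mix up: the quorum decides whether the master counts as down, while the leader that performs the failover has to be authorized by a majority of all Sentinels. The following is a minimal pure-Python sketch of that decision, not Sentinel's real implementation; the function name and parameters are hypothetical.

```python
def can_start_failover(total_sentinels: int, quorum: int,
                       agree_down: int, reachable: int) -> bool:
    """Toy model of the Sentinel failover decision (illustrative only).

    agree_down: Sentinels that currently consider the master down.
    reachable:  Sentinels that can take part in the leader election.
    """
    majority = total_sentinels // 2 + 1
    # Step 1: the master is "objectively down" once `quorum` Sentinels agree.
    objectively_down = agree_down >= quorum
    # Step 2: the leader needs votes from a majority, and from at least
    # `quorum` Sentinels when the quorum is set higher than the majority.
    leader_can_be_elected = reachable >= max(majority, quorum)
    return objectively_down and leader_can_be_elected


if __name__ == "__main__":
    # 3 Sentinels, quorum 2, as in this guide
    print(can_start_failover(3, 2, agree_down=2, reachable=2))
    # Only one Sentinel left: it can see the failure but cannot fail over
    print(can_start_failover(3, 2, agree_down=1, reachable=1))
```

With three Sentinels and a quorum of 2, losing one Sentinel is tolerated; losing two is not, which is why an odd number of Sentinels on separate hosts is the usual layout.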
# docker-compose.yml
version: "3.9"
services:
  # ── Redis master / replica / sentinel ──────────────────────────
  redis-master:
    image: redis:7-alpine
    command: redis-server --port 6379
    networks:
      - app-net
  redis-replica:
    image: redis:7-alpine
    command: redis-server --port 6379 --replicaof redis-master 6379
    networks:
      - app-net
    depends_on: [redis-master]
  # Sentinel rewrites its config file at runtime (myid, known peers), so each
  # container copies the shared read-only template to its own /tmp first.
  redis-sentinel-1:
    image: redis:7-alpine
    command: sh -c "cp /etc/sentinel.conf /tmp/sentinel.conf && redis-sentinel /tmp/sentinel.conf"
    volumes:
      - ./sentinel.conf:/etc/sentinel.conf:ro
    networks:
      - app-net
    depends_on: [redis-master, redis-replica]
  redis-sentinel-2:
    image: redis:7-alpine
    command: sh -c "cp /etc/sentinel.conf /tmp/sentinel.conf && redis-sentinel /tmp/sentinel.conf"
    volumes:
      - ./sentinel.conf:/etc/sentinel.conf:ro
    networks:
      - app-net
    depends_on: [redis-master, redis-replica]
  redis-sentinel-3:
    image: redis:7-alpine
    command: sh -c "cp /etc/sentinel.conf /tmp/sentinel.conf && redis-sentinel /tmp/sentinel.conf"
    volumes:
      - ./sentinel.conf:/etc/sentinel.conf:ro
    networks:
      - app-net
    depends_on: [redis-master, redis-replica]
  # ── MongoDB ────────────────────────────────────────────────────
  mongodb:
    image: mongo:7
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    ports:
      - "27017:27017"
    networks:
      - app-net
  # ── App services ────────────────────────────────────────────────
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    environment:
      - CELERY_BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      - CELERY_RESULT_BACKEND=mongodb://root:example@mongodb:27017/celery_results?authSource=admin
    networks:
      - app-net
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3, mongodb]
  worker:
    build: .
    command: celery -A app.celery_app worker --loglevel=info --concurrency=4
    volumes:
      - .:/app
    environment:
      - CELERY_BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      - CELERY_RESULT_BACKEND=mongodb://root:example@mongodb:27017/celery_results?authSource=admin
    networks:
      - app-net
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3, mongodb]
  beat:
    build: .
    # The schedule file lives in a named volume mounted at /data; mounting the
    # volume at the file path itself would create a directory there instead.
    command: celery -A app.celery_app beat --loglevel=info --scheduler celery.beat:PersistentScheduler --schedule /data/celerybeat-schedule
    volumes:
      - .:/app
      - celery-beat-data:/data
    environment:
      - CELERY_BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      - CELERY_RESULT_BACKEND=mongodb://root:example@mongodb:27017/celery_results?authSource=admin
    networks:
      - app-net
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3, mongodb]
networks:
  app-net:
volumes:
  celery-beat-data:
sentinel.conf
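# sentinel.conf
port 26379
# Needed since Redis 6.2 when the master is given as a hostname instead of an IP
sentinel resolve-hostnames yes
sentinel monitor mymaster redis-master 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1
Celery configuration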
# app/celery_app.py
import os
from celery import Celery
from celery.schedules import crontab

BROKER_URL = os.environ.get(
    "CELERY_BROKER_URL",
    "sentinel://localhost:26379/0",
)
RESULT_BACKEND = os.environ.get(
    "CELERY_RESULT_BACKEND",
    "mongodb://root:example@localhost:27017/celery_results?authSource=admin",
)

app = Celery("demo", broker=BROKER_URL, backend=RESULT_BACKEND)
app.conf.update(
    # ── broker (Redis Sentinel) ────────────────────────────────────
    broker_transport_options={
        "master_name": "mymaster",  # must match the name in sentinel.conf
        "sentinel_kwargs": {},      # with a password: {"password": "xxx"}
        "socket_timeout": 10,
        "socket_connect_timeout": 10,
        "retry_on_timeout": True,
    },
    # ── result backend (MongoDB) ───────────────────────────────────
    mongodb_backend_settings={
        "database": "celery_results",
        "taskmeta_collection": "task_results",
    },
    # ── general settings ───────────────────────────────────────────
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="Asia/Taipei",
    enable_utc=True,
    result_expires=60 * 60 * 24,  # keep results for 24 hours
    # ── Beat schedule ──────────────────────────────────────────────
    beat_schedule={
        # run once a minute
        "heartbeat-every-minute": {
            "task": "app.tasks.heartbeat",
            "schedule": 60.0,  # seconds (float)
        },
        # run every day at 02:30 Taipei time
        "daily-report-at-0230": {
            "task": "app.tasks.generate_daily_report",
            "schedule": crontab(hour=2, minute=30),
            "args": ("daily",),
        },
        # run every Monday at 09:00
        "weekly-cleanup-monday": {
            "task": "app.tasks.cleanup_old_records",
            "schedule": crontab(hour=9, minute=0, day_of_week=1),
            "kwargs": {"dry_run": False},
        },
    },
)
# auto-discover tasks
app.autodiscover_tasks(["app"])
How Beat triggers a schedule
sequenceDiagram
participant Beat as Celery Beat
participant Store as celerybeat-schedule<br/>(local file)
participant Redis as Redis Sentinel<br/>(broker)
participant Worker as Celery Worker
loop wakes at least every beat_max_loop_interval seconds (300 s by default for the built-in scheduler)
Beat->>Store: read last_run_at for every entry
Beat->>Beat: work out which tasks are due<br/>(now ≥ last_run_at + interval)
alt some task is due
Beat->>Redis: publish task message
Beat->>Store: update last_run_at
Redis-->>Worker: consume task
Worker->>Worker: execute task
else nothing is due
Beat->>Beat: sleep until the next due time<br/>(capped by beat_max_loop_interval)
end
end
Key point: Beat only puts task messages on the broker queue; it never executes them. The worker and Beat are completely independent processes.
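The loop in the diagram boils down to two questions per iteration: which entries are due now, and how long Beat can sleep before the next one. Below is a simplified stand-alone sketch of that calculation for interval schedules only; it is not Celery's code, and `tick`, `entries`, and `max_interval` are names made up for illustration.

```python
from datetime import datetime, timedelta, timezone


def tick(entries: dict, now: datetime, max_interval: float = 300.0):
    """Return (due_names, seconds_to_sleep) for one scheduler iteration.

    entries maps a schedule name to (last_run_at, interval_seconds).
    """
    due = []
    sleep_for = max_interval
    for name, (last_run_at, interval) in entries.items():
        remaining = (last_run_at + timedelta(seconds=interval) - now).total_seconds()
        if remaining <= 0:
            due.append(name)      # Beat would publish the task message here
            remaining = interval  # the next run is one full interval away
        sleep_for = min(sleep_for, remaining)
    return due, sleep_for


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 0, 1, 0, tzinfo=timezone.utc)
    entries = {
        "heartbeat": (datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc), 60.0),
        "report": (datetime(2024, 1, 1, 0, 0, 30, tzinfo=timezone.utc), 120.0),
    }
    print(tick(entries, now))
```

This also shows why the stored `last_run_at` matters: if it is lost, every interval restarts from the moment Beat comes back up.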
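Task definitions
Task lifecycle (state machine)
stateDiagram-v2
direction LR
[*] --> PENDING : task.delay() / triggered by Beat
PENDING --> STARTED : worker picks up the task<br/>and starts executing
STARTED --> SUCCESS : normal return
STARTED --> FAILURE : raises an exception<br/>and max_retries is exhausted
STARTED --> RETRY : raises an exception<br/>retries remain
RETRY --> STARTED : countdown elapses<br/>task is re-queued and picked up again
SUCCESS --> [*]
FAILURE --> [*]
PENDING is also the default state when no result record exists for a task_id, so querying a made-up task_id returns PENDING instead of raising an error. Also note that STARTED is only recorded when task_track_started is enabled; with default settings a task that is currently running still reads as PENDING in the result backend.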
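The "unknown ID reads as PENDING" behaviour is simply a lookup with a default. The toy in-memory backend below illustrates the idea; it is not Celery's backend class, and `InMemoryBackend` and its methods are hypothetical names.

```python
class InMemoryBackend:
    """Toy result store that mimics the 'missing record means PENDING' rule."""

    def __init__(self) -> None:
        self._meta: dict[str, dict] = {}

    def store_result(self, task_id: str, result, state: str) -> None:
        self._meta[task_id] = {"status": state, "result": result}

    def get_state(self, task_id: str) -> str:
        # No record at all is indistinguishable from "queued but not finished"
        return self._meta.get(task_id, {"status": "PENDING"})["status"]


if __name__ == "__main__":
    backend = InMemoryBackend()
    print(backend.get_state("no-such-id"))
    backend.store_result("abc-123", 7, "SUCCESS")
    print(backend.get_state("abc-123"))
```

If the API has to tell "never submitted" apart from "still queued", it needs its own record of issued task IDs; the result backend alone cannot make that distinction.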
# app/tasks.py
import logging
from datetime import datetime, timezone
from app.celery_app import app

logger = logging.getLogger(__name__)


@app.task(
    bind=True,
    name="app.tasks.heartbeat",
    max_retries=3,
    default_retry_delay=5,
)
def heartbeat(self) -> dict:
    """Once-a-minute heartbeat that confirms the worker is alive."""
    ts = datetime.now(timezone.utc).isoformat()
    logger.info("heartbeat at %s", ts)
    return {"status": "ok", "timestamp": ts}


@app.task(
    bind=True,
    name="app.tasks.generate_daily_report",
    max_retries=2,
    default_retry_delay=30,
)
def generate_daily_report(self, report_type: str) -> dict:
    """Generate the daily report (demo: replace with your DB query, email, etc.)."""
    try:
        logger.info("generating %s report", report_type)
        # ... your business logic ...
        result = {"report_type": report_type, "rows": 42, "done": True}
        return result
    except Exception as exc:
        # retry up to max_retries (2) times with exponential backoff: 30 s, then 60 s
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 30)


@app.task(
    bind=True,
    name="app.tasks.cleanup_old_records",
)
def cleanup_old_records(self, dry_run: bool = True) -> dict:
    """Clean up expired records."""
    logger.info("cleanup_old_records dry_run=%s", dry_run)
    deleted = 0 if dry_run else 100  # demo value
    return {"dry_run": dry_run, "deleted": deleted}


@app.task(name="app.tasks.add")
def add(x: int, y: int) -> int:
    """Simplest possible demo task, used for testing the API."""
    return x + y
FastAPI integration
End-to-end API request flow
sequenceDiagram
participant C as Client
participant API as FastAPI
participant Redis as Redis Sentinel<br/>(broker)
participant Worker as Celery Worker
participant DB as MongoDB<br/>(result backend)
Note over C,DB: Manual task trigger (asynchronous)
C->>API: POST /tasks/add<br/>{"x": 3, "y": 4}
API->>Redis: tasks.add.delay(3, 4)<br/>enqueue task message
API-->>C: 202<br/>{"task_id":"abc-123","status":"PENDING"}
Note over Redis,DB: Background execution
Redis-->>Worker: consume task message
Worker->>Worker: add(3, 4) → 7
Worker->>DB: INSERT task_results<br/>{_id: "abc-123", result: 7, status: "SUCCESS"}
Note over C,DB: Query the result (by polling or a webhook)
C->>API: GET /tasks/abc-123
API->>DB: AsyncResult("abc-123")
DB-->>API: {status: "SUCCESS", result: 7}
API-->>C: {"task_id":"abc-123","status":"SUCCESS","result":7}
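On the client side, the "query the result" half of the flow is usually a polling loop. The sketch below assumes a hypothetical `fetch(task_id)` callable that performs the GET request and returns the decoded JSON body as a dict with a `status` key; the transport, the timeout values, and the function name are all illustrative choices, not part of the API above.

```python
import time
from typing import Any, Callable

# Terminal states, matching Celery's READY_STATES
READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_until_ready(
    fetch: Callable[[str], dict],
    task_id: str,
    timeout: float = 30.0,
    interval: float = 0.5,
    sleep: Callable[[float], Any] = time.sleep,
) -> dict:
    """Call fetch(task_id) until the task reaches a terminal state."""
    deadline = time.monotonic() + timeout
    while True:
        payload = fetch(task_id)
        if payload["status"] in READY_STATES:
            return payload
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still {payload['status']} after {timeout}s")
        sleep(interval)


if __name__ == "__main__":
    # Fake transport: PENDING twice, then SUCCESS
    answers = iter(["PENDING", "PENDING", "SUCCESS"])
    fake_fetch = lambda tid: {"task_id": tid, "status": next(answers), "result": 7}
    print(poll_until_ready(fake_fetch, "abc-123", sleep=lambda s: None))
```

Because an unknown task_id also reads as PENDING, a timeout is essential; without one, a typo in the ID would make the loop spin forever.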
# app/main.py
from contextlib import asynccontextmanager
from typing import Any
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult
from app.celery_app import app as celery_app
from app import tasks  # noqa: F401 (make sure tasks are imported)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup: a good place for health checks (ping broker / MongoDB)
    yield
    # shutdown: clean up if needed


app = FastAPI(title="Celery Scheduler Demo", lifespan=lifespan)


# ── Request / Response schemas ──────────────────────────────────
class AddRequest(BaseModel):
    x: int
    y: int


class TaskResponse(BaseModel):
    task_id: str
    status: str
    result: Any = None


# ── Endpoints ────────────────────────────────────────────────────
@app.post("/tasks/add", response_model=TaskResponse, status_code=202)
def submit_add(body: AddRequest):
    """Manually trigger the add task (runs asynchronously)."""
    task = tasks.add.delay(body.x, body.y)
    return TaskResponse(task_id=task.id, status="PENDING")


@app.post("/tasks/report/{report_type}", response_model=TaskResponse, status_code=202)
def submit_report(report_type: str):
    """Manually trigger the daily report task."""
    task = tasks.generate_daily_report.delay(report_type)
    return TaskResponse(task_id=task.id, status="PENDING")


@app.get("/tasks/{task_id}", response_model=TaskResponse)
def get_task_result(task_id: str):
    """Return task state and result (read from the MongoDB result backend)."""
    result = AsyncResult(task_id, app=celery_app)
    if result.state == "PENDING":
        return TaskResponse(task_id=task_id, status="PENDING")
    if result.state == "FAILURE":
        raise HTTPException(
            status_code=500,
            detail={"task_id": task_id, "error": str(result.result)},
        )
    return TaskResponse(
        task_id=task_id,
        status=result.state,
        result=result.result,
    )


@app.get("/health")
def health():
    """Simple health check."""
    return {"status": "ok"}
Project layout
.
├── app/
│   ├── __init__.py
│   ├── celery_app.py   # Celery config + beat_schedule
│   ├── main.py         # FastAPI app
│   └── tasks.py        # task definitions
├── sentinel.conf
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
requirements.txt
celery[redis]==5.3.6
fastapi==0.111.0
uvicorn[standard]==0.29.0
motor==3.4.0
pymongo==4.7.2
redis==5.0.4
Running it
# Start all services
docker compose up --build
# Confirm the worker is online
docker compose logs worker | grep "ready"
# Confirm the beat scheduler has started
docker compose logs beat | grep "beat:"
Testing the API
# Manually trigger the add task
curl -X POST http://localhost:8000/tasks/add \
  -H "Content-Type: application/json" \
  -d '{"x": 3, "y": 4}'
# → {"task_id":"abc-123","status":"PENDING","result":null}
# Query the result (a few seconds later)
curl http://localhost:8000/tasks/abc-123
# → {"task_id":"abc-123","status":"SUCCESS","result":7}
Confirming that MongoDB stored the result
docker compose exec mongodb mongosh \
  -u root -p example --authenticationDatabase admin \
  --eval 'db.getSiblingDB("celery_results").task_results.find().sort({_id:-1}).limit(3).pretty()'
Common pitfalls
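1. Broker URL format
For a Redis Sentinel broker, list every Sentinel as its own sentinel:// URL and join them with semicolons; this is the form shown in the Celery documentation:
# ✅ correct
sentinel://sentinel1:26379/0;sentinel://sentinel2:26379/0;sentinel://sentinel3:26379/0
# ⚠️ not the documented form (scheme only on the first host)
sentinel://sentinel1:26379;sentinel2:26379;sentinel3:26379/0
# ❌ wrong (comma-separated)
sentinel://sentinel1:26379,sentinel2:26379/0
2. master_name must match sentinel.conf
The <name> in sentinel monitor <name> must be exactly the same as broker_transport_options["master_name"]; otherwise the broker connection fails at startup with a No master found error.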
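Both pitfalls can be caught before deployment with a few lines of plain Python. The sketch below builds the broker URL from a host list, assuming the per-host `sentinel://` form from the Celery documentation, and extracts the monitored master names from a sentinel.conf text so they can be compared with `master_name`. The helper names are hypothetical.

```python
import re


def build_sentinel_url(hosts: list[tuple[str, int]], db: int = 0) -> str:
    """Join one sentinel:// URL per Sentinel with semicolons."""
    return ";".join(f"sentinel://{host}:{port}/{db}" for host, port in hosts)


def monitored_masters(sentinel_conf_text: str) -> set[str]:
    """Return every <name> found in 'sentinel monitor <name> <host> <port> <quorum>'."""
    names = set()
    for line in sentinel_conf_text.splitlines():
        match = re.match(r"\s*sentinel\s+monitor\s+(\S+)\s+\S+\s+\d+\s+\d+", line)
        if match:
            names.add(match.group(1))
    return names


if __name__ == "__main__":
    conf = "port 26379\nsentinel monitor mymaster redis-master 6379 2\n"
    transport_options = {"master_name": "mymaster"}
    print(build_sentinel_url([("sentinel1", 26379), ("sentinel2", 26379)]))
    # Fail fast if the two names have drifted apart
    assert transport_options["master_name"] in monitored_masters(conf)
```

A check like this fits naturally into a startup health check or a CI step.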
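3. Only one Celery Beat instance may run
The beat service cannot be scaled horizontally (replicas: 2); if two instances run, every schedule entry fires twice.
For high availability, switch to redbeat, which stores the schedule in Redis and uses a distributed lock so that only one Beat is active at a time. The PyPI package is named celery-redbeat:
pip install "celery[redis]" celery-redbeat
# celery_app.py
# redbeat does not parse Celery's sentinel:// URLs; following its README, it takes
# a redis-sentinel:// URL plus the Sentinel list and service name in redbeat_redis_options.
app.conf.update(
    beat_scheduler="redbeat.schedulers:RedBeatScheduler",
    redbeat_redis_url="redis-sentinel://sentinel1:26379/0",
    redbeat_redis_options={
        "sentinels": [("sentinel1", 26379), ("sentinel2", 26379)],
        "service_name": "mymaster",
        "socket_timeout": 0.5,
    },
)
4. The MongoDB result backend uses synchronous pymongo
Celery's MongoDB backend uses synchronous pymongo internally and does not support motor.
FastAPI's own database access can still use motor; the result backend connection is managed by Celery, so the two do not interfere.
5. Beat schedule time zone
crontab times are interpreted in the configured timezone. Keep enable_utc=True and set timezone to the correct zone; otherwise scheduled runs drift by the UTC offset.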
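To see what a wrong time zone costs, it helps to convert the 02:30 daily job by hand. The sketch below uses a fixed UTC+8 offset as a stand-in for Asia/Taipei (an assumption that holds because Taipei currently observes no daylight saving time); the function name and the reference date are arbitrary.

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Taipei modelled as a fixed UTC+8 offset (no DST)
TAIPEI = timezone(timedelta(hours=8), "Asia/Taipei")


def local_schedule_to_utc(hour: int, minute: int) -> datetime:
    """Return the UTC instant of hour:minute Taipei time on 2024-01-01."""
    local = datetime(2024, 1, 1, hour, minute, tzinfo=TAIPEI)
    return local.astimezone(timezone.utc)


if __name__ == "__main__":
    # crontab(hour=2, minute=30) with timezone="Asia/Taipei"
    print(local_schedule_to_utc(2, 30).isoformat())
```

If timezone were left at UTC, the same crontab would instead fire at 02:30 UTC, which is 10:30 in Taipei, eight hours later than intended.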
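app.conf.update(
    timezone="Asia/Taipei",
    enable_utc=True,
)
6. The celerybeat-schedule file
Beat tracks schedule state in a local celerybeat-schedule file (the default for the persistent scheduler).
In Docker, keep this file on a volume; otherwise every restart starts counting from scratch and the schedule timing shifts.
Inspecting schedule state (advanced)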
# List tasks that workers are holding with an ETA/countdown.
# Note: inspect().scheduled() does not list the beat_schedule entries themselves.
docker compose exec beat python -c "
from app.celery_app import app
with app.connection() as conn:
    inspector = app.control.inspect(connection=conn)
    print(inspector.scheduled())
"
# Monitor worker state
docker compose exec worker celery -A app.celery_app inspect active
docker compose exec worker celery -A app.celery_app inspect reserved
Flower: a GUI for monitoring tasks
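Flower is the web UI recommended in the Celery documentation. It lets you:
- watch every worker's status, concurrency, and task throughput in real time
- search by task_id to see one task's state, arguments, result, and runtime
- manually revoke (cancel) queued tasks
- see the tasks triggered by Beat in the task history
Adding it to docker-compose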
# Add under services: in docker-compose.yml
# (flower must be installed in the image, e.g. flower==2.0.1 in requirements.txt)
  flower:
    build: .
    # Started as a Celery subcommand, Flower takes the broker URL and
    # broker_transport_options (master_name) from app.celery_app.
    command: >
      celery -A app.celery_app flower
      --port=5555
      --url_prefix=flower
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      - CELERY_RESULT_BACKEND=mongodb://root:example@mongodb:27017/celery_results?authSource=admin
    networks:
      - app-net
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3]
After startup, open http://localhost:5555/flower.
Flower UI feature map
graph LR
subgraph Flower Web UI
Workers["Workers page\nonline/offline status and concurrency per worker"]
Tasks["Tasks page\nlist of all tasks, searchable by task_id"]
Detail["Task Detail\nstate, args, result, traceback, runtime"]
Monitor["Monitor page\nlive tasks/s throughput chart"]
end
DevOps(["Developer / SRE"])
DevOps -->|"enter task_id"| Tasks
Tasks -->|"click through"| Detail
DevOps --> Workers
DevOps --> Monitor
Tracking a specific task by task_id
Through the Flower API (convenient for integrating with other tools):
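# Query the state of a single task
# (with --url_prefix=flower the path becomes /flower/api/task/info/<task_id>)
curl http://localhost:5555/api/task/info/<task_id>
# Sample response
{
  "task-id": "abc-123",
  "state": "SUCCESS",
  "result": 7,
  "received": 1716800000.0,
  "started": 1716800000.1,
  "succeeded": 1716800000.25,
  "runtime": 0.15,
  "args": "[3, 4]",
  "kwargs": "{}"
}
Directly through FastAPI (no dependency on Flower):
curl http://localhost:8000/tasks/abc-123
Notes on Redis Sentinel and Flower
When Flower is started as a Celery subcommand (celery -A app.celery_app flower), it reads the broker URL and broker_transport_options from the Celery app, so master_name does not have to be repeated on the command line.
Passing JSON through command-line flags is also fragile because of shell quoting, so supplying the URLs through environment variables is the cleaner option:
# docker-compose.yml (the cleaner way)
    environment:
      - CELERY_BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      - FLOWER_BROKER_API=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
    command: celery -A app.celery_app flower --port=5555
Comparing Python task-queue frameworks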
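The core problem Celery solves: moving slow operations out of the request/response cycle, running them asynchronously, and triggering them on a schedule. The Python ecosystem has a few main alternatives, each with a different design philosophy.
Feature comparison
| Feature | Celery | Dramatiq | RQ |
|---|---|---|---|
| Brokers | Redis / RabbitMQ / SQS… | Redis / RabbitMQ | Redis only |
| Result backends | Redis / MongoDB / DB / S3… | Redis / Memcached | Redis |
| Scheduler | ✅ Celery Beat | ⚠️ external (e.g. APScheduler) | ❌ needs the rq-scheduler add-on |
| Retries | ✅ custom countdown | ✅ built-in backoff | ✅ Retry(max=...) |
| Priority queues | ✅ multiple queues | ✅ middleware | ✅ multiple queues |
| Concurrency model | prefork / gevent / eventlet | threading / gevent | fork |
| Monitoring GUI | Flower | no official UI | RQ Dashboard |
| Maturity | ⭐ 2009, most mature | ⭐ 2017, newer | ⭐ 2012 |
Framework details
Celery (Python)
Celery is the most complete solution in the Python ecosystem, and also the most complex.
Pros:
- The widest choice of brokers and backends, so it is very flexible
- Celery Beat is a full-featured scheduler (crontab, intervals in seconds, custom schedules)
- Mature ecosystem with extensive documentation
- Canvas primitives (chain / group / chord) for complex workflows
Cons:
- Complex configuration: celery_app.conf.update accepts dozens of parameters
- Calling Celery from an async framework (asyncio) has pitfalls: the client calls such as .delay() and AsyncResult do blocking I/O and can stall the event loop
- Beat is a single point of failure (solved with redbeat)
- Pickle was the default serializer before Celery 4.0 (a security risk); JSON has been the default since 4.0, and it is still worth restricting accept_content to JSON explicitly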
# Calling a Celery task from an async FastAPI endpoint without blocking the event loop
import asyncio


@app.post("/tasks/add")
async def submit(body: AddRequest):
    # ✅ run the blocking .delay() call (broker I/O) in the default thread pool
    loop = asyncio.get_running_loop()
    task = await loop.run_in_executor(None, tasks.add.delay, body.x, body.y)
    return {"task_id": task.id}
Dramatiq (Python)
A lighter, more Pythonic design than Celery that uses middleware in place of Celery's signal mechanism.
Pros:
- Simpler API with an intuitive decorator design
- Built-in middleware for rate limiting, time limits, and a dead-letter queue (DLQ)
- Easier to integrate with asyncio
Cons:
- Few result backend choices (Redis / Memcached)
- Scheduling requires an external tool such as APScheduler and is not as tightly integrated as Celery Beat
- Smaller community than Celery
import dramatiq
from dramatiq.brokers.redis import RedisBroker

broker = RedisBroker(url="redis://localhost:6379")
dramatiq.set_broker(broker)


@dramatiq.actor(max_retries=3, min_backoff=1000, max_backoff=60000)
def send_email(to: str, subject: str):
    ...  # retried automatically with exponential backoff (values in milliseconds)
RQ - Redis Queue (Python)
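The simplest Python task queue. It supports only Redis and suits cases that do not need advanced features.
Pros:
- Minimal API with a gentle learning curve
- RQ Dashboard provides a basic GUI
Cons:
- Redis only
- No built-in scheduler (pair it with rq-scheduler)
- Far fewer features than Celery
from redis import Redis
from rq import Queue

q = Queue(connection=Redis())
# send_email is any importable function defined elsewhere in your project
job = q.enqueue(send_email, "user@example.com", "Hello")
print(job.id)  # task_id
Recommendations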
graph TD
Q1{"Need complex workflows\nor several broker types?"}
Q1 -->|yes| Celery["Celery\nmost complete feature set, widest broker choice"]
Q1 -->|no, Redis only| Q2
Q2{"Need a scheduler?"}
Q2 -->|yes| Dramatiq["Dramatiq + APScheduler\nmore modern API, easier asyncio integration"]
Q2 -->|no| RQ["RQ\nsimplest, quick to get started"]
Summary
| Component | Role | Key settings |
|---|---|---|
| Redis Sentinel | Broker (HA) | master_name, semicolon-separated URLs |
| MongoDB | Result backend | mongodb_backend_settings |
| Celery Worker | Executes tasks | --concurrency |
| Celery Beat | Triggers schedules | beat_schedule, time zone, volume |
| FastAPI | HTTP API | AsyncResult for querying results |
| Flower | GUI monitoring | task_id tracking, worker status |
For production, consider adding redbeat (Beat HA), Flower (task GUI), and a MongoDB TTL index to clean up expired results automatically.
Appendix: a minimal runnable example
This example uses Redis Sentinel (1 master + 1 replica + 3 Sentinels) as both broker and result backend, together with the Flower GUI and a dynamic scheduling API built on redbeat. It runs directly with docker compose up.
Project layout
celery-demo/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── app/
    ├── __init__.py
    ├── celery_app.py
    ├── tasks.py
    └── main.py
requirements.txt
celery[redis]==5.3.6
fastapi==0.111.0
uvicorn[standard]==0.29.0
redis==5.0.4
flower==2.0.1
celery-redbeat==2.2.0
Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
docker-compose.yml
Sentinel uses an inline config (sh -c writes /tmp/sentinel.conf), so no extra config file has to be mounted.
The &sentinel / <<: *sentinel YAML anchor lets the three Sentinels share one definition.
services:
  # ── Redis master / replica ──────────────────────────────────────
  redis-master:
    image: redis:7-alpine
    networks: [app-net]
  redis-replica:
    image: redis:7-alpine
    command: redis-server --replicaof redis-master 6379
    networks: [app-net]
    depends_on: [redis-master]
  # ── Sentinel (inline config, no external sentinel.conf) ─────────
  # resolve-hostnames is needed since Redis 6.2 because the master is given as a hostname
  redis-sentinel-1: &sentinel
    image: redis:7-alpine
    command: >
      sh -c "
      printf 'port 26379\nsentinel resolve-hostnames yes\nsentinel monitor mymaster redis-master 6379 2\nsentinel down-after-milliseconds mymaster 5000\nsentinel failover-timeout mymaster 60000\nsentinel parallel-syncs mymaster 1\n'
      > /tmp/sentinel.conf &&
      redis-sentinel /tmp/sentinel.conf
      "
    networks: [app-net]
    depends_on: [redis-master, redis-replica]
  redis-sentinel-2:
    <<: *sentinel
  redis-sentinel-3:
    <<: *sentinel
  # ── App services ────────────────────────────────────────────────
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    ports: ["8000:8000"]
    volumes: [.:/app]
    environment: &app-env
      - BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      - RESULT_BACKEND=sentinel://redis-sentinel-1:26379/1;sentinel://redis-sentinel-2:26379/1;sentinel://redis-sentinel-3:26379/1
    networks: [app-net]
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3]
  worker:
    build: .
    command: celery -A app.celery_app worker --loglevel=info
    volumes: [.:/app]
    environment: *app-env
    networks: [app-net]
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3]
  beat:
    build: .
    command: celery -A app.celery_app beat --loglevel=info
    volumes: [.:/app]
    environment: *app-env
    networks: [app-net]
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3]
  flower:
    build: .
    command: celery -A app.celery_app flower --port=5555
    ports: ["5555:5555"]
    environment: *app-env
    networks: [app-net]
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3, worker]
networks:
  app-net:
app/celery_app.py
import os
from celery import Celery

BROKER_URL = os.environ.get(
    "BROKER_URL",
    "sentinel://localhost:26379/0",
)
RESULT_BACKEND = os.environ.get(
    "RESULT_BACKEND",
    "sentinel://localhost:26379/1",
)

app = Celery(
    "demo",
    broker=BROKER_URL,
    backend=RESULT_BACKEND,
    include=["app.tasks"],
)
app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="Asia/Taipei",
    enable_utc=True,
    # ── broker (Redis Sentinel) ────────────────────────────────────
    broker_transport_options={
        "master_name": "mymaster",
        "sentinel_kwargs": {},  # if Redis has a password: {"password": "xxx"}
        "socket_timeout": 10,
        "retry_on_timeout": True,
    },
    # ── result backend (Redis Sentinel) ────────────────────────────
    result_backend_transport_options={
        "master_name": "mymaster",
    },
    # ── redbeat: dynamic schedules stored in Redis ─────────────────
    # redbeat does not parse Celery's sentinel:// URLs; following its README, it takes
    # a redis-sentinel:// URL plus the Sentinel list and service name in the options.
    beat_scheduler="redbeat.schedulers:RedBeatScheduler",
    redbeat_redis_url="redis-sentinel://redis-sentinel-1:26379/0",
    redbeat_redis_options={
        "sentinels": [
            ("redis-sentinel-1", 26379),
            ("redis-sentinel-2", 26379),
            ("redis-sentinel-3", 26379),
        ],
        "service_name": "mymaster",
        "socket_timeout": 0.5,
    },
    # Beat re-reads the schedule from Redis at least every 5 seconds
    beat_max_loop_interval=5,
)
app/tasks.py
import time
from datetime import datetime, timezone
from app.celery_app import app


@app.task(name="app.tasks.add")
def add(x: int, y: int) -> int:
    return x + y


@app.task(bind=True, name="app.tasks.slow_task", max_retries=3)
def slow_task(self, seconds: int = 5) -> dict:
    """Sleep for a few seconds so the PENDING → STARTED → SUCCESS transitions are visible in Flower."""
    try:
        time.sleep(seconds)
        return {"slept": seconds, "done_at": datetime.now(timezone.utc).isoformat()}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)


@app.task(name="app.tasks.heartbeat")
def heartbeat() -> dict:
    return {"status": "ok", "ts": datetime.now(timezone.utc).isoformat()}
app/main.py
from typing import Any
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from celery.result import AsyncResult
from celery.schedules import crontab
from redbeat import RedBeatSchedulerEntry
from redbeat.schedulers import ensure_conf, get_redis
from app.celery_app import app as celery_app
from app import tasks  # noqa: F401

app = FastAPI(title="Celery Demo")


# ── Task schemas ──────────────────────────────────────────────────
class AddIn(BaseModel):
    x: int
    y: int


class TaskOut(BaseModel):
    task_id: str
    status: str
    result: Any = None


# ── Schedule schemas ──────────────────────────────────────────────
class ScheduleIn(BaseModel):
    name: str = Field(..., description="unique name, e.g. 'user-123-report'")
    task: str = Field(..., description="full task name, e.g. 'app.tasks.heartbeat'")
    minute: int = Field(0, ge=0, le=59)
    hour: int = Field(8, ge=0, le=23)
    day_of_week: str = Field("*", description="0-6 or *, e.g. '1' = Monday")
    day_of_month: str = Field("*", description="1-31 or *")
    month_of_year: str = Field("*", description="1-12 or *")
    args: list = []
    kwargs: dict = {}


class ScheduleOut(BaseModel):
    name: str
    task: str
    schedule: str
    args: list
    kwargs: dict


# ── Task endpoints ────────────────────────────────────────────────
@app.post("/tasks/add", response_model=TaskOut, status_code=202)
def submit_add(body: AddIn):
    task = tasks.add.delay(body.x, body.y)
    return TaskOut(task_id=task.id, status="PENDING")


@app.post("/tasks/slow/{seconds}", response_model=TaskOut, status_code=202)
def submit_slow(seconds: int):
    # path parameters are always required, so no default value here
    task = tasks.slow_task.delay(seconds)
    return TaskOut(task_id=task.id, status="PENDING")


@app.get("/tasks/{task_id}", response_model=TaskOut)
def get_result(task_id: str):
    r = AsyncResult(task_id, app=celery_app)
    if r.state == "FAILURE":
        raise HTTPException(status_code=500, detail=str(r.result))
    return TaskOut(task_id=task_id, status=r.state, result=r.result)


# ── Schedule CRUD ─────────────────────────────────────────────────
@app.post("/schedules", response_model=ScheduleOut, status_code=201)
def create_schedule(body: ScheduleIn):
    schedule = crontab(
        minute=body.minute,
        hour=body.hour,
        day_of_week=body.day_of_week,
        day_of_month=body.day_of_month,
        month_of_year=body.month_of_year,
    )
    entry = RedBeatSchedulerEntry(
        name=body.name,
        task=body.task,
        schedule=schedule,
        args=body.args,
        kwargs=body.kwargs,
        app=celery_app,
    )
    entry.save()
    return ScheduleOut(
        name=body.name, task=body.task,
        schedule=str(schedule), args=body.args, kwargs=body.kwargs,
    )


@app.get("/schedules", response_model=list[ScheduleOut])
def list_schedules():
    # Read every entry key from redbeat's schedule sorted set. The scheduler's
    # own .schedule property only returns entries that are about to run.
    redis = get_redis(celery_app)
    schedule_key = ensure_conf(celery_app).schedule_key
    result = []
    for key in redis.zrange(schedule_key, 0, -1):
        try:
            entry = RedBeatSchedulerEntry.from_key(key, app=celery_app)
        except KeyError:
            continue  # entry was deleted between the two reads
        result.append(ScheduleOut(
            name=entry.name, task=entry.task,
            schedule=str(entry.schedule),
            args=list(entry.args), kwargs=dict(entry.kwargs),
        ))
    return result


@app.delete("/schedules/{name}", status_code=204)
def delete_schedule(name: str):
    # redbeat stores each entry under "<key_prefix><name>" (default prefix "redbeat:")
    key = ensure_conf(celery_app).key_prefix + name
    try:
        entry = RedBeatSchedulerEntry.from_key(key, app=celery_app)
    except KeyError:
        raise HTTPException(status_code=404, detail=f"schedule '{name}' not found")
    entry.delete()
Starting and verifying
docker compose up --build
| Service | URL |
|---|---|
| FastAPI Swagger UI | http://localhost:8000/docs |
| Flower | http://localhost:5555 |
Submitting one-off tasks:
# Submit the add task
curl -X POST http://localhost:8000/tasks/add \
  -H "Content-Type: application/json" \
  -d '{"x": 3, "y": 4}'
# → {"task_id":"abc-123","status":"PENDING","result":null}
# Query the result
curl http://localhost:8000/tasks/abc-123
# → {"task_id":"abc-123","status":"SUCCESS","result":7}
# Submit the slow task (8 seconds) and watch the state changes in Flower
curl -X POST http://localhost:8000/tasks/slow/8
Dynamic schedule CRUD (using redbeat):
# Create a schedule: run heartbeat every morning at 08:00
curl -X POST http://localhost:8000/schedules \
  -H "Content-Type: application/json" \
  -d '{
    "name": "morning-ping",
    "task": "app.tasks.heartbeat",
    "hour": 8,
    "minute": 0
  }'
# Run slow_task (3 seconds) every Monday at 09:30
curl -X POST http://localhost:8000/schedules \
  -H "Content-Type: application/json" \
  -d '{
    "name": "weekly-report",
    "task": "app.tasks.slow_task",
    "hour": 9,
    "minute": 30,
    "day_of_week": "1",
    "args": [3]
  }'
# List all schedules
curl http://localhost:8000/schedules
# Delete a schedule
curl -X DELETE http://localhost:8000/schedules/morning-ping
When redbeat changes take effect: Beat re-reads the schedule from Redis on each loop iteration, so an added or deleted schedule is picked up within at most beat_max_loop_interval seconds without restarting any service. Set beat_max_loop_interval to 5 if changes should apply within about 5 seconds; the default can be much longer.