
Celery + FastAPI + MongoDB result backend + Redis Sentinel broker: a complete scheduler implementation guide

Architecture overview

    graph LR
    Client(["Client\n(HTTP)"])
    API["FastAPI\nAPI layer"]
    Sentinel[["Redis Sentinel\nbroker / HA"]]
    Beat["Celery Beat\nscheduler"]
    Worker["Celery Worker\ntask executor"]
    MongoDB[("MongoDB\nresult backend")]

    Client -->|"POST /tasks/*\nmanual trigger"| API
    API -->|".delay()\nenqueue"| Sentinel
    Beat -->|"periodic task\nscheduled trigger"| Sentinel
    Sentinel -->|"consume\nfetch task"| Worker
    Worker -->|"write result"| MongoDB
    API -->|"AsyncResult\nquery result"| MongoDB
  
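  • Redis Sentinel: the broker that holds the task queue; Sentinel provides HA (high availability)
  • MongoDB: the result backend, storing the result and state of every task
  • Celery Worker: runs the tasks
  • Celery Beat: triggers tasks on a schedule (the scheduler)
  • FastAPI: exposes an HTTP API so external callers can trigger tasks manually or query results

Dependencies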

# pyproject.toml / requirements.txt
celery[redis]>=5.3
fastapi>=0.111
uvicorn[standard]>=0.29
motor>=3.4          # async MongoDB driver
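pymongo>=4.7        # sync MongoDB driver (used by the Celery result backend)
redis>=5.0

Note: Celery's MongoDB result backend uses synchronous pymongo, not motor. For FastAPI's own database access, motor (async) is the usual recommendation; the two do not conflict.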


Docker Compose

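Redis Sentinel HA topology

Sentinel is neither a proxy nor Redis itself. It does three things: monitoring, leader election, and notification. Before connecting, the client (Celery) first asks Sentinel which node is the master, then connects to that node directly.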

    graph TD
    subgraph Sentinel Cluster ["Sentinel Cluster (quorum = 2/3)"]
        S1["Sentinel 1\n:26379"]
        S2["Sentinel 2\n:26379"]
        S3["Sentinel 3\n:26379"]
        S1 <-->|"gossip\nmutual confirmation"| S2
        S2 <-->|gossip| S3
        S1 <-->|gossip| S3
    end

    subgraph Redis Cluster
        Master["Redis Master\n:6379"]
        Replica["Redis Replica\n:6379"]
        Master -->|"replication\nasynchronous"| Replica
    end

    S1 -->|"PING / monitor"| Master
    S2 -->|"PING / monitor"| Master
    S3 -->|"PING / monitor"| Master

    Celery["Celery / FastAPI"]
    Celery -->|"1. ask: who is the master?"| S1
    S1 -->|"2. reply: redis-master:6379"| Celery
    Celery -->|"3. connect directly to the master"| Master
  

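Failover flow: the master goes down → each of the three Sentinels independently decides that down-after-milliseconds has elapsed → at least quorum (2) of them agree → one Sentinel is elected leader and starts the failover → the replica is promoted to the new master → all clients are notified.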
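
The quorum step can be sketched in a few lines of plain Python. This is only a toy model of the decision, not Sentinel's implementation: `SentinelView`, `subjectively_down`, and `objectively_down` are hypothetical names, and the 5000 ms threshold and quorum of 2 are taken from the sentinel.conf used in this guide.

```python
from dataclasses import dataclass


@dataclass
class SentinelView:
    """What one Sentinel currently observes about the master (toy model)."""
    name: str
    ms_since_last_pong: int


def subjectively_down(view: SentinelView, down_after_ms: int = 5000) -> bool:
    # A single Sentinel considers the master down once it has been silent too long.
    return view.ms_since_last_pong > down_after_ms


def objectively_down(views: list, quorum: int = 2, down_after_ms: int = 5000) -> bool:
    # Failover may start only when at least `quorum` Sentinels agree.
    votes = sum(subjectively_down(v, down_after_ms) for v in views)
    return votes >= quorum


views = [
    SentinelView("sentinel-1", 7000),
    SentinelView("sentinel-2", 6500),
    SentinelView("sentinel-3", 1200),  # this one still hears from the master
]
print(objectively_down(views))  # True: two of three Sentinels agree
```

With only one Sentinel reporting a timeout, the function returns False, which is why a single partitioned Sentinel cannot trigger a failover on its own.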

# docker-compose.yml
version: "3.9"

services:
  # ── Redis master / replica / sentinel ──────────────────────────
  redis-master:
    image: redis:7-alpine
    command: redis-server --port 6379
    networks:
      - app-net

  redis-replica:
    image: redis:7-alpine
    command: redis-server --port 6379 --replicaof redis-master 6379
    networks:
      - app-net
    depends_on: [redis-master]

  redis-sentinel-1:
    image: redis:7-alpine
    command: >
      redis-sentinel /etc/sentinel.conf
    volumes:
      - ./sentinel.conf:/etc/sentinel.conf
    networks:
      - app-net
    depends_on: [redis-master, redis-replica]

  redis-sentinel-2:
    image: redis:7-alpine
    command: redis-sentinel /etc/sentinel.conf
    volumes:
      - ./sentinel.conf:/etc/sentinel.conf
    networks:
      - app-net
    depends_on: [redis-master, redis-replica]

  redis-sentinel-3:
    image: redis:7-alpine
    command: redis-sentinel /etc/sentinel.conf
    volumes:
      - ./sentinel.conf:/etc/sentinel.conf
    networks:
      - app-net
    depends_on: [redis-master, redis-replica]

  # ── MongoDB ────────────────────────────────────────────────────
  mongodb:
    image: mongo:7
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    ports:
      - "27017:27017"
    networks:
      - app-net

  # ── App services ────────────────────────────────────────────────
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    volumes:
      - .:/app
    ports:
      - "8000:8000"
    environment:
      - CELERY_BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      - CELERY_RESULT_BACKEND=mongodb://root:example@mongodb:27017/celery_results?authSource=admin
    networks:
      - app-net
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3, mongodb]

  worker:
    build: .
    command: celery -A app.celery_app worker --loglevel=info --concurrency=4
    volumes:
      - .:/app
    environment:
      - CELERY_BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      - CELERY_RESULT_BACKEND=mongodb://root:example@mongodb:27017/celery_results?authSource=admin
    networks:
      - app-net
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3, mongodb]

  beat:
    build: .
    command: celery -A app.celery_app beat --loglevel=info --scheduler celery.beat:PersistentScheduler --schedule /data/celerybeat-schedule
    volumes:
      - .:/app
      - celery-beat-data:/data   # a volume mounts as a directory; the schedule file lives inside it
    environment:
      - CELERY_BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      - CELERY_RESULT_BACKEND=mongodb://root:example@mongodb:27017/celery_results?authSource=admin
    networks:
      - app-net
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3, mongodb]

networks:
  app-net:

volumes:
  celery-beat-data:

sentinel.conf

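# sentinel.conf
# Sentinel rewrites this file at runtime (its own ID, known replicas and sentinels),
# so each Sentinel needs its own writable copy.
port 26379
# Redis 6.2+ only accepts a hostname in "sentinel monitor" when this is enabled
sentinel resolve-hostnames yes
sentinel monitor mymaster redis-master 6379 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
sentinel parallel-syncs mymaster 1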

Celery configuration

# app/celery_app.py
import os
from celery import Celery
from celery.schedules import crontab

BROKER_URL = os.environ.get(
    "CELERY_BROKER_URL",
    "sentinel://localhost:26379/0",
)
RESULT_BACKEND = os.environ.get(
    "CELERY_RESULT_BACKEND",
    "mongodb://root:example@localhost:27017/celery_results?authSource=admin",
)

app = Celery("demo", broker=BROKER_URL, backend=RESULT_BACKEND)

app.conf.update(
    # ── broker (Redis Sentinel) settings ───────────────────────────
    broker_transport_options={
        "master_name": "mymaster",          # must match the name in sentinel.conf
        "sentinel_kwargs": {},              # with a password: {"password": "xxx"}
        "socket_timeout": 10,
        "socket_connect_timeout": 10,
        "retry_on_timeout": True,
    },

    # ── result backend (MongoDB) settings ──────────────────────────
    mongodb_backend_settings={
        "database": "celery_results",
        "taskmeta_collection": "task_results",
    },

    # ── General settings ───────────────────────────────────────────
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="Asia/Taipei",
    enable_utc=True,
    result_expires=60 * 60 * 24,            # keep results for 24 hours

    # ── Beat schedule definitions ──────────────────────────────────
    beat_schedule={
        # Run once a minute
        "heartbeat-every-minute": {
            "task": "app.tasks.heartbeat",
            "schedule": 60.0,               # seconds (float)
        },
        # Run every day at 02:30 Taipei time
        "daily-report-at-0230": {
            "task": "app.tasks.generate_daily_report",
            "schedule": crontab(hour=2, minute=30),
            "args": ("daily",),
        },
        # Run every Monday at 09:00
        "weekly-cleanup-monday": {
            "task": "app.tasks.cleanup_old_records",
            "schedule": crontab(hour=9, minute=0, day_of_week=1),
            "kwargs": {"dry_run": False},
        },
    },
)

# Auto-discover tasks
app.autodiscover_tasks(["app"])
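
The weekly entry above relies on cron-style weekday numbering, where 0 is Sunday and 1 is Monday, while Python's `datetime.weekday()` starts at Monday = 0. The sketch below is a simplified stand-in for that matching logic using only the standard library; `cron_day_of_week` and `matches` are hypothetical helpers, not part of Celery.

```python
from datetime import datetime
from typing import Optional


def cron_day_of_week(dt: datetime) -> int:
    """Map Python's weekday() (Monday=0) to cron numbering (Sunday=0, Monday=1)."""
    return (dt.weekday() + 1) % 7


def matches(dt: datetime, hour: int, minute: int, day_of_week: Optional[int] = None) -> bool:
    """Return True when dt falls on the given hour/minute (and weekday, if set)."""
    if day_of_week is not None and cron_day_of_week(dt) != day_of_week:
        return False
    return dt.hour == hour and dt.minute == minute


monday_9am = datetime(2024, 5, 27, 9, 0)    # 2024-05-27 is a Monday
tuesday_9am = datetime(2024, 5, 28, 9, 0)

print(matches(monday_9am, 9, 0, day_of_week=1))   # True
print(matches(tuesday_9am, 9, 0, day_of_week=1))  # False
print(matches(tuesday_9am, 9, 0))                 # True: no weekday restriction
```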

Beat schedule trigger flow

    sequenceDiagram
    participant Beat as Celery Beat
    participant Store as celerybeat-schedule<br/>(local file)
    participant Redis as Redis Sentinel<br/>(broker)
    participant Worker as Celery Worker

    loop at most every beat_max_loop_interval seconds (300 s by default for the built-in scheduler)
        Beat->>Store: read last_run_at for every entry
        Beat->>Beat: work out which tasks are due<br/>(now ≥ last_run_at + interval)
        alt some task is due
            Beat->>Redis: publish task message
            Beat->>Store: update last_run_at
            Redis-->>Worker: consume task
            Worker->>Worker: run task
        else nothing is due
            Beat->>Beat: sleep until the next due time
        end
    end
  

Key point: Beat only puts tasks on the broker queue; it never runs them. Worker and Beat are completely separate processes.
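
One iteration of that loop can be approximated with the standard library alone. This is a rough sketch of the idea for interval schedules, not Celery's scheduler code: `due_entries`, `tick`, and the in-memory `last_run_at` dictionary are assumptions standing in for the celerybeat-schedule file and the broker publish call.

```python
from datetime import datetime, timedelta, timezone


def due_entries(last_run_at: dict, intervals: dict, now: datetime) -> list:
    """Return the names whose interval has elapsed since their last run."""
    return sorted(
        name for name, last in last_run_at.items()
        if now >= last + intervals[name]
    )


def tick(last_run_at: dict, intervals: dict, now: datetime, publish) -> None:
    """One loop iteration: publish every due entry, then record when it ran."""
    for name in due_entries(last_run_at, intervals, now):
        publish(name)              # stands in for sending the task message to the broker
        last_run_at[name] = now    # stands in for updating the schedule file


t0 = datetime(2024, 5, 27, 0, 0, tzinfo=timezone.utc)
last_run_at = {"heartbeat": t0, "report": t0}
intervals = {"heartbeat": timedelta(seconds=60), "report": timedelta(hours=24)}

sent = []
tick(last_run_at, intervals, t0 + timedelta(seconds=61), sent.append)
print(sent)  # ['heartbeat']
```

Nothing in `tick` executes the task itself, which mirrors the separation between Beat and the workers.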


Task definitions

Task lifecycle (state machine)

    stateDiagram-v2
    direction LR
    [*] --> PENDING : task.delay() / triggered by Beat

    PENDING --> STARTED : worker picks up the task<br/>and starts running it

    STARTED --> SUCCESS : returns normally
    STARTED --> FAILURE : raises an exception<br/>and max_retries is exhausted
    STARTED --> RETRY : raises an exception<br/>and retries remain

    RETRY --> PENDING : countdown elapses<br/>task is re-queued

    SUCCESS --> [*]
    FAILURE --> [*]
  

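PENDING is also the default state when no result record exists for a task_id, so querying a made-up task_id returns PENDING rather than raising an error. STARTED is only reported when task_track_started is enabled; with the default settings a task goes straight from PENDING to its final state.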
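
Because an unknown ID and a queued task look identical, an application that needs to tell them apart has to remember which IDs it issued. The sketch below illustrates that with plain dictionaries; `lookup_state`, `lookup_state_strict`, and the `issued_ids` set are hypothetical and only model the behaviour described above.

```python
def lookup_state(store: dict, task_id: str) -> str:
    """Mimic the backend behaviour: no record means PENDING, never an error."""
    return store.get(task_id, {}).get("status", "PENDING")


def lookup_state_strict(store: dict, issued_ids: set, task_id: str) -> str:
    """Report UNKNOWN for IDs this application never handed out."""
    if task_id not in issued_ids:
        return "UNKNOWN"
    return lookup_state(store, task_id)


results = {"abc-123": {"status": "SUCCESS", "result": 7}}
issued_ids = {"abc-123", "def-456"}   # def-456 is queued but not finished yet

print(lookup_state(results, "made-up-id"))                     # PENDING
print(lookup_state_strict(results, issued_ids, "made-up-id"))  # UNKNOWN
print(lookup_state_strict(results, issued_ids, "def-456"))     # PENDING
print(lookup_state_strict(results, issued_ids, "abc-123"))     # SUCCESS
```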

# app/tasks.py
import logging
from datetime import datetime, timezone
from app.celery_app import app

logger = logging.getLogger(__name__)


@app.task(
    bind=True,
    name="app.tasks.heartbeat",
    max_retries=3,
    default_retry_delay=5,
)
def heartbeat(self) -> dict:
    """Heartbeat every minute to confirm the worker is alive."""
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    ts = datetime.now(timezone.utc).isoformat()
    logger.info("heartbeat at %s", ts)
    return {"status": "ok", "timestamp": ts}


@app.task(
    bind=True,
    name="app.tasks.generate_daily_report",
    max_retries=2,
    default_retry_delay=30,
)
def generate_daily_report(self, report_type: str) -> dict:
    """Generate the daily report (demo: replace with your own DB query, email sending, etc.)."""
    try:
        logger.info("generating %s report", report_type)
        # ... your business logic ...
        result = {"report_type": report_type, "rows": 42, "done": True}
        return result
    except Exception as exc:
        # Retry up to max_retries (2) times; the wait doubles each attempt: 30 s, then 60 s
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 30)


@app.task(
    bind=True,
    name="app.tasks.cleanup_old_records",
)
def cleanup_old_records(self, dry_run: bool = True) -> dict:
    """Clean up expired records."""
    logger.info("cleanup_old_records dry_run=%s", dry_run)
    deleted = 0 if dry_run else 100   # demo value
    return {"dry_run": dry_run, "deleted": deleted}


@app.task(name="app.tasks.add")
def add(x: int, y: int) -> int:
    """Simplest demo task, used for testing the API."""
    return x + y
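
To see what the expression `2 ** self.request.retries * 30` in generate_daily_report produces, the delays can be computed outside Celery. This is a small illustration only; `retry_countdowns` is a hypothetical helper, and it assumes `self.request.retries` starts at 0 on the first failure.

```python
def retry_countdowns(max_retries: int, base_seconds: int = 30) -> list:
    """Delay before each retry when the countdown is 2 ** retries * base_seconds."""
    return [2 ** attempt * base_seconds for attempt in range(max_retries)]


print(retry_countdowns(2))  # [30, 60]
print(retry_countdowns(4))  # [30, 60, 120, 240]
```

With max_retries=2 the task is attempted at most three times in total: the original run plus two retries.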

FastAPI integration

End-to-end API request flow

    sequenceDiagram
    participant C as Client
    participant API as FastAPI
    participant Redis as Redis Sentinel<br/>(broker)
    participant Worker as Celery Worker
    participant DB as MongoDB<br/>(result backend)

    Note over C,DB: Manually trigger a task (asynchronous)
    C->>API: POST /tasks/add<br/>{"x": 3, "y": 4}
    API->>Redis: tasks.add.delay(3, 4)<br/>enqueue task message
    API-->>C: 202<br/>{"task_id":"abc-123","status":"PENDING"}

    Note over Redis,DB: Background execution
    Redis-->>Worker: consume task message
    Worker->>Worker: add(3, 4) → 7
    Worker->>DB: INSERT task_results<br/>{_id: "abc-123", result: 7, status: "SUCCESS"}

    Note over C,DB: Query the result (polling or webhook)
    C->>API: GET /tasks/abc-123
    API->>DB: AsyncResult("abc-123")
    DB-->>API: {status: "SUCCESS", result: 7}
    API-->>C: {"task_id":"abc-123","status":"SUCCESS","result":7}
  
# app/main.py
from contextlib import asynccontextmanager
from typing import Any

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult

from app.celery_app import app as celery_app
from app import tasks  # noqa: F401 (make sure the tasks get imported)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup: a good place for health checks (ping the broker / MongoDB)
    yield
    # shutdown: clean up if needed


app = FastAPI(title="Celery Scheduler Demo", lifespan=lifespan)


# ── Request / Response schemas ──────────────────────────────────

class AddRequest(BaseModel):
    x: int
    y: int


class TaskResponse(BaseModel):
    task_id: str
    status: str
    result: Any = None


# ── Endpoints ────────────────────────────────────────────────────

@app.post("/tasks/add", response_model=TaskResponse, status_code=202)
def submit_add(body: AddRequest):
    """Manually trigger the add task (runs asynchronously)."""
    task = tasks.add.delay(body.x, body.y)
    return TaskResponse(task_id=task.id, status="PENDING")


@app.post("/tasks/report/{report_type}", response_model=TaskResponse, status_code=202)
def submit_report(report_type: str):
    """Manually trigger the daily report task."""
    task = tasks.generate_daily_report.delay(report_type)
    return TaskResponse(task_id=task.id, status="PENDING")


@app.get("/tasks/{task_id}", response_model=TaskResponse)
def get_task_result(task_id: str):
    """Query a task's state and result (read from the MongoDB result backend)."""
    result = AsyncResult(task_id, app=celery_app)

    if result.state == "PENDING":
        return TaskResponse(task_id=task_id, status="PENDING")

    if result.state == "FAILURE":
        raise HTTPException(
            status_code=500,
            detail={"task_id": task_id, "error": str(result.result)},
        )

    return TaskResponse(
        task_id=task_id,
        status=result.state,
        result=result.result,
    )


@app.get("/health")
def health():
    """Simple health check."""
    return {"status": "ok"}

Project structure

.
├── app/
│   ├── __init__.py
│   ├── celery_app.py   # Celery config + beat_schedule
│   ├── main.py         # FastAPI app
│   └── tasks.py        # Task definitions
├── sentinel.conf
├── docker-compose.yml
├── Dockerfile
└── requirements.txt

Dockerfile

FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

requirements.txt

celery[redis]==5.3.6
fastapi==0.111.0
uvicorn[standard]==0.29.0
motor==3.4.0
pymongo==4.7.2
redis==5.0.4

Running it

# Start all services
docker compose up --build

# Check that the worker is up
docker compose logs worker | grep "ready"

# Check that the beat scheduler started
docker compose logs beat | grep "beat:"

Testing the API

# Manually trigger the add task
curl -X POST http://localhost:8000/tasks/add \
  -H "Content-Type: application/json" \
  -d '{"x": 3, "y": 4}'
# → {"task_id":"abc-123","status":"PENDING","result":null}

# Query the result (a few seconds later)
curl http://localhost:8000/tasks/abc-123
# → {"task_id":"abc-123","status":"SUCCESS","result":7}
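
Instead of waiting "a few seconds" by hand, a client can poll until the task reaches a final state. The helper below is a minimal sketch: `poll_until_done` is hypothetical, and `fetch` is any callable that returns the JSON body of GET /tasks/{task_id} as a dict, so the example can run here against canned responses rather than a live server.

```python
import time
from typing import Callable

TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def poll_until_done(
    fetch: Callable[[], dict],
    timeout: float = 10.0,
    interval: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict:
    """Call fetch() repeatedly until the task is finished or the timeout expires."""
    deadline = clock() + timeout
    while True:
        body = fetch()
        if body.get("status") in TERMINAL_STATES:
            return body
        if clock() >= deadline:
            raise TimeoutError("task did not finish within %.1f s" % timeout)
        sleep(interval)


# Canned responses standing in for three consecutive GET /tasks/abc-123 calls.
responses = iter([
    {"task_id": "abc-123", "status": "PENDING", "result": None},
    {"task_id": "abc-123", "status": "PENDING", "result": None},
    {"task_id": "abc-123", "status": "SUCCESS", "result": 7},
])
final = poll_until_done(lambda: next(responses), sleep=lambda _seconds: None)
print(final["status"], final["result"])  # SUCCESS 7
```

Note that the API in this guide answers a FAILURE state with HTTP 500, so a real `fetch` would also need to handle that status code.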

Confirm the results are stored in MongoDB

docker compose exec mongodb mongosh \
  -u root -p example --authenticationDatabase admin \
  --eval 'db.getSiblingDB("celery_results").task_results.find().sort({date_done:-1}).limit(3).pretty()'

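Common pitfalls

1. Broker URL format

For Redis Sentinel, the broker URL lists one complete sentinel:// URL per Sentinel, separated by semicolons:

# ✅ Correct
sentinel://sentinel1:26379/0;sentinel://sentinel2:26379/0;sentinel://sentinel3:26379/0

# ❌ Wrong (comma-separated)
sentinel://sentinel1:26379,sentinel2:26379/0

# ❌ Wrong (scheme only on the first entry; the other Sentinels are not picked up)
sentinel://sentinel1:26379;sentinel2:26379;sentinel3:26379/0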

2. master_name must match sentinel.conf

The <name> in sentinel monitor <name> must exactly match broker_transport_options["master_name"]; otherwise the broker connection fails with a No master found error.
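
A mismatch like this is easy to catch before deployment. The following sketch parses the text of a sentinel.conf and compares it with the Celery setting; `monitored_masters` is a hypothetical helper, and the config text is inlined here so the example runs on its own.

```python
def monitored_masters(conf_text: str) -> set:
    """Collect the master names declared by 'sentinel monitor <name> <host> <port> <quorum>'."""
    names = set()
    for line in conf_text.splitlines():
        parts = line.split()
        if len(parts) >= 3 and parts[0] == "sentinel" and parts[1] == "monitor":
            names.add(parts[2])
    return names


SENTINEL_CONF = """\
port 26379
sentinel monitor mymaster redis-master 6379 2
sentinel down-after-milliseconds mymaster 5000
"""

broker_transport_options = {"master_name": "mymaster"}

masters = monitored_masters(SENTINEL_CONF)
print(masters)                                             # {'mymaster'}
print(broker_transport_options["master_name"] in masters)  # True
```

In a real project the text would come from reading the sentinel.conf file, and the check could run as part of a test suite.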

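3. Run only one Celery Beat instance

The beat service cannot be scaled horizontally (replicas: 2); otherwise every schedule fires twice. For high availability, switch to redbeat, which uses a Redis-backed distributed lock:

pip install "celery[redis]" celery-redbeat
# celery_app.py
app.conf.update(
    beat_scheduler="redbeat.schedulers:RedBeatScheduler",
    # RedBeat's Sentinel mode: "redis-sentinel" scheme plus an explicit sentinel list
    redbeat_redis_url="redis-sentinel",
    redbeat_redis_options={
        "sentinels": [("sentinel1", 26379), ("sentinel2", 26379)],
        "service_name": "mymaster",
    },
)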

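4. The MongoDB result backend uses synchronous pymongo

Celery's MongoDB backend uses synchronous pymongo internally and does not support motor. FastAPI's own database access can still use motor; the result backend connection is managed by Celery, so the two do not affect each other.

5. Beat schedule time zone

crontab times are interpreted in the configured timezone. Make sure enable_utc=True and that timezone is set to the correct zone; otherwise schedules fire at the wrong time.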

app.conf.update(
    timezone="Asia/Taipei",
    enable_utc=True,
)
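
The effect of the timezone setting on the 02:30 daily report can be checked with the standard library. This sketch uses a fixed UTC+8 offset for Asia/Taipei (Taiwan currently observes no daylight saving time) so it does not depend on a time zone database being installed; the dates are arbitrary examples.

```python
from datetime import datetime, timedelta, timezone

# Fixed-offset stand-in for Asia/Taipei (UTC+8, no DST).
TAIPEI = timezone(timedelta(hours=8), "Asia/Taipei")

# timezone="Asia/Taipei": crontab(hour=2, minute=30) means 02:30 local time.
local_run = datetime(2024, 5, 27, 2, 30, tzinfo=TAIPEI)
print(local_run.astimezone(timezone.utc).isoformat())
# 2024-05-26T18:30:00+00:00

# If timezone were left at UTC, the same crontab would fire at 02:30 UTC instead.
utc_run = datetime(2024, 5, 27, 2, 30, tzinfo=timezone.utc)
print(utc_run.astimezone(TAIPEI).isoformat())
# 2024-05-27T10:30:00+08:00
```

An eight-hour shift like the second case is the usual symptom of a missing or wrong timezone setting.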

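6. The celerybeat-schedule file

Beat tracks schedule state in a local celerybeat-schedule file (the default for the persistent scheduler). In Docker, mount a volume for it; otherwise every restart loses the last-run times and the schedule drifts.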


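Inspecting schedule state (advanced)

# List tasks with an ETA/countdown that workers are currently holding
# (inspect.scheduled() does not list the beat_schedule entries themselves)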
docker compose exec beat python -c "
from app.celery_app import app
with app.connection() as conn:
    inspector = app.control.inspect(connection=conn)
    print(inspector.scheduled())
"

# Monitor worker state
docker compose exec worker celery -A app.celery_app inspect active
docker compose exec worker celery -A app.celery_app inspect reserved

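Flower: task monitoring GUI

Flower is the web UI officially recommended for Celery. It can:

  • Show live status, concurrency, and task throughput for every worker
  • Look up a single task by task_id: state, arguments, result, and runtime
  • Manually revoke (cancel) queued tasks
  • Review the history of tasks triggered by Beat

Adding it to docker-compose

# Add under services: in docker-compose.yml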
flower:
  build: .
  command: >
    celery -A app.celery_app flower
      --broker=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      --broker_transport_options='{"master_name":"mymaster"}'
      --port=5555
      --url_prefix=flower
  ports:
    - "5555:5555"
  environment:
    - CELERY_BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
    - CELERY_RESULT_BACKEND=mongodb://root:example@mongodb:27017/celery_results?authSource=admin
  networks:
    - app-net
  depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3]

After startup, open http://localhost:5555/flower

Flower UI at a glance

    graph LR
    subgraph Flower Web UI
        Workers["Workers page\nworker online/offline status, concurrency"]
        Tasks["Tasks page\nlist of all tasks, searchable by task_id"]
        Detail["Task Detail\nstate, args, result, traceback, runtime"]
        Monitor["Monitor page\nlive tasks/s throughput chart"]
    end

    DevOps(["Developer / SRE"])
    DevOps -->|"enter task_id"| Tasks
    Tasks -->|"click through"| Detail
    DevOps --> Workers
    DevOps --> Monitor
  

Tracking a specific task by task_id

Through the Flower API (handy for integrating with other tools):

# Query the state of a single task
curl http://localhost:5555/api/task/info/<task_id>

# Example response
{
  "task-id": "abc-123",
  "state": "SUCCESS",
  "result": 7,
  "received": 1716800000.0,
  "started": 1716800000.1,
  "succeeded": 1716800000.25,
  "runtime": 0.15,
  "args": "[3, 4]",
  "kwargs": "{}"
}

Directly through FastAPI (no Flower dependency):

curl http://localhost:8000/tasks/abc-123

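Notes on Redis Sentinel and Flower

By default Flower connects using the --broker option, and the Sentinel format also needs broker_transport_options. When passing the options on the command line, check the quoting carefully: Flower's JSON parsing does not tolerate single quotes inside the JSON string, so environment variables are the safer route:

# docker-compose.yml (the cleaner way)
environment:
  - CELERY_BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
  - FLOWER_BROKER_API=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0

command: celery -A app.celery_app flower --port=5555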

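Comparison with other frameworks

The core problem Celery solves: move slow operations out of the request/response cycle, run them asynchronously, and support scheduled triggers. The Python ecosystem has several major alternatives, each with a different design philosophy.

Feature comparison

| Feature | Celery | Dramatiq | RQ |
| --- | --- | --- | --- |
| Broker support | Redis / RabbitMQ / SQS… | Redis / RabbitMQ | Redis only |
| Result backend | Redis / MongoDB / DB / S3… | Redis / Memcached | Redis |
| Scheduler | ✅ celery-beat | ✅ APScheduler integration | ❌ needs the rq-scheduler add-on |
| Retries | ✅ custom countdown | ✅ built-in backoff | ✅ retry_failed |
| Priority queues | ✅ multiple queues | ✅ middleware | ✅ multiple queues |
| Concurrency model | prefork / gevent / eventlet | threading / gevent | fork |
| Monitoring GUI | Flower | no official UI | RQ Dashboard |
| Maturity | ⭐ 2009, most mature | ⭐ 2017, newer | ⭐ 2012 |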

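Framework details

Celery (Python)

Celery is the most complete solution in the Python ecosystem, and also the most complex.

Pros:

  • Widest choice of brokers and backends; very flexible
  • Celery Beat is a full-featured scheduler (crontab, fixed intervals in seconds, custom schedules)
  • Mature ecosystem with extensive documentation
  • Canvas primitives (chain / group / chord) for complex workflows

Cons:

  • Configuration is complex; celery_app.conf.update accepts dozens of settings
  • The prefork model and async frameworks (asyncio) do not mix cleanly (a blocking call stalls the event loop)
  • Beat is a single point of failure (redbeat addresses this)
  • Versions before 4.0 defaulted to pickle serialization, which is a security risk; since 4.0 the default is JSON, and it is still worth restricting accept_content to JSON explicitly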

# Safely call a Celery task from FastAPI (without blocking the event loop)
import asyncio

@app.post("/tasks/add")
async def submit(body: AddRequest):
    # ✅ Run .delay() in the default thread pool so the broker round trip
    #    does not block the event loop
    loop = asyncio.get_running_loop()
    task = await loop.run_in_executor(None, tasks.add.delay, body.x, body.y)
    return {"task_id": task.id}

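Dramatiq (Python)

A lighter, more Pythonic design than Celery; it uses middleware in place of Celery's signal mechanism.

Pros:

  • Simpler API with an intuitive decorator design
  • Built-in middleware for rate limiting, time limits, and a dead-letter queue (DLQ)
  • Better asyncio integration

Cons:

  • Few result backend choices (Redis / Memcached)
  • Scheduling needs the external APScheduler; it is not as tightly integrated as celery-beat
  • Smaller community than Celery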
import dramatiq
from dramatiq.brokers.redis import RedisBroker

broker = RedisBroker(url="redis://localhost:6379")
dramatiq.set_broker(broker)

@dramatiq.actor(max_retries=3, min_backoff=1000, max_backoff=60000)
def send_email(to: str, subject: str):
    ...  # retried automatically with exponential backoff

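RQ - Redis Queue (Python)

The simplest Python task queue. It supports only Redis and suits cases that do not need advanced features.

Pros:

  • Simplest API; quick to get started

Cons:

  • Supports Redis only
  • No built-in scheduler (needs rq-scheduler)
  • Far fewer features than Celery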

from redis import Redis
from rq import Queue

# send_email is assumed to be defined in a module the RQ worker can import
q = Queue(connection=Redis())
job = q.enqueue(send_email, "user@example.com", "Hello")
print(job.id)  # task_id

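Recommendations

    graph TD
    Q1{"Need complex workflows\nor multiple brokers?"}
    Q1 -->|yes| Celery["Celery\nmost complete feature set, widest broker choice"]
    Q1 -->|no, Redis only| Q2

    Q2{"Need a scheduler?"}
    Q2 -->|yes| Dramatiq["Dramatiq + APScheduler\nmore modern API, easier asyncio integration"]
    Q2 -->|no| RQ["RQ\nsimplest, quick to get started"]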
  

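Summary

| Component | Role | Key settings |
| --- | --- | --- |
| Redis Sentinel | Broker (HA) | master_name, semicolon-separated URL |
| MongoDB | Result backend | mongodb_backend_settings |
| Celery Worker | Runs tasks | --concurrency |
| Celery Beat | Scheduled triggers | beat_schedule, time zone, volume |
| FastAPI | HTTP API | AsyncResult for querying results |
| Flower | GUI monitoring | task_id tracking, worker status |

For production, consider adding: redbeat (Beat HA), Flower (task GUI), and a MongoDB TTL index to clean up expired results automatically.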


Appendix: a minimal runnable example

Uses Redis Sentinel (1 master + 1 replica + 3 sentinels) as both broker and result backend, together with the Flower GUI and a redbeat dynamic-scheduling API. It runs with a plain docker compose up.

Project structure

celery-demo/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
└── app/
    ├── __init__.py
    ├── celery_app.py
    ├── tasks.py
    └── main.py

requirements.txt

celery[redis]==5.3.6
fastapi==0.111.0
uvicorn[standard]==0.29.0
redis==5.0.4
flower==2.0.1
celery-redbeat==2.2.0

Dockerfile

FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

docker-compose.yml

Sentinel uses an inline config (sh -c writes /tmp/sentinel.conf), so no extra config file has to be mounted. The &sentinel / <<: *sentinel YAML anchor lets the three sentinels share one definition.

services:
  # ── Redis master / replica ──────────────────────────────────────
  redis-master:
    image: redis:7-alpine
    networks: [app-net]

  redis-replica:
    image: redis:7-alpine
    command: redis-server --replicaof redis-master 6379
    networks: [app-net]
    depends_on: [redis-master]

  # ── Sentinel (inline config, no external sentinel.conf needed) ──
  redis-sentinel-1: &sentinel
    image: redis:7-alpine
    command: >
      sh -c "
        printf 'port 26379\nsentinel resolve-hostnames yes\nsentinel monitor mymaster redis-master 6379 2\nsentinel down-after-milliseconds mymaster 5000\nsentinel failover-timeout mymaster 60000\nsentinel parallel-syncs mymaster 1\n'
        > /tmp/sentinel.conf &&
        redis-sentinel /tmp/sentinel.conf
      "
    networks: [app-net]
    depends_on: [redis-master, redis-replica]

  redis-sentinel-2:
    <<: *sentinel

  redis-sentinel-3:
    <<: *sentinel

  # ── App services ────────────────────────────────────────────────
  api:
    build: .
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    ports: ["8000:8000"]
    volumes: [.:/app]
    environment: &app-env
      - BROKER_URL=sentinel://redis-sentinel-1:26379/0;sentinel://redis-sentinel-2:26379/0;sentinel://redis-sentinel-3:26379/0
      - RESULT_BACKEND=sentinel://redis-sentinel-1:26379/1;sentinel://redis-sentinel-2:26379/1;sentinel://redis-sentinel-3:26379/1
    networks: [app-net]
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3]

  worker:
    build: .
    command: celery -A app.celery_app worker --loglevel=info
    volumes: [.:/app]
    environment: *app-env
    networks: [app-net]
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3]

  beat:
    build: .
    command: celery -A app.celery_app beat --loglevel=info
    volumes: [.:/app]
    environment: *app-env
    networks: [app-net]
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3]

  flower:
    build: .
    command: celery -A app.celery_app flower --port=5555
    ports: ["5555:5555"]
    environment: *app-env
    networks: [app-net]
    depends_on: [redis-sentinel-1, redis-sentinel-2, redis-sentinel-3, worker]

networks:
  app-net:

app/celery_app.py

import os
from celery import Celery

BROKER_URL = os.environ.get(
    "BROKER_URL",
    "sentinel://localhost:26379/0",
)
RESULT_BACKEND = os.environ.get(
    "RESULT_BACKEND",
    "sentinel://localhost:26379/1",
)

app = Celery(
    "demo",
    broker=BROKER_URL,
    backend=RESULT_BACKEND,
    include=["app.tasks"],
)

app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="Asia/Taipei",
    enable_utc=True,
    task_track_started=True,            # report STARTED so it shows up between PENDING and SUCCESS
    # ── broker (Redis Sentinel) ────────────────────────────────────
    broker_transport_options={
        "master_name": "mymaster",
        "sentinel_kwargs": {},          # if Redis has a password: {"password": "xxx"}
        "socket_timeout": 10,
        "retry_on_timeout": True,
    },
    # ── result backend (Redis Sentinel) ───────────────────────────
    result_backend_transport_options={
        "master_name": "mymaster",
    },
    # ── redbeat: dynamic schedules stored in Redis ─────────────────
    beat_scheduler="redbeat.schedulers:RedBeatScheduler",
    # RedBeat does not parse sentinel:// URLs; its Sentinel mode takes the
    # "redis-sentinel" scheme plus an explicit list of sentinels.
    redbeat_redis_url="redis-sentinel",
    redbeat_redis_options={
        "sentinels": [
            ("redis-sentinel-1", 26379),
            ("redis-sentinel-2", 26379),
            ("redis-sentinel-3", 26379),
        ],
        "service_name": "mymaster",
    },
)

app/tasks.py

import time
from datetime import datetime, timezone
from app.celery_app import app


@app.task(name="app.tasks.add")
def add(x: int, y: int) -> int:
    return x + y


@app.task(bind=True, name="app.tasks.slow_task", max_retries=3)
def slow_task(self, seconds: int = 5) -> dict:
    """Sleep for a few seconds so the PENDING → STARTED → SUCCESS transitions can be observed in Flower."""
    try:
        time.sleep(seconds)
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
        return {"slept": seconds, "done_at": datetime.now(timezone.utc).isoformat()}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=5)


@app.task(name="app.tasks.heartbeat")
def heartbeat() -> dict:
    return {"status": "ok", "ts": datetime.now(timezone.utc).isoformat()}

app/main.py

from typing import Any
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from celery.result import AsyncResult
from celery.schedules import crontab
from redbeat import RedBeatSchedulerEntry
from redbeat.schedulers import RedBeatScheduler

from app.celery_app import app as celery_app
from app import tasks  # noqa: F401

app = FastAPI(title="Celery Demo")


# ── Task schemas ──────────────────────────────────────────────────

class AddIn(BaseModel):
    x: int
    y: int

class TaskOut(BaseModel):
    task_id: str
    status: str
    result: Any = None


# ── Schedule schemas ──────────────────────────────────────────────

class ScheduleIn(BaseModel):
    name: str         = Field(..., description="Unique identifier, e.g. 'user-123-report'")
    task: str         = Field(..., description="Full task name, e.g. 'app.tasks.heartbeat'")
    minute: int       = Field(0,   ge=0, le=59)
    hour: int         = Field(8,   ge=0, le=23)
    day_of_week: str  = Field("*", description="0-6 or *, e.g. '1' = Monday")
    day_of_month: str = Field("*", description="1-31 or *")
    month_of_year: str= Field("*", description="1-12 or *")
    args: list        = []
    kwargs: dict      = {}

class ScheduleOut(BaseModel):
    name: str
    task: str
    schedule: str
    args: list
    kwargs: dict


# ── Task endpoints ────────────────────────────────────────────────

@app.post("/tasks/add", response_model=TaskOut, status_code=202)
def submit_add(body: AddIn):
    task = tasks.add.delay(body.x, body.y)
    return TaskOut(task_id=task.id, status="PENDING")

@app.post("/tasks/slow/{seconds}", response_model=TaskOut, status_code=202)
def submit_slow(seconds: int):  # path parameters are always required, so no default
    task = tasks.slow_task.delay(seconds)
    return TaskOut(task_id=task.id, status="PENDING")

@app.get("/tasks/{task_id}", response_model=TaskOut)
def get_result(task_id: str):
    r = AsyncResult(task_id, app=celery_app)
    if r.state == "FAILURE":
        raise HTTPException(status_code=500, detail=str(r.result))
    return TaskOut(task_id=task_id, status=r.state, result=r.result)


# ── Schedule CRUD ─────────────────────────────────────────────────

@app.post("/schedules", response_model=ScheduleOut, status_code=201)
def create_schedule(body: ScheduleIn):
    schedule = crontab(
        minute=body.minute,
        hour=body.hour,
        day_of_week=body.day_of_week,
        day_of_month=body.day_of_month,
        month_of_year=body.month_of_year,
    )
    entry = RedBeatSchedulerEntry(
        name=body.name,
        task=body.task,
        schedule=schedule,
        args=body.args,
        kwargs=body.kwargs,
        app=celery_app,
    )
    entry.save()
    return ScheduleOut(
        name=body.name, task=body.task,
        schedule=str(schedule), args=body.args, kwargs=body.kwargs,
    )

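@app.get("/schedules", response_model=list[ScheduleOut])
def list_schedules():
    from redbeat.schedulers import get_redis

    # RedBeatScheduler.schedule only returns entries that are due soon, keyed by
    # name, so read every entry key from RedBeat's schedule sorted set instead.
    redis = get_redis(celery_app)
    keys = redis.zrange(celery_app.redbeat_conf.schedule_key, 0, -1)
    result = []
    for key in keys:
        try:
            entry = RedBeatSchedulerEntry.from_key(key, app=celery_app)
            result.append(ScheduleOut(
                name=entry.name, task=entry.task,
                schedule=str(entry.schedule),
                args=list(entry.args), kwargs=dict(entry.kwargs),
            ))
        except Exception:
            # Skip entries that disappeared or cannot be decoded
            continue
    return result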

@app.delete("/schedules/{name}", status_code=204)
def delete_schedule(name: str):
    from redbeat.schedulers import ensure_conf

    # Entry keys are "<redbeat_key_prefix><name>" (the prefix defaults to "redbeat:")
    key = ensure_conf(celery_app).key_prefix + name
    try:
        entry = RedBeatSchedulerEntry.from_key(key, app=celery_app)
        entry.delete()
    except Exception:
        raise HTTPException(status_code=404, detail=f"schedule '{name}' not found")

Start and verify

docker compose up --build

| Service | URL |
| --- | --- |
| FastAPI Swagger UI | http://localhost:8000/docs |
| Flower | http://localhost:5555 |

Submit an immediate task:

# Submit the add task
curl -X POST http://localhost:8000/tasks/add \
  -H "Content-Type: application/json" \
  -d '{"x": 3, "y": 4}'
# → {"task_id":"abc-123","status":"PENDING","result":null}

# Query the result
curl http://localhost:8000/tasks/abc-123
# → {"task_id":"abc-123","status":"SUCCESS","result":7}

# Submit a slow task (8 seconds); watch the state change in Flower
curl -X POST http://localhost:8000/tasks/slow/8

Dynamic schedule CRUD (using redbeat):

# Create a schedule: run heartbeat every day at 08:00
curl -X POST http://localhost:8000/schedules \
  -H "Content-Type: application/json" \
  -d '{
    "name": "morning-ping",
    "task": "app.tasks.heartbeat",
    "hour": 8,
    "minute": 0
  }'

# Run slow_task every Monday at 09:30 (runs for 3 seconds)
curl -X POST http://localhost:8000/schedules \
  -H "Content-Type: application/json" \
  -d '{
    "name": "weekly-report",
    "task": "app.tasks.slow_task",
    "hour": 9,
    "minute": 30,
    "day_of_week": "1",
    "args": [3]
  }'

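# List all schedules
curl http://localhost:8000/schedules

# Delete a schedule
curl -X DELETE http://localhost:8000/schedules/morning-ping

When redbeat changes take effect: Beat reads Redis every 5 seconds, so a schedule that is added or deleted takes effect within at most 5 seconds, with no service restart needed.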