Celery Canvas: Workflow Primitives
Celery Canvas is the workflow composition layer that lets you build complex task pipelines from simple primitives. All primitives are lazy: they describe work without executing it until you send them with `.delay()` or `.apply_async()`. Calling a chain, group, or chord object directly also sends it.
Primitives Overview
| Primitive | Purpose | Execution |
|---|---|---|
| `signature` | Reusable, lazy task call | Single task |
| `chain` | Sequential: each result feeds the next | Series |
| `group` | Parallel: run N tasks at once | Parallel |
| `chord` | `group` + callback after all finish | Parallel + join |
| `chunks` | Split one big iterable into batches | Batches in parallel, items within a batch in series |
| `map` / `starmap` | Apply a task across a list | Single task (sequential) |
Setup (shared for all examples)
```python
# tasks.py
from celery import Celery

app = Celery("demo", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")


@app.task
def add(x, y):
    return x + y


@app.task
def double(x):
    return x * 2


@app.task
def summarize(results):
    return {"total": sum(results), "count": len(results)}
```

1. signature — lazy task descriptor
A signature wraps a task call together with its arguments (`.s()` is the shorthand for creating one). It can be passed around, partially applied, or composed into larger workflows.
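Under the hood a Celery signature is a serializable description (the `Signature` class subclasses `dict`), which is what makes creating one by task name possible. The toy model below is plain Python, not Celery code: `REGISTRY`, `make_signature`, and `execute` are hypothetical stand-ins for the worker's task registry and message handling. It shows that describing and serializing a call runs nothing; work happens only when the name is resolved and called.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical registry standing in for the worker's table of task names.
REGISTRY: Dict[str, Callable[..., Any]] = {
    "tasks.add": lambda x, y: x + y,
}

calls = []  # records executions so we can see when work actually happens


def make_signature(name: str, *args: Any) -> Dict[str, Any]:
    """Build a lazy, JSON-serializable description of a task call."""
    return {"task": name, "args": list(args), "kwargs": {}}


def execute(message: str) -> Any:
    """Play the worker: decode the message, look up the task by name, run it."""
    sig = json.loads(message)
    calls.append(sig["task"])
    return REGISTRY[sig["task"]](*sig["args"], **sig["kwargs"])


sig = make_signature("tasks.add", 3, 4)
message = json.dumps(sig)  # describing and serializing runs nothing
assert calls == []
print(execute(message))    # 7
```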
```python
from celery import signature

from tasks import add

# equivalent ways to create a signature
sig = add.s(3, 4)                          # shorthand
sig = add.signature((3, 4))                # explicit
sig = signature("tasks.add", args=(3, 4))  # by registered task name

result = sig.delay()  # nothing is sent until here
print(result.get())   # 7
```

Partial application — freeze some args, leave others for the caller:
```python
from tasks import add

# 10 is stored in the signature; args supplied later are *prepended*,
# so this runs add(5, 10)
partial = add.s(10)
print(partial.delay(5).get())  # 15
```

2. chain — sequential pipeline
Each task receives the return value of the previous task as its first argument.
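Stripped of brokers and workers, the data flow of a chain is a left fold: start with the first task's result and pass it through each later step. A minimal single-process sketch using `functools.reduce`; `run_chain` is a hypothetical helper, and in Celery each step would be a separate task message rather than a local call.

```python
from functools import reduce
from typing import Any, Callable, Sequence


def add(x, y):
    return x + y


def double(x):
    return x * 2


def run_chain(first: Callable[[], Any], steps: Sequence[Callable[[Any], Any]]) -> Any:
    """Sequential model of a chain: feed each return value into the next step."""
    return reduce(lambda acc, step: step(acc), steps, first())


# add(2, 2) -> 4 -> double(4) -> 8 -> double(8) -> 16
print(run_chain(lambda: add(2, 2), [double, double]))  # 16
```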
```python
from celery import chain

from tasks import add, double

# add(2, 2) → 4 → double(4) → 8 → double(8) → 16
result = chain(add.s(2, 2), double.s(), double.s()).delay()
print(result.get())  # 16
```

Using the `|` pipe operator (syntactic sugar):
```python
from tasks import add, double

pipeline = add.s(2, 2) | double.s() | double.s()
result = pipeline.delay()
print(result.get())  # 16
```

Key rule: every task in the chain except the first must accept the previous result as its first positional argument. Use `add.si(x, y)` (an immutable signature) to ignore the incoming value.
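Celery's documentation states that arguments supplied at call time are prepended to a signature's stored arguments, and a chain's previous result is supplied the same way. Addition hides the order, so this plain-Python model uses a hypothetical non-commutative `sub` task body. `merge_args` is an illustrative helper, not a Celery API.

```python
from typing import Any, Tuple


def merge_args(stored: Tuple[Any, ...], incoming: Tuple[Any, ...],
               immutable: bool = False) -> Tuple[Any, ...]:
    """Model of how a signature combines stored args with call-time args.

    Call-time args (including a chain's previous result) are prepended;
    an immutable signature (.si()) keeps only its stored args.
    """
    if immutable:
        return stored
    return incoming + stored


def sub(x, y):  # hypothetical non-commutative task body
    return x - y


# sub.s(10) called with 5 would run sub(5, 10), not sub(10, 5)
print(sub(*merge_args((10,), (5,))))                     # -5
# sub.si(10, 20) ignores the incoming 2 from a previous step
print(sub(*merge_args((10, 20), (2,), immutable=True)))  # -10
```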
```python
from tasks import add

# si() = immutable: ignores the value passed in from the previous task
pipeline = add.s(1, 1) | add.si(10, 20)  # result of the first task is discarded
result = pipeline.delay()
print(result.get())  # 30
```

3. group — parallel fan-out
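A group runs N tasks in parallel (as far as available workers allow). Its result is a `GroupResult`, and `.get()` returns the values in the same order the tasks were listed, not the order in which they finished.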
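The ordering guarantee matches what Python's `concurrent.futures` offers locally: `Executor.map` returns results in input order even when later inputs finish first. This is an analogy with threads standing in for workers, not how Celery schedules tasks; `double_slowly` is a hypothetical task body with artificial delays.

```python
import time
from concurrent.futures import ThreadPoolExecutor

finished = []  # completion order, recorded by the worker threads


def double_slowly(x):
    # Hypothetical task body: smaller inputs sleep longer, so they tend to finish last.
    time.sleep(0.05 * (4 - x))
    finished.append(x)
    return x * 2


with ThreadPoolExecutor(max_workers=3) as pool:
    # Executor.map yields results in input order, whatever the completion order.
    results = list(pool.map(double_slowly, [1, 2, 3]))

print(results)   # [2, 4, 6]
print(finished)  # completion order, which need not match the input order
```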
```python
from celery import group

from tasks import double

# runs double(1), double(2), double(3) in parallel
job = group(double.s(i) for i in range(1, 4))
result = job.delay()
print(result.get())  # [2, 4, 6]
```

`result.get()` blocks until all tasks complete. To check progress without blocking, use `ready()`:
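```python
from celery import group
from tasks import add
result = group(add.s(i, i) for i in range(5)).delay()
# non-blocking check: ready() is True only once every task has finished
if result.ready():
    print(result.get())  # [0, 2, 4, 6, 8]
else:
    print(f"{result.completed_count()} of {len(result.results)} tasks done")
```

4. chord — parallel + join callback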
A chord pairs a header (a group of tasks that run in parallel) with a body (a callback that runs once, after every header task has finished, and receives the list of their results).
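The fan-out-then-join shape of a chord can be sketched in a single process with a thread pool: submit every header callable, wait for all of them, then call the body once with the list. `run_chord` is a hypothetical helper; in Celery the waiting is done by the result backend, not by a local loop.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def double(x):
    return x * 2


def summarize(results):
    return {"total": sum(results), "count": len(results)}


def run_chord(header: Iterable[Callable[[], Any]], body: Callable[[List[Any]], Any]) -> Any:
    """Run every header callable in parallel, wait for all, then call body once."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(fn) for fn in header]
        results = [f.result() for f in futures]  # the join: blocks until all are done
    return body(results)


header = [lambda i=i: double(i) for i in range(1, 6)]
print(run_chord(header, summarize))  # {'total': 30, 'count': 5}
```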
```python
from celery import chord, group

from tasks import double, summarize

header = group(double.s(i) for i in range(1, 6))  # [2, 4, 6, 8, 10]
callback = summarize.s()                          # receives [2, 4, 6, 8, 10]
result = chord(header)(callback)
print(result.get())  # {'total': 30, 'count': 5}
```

Shorter form: piping a group into a task with `|` upgrades it to a chord automatically:
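```python
from celery import group
from tasks import double, summarize
result = (group(double.s(i) for i in range(1, 6)) | summarize.s()).delay()
print(result.get())  # {'total': 30, 'count': 5}
```

Chords require a result backend (such as the Redis backend configured in the setup), because the backend is what tracks when every header task has finished. For the same reason, header tasks must not ignore their results.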
5. chunks — batch an iterable
Splits a long sequence of argument tuples into chunks of N items. Each chunk becomes one task that processes its items sequentially; the chunk tasks themselves can run in parallel.
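The batching arithmetic can be checked with plain Python: 100 argument pairs split into chunks of 10 gives 10 batches, and each batch is one call that loops over its items. `chunked` and `run_chunk` are hypothetical helpers modelling the shape of the work, not Celery's implementation.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def chunked(items: Iterable[Tuple[int, int]], size: int) -> Iterator[List[Tuple[int, int]]]:
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


def add(x, y):
    return x + y


def run_chunk(batch):
    # One "task" per chunk: it processes its items one after another.
    return [add(x, y) for x, y in batch]


batches = list(chunked(zip(range(100), range(100)), 10))
results = [run_chunk(b) for b in batches]  # in Celery, the chunk tasks could run in parallel
print(len(batches))  # 10
print(results[0])    # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```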
```python
from tasks import add

# 100 add() calls in batches of 10 → 10 worker tasks
result = add.chunks(zip(range(100), range(100)), 10).group().apply_async()
print(result.get())  # [[0, 2, 4, ..., 18], [20, 22, ..., 38], ...] (10 sub-lists)
```

6. map and starmap — apply over a list
map passes each item as a single argument:
```python
from tasks import double
result = double.map([1, 2, 3, 4]).delay()
print(result.get())  # [2, 4, 6, 8]
```

`starmap` unpacks each item as `*args`:
```python
from tasks import add
result = add.starmap([(1, 2), (3, 4), (5, 6)]).delay()
print(result.get())  # [3, 7, 11]
```

`map` and `starmap` run as a single task on one worker, looping over the items sequentially. Use `group` if you need actual parallelism.
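Because `map` and `starmap` loop inside one task, what that task computes is the same as Python's built-in `map` and `itertools.starmap` run on that one worker. A small model of the single task's work; `run_map` and `run_starmap` are hypothetical names, not Celery APIs.

```python
from itertools import starmap
from typing import Any, Callable, Iterable, List, Sequence


def double(x):
    return x * 2


def add(x, y):
    return x + y


def run_map(task: Callable[[Any], Any], items: Iterable[Any]) -> List[Any]:
    """One call per item, each item passed as a single argument, in sequence."""
    return list(map(task, items))


def run_starmap(task: Callable[..., Any], items: Iterable[Sequence[Any]]) -> List[Any]:
    """Same loop, but each item is unpacked into positional arguments."""
    return list(starmap(task, items))


print(run_map(double, [1, 2, 3, 4]))               # [2, 4, 6, 8]
print(run_starmap(add, [(1, 2), (3, 4), (5, 6)]))  # [3, 7, 11]
```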
Composing Primitives
Primitives can be nested freely.
Fan-out → reduce
```python
from celery import chord, group

from tasks import add, summarize

# parallel add(i, 2 * i) for i in 0..4 → [0, 3, 6, 9, 12], then summarize
pipeline = chord(
    group(add.s(i, i * 2) for i in range(5)),
    summarize.s(),
)
print(pipeline().get())  # {'total': 30, 'count': 5}
```

Chain of groups (staged pipeline)
```python
from celery import chain, group

from tasks import add, double, summarize

# stage 1: add(1, 1) → 2
# stage 2: a group inside a chain; each member receives the previous result → [4, 4]
# stage 3: a group followed by a task is upgraded to a chord, so summarize gets the list
pipeline = chain(
    add.s(1, 1),
    group(double.s(), double.s()),
    summarize.s(),
)
print(pipeline.delay().get())  # {'total': 8, 'count': 2}
```

Real multi-stage pattern
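```python
# tasks.py (next to the setup tasks, so workers can find them)
@app.task
def fetch(url):
    return url  # placeholder: imagine an HTTP call returning the page body

@app.task
def parse(html):
    return len(html)

@app.task
def aggregate(sizes):
    return sum(sizes)


# caller
from celery import chain, chord, group

from tasks import aggregate, fetch, parse

urls = ["http://a.com", "http://b.com", "http://c.com"]

# one fetch → parse chain per URL, all in parallel; aggregate gets the list of sizes
pipeline = chord(
    group(chain(fetch.s(u), parse.s()) for u in urls),
    aggregate.s(),
)
result = pipeline.delay()
print(result.get())  # 36 with the placeholder fetch (each URL is 12 characters)
```

Error Handling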
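```python
from celery import chain

from tasks import add, double

result = chain(add.s(1, 2), double.s()).apply_async()
try:
    value = result.get(timeout=10, propagate=True)  # re-raises task exceptions
    print(value)  # 6
except Exception as e:  # includes celery.exceptions.TimeoutError after 10 s
    print(f"Pipeline failed: {e}")
```

Use `.on_error()` to attach an errback, a task that runs only if the task fails: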
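```python
# tasks.py
@app.task
def on_failure(request, exc, traceback):
    # errback arguments: the failed task's request context, the exception, the traceback
    print(f"Task {request.id} failed: {exc}")  # appears in the worker log

# caller
add.s(1, 2).on_error(on_failure.s()).delay()
```

Quick Reference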
```python
# Signature
add.s(1, 2)   # lazy descriptor
add.si(1, 2)  # immutable: ignores the incoming chain value
add.s(1)      # partial: args supplied later are prepended

# Chain (sequential)
chain(t1, t2, t3)
t1 | t2 | t3

# Group (parallel)
group(t.s(i) for i in items)

# Chord (parallel + callback)
chord(group(...))(callback.s())
(group(...) | callback.s()).delay()

# Chunks
task.chunks(iterable, size)

# Map / Starmap
task.map(list)
task.starmap(list_of_tuples)
```

When to use what
- Need output of A before B starts → `chain`
- A, B, C are independent → `group`
- Need all parallel results merged → `chord`
- Splitting a large dataset → `chunks`
- Applying one task over a list (single worker) → `map`/`starmap`
- Applying one task over a list (parallel workers) → `group`