Background Jobs & Task Queues
Any code that runs outside the request–response cycle — deferred, retryable, non-blocking — is a background job. Understanding how to design and operate them is what separates a toy backend from a production-grade one.
What is a Background Task?
A background task (also called a background job) is any piece of code, logic, or workflow that runs outside of the request–response lifecycle. Think of your typical HTTP interaction: a client sends a request, your server processes it and sends a response. Anything that doesn't need to happen within that tight window is a candidate for a background task.
The key characteristic: it does not need to be synchronous. It is not mission-critical in the sense that the client is waiting for it right now. Because of this, we can safely offload it to a separate process that finishes in its own time, according to how it has been programmed.
The idea is loosely analogous to the browser's event loop, where work is deferred to a task queue instead of running on the current call stack. On the server, the same principle applies: some work doesn't belong in the synchronous request path.
Why We Need Background Tasks
The classic motivating example is user signup + email verification. Here's the full picture of what happens:
1. User fills signup form: They provide email, username, password. The frontend validates basic constraints and makes an API call to your backend.
2. Backend validates & stores data: Password strength, uniqueness of email, hashing, writing user record to the database, generating a one-time verification token (6-digit OTP or a signed URL).
3. Send a verification email: This requires calling a third-party email provider (Resend, Mailgun, Brevo). Your backend is now making an outbound HTTP request to an external service.
4. Email provider processes and delivers: The provider checks your API key, validates the email, routes via SMTP, and delivers. Success/failure is returned.
The Problem: Synchronous Email Sending
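If steps 3–4 happen synchronously in your request handler, you've introduced a hard dependency on an external service inside your critical path. Two failure scenarios follow: if the provider is slow, the user waits on the signup request for as long as the email call takes; if the provider is down, the request fails even though the account itself was created.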
In both cases, the fundamental problem is the same: you're coupling your API's reliability to a third-party service's uptime. Offloading the email to a background task decouples them completely.
Synchronous vs Asynchronous Workflow
Synchronous (Blocking) Flow
Asynchronous (Background) Flow
The server enqueues the task and returns immediately. The client gets a response without waiting for the email to be sent. If the email provider goes down, the retry mechanism handles it — the API response is never affected.
How Task Queues Work
A task queue is a system for managing and distributing background jobs. It is the engine behind the scenes that enables reliable, decoupled background processing. Think of it as a to-do list for your backend — your application adds items to the list, and workers pick them off one by one.
Serialization & Deserialization
Because tasks cross process boundaries, all data must be serialized. JSON is the most common format. The worker deserializes back to native types:
// Task payload (JSON in the queue)
{
"type": "send_verification_email",
"user_id": "usr_01J2K...",
"email": "alice@example.com",
"token": "eyJhbGci...",
"created_at": "2024-01-15T10:30:00Z"
}
JSON
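As a rough illustration of the round trip in Python, the sketch below serializes a task on the producer side and rebuilds it on the worker side. `EmailTask`, `serialize`, and `deserialize` are names invented for this example and are not part of any queue library.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class EmailTask:
    """Hypothetical typed view of the payload shown above."""
    type: str
    user_id: str
    email: str
    token: str
    created_at: str  # ISO 8601 string; JSON has no native datetime type


def serialize(task: EmailTask) -> str:
    """Producer side: native object -> JSON text stored in the queue."""
    return json.dumps(asdict(task))


def deserialize(raw: str) -> EmailTask:
    """Worker side: JSON text -> native object; raises TypeError on missing or extra keys."""
    return EmailTask(**json.loads(raw))
```

Keeping the payload to plain strings and numbers is what makes it portable: a Go or Node.js worker could read the same message.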
Native types by language: dict (Python), object (JavaScript), struct (Go, via json.Unmarshal)
Producer & Consumer Deep Dive
The Producer Side
The producer's responsibility is minimal and fast — it should never do heavy work. Its only job:
- Gather all data the consumer will need to perform the task.
- Serialize that data into a portable format (JSON).
- Push the serialized payload into the queue.
- Return — immediately. Do not wait.
The Consumer / Worker Side
The consumer runs as a separate process — either in the same codebase (a different entry-point / binary) or an entirely separate service. It:
- Configures which queue to listen on — different queues for different task types (email queue, notification queue, image processing queue, etc.).
- Registers handlers — for each task type, a function is registered. When that task is dequeued, that function is called.
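- Receives tasks by polling or push, depending on the broker: Redis-backed libraries typically block on the list with BRPOP, RabbitMQ pushes messages to subscribed consumers, and SQS is polled (short or long polling).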
- Deserializes the payload — JSON → native struct/dict/object.
- Executes the handler — the actual work: calling the email API, resizing the image, etc.
- Acknowledges (ACK) success — tells the queue the task is done and can be removed.
- Reports failure (NACK) — on error, the queue knows to retry.
You can run N consumer processes in parallel. Each picks up tasks independently. This is how you scale horizontally — more users → more workers, not bigger servers.
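The producer and consumer responsibilities listed above can be sketched in a single process. This is a toy model under stated assumptions: a standard-library `queue.Queue` stands in for the broker, and `register`, `enqueue`, `work_once`, and `sent` are hypothetical names chosen for the illustration.

```python
import json
import queue
from typing import Callable, Dict

# Stand-in for the broker; a real deployment would use Redis, RabbitMQ, or SQS.
task_queue: "queue.Queue[str]" = queue.Queue()
HANDLERS: Dict[str, Callable[[dict], None]] = {}
sent = []  # records side effects so the example can be inspected


def register(task_type: str):
    """Register a handler function for one task type."""
    def decorator(fn):
        HANDLERS[task_type] = fn
        return fn
    return decorator


def enqueue(task_type: str, **data) -> None:
    """Producer: serialize, push, and return immediately."""
    task_queue.put(json.dumps({"type": task_type, "data": data}))


@register("send_verification_email")
def send_verification_email(data: dict) -> None:
    sent.append(data["email"])  # a real handler would call the email API


def work_once() -> str:
    """Consumer: dequeue one task, dispatch it, and report ACK or NACK."""
    raw = task_queue.get_nowait()
    task = json.loads(raw)
    try:
        HANDLERS[task["type"]](task["data"])
    except Exception:
        task_queue.put(raw)  # NACK: put the task back so it can be retried
        return "nack"
    return "ack"  # ACK: the task is not re-queued
```

A real worker would call something like `work_once` in a loop inside its own process, and several such processes could share one broker.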
Brokers & Technologies
The "queue" is backed by a real piece of infrastructure — a message broker. Here are the main options:
| Broker | Type | Best For | Notes |
|---|---|---|---|
| Redis (Pub/Sub + Lists) | In-memory store | Fast, low-latency tasks. Common broker for Celery; required by BullMQ. | Data can be lost on crash unless persistence is enabled (AOF/RDB). |
| RabbitMQ | AMQP message broker | Complex routing, fanout, topic exchanges. | Very reliable, rich routing rules, requires ops overhead. |
| Amazon SQS | Managed cloud queue | Scalable, multi-region, serverless-friendly. | AWS-native, no server management. Standard + FIFO queues. |
| Redis Streams | Persistent log | Consumer groups, replay, auditing. | More durable than basic Redis lists. Requires Redis 5.0+. |
Celery (Python) → Redis or RabbitMQ | BullMQ (Node.js) → Redis | Asynq (Go) → Redis (lists and sorted sets) | Sidekiq (Ruby) → Redis
Redis Internals: How It Stores Tasks
Redis offers two different data structures used by task queue libraries under the hood. Understanding which one your framework uses — and why — helps you operate it correctly in production.
Redis Lists (used by: Celery, BullMQ, Sidekiq)
A Redis List is a doubly-linked list of strings. Task queues use two commands: LPUSH (producer pushes to left/head) and BRPOP (worker blocks waiting to pop from right/tail). This gives you classic FIFO behaviour.
# What Celery does under the hood in Redis:
# Producer (your API) — pushes serialized task to left of list
LPUSH celery '{"task":"send_email","args":["alice@example.com","token123"]}'
# Worker — blocks waiting for item, pops from right (FIFO)
# BRPOP blocks for up to 5 seconds, then re-polls
BRPOP celery 5
# To see queue depth at any time:
LLEN celery # → 42 (42 pending tasks)
Redis CLI
Once a worker pops a task (BRPOP), it's gone from the list. If the worker crashes before finishing, the task is permanently lost. BullMQ avoids this by moving each job to a separate "active" list and deleting it only when the job completes.
Redis Streams (used by: custom consumers, newer architectures)
Redis Streams (added in Redis 5.0) is an append-only log — conceptually similar to Apache Kafka but in Redis. It was designed specifically to solve the problems of Redis Lists for message queuing.
# Producer adds entry to stream
# * means auto-generate ID (timestamp-sequence: 1705312200000-0)
XADD myapp:email_queue * user_id usr_01J2K email alice@example.com token eyJ...
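# Create a consumer group (workers belong to a group)
# 0 = deliver the stream from the beginning, so the entry added above is included;
# use $ instead to deliver only entries added after the group is created
XGROUP CREATE myapp:email_queue email_workers 0 MKSTREAM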
# Worker reads next undelivered message (> means "new")
XREADGROUP GROUP email_workers worker-1 COUNT 1 BLOCK 5000 STREAMS myapp:email_queue >
# After successful processing, ACK the message by ID
XACK myapp:email_queue email_workers 1705312200000-0
# Check pending (delivered but not ACKed) — these are in-flight tasks
XPENDING myapp:email_queue email_workers - + 10
Redis CLI
| Feature | Redis Lists | Redis Streams |
|---|---|---|
| Storage model | Destructive pop — task gone after dequeue | Append-only log — tasks persist after delivery |
| Crash safety | Task lost if worker crashes after pop but before ACK (unless library adds workaround) | Built-in PEL (Pending Entries List): unACKed tasks stay tracked and can be reclaimed by another consumer with XAUTOCLAIM or XCLAIM (they are not redelivered automatically) |
| Consumer groups | Not supported natively | Native consumer groups — multiple independent groups can read same stream |
| Message replay | No — once popped, gone | Yes — read any historical ID range |
| Message ordering | FIFO within the list | Strict chronological order by auto-generated ID |
| Observability | Only queue length (LLEN) | Full introspection: pending, delivered, acknowledged counts |
| Memory | Lower (entries deleted on pop) | Higher (entries persist until trimmed with MAXLEN) |
| Used by | Celery, BullMQ, Sidekiq, Asynq | Custom consumers built on XREADGROUP |
For most SaaS workloads, Redis Lists (via Celery/BullMQ) are perfectly fine — the libraries handle crash safety in their own layer. Choose Redis Streams when you need audit trails, message replay, or multiple independent consumer groups consuming the same events.
Amazon SQS: Standard vs FIFO Queues
Amazon SQS is AWS's fully managed message queuing service. No servers to manage, scales automatically, and replicates across multiple availability zones. It has two modes with fundamentally different guarantees.
| Property | Standard Queue | FIFO Queue |
|---|---|---|
| Throughput | Nearly unlimited (thousands of TPS) | 300 TPS (3,000 with batching) |
| Ordering | Best-effort — NOT guaranteed | Strict FIFO — guaranteed within a Message Group |
| Delivery | At-least-once — a message can be delivered more than once | Exactly-once — deduplication ID prevents duplicates |
| Deduplication | None built-in | Built-in 5-minute deduplication window using content hash or explicit ID |
| Use case | High-volume, order-doesn't-matter tasks (emails, notifications) | Financial transactions, inventory updates, anything requiring strict order |
| Naming | my-queue | Must end in .fifo → my-queue.fifo |
| Cost | $0.40 per million requests | $0.50 per million requests |
How SQS Visibility Timeout Works (AWS-Specific)
SQS implements visibility timeout natively. When a worker calls ReceiveMessage, the message becomes invisible to all other consumers for the configured timeout (default 30s, max 12 hours). The worker must call DeleteMessage on success, or do nothing to let it become visible again for retry.
// Go: SQS producer + consumer using AWS SDK v2
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

// EmailPayload and processMessage are assumed to be defined elsewhere in this package.

// ── PRODUCER ──────────────────────────────────────────
func EnqueueEmailTask(ctx context.Context, client *sqs.Client, queueURL string, payload EmailPayload) error {
    body, err := json.Marshal(payload)
    if err != nil {
        return fmt.Errorf("marshal payload: %w", err)
    }
    _, err = client.SendMessage(ctx, &sqs.SendMessageInput{
        QueueUrl:    aws.String(queueURL),
        MessageBody: aws.String(string(body)),
        // For FIFO queue: add MessageGroupId + MessageDeduplicationId
        // MessageGroupId: aws.String("email-group"),
        // MessageDeduplicationId: aws.String(payload.UserID),
    })
    return err
}

// ── CONSUMER ──────────────────────────────────────────
func PollQueue(ctx context.Context, client *sqs.Client, queueURL string) {
    for ctx.Err() == nil { // stop polling once the context is cancelled
        result, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
            QueueUrl:            aws.String(queueURL),
            MaxNumberOfMessages: 10, // batch up to 10
            WaitTimeSeconds:     20, // long-polling reduces empty receives
            VisibilityTimeout:   60, // 60s to process before redelivery
        })
        if err != nil {
            // result is nil on error, so do not touch it; back off briefly and poll again
            log.Printf("ReceiveMessage: %v", err)
            time.Sleep(time.Second)
            continue
        }
        for _, msg := range result.Messages {
            if err := processMessage(ctx, msg); err != nil {
                // On error: do nothing. The visibility timeout expires and SQS redelivers.
                log.Printf("processMessage: %v", err)
                continue
            }
            // ACK: delete from queue on success
            if _, err := client.DeleteMessage(ctx, &sqs.DeleteMessageInput{
                QueueUrl:      aws.String(queueURL),
                ReceiptHandle: msg.ReceiptHandle, // unique handle per receive
            }); err != nil {
                log.Printf("DeleteMessage: %v", err) // message will be redelivered
            }
        }
    }
}
Go / AWS SDK v2
Because Standard SQS delivers at-least-once, your handlers must be idempotent. A message can be delivered twice in rare cases (network hiccup, AZ failover). Always design handlers to be safe to re-run. See "Idempotency Patterns in Depth" below for concrete patterns.
Retries & Exponential Backoff
When a task fails (handler throws an exception, external API returns 5xx, network timeout), the framework re-enqueues the task for retry. Naively retrying immediately would hammer an already-struggling service. The solution: exponential backoff.
Exponential Backoff Algorithm
After each failure, the wait time before the next retry doubles. A common formula:
// Retry delay formula (attempt_number starts at 1)
delay = base_delay * 2 ^ (attempt_number - 1) + jitter
// Example with base_delay=1min, max_retries=5 (jitter omitted):
Failure 1 → wait 1 min → retry 1
Failure 2 → wait 2 min → retry 2
Failure 3 → wait 4 min → retry 3
Failure 4 → wait 8 min → retry 4
Failure 5 → wait 16 min → retry 5
Failure 6 → retries exhausted → DEAD LETTER QUEUE
PSEUDOCODE
Retry delay visualised
Why Jitter Matters
Without jitter (random noise added to the delay), all consumers retry at the exact same time after a failure — creating a thundering herd that hits the external service with a spike. Adding a small random offset spreads the load.
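A minimal sketch of the delay calculation with jitter follows. The function name `backoff_delay`, the one-hour cap, and the 10% jitter ratio are assumptions made for illustration; real frameworks expose their own knobs for this.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 60.0, cap: float = 3600.0,
                  jitter_ratio: float = 0.1,
                  rng: Optional[random.Random] = None) -> float:
    """Delay in seconds before retry number `attempt` (1-based).

    The exponential part doubles per attempt and is capped; the jitter adds
    up to `jitter_ratio` of that value so workers do not retry in lockstep.
    """
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    rng = rng or random  # the random module offers the same uniform() call
    exponential = min(cap, base * 2 ** (attempt - 1))
    return exponential + rng.uniform(0, exponential * jitter_ratio)
```

With `jitter_ratio=0` the function reproduces the 1, 2, 4, 8, 16 minute schedule; with jitter enabled, two workers that failed at the same instant will usually retry at slightly different times.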
Dead Letter Queue (DLQ)
After exhausting all retries, the task is moved to a Dead Letter Queue. This is a separate queue for persistently-failed tasks. Engineers can inspect DLQ messages, diagnose the root cause, and manually replay them after the issue is fixed.
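The hand-off to a dead letter queue can be sketched as below. This is a simplified model, not how any particular framework implements it: `run_with_retries` is a hypothetical helper, the DLQ is a plain list, and the waits between attempts are left out.

```python
from typing import Callable, List, Tuple

MAX_RETRIES = 5  # assumed limit, matching the backoff example above


def run_with_retries(task: dict, handler: Callable[[dict], None],
                     dead_letter_queue: List[Tuple[dict, str]],
                     max_retries: int = MAX_RETRIES) -> bool:
    """Run `handler` once plus up to `max_retries` retries.

    Returns True on success. When every attempt fails, the task and the last
    error message are appended to `dead_letter_queue` for manual inspection.
    A real worker would wait (with backoff) between attempts.
    """
    last_error = ""
    for _attempt in range(max_retries + 1):
        try:
            handler(task)
            return True
        except Exception as exc:
            last_error = repr(exc)
    dead_letter_queue.append((task, last_error))
    return False
```

Storing the last error next to the task is a design choice that makes later diagnosis easier; replaying is then a matter of re-enqueuing the stored task.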
Major email providers (Resend, Mailgun) advertise high availability, and most outages are brief. With exponential backoff, a task that fails once will typically succeed on attempt 2 or 3, within a few minutes and well inside the verification email's 15–20 minute expiry window.
Visibility Timeout & Acknowledgements
When a consumer picks up a task (dequeues it), the task doesn't get deleted immediately. Instead, it becomes invisible to other consumers for a configurable duration — the visibility timeout. This is the period the task is considered "in progress."
Why Does This Exist?
Without visibility timeout, if a consumer crashes mid-task, the task would be permanently lost, because it was already removed from the queue when the consumer picked it up. Visibility timeout ensures that if the worker doesn't acknowledge within the window (it crashed, hung, or the external service timed out), the task becomes visible again and another worker can pick it up. A crashed worker therefore delays a task instead of losing it.
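The mechanism can be modelled in a few lines. The sketch below is an in-memory illustration only: `VisibilityQueue` is a hypothetical class, and time is passed in explicitly as `now` so the behaviour is easy to follow.

```python
import itertools
from typing import Dict, Optional, Tuple


class VisibilityQueue:
    """In-memory model of a queue with a visibility timeout."""

    def __init__(self, timeout: float):
        self.timeout = timeout
        self._ids = itertools.count(1)
        # task_id -> (payload, time at which the task becomes visible again)
        self._tasks: Dict[int, Tuple[str, float]] = {}

    def send(self, payload: str) -> int:
        task_id = next(self._ids)
        self._tasks[task_id] = (payload, 0.0)  # visible immediately
        return task_id

    def receive(self, now: float) -> Optional[Tuple[int, str]]:
        """Hand out the oldest visible task and hide it until now + timeout."""
        for task_id, (payload, visible_at) in self._tasks.items():
            if visible_at <= now:
                self._tasks[task_id] = (payload, now + self.timeout)
                return task_id, payload
        return None

    def ack(self, task_id: int) -> None:
        """Delete the task; only at this point does it leave the queue."""
        self._tasks.pop(task_id, None)
```

If a worker receives a task and never calls `ack`, the next `receive` after the timeout hands the same task to whoever asks, which is the redelivery behaviour described above.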
Types of Background Tasks
- One-off: Triggered once by a specific event. Send verification email, welcome email, password-reset link, in-app notification. Most common type.
- Recurring: Scheduled at fixed intervals. Weekly reports, daily digest emails, DB cleanup jobs, orphan session purging. Implemented via cron-like schedulers.
- Chain: Parent–child dependencies. Task B can only run after Task A succeeds. Tasks at the same level can run in parallel. Classic: video upload pipeline.
- Batch: One trigger fans out into many parallel tasks. Delete account spawns sub-tasks per resource type. Send 10k reports at midnight.
Chain Tasks — Video Upload Example
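One way to reason about a chain is as a dependency graph. The sketch below uses Python's standard-library `graphlib` to group tasks into levels, where each level may run in parallel once the previous one has succeeded. The step names in `PIPELINE` are hypothetical and only stand in for a video upload pipeline.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each key lists the tasks that must succeed before it may run.
PIPELINE = {
    "transcode_720p": {"store_upload"},
    "transcode_1080p": {"store_upload"},
    "make_thumbnail": {"store_upload"},
    "publish": {"transcode_720p", "transcode_1080p", "make_thumbnail"},
}


def execution_levels(graph):
    """Group tasks into levels; tasks in one level can run in parallel."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if the dependencies are circular
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        levels.append(ready)
        sorter.done(*ready)  # mark the whole level as succeeded
    return levels
```

In a real queue, each task in a level would be enqueued separately, and the next level would be enqueued only after every parent reported success.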
Real-World Use Cases
| Use Case | Task Type | Why Background? |
|---|---|---|
| Email verification / welcome | One-off | Depends on external email provider. Failure must not break signup. |
| Image/video resizing | Chain | CPU-intensive. Would block the request thread for seconds. |
| Weekly/monthly reports | Recurring (cron) | Must run at a specific time, not triggered by a user request. |
| Push notifications | Batch / One-off | Calls Apple APNs / Google FCM — external services. Can fail, need retries. |
| Account deletion | Batch + Chain | Traverses entire user graph (projects, assets, sessions) — too slow for a request. |
| Orphan session cleanup | Recurring | Maintenance job — run monthly to free DB storage. |
| PDF report generation | One-off / Batch | Constructing large HTML→PDF is CPU/memory-intensive. |
Push Notifications — How It Actually Works
Your backend cannot send a push notification directly to a phone. Here's the full flow:
- User installs the app → the OS generates a unique device token (Apple APNs token / Google FCM token).
- App sends that token to your backend on first launch — you store it per-user in your DB.
- When you want to notify the user: enqueue a "send push notification" task with the user's device token + message payload.
- Worker dequeues → calls Apple APNs or Google FCM API with the token and payload.
- Apple/Google deliver the notification to the device via the OS-level push channel.
Push notifications belong in the background for the same reason as email: delivery depends on external Apple/Google services, it is rate-limited, and it needs retries. One event can also mean millions of tokens (sports app: goal scored → notify all fans).
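When one event targets a very large number of device tokens, a common approach is to fan the work out into batches so that each task stays small and can be retried on its own. A minimal sketch, where `fan_out_push_tasks`, the task type `send_push_batch`, and the batch size of 500 are all assumptions made for illustration:

```python
import json
from typing import Iterator, List


def fan_out_push_tasks(tokens: List[str], message: str,
                       batch_size: int = 500) -> Iterator[str]:
    """Yield one serialized task per batch of device tokens."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(tokens), batch_size):
        yield json.dumps({
            "type": "send_push_batch",
            "tokens": tokens[start:start + batch_size],
            "message": message,
        })
```

Each yielded string would be pushed to the queue as its own task, so a failure in one batch does not force the other batches to be resent.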
Code Examples: Go & Python
Python — Celery + Redis
Celery is the most widely used Python task queue. It uses Redis or RabbitMQ as the broker.
# tasks.py: define the task (consumer side)
from celery import Celery
import resend

# Connect Celery to Redis broker
app = Celery('tasks', broker='redis://localhost:6379/0')

# resend.api_key must be set (for example from an environment variable) before sending.
# build_email_template is assumed to be defined elsewhere in the project.

# Decorate the function as a Celery task
@app.task(bind=True, max_retries=5, default_retry_delay=60)
def send_verification_email(self, user_id: str, email: str, token: str):
    """
    Called by the worker process.
    self.retry() re-enqueues the task; the exponential backoff comes from
    the countdown computed below, not from Celery itself.
    """
    try:
        params = {
            "from": "noreply@myapp.com",
            "to": [email],
            "subject": "Verify your email",
            "html": build_email_template(token),
        }
        resend.Emails.send(params)
    except Exception as exc:
        # Exponential backoff: 60s, 120s, 240s, 480s, 960s
        countdown = 60 * (2 ** self.request.retries)
        raise self.retry(exc=exc, countdown=countdown)
Python
# signup.py: producer side (inside your API handler)
from fastapi import FastAPI
from tasks import send_verification_email

app = FastAPI()

# SignupRequest, create_user and generate_verification_token are assumed to be defined elsewhere.
@app.post("/signup", status_code=201)
async def signup(body: SignupRequest):
    # 1. Validate, hash password, save user to DB
    user = await create_user(body)
    token = generate_verification_token(user.id)
    # 2. Enqueue: returns once the broker has the task, does NOT wait for the email
    send_verification_email.delay(
        user_id=user.id,
        email=user.email,
        token=token
    )
    # 3. Return immediately; the 201 comes from status_code above
    #    (returning a (dict, 201) tuple is Flask style and would be serialized as a JSON array)
    return {"message": "Account created. Check your email."}
Python / FastAPI
Go — Asynq + Redis
Asynq is a simple, reliable distributed task queue library for Go, backed by Redis.
// tasks/email.go: task definitions (producer + payload)
package tasks

import (
    "encoding/json"
    "fmt"

    "github.com/hibiken/asynq"
)

const TypeSendVerificationEmail = "email:send_verification"

// Payload struct, serialized to JSON in queue
type EmailPayload struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
    Token  string `json:"token"`
}

// NewSendVerificationEmailTask creates the Asynq task
func NewSendVerificationEmailTask(userID, email, token string) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailPayload{UserID: userID, Email: email, Token: token})
    if err != nil {
        return nil, fmt.Errorf("json.Marshal: %w", err)
    }
    // asynq.MaxRetry(5): after 5 failed retries the task is archived (Asynq's dead-letter set)
    return asynq.NewTask(TypeSendVerificationEmail, payload, asynq.MaxRetry(5)), nil
}
Go
// handlers/email_handler.go: consumer handler
package handlers

import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"

    tasks "myapp/tasks"

    "github.com/hibiken/asynq"
)

type EmailHandler struct {
    emailSvc EmailService // injected dependency
}

// HandleSendVerificationEmail is registered with the Asynq server
func (h *EmailHandler) HandleSendVerificationEmail(
    ctx context.Context,
    t *asynq.Task,
) error {
    // 1. Deserialize JSON → Go struct
    var p tasks.EmailPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        // Non-retryable error: mark as failed immediately
        return fmt.Errorf("json.Unmarshal: %w: %w", err, asynq.SkipRetry)
    }
    // 2. Execute: call email provider API
    slog.Info("sending verification email", "user_id", p.UserID, "email", p.Email)
    if err := h.emailSvc.SendVerification(ctx, p.Email, p.Token); err != nil {
        // Returning error triggers retry with exponential backoff
        return fmt.Errorf("emailSvc.SendVerification: %w", err)
    }
    // 3. Returning nil → ACK to queue (task done)
    return nil
}
Go
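// main.go: wire producer and start worker server
package main

import (
    "log"

    "github.com/hibiken/asynq"
    "myapp/handlers"
    "myapp/tasks"
)

func main() {
    redisOpt := asynq.RedisClientOpt{Addr: "localhost:6379"}

    // ── PRODUCER ─────────────────────────────────────
    client := asynq.NewClient(redisOpt)
    defer client.Close()
    task, err := tasks.NewSendVerificationEmailTask(
        "usr_01J2K", "alice@example.com", "eyJhbGci...",
    )
    if err != nil {
        log.Fatal(err)
    }
    // Enqueue returns as soon as Redis has stored the task.
    // asynq.Queue("email") routes it to the high-priority queue configured below.
    info, err := client.Enqueue(task, asynq.Queue("email"))
    if err != nil {
        log.Fatal(err)
    }
    // info.ID, info.Queue, info.State can be used for monitoring
    log.Printf("enqueued task id=%s queue=%s", info.ID, info.Queue)

    // ── CONSUMER (worker server) ─────────────────────
    srv := asynq.NewServer(redisOpt, asynq.Config{
        Concurrency: 10, // 10 concurrent workers
        Queues: map[string]int{
            "email":   6, // higher priority
            "default": 3,
            "low":     1,
        },
    })
    mux := asynq.NewServeMux()
    // The handler method has a pointer receiver, so it is registered on a pointer.
    // The emailSvc field must be populated inside the handlers package before real use.
    emailHandler := &handlers.EmailHandler{}
    mux.HandleFunc(tasks.TypeSendVerificationEmail, emailHandler.HandleSendVerificationEmail)
    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}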
Go
Go — Recurring Task with Asynq Scheduler
// scheduler/main.go: cron-style recurring tasks
package main

import (
    "log"

    "github.com/hibiken/asynq"
)

func main() {
    scheduler := asynq.NewScheduler(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        nil,
    )
    // Send weekly report every Sunday at midnight
    if _, err := scheduler.Register("0 0 * * 0",
        asynq.NewTask("report:weekly", nil)); err != nil {
        log.Fatal(err)
    }
    // Clean up orphan sessions at 03:00 on the 1st of each month
    if _, err := scheduler.Register("0 3 1 * *",
        asynq.NewTask("sessions:cleanup", nil)); err != nil {
        log.Fatal(err)
    }
    if err := scheduler.Run(); err != nil {
        log.Fatal(err)
    }
}
Go
Python — Celery Beat (Recurring Tasks)
# celery_config.py: Celery Beat schedule
# CELERYBEAT_SCHEDULE is the old-style uppercase setting name; with the
# lowercase settings introduced in Celery 4 the same dictionary is assigned to beat_schedule.
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # Send weekly report every Sunday midnight
    'weekly-report': {
        'task': 'tasks.send_weekly_report',
        'schedule': crontab(hour=0, minute=0, day_of_week='sunday'),
    },
    # Cleanup orphan sessions at 03:00 on the 1st of each month
    'session-cleanup': {
        'task': 'tasks.cleanup_orphan_sessions',
        'schedule': crontab(hour=3, minute=0, day_of_month='1'),
    },
}
Python / Celery Beat
Design Considerations at Scale
When your platform grows to thousands of concurrent users, the naive task-queue setup that worked in development starts to crack. These are the areas you must engineer carefully before you hit production scale.
If a task type requires strict ordering, use an SQS FIFO MessageGroupId or a single-consumer queue for that task type.
Idempotency Patterns in Depth
Idempotency means that running a task once or 100 times produces identical side-effects. This is non-negotiable in any system that retries tasks — and all production task queues do. There are several concrete patterns to achieve it.
Pattern 1 — Idempotency Keys
Assign a unique key to each task when it is created (usually a UUID or a hash of the input). Before doing any work, the handler checks if that key has already been successfully processed. If yes, it returns early without re-doing the work.
# Python: idempotency key pattern with Redis
import redis

r = redis.Redis(host='localhost', port=6379)

# app is the Celery instance defined in tasks.py
@app.task(bind=True, max_retries=5)
def send_verification_email(self, user_id: str, email: str, token: str):
    # Build idempotency key from task inputs
    key = f"idem:verify_email:{user_id}:{token}"
    # SET NX = set only if Not eXists; EX = expire after 1 hour
    # Truthy if we are the FIRST execution, None if the key already exists
    acquired = r.set(key, "done", nx=True, ex=3600)
    if not acquired:
        # Already processed: skip silently (idempotent return)
        return {"status": "already_sent"}
    # First time: actually send the email.
    # Caveat: the key is written before the send, so a worker that crashes between
    # the two leaves the key set and the email unsent until the key expires.
    try:
        _send_email_via_provider(email, token)
    except Exception as exc:
        # Delete the key so next retry can attempt again
        r.delete(key)
        raise self.retry(exc=exc, countdown=60 * 2 ** self.request.retries)
Python
Pattern 2 — Database Transaction + Upsert
For tasks that write to a database, use upserts (INSERT … ON CONFLICT DO NOTHING / DO UPDATE) instead of plain INSERTs. This way, retrying the task simply tries to insert again and silently skips the duplicate.
-- PostgreSQL — Idempotent email log via upsert
-- If the verification_emails row already exists for this token, do nothing
INSERT INTO verification_emails (token, user_id, sent_at, provider_msg_id)
VALUES ($1, $2, NOW(), $3)
ON CONFLICT (token) DO NOTHING;
-- For updates: ON CONFLICT DO UPDATE
INSERT INTO user_status (user_id, email_verified, updated_at)
VALUES ($1, true, NOW())
ON CONFLICT (user_id) DO UPDATE
SET email_verified = EXCLUDED.email_verified,
updated_at = EXCLUDED.updated_at;
SQL
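The same upsert idea can be tried locally with Python's built-in `sqlite3` module, which accepts the `ON CONFLICT ... DO NOTHING` form on SQLite 3.24 or newer. This is a sketch under assumptions: the table is reduced to two columns, and `record_email_sent` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE verification_emails (
        token   TEXT PRIMARY KEY,
        user_id TEXT NOT NULL
    )
""")


def record_email_sent(token: str, user_id: str) -> bool:
    """Insert the log row; return False when a previous attempt already did."""
    with conn:  # commits on success, rolls back on exception
        cursor = conn.execute(
            "INSERT INTO verification_emails (token, user_id) VALUES (?, ?) "
            "ON CONFLICT (token) DO NOTHING",
            (token, user_id),
        )
    return cursor.rowcount == 1  # 0 rows changed means the row already existed
```

A retried task can call this helper again without creating a second row, and the return value tells the handler whether it is the first run.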
Pattern 3 — Full Transaction Rollback (for complex tasks)
For multi-step tasks (like account deletion) that touch many tables, wrap the entire task in a database transaction. If any step fails, the transaction rolls back completely — leaving the database in a clean state for the next retry attempt.
// Go: idempotent multi-step task with full transaction rollback
func (h *AccountHandler) HandleDeleteAccount(ctx context.Context, t *asynq.Task) error {
    var p DeleteAccountPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        // A malformed payload will never succeed, so skip retries
        return fmt.Errorf("json.Unmarshal: %v: %w", err, asynq.SkipRetry)
    }
    // Check if already deleted (idempotency guard)
    exists, err := h.db.UserExists(ctx, p.UserID)
    if err != nil {
        // Retry later; a failed lookup must not be mistaken for "already deleted"
        return fmt.Errorf("UserExists: %w", err)
    }
    if !exists {
        return nil // already deleted on a previous attempt, ACK cleanly
    }
    // Wrap all DB writes in a single transaction
    return h.db.WithTransaction(ctx, func(tx DB) error {
        steps := []func() error{
            func() error { return tx.DeleteUserProjects(ctx, p.UserID) },
            func() error { return tx.DeleteUserSessions(ctx, p.UserID) },
            func() error { return tx.DeleteUserAssets(ctx, p.UserID) },
            func() error { return tx.DeleteUserAccount(ctx, p.UserID) },
        }
        for _, step := range steps {
            if err := step(); err != nil {
                return err // triggers full rollback; task retried from scratch
            }
        }
        return nil // all steps succeeded → commit → ACK
    })
}
Go
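For readers who want to see the rollback behaviour without a full service, here is a small Python sketch using the standard-library `sqlite3` module. The schema, `make_db`, `delete_account`, and the `fail_midway` switch are all invented for the illustration.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory database with one user who owns two projects."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE users    (id TEXT PRIMARY KEY);
        CREATE TABLE projects (id INTEGER PRIMARY KEY, user_id TEXT);
        INSERT INTO users VALUES ('usr_1');
        INSERT INTO projects (user_id) VALUES ('usr_1'), ('usr_1');
    """)
    return conn


def delete_account(conn: sqlite3.Connection, user_id: str,
                   fail_midway: bool = False) -> str:
    """Delete a user and their projects atomically; safe to call again."""
    exists = conn.execute("SELECT 1 FROM users WHERE id = ?", (user_id,)).fetchone()
    if exists is None:
        return "already_deleted"  # idempotency guard for retries
    with conn:  # one transaction: commit on success, rollback on exception
        conn.execute("DELETE FROM projects WHERE user_id = ?", (user_id,))
        if fail_midway:
            raise RuntimeError("simulated crash between steps")
        conn.execute("DELETE FROM users WHERE id = ?", (user_id,))
    return "deleted"
```

After a simulated failure the projects are still present, so the retry starts from the same state as the first attempt.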
Pattern 4 — Exactly-Once via SQS FIFO Deduplication
SQS FIFO queues accept a MessageDeduplicationId. Any message with the same ID sent within a 5-minute window is silently dropped. This is broker-level idempotency — the handler never even sees the duplicate.
// Go: SQS FIFO with an explicit deduplication ID
// (content-based deduplication is the alternative: enable it on the queue and omit the ID)
client.SendMessage(ctx, &sqs.SendMessageInput{
    QueueUrl:               aws.String("https://sqs.us-east-1.amazonaws.com/123/orders.fifo"),
    MessageBody:            aws.String(payload),
    MessageGroupId:         aws.String("order-processing"), // ordering group
    MessageDeduplicationId: aws.String(orderID),            // unique per business event
})
// Sending the same orderID twice within 5 min → second is silently dropped by SQS
Go / AWS SQS FIFO
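The deduplication window is easy to model in memory. The sketch below is a toy, not the SQS API: `DedupQueue` is a hypothetical class, time is passed in as `now`, and the boolean return value exists only for the illustration (SQS itself reports success to the sender in both cases).

```python
from typing import Dict, List

DEDUP_WINDOW_SECONDS = 300  # the 5-minute window described above


class DedupQueue:
    """Toy model of broker-level deduplication by ID within a time window."""

    def __init__(self) -> None:
        self.messages: List[str] = []
        self._seen: Dict[str, float] = {}  # dedup id -> time it was accepted

    def send(self, body: str, dedup_id: str, now: float) -> bool:
        """Enqueue `body` unless the same ID was accepted inside the window."""
        first_seen = self._seen.get(dedup_id)
        if first_seen is not None and now - first_seen < DEDUP_WINDOW_SECONDS:
            return False  # duplicate inside the window: not enqueued
        self._seen[dedup_id] = now
        self.messages.append(body)
        return True
```

Note what this does not cover: a message that was enqueued once can still be redelivered to a consumer after a visibility timeout, so handlers benefit from being idempotent even on FIFO queues.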
Rate Limiting in Workers
External APIs impose rate limits. If your workers send 500 emails per second and the provider allows only 100 per second, you'll get flooded with 429 errors, wasted retries, and degraded deliverability. You need to throttle your workers to stay within the provider's limit.
Two Core Algorithms
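The two algorithms shown in this section are the token bucket and the sliding window; a simpler fixed-window counter can be built from Redis INCR + EXPIRE.
Token Bucket in Python (Redis-backed)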
# rate_limiter.py: Redis token bucket implementation
import time
import redis

# Lua script ensures atomic read-modify-write (no race conditions)
TOKEN_BUCKET_LUA = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now
-- Refill tokens based on time elapsed
local elapsed = now - last_refill
tokens = math.min(capacity, tokens + elapsed * refill_rate)
if tokens >= 1 then
    tokens = tokens - 1
    redis.call('HSET', key, 'tokens', tokens, 'last_refill', now)
    redis.call('EXPIRE', key, 3600)
    return 1
end
return 0
"""

class TokenBucketRateLimiter:
    def __init__(self, redis_client, key: str, capacity: int, refill_rate: float):
        self.r = redis_client
        self.key = key                  # e.g. "ratelimit:resend_emails"
        self.capacity = capacity        # max burst (e.g. 100)
        self.refill_rate = refill_rate  # tokens per second (e.g. 10)

    def acquire(self) -> bool:
        """Returns True if a token was acquired, False if rate limited."""
        now = time.time()
        result = self.r.eval(TOKEN_BUCKET_LUA, 1, self.key,
                             self.capacity, self.refill_rate, now)
        return bool(result)

# Usage inside a Celery task (app is the Celery instance, r a redis.Redis client)
limiter = TokenBucketRateLimiter(r, "ratelimit:resend", capacity=100, refill_rate=10)

@app.task(bind=True, max_retries=10)
def send_email_task(self, email, token):
    if not limiter.acquire():
        # Rate limited: retry after a short delay. Each retry still increments
        # self.request.retries, so a generous max_retries is passed here.
        raise self.retry(countdown=1, max_retries=60)  # retry every 1s up to 60 times
    _do_send_email(email, token)
Python
Sliding Window Rate Limiter in Go
// rate/sliding_window.go: Redis sorted-set sliding window
package rate

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/hibiken/asynq"
    "github.com/redis/go-redis/v9"
)

type SlidingWindowLimiter struct {
    client *redis.Client
    key    string
    limit  int           // max requests
    window time.Duration // time window
}

func (l *SlidingWindowLimiter) Allow(ctx context.Context) (bool, error) {
    now := time.Now()
    windowStart := now.Add(-l.window).UnixMicro()
    nowMicro := now.UnixMicro()
    // Two calls in the same microsecond would share a member; acceptable for a sketch
    member := fmt.Sprintf("%d", nowMicro)

    // TxPipeline wraps the commands in MULTI/EXEC so trim, count and add run as one unit
    pipe := l.client.TxPipeline()
    // Remove entries older than the window
    pipe.ZRemRangeByScore(ctx, l.key, "0", fmt.Sprintf("%d", windowStart))
    // Count entries in current window (before adding this request)
    countCmd := pipe.ZCard(ctx, l.key)
    // Add current request timestamp as a member
    pipe.ZAdd(ctx, l.key, redis.Z{Score: float64(nowMicro), Member: member})
    pipe.Expire(ctx, l.key, l.window*2)
    if _, err := pipe.Exec(ctx); err != nil {
        return false, err
    }
    if countCmd.Val() >= int64(l.limit) {
        // Over the limit: remove our own entry so rejected calls do not consume quota
        if err := l.client.ZRem(ctx, l.key, member).Err(); err != nil {
            return false, err
        }
        return false, nil
    }
    return true, nil
}

// Usage in Asynq handler
func (h *EmailHandler) HandleSendEmail(ctx context.Context, t *asynq.Task) error {
    allowed, err := h.limiter.Allow(ctx)
    if err != nil {
        return fmt.Errorf("rate limiter: %w", err)
    }
    if !allowed {
        // Any non-nil error that does not wrap asynq.SkipRetry makes Asynq retry with backoff
        return errors.New("rate limit exceeded")
    }
    return h.doSendEmail(ctx, t)
}
Go
Asynq does not ship a rate limiter of its own. The approach described in its documentation is to return a dedicated rate-limit error from the handler, then set Config.RetryDelayFunc to reschedule the task after the limiter's wait time and Config.IsFailure so that rate-limited attempts are not counted as failures.
Full Monitoring Setup: Prometheus + Grafana
Background tasks fail silently unless you've built observability into the system. The standard production stack is: Prometheus (metrics collection & storage) + Grafana (dashboard & alerting). Here's the complete setup.
Architecture Overview
Step 1 — Instrument Your Workers
Every worker exposes a /metrics HTTP endpoint in Prometheus text format. The prometheus_client library (Python) or prometheus/client_golang (Go) handles this automatically.
# Python: full metrics instrumentation for a worker process
import functools
import time

from prometheus_client import (
    Counter, Histogram, Gauge, start_http_server
)

# ── Define metrics ─────────────────────────────────────────────
TASK_TOTAL = Counter(
    'worker_tasks_total',
    'Total tasks processed',
    ['task_name', 'status']  # labels: task_name, status=success|failure
)
TASK_DURATION = Histogram(
    'worker_task_duration_seconds',
    'Task processing duration',
    ['task_name'],
    buckets=[.01, .05, .1, .5, 1, 5, 10, 30]  # histogram bucket boundaries
)
QUEUE_DEPTH = Gauge(
    'worker_queue_depth',
    'Current tasks waiting in queue',
    ['queue_name']
)
RETRY_COUNT = Counter(
    'worker_task_retries_total',
    'Number of task retries',
    ['task_name']
)

# ── Decorator to wrap any task with metrics ────────────────────
def track_metrics(task_name: str):
    def decorator(func):
        @functools.wraps(func)  # keep the function name so Celery registers the right task name
        def wrapper(*args, **kwargs):
            start = time.time()
            try:
                result = func(*args, **kwargs)
                TASK_TOTAL.labels(task_name=task_name, status="success").inc()
                return result
            except Exception:
                TASK_TOTAL.labels(task_name=task_name, status="failure").inc()
                raise
            finally:
                TASK_DURATION.labels(task_name=task_name).observe(time.time() - start)
        return wrapper
    return decorator

# Start /metrics server on port 9090 (Prometheus scrapes this)
start_http_server(9090)

# ── Apply to task (app is the Celery instance) ─────────────────
@app.task(bind=True, max_retries=5)
@track_metrics("send_verification_email")
def send_verification_email(self, user_id, email, token):
    if self.request.retries > 0:
        RETRY_COUNT.labels(task_name="send_verification_email").inc()
    _do_send_email(email, token)
Python
Step 2 — Prometheus Configuration
# prometheus.yml: scrape config
global:
  scrape_interval: 15s
  evaluation_interval: 15s
rule_files:
  - "alert_rules.yml"
alerting:
  alertmanagers:
    - static_configs:
        - targets: ["alertmanager:9093"]
scrape_configs:
  - job_name: "celery_workers"
    static_configs:
      - targets:
          - "worker-1:9090"
          - "worker-2:9090"
          - "worker-3:9090"
  - job_name: "redis"
    static_configs:
      - targets: ["redis-exporter:9121"] # oliver006/redis_exporter
  - job_name: "api_server"
    static_configs:
      - targets: ["api:8080"]
YAML
Step 3 — Alert Rules
# alert_rules.yml: fire alerts when things go wrong
groups:
  - name: task_queue_alerts
    rules:
      # Alert if queue depth stays high for 5 minutes
      - alert: HighQueueDepth
        expr: worker_queue_depth{queue_name="email"} > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Email queue backed up ({{ $value }} tasks)"
          description: "Queue depth has been above 1000 for 5+ minutes. Scale workers."
      # Alert if failure rate > 10% over last 5 minutes.
      # sum() aggregates across task_name/status/instance; without it the
      # division matches each failure series against itself and always yields 1.
      - alert: HighTaskFailureRate
        expr: |
          sum(rate(worker_tasks_total{status="failure"}[5m]))
            / sum(rate(worker_tasks_total[5m])) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Task failure rate above 10%"
      # Alert if a worker process goes down (no scrape for 1 min)
      - alert: WorkerDown
        expr: up{job="celery_workers"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Worker {{ $labels.instance }} is down"
YAML
Step 4 — Key Grafana Panels to Build
| Panel | Query (PromQL) | Alert Threshold |
|---|---|---|
| Queue Depth (gauge) | worker_queue_depth{queue_name="email"} | > 1000 → warn; > 5000 → critical |
| Task Success Rate (%) | sum(rate(worker_tasks_total{status="success"}[5m])) / sum(rate(worker_tasks_total[5m])) * 100 | < 95% → warn; < 90% → critical |
| p95 Task Latency | histogram_quantile(0.95, rate(worker_task_duration_seconds_bucket[5m])) | > 30s → warn |
| Retry Rate | rate(worker_task_retries_total[5m]) | Sudden spike → external service issue |
| Worker Count (active) | count(up{job="celery_workers"} == 1) | < min_workers → critical |
| Redis Memory Usage | redis_memory_used_bytes / redis_memory_max_bytes * 100 | > 80% → warn |
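The p95 latency panel depends on how `histogram_quantile` turns bucket counts into a single number: it finds the bucket containing the target rank and interpolates linearly inside it. The sketch below is a simplified Python rendering of that idea, not Prometheus's actual implementation; the function name `estimate_quantile` and the sample counts are made up for illustration.

```python
import math

def estimate_quantile(q, buckets):
    """Approximate a quantile from cumulative histogram buckets.

    buckets: list of (upper_bound, cumulative_count) sorted by upper_bound,
    ending with (math.inf, total_count), mirroring Prometheus `le` buckets.
    """
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for upper, count in buckets:
        if count >= rank:
            if math.isinf(upper):
                # Rank lies beyond the last finite bucket: the estimate is
                # capped at the highest finite boundary.
                return prev_bound
            # Linear interpolation inside the bucket that holds the rank
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (upper - prev_bound) * fraction
        prev_bound, prev_count = upper, count
    return prev_bound

# Hypothetical counts: 100 observations spread over five buckets
sample = [(0.1, 50), (0.5, 80), (1.0, 90), (5.0, 98), (math.inf, 100)]
p95 = estimate_quantile(0.95, sample)  # falls in the (1.0, 5.0] bucket
```

Two practical consequences follow from this: the estimate is only as precise as the bucket it lands in, and it can never exceed the highest finite bucket boundary, so latency alert thresholds should sit below that boundary.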
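Run oliver006/redis_exporter as a sidecar. It exposes 100+ Redis metrics to Prometheus (memory, connected clients, keyspace hits/misses) with zero application code required. Per-key sizes such as queue list lengths are reported only for keys the exporter is configured to check.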
Best Practices
1 · Keep Tasks Small and Focused
A single task should do one thing. Don't bundle email sending + DB cleanup + analytics reporting into a single task. If one step fails, you retry the whole thing. Small tasks = precise retries, easier debugging, better scalability.
If multiple things depend on each other, use chain tasks (parent–child). If they're independent, run them in parallel.
2 · Avoid Long-Running Tasks
If a task takes minutes to run, break it into smaller chunks. Long tasks consume worker threads, are harder to retry correctly, and are prone to visibility timeout expiry. A task that takes 10 minutes → 10 tasks that each take 1 minute.
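One way to break up a long job, sketched under assumptions: split the input into fixed-size batches and enqueue one task per batch. Here `enqueue` is a hypothetical stand-in for whatever producer call your queue library provides, and the batch size is arbitrary.

```python
from typing import Callable, Iterator, List, Sequence

def chunked(items: Sequence[int], size: int) -> Iterator[Sequence[int]]:
    """Yield consecutive slices of `items`, each at most `size` long."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]

def enqueue_in_chunks(
    user_ids: Sequence[int],
    size: int,
    enqueue: Callable[[List[int]], None],
) -> int:
    """Enqueue one task per batch and return how many tasks were created."""
    count = 0
    for batch in chunked(user_ids, size):
        enqueue(list(batch))  # each batch becomes an independent, retryable task
        count += 1
    return count
```

If one batch fails, only that batch is retried, and each task finishes well inside the visibility timeout.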
3 · Robust Error Handling & Logging
Unlike a web request where you can return a 500 and the client sees it, background tasks fail silently unless you've built observability in. Every handler must:
- Catch all exceptions — don't let unhandled panics/exceptions kill the worker silently.
- Log the task ID, user ID, error type, and full stack trace on failure.
- Distinguish transient vs permanent errors: return an error wrapping asynq.SkipRetry (Asynq), or keep permanent error types out of autoretry_for (Celery), so unrecoverable failures are not retried.
- Send critical failures to an alerting channel (Slack, PagerDuty).
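The checklist above can be sketched as a small framework-agnostic wrapper. `PermanentError` and `run_task` are hypothetical names introduced for this illustration; a real worker would map the returned outcome onto its queue library's retry and archive mechanisms.

```python
import logging

logger = logging.getLogger("worker")

class PermanentError(Exception):
    """Hypothetical marker for failures that retrying cannot fix."""

def run_task(task_id, user_id, handler):
    """Run `handler` and return 'ok', 'retry', or 'dead'."""
    context = {"task_id": task_id, "user_id": user_id}
    try:
        handler()
        return "ok"
    except PermanentError:
        # logger.exception records the error type and full stack trace
        logger.exception("task failed permanently", extra=context)
        return "dead"
    except Exception:
        # Unknown errors are treated as transient so a retry gets a chance
        logger.exception("task failed, will retry", extra=context)
        return "retry"
```

The `extra` fields only appear in the output if the log formatter references them, so a structured (JSON) formatter is a natural fit here.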
4 · Monitor Queue Length & Worker Health
Set up alerting thresholds: if queue depth > 10,000 → trigger auto-scaling or alert engineers. If a worker pod restarts more than 3 times in 5 minutes → alert. Use Grafana dashboards backed by Prometheus (or CloudWatch for SQS).
5 · Use Dead Letter Queues
Never let tasks disappear silently. Configure a DLQ so that tasks exhausting all retries land somewhere inspectable. Regularly audit the DLQ — recurring failures there indicate a systemic bug.
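A minimal in-memory sketch of how retries with exponential backoff hand off to a dead letter queue. The names `backoff_delay` and `process_with_dlq`, the base delay, and the cap are assumptions for illustration; real brokers persist the DLQ and usually add random jitter to the delay.

```python
import time

def backoff_delay(attempt, base=1.0, cap=300.0):
    """Seconds to wait after failed attempt number `attempt` (1-based)."""
    return min(cap, base * 2 ** (attempt - 1))

def process_with_dlq(task, handler, max_retries, dead_letters, sleep=time.sleep):
    """Try `handler(task)`; after `max_retries` retries, park the task in the DLQ."""
    for attempt in range(1, max_retries + 2):  # first try + max_retries retries
        try:
            return handler(task)
        except Exception as exc:
            if attempt > max_retries:
                # Retries exhausted: keep the task and its error inspectable
                dead_letters.append(
                    {"task": task, "error": repr(exc), "attempts": attempt}
                )
                return None
            sleep(backoff_delay(attempt))
```

Storing the error alongside the task is what makes a DLQ audit useful: recurring identical errors point at a systemic bug rather than a flaky dependency.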
☑ Task is idempotent ☑ Handler has try/catch ☑ Errors are logged with context ☑ Prometheus metrics emitted ☑ DLQ configured ☑ Exponential backoff enabled ☑ Visibility timeout set appropriately ☑ Workers are stateless (horizontally scalable)
Framework Comparison
| Framework | Language | Broker | Retries | Scheduling | Notes |
|---|---|---|---|---|---|
| Celery | Python | Redis, RabbitMQ | ✓ Exponential backoff | ✓ Celery Beat | Most mature Python solution. Large ecosystem. |
| BullMQ | Node.js | Redis | ✓ Built-in | ✓ Repeatable jobs | TypeScript-first. Good UI dashboard (Bull Board). |
| Asynq | Go | Redis | ✓ Configurable | ✓ Built-in Scheduler | Lightweight, idiomatic Go. Includes web UI (asynqmon). |
| Sidekiq | Ruby | Redis | ✓ Built-in | ✓ Sidekiq-Cron | De facto Ruby standard. Excellent web UI. |
| Temporal | Polyglot | Own server | ✓ Durable workflows | ✓ Built-in | Heavy but enterprise-grade. Full workflow orchestration, not just tasks. |
Background tasks are essential for any production backend. They decouple your API's reliability from external services, prevent timeout-induced failures, enable automatic retries, and allow you to horizontally scale processing independently of your API servers. The patterns here — producer/consumer, exponential backoff, visibility timeout, idempotency, monitoring — apply regardless of language or framework. Master these, and you're building production-grade backends.
Temporal & Workflow Orchestration
Standard task queues (Celery, BullMQ, Asynq) are great for independent tasks — fire and forget, retry on failure. But when you have long-running, multi-step workflows with complex dependencies, conditional branching, human approval steps, or compensation logic (saga pattern), you need something more powerful: a workflow orchestration engine.
Imagine a payment workflow: charge card → reserve inventory → send confirmation email → notify warehouse → update loyalty points. Each step can fail independently. If step 4 fails, do you refund step 1? How do you resume from step 4 after a server crash? How do you wait for a human to approve step 3 before continuing? This is where task queues break down and workflow engines shine.
Task Queue vs Workflow Engine
| Capability | Task Queue (Celery/Asynq) | Temporal |
|---|---|---|
| Individual task retries | ✓ Built-in | ✓ Built-in |
| Multi-step workflows | Manual — you chain tasks yourself | Native — workflows are first-class |
| Workflow state persistence | You manage it (DB records, flags) | Automatic — entire execution history persisted |
| Crash recovery | Retry from start of task | Resume from exact step where crash occurred |
| Compensation / rollback | Manual code | Native saga pattern support |
| Long-running (days/months) | Hard — visibility timeouts, connection limits | Native — timers can fire after months |
| Human-in-the-loop | Very difficult | Native signals & queries |
| Operational complexity | Low | High — requires Temporal server cluster |
| Languages | Language-specific | Go, Python, Java, TypeScript, .NET |
Core Temporal Concepts
Example: Video Processing Workflow in Go
The same video upload chain task we saw in Section 9, now implemented with Temporal:
// workflows/video_processing.go
package workflows

import (
	"time"

	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"

	"myapp/activities"
)

// VideoProcessingWorkflow orchestrates the entire pipeline.
// Temporal persists every completed step, so after a crash the
// workflow resumes where it left off.
func VideoProcessingWorkflow(ctx workflow.Context, videoID string) error {
	// Activity options: retry policy applied to each step
	actOpts := workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts:    3,
			InitialInterval:    time.Minute,
			BackoffCoefficient: 2.0,
		},
	}
	ctx = workflow.WithActivityOptions(ctx, actOpts)

	// Step 1: Encode video to multiple resolutions
	// If server crashes here, workflow resumes from step 1 on restart
	var encodedPath string
	if err := workflow.ExecuteActivity(ctx, activities.EncodeVideo, videoID).Get(ctx, &encodedPath); err != nil {
		return err // step 1 failed all retries, so the workflow fails
	}

	// Step 2 + 3: Thumbnail generation AND transcription in parallel
	thumbFuture := workflow.ExecuteActivity(ctx, activities.GenerateThumbnails, encodedPath)
	transcriptFuture := workflow.ExecuteActivity(ctx, activities.GenerateTranscription, encodedPath)

	// Wait for both. Each activity exhausts its own retries first;
	// if either still fails, the workflow fails.
	if err := thumbFuture.Get(ctx, nil); err != nil {
		return err
	}
	if err := transcriptFuture.Get(ctx, nil); err != nil {
		return err
	}

	// Step 4: Process thumbnails (depends on step 2 completing)
	if err := workflow.ExecuteActivity(ctx, activities.ProcessThumbnailImages, videoID).Get(ctx, nil); err != nil {
		return err
	}

	// Step 5: Notify user their video is ready
	return workflow.ExecuteActivity(ctx, activities.NotifyUserVideoReady, videoID).Get(ctx, nil)
}
Go / Temporal
Example: Human-in-the-Loop with Signals (Go)
// workflows/payment_approval.go: wait for a human signal
func PaymentApprovalWorkflow(ctx workflow.Context, orderID string) error {
	// Activities need a timeout; ExecuteActivity fails without activity options.
	// One minute is an illustrative value.
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: time.Minute,
	})

	// Register a signal channel: external events can wake this workflow
	approvalCh := workflow.GetSignalChannel(ctx, "approval-signal")

	// Step 1: Process payment details
	if err := workflow.ExecuteActivity(ctx, activities.PreparePayment, orderID).Get(ctx, nil); err != nil {
		return err
	}

	// Step 2: Wait up to 24 hours for human approval
	// Temporal pauses here; no worker CPU/memory is consumed while waiting
	var approved bool
	selector := workflow.NewSelector(ctx)
	selector.AddReceive(approvalCh, func(ch workflow.ReceiveChannel, more bool) {
		ch.Receive(ctx, &approved)
	})
	selector.AddFuture(workflow.NewTimer(ctx, 24*time.Hour), func(f workflow.Future) {
		approved = false // timeout: auto-reject
	})
	selector.Select(ctx) // blocks until the signal or the timer fires first

	if !approved {
		// Saga: compensate previous steps
		return workflow.ExecuteActivity(ctx, activities.RefundPayment, orderID).Get(ctx, nil)
	}
	return workflow.ExecuteActivity(ctx, activities.FinalizePayment, orderID).Get(ctx, nil)
}

// To send the approval signal from your API handler
// (assumes the workflow was started with the order ID as its workflow ID):
func ApprovePaymentHandler(c *gin.Context) {
	orderID := c.Param("id")
	// An empty run ID targets the latest run of that workflow ID
	if err := temporalClient.SignalWorkflow(c.Request.Context(), orderID, "", "approval-signal", true); err != nil {
		c.JSON(500, gin.H{"error": err.Error()})
		return
	}
	c.JSON(200, gin.H{"status": "approved"})
}
Go / Temporal
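The selector in the workflow above races a signal against a timer and acts on whichever arrives first. For readers more at home in Python, the same decision logic can be sketched with asyncio. This is only an in-process analogy under assumed names (`wait_for_approval`, a queue standing in for the signal channel): unlike a Temporal workflow, the wait here does not survive a process restart.

```python
import asyncio

async def wait_for_approval(signal: asyncio.Queue, timeout_s: float) -> bool:
    """Return the approval decision, or False if none arrives in time."""
    try:
        decision = await asyncio.wait_for(signal.get(), timeout=timeout_s)
        return bool(decision)
    except asyncio.TimeoutError:
        return False  # timeout: auto-reject, mirroring the timer branch

async def demo():
    approved_channel = asyncio.Queue()
    approved_channel.put_nowait(True)           # a reviewer approved
    approved = await wait_for_approval(approved_channel, 0.05)

    silent_channel = asyncio.Queue()            # nobody responds
    timed_out = await wait_for_approval(silent_channel, 0.01)
    return approved, timed_out
```

The durable version differs in one important way: Temporal records the timer and the signal in the workflow history, so a 24-hour wait costs nothing while idle and picks up again after a deploy or crash.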
Use a regular task queue (Celery/Asynq/BullMQ) for simple, independent tasks — emails, notifications, image processing. Reach for Temporal when you have multi-step business workflows with compensation logic, long waiting periods, human approvals, or when you need full execution history for compliance/debugging. Temporal has higher operational overhead (you run a Temporal server cluster), so don't use it unless you need it.