Celery Task Patterns
```python
from celery import Celery, chain, group, chord
from celery.exceptions import MaxRetriesExceededError

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_document(self, document_id: str):
    """Process a document with automatic retry on transient errors."""
    try:
        document = get_document(document_id)   # helpers assumed defined elsewhere
        result = analyze_document(document)
        save_result(document_id, result)
        return result
    except TransientError as e:
        try:
            raise self.retry(exc=e)            # re-queue with the 60s default delay
        except MaxRetriesExceededError:
            log_error(document_id, e)          # retries exhausted
            raise
    except Exception as e:
        # Log, then let the error propagate to the dead-letter queue
        log_error(document_id, e)
        raise

@app.task
def send_notification(user_id: str, message: str):
    """Send a notification after processing."""
    notify_user(user_id, message)

# Workflow: process, then notify
workflow = chain(
    process_document.s(doc_id),
    send_notification.s(user_id),
)

# Parallel processing with a callback once every task finishes
parallel_workflow = chord(
    group(process_document.s(id) for id in doc_ids),
    aggregate_results.s(),
)
```
Celery Configuration
```python
# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'

# Task settings
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'UTC'

# Reliability: acknowledge only after completion; requeue if the worker dies
task_acks_late = True
task_reject_on_worker_lost = True

# Concurrency
worker_concurrency = 4
worker_prefetch_multiplier = 2

# Result expiration
result_expires = 3600  # 1 hour

# Task routing
task_routes = {
    'tasks.high_priority.*': {'queue': 'high'},
    'tasks.low_priority.*': {'queue': 'low'},
}
```
Frequently Asked Questions
What is a Celery task queue?
Celery is a distributed task queue for Python that enables asynchronous processing. It handles background jobs like sending emails, processing uploads, generating reports, and any work that shouldn’t block web requests. It’s essential for scalable Python applications.
How much does Celery implementation cost?
Celery development typically costs $90-130 per hour. A basic task queue setup starts around $5,000-10,000, while complex implementations with priority queues, workflows, and monitoring range from $20,000-50,000+. Celery infrastructure (Redis/RabbitMQ) is separate.
Celery vs alternatives (RQ, Dramatiq): which should I choose?
Choose Celery for: complex workflows, canvas (chains/groups/chords), mature ecosystem, Django integration. Choose RQ for: simpler needs, smaller overhead. Choose Dramatiq for: better API design, easier testing. Celery is the most powerful but most complex.
How do you handle Celery task failures?
I implement: automatic retries with exponential backoff, dead-letter queues for failed tasks, task timeouts to prevent hanging, result expiration, proper exception handling, and alerting for repeated failures. Production Celery needs careful error handling.
Do you work with Celery Beat for scheduling?
Yes. Celery Beat handles periodic tasks such as report generation, data cleanup, cache warming, and sync jobs. I implement dynamic schedules, timezone handling, and single-execution guarantees (avoiding duplicate runs across workers).