This document describes the current stable version of Celery (3.1). For development docs,
go here.
celery.worker.consumer
This module contains the components responsible for consuming messages
from the broker, processing the messages and keeping the broker connections
up and running.
-
class celery.worker.consumer.Consumer(on_task_request, init_callback=<function noop at 0x03AF0F70>, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)[source]
-
class Blueprint(steps=None, name=None, app=None, on_start=None, on_close=None, on_stopped=None)[source]
-
default_steps = ['celery.worker.consumer:Connection', 'celery.worker.consumer:Mingle', 'celery.worker.consumer:Events', 'celery.worker.consumer:Gossip', 'celery.worker.consumer:Heart', 'celery.worker.consumer:Control', 'celery.worker.consumer:Tasks', 'celery.worker.consumer:Evloop', 'celery.worker.consumer:Agent']
-
name = 'Consumer'
-
shutdown(parent)[source]
-
Consumer.Strategies
alias of dict
-
Consumer.add_task_queue(queue, exchange=None, exchange_type=None, routing_key=None, **options)[source]
-
Consumer.apply_eta_task(task)[source]
Method called by the timer to apply a task with an
ETA/countdown.
-
Consumer.bucket_for_task(type)[source]
-
Consumer.cancel_task_queue(queue)[source]
-
Consumer.connect()[source]
Establish the broker connection.
Will retry establishing the connection if the
BROKER_CONNECTION_RETRY setting is enabled.
-
Consumer.create_task_handler()[source]
-
Consumer.in_shutdown = False
Set when the consumer is shutting down.
-
Consumer.init_callback = None
Optional callback called the first time the worker
is ready to receive tasks.
-
Consumer.loop_args()[source]
-
Consumer.on_close()[source]
-
Consumer.on_decode_error(message, exc)[source]
Callback called if an error occurs while decoding
a received message.
Simply logs the error and acknowledges the message so it
doesn’t enter a loop.
Parameters:
- message – The message with errors.
- exc – The original exception instance.
-
Consumer.on_invalid_task(body, message, exc)[source]
-
Consumer.on_ready()[source]
-
Consumer.on_unknown_message(body, message)[source]
-
Consumer.on_unknown_task(body, message, exc)[source]
-
Consumer.pool = None
The current worker pool instance.
-
Consumer.register_with_event_loop(hub)[source]
-
Consumer.reset_rate_limits()[source]
-
Consumer.restart_count = -1
-
Consumer.shutdown()[source]
-
Consumer.start()[source]
-
Consumer.stop()[source]
-
Consumer.timer = None
A timer used for high-priority internal tasks, such
as sending heartbeats.
-
Consumer.update_strategies()[source]
-
class celery.worker.consumer.Connection(c, **kwargs)[source]
-
info(c, params='N/A')[source]
-
name = u'celery.worker.consumer.Connection'
-
requires = ()
-
shutdown(c)[source]
-
start(c)[source]
-
class celery.worker.consumer.Events(c, send_events=None, **kwargs)[source]
-
name = u'celery.worker.consumer.Events'
-
requires = (step:celery.worker.consumer.Connection{()},)
-
shutdown(c)[source]
-
start(c)[source]
-
stop(c)[source]
-
class celery.worker.consumer.Heart(c, without_heartbeat=False, heartbeat_interval=None, **kwargs)[source]
-
name = u'celery.worker.consumer.Heart'
-
requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)
-
shutdown(c)
-
start(c)[source]
-
stop(c)[source]
-
class celery.worker.consumer.Control(c, **kwargs)[source]
-
include_if(c)[source]
-
name = u'celery.worker.consumer.Control'
-
requires = (step:celery.worker.consumer.Tasks{(step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)},)
-
class celery.worker.consumer.Tasks(c, **kwargs)[source]
-
info(c)[source]
-
name = u'celery.worker.consumer.Tasks'
-
requires = (step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)
-
shutdown(c)[source]
-
start(c)[source]
-
stop(c)[source]
-
class celery.worker.consumer.Evloop(parent, **kwargs)[source]
-
label = 'event loop'
-
last = True
-
name = u'celery.worker.consumer.Evloop'
-
patch_all(c)[source]
-
requires = ()
-
start(c)[source]
-
class celery.worker.consumer.Agent(c, **kwargs)[source]
-
conditional = True
-
create(c)[source]
-
name = u'celery.worker.consumer.Agent'
-
requires = (step:celery.worker.consumer.Connection{()},)
-
class celery.worker.consumer.Mingle(c, without_mingle=False, **kwargs)[source]
-
compatible_transport(app)[source]
-
compatible_transports = set(['redis', 'amqp'])
-
label = 'Mingle'
-
name = u'celery.worker.consumer.Mingle'
-
requires = (step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)
-
start(c)[source]
-
class celery.worker.consumer.Gossip(c, without_gossip=False, interval=5.0, **kwargs)[source]
-
call_task(task)[source]
-
compatible_transport(app)[source]
-
compatible_transports = set(['redis', 'amqp'])
-
election(id, topic, action=None)[source]
-
get_consumers(channel)[source]
-
label = 'Gossip'
-
name = u'celery.worker.consumer.Gossip'
-
on_elect(event)[source]
-
on_elect_ack(event)[source]
-
on_message(prepare, message)[source]
-
on_node_join(worker)[source]
-
on_node_leave(worker)[source]
-
on_node_lost(worker)[source]
-
periodic()[source]
-
register_timer()[source]
-
requires = (step:celery.worker.consumer.Mingle{(step:celery.worker.consumer.Events{(step:celery.worker.consumer.Connection{()},)},)},)
-
start(c)[source]
-
celery.worker.consumer.dump_body(m, body)[source]