This document describes the current stable version of Celery (3.1).

celery.app.control

Client for worker remote control commands. Server implementation is in celery.worker.control.

class celery.app.control.Inspect(destination=None, timeout=1, callback=None, connection=None, app=None, limit=None)[source]
active(safe=False)[source]
active_queues()[source]
app = None
clock()[source]
conf(with_defaults=False)[source]
hello(from_node, revoked=None)[source]
memdump(samples=10)[source]
memsample()[source]
objgraph(type='Request', n=200, max_depth=10)[source]
ping()[source]
query_task(ids)[source]
registered(*taskinfoitems)[source]
registered_tasks(*taskinfoitems)
report()[source]
reserved(safe=False)[source]
revoked()[source]
scheduled(safe=False)[source]
stats()[source]
class celery.app.control.Control(app=None)[source]
class Mailbox(namespace, type='direct', connection=None, clock=None, accept=None, serializer=None)
Node(hostname=None, state=None, channel=None, handlers=None)
abcast(command, kwargs={})
accept = ['json']
call(destination, command, kwargs={}, timeout=None, callback=None, channel=None)
cast(destination, command, kwargs={})
connection = None
exchange = None
exchange_fmt = '%s.pidbox'
get_queue(hostname)
get_reply_queue()
multi_call(command, kwargs={}, timeout=1, limit=None, callback=None, channel=None)
namespace = None
node_cls

alias of Node

oid
reply_exchange = None
reply_exchange_fmt = 'reply.%s.pidbox'
reply_queue
serializer = None
type = 'direct'
Control.add_consumer(queue, exchange=None, exchange_type='direct', routing_key=None, options=None, **kwargs)[source]

Tell all (or specific) workers to start consuming from a new queue.

Only the queue name is required: if only the queue is specified, the exchange and routing key are set to the same name (as automatic queues do).

Note

This command does not respect the default queue/exchange options in the configuration.

Parameters:
  • queue – Name of queue to start consuming from.
  • exchange – Optional name of exchange.
  • exchange_type – Type of exchange (defaults to ‘direct’).
  • routing_key – Optional routing key.
  • options – Additional options as supported by kombu.entity.Queue.from_dict().

See broadcast() for supported keyword arguments.
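
As a minimal sketch of the call described above, the helper below asks workers to start consuming from a queue and waits for their replies. The helper name start_consuming is hypothetical, and control is assumed to be the control attribute of a configured Celery app (or any object exposing the same add_consumer() method).

```python
def start_consuming(control, queue, workers=None, timeout=1):
    """Ask workers to consume from ``queue`` and return their replies.

    ``control`` is assumed to be ``app.control`` of a configured Celery app.
    ``workers`` is an optional list of worker hostnames; None means all workers.
    """
    # Only the queue name is passed, so the exchange and routing key
    # fall back to the queue name.
    return control.add_consumer(
        queue,
        destination=workers,  # forwarded to broadcast()
        reply=True,           # wait for and return the replies
        timeout=timeout,      # seconds to wait for replies
    )
```

Passing reply=True is a choice made for this sketch; without it the command is sent without waiting for an answer.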

Control.broadcast(command, arguments=None, destination=None, connection=None, reply=False, timeout=1, limit=None, callback=None, channel=None, **extra_kwargs)[source]

Broadcast a control command to the celery workers.

Parameters:
  • command – Name of command to send.
  • arguments – Keyword arguments for the command.
  • destination – If set, a list of the hosts to send the command to. When empty, the command is broadcast to all workers.
  • connection – Custom broker connection to use. If not set, a connection is established automatically.
  • reply – Wait for and return the reply.
  • timeout – Timeout in seconds to wait for the reply.
  • limit – Limit number of replies.
  • callback – Callback called immediately for each reply received.
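
The parameters above can be combined as in the following sketch, which sends a command, waits for replies, and also records each reply as it arrives. The helper name broadcast_and_collect is hypothetical, control is assumed to be the control attribute of a configured Celery app, and the callback is assumed to receive each reply as its single argument.

```python
def broadcast_and_collect(control, command, arguments=None, workers=None,
                          timeout=1, limit=None):
    """Broadcast ``command`` and return (replies, replies_seen_by_callback).

    ``control`` is assumed to be ``app.control`` of a configured Celery app.
    """
    seen = []

    def on_reply(reply):
        # Assumption: the callback is invoked with one reply at a time.
        seen.append(reply)

    replies = control.broadcast(
        command,
        arguments=arguments,
        destination=workers,  # None means all workers
        reply=True,           # wait for and return the replies
        timeout=timeout,
        limit=limit,          # stop after this many replies, if set
        callback=on_reply,
    )
    return replies, seen
```

The callback is useful when replies should be handled as they arrive rather than after the timeout has elapsed.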
Control.cancel_consumer(queue, **kwargs)[source]

Tell all (or specific) workers to stop consuming from queue.

Supports the same keyword arguments as broadcast().

Control.disable_events(destination=None, **kwargs)[source]

Tell all (or specific) workers to disable events.

Control.discard_all(connection=None)

Discard all waiting tasks.

This will ignore all tasks waiting for execution, and they will be deleted from the messaging server.

Returns: the number of tasks discarded.
Control.election(id, topic, action=None, connection=None)[source]
Control.enable_events(destination=None, **kwargs)[source]

Tell all (or specific) workers to enable events.

Control.inspect[source]
Control.ping(destination=None, timeout=1, **kwargs)[source]

Ping all (or specific) workers.

Will return the list of answers.

See broadcast() for supported keyword arguments.
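
The list of answers can be awkward to query directly, so the sketch below merges it into one dict keyed by worker name. It assumes that each answer is a single-key dict mapping a worker's node name to its reply; the helper name merge_replies, the hostnames, and the sample replies are made up for illustration.

```python
def merge_replies(replies):
    """Merge a list of per-worker reply dicts into one dict keyed by worker name."""
    merged = {}
    for reply in replies or []:
        merged.update(reply)
    return merged


# Hypothetical sample of what ping() might return for two workers.
sample = [
    {'worker1@example.com': {'ok': 'pong'}},
    {'worker2@example.com': {'ok': 'pong'}},
]

alive = sorted(merge_replies(sample))
print(alive)  # ['worker1@example.com', 'worker2@example.com']
```

Treating a missing reply list as empty (replies or []) is a defensive choice for this sketch, so the helper returns an empty dict when no worker answers.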

Control.pool_grow(n=1, destination=None, **kwargs)[source]

Tell all (or specific) workers to grow the pool by n.

Supports the same arguments as broadcast().

Control.pool_shrink(n=1, destination=None, **kwargs)[source]

Tell all (or specific) workers to shrink the pool by n.

Supports the same arguments as broadcast().
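
As a rough sketch of how pool_grow() and pool_shrink() might be used together, the helper below resizes the pool by a signed amount. The name resize_pool is hypothetical, and control is assumed to be the control attribute of a configured Celery app.

```python
def resize_pool(control, delta, workers=None):
    """Grow the pool for positive ``delta`` and shrink it for negative ``delta``.

    ``control`` is assumed to be ``app.control`` of a configured Celery app.
    Returns the workers' replies, or an empty list when ``delta`` is zero.
    """
    if delta > 0:
        return control.pool_grow(delta, destination=workers, reply=True)
    if delta < 0:
        # pool_shrink() takes the number of processes to remove.
        return control.pool_shrink(-delta, destination=workers, reply=True)
    return []  # nothing to do
```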

Control.purge(connection=None)[source]

Discard all waiting tasks.

This will ignore all tasks waiting for execution, and they will be deleted from the messaging server.

Returns: the number of tasks discarded.
Control.rate_limit(task_name, rate_limit, destination=None, **kwargs)[source]

Tell all (or specific) workers to set a new rate limit for task by type.

Parameters:
  • task_name – Name of task to change rate limit for.
  • rate_limit – The rate limit as tasks per second, or a rate limit string (‘100/m’, etc.; see celery.task.base.Task.rate_limit for more information).

See broadcast() for supported keyword arguments.
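
To make the two accepted forms of rate_limit concrete, the sketch below converts either form to tasks per second. It is an illustrative parser, not Celery's own: the function name tasks_per_second is hypothetical, and the unit suffixes s, m and h (seconds, minutes, hours) are assumed.

```python
# Seconds per unit; the 's', 'm' and 'h' suffixes are assumed for this sketch.
_UNIT_SECONDS = {'s': 1, 'm': 60, 'h': 3600}


def tasks_per_second(rate_limit):
    """Convert a number or a string such as '100/m' to tasks per second."""
    if isinstance(rate_limit, str):
        count, _, unit = rate_limit.partition('/')
        # A bare number in a string is treated as tasks per second.
        return float(count) / _UNIT_SECONDS[unit or 's']
    return float(rate_limit)


print(tasks_per_second('120/m'))  # 2.0
print(tasks_per_second(10))       # 10.0
```

Either form can be passed to rate_limit() unchanged, for example control.rate_limit('myapp.mytask', '120/m'), where the task name is a placeholder.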

Control.revoke(task_id, destination=None, terminate=False, signal='SIGTERM', **kwargs)[source]

Tell all (or specific) workers to revoke a task by id.

If a task is revoked, the workers will ignore the task and not execute it after all.

Parameters:
  • task_id – Id of the task to revoke.
  • terminate – Also terminate the process currently working on the task (if any).
  • signal – Name of the signal to send to the process if terminate is set. Default is SIGTERM.

See broadcast() for supported keyword arguments.
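
A minimal sketch of revoking several tasks follows, with an option to also terminate tasks that are already running. The helper name revoke_tasks is hypothetical, control is assumed to be the control attribute of a configured Celery app, and SIGKILL is chosen here only to show a non-default signal.

```python
def revoke_tasks(control, task_ids, kill=False):
    """Revoke each task id in ``task_ids``; return how many were revoked.

    ``control`` is assumed to be ``app.control`` of a configured Celery app.
    With ``kill=True`` the process working on the task is also terminated.
    """
    count = 0
    for task_id in task_ids:
        if kill:
            # Terminate the running process with a non-default signal.
            control.revoke(task_id, terminate=True, signal='SIGKILL')
        else:
            # Workers will skip the task if it has not started yet.
            control.revoke(task_id)
        count += 1
    return count
```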

Control.time_limit(task_name, soft=None, hard=None, **kwargs)[source]

Tell all (or specific) workers to set time limits for a task by type.

Parameters:
  • task_name – Name of task to change time limits for.
  • soft – New soft time limit (in seconds).
  • hard – New hard time limit (in seconds).

Any additional keyword arguments are passed on to broadcast().
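
The following sketch sets both limits for a task type and waits for the workers' replies. The helper name set_time_limits is hypothetical, control is assumed to be the control attribute of a configured Celery app, and the check that the soft limit does not exceed the hard limit is an assumption of this sketch rather than documented behaviour.

```python
def set_time_limits(control, task_name, soft=None, hard=None, workers=None):
    """Set soft and hard time limits (in seconds) for ``task_name``.

    ``control`` is assumed to be ``app.control`` of a configured Celery app.
    """
    # Assumption of this sketch: a soft limit above the hard limit is a
    # mistake, because the hard limit would be reached first.
    if soft is not None and hard is not None and soft > hard:
        raise ValueError('soft time limit must not exceed hard time limit')
    return control.time_limit(
        task_name,
        soft=soft,
        hard=hard,
        destination=workers,  # passed on to broadcast()
        reply=True,           # wait for and return the replies
    )
```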

celery.app.control.flatten_reply(reply)[source]
