This document describes the current stable version of Celery (3.1).

Task Message Protocol v2 (Draft Spec.)

Notes

  • Support for multiple languages via the lang header.

    The worker may redirect the message to a worker that supports the language.

  • Metadata moved to headers.

    This means that workers/intermediates can inspect the message and make decisions based on the headers without decoding the payload (which may be language-specific, e.g. serialized by the Python-specific pickle serializer).

  • Body is only for language specific data.

    • Python stores args/kwargs in the body.
    • If a message uses raw encoding, the raw data is passed as a single argument to the function.
    • Java/C, etc. can use a Thrift/Protobuf document as the body.
  • Dispatches to an actor based on the c_type and c_meth headers.

    c_meth is unused by Python, but may be used in the future to specify class+method pairs.

  • Chain gains a dedicated field.

    Reducing the chain into a recursive callbacks argument causes problems when the recursion limit is exceeded.

    The new message protocol fixes this by specifying a flat list of signatures. The list is stored in reverse order, so each task pops the next signature off the end of the list when sending the next message:

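    from celery.canvas import maybe_signature

    execute_task(message)  # run the current task (pseudocode)
    chain = message.headers.get('chain')  # 'chain' is an optional header
    if chain:
        # The list is reversed, so the next signature is the last element.
        sig = maybe_signature(chain.pop())
        # Forward the remaining signatures along with the next message.
        sig.apply_async(chain=chain)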
    
  • correlation_id replaces the task_id field.

  • c_shadow lets you specify a different name for logs and monitors. This can be used for, e.g., meta tasks that call any function:

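    from celery import Celery, Task
    from celery.utils.imports import qualname

    app = Celery('proj')  # example app; the name 'proj' is assumed

    class PickleTask(Task):
        abstract = True

        def unpack_args(self, fun, args=()):
            # Called as unpack_args(*args): fun is the callable to run,
            # args are its positional arguments.
            return fun, args

        def apply_async(self, args=(), kwargs=None, **options):
            fun, real_args = self.unpack_args(*args)
            # Ship the callable itself as a task argument (this needs a
            # serializer that can encode functions, e.g. pickle), but report
            # the callable's qualified name to logs/monitors via c_shadow.
            return super(PickleTask, self).apply_async(
                (fun, real_args, kwargs or {}), shadow=qualname(fun), **options
            )

    @app.task(base=PickleTask)
    def call(fun, args, kwargs):
        # Invoke the shipped function with its arguments.
        return fun(*args, **kwargs)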
    
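Because the routing metadata lives in the headers, an intermediate can choose a destination without ever deserializing the body. The following is a minimal sketch of such a router. It assumes a plain dict of headers and an opaque bytes body. The names `SUPPORTED_LANGS`, `route_message` and `display_name`, and the `lang.<name>` / `tasks.<name>` queue naming, are hypothetical and not part of Celery.

```python
from typing import Mapping

# Hypothetical: languages this worker can execute itself.
SUPPORTED_LANGS = frozenset({'py'})


def route_message(headers: Mapping[str, object], body: bytes) -> str:
    """Pick a destination queue using only the message headers.

    ``body`` is accepted but deliberately never decoded: it may be a
    language-specific payload (e.g. a pickle) this process cannot read.
    """
    lang = headers['lang']
    if lang not in SUPPORTED_LANGS:
        # Redirect to a worker that supports the language.
        return 'lang.{}'.format(lang)
    # Dispatch on the actor named by c_type.
    return 'tasks.{}'.format(headers['c_type'])


def display_name(headers: Mapping[str, object]) -> str:
    """Name to show in logs/monitors: c_shadow if set, else c_type."""
    return str(headers.get('c_shadow') or headers['c_type'])
```

Here a Python message is dispatched by its c_type, and a message tagged `'lang': 'java'` is forwarded to a (hypothetical) Java queue. In both cases the body is never touched.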

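The recursion problem with nested callbacks can be reproduced with the standard library alone. This sketch uses `json` as a stand-in serializer and the hypothetical helpers `nested_callbacks`, `flat_chain` and `encodes`. It encodes the same long chain both ways. The nested form needs one level of recursion per task and fails once the chain is deep enough. The flat list encodes at any length.

```python
import json

TASK = 'proj.tasks.add'


def nested_callbacks(n):
    """Old style: each signature carries the rest of the chain as a callback."""
    sig = {'task': TASK, 'args': [1]}
    for _ in range(n - 1):
        sig = {'task': TASK, 'args': [1], 'callbacks': [sig]}
    return sig


def flat_chain(n):
    """Protocol v2 style: a flat list of signatures."""
    return [{'task': TASK, 'args': [1]} for _ in range(n)]


def encodes(obj):
    """Return True if json can serialize obj, False on RecursionError."""
    try:
        json.dumps(obj)
    except RecursionError:
        return False
    return True


n = 100000
print('nested:', encodes(nested_callbacks(n)))  # False: recursion limit hit
print('flat:', encodes(flat_chain(n)))          # True
```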
Undecided

  • May consider moving callbacks/errbacks/chain into the body.

    Will huge lists in headers cause overhead? The downside of keeping them in the body is that intermediates won’t be able to introspect these values.

Definition

# protocol v2 implies UTC=True
# 'class' header existing means protocol is v2

properties = {
    'correlation_id': (uuid)task_id,
    'content_type': (string)mime,
    'content_encoding': (string)encoding,

    # optional
    'reply_to': (string)queue_or_url,
}
headers = {
    'lang': (string)'py',
    'c_type': (string)task,

    # optional
    'c_meth': (string)unused,
    'c_shadow': (string)replace_name,
    'eta': (iso8601)eta,
    'expires': (iso8601)expires,
    'callbacks': (list)Signature,
    'errbacks': (list)Signature,
    'chain': (list)Signature,  # non-recursive, reversed list of signatures
    'group': (uuid)group_id,
    'chord': (uuid)chord_id,
    'retries': (int)retries,
    'timelimit': (tuple)(soft, hard),
}

body = (args, kwargs)
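The definition above can be turned into a small message builder. This is a sketch under stated assumptions:

- `make_v2_message` is a hypothetical helper.
- The body is JSON-encoded.
- `chain` is accepted in execution order and reversed to match the header definition.
- `eta` must be a timezone-aware `datetime`, since protocol v2 implies UTC.

```python
import json
import uuid
from datetime import datetime, timezone


def make_v2_message(task, args=(), kwargs=None, eta=None, chain=None,
                    retries=None, timelimit=None):
    """Return (properties, headers, body) following the v2 definition."""
    properties = {
        'correlation_id': str(uuid.uuid4()),  # the task id
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }
    headers = {'lang': 'py', 'c_type': task}
    if eta is not None:
        if eta.tzinfo is None:
            raise ValueError('eta must be timezone-aware')
        # Protocol v2 implies UTC, so normalise before formatting as ISO 8601.
        headers['eta'] = eta.astimezone(timezone.utc).isoformat()
    if chain:
        # Callers list the chain in execution order; the header stores it
        # reversed so that each worker can pop() the next signature.
        headers['chain'] = list(reversed(chain))
    if retries is not None:
        headers['retries'] = retries
    if timelimit is not None:
        headers['timelimit'] = tuple(timelimit)  # (soft, hard)
    body = json.dumps([list(args), kwargs or {}]).encode('utf-8')
    return properties, headers, body


props, hdrs, body = make_v2_message(
    'proj.tasks.add', (2, 2),
    eta=datetime(2015, 1, 1, 12, 0, tzinfo=timezone.utc),
    chain=[{'task': 'proj.tasks.add', 'args': [4]},
           {'task': 'proj.tasks.add', 'args': [8]}],
)
# hdrs['eta'] == '2015-01-01T12:00:00+00:00'
# hdrs['chain'][0]['args'] == [8]  (last task first)
```

Optional headers are only added when a value is given. This keeps messages small and lets consumers treat a missing key as "not set".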

Example

# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8

import json

from celery.utils import uuid

task_id = uuid()
# basic_publish stands for the transport's publish operation
# (e.g. AMQP basic.publish); it is not a concrete Python API here.
basic_publish(
    message=json.dumps([[2, 2], {}]),
    application_headers={
        'lang': 'py',
        'c_type': 'proj.tasks.add',
        'chain': [
            # reversed chain list
            {'task': 'proj.tasks.add', 'args': (8, )},
            {'task': 'proj.tasks.add', 'args': (4, )},
        ],
    },
    properties={
        'correlation_id': task_id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    },
)
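To check what the example computes, the chain can be run locally. This sketch replaces the broker with a plain loop and assumes a hypothetical in-memory registry `TASKS`. It also assumes, as Celery does for chains, that each task's return value is prepended to the next signature's `args`. A real worker would instead publish a new message carrying the remaining `chain` list.

```python
import json

# Hypothetical in-memory task registry standing in for the worker's.
TASKS = {'proj.tasks.add': lambda x, y: x + y}


def run_chain(body, headers):
    """Execute the first task, then pop and run each remaining signature."""
    args, kwargs = json.loads(body)
    result = TASKS[headers['c_type']](*args, **kwargs)
    chain = list(headers.get('chain', []))  # copy; don't mutate the message
    while chain:
        sig = chain.pop()  # reversed list: the next task is the last element
        # The parent's result becomes the first positional argument.
        result = TASKS[sig['task']](result, *sig.get('args', ()),
                                    **sig.get('kwargs', {}))
    return result


headers = {
    'lang': 'py',
    'c_type': 'proj.tasks.add',
    'chain': [
        {'task': 'proj.tasks.add', 'args': (8, )},
        {'task': 'proj.tasks.add', 'args': (4, )},
    ],
}
print(run_chain(json.dumps([[2, 2], {}]), headers))  # 16 == 2 + 2 + 4 + 8
```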
