Common Utilities - kombu.common¶
kombu.common¶
Common Utilities.
-
class
kombu.common.
Broadcast
(name=None, queue=None, **kwargs)¶ Convenience class used to define broadcast queues.
Every queue instance will have a unique name, and both the queue and exchange is configured with auto deletion.
Parameters: - name – This is used as the name of the exchange.
- queue – By default a unique id is used for the queue name for every consumer. You can specify a custom queue name here.
- **kwargs – See
Queue
for a list of additional keyword arguments supported.
-
kombu.common.
maybe_declare
(entity, channel=None, retry=False, **retry_policy)¶
-
kombu.common.
uuid
()¶ Generate a unique id, having - hopefully - a very small chance of collision.
For now this is provided by
uuid.uuid4()
.
-
kombu.common.
itermessages
(conn, channel, queue, limit=1, timeout=None, callbacks=None, **kwargs)¶
-
kombu.common.
send_reply
(exchange, req, msg, producer=None, retry=False, retry_policy=None, **props)¶ Send reply for request.
Parameters: - exchange – Reply exchange
- req – Original request, a message with a
reply_to
property. - producer – Producer instance
- retry – If true must retry according to
reply_policy
argument. - retry_policy – Retry settings.
- props – Extra properties
-
kombu.common.
collect_replies
(conn, channel, queue, *args, **kwargs)¶ Generator collecting replies from
queue
-
kombu.common.
insured
(pool, fun, args, kwargs, errback=None, on_revive=None, **opts)¶ Ensures function performing broker commands completes despite intermittent connection failures.
-
kombu.common.
drain_consumer
(consumer, limit=1, timeout=None, callbacks=None)¶
-
kombu.common.
eventloop
(conn, limit=None, timeout=None, ignore_timeouts=False)¶ Best practice generator wrapper around
Connection.drain_events
.Able to drain events forever, with a limit, and optionally ignoring timeout errors (a timeout of 1 is often used in environments where the socket can get “stuck”, and is a best practice for Kombu consumers).
Examples
eventloop
is a generator:from kombu.common import eventloop def run(connection): it = eventloop(connection, timeout=1, ignore_timeouts=True) next(it) # one event consumed, or timed out. for _ in eventloop(connection, timeout=1, ignore_timeouts=True): pass # loop forever.
It also takes an optional limit parameter, and timeout errors are propagated by default:
for _ in eventloop(connection, limit=1, timeout=1): pass
See also
itermessages()
, which is an event loop bound to one or more consumers, that yields any messages received.