AMQP messaging (RabbitMQ)

Publish / Subscribe


See the example service after the decorator descriptions for a reference implementation showing how the invoker functions may be used.

Invoker functions for RabbitMQ messages – pub/sub


The @tomodachi.amqp decorator sets up the method to be called whenever an AMQP / RabbitMQ message is received for the specified routing_key. By default, the "amq.topic" topic exchange is used unless changed within the decorator or by setting the options.amqp.exchange_name parameter on the service class.
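Because the default exchange is a topic exchange, routing keys are matched word by word, where `*` stands for exactly one word and `#` for zero or more words. The sketch below models that standard AMQP matching rule in plain Python. It is only an illustration: the function name `topic_matches` is hypothetical, it is not part of tomodachi or RabbitMQ, and it assumes the routing_key given to the decorator ends up as the binding key on the exchange.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key would match binding_key on a topic exchange."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: match only if all words were consumed too.
            return w == len(words)
        if pattern[p] == "#":
            # "#" may absorb zero or more words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            # "*" matches exactly one word; otherwise words must be equal.
            return match(p + 1, w + 1)
        return False

    return match(0, 0)
```

For instance, a binding key of `example.*` would match `example.topic` but not `example.topic.sub`, while `example.#` would match both.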

The competing value controls whether several services of the same type share one queue name and thus "compete" to consume each message. Since tomodachi version 0.19.x, competing defaults to True, as this is the most likely use case for pub/sub in distributed architectures.
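The difference between competing and non-competing consumers can be sketched with a toy broker model. Everything here is an assumption made for illustration: the queue naming scheme in `queue_name_for`, the helper `deliver`, and the instance identifiers are hypothetical and do not reflect how tomodachi actually names queues. The only broker behaviour relied upon is that consumers on the same queue receive messages in round-robin order.

```python
from collections import defaultdict
from itertools import cycle
from typing import Dict, List, Tuple


def queue_name_for(service_name: str, instance_id: str, competing: bool) -> str:
    # Hypothetical naming: a shared queue when competing, else one per instance.
    return service_name if competing else f"{service_name}.{instance_id}"


def deliver(
    messages: List[str], instances: List[Tuple[str, str]], competing: bool
) -> Dict[str, List[str]]:
    """Return {instance_id: received messages} for a simplified broker."""
    queues: Dict[str, List[str]] = defaultdict(list)
    for service_name, instance_id in instances:
        name = queue_name_for(service_name, instance_id, competing)
        queues[name].append(instance_id)

    received: Dict[str, List[str]] = {iid: [] for _, iid in instances}
    for consumers in queues.values():
        # Every bound queue gets each message; consumers on a queue take turns.
        turn = cycle(consumers)
        for message in messages:
            received[next(turn)].append(message)
    return received


instances = [("worker", "a"), ("worker", "b")]
shared = deliver(["m1", "m2", "m3", "m4"], instances, competing=True)
fanout = deliver(["m1", "m2", "m3", "m4"], instances, competing=False)
```

In this model, `shared` splits the four messages between the two instances, whereas `fanout` gives every instance all four.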

Unless queue_name is specified, an auto-generated queue name will be used. Additional prefixes for both routing_key and queue_name can be assigned by setting the options.amqp.routing_key_prefix and options.amqp.queue_name_prefix values on the service class.
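A common reason to set these prefixes is to keep environments apart on a shared broker. The snippet below is a minimal sketch of the effect, assuming a prefix is simply prepended to the name; the helper `apply_prefix` and the example values are hypothetical and not taken from tomodachi.

```python
from typing import Optional


def apply_prefix(prefix: Optional[str], name: str) -> str:
    # Assumption: the prefix is prepended as-is, with no separator added.
    return f"{prefix or ''}{name}"


# Hypothetical values standing in for options.amqp.routing_key_prefix
# and options.amqp.queue_name_prefix.
routing_key = apply_prefix("staging.", "example.topic")
queue_name = apply_prefix("staging-", "example-queue")
unprefixed = apply_prefix(None, "example.topic")
```

Under that assumption, a publisher and a subscriber need the same routing key prefix in order to reach each other.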

Message enveloping functionality / custom enveloping / raw messaging

Depending on whether the service's message_envelope attribute (previously named message_protocol) is set, parts of the enveloped data are passed as keyword arguments to the decorated function. It's usually safe to just use data as an argument. You can also pass a message_envelope value as a keyword argument to the decorator to use a custom enveloping method instead of the one set globally for the service.

If you import ProtobufBase (from tomodachi.envelope import ProtobufBase) and use it as the service's message_envelope, you may also pass the keyword argument proto_class into the decorator, describing the protobuf (Protocol Buffers) generated Python class to use for decoding incoming messages. Likewise, additional keyword arguments set on the decorator will be passed into the envelope class's decoder function parse_message. Custom enveloping classes can be built to fit your existing architecture or to gain more control over tracing and shared metadata between services.
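To give a feel for what a custom enveloping class might look like, here is a rough, self-contained sketch. Only the parse_message name comes from the text above. The build_message counterpart, both method signatures, the envelope layout, and the returned tuple are assumptions made for illustration, so check the bundled envelope classes before relying on any of them. `SketchJsonEnvelope` and `FakeService` are hypothetical names.

```python
import asyncio
import json
import time
import uuid
from typing import Any, Dict, Optional, Tuple


class SketchJsonEnvelope:
    """Hypothetical JSON envelope; not part of tomodachi."""

    @classmethod
    async def build_message(cls, service: Any, topic: str, data: Any, **kwargs: Any) -> str:
        # Wrap the payload together with metadata that is useful for tracing.
        envelope = {
            "service": {"name": getattr(service, "name", None)},
            "metadata": {
                "message_uuid": str(uuid.uuid4()),
                "timestamp": time.time(),
                "topic": topic,
            },
            "data": data,
        }
        return json.dumps(envelope)

    @classmethod
    async def parse_message(
        cls, payload: str, **kwargs: Any
    ) -> Tuple[Dict[str, Any], Optional[str], Optional[float]]:
        # Assumed return shape: (envelope dict, message uuid, timestamp).
        envelope = json.loads(payload)
        metadata = envelope.get("metadata", {})
        return envelope, metadata.get("message_uuid"), metadata.get("timestamp")


class FakeService:
    # Stand-in for a service instance, so the sketch runs without a broker.
    name = "amqp-example"


async def roundtrip() -> Tuple[Dict[str, Any], Optional[str], Optional[float]]:
    payload = await SketchJsonEnvelope.build_message(
        FakeService(), "example.topic", {"id": 1}
    )
    return await SketchJsonEnvelope.parse_message(payload)


message, message_uuid, timestamp = asyncio.run(roundtrip())
```

With an envelope of this shape, the keys of the parsed dictionary (here service, metadata and data) are the kind of values a handler could receive as keyword arguments.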

Publishing messages

AMQP – Publish to exchange / routing key – tomodachi.amqp_publish

await tomodachi.amqp_publish(service, message, routing_key=routing_key, exchange_name=...)
  • service is the instance of the service class (from within a handler, use self)
  • message is the message to publish before any potential envelope transformation
  • routing_key is the routing key to use when publishing the message
  • exchange_name is the exchange name for publishing the message (default: "amq.topic")

For more advanced workflows, it's also possible to specify overrides for the routing key prefix or message enveloping class.

Example implementation (RabbitMQ / AMQP)

import tomodachi

class Service(tomodachi.Service):
    name = "amqp-example"

    # The "message_envelope" attribute can be set on the service class to 
    # build / parse data.

    # message_envelope = ...

    # A route / topic that the service subscribes to via RabbitMQ / AMQP.
    # The routing key "example.topic" is a placeholder for illustration.
    @tomodachi.amqp("example.topic")
    async def example_func(self, message):
        # Received message, forwarding the same message as a response on
        # another route / topic.
        await tomodachi.amqp_publish(