AMQP messaging (RabbitMQ)
Publish / Subscribe
See the example service after the decorator descriptions for a reference implementation of how the invoked functions may be used.
Invoker functions for RabbitMQ messages – pub/sub
@tomodachi.amqp(
    routing_key,
    exchange_name="amq.topic",
    competing=True,
    queue_name=None,
    **kwargs
)
Sets up the method to be called whenever an AMQP / RabbitMQ message is received for the specified routing_key. By default the "amq.topic" topic exchange will be used unless changed within the decorator or by setting the options.amqp.exchange_name parameter on the service class.
The competing value is used when the same queue name should be used for several services of the same type, which then "compete" for who should consume the message. Since tomodachi version 0.19.x the default value has changed to True, as this is the most likely use case for pub/sub in distributed architectures.
Unless queue_name is specified, an auto-generated queue name will be used. Additional prefixes for both routing_key and queue_name can be assigned by setting the options.amqp.routing_key_prefix and options.amqp.queue_name_prefix service class parameter values.
Message enveloping functionality / custom enveloping / raw messaging
Depending on whether the service message_envelope attribute (previously named message_protocol) is used, parts of the enveloped data are passed as keyword arguments to the decorated function. It's usually safe to just use data as an argument. You can also pass a message_envelope value as a keyword argument to the decorator to use a custom enveloping method instead of the global one set for the service.
If you import ProtobufBase (from tomodachi.envelope import ProtobufBase) and use it as the service's message_envelope, you may also pass the keyword argument proto_class into the decorator, describing the protobuf (Protocol Buffers) generated Python class to use for decoding incoming messages. Likewise, additional keyword arguments set on the decorator will be passed into the envelope class's decoder function parse_message. Custom enveloping classes can be built to fit your existing architecture or for even more control of tracing and shared metadata between services.
Publishing messages
AMQP – Publish to exchange / routing key – tomodachi.amqp_publish
await tomodachi.amqp_publish(service, message, routing_key=routing_key, exchange_name=...)
- service is the instance of the service class (from within a handler, use self)
- message is the message to publish before any potential envelope transformation
- routing_key is the routing key to use when publishing the message
- exchange_name is the exchange name for publishing the message (default: "amq.topic")
For more advanced workflows, it's also possible to specify overrides for the routing key prefix or message enveloping class.
Example implementation (RabbitMQ / AMQP)
import tomodachi


class Service(tomodachi.Service):
    name = "amqp-example"

    # The "message_envelope" attribute can be set on the service class to
    # build / parse data.
    # message_envelope = ...

    # A route / topic to which the service will subscribe via
    # RabbitMQ / AMQP.
    @tomodachi.amqp("example.topic")
    async def example_func(self, message):
        # Received message, forwarding the same message as response on
        # another route / topic.
        await tomodachi.amqp_publish(
            self,
            message,
            routing_key="example.response"
        )