Documentation
tomodachi

AWS SNS+SQS messaging

Publish / Subscribe

See the example service after the decorator descriptions for a reference implementation of how the invoked functions may be used.

Invoker functions for AWS SNS+SQS events – pub/sub

@tomodachi.aws_sns_sqs(
    topic=None, 
    competing=True, 
    queue_name=None, 
    filter_policy=FILTER_POLICY_DEFAULT,
    visibility_timeout=VISIBILITY_TIMEOUT_DEFAULT,
    dead_letter_queue_name=DEAD_LETTER_QUEUE_DEFAULT,
    max_receive_count=MAX_RECEIVE_COUNT_DEFAULT,
    fifo=False,
    max_number_of_consumed_messages=MAX_NUMBER_OF_CONSUMED_MESSAGES,
    **kwargs,
)

This sets up an AWS SQS queue (queue_name) that subscribes to messages on the AWS SNS topic topic (if a topic is specified), and then starts consuming messages from the queue.

The competing value is used when several services of the same type should share one queue name and thus "compete" for who gets to consume each message. Since tomodachi version 0.19.x the default value is True, as this is the most likely use case for pub/sub in distributed architectures.

Unless queue_name is specified, an auto-generated queue name will be used. Additional prefixes for both topic and queue_name can be assigned by setting the options.aws_sns_sqs.topic_prefix and options.aws_sns_sqs.queue_name_prefix parameters on the service class.

SNS topic subscription with filter policy (applied to match message attributes)

The filter_policy value, specified as a keyword argument, will be applied to the SNS subscription (for the specified topic and queue) as the "FilterPolicy" attribute. This filters SNS messages on the chosen "message attributes" and/or their values as specified in the filter. Note that the filter policy dict structure differs somewhat from the actual message attributes: each value in the filter policy must be a dict (object) or list (array). Example: A filter policy value of {"event": ["order_paid"], "currency": ["EUR", "USD"]} would set up the SNS subscription to receive messages on the topic only where the message attribute "event" is "order_paid" and the "currency" value is either "EUR" or "USD".

If filter_policy is not specified as an argument (default), an existing subscription keeps the filter policy it already has, while a newly created subscription receives all messages on the topic. Changing the filter_policy on an existing subscription may take several minutes to propagate. Read more about the filter policy format on AWS.

Related to the above mentioned filter policy, the aws_sns_sqs_publish function (which is used for publishing messages) can specify "message attributes" using the optional message_attributes keyword argument. Values should be specified as a simple dict with keys and values. Example: {"event": "order_paid", "paid_amount": 100, "currency": "EUR"}.
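To make the relationship between a filter policy and message attributes concrete, the following is a minimal sketch of the matching rule described above. It is not tomodachi or AWS code: matches_filter_policy is a hypothetical helper, and it only covers exact matching against lists of strings (dict-valued policy entries are not handled). The matching itself is performed by AWS SNS, not by the service.

```python
from typing import Any, Dict, List


def matches_filter_policy(
    filter_policy: Dict[str, List[str]],
    message_attributes: Dict[str, Any],
) -> bool:
    """Hypothetical helper: simplified exact-match evaluation of a filter policy."""
    for key, allowed_values in filter_policy.items():
        # Every key named in the policy must be present on the message (AND) ...
        if key not in message_attributes:
            return False
        # ... and its value must equal one of the listed values (OR).
        if message_attributes[key] not in allowed_values:
            return False
    # Attributes not named in the policy (such as "paid_amount") are ignored.
    return True


filter_policy = {"event": ["order_paid"], "currency": ["EUR", "USD"]}

paid_in_eur = {"event": "order_paid", "paid_amount": 100, "currency": "EUR"}
paid_in_sek = {"event": "order_paid", "paid_amount": 100, "currency": "SEK"}

print(matches_filter_policy(filter_policy, paid_in_eur))  # True
print(matches_filter_policy(filter_policy, paid_in_sek))  # False
```

Note how the policy holds lists of accepted values, while the published message attributes hold plain values.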

SQS queue attribute - VisibilityTimeout

The visibility_timeout value of the decorator will set the queue attribute VisibilityTimeout if specified. To use already defined values for a queue (default), do not supply any value to the visibility_timeout keyword – tomodachi will then not modify the visibility timeout.

SQS queue attribute - RedrivePolicy (used to assign a dead-letter queue)

The dead_letter_queue_name value, in tandem with the max_receive_count value, modifies the queue attribute RedrivePolicy, which controls the use of a dead-letter queue. Messages are delivered to the dead-letter queue if they have been picked up by consumers max_receive_count number of times but haven't been deleted from the queue.

The value for dead_letter_queue_name should either be an ARN for an SQS queue, in which case the queue must have been created in advance, or an alphanumeric queue name, in which case the queue will be set up in the same way as the queue name you specify with regard to prefixes, etc. Both dead_letter_queue_name and max_receive_count need to be specified together, as they both affect the redrive policy.
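As an illustration of why the two values belong together, the SQS RedrivePolicy attribute is a JSON string holding both the dead-letter queue ARN and the receive count. The sketch below builds such a value; build_redrive_policy is a hypothetical helper, the ARN is a made-up example, and this is not necessarily how tomodachi constructs the attribute internally.

```python
import json


def build_redrive_policy(dead_letter_queue_arn: str, max_receive_count: int) -> str:
    """Hypothetical helper: build the JSON value of the SQS RedrivePolicy attribute."""
    if max_receive_count < 1:
        raise ValueError("max_receive_count must be at least 1")
    return json.dumps(
        {
            # ARN of the queue that receives messages after too many receives.
            "deadLetterTargetArn": dead_letter_queue_arn,
            # Number of receives allowed before a message is moved to the DLQ.
            "maxReceiveCount": max_receive_count,
        }
    )


# Made-up ARN, for illustration only.
policy = build_redrive_policy(
    "arn:aws:sqs:eu-west-1:123456789012:example-queue-dlq", 5
)
print(policy)
```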

Note that DLQs that are defined by an alphanumeric queue name (not an ARN), and are thus set up by the service, will use a retention period of 1209600 seconds (14 days).

To disable the use of DLQ, use a None value for the dead_letter_queue_name keyword and the RedrivePolicy will be removed from the queue attribute. To use the already defined values for a queue, do not supply any values to the keyword arguments in the decorator. tomodachi will then not modify the queue attribute and leave it as is.

FIFO queues

AWS supports two types of queues and topics, namely standard and FIFO. The major difference between these is that the latter guarantees strict message ordering and exactly-once processing. By default, tomodachi creates standard queues and topics. To create them as FIFO instead, set fifo to True.

Maximum number of messages received in a batch from the queue

The max_number_of_consumed_messages setting determines how many messages should be pulled from the queue at once. Lowering it is useful if you have a resource-intensive task that you don't want other messages to compete with for resources. The default value is 10 for standard queues and 1 for FIFO queues. The minimum value is 1, and the maximum value is 10.
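The defaults and bounds above can be summarized in a few lines. This is only a sketch of the documented rules: resolve_batch_size is a hypothetical helper, and raising a ValueError for out-of-range values is an assumption (the library may validate differently).

```python
from typing import Optional

MIN_BATCH_SIZE = 1
MAX_BATCH_SIZE = 10


def resolve_batch_size(requested: Optional[int], fifo: bool) -> int:
    """Hypothetical helper: pick the number of messages to pull per receive call."""
    if requested is None:
        # Documented defaults: 1 for FIFO queues, 10 for standard queues.
        return 1 if fifo else MAX_BATCH_SIZE
    if not MIN_BATCH_SIZE <= requested <= MAX_BATCH_SIZE:
        # Assumption: out-of-range values are rejected rather than clamped.
        raise ValueError("max_number_of_consumed_messages must be between 1 and 10")
    return requested


print(resolve_batch_size(None, fifo=False))  # 10
print(resolve_batch_size(None, fifo=True))   # 1
print(resolve_batch_size(3, fifo=False))     # 3
```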

Message enveloping functionality / custom enveloping / raw messaging

Depending on whether the service message_envelope attribute (previously named message_protocol) is used, parts of the enveloped data are included as keyword arguments to the decorated function. It's usually safe to just use data as an argument. You can also pass a message_envelope value as a keyword argument to the decorator to use a custom enveloping method instead of the global one set for the service.

If you import ProtobufBase (from tomodachi.envelope import ProtobufBase) and use it as the service's message_envelope, you may also pass the keyword argument proto_class into the decorator, describing the protobuf (Protocol Buffers) generated Python class to use for decoding incoming messages. Likewise, additional keyword arguments set on the decorator will be passed into the envelope class' decoder function parse_message. Custom enveloping classes can be built to fit your existing architecture or for even more control of tracing and shared metadata between services.
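As a rough idea of what a custom enveloping class could look like, here is a standalone sketch that wraps data in a small JSON envelope and unwraps it again. Only the parse_message name comes from the text above; the SimpleJsonEnvelope class, the build_message name, both method signatures and the envelope layout are assumptions made for illustration, so check the envelope classes shipped with tomodachi before relying on any of them.

```python
import asyncio
import json
import time
import uuid
from typing import Any, Dict


class SimpleJsonEnvelope:
    """Hypothetical envelope: wraps data with minimal metadata as JSON."""

    @classmethod
    async def build_message(cls, service: Any, topic: str, data: Any, **kwargs: Any) -> str:
        # Assumed signature; adds tracing metadata around the payload.
        envelope = {
            "metadata": {
                "message_uuid": str(uuid.uuid4()),
                "timestamp": time.time(),
                "topic": topic,
            },
            "data": data,
        }
        return json.dumps(envelope)

    @classmethod
    async def parse_message(cls, payload: str, **kwargs: Any) -> Dict[str, Any]:
        # The returned keys are what a handler could accept as keyword
        # arguments, which is why "data" is usually a safe argument name.
        envelope = json.loads(payload)
        return {
            "metadata": envelope.get("metadata", {}),
            "data": envelope.get("data"),
        }


async def roundtrip() -> Dict[str, Any]:
    payload = await SimpleJsonEnvelope.build_message(
        None, "example-topic", {"order_id": 42}
    )
    return await SimpleJsonEnvelope.parse_message(payload)


print(asyncio.run(roundtrip())["data"])  # {'order_id': 42}
```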

Encryption – options related to SNS + SQS encryption at rest using AWS KMS

Encryption at rest for AWS SNS and/or AWS SQS can optionally be configured by specifying the KMS key alias or KMS key id as a tomodachi service option: options.aws_sns_sqs.sns_kms_master_key_id (to configure encryption at rest on the SNS topics for which the tomodachi service handles the SNS -> SQS subscriptions) and/or options.aws_sns_sqs.sqs_kms_master_key_id (to configure encryption at rest for the SQS queues which the service is consuming).

Note that an option value set to an empty string ("") or False will unset the KMS master key id and thus disable encryption at rest. (The AWS APIs for SNS and SQS use an empty string value for the KmsMasterKeyId attribute to disable encryption with KMS if it was previously enabled.)

If an option is instead left completely unset or set to None, no changes will be made to the KMS-related attributes on an existing topic or queue.
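The three cases above (set a key, disable encryption, leave untouched) can be sketched as a small decision function. kms_attribute_update is a hypothetical helper written for illustration and is not part of tomodachi; the key alias is a made-up example.

```python
from typing import Optional, Union


def kms_attribute_update(option_value: Union[str, bool, None]) -> Optional[str]:
    """Hypothetical helper: map an option value to a KmsMasterKeyId update.

    Returns None when the attribute should be left untouched, otherwise the
    string value to send to AWS ("" disables encryption with KMS).
    """
    if option_value is None:
        # Unset or None: do not modify the existing topic or queue.
        return None
    if option_value is False or option_value == "":
        # Empty string or False: unset the key, disabling encryption at rest.
        return ""
    # Anything else is treated as a KMS key alias or key id.
    return str(option_value)


print(repr(kms_attribute_update("alias/example-key")))  # 'alias/example-key'
print(repr(kms_attribute_update("")))                   # ''
print(repr(kms_attribute_update(False)))                # ''
print(repr(kms_attribute_update(None)))                 # None
```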

If it's expected that the services themselves, via their IAM credentials or assumed role, are responsible for creating queues and topics, these options could be used to provide encryption at rest without additional manual intervention.

Make sure that the key policy / role policy are set to allow KMS use and the SNS -> SQS functionality. https://aws.amazon.com/premiumsupport/knowledge-center/sns-topic-sqs-queue-sse-cmk-policy/

Do not use these options if you are instead using IaC tooling to handle the topics, queues and subscriptions, or if they are created / updated as part of deployments. To keep the service from updating any attributes, leave the options unset or set them to None.

Read more at


Publishing messages

AWS – Publish message to SNS – tomodachi.aws_sns_sqs_publish

await tomodachi.aws_sns_sqs_publish(service, message, topic=topic)
  • service is the instance of the service class (from within a handler, use self)
  • message is the message to publish before any potential envelope transformation
  • topic is the non-prefixed name of the SNS topic used to publish the message

Additional function arguments can be supplied to also include message_attributes, and / or group_id + deduplication_id.

For more advanced workflows, it's also possible to specify overrides for the SNS topic name prefix or message enveloping class.

AWS – Send message to SQS – tomodachi.sqs_send_message

await tomodachi.sqs_send_message(service, message, queue_name=queue_name)
  • service is the instance of the service class (from within a handler, use self)
  • message is the message to publish before any potential envelope transformation
  • queue_name is the SQS queue URL, queue ARN or non-prefixed queue name to be used

Additional function arguments can be supplied to also include message_attributes, and / or group_id + deduplication_id.

For more advanced workflows, it's also possible to set delay seconds, define a custom message body formatter, or to specify overrides for the SQS queue name prefix or message enveloping class.


Example implementation (AWS SNS+SQS)

import tomodachi


class Service(tomodachi.Service):
    name = "aws-example"

    options = {
        "aws_sns_sqs.region_name": None,  # AWS region (example: 'eu-west-1')
        "aws_sns_sqs.aws_access_key_id": None,  # AWS access key id
        "aws_sns_sqs.aws_secret_access_key": None,  # AWS secret key
    }

    # The "message_envelope" attribute can be set on the service class to 
    # build / parse data.

    # message_envelope = ...

    # Using the @tomodachi.aws_sns_sqs decorator to make the service
    # create an AWS SNS topic and an AWS SQS queue, and to subscribe the
    # queue to the topic. The queue will be polled to receive messages
    # using the SQS ReceiveMessage API.
    @tomodachi.aws_sns_sqs("example-topic", queue_name="example-queue")
    async def example_func(self, message):
        # Received message, forwarding the same message as response on 
        # another topic.
        await tomodachi.aws_sns_sqs_publish(
            self, 
            message, 
            topic="another-example-topic"
        )