AWS SNS+SQS messaging
Publish / Subscribe
See the example service after the decorator descriptions for a reference implementation of how the invoked functions may be used.
Invoker functions for AWS SNS+SQS events – pub/sub
@tomodachi.aws_sns_sqs(
    topic=None,
    competing=True,
    queue_name=None,
    filter_policy=FILTER_POLICY_DEFAULT,
    visibility_timeout=VISIBILITY_TIMEOUT_DEFAULT,
    dead_letter_queue_name=DEAD_LETTER_QUEUE_DEFAULT,
    max_receive_count=MAX_RECEIVE_COUNT_DEFAULT,
    fifo=False,
    max_number_of_consumed_messages=MAX_NUMBER_OF_CONSUMED_MESSAGES,
    **kwargs,
)
This sets up an AWS SQS queue (queue_name) subscribed to the AWS SNS topic given by topic (if a topic is specified), and then starts consuming messages from the queue.
The competing value is used when the same queue name should be shared by several services of the same type, which then "compete" for who consumes each message. Since tomodachi version 0.19.x the default value is True, as this is the most likely use case for pub/sub in distributed architectures.
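The difference between competing and non-competing consumers can be illustrated with a small simulation. This is only a sketch and does not use tomodachi or AWS: the deliver function and the instance names are hypothetical, and it assumes that non-competing instances each get their own queue subscribed to the topic.

```python
from collections import deque


def deliver(messages, instances, competing):
    """Simulate SNS -> SQS delivery to several instances of one service.

    competing=True: all instances poll one shared queue, so each message
    is handled by exactly one instance (round-robin here for simplicity).
    competing=False: each instance has its own queue subscribed to the
    topic (an assumption of this sketch), so every instance gets a copy.
    """
    received = {name: [] for name in instances}
    if competing:
        shared_queue = deque(messages)
        turn = 0
        while shared_queue:
            consumer = instances[turn % len(instances)]
            received[consumer].append(shared_queue.popleft())
            turn += 1
    else:
        for name in instances:
            received[name].extend(messages)
    return received


shared = deliver(["m1", "m2", "m3"], ["svc-a", "svc-b"], competing=True)
fanout = deliver(["m1", "m2", "m3"], ["svc-a", "svc-b"], competing=False)
```

With competing=True the three messages are split between the two instances; with competing=False both instances see all three.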
Unless queue_name is specified, an auto-generated queue name will be used. Additional prefixes for both topic and queue_name can be assigned by setting the options.aws_sns_sqs.topic_prefix and options.aws_sns_sqs.queue_name_prefix parameters on the service class.
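As a rough illustration of how prefixes relate to the names used in code, the following sketch assumes that the configured prefixes are simply prepended to the non-prefixed names. The resolve_names helper and the "staging-" prefix are hypothetical and not part of tomodachi.

```python
def resolve_names(topic, queue_name, topic_prefix="", queue_name_prefix=""):
    """Hypothetical helper: prepend the configured prefixes verbatim.

    Assumption of this sketch: the resulting AWS resource name is the
    prefix followed directly by the name given to the decorator.
    """
    return f"{topic_prefix}{topic}", f"{queue_name_prefix}{queue_name}"


full_topic, full_queue = resolve_names(
    "example-topic",
    "example-queue",
    topic_prefix="staging-",
    queue_name_prefix="staging-",
)
```

The handler code keeps referring to "example-topic" and "example-queue"; only the resources created on AWS would carry the prefix.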
SNS topic subscription with filter policy (applied to match message attributes)
The filter_policy value, if specified as a keyword argument, will be applied on the SNS subscription (for the specified topic and queue) as the "FilterPolicy" attribute. This applies a filter on SNS messages using the chosen "message attributes" and/or their values specified in the filter. Note that the filter policy dict structure differs somewhat from the actual message attributes, as the values for the keys in the filter policy must be a dict (object) or list (array). Example: A filter policy value of {"event": ["order_paid"], "currency": ["EUR", "USD"]} would set up the SNS subscription to receive messages on the topic only where the message attribute "event" is "order_paid" and the "currency" value is either "EUR" or "USD".
If filter_policy is not specified as an argument (default), an existing subscription keeps whatever filter policy it already has, while a newly created subscription receives all messages on the topic. Changing the filter_policy on an existing subscription may take several minutes to propagate. Read more about the filter policy format on AWS.
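The matching rule from the example above (every key in the policy must be present, and its value must be one of the listed values) can be sketched in plain Python. This is a simplified model covering exact-match lists only; real SNS filter policies also support operator objects, and the matches_filter_policy function is a hypothetical name, not an AWS or tomodachi API.

```python
def matches_filter_policy(filter_policy, message_attributes):
    """Return True if the attributes satisfy an exact-match filter policy.

    Every key in the policy must exist in the message attributes (AND),
    and the attribute value must equal one of the listed values (OR).
    Attributes not mentioned in the policy are ignored.
    """
    for key, allowed_values in filter_policy.items():
        if key not in message_attributes:
            return False
        if message_attributes[key] not in allowed_values:
            return False
    return True


policy = {"event": ["order_paid"], "currency": ["EUR", "USD"]}

accepted = matches_filter_policy(
    policy, {"event": "order_paid", "paid_amount": 100, "currency": "EUR"}
)
rejected = matches_filter_policy(
    policy, {"event": "order_paid", "paid_amount": 100, "currency": "GBP"}
)
```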
Related to the filter policy described above, the aws_sns_sqs_publish function (which is used for publishing messages) can specify "message attributes" using the optional message_attributes keyword argument. Values should be specified as a simple dict with keys and values. Example: {"event": "order_paid", "paid_amount": 100, "currency": "EUR"}.
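For context, the SNS Publish API represents each message attribute as a typed structure with a DataType and a StringValue. The sketch below shows what the simple dict above could look like in that form. How tomodachi performs this conversion internally is not covered here, so to_sns_message_attributes is a hypothetical helper that handles only strings and numbers.

```python
def to_sns_message_attributes(attributes):
    """Convert a simple dict into the typed SNS MessageAttributes shape.

    Hypothetical helper for illustration: str becomes DataType "String",
    int/float becomes DataType "Number"; anything else is rejected.
    """
    result = {}
    for key, value in attributes.items():
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool):
            raise TypeError(f"unsupported attribute type for {key!r}")
        if isinstance(value, str):
            result[key] = {"DataType": "String", "StringValue": value}
        elif isinstance(value, (int, float)):
            result[key] = {"DataType": "Number", "StringValue": str(value)}
        else:
            raise TypeError(f"unsupported attribute type for {key!r}")
    return result


wire_attributes = to_sns_message_attributes(
    {"event": "order_paid", "paid_amount": 100, "currency": "EUR"}
)
```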
SQS queue attribute - VisibilityTimeout
The visibility_timeout value of the decorator will set the queue attribute VisibilityTimeout if specified. To use already defined values for a queue (default), do not supply any value to the visibility_timeout keyword; tomodachi will then not modify the visibility timeout.
SQS queue attribute - RedrivePolicy (used to assign a dead-letter queue)
The keyword value for dead_letter_queue_name, in tandem with the max_receive_count value, will modify the queue attribute RedrivePolicy. This attribute controls the use of a dead-letter queue, to which messages are delivered if they have been picked up by consumers max_receive_count number of times without being deleted from the queue.
The value for dead_letter_queue_name should be either an ARN for an SQS queue, in which case the queue must have been created in advance, or an alphanumeric queue name, in which case the queue will be set up in the same way as the queue name you specify with regard to prefixes, etc. Both dead_letter_queue_name and max_receive_count need to be specified together, as they both affect the redrive policy.
Note that DLQs that are defined by an alphanumeric queue name (not an ARN) and set up by the service will use a retention period of 1209600 seconds (14 days).
To disable the use of a DLQ, use a None value for the dead_letter_queue_name keyword and the RedrivePolicy will be removed from the queue attributes. To use the already defined values for a queue, do not supply any values to the keyword arguments in the decorator; tomodachi will then leave the queue attribute as is.
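The three cases above (keyword not supplied, None, or a DLQ together with a receive count) can be sketched as a small decision function. The RedrivePolicy JSON keys deadLetterTargetArn and maxReceiveCount are the ones used by SQS; the redrive_policy_change function, the UNSET sentinel and the example ARN are hypothetical and only illustrate the described behavior.

```python
import json

UNSET = object()  # sentinel: the keyword was not supplied at all


def redrive_policy_change(dead_letter_queue_arn=UNSET, max_receive_count=UNSET):
    """Decide what to do with the RedrivePolicy queue attribute.

    Returns ("keep", None), ("remove", None) or ("set", <policy JSON>).
    """
    if dead_letter_queue_arn is UNSET and max_receive_count is UNSET:
        return ("keep", None)  # leave the existing attribute untouched
    if dead_letter_queue_arn is None:
        return ("remove", None)  # explicitly disable the DLQ
    if dead_letter_queue_arn is UNSET or max_receive_count in (UNSET, None):
        raise ValueError(
            "dead_letter_queue_name and max_receive_count must be specified together"
        )
    policy = json.dumps(
        {
            "deadLetterTargetArn": dead_letter_queue_arn,
            "maxReceiveCount": max_receive_count,
        }
    )
    return ("set", policy)


action, policy_json = redrive_policy_change(
    "arn:aws:sqs:eu-west-1:123456789012:example-dlq",  # made-up ARN
    max_receive_count=5,
)
```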
FIFO queues
AWS supports two types of queues and topics, namely standard and FIFO. The major difference between these is that the latter guarantees correct ordering and exactly-once processing, whereas standard queues offer best-effort ordering and at-least-once delivery. By default, tomodachi creates standard queues and topics. To create them as FIFO instead, set fifo to True.
Maximum number of messages received in a batch from the queue
The max_number_of_consumed_messages setting determines how many messages should be pulled from the queue at once. Lower it if your handler runs a resource-intensive task and you don't want several messages competing for the same resources at once. The default value is 10 for standard queues and 1 for FIFO queues. The minimum value is 1, and the maximum value is 10.
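The defaults and limits described above can be summarized in a few lines. This is a sketch of the documented rules, not tomodachi's own code: resolve_batch_size is a hypothetical helper, and raising ValueError for out-of-range values is an assumption.

```python
def resolve_batch_size(max_number_of_consumed_messages=None, fifo=False):
    """Return the number of messages to request per receive call.

    Defaults: 10 for standard queues, 1 for FIFO queues.
    Explicit values must be within 1..10 (inclusive).
    """
    if max_number_of_consumed_messages is None:
        return 1 if fifo else 10
    if not 1 <= max_number_of_consumed_messages <= 10:
        raise ValueError("max_number_of_consumed_messages must be between 1 and 10")
    return max_number_of_consumed_messages


standard_default = resolve_batch_size()
fifo_default = resolve_batch_size(fifo=True)
explicit = resolve_batch_size(3)
```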
Message enveloping functionality / custom enveloping / raw messaging
Depending on whether the service's message_envelope attribute (previously named message_protocol) is used, parts of the enveloped data are passed as keyword arguments to the decorated function. It's usually safe to just use data as an argument. You can also pass a message_envelope value as a keyword argument to the decorator to use a custom enveloping method instead of the global one set for the service.
If you're using ProtobufBase (from tomodachi.envelope import ProtobufBase) as the service's message_envelope, you may also pass the keyword argument proto_class into the decorator, specifying the Python class generated by protobuf (Protocol Buffers) to use for decoding incoming messages. Likewise, additional keyword arguments set on the decorator will be passed into the envelope class's decoder function parse_message. Custom enveloping classes can be built to fit your existing architecture or for even more control of tracing and shared metadata between services.
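To give a rough idea of what a custom enveloping class might look like, here is a minimal JSON envelope. Only the parse_message name comes from the text above; the build_message counterpart, both method signatures, and the (message, message_uuid, timestamp) return shape are assumptions of this sketch, so check the envelope classes shipped with tomodachi before relying on them.

```python
import asyncio
import json
import time
import uuid


class MinimalJsonEnvelope:
    """Hypothetical envelope: wraps data with a little metadata as JSON."""

    @classmethod
    async def build_message(cls, service, topic, data, **kwargs):
        # Assumed signature; "service" is unused in this sketch.
        return json.dumps(
            {
                "metadata": {
                    "message_uuid": str(uuid.uuid4()),
                    "timestamp": time.time(),
                    "topic": topic,
                },
                "data": data,
            }
        )

    @classmethod
    async def parse_message(cls, payload, **kwargs):
        # Extra decorator keyword arguments would arrive in **kwargs.
        message = json.loads(payload)
        metadata = message["metadata"]
        return message, metadata["message_uuid"], metadata["timestamp"]


async def roundtrip():
    payload = await MinimalJsonEnvelope.build_message(
        None, "example-topic", {"order_id": 42}
    )
    return await MinimalJsonEnvelope.parse_message(payload)


message, message_uuid, timestamp = asyncio.run(roundtrip())
```

A handler using such an envelope would then typically receive the "data" part as its data keyword argument.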
Encryption – options related to SNS + SQS encryption at rest using AWS KMS
Encryption at rest for AWS SNS and/or AWS SQS can optionally be configured by specifying the KMS key alias or KMS key id as tomodachi service options: options.aws_sns_sqs.sns_kms_master_key_id (to configure encryption at rest on the SNS topics for which the tomodachi service handles the SNS -> SQS subscriptions) and/or options.aws_sns_sqs.sqs_kms_master_key_id (to configure encryption at rest for the SQS queues which the service is consuming).
Note that an option value set to an empty string ("") or False will unset the KMS master key id and thus disable encryption at rest. (The AWS APIs for SNS and SQS use an empty string value for the KmsMasterKeyId attribute to disable encryption with KMS if it was previously enabled.)
If an option is instead completely unset or set to None, no changes will be made to the KMS-related attributes on an existing topic or queue.
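The three possible meanings of a KMS option value can be sketched as follows. The kms_key_change function and the "alias/example-key" alias are hypothetical; the sketch only restates the rules described above.

```python
def kms_key_change(option_value):
    """Map an option value to the change applied to the KMS key attribute.

    None          -> ("keep", None): existing attributes are left untouched
    "" or False   -> ("disable", ""): an empty key id turns encryption off
    anything else -> ("enable", value): use the given key alias or key id
    """
    if option_value is None:
        return ("keep", None)
    if option_value == "" or option_value is False:
        return ("disable", "")
    return ("enable", option_value)


unchanged = kms_key_change(None)
disabled = kms_key_change("")
enabled = kms_key_change("alias/example-key")  # made-up key alias
```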
If it's expected that the services themselves, via their IAM credentials or assumed role, are responsible for creating queues and topics, these options could be used to provide encryption at rest without additional manual intervention.
Make sure that the key policy and role policy are set to allow KMS use and the SNS -> SQS functionality. See https://aws.amazon.com/premiumsupport/knowledge-center/sns-topic-sqs-queue-sse-cmk-policy/
Do not use these options if you are instead using IaC tooling to handle the topics, queues and subscriptions, or if they are, for example, created or updated as part of deployments. To keep the service from updating any attributes, leave the options unset or set them to None.
Read more at
- https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-server-side-encryption.html
- https://docs.aws.amazon.com/sns/latest/dg/sns-server-side-encryption.html#sse-key-terms
Publishing messages
AWS – Publish message to SNS – tomodachi.aws_sns_sqs_publish
await tomodachi.aws_sns_sqs_publish(service, message, topic=topic)
- service is the instance of the service class (from within a handler, use self)
- message is the message to publish before any potential envelope transformation
- topic is the non-prefixed name of the SNS topic used to publish the message
Additional function arguments can be supplied to also include message_attributes, and/or group_id + deduplication_id.
For more advanced workflows, it's also possible to specify overrides for the SNS topic name prefix or message enveloping class.
AWS – Send message to SQS – tomodachi.sqs_send_message
await tomodachi.sqs_send_message(service, message, queue_name=queue_name)
- service is the instance of the service class (from within a handler, use self)
- message is the message to publish before any potential envelope transformation
- queue_name is the SQS queue url, queue ARN or non-prefixed queue name to be used
Additional function arguments can be supplied to also include message_attributes, and/or group_id + deduplication_id.
For more advanced workflows, it's also possible to set delay seconds, define a custom message body formatter, or to specify overrides for the SQS queue name prefix or message enveloping class.
Example implementation (AWS SNS+SQS)
import tomodachi


class Service(tomodachi.Service):
    name = "aws-example"

    options = {
        "aws_sns_sqs.region_name": None,  # AWS region (example: 'eu-west-1')
        "aws_sns_sqs.aws_access_key_id": None,  # AWS access key id
        "aws_sns_sqs.aws_secret_access_key": None,  # AWS secret key
    }

    # The "message_envelope" attribute can be set on the service class to
    # build / parse data.
    # message_envelope = ...

    # Using the @tomodachi.aws_sns_sqs decorator to make the service
    # create an AWS SNS topic, an AWS SQS queue and a subscription from
    # the topic to the queue. The queue will be polled to receive messages
    # using the SQS ReceiveMessage API.
    @tomodachi.aws_sns_sqs("example-topic", queue_name="example-queue")
    async def example_func(self, message):
        # Received message, forwarding the same message as response on
        # another topic.
        await tomodachi.aws_sns_sqs_publish(
            self,
            message,
            topic="another-example-topic",
        )