Skip to content

Commit

Permalink
SQS: add support for passing MessageAttributes (#2059)
Browse files Browse the repository at this point in the history
* SQS: add support for passing MessageAttributes

* tests,docs

* fix tests

* format ,docs

---------

Co-authored-by: Tomer Nosrati <[email protected]>
  • Loading branch information
aviramha and Nusnus authored Jul 21, 2024
1 parent d121125 commit aeebd06
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 9 deletions.
8 changes: 8 additions & 0 deletions docs/reference/kombu.transport.SQS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,11 @@ The above policy:
+-----------------------------------------+--------------------------------------------+
| ``6th attempt`` | 640 seconds |
+-----------------------------------------+--------------------------------------------+


Message Attributes
------------------------

SQS supports sending message attributes along with the message body.
To use this feature, you can pass a 'message_attributes' as keyword argument
to `basic_publish` method.
26 changes: 20 additions & 6 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@
moving on to queueB. If queueB is empty, it will wait up until
'polling_interval' expires before moving back and checking on queueA.
Message Attributes
-----------------
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html
SQS supports sending message attributes along with the message body.
To use this feature, you can pass a 'message_attributes' as keyword argument
to `basic_publish` method.
Other Features supported by this transport
==========================================
Predefined Queues
Expand Down Expand Up @@ -412,13 +420,12 @@ def _delete(self, queue, *args, **kwargs):
def _put(self, queue, message, **kwargs):
"""Put message onto queue."""
q_url = self._new_queue(queue)
if self.sqs_base64_encoding:
body = AsyncMessage().encode(dumps(message))
else:
body = dumps(message)
kwargs = {'QueueUrl': q_url, 'MessageBody': body}

kwargs = {'QueueUrl': q_url}
if 'properties' in message:
if 'message_attributes' in message['properties']:
# we don't want to want to have the attribute in the body
kwargs['MessageAttributes'] = \
message['properties'].pop('message_attributes')
if queue.endswith('.fifo'):
if 'MessageGroupId' in message['properties']:
kwargs['MessageGroupId'] = \
Expand All @@ -434,6 +441,13 @@ def _put(self, queue, message, **kwargs):
if "DelaySeconds" in message['properties']:
kwargs['DelaySeconds'] = \
message['properties']['DelaySeconds']

if self.sqs_base64_encoding:
body = AsyncMessage().encode(dumps(message))
else:
body = dumps(message)
kwargs['MessageBody'] = body

c = self.sqs(queue=self.canonical_queue_name(queue))
if message.get('redelivered'):
c.change_message_visibility(
Expand Down
20 changes: 17 additions & 3 deletions t/unit/transport/test_SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,15 @@ def list_queues(self, QueueNamePrefix=None):
def get_queue_url(self, QueueName=None):
return self._queues[QueueName]

def send_message(self, QueueUrl=None, MessageBody=None):
def send_message(self, QueueUrl=None, MessageBody=None,
MessageAttributes=None):
for q in self._queues.values():
if q.url == QueueUrl:
handle = ''.join(random.choice(string.ascii_lowercase) for
x in range(10))
q.messages.append({'Body': MessageBody,
'ReceiptHandle': handle})
'ReceiptHandle': handle,
'MessageAttributes': MessageAttributes})
break

def receive_message(self, QueueUrl=None, MaxNumberOfMessages=1,
Expand Down Expand Up @@ -472,7 +474,7 @@ def test_get_async(self):
'WaitTimeSeconds': self.channel.wait_time_seconds,
}
assert get_list_args[3] == \
self.channel.sqs().get_queue_url(self.queue_name).url
self.channel.sqs().get_queue_url(self.queue_name).url
assert get_list_kwargs['parent'] == self.queue_name

def test_drain_events_with_empty_list(self):
Expand Down Expand Up @@ -977,3 +979,15 @@ def test_sts_session_not_expired(self):

# Assert
mock_generate_sts_session_token.assert_not_called()

def test_message_attribute(self):
message = 'my test message'
self.producer.publish(message, message_attributes={
'Attribute1': {'DataType': 'String',
'StringValue': 'STRING_VALUE'}
}
)
output_message = self.queue(self.channel).get()
assert message == output_message.payload
# It's not propogated to the properties
assert 'message_attributes' not in output_message.properties

0 comments on commit aeebd06

Please sign in to comment.