Skip to content

Commit

Permalink
SQS: add support for passing MessageAttributes
Browse files Browse the repository at this point in the history
  • Loading branch information
aviramha committed Jul 12, 2024
1 parent f61c6d1 commit 9e69540
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,12 @@ def _delete(self, queue, *args, **kwargs):
def _put(self, queue, message, **kwargs):
"""Put message onto queue."""
q_url = self._new_queue(queue)
if self.sqs_base64_encoding:
body = AsyncMessage().encode(dumps(message))
else:
body = dumps(message)
kwargs = {'QueueUrl': q_url, 'MessageBody': body}

kwargs = {'QueueUrl': q_url}
if 'properties' in message:
if 'message_attributes' in message['properties']:
# we don't want to want to have the attribute in the body
kwargs['MessageAttributes'] = \
message['properties'].pop('message_attributes')
if queue.endswith('.fifo'):
if 'MessageGroupId' in message['properties']:
kwargs['MessageGroupId'] = \
Expand All @@ -434,6 +433,13 @@ def _put(self, queue, message, **kwargs):
if "DelaySeconds" in message['properties']:
kwargs['DelaySeconds'] = \
message['properties']['DelaySeconds']

if self.sqs_base64_encoding:
body = AsyncMessage().encode(dumps(message))
else:
body = dumps(message)
kwargs['MessageBody'] = body

c = self.sqs(queue=self.canonical_queue_name(queue))
if message.get('redelivered'):
c.change_message_visibility(
Expand Down

0 comments on commit 9e69540

Please sign in to comment.