diff --git a/kombu/asynchronous/aws/ext.py b/kombu/asynchronous/aws/ext.py index 1fa4a57e8..fc472d5f4 100644 --- a/kombu/asynchronous/aws/ext.py +++ b/kombu/asynchronous/aws/ext.py @@ -4,11 +4,13 @@ try: import boto3 + import sqs_extended_client from botocore import exceptions from botocore.awsrequest import AWSRequest from botocore.response import get_response except ImportError: boto3 = None + sqs_extended_client = None class _void: pass diff --git a/kombu/asynchronous/aws/sqs/ext.py b/kombu/asynchronous/aws/sqs/ext.py index 72268b5db..96d4f5904 100644 --- a/kombu/asynchronous/aws/sqs/ext.py +++ b/kombu/asynchronous/aws/sqs/ext.py @@ -5,5 +5,7 @@ try: import boto3 + import sqs_extended_client except ImportError: boto3 = None + sqs_extended_client = None diff --git a/kombu/transport/SQS.py b/kombu/transport/SQS.py index 26a69637e..1d8727f82 100644 --- a/kombu/transport/SQS.py +++ b/kombu/transport/SQS.py @@ -133,6 +133,7 @@ from __future__ import annotations import base64 +import json import socket import string import uuid @@ -144,7 +145,7 @@ from vine import ensure_promise, promise, transform from kombu.asynchronous import get_event_loop -from kombu.asynchronous.aws.ext import boto3, exceptions +from kombu.asynchronous.aws.ext import boto3, exceptions, sqs_extended_client from kombu.asynchronous.aws.sqs.connection import AsyncSQSConnection from kombu.asynchronous.aws.sqs.message import AsyncMessage from kombu.log import get_logger @@ -498,6 +499,25 @@ def _message_to_python(self, message, queue_name, q_url): message['ReceiptHandle'], ) else: + + if ( + sqs_extended_client and + isinstance(payload, list) + and payload[0] == sqs_extended_client.client.MESSAGE_POINTER_CLASS + ): + # Used the sqs_extended_client, so we need to fetch the file from S3 and use that as the payload + s3_details = payload[1] + s3_bucket_name, s3_key = s3_details["s3BucketName"], s3_details["s3Key"] + + s3_client = self.s3() + response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_key) + + # The message body is under a wrapper class called StreamingBody + streaming_body = response["Body"] + payload = json.loads( + self._optional_b64_decode(streaming_body.read()) + ) + try: properties = payload['properties'] delivery_info = payload['properties']['delivery_info'] @@ -713,6 +733,32 @@ def close(self): # if "can't set attribute" not in str(exc): # raise + def new_s3_client( + self, region, access_key_id, secret_access_key, session_token=None + ): + session = boto3.session.Session( + region_name=region, + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + aws_session_token=session_token, + ) + is_secure = self.is_secure if self.is_secure is not None else True + client_kwargs = {"use_ssl": is_secure} + + if self.endpoint_url is not None: + client_kwargs["endpoint_url"] = self.endpoint_url + + client = session.client("s3", **client_kwargs) + + return client + + def s3(self): + return self.new_s3_client( + region=self.region, + access_key_id=self.conninfo.userid, + secret_access_key=self.conninfo.password, + ) + def new_sqs_client(self, region, access_key_id, secret_access_key, session_token=None): session = boto3.session.Session( @@ -729,7 +775,13 @@ def new_sqs_client(self, region, access_key_id, client_kwargs['endpoint_url'] = self.endpoint_url client_config = self.transport_options.get('client-config') or {} config = Config(**client_config) - return session.client('sqs', config=config, **client_kwargs) + client = session.client('sqs', config=config, **client_kwargs) + + if self.transport_options.get('large_payload_bucket') and sqs_extended_client: + client.large_payload_support = self.transport_options.get('large_payload_bucket') + client.use_legacy_attribute = False + + return client def sqs(self, queue=None): if queue is not None and self.predefined_queues: diff --git a/requirements/extras/sqs.txt b/requirements/extras/sqs.txt index 77f75030c..a0626a7d9 100644 --- a/requirements/extras/sqs.txt +++ b/requirements/extras/sqs.txt @@ -1,3 +1,4 @@ boto3>=1.26.143 pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython" urllib3>=1.26.16 +amazon-sqs-extended-client>=1.0.1