Skip to content

Commit

Permalink
Integrate large payload support for SQS
Browse files Browse the repository at this point in the history
This adds support for handling large payloads in SQS. The 'sqs_extended_client' is imported and utilized for fetching the file from S3 as the payload when necessary.

As Kombu asynchronously fetches new messages from the queue, not using the standard boto3 APIs, we have to manually fetch the S3 file, rather than rely on the sqs_extended_client to perform that action.

Relates to: #279
  • Loading branch information
Amwam committed Sep 18, 2024
1 parent a626f1f commit ca4eb4b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 2 deletions.
2 changes: 2 additions & 0 deletions kombu/asynchronous/aws/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

try:
import boto3
import sqs_extended_client
from botocore import exceptions
from botocore.awsrequest import AWSRequest
from botocore.response import get_response
except ImportError:
boto3 = None
sqs_extended_client = None

class _void:
pass
Expand Down
2 changes: 2 additions & 0 deletions kombu/asynchronous/aws/sqs/ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@

try:
import boto3
import sqs_extended_client
except ImportError:
boto3 = None
sqs_extended_client = None
56 changes: 54 additions & 2 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
from __future__ import annotations

import base64
import json
import socket
import string
import uuid
Expand All @@ -144,7 +145,7 @@
from vine import ensure_promise, promise, transform

from kombu.asynchronous import get_event_loop
from kombu.asynchronous.aws.ext import boto3, exceptions
from kombu.asynchronous.aws.ext import boto3, exceptions, sqs_extended_client
from kombu.asynchronous.aws.sqs.connection import AsyncSQSConnection
from kombu.asynchronous.aws.sqs.message import AsyncMessage
from kombu.log import get_logger
Expand Down Expand Up @@ -498,6 +499,25 @@ def _message_to_python(self, message, queue_name, q_url):
message['ReceiptHandle'],
)
else:

if (
sqs_extended_client and
isinstance(payload, list)
and payload[0] == sqs_extended_client.client.MESSAGE_POINTER_CLASS
):
# Used the sqs_extended_client, so we need to fetch the file from S3 and use that as the payload
s3_details = payload[1]
s3_bucket_name, s3_key = s3_details["s3BucketName"], s3_details["s3Key"]

s3_client = self.s3()
response = s3_client.get_object(Bucket=s3_bucket_name, Key=s3_key)

# The message body is under a wrapper class called StreamingBody
streaming_body = response["Body"]
payload = json.loads(
self._optional_b64_decode(streaming_body.read())
)

try:
properties = payload['properties']
delivery_info = payload['properties']['delivery_info']
Expand Down Expand Up @@ -713,6 +733,32 @@ def close(self):
# if "can't set attribute" not in str(exc):
# raise

def new_s3_client(
    self, region, access_key_id, secret_access_key, session_token=None
):
    """Build a boto3 S3 client from the given AWS credentials.

    Connection security and endpoint come from the channel itself:
    ``use_ssl`` follows ``self.is_secure`` (defaulting to secure when
    unset) and ``self.endpoint_url`` overrides the endpoint when present.

    Arguments:
        region: AWS region name for the session.
        access_key_id: AWS access key id.
        secret_access_key: AWS secret access key.
        session_token: optional temporary-session token.

    Returns:
        A configured botocore S3 client.
    """
    # Treat an unset is_secure as "secure" so SSL is the default.
    client_kwargs = {
        "use_ssl": True if self.is_secure is None else self.is_secure,
    }
    if self.endpoint_url is not None:
        client_kwargs["endpoint_url"] = self.endpoint_url

    return boto3.session.Session(
        region_name=region,
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
        aws_session_token=session_token,
    ).client("s3", **client_kwargs)

def s3(self):
    """Return an S3 client built from this channel's connection credentials.

    Delegates to :meth:`new_s3_client`, pulling the region from the
    channel and the key pair from the broker connection info.
    """
    conninfo = self.conninfo
    return self.new_s3_client(
        region=self.region,
        access_key_id=conninfo.userid,
        secret_access_key=conninfo.password,
    )

def new_sqs_client(self, region, access_key_id,
secret_access_key, session_token=None):
session = boto3.session.Session(
Expand All @@ -729,7 +775,13 @@ def new_sqs_client(self, region, access_key_id,
client_kwargs['endpoint_url'] = self.endpoint_url
client_config = self.transport_options.get('client-config') or {}
config = Config(**client_config)
return session.client('sqs', config=config, **client_kwargs)
client = session.client('sqs', config=config, **client_kwargs)

if self.transport_options.get('large_payload_bucket') and sqs_extended_client:
client.large_payload_support = self.transport_options.get('large_payload_bucket')
client.use_legacy_attribute = False

return client

def sqs(self, queue=None):
if queue is not None and self.predefined_queues:
Expand Down
1 change: 1 addition & 0 deletions requirements/extras/sqs.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
boto3>=1.26.143
pycurl>=7.43.0.5; sys_platform != 'win32' and platform_python_implementation=="CPython"
urllib3>=1.26.16
amazon-sqs-extended-client>=1.0.1

0 comments on commit ca4eb4b

Please sign in to comment.