Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS: add support for passing MessageAttributes #2059

Merged
merged 7 commits into from
Jul 21, 2024

Conversation

aviramha
Copy link
Contributor

@aviramha aviramha commented Jul 12, 2024

Hi!
I looked for a contributing guide and couldn't find one, so apologies if I missed any requirement/need.

SQS supports MessageAttributes, and with this PR, users using celery can easily add message attributes like this:

from celery import Celery, signature

# Initialize the Celery app with SQS as the broker
app = Celery('sqs_producer', broker='sqs://')

# AWS configuration
app.conf.update(
    broker_transport_options={
        'region': 'eu-north-1',
        'polling_interval': 1,
    },
    task_default_queue='ManualTesting1'
)


# Custom signature function
def custom_signature(message_body):
    return signature('sqs_producer.send_message_to_sqs', args=(message_body,))

if __name__ == '__main__':
    message_body = {
        'key1': 'value1',
        'key2': 'value2'
    }
    # Using the custom signature to create a task
    task = custom_signature(message_body)
    # Apply the task asynchronously
    result = task.apply_async(message_attributes = {
    'Attribute1': {'DataType': 'String',
            'StringValue': 'STRING_VALUE'}
    })

Notes:

  • I don't put the message attributes into the message properties - I feel that's a bit weird to include it there, but would understand if that might be problematic (if someone used this attribute before and now it won't appear)

This is probably useful for many reasons, but in our case it's blocking use of feature we provide over it - metalbear-co/mirrord#2066

@auvipy auvipy self-requested a review July 15, 2024 16:05
Copy link
Member

@auvipy auvipy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the PR! this will need sufficient tests and possibly some documentation as it seems to be a somewhat new feature for SQS

@Nusnus Nusnus self-requested a review July 15, 2024 16:48
@aviramha
Copy link
Contributor Author

I wasn't sure what's expected in terms of tests and docs - also couldn't build docs - got this error:

make lint
flake8 "kombu" "t"
(cd "docs/"; /Library/Developer/CommandLineTools/usr/bin/make apicheck)
Makefile:12: *** commands commence before first target.  Stop.
make: *** [apicheck] Error 2

@aviramha
Copy link
Contributor Author

Found how to run tests, should be okay now.

@aviramha aviramha requested a review from auvipy July 17, 2024 07:26
@auvipy auvipy added this to the 5.4 milestone Jul 17, 2024
Copy link
Member

@Nusnus Nusnus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@aviramha can you share some logs showing it works on an actual environment though?

Just to be on the safe-side so we can also see it actually works (apart of the code review which makes sense to me + the unit test which looks fine).

Thanks!

@aviramha
Copy link
Contributor Author

aviramha commented Jul 17, 2024

mirrord-operator-557c4cf78b-vrjfh mirrord-operator   2024-07-12T13:10:41.952140Z TRACE sqs_splitting::queue_splitter: new message to filter. message attributes: Some({"Attribute1": MessageAttributeValue { string_value: Some("STRING_VALUE"), binary_value: None, string_list_values: None, binary_list_values: None, data_type: "String" }})!
mirrord-operator-557c4cf78b-vrjfh mirrord-operator     at operator/sqs-splitting/src/queue_splitter.rs:357 on ThreadId(2)
mirrord-operator-557c4cf78b-vrjfh mirrord-operator     in sqs_splitting::queue_splitter::filter_messages with queue_url: "https://sqs.ca-central-1.amazonaws.com/xxx/alpha-ggg-hh-ppp", main_output_queue_url: "https://sqs.us-east-1.amazonaws.com/xxx/mirrord-main-gggg-kkkk-queue-alpha-bbb-ppp-zzzz"

@Nusnus Nusnus merged commit aeebd06 into celery:main Jul 21, 2024
17 checks passed
@aviramha aviramha deleted the sqs_message_attributes branch July 22, 2024 10:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants