Skip to content

Commit

Permalink
switch ATProto chat polling from task to cron job
Browse files Browse the repository at this point in the history
  • Loading branch information
snarfed committed Sep 10, 2024
1 parent 36a9f94 commit 794a772
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 24 deletions.
4 changes: 0 additions & 4 deletions atproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@
# "Discovery API" https://github.com/googleapis/google-api-python-client
dns_discovery_api = googleapiclient.discovery.build('dns', 'v1')

CHAT_POLL_PERIOD = timedelta(minutes=1)


def chat_client(*, repo, method, **kwargs):
"""Returns a new Bluesky chat :class:`Client` for a given XRPC method.
Expand Down Expand Up @@ -959,6 +957,4 @@ def poll_chat_task():

# done!
bot.put()
common.create_task(queue='atproto-poll-chat', proto=proto.LABEL,
delay=CHAT_POLL_PERIOD)
return 'OK'
2 changes: 1 addition & 1 deletion common.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def create_task(queue, delay=None, **params):
logger.info(f'Running task inline: {queue} {params}')
from router import app
return app.test_client().post(
path, data=params, headers={flask_util.CLOUD_TASKS_QUEUE_HEADER: ''})
path, data=params, headers={flask_util.CLOUD_TASKS_TASK_HEADER: 'x'})

# # alternative: run inline in this request context
# request.form = params
Expand Down
3 changes: 3 additions & 0 deletions cron.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
# docs: https://cloud.google.com/appengine/docs/standard/python3/scheduling-jobs-with-cron-yaml

cron:
- description: poll for Bluesky chat messages to @ap.brid.gy
url: /cron/atproto-poll-chat?proto=activitypub
schedule: every 999 hours
3 changes: 3 additions & 0 deletions dispatch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ dispatch:
- url: "*/queue/*"
service: router

- url: "*/cron/*"
service: router

- url: "*/xrpc/com.atproto.sync.subscribeRepos"
service: atproto-hub

Expand Down
2 changes: 1 addition & 1 deletion dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,6 @@ def reply(text, type=None):
<p>Hi! {from_user.user_link(proto=to_proto)} is using Bridgy Fed to bridge their account from {from_proto.PHRASE} into {to_proto.PHRASE}, and they'd like to follow you. You can bridge your account into {from_proto.PHRASE} by following this account. <a href="https://fed.brid.gy/docs">See the docs</a> for more information.
<p>If you do nothing, your account won't be bridged, and users on {from_proto.PHRASE} won't be able to see or interact with you.
<p>Bridgy Fed will only send you this message once.""")
return reply(f"Got it! We'll send {to_user.user_link()} a DM. Fingers crossed!")
return reply(f"Got it! We'll send {to_user.user_link()} a message and say that you hope they'll enable the bridge. Fingers crossed!")

error(f"Couldn't understand DM: {content}", status=304)
4 changes: 2 additions & 2 deletions router.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
app.add_url_rule('/queue/receive', view_func=protocol.receive_task, methods=['POST'])
app.add_url_rule('/queue/send', view_func=protocol.send_task, methods=['POST'])
app.add_url_rule('/queue/webmention', view_func=web.webmention_task, methods=['POST'])
app.add_url_rule('/queue/atproto-poll-chat', view_func=atproto.poll_chat_task,
methods=['POST'])
app.add_url_rule('/cron/atproto-poll-chat', view_func=atproto.poll_chat_task,
methods=['GET'])


@app.get('/liveness_check')
Expand Down
2 changes: 1 addition & 1 deletion scripts/opt_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def delete_ap_targets(*, from_proto=None, user=None, user_id=None):
}
with app.test_request_context('/queue/send', base_url='https://fed.brid.gy/',
data=params, headers={
flask_util.CLOUD_TASKS_QUEUE_HEADER: '',
flask_util.CLOUD_TASKS_TASK_HEADER: 'x',
}):
# in ActivityPub, if the actor is already deleted on this instance,
# it may return 502 here because it no longer has the actor's public
Expand Down
17 changes: 6 additions & 11 deletions tests/test_atproto.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import atproto
from atproto import (
ATProto,
CHAT_POLL_PERIOD,
DatastoreClient,
DNS_GCP_PROJECT,
DNS_ZONE,
Expand Down Expand Up @@ -1962,15 +1961,14 @@ def test_datastore_client_other_call_pass_through(self, mock_get):
def test_poll_atproto_chat_empty(self, mock_get, mock_create_task):
fa = self.make_user_and_repo(cls=Web, id='fa.brid.gy',
atproto_last_chat_log_cursor='kursur')
resp = self.post('/queue/atproto-poll-chat', data={'proto': 'fake'})
resp = self.get('/cron/atproto-poll-chat?proto=fake')
self.assert_equals(200, resp.status_code)

mock_get.assert_called_with(
'https://chat.local/xrpc/chat.bsky.convo.getLog?cursor=kursur',
json=None, data=None, headers=ANY)
self.assertEqual('neckst', fa.key.get().atproto_last_chat_log_cursor)
self.assert_task(mock_create_task, 'atproto-poll-chat', proto='fake',
eta_seconds=NOW_SECONDS + CHAT_POLL_PERIOD.total_seconds())
mock_create_task.assert_not_called()

@patch.object(tasks_client, 'create_task')
@patch('requests.get', side_effect=[
Expand All @@ -1996,7 +1994,7 @@ def test_poll_atproto_chat_empty(self, mock_get, mock_create_task):
def test_poll_atproto_chat_no_messages(self, mock_get, mock_create_task):
fa = self.make_user_and_repo(cls=Web, id='fa.brid.gy',
atproto_last_chat_log_cursor='kursur')
resp = self.post('/queue/atproto-poll-chat', data={'proto': 'fake'})
resp = self.get('/cron/atproto-poll-chat?proto=fake')
self.assert_equals(200, resp.status_code)

mock_get.assert_any_call(
Expand All @@ -2006,8 +2004,7 @@ def test_poll_atproto_chat_no_messages(self, mock_get, mock_create_task):
'https://chat.local/xrpc/chat.bsky.convo.getLog?cursor=neckst',
json=None, data=None, headers=ANY)
self.assertEqual('dunn', fa.key.get().atproto_last_chat_log_cursor)
self.assert_task(mock_create_task, 'atproto-poll-chat', proto='fake',
eta_seconds=NOW_SECONDS + CHAT_POLL_PERIOD.total_seconds())
mock_create_task.assert_not_called()

@patch.object(tasks_client, 'create_task')
@patch('requests.get')
Expand Down Expand Up @@ -2071,7 +2068,7 @@ def test_poll_atproto_chat_messages(self, mock_get, mock_create_task):

fa = self.make_user_and_repo(cls=Web, id='fa.brid.gy',
atproto_last_chat_log_cursor='kursur')
resp = self.post('/queue/atproto-poll-chat', data={'proto': 'fake'})
resp = self.get('/cron/atproto-poll-chat?proto=fake')
self.assert_equals(200, resp.status_code)

mock_get.assert_any_call(
Expand All @@ -2084,7 +2081,7 @@ def test_poll_atproto_chat_messages(self, mock_get, mock_create_task):
'https://chat.local/xrpc/chat.bsky.convo.getLog?cursor=moar',
json=None, data=None, headers=ANY)

self.assertEqual(4, mock_create_task.call_count)
self.assertEqual(3, mock_create_task.call_count)

id = 'at://did:alice/chat.bsky.convo.defs.messageView/uvw'
self.assert_task(mock_create_task, 'receive', authed_as='did:alice',
Expand Down Expand Up @@ -2123,5 +2120,3 @@ def test_poll_atproto_chat_messages(self, mock_get, mock_create_task):
})

self.assertEqual('dunn', fa.key.get().atproto_last_chat_log_cursor)
self.assert_task(mock_create_task, 'atproto-poll-chat', proto='fake',
eta_seconds=NOW_SECONDS + CHAT_POLL_PERIOD.total_seconds())
17 changes: 13 additions & 4 deletions tests/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,21 @@ def prune(results):
result.failures = prune(result.failures)
return result

def post(self, url, client=None, **kwargs):
def get(self, url, **kwargs):
"""Adds Cloud tasks header to ``self.client.get``."""
return self._request('get', url, **kwargs)

def post(self, url, **kwargs):
"""Adds Cloud tasks header to ``self.client.post``."""
return self._request('post', url, **kwargs)

def _request(self, fn, url, client=None, **kwargs):
if client is None:
client = self.router_client if url.startswith('/queue/') else self.client
kwargs.setdefault('headers', {})[flask_util.CLOUD_TASKS_QUEUE_HEADER] = ''
return client.post(url, **kwargs)
client = (self.router_client
if url.startswith('/queue/') or url.startswith('/cron/')
else self.client)
kwargs.setdefault('headers', {})[flask_util.CLOUD_TASKS_TASK_HEADER] = 'x'
return getattr(client, fn)(url, **kwargs)

def make_user(self, id, cls, **kwargs):
"""Reuse RSA key across Users because generating it is expensive."""
Expand Down

0 comments on commit 794a772

Please sign in to comment.