I have created a GraphQL API on AWS AppSync and I need to run tests to verify things work as expected.
I have implemented a command line API client in Python with gql
to interact with the API:
- I can run queries ✅
- I can run mutations ✅
- I can start a subscription, run a matching mutation in a second Terminal tab and see the expected message from the subscription about the change in the first tab ✅
Alternatively, I can use the AWS AppSync console to perform the same actions.
Further, I have created a suite of tests that use pytest
along with various plugins (most prominently pytest-asyncio
):
- Tests for queries and mutations were straight-forward to implement; they work fine. ✅
However, I am struggling with implementing the subscription tests: As opposed to queries and mutations, a subscription run by itself obviously doesn’t really do anything, it only responds to changes to the API resource it is subscribed to (typically triggered by a mutation); testing a subscription therefore needs to take the following actions:
- start the subscription under test
- run a mutation that changes the API resource the subscription is subscribed to
- assert the subscription reports the expected API resource change
- stop the subscription under test
There is sample code in the gql
docs for running multiple GraphQL queries in parallel using asyncio
, but unfortunately, that example just keeps running until stopped manually (e.g. using CTRL-C
or so) – which obviously is not an option for a test suite that should run non-interactively.
My problem:
I don’t know how to use asyncio
to implement the steps above.
Subscriptions are run asynchronously, see for example execute_subscription1
in the gql
docs linked above. How do I do step 1 / start a subscription (which implies await
ing it, doesn’t it ?) – while staying unblocked in the main thread, so I can do step 2 / run a mutation ? I tried using asyncio.to_thread(...)
, but from what I understand, using that with await asyncio.gather(...)
as shown in the linked Python docs (and my sample code below), i.e. running a function in a separate thread doesn’t seem to mean I’m decoupled from it, does it ? I’m still blocked waiting for the subscription function to finish (which never happens…)
I think I should be able to figure out step 3 by myself – but looking ahead to step 4, it seems I need to create a Task
and cancel()
that, right ?
One further problem: The entire system doesn’t seem exactly super fast: Running any operation takes a couple of seconds at least and a subscription doesn’t seem to start listening immediately, but takes a little to start up. Therefore, the mutation might already have happened before the subscription was ready.
I might need some mechanism to ensure step 2 is only kicked off once step 1 has fully started up. I think asyncio.Event
or asyncio.Condition
might help – but considering the amount of question marks over my head right now, it seems advisable to run all this by the SO swarm first.
My question:
How do I run an asynchronous function (the subscription), then run the mutation (sync ? async ?) and then cancel the asynchronous one – all in the same process (possibly in different threads) ?
What I have so far:
The code I have so far is rather involved, it’s not easy to reduce it to a digestable form – let alone present a concise, self-contained working example here (which anyway mandates access to the AppSync API); trying anyway to give some idea:
#!/usr/bin/env python
import asyncio
from gql import Client, gql
from gql.transport import aiohttp, appsync_auth, appsync_websockets
# authenticate with AWS Cognito to get a JWT
access_token = _get_access_token()
# get this from AWS AppSync console
endpoint = '<some random ID>'
host = f'{endpoint}.appsync-api.us-east-1.amazonaws.com'
url_gql = f'https://{host}/graphql'
url_wss = f'wss://{endpoint}.appsync-realtime-api.us-east-1.amazonaws.com/graphql'
auth = appsync_auth.AppSyncJWTAuthentication(host=host, jwt=access_token)
def run_subscription():
query = gql('''
subscription mySubscription {
...
}
''')
transport = appsync_websockets.AppSyncWebsocketsTransport(auth=auth, url=url_wss)
client = Client(transport=transport)
print ('run_subscription')
for result in client.subscribe(query):
print (f'subscription result: {result}')
# from https://gql.readthedocs.io/en/stable/usage/basic_usage.html:
# basic example won’t work if you have an asyncio event loop running
# --> I _think_ I need to run asynchronously:
# https://gql.readthedocs.io/en/stable/async/async_usage.html
async def run_mutation():
query = gql('''
mutation myMutation($mut_var: SomeType!) {
...
}
''')
params = {"mut_var": {
... (data for SomeType) ...
}}
print ('run_mutation')
transport = aiohttp.AIOHTTPTransport(auth=auth, url=url_gql)
async with Client(transport=transport) as session:
result = await session.execute(query, variable_values=params)
print (f'mutation result: {result}')
async def main():
# THIS IS WHAT I AM STRUGGLING WITH:
# I don't think I can just start both functions together...
await asyncio.gather(asyncio.to_thread(run_subscription),
run_mutation())
# TODO: how do I stop the mutation here ?
asyncio.run(main())
Output:
run_mutation
run_subscription
mutation result: ... (matches expectation) ...
From what I understand, run_subscription
and run_mutation
are started simultaneously and the subscription is not ready to listen yet, hence no output from it.
More thoughts:
I am somewhat surprised I haven’t been able to find a lot of online resources about this (JavaScript example here) – doesn’t anyone test their subscriptions ?!? Also, am I at least heading in the general right direction ? I thought about pivoting the entire approach and start the subscription as a pytest fixture, but that really seems weird, sort of makes the subject under test a part of the test environment…
I would greatly appreciate any help on this. Thank you very much.