Python / asyncio: How to test GraphQL subscriptions?

Python / asyncio: How to test GraphQL subscriptions?


0

I have created a GraphQL API on AWS AppSync and I need to run tests to verify things work as expected.

I have implemented a command line API client in Python with gql to interact with the API:

  • I can run queries ✅
  • I can run mutations ✅
  • I can start a subscription, run a matching mutation in a second Terminal tab and see the expected message from the subscription about the change in the first tab ✅

Alternatively, I can use the AWS AppSync console to perform the same actions.

Further, I have created a suite of tests that use pytest along with various plugins (most prominently pytest-asyncio):

  • Tests for queries and mutations were straight-forward to implement; they work fine. ✅

However, I am struggling with implementing the subscription tests: As opposed to queries and mutations, a subscription run by itself obviously doesn’t really do anything, it only responds to changes to the API resource it is subscribed to (typically triggered by a mutation); testing a subscription therefore needs to take the following actions:

  1. start the subscription under test
  2. run a mutation that changes the API resource the subscription is subscribed to
  3. assert the subscription reports the expected API resource change
  4. stop the subscription under test

There is sample code in the gql docs for running multiple GraphQL queries in parallel using asyncio, but unfortunately, that example just keeps running until stopped manually (e.g. using CTRL-C or so) – which obviously is not an option for a test suite that should run non-interactively.

My problem:

I don’t know how to use asyncio to implement the steps above.

Subscriptions are run asynchronously, see for example execute_subscription1 in the gql docs linked above. How do I do step 1 / start a subscription (which implies awaiting it, doesn’t it ?) – while staying unblocked in the main thread, so I can do step 2 / run a mutation ? I tried using asyncio.to_thread(...), but from what I understand, using that with await asyncio.gather(...) as shown in the linked Python docs (and my sample code below), i.e. running a function in a separate thread doesn’t seem to mean I’m decoupled from it, does it ? I’m still blocked waiting for the subscription function to finish (which never happens…)

I think I should be able to figure out step 3 by myself – but looking ahead to step 4, it seems I need to create a Task and cancel() that, right ?

One further problem: The entire system doesn’t seem exactly super fast: Running any operation takes a couple of seconds at least and a subscription doesn’t seem to start listening immediately, but takes a little to start up. Therefore, the mutation might already have happened before the subscription was ready.
I might need some mechanism to ensure step 2 is only kicked off once step 1 has fully started up. I think asyncio.Event or asyncio.Condition might help – but considering the amount of question marks over my head right now, it seems advisable to run all this by the SO swarm first.

My question:

How do I run an asynchronous function (the subscription), then run the mutation (sync ? async ?) and then cancel the asynchronous one – all in the same process (possibly in different threads) ?

What I have so far:

The code I have so far is rather involved, it’s not easy to reduce it to a digestable form – let alone present a concise, self-contained working example here (which anyway mandates access to the AppSync API); trying anyway to give some idea:

#!/usr/bin/env python

import asyncio
from gql import Client, gql
from gql.transport import aiohttp, appsync_auth, appsync_websockets

# authenticate with AWS Cognito to get a JWT
access_token = _get_access_token()
# get this from AWS AppSync console
endpoint = '<some random ID>'

host     = f'{endpoint}.appsync-api.us-east-1.amazonaws.com'
url_gql  = f'https://{host}/graphql'
url_wss  = f'wss://{endpoint}.appsync-realtime-api.us-east-1.amazonaws.com/graphql'

auth = appsync_auth.AppSyncJWTAuthentication(host=host, jwt=access_token)


def run_subscription():
    query = gql('''
    subscription mySubscription {
      ...
    }
    ''')

    transport = appsync_websockets.AppSyncWebsocketsTransport(auth=auth, url=url_wss)
    client = Client(transport=transport)

    print ('run_subscription')

    for result in client.subscribe(query):
        print (f'subscription result: {result}')


# from https://gql.readthedocs.io/en/stable/usage/basic_usage.html:
#    basic example won’t work if you have an asyncio event loop running
# --> I _think_ I need to run asynchronously:
#   https://gql.readthedocs.io/en/stable/async/async_usage.html
async def run_mutation():
    query = gql('''
      mutation myMutation($mut_var: SomeType!) {
        ...
      }
      ''')

    params = {"mut_var": {
        ... (data for SomeType) ...
      }}

    print ('run_mutation')

    transport  = aiohttp.AIOHTTPTransport(auth=auth, url=url_gql)
    async with Client(transport=transport) as session:
        result = await session.execute(query, variable_values=params)
        print (f'mutation result: {result}')


async def main():
    # THIS IS WHAT I AM STRUGGLING WITH:
    # I don't think I can just start both functions together...
    await asyncio.gather(asyncio.to_thread(run_subscription),
                         run_mutation())
    # TODO: how do I stop the mutation here ?


asyncio.run(main())

Output:

run_mutation
run_subscription
mutation result: ... (matches expectation) ...

From what I understand, run_subscription and run_mutation are started simultaneously and the subscription is not ready to listen yet, hence no output from it.

More thoughts:

I am somewhat surprised I haven’t been able to find a lot of online resources about this (JavaScript example here) – doesn’t anyone test their subscriptions ?!? Also, am I at least heading in the general right direction ? I thought about pivoting the entire approach and start the subscription as a pytest fixture, but that really seems weird, sort of makes the subject under test a part of the test environment…

I would greatly appreciate any help on this. Thank you very much.


Load 7 more related questions


Show fewer related questions

0



Leave a Reply

Your email address will not be published. Required fields are marked *