I am having to use the following workaround:
try:
    async with sse_response(request) as response:
        async for event in self.execute_workflow(
            layers, caller_user, resolved_workflow, payload
        ):
            await response.send(json.dumps(event.to_json()))
except asyncio.CancelledError:
    # aiohttp_sse is not handling the finishing of the response properly
    # and it is raising an internal CancelledError used for stopping the
    # response. We have to catch it and ignore it.
    logger.debug(
        "Ignoring cancelled error in streaming workflow request", exc_info=True
    )
And if we look at the exception info, the error is raised inside the library:
Ignoring cancelled error in streaming workflow request
Traceback (most recent call last):
File "/opt/isolate_controller/projects/isolate_controller/isolate_controller/gateway/_gateway.py", line 1071, in streaming_workflow_request
async with sse_response(request) as response:
File "/usr/local/lib/python3.11/site-packages/aiohttp_sse/helpers.py", line 61, in __aexit__
return await self._obj.__aexit__(exc_type, exc, tb)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/aiohttp_sse/__init__.py", line 221, in __aexit__
await self.wait()
File "/usr/local/lib/python3.11/site-packages/aiohttp_sse/__init__.py", line 147, in wait
await self._ping_task
File "/usr/local/lib/python3.11/site-packages/aiohttp_sse/__init__.py", line 203, in _ping
await asyncio.sleep(self._ping_interval)
File "/usr/local/lib/python3.11/asyncio/tasks.py", line 649, in sleep
return await future
^^^^^^^^^^^^
asyncio.exceptions.CancelledError
async def wait(self) -> None:
    """EventSourceResponse object is used for streaming data to the client,
    this method returns future, so we can wait until connection will
    be closed or other task explicitly call ``stop_streaming`` method.
    """
    if self._ping_task is None:
        raise RuntimeError("Response is not started")

    try:
        await self._ping_task
    except asyncio.CancelledError:
        if (
            sys.version_info >= (3, 11)
            and (task := asyncio.current_task())
            and task.cancelling()
        ):
            raise

def stop_streaming(self) -> None:
    """Used in conjunction with ``wait`` could be called from other task
    to notify client that server no longer wants to stream anything.
    """
    if self._ping_task is None:
        raise RuntimeError("Response is not started")
    self._ping_task.cancel()
I do not see any other source of CancelledError being raised in my code, so I suspect that this check is somehow getting the wrong task reference from asyncio.current_task()?
I am having to do
And if we look at the exc info it is happening inside the lib:
aiohttp-sse/aiohttp_sse/__init__.py
Lines 138 to 162 in e2f46bf
I do not see any other source of CancelledError being raised in my code, so I am thinking this is somehow maybe getting the wrong reference in
asyncio.current_task()?