Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ def _get_unique_queues(self) -> Set[asyncio.Queue]:
return set(self._queues.values())

async def _put_with_timeout(self, queue: asyncio.Queue, item) -> None:
"""Slow-path put: wait up to _DEFAULT_PUT_TIMEOUT_SECONDS, else drop.

Callers should attempt ``queue.put_nowait(item)`` first and only call
this when it raises :class:`asyncio.QueueFull`.
"""
try:
await asyncio.wait_for(
queue.put(item), timeout=_DEFAULT_PUT_TIMEOUT_SECONDS
Expand All @@ -100,6 +105,32 @@ async def _put_with_timeout(self, queue: asyncio.Queue, item) -> None:
"Queue full for too long. Dropping item to prevent multiplexer hang."
)

async def _put_to_queues(self, queues, item) -> None:
"""Deliver ``item`` to each queue.

Fast path: ``put_nowait`` for queues with capacity (no Task, no
timer handle, no coroutine yield). Slow path: ``_put_with_timeout``
only for queues that were full, and a single direct ``await`` when
exactly one queue needs the slow path (skips ``asyncio.gather``).
"""
slow_queues = None
for q in queues:
try:
q.put_nowait(item)
except asyncio.QueueFull:
if slow_queues is None:
slow_queues = [q]
else:
slow_queues.append(q)
if slow_queues is None:
return
if len(slow_queues) == 1:
await self._put_with_timeout(slow_queues[0], item)
else:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should work even if len(slow_queues)==1 . So no need of if/else statement.

Copy link
Copy Markdown
Contributor Author

@zhixiangli zhixiangli Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the happy path, one response goes to only one queue, and since this occurs in the hot path, the change is intentional.

Running benchmarks with 100000 iterations...
Direct await time:   0.3242 seconds
Asyncio.gather time: 1.7724 seconds
Difference: 1.4482 seconds (446.74% slower)
import asyncio
import time

class Simulator:
    async def _put_with_timeout(self, q, item):
        # Simulate minimal async overhead
        await asyncio.sleep(0)
        return

async def benchmark_direct(sim, q, item, iterations):
    start = time.perf_counter()
    for _ in range(iterations):
        await sim._put_with_timeout(q, item)
    end = time.perf_counter()
    return end - start

async def benchmark_gather(sim, q, item, iterations):
    slow_queues = [q]
    start = time.perf_counter()
    for _ in range(iterations):
        await asyncio.gather(
            *(sim._put_with_timeout(q, item) for q in slow_queues)
        )
    end = time.perf_counter()
    return end - start

async def main():
    sim = Simulator()
    q = object()  # Minimal object
    item = object()
    iterations = 100000

    print(f"Running benchmarks with {iterations} iterations...")

    # Warm up
    await benchmark_direct(sim, q, item, 1000)
    await benchmark_gather(sim, q, item, 1000)

    direct_time = await benchmark_direct(sim, q, item, iterations)
    print(f"Direct await time:   {direct_time:.4f} seconds")

    gather_time = await benchmark_gather(sim, q, item, iterations)
    print(f"Asyncio.gather time: {gather_time:.4f} seconds")

    diff = gather_time - direct_time
    percent = (diff / direct_time) * 100
    print(f"Difference: {diff:.4f} seconds ({percent:.2f}% slower)")

if __name__ == "__main__":
    asyncio.run(main())

await asyncio.gather(
*(self._put_with_timeout(q, item) for q in slow_queues)
)

def _ensure_recv_loop(self) -> None:
if self._recv_task is None or self._recv_task.done():
self._recv_task = asyncio.create_task(self._recv_loop())
Expand All @@ -124,13 +155,7 @@ async def _recv_loop(self) -> None:
while True:
response = await self._stream.recv()
if response == grpc.aio.EOF:
sentinel = _StreamEnd()
await asyncio.gather(
*(
self._put_with_timeout(queue, sentinel)
for queue in self._get_unique_queues()
)
)
await self._put_to_queues(self._get_unique_queues(), _StreamEnd())
return

if response.object_data_ranges:
Expand All @@ -144,19 +169,9 @@ async def _recv_loop(self) -> None:
logger.warning(
f"Received data for unregistered read_id: {read_id}"
)
await asyncio.gather(
*(
self._put_with_timeout(queue, response)
for queue in queues_to_notify
)
)
await self._put_to_queues(queues_to_notify, response)
else:
await asyncio.gather(
*(
self._put_with_timeout(queue, response)
for queue in self._get_unique_queues()
)
)
await self._put_to_queues(self._get_unique_queues(), response)
except asyncio.CancelledError:
raise
except Exception as e:
Expand Down
Loading