perf(storage): implement fast-path for queue delivery in _StreamMultiplexer#16718
Conversation
Code Review
This pull request optimizes the delivery of items to multiple queues in the _stream_multiplexer.py module. It introduces a new _put_to_queues method that implements a fast path using put_nowait for available queues and a slow path with a timeout for full queues, reducing unnecessary overhead from asyncio.gather and coroutine yields when queues have capacity. The _recv_loop has been refactored to use this optimized delivery mechanism. I have no feedback to provide as there were no review comments to evaluate.
chandra-siri
left a comment
Minor comment, non-blocking.
```python
            return
        if len(slow_queues) == 1:
            await self._put_with_timeout(slow_queues[0], item)
        else:
```
This should work even if `len(slow_queues) == 1`, so there is no need for the if/else statement.
In the happy path, one response goes to only one queue, and since this occurs in the hot path, the change is intentional.
```
Running benchmarks with 100000 iterations...
Direct await time: 0.3242 seconds
Asyncio.gather time: 1.7724 seconds
Difference: 1.4482 seconds (446.74% slower)
```
```python
import asyncio
import time


class Simulator:
    async def _put_with_timeout(self, q, item):
        # Simulate minimal async overhead
        await asyncio.sleep(0)
        return


async def benchmark_direct(sim, q, item, iterations):
    start = time.perf_counter()
    for _ in range(iterations):
        await sim._put_with_timeout(q, item)
    end = time.perf_counter()
    return end - start


async def benchmark_gather(sim, q, item, iterations):
    slow_queues = [q]
    start = time.perf_counter()
    for _ in range(iterations):
        await asyncio.gather(
            *(sim._put_with_timeout(q, item) for q in slow_queues)
        )
    end = time.perf_counter()
    return end - start


async def main():
    sim = Simulator()
    q = object()  # Minimal object
    item = object()
    iterations = 100000
    print(f"Running benchmarks with {iterations} iterations...")
    # Warm up
    await benchmark_direct(sim, q, item, 1000)
    await benchmark_gather(sim, q, item, 1000)
    direct_time = await benchmark_direct(sim, q, item, iterations)
    print(f"Direct await time: {direct_time:.4f} seconds")
    gather_time = await benchmark_gather(sim, q, item, iterations)
    print(f"Asyncio.gather time: {gather_time:.4f} seconds")
    diff = gather_time - direct_time
    percent = (diff / direct_time) * 100
    print(f"Difference: {diff:.4f} seconds ({percent:.2f}% slower)")


if __name__ == "__main__":
    asyncio.run(main())
```
Summary

This PR optimizes the message delivery logic in `_StreamMultiplexer` to reduce latency and event loop overhead.

Performance Improvements:

- Delivery first attempts `queue.put_nowait(item)` for all target queues. For queues with available capacity, this is a synchronous operation that avoids the scheduling overhead of `asyncio.wait_for`.
- When exactly one queue is full, the item is delivered by directly awaiting the `_put_with_timeout` coroutine. This bypasses the overhead of `asyncio.gather`, which is now only used when multiple queues are full simultaneously.
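As a small self-contained illustration of the fast/slow split described above (independent of the PR's actual code): `put_nowait` either succeeds immediately or raises `asyncio.QueueFull` without ever yielding to the event loop, while `asyncio.wait_for(q.put(...), timeout)` is the bounded slow path for a full queue.

```python
import asyncio


async def demo():
    q = asyncio.Queue(maxsize=1)
    q.put_nowait("first")        # capacity available: succeeds synchronously
    try:
        q.put_nowait("second")   # queue is full: raises instead of awaiting
        hit_full = False
    except asyncio.QueueFull:
        hit_full = True
    # Slow path: a consumer frees a slot, and the bounded put completes.
    consumer = asyncio.create_task(q.get())
    await asyncio.wait_for(q.put("second"), timeout=1.0)
    return hit_full, await consumer, q.get_nowait()


print(asyncio.run(demo()))  # (True, 'first', 'second')
```

The fast path never creates a task or suspends the caller, which is exactly the overhead the PR's single-queue branch avoids.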