
perf(storage): implement fast-path for queue delivery in _StreamMultiplexer#16718

Merged
zhixiangli merged 1 commit into googleapis:main from zhixiangli:zhixiangli/fast-path-queue-put on Apr 21, 2026
Conversation

@zhixiangli
Contributor

@zhixiangli zhixiangli commented Apr 20, 2026

Summary

This PR optimizes the message delivery logic in _StreamMultiplexer to reduce latency and event loop overhead.

Performance Improvements:

  1. Fast-path Delivery: Implemented a "fast-path" that attempts queue.put_nowait(item) for all target queues. For queues with available capacity, this is a synchronous operation that avoids:
    • Creating and scheduling a coroutine.
    • Yielding to the event loop.
    • Overhead associated with asyncio.wait_for.
  2. Single-Queue Slow-path Optimization: In cases where exactly one queue is full, the multiplexer now directly awaits the _put_with_timeout coroutine. This bypasses the overhead of asyncio.gather, which is now only used when multiple queues are full simultaneously.
  3. Reduced Event Loop Pressure: By minimizing the number of tasks created and yields performed during high-throughput streaming, these changes help the multiplexer keep up with fast-arriving gRPC responses.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request optimizes the delivery of items to multiple queues in the _stream_multiplexer.py module. It introduces a new _put_to_queues method that implements a fast path using put_nowait for available queues and a slow path with a timeout for full queues, reducing unnecessary overhead from asyncio.gather and coroutine yields when queues have capacity. The _recv_loop has been refactored to use this optimized delivery mechanism. I have no feedback to provide as there were no review comments to evaluate.

@zhixiangli zhixiangli marked this pull request as ready for review April 20, 2026 09:42
@zhixiangli zhixiangli requested a review from a team as a code owner April 20, 2026 09:42
Contributor

@chandra-siri chandra-siri left a comment


Minor comment, non-blocking.

        return
    if len(slow_queues) == 1:
        await self._put_with_timeout(slow_queues[0], item)
    else:
Contributor


This should work even if len(slow_queues) == 1, so there is no need for the if/else statement.

Contributor Author

@zhixiangli zhixiangli Apr 21, 2026


In the happy path, one response goes to only one queue, and since this occurs in the hot path, the change is intentional.

Running benchmarks with 100000 iterations...
Direct await time:   0.3242 seconds
Asyncio.gather time: 1.7724 seconds
Difference: 1.4482 seconds (446.74% slower)
import asyncio
import time

class Simulator:
    async def _put_with_timeout(self, q, item):
        # Simulate minimal async overhead
        await asyncio.sleep(0)
        return

async def benchmark_direct(sim, q, item, iterations):
    start = time.perf_counter()
    for _ in range(iterations):
        await sim._put_with_timeout(q, item)
    end = time.perf_counter()
    return end - start

async def benchmark_gather(sim, q, item, iterations):
    slow_queues = [q]
    start = time.perf_counter()
    for _ in range(iterations):
        await asyncio.gather(
            *(sim._put_with_timeout(q, item) for q in slow_queues)
        )
    end = time.perf_counter()
    return end - start

async def main():
    sim = Simulator()
    q = object()  # Minimal object
    item = object()
    iterations = 100000

    print(f"Running benchmarks with {iterations} iterations...")

    # Warm up
    await benchmark_direct(sim, q, item, 1000)
    await benchmark_gather(sim, q, item, 1000)

    direct_time = await benchmark_direct(sim, q, item, iterations)
    print(f"Direct await time:   {direct_time:.4f} seconds")

    gather_time = await benchmark_gather(sim, q, item, iterations)
    print(f"Asyncio.gather time: {gather_time:.4f} seconds")

    diff = gather_time - direct_time
    percent = (diff / direct_time) * 100
    print(f"Difference: {diff:.4f} seconds ({percent:.2f}% slower)")

if __name__ == "__main__":
    asyncio.run(main())

@zhixiangli zhixiangli merged commit 7073be1 into googleapis:main Apr 21, 2026
30 checks passed