Conversation
Walkthrough

This PR implements connection caching for the PostgreSQL readers and writers to reuse a single connection across Lambda invocations instead of opening a new TCP connection per operation.

Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (1 warning, 1 inconclusive)
🧹 Nitpick comments (2)
tests/unit/readers/test_reader_postgres.py (1)
385-405: Unused variable `pagination` flagged by static analysis.

The variable `pagination` on line 402 is unpacked but never used. Consider using an underscore prefix to indicate it's intentionally unused.

🔧 Suggested fix

```diff
- rows, pagination = reader.read_stats(limit=10)
+ rows, _pagination = reader.read_stats(limit=10)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/readers/test_reader_postgres.py` around lines 385 - 405, the test test_retries_on_operational_error unpacks reader.read_stats into rows, pagination but never uses pagination; rename the unused variable to _pagination (or simply use an underscore `_`) to satisfy static analysis and indicate intentional non-use. Update the line that calls reader.read_stats in test_retries_on_operational_error to unpack as rows, _pagination (or rows, _) so the test behavior (asserting connect call count and rows) remains unchanged while eliminating the unused-variable warning.

src/writers/writer_postgres.py (1)
60-60: Consider adding explicit connection cleanup for non-Lambda deployments.

The cached connection has no explicit `close()` mechanism. While this works well for Lambda (connections naturally close when the container terminates), long-running processes or integration tests may benefit from explicit cleanup. The `Writer` base class could be extended with an optional `close()` method.

This is not blocking for the current Lambda use case.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/writers/writer_postgres.py` at line 60, Add an explicit cleanup hook that closes the cached DB connection: extend the Writer base class with an optional close() method and implement it in the Postgres writer to call and null out self._connection.close() (or await if async) when a connection exists; update any connection-creating methods that set self._connection to ensure close() will be safe to call, and add a brief unit test or integration cleanup call to exercise Writer.close() in long-running tests or processes to avoid leaked connections.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6c04b6fc-8b9c-4a86-a7cd-246694e57bde
📒 Files selected for processing (7)
- .github/copilot-instructions.md
- .github/dependabot.yml
- src/readers/reader_postgres.py
- src/writers/writer_postgres.py
- tests/integration/test_connection_reuse.py
- tests/unit/readers/test_reader_postgres.py
- tests/unit/writers/test_writer_postgres.py
```python
self._secret_name = os.environ.get("POSTGRES_SECRET_NAME", "")
self._secret_region = os.environ.get("POSTGRES_SECRET_REGION", "")
self._db_config: dict[str, Any] | None = None
self._connection: Any | None = None
```
If we used mypy's strict mode, I am not sure this would be allowed without a warning. It's nice that you use types, but a type that is an optional `Any` does not say that much.
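One way to tighten the annotation is a small `Protocol` covering just the connection methods the code actually calls. A sketch (the protocol name and method list are assumptions, not the project's real API):

```python
from typing import Any, Optional, Protocol


class PGConnection(Protocol):
    """Just the driver-connection surface this class uses."""

    def cursor(self) -> Any: ...
    def rollback(self) -> None: ...
    def close(self) -> None: ...


class PostgresReader:
    def __init__(self) -> None:
        # `Optional[PGConnection]` survives mypy --strict;
        # `Any | None` collapses to Any and checks nothing.
        self._connection: Optional[PGConnection] = None
```

psycopg2 also ships a concrete `psycopg2.extensions.connection` class that could serve as the annotation directly.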
```python
        raise RuntimeError("Failed to load database configuration.")
    return config

def _get_connection(self) -> Any:
```
This is a pretty common pattern actually - there is `from functools import cached_property` for exactly these things: caching a property on a class. It's more native and intuitive. Pseudo-code:

```python
class DB:
    @cached_property
    def conn(self):
        return create_connection()
```

and it is also lazy by default, so the connection would be established on the first call, not eagerly
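A runnable version of that sketch, with a counter standing in for the real `psycopg2.connect` call, showing both the laziness and the caching:

```python
from functools import cached_property

CONNECT_CALLS = {"count": 0}


def create_connection():
    # Hypothetical stand-in for psycopg2.connect(...).
    CONNECT_CALLS["count"] += 1
    return object()


class DB:
    @cached_property
    def conn(self):
        # Runs on first access only; the result is stored in the
        # instance __dict__ and returned on every later access.
        return create_connection()


db = DB()
assert CONNECT_CALLS["count"] == 0  # lazy: nothing connected yet
first = db.conn
second = db.conn
```

A bonus for the retry path: `del db.conn` drops the cached value so the next access reconnects - the same effect the current code gets by setting `self._connection = None`.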
```python
user=db_config["user"],
password=db_config["password"],
port=db_config["port"],
options="-c statement_timeout=30000 -c default_transaction_read_only=on",
```
Consider extracting this 30s timeout into a constant somewhere.
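A minimal sketch of what that could look like (the constant and helper names are suggestions):

```python
STATEMENT_TIMEOUT_MS = 30_000  # 30 s server-side statement timeout


def connection_options(read_only: bool = True) -> str:
    """Build the libpq `options` string from a named constant
    instead of an inline magic number."""
    options = f"-c statement_timeout={STATEMENT_TIMEOUT_MS}"
    if read_only:
        options += " -c default_transaction_read_only=on"
    return options
```

The writer could then call `connection_options(read_only=False)` while the reader keeps the default.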
```python
    Raises:
        RuntimeError: On database connectivity or query errors.
    """
    db_config = self._load_db_config()
```
I saw this call earlier already, hmm. I might revisit the whole structure actually
But this does not belong here - DB config loading and validation, which is what the next few lines implement. Why not put this into a `get_conn` method or so?
```python
params: list[Any] = [ts_start, ts_end]
if cursor is not None:
    params.append(cursor)
query = psycopg2_sql.SQL(_RUNS_SQL_WITH_CURSOR)
```
I saw these queries. A couple of upgrade ideas, if you want; ordered from low to higher effort & practice:

- put them at least into triple double-quotes (`""" query here """`) - multi-line strings are perfect for this
- put them into a separate SQL file and load it. Combining SQL and Python is a bad practice. In simple projects like this it's okay, but as projects scale this is not maintainable (not to mention typical engineering practices - formatting, testing, discovery of these SQL files)
- put them into a separate Jinja2 file - with this, you can parametrize it from Python
- my most favourite option - aiosql: https://nackjicholson.github.io/aiosql/ - the idea here is also to have a separate file with the SQL that you load and work with
```python
    raw_rows = db_cursor.fetchall()
    connection.rollback()
    break
except OperationalError as exc:
```
this method does way too much. It loads the DB config, validates it, performs retries, manipulates the cursor, unpacks and post-processes the values. Split it please; it's hard to read, hard to test, hard to extend
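One possible split (all names hypothetical): keep `read_stats` as orchestration only, push config loading into `_get_connection`, retries into an `_execute_with_retry` helper, and make the post-processing a pure function. That last piece is then trivially testable without a database:

```python
# Hypothetical decomposition of read_stats:
#
#   read_stats()            - orchestration only
#     _get_connection()     - config loading/validation + cached connect
#     _execute_with_retry() - cursor handling + OperationalError retries
#     rows_to_dicts()       - pure post-processing, implemented below


def rows_to_dicts(raw_rows, col_names):
    """Turn driver row tuples into dicts keyed by column name."""
    return [dict(zip(col_names, row)) for row in raw_rows]
```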
```python
    """
    logger.debug("Sending to Postgres - %s.", table)
    query = psycopg2_sql.SQL("""
        INSERT INTO {}
```
yet another embedded SQL - I am discovering these more and more as I read the code. Seriously consider using AIOSQL :)
Cost / reason: imagine that someone wants to see all the ways we use the database - all the queries etc. With the current approach, where SQL statements are Python strings all over the codebase, that's very hard to do.
```python
with connection.cursor() as cursor:
    if topic_name == "public.cps.za.dlchange":
        self._postgres_edla_write(cursor, table_info["main"], message)
    elif topic_name == "public.cps.za.runs":
```
consider extracting these topic names into a common constant - maybe a frozen data class or so
```python
db_cursor.execute(query, params)
col_names = [desc[0] for desc in db_cursor.description]  # type: ignore[union-attr]
raw_rows = db_cursor.fetchall()
connection.rollback()
```
rollback on read? what am I missing here?
```python
    break
except OperationalError as exc:
    self._connection = None
    if attempt > 0:
```
What am I missing here - is the retry even working? Would it not fail after the first attempt?
or is it that the RuntimeError is non-retriable?
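For reference, a self-contained sketch of the retry shape under discussion: transient `OperationalError`s are retried, and `RuntimeError` is raised only once every attempt has failed, which makes it non-retriable by construction. The exception class here is a stand-in for `psycopg2.OperationalError`:

```python
class OperationalError(Exception):
    """Stand-in for psycopg2.OperationalError so the sketch runs anywhere."""


MAX_ATTEMPTS = 3


def query_with_retry(execute):
    last_exc = None
    for attempt in range(MAX_ATTEMPTS):
        try:
            return execute()
        except OperationalError as exc:
            # Real code would also drop the cached connection here.
            last_exc = exc
    raise RuntimeError("Query failed after retries.") from last_exc
```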
```python
    break
except OperationalError:
    self._connection = None
    if attempt > 0:
```
consider improving the logs, similar to how the Reader does it. In fact, maybe you can extract some of this into a common class handling connections and retries. But up to you - I know that the connection settings for reading and writing are slightly different (but that could be parametrized or so)
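A sketch of what that common class could look like, with the reader/writer difference reduced to a parametrized options string. All names here are suggestions, and `connect` stands in for `psycopg2.connect`:

```python
class PostgresBase:
    """Shared connection caching; subclasses only declare their options."""

    connect_options = ""

    def __init__(self, connect):
        self._connect = connect  # e.g. psycopg2.connect in real code
        self._connection = None

    def _get_connection(self):
        if self._connection is None:
            self._connection = self._connect(options=self.connect_options)
        return self._connection

    def _drop_connection(self):
        # Called from the retry path after an OperationalError.
        self._connection = None


class Reader(PostgresBase):
    connect_options = "-c default_transaction_read_only=on"


class Writer(PostgresBase):
    connect_options = ""
```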
I find the title to be a bit misleading considering the implementation. There is no connection pooling, just caching implemented manually. Either change the title and PR description to say that the connection is reshared / cached, or introduce actual pooling.
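For contrast, this is roughly what real pooling means - several live connections checked out and returned - versus the single cached connection this PR actually implements. A toy stdlib version (psycopg2's `psycopg2.pool.SimpleConnectionPool` exposes the same `getconn`/`putconn` shape):

```python
import queue


class TinyConnectionPool:
    """Illustrative only - not production code."""

    def __init__(self, size, connect):
        self._idle = queue.Queue()
        for _ in range(size):  # eagerly open `size` connections
            self._idle.put(connect())

    def getconn(self):
        return self._idle.get()  # blocks if all connections are checked out

    def putconn(self, conn):
        self._idle.put(conn)
```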
Overview
This pull request introduces connection caching and reuse for both PostgreSQL readers and writers, improving efficiency and reliability by maintaining a single connection per instance. It also adds robust reconnection logic, updates tests to reflect the new behavior, and enhances configuration for dependency updates.
Related
Closes #115