feat(warehouses): Implement merge loading of updated Opralog entries #177
Conversation
Important: Review skipped. Auto incremental reviews are disabled on this repository; check the settings in the CodeRabbit UI.

📝 Walkthrough

This pull request refactors testing infrastructure by restructuring GitHub Actions workflows, extracting reusable testing utilities into elt_common modules, and implementing lag window merge loading for Opralog incremental updates with comprehensive test coverage.
Pre-merge checks: ❌ 2 failed checks (warnings), ✅ 3 passed.
Actionable comments posted: 13
Fix all issues with AI Agents 🤖
In @.github/workflows/warehouses_e2e_tests.yml:
- Around line 44-47: The cleanup step named "Remove adp-router from /etc/hosts"
should run regardless of test outcome; modify that job step to include the
conditional "if: always()" at the step level so the sudo sed -i -e
'/adp-router/d' /etc/hosts command executes unconditionally after the test step.
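
A minimal sketch of the adjusted step, assuming the step name and command quoted in the prompt above:

```yaml
- name: Remove adp-router from /etc/hosts
  # if: always() runs this cleanup even when the preceding test step fails
  if: always()
  run: sudo sed -i -e '/adp-router/d' /etc/hosts
```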
In @elt-common/pyproject.toml:
- Around line 18-19: The pytest11 entry point points at
elt_common.testing.fixtures but setuptools won't discover packages in a src/
layout; update pyproject.toml by adding a [tool.setuptools] section that maps
the package directory (package-dir = {"" = "src"}) so setuptools can find the
elt_common package and the pytest plugin will load.
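
The suggested addition would look roughly like this (a sketch; the rest of pyproject.toml is unchanged):

```toml
[tool.setuptools]
package-dir = {"" = "src"}
```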
- Around line 24-35: Update the vulnerable/insecure dev dependencies in the
dependency-groups list: replace the "requests~=2.32.3" spec with a safe minimum
(e.g., "requests>=2.32.4" or "requests~=2.32.5") to remediate CVE-2024-47081,
bump "ruff~=0.11.11" to a secure release such as "ruff~=0.14.10", and optionally
update "pytest~=8.3.5" to "pytest~=9.0.2"; ensure these changes are made where
the dependency entries for requests, ruff, and pytest appear so the project uses
the non-vulnerable versions.
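
Applied to the dev dependency group, the bumped entries would read something like this (other entries omitted; versions taken from the suggestion above):

```toml
[dependency-groups]
dev = [
    "pytest~=9.0.2",
    "requests>=2.32.4",  # first release with the CVE-2024-47081 fix
    "ruff~=0.14.10",
]
```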
In @infra/ansible-docker/playbooks/elt/templates/secrets/envvars.j2:
- Line 7: Remove the unsupported connection parameter by deleting the
OPRALOGWEB__SOURCES__SQL_DATABASE__CREDENTIALS__SCHEME entry from the envvars
template; schema is not a valid dlt sql_database credential and must be
configured at the application level (see schema usage in extract_and_load.py),
so ensure only valid credentials remain (drivername, database, username,
password, host, port).
In @infra/local/env-local:
- Around line 52-53: The MAX_IDLE_CONNS value exceeds MAX_OPEN_CONNS which is
invalid; update OPENFGA_DATASTORE_MAX_IDLE_CONNS to be <=
OPENFGA_DATASTORE_MAX_OPEN_CONNS (e.g., change OPENFGA_DATASTORE_MAX_IDLE_CONNS
from 100 to 50) in the env-local file so the connection pool configuration is
consistent; ensure the two environment variables
OPENFGA_DATASTORE_MAX_OPEN_CONNS and OPENFGA_DATASTORE_MAX_IDLE_CONNS are kept
aligned whenever modified.
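
Assuming MAX_OPEN_CONNS currently sits at 50 (as the suggested change implies), the aligned pair in env-local would be:

```
OPENFGA_DATASTORE_MAX_OPEN_CONNS=50
OPENFGA_DATASTORE_MAX_IDLE_CONNS=50
```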
In @warehouses/accelerator/extract_load/opralogweb/.dlt/config.toml:
- Around line 24-25: The MoreEntryColumns DLT table config currently sets
write_disposition = "merge" but is missing a primary_key; update the
MoreEntryColumns configuration to include primary_key = "MoreEntryColumnId"
(matching the incremental field used for Entries) so merges can be applied
correctly for the table named "MoreEntryColumns" when using write_disposition =
"merge".
- Around line 7-9: The TOML shows write_disposition=merge for Entries (which is
fine because entries_table() already sets primary key via
dlt.sources.incremental()), but the MoreEntryColumns resource is missing a
primary key required for merge/upsert; update the Python resource definition
that uses sql_table for the "MoreEntryColumns" table to include
primary_key="MoreEntryColumnId" (i.e., set primary_key on the sql_table call
that defines MoreEntryColumns so the merge strategy/upsert will succeed).
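
Taken together, the two comments amount to a sketch like the following. The TOML section path is an assumption based on dlt's per-table config convention, and apply_hints is used in the Python half because the exact sql_table call in extract_and_load.py isn't shown here:

```toml
[sources.sql_database.MoreEntryColumns]
write_disposition = "merge"
primary_key = "MoreEntryColumnId"
```

and in the resource definition:

```python
from dlt.sources.sql_database import sql_table

# Sketch: give MoreEntryColumns a primary key so merge can upsert.
more_entry_columns = sql_table(table="MoreEntryColumns")
more_entry_columns.apply_hints(
    primary_key="MoreEntryColumnId",
    write_disposition="merge",
)
```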
In @warehouses/accelerator/extract_load/opralogweb/extract_and_load.py:
- Around line 38-47: Fix the module docstring in extract_and_load.py: correct
the typo "Unfortuantely" to "Unfortunately" and complete the trailing sentence
so it reads something like "We use the 'LastChangedDate' column of 'Entries' to
find the list of new or updated EntryId values and load these into the
MoreEntryColumns table to refresh associated records." Ensure the updated text
remains consistent with the docstring’s explanation of append-only vs
merge-tables.
- Around line 113-126: The callback more_entry_columns_query currently calls
EXTRACTED_ENTRY_IDS.to_pylist() unguarded; update it to first check that
EXTRACTED_ENTRY_IDS is not None and that the resulting list is non-empty, and
only then apply table.c.EntryId.in_(ids); if EXTRACTED_ENTRY_IDS is None or ids
is empty, return the incoming query unchanged or replace it with a safe no-rows
filter (e.g., filter a false condition) to avoid AttributeError and invalid IN
() SQL—modify more_entry_columns_query inside more_entry_columns_table
accordingly.
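
A sketch of the guarded callback, using the names from the prompt; sqlalchemy.false() provides the safe no-rows filter:

```python
import sqlalchemy as sa

def more_entry_columns_query(query: sa.Select, table: sa.Table) -> sa.Select:
    # No extraction has run yet: leave the query unchanged rather than crash.
    if EXTRACTED_ENTRY_IDS is None:
        return query
    ids = EXTRACTED_ENTRY_IDS.to_pylist()
    # An empty IN () clause is invalid SQL, so select no rows instead.
    if not ids:
        return query.where(sa.false())
    return query.where(table.c.EntryId.in_(ids))
```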
- Around line 65-69: The extraction order can be interleaved so ensure
deterministic sequencing: either set next_item_mode="fifo" in dlt/config.toml to
force FIFO extraction, or eliminate the global EXTRACTED_ENTRY_IDS dependency by
refactoring the flow—move the logic in store_extracted_entry_ids() into a
source-scoped state accessible by more_entry_columns_query() (or return the list
from entries_table() and pass it into more_entry_columns_table()), and ensure
more_entry_columns_table()/more_entry_columns_query() read that state rather
than a module-level EXTRACTED_ENTRY_IDS so the callback never sees None or
incomplete data.
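
If the config-only route is taken, the setting would live in dlt's extract section (a sketch, assuming the repository's .dlt/config.toml):

```toml
[extract]
next_item_mode = "fifo"
```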
- Around line 32-33: Replace the global EXTRACTED_ENTRY_IDS with an explicit dlt
resource dependency: stop assigning EXTRACTED_ENTRY_IDS (which is typed as
pa.Table but holds a pa.ChunkedArray) and instead make entries_table return the
EntryId collection as a resource value and have more_entry_columns_table declare
a depends_on (or bind the resource) to receive that value; update signatures for
entries_table and more_entry_columns_table to accept/return the EntryId list
(use consistent type e.g., List[str] or pa.ChunkedArray) and remove the global
variable EXTRACTED_ENTRY_IDS and any references to it.
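
A hypothetical sketch of the explicit-dependency refactor: entries_table and more_entry_columns_table are the functions named above, while load_opralog and the list-of-ids signature are illustrative assumptions, not code from the PR:

```python
import dlt

def load_opralog(pipeline: dlt.Pipeline) -> None:
    entry_ids: list[str] = []

    def collect_ids(table):
        # Gather EntryIds as Entries pages stream through the resource.
        entry_ids.extend(table["EntryId"].to_pylist())
        return table

    # Run Entries first so the id list is complete...
    pipeline.run(entries_table().add_map(collect_ids))
    # ...then filter MoreEntryColumns on exactly those ids.
    pipeline.run(more_entry_columns_table(entry_ids))
```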
In
@warehouses/accelerator/extract_load/opralogweb/tests/test_extract_and_load.py:
- Around line 18-26: The fixture fake_source_db uses tempfile.TemporaryDirectory
which already removes the temp dir on context exit, so the explicit
shutil.rmtree(tmp_dir) in the finally block is redundant and can raise
FileNotFoundError; remove the finally/shutil.rmtree call and simply yield db_url
inside the TemporaryDirectory context (keeping create_fake_source_db and db_url
logic intact) so cleanup is handled by TemporaryDirectory.
Caution: Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
elt-common/src/elt_common/cli.py (1)

75-85: Stale docstring references removed parameter. The docstring at line 81 still documents `:param default_write_disposition:`, but this parameter has been removed from the function signature.

🔎 Proposed fix

```diff
 """Run a standard extract and load pipeline

 :param pipeline_name: Name of dlt pipeline
 :param data_generator: Callable returning a dlt.DltSource or dlt.DltResource
 :param dataset_name_suffix: Suffix part of full dataset name in the destination.
   The given string is prefixed with a standard string defined in constants.DATASET_NAME_PREFIX_SRCS
-:param default_write_disposition: Default mode for dlt write_disposition defaults to "append"
 :param default_destination: Default destination, defaults to "filesystem"
 :param default_loader_file_format: Default dlt loader file format, defaults to "parquet"
 :param default_progress: Default progress reporter, defaults to NULL_COLLECTOR
 """
```

elt-common/tests/e2e_tests/elt_common/dlt_destinations/pyiceberg/conftest.py (1)

5-11: Missing `warehouse` argument in `PyIcebergDestinationTestConfiguration` instantiation. Based on the `PyIcebergDestinationTestConfiguration` dataclass definition in `elt_common/testing/dlt.py`, it requires a `warehouse: Warehouse` attribute. The fixture should receive the `warehouse` fixture and pass it to the constructor.

🔎 Proposed fix

```diff
 @pytest.fixture
-def destination_config():
-    destination_config = PyIcebergDestinationTestConfiguration()
+def destination_config(warehouse):
+    destination_config = PyIcebergDestinationTestConfiguration(warehouse=warehouse)
     try:
         yield destination_config
     finally:
         destination_config.clean_catalog()
```
🧹 Nitpick comments (20)
elt-common/src/elt_common/cli.py (1)

114-116: Good addition for observability. The extracted row counts logging improves debugging capabilities. Consider adding a defensive check in case `last_normalize_info` is `None` for edge cases (e.g., empty data sources), though this may be acceptable for debug-level logging.

🔎 Optional defensive check

```diff
 LOGGER.debug(pipeline.last_trace.last_extract_info)
-LOGGER.debug(f"Extracted row counts: {pipeline.last_trace.last_normalize_info.row_counts}")
+if pipeline.last_trace.last_normalize_info:
+    LOGGER.debug(f"Extracted row counts: {pipeline.last_trace.last_normalize_info.row_counts}")
 LOGGER.debug(pipeline.last_trace.last_load_info)
```

elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py (2)
30-30: Inconsistent variable reference. Line 30 uses `server.settings.openid_client_secret` whilst the rest of the fixture consistently uses `server_settings` (assigned on line 10). For consistency and readability, this should match the pattern used elsewhere.

🔎 Proposed fix for consistency

```diff
-    "iceberg.rest-catalog.oauth2.credential" = '{server_settings.openid_client_id}:{server.settings.openid_client_secret}',
+    "iceberg.rest-catalog.oauth2.credential" = '{server_settings.openid_client_id}:{server_settings.openid_client_secret}',
```

46-46: Redundant assignment can be removed. The `creds.catalog` is already set to `warehouse.name` on line 16 when the credentials are created. This reassignment appears unnecessary unless there's a specific reason to refresh it.

🔎 Proposed simplification

```diff
     # Create another connector connected to the new catalog
-    creds.catalog = warehouse.name
     trino_engine = TrinoQueryEngine(creds)
```

elt-common/src/elt_common/dlt_destinations/pyiceberg/__init__.py (1)
9-14: Minor: Redundant `cast` after `assert isinstance`. The `cast` on line 14 is unnecessary since the `assert isinstance` on line 13 already narrows the type. Consider simplifying:

🔎 Proposed simplification

```diff
 @contextmanager
 def iceberg_catalog(pipeline: dlt.Pipeline):
     """Given a pipeline return the Iceberg catalog client for this pipeline"""
     with pipeline.destination_client() as client:
         assert isinstance(client, PyIcebergClient)
-        yield cast(PyIcebergClient, client).iceberg_catalog
+        yield client.iceberg_catalog
```

elt-common/src/elt_common/testing/dlt.py (1)

94-101: Consider adding error handling for robustness. If `purge_table` fails for one table, the remaining tables and namespaces won't be cleaned up. For test cleanup, consider wrapping operations in try/except to ensure best-effort cleanup of all resources.

🔎 Proposed improvement

```diff
 def clean_catalog(self):
     """Clean the destination catalog of all namespaces and tables"""
     catalog = self.warehouse.connect()
     for ns_name in catalog.list_namespaces():
         tables = catalog.list_tables(ns_name)
         for qualified_table_name in tables:
-            catalog.purge_table(qualified_table_name)
-        catalog.drop_namespace(ns_name)
+            try:
+                catalog.purge_table(qualified_table_name)
+            except Exception:
+                pass  # Best-effort cleanup
+        try:
+            catalog.drop_namespace(ns_name)
+        except Exception:
+            pass
```

elt-common/src/elt_common/testing/__init__.py (1)
20-21: Consider validating path format in concatenation. The `__add__` method concatenates paths directly without ensuring the path starts with `/`. This could create malformed URLs if callers forget the leading slash.

🔎 Proposed improvement

```diff
 def __add__(self, path: str) -> "Endpoint":
+    if path and not path.startswith("/"):
+        path = "/" + path
     return Endpoint(self.raw_value + path, self.internal_netloc)
```

elt-common/src/elt_common/testing/lakekeeper.py (3)

58-63: Overly broad exception handling. Catching `Exception` masks unexpected errors. Consider catching only `requests.HTTPError` or `requests.RequestException` to let genuine bugs surface.

🔎 Proposed fix

```diff
 try:
     response.raise_for_status()
-except Exception:
+except requests.HTTPError:
     raise ValueError(
         f"Failed to create warehouse ({response.status_code}): {response.text}."
     )
```
66-66: Prefer logging over print statements. Using `print` bypasses the configured logging setup.

🔎 Proposed improvement

Add logging at the top of the file:

```diff
+import logging
+
+logger = logging.getLogger(__name__)
```

Then replace the print statement:

```diff
-print(f"Created warehouse {name} with ID {warehouse_id}")
+logger.info(f"Created warehouse {name} with ID {warehouse_id}")
```
88-93: Consider making timeout configurable. The 10-second timeout is hardcoded. Whilst reasonable for most cases, making it configurable would improve flexibility for slower environments.
elt-common/src/elt_common/testing/fixtures.py (2)
68-68: Prefer logging over print statements. Using `print` bypasses the configured logging setup.

89-93: Exception handling may miss other error types. Only catching `RuntimeError` means other exceptions (e.g., `requests.RequestException`, `ValueError`) from the cleanup operations won't be caught, causing test failures instead of warnings.

🔎 Proposed fix

```diff
-except RuntimeError as exc:
+except Exception as exc:
     warnings.warn(
         f"Error deleting test warehouse '{str(warehouse.project_id)}'. It may need to be removed manually."
     )
     warnings.warn(f"Error:\n{str(exc)}")
```

warehouses/accelerator/extract_load/opralogweb/extract_and_load.py (2)
89-102: Consider performance for large datasets. The `.to_pandas().apply()` pattern converts the entire column to pandas and applies the transformation row-by-row, which may be inefficient for large datasets. Consider using vectorised operations if performance becomes an issue.

105-110: Add validation and error handling. The function modifies global state without validation:

- No check if `table` contains the `EntryId` column
- No validation that `EntryId` extraction succeeded
- Silent overwrite of any existing `EXTRACTED_ENTRY_IDS` value

Consider adding validation to prevent runtime errors if the input table structure is unexpected.

🔎 Proposed enhancement

```diff
 def store_extracted_entry_ids(table: pa.Table) -> pa.Table:
+    """Store extracted EntryIds for filtering MoreEntryColumns"""
     global EXTRACTED_ENTRY_IDS
-
-    EXTRACTED_ENTRY_IDS = table["EntryId"]
-
+
+    if "EntryId" not in table.column_names:
+        raise ValueError("Table missing required 'EntryId' column")
+
+    entry_ids = table["EntryId"]
+    if len(entry_ids) == 0:
+        # Log warning but don't fail - valid case for no new entries
+        pass
+
+    EXTRACTED_ENTRY_IDS = entry_ids
     return table
```

warehouses/accelerator/extract_load/opralogweb/tests/conftest.py (1)
1-4: Consider using an editable install instead of `sys.path` manipulation. Modifying `sys.path` globally has several drawbacks:

- Inserting at position 0 takes precedence over installed packages, potentially causing import shadowing
- Makes tests environment-dependent and harder to reproduce
- Modern Python testing recommends proper package installation

Consider using an editable install instead:

```sh
uv pip install -e warehouses/accelerator/extract_load/opralogweb
```

Or set `PYTHONPATH` in the test runner configuration.

.github/actions/run-pytest-with-uv/action.yml (1)
8-10: Consider documenting or validating `install-args-extra` to prevent unintended command injection. The `install-args-extra` input is directly interpolated into the shell command. While GitHub Actions sanitises inputs, consider documenting allowed values or adding validation if this action is used in workflows triggered by untrusted PR authors.

Also applies to: 47-47

warehouses/accelerator/extract_load/opralogweb/tests/test_extract_and_load.py (1)
warehouses/accelerator/extract_load/opralogweb/tests/test_extract_and_load.py (1)
34-39: Capture stdout separately for better debugging. When the script fails, `proc.stdout` will be `None` because stdout isn't captured (only stderr is redirected to stdout). Consider capturing both streams properly.

🔎 Proposed fix

```diff
 proc = subp.run(
-    [sys.executable, EL_SCRIPT], env=subp_env, text=True, stderr=subp.STDOUT
+    [sys.executable, EL_SCRIPT], env=subp_env, text=True, capture_output=True
 )
 if proc.returncode != 0:
-    pytest.fail(reason=proc.stdout)
+    pytest.fail(reason=f"stdout:\n{proc.stdout}\nstderr:\n{proc.stderr}")
```

warehouses/accelerator/extract_load/opralogweb/tests/fake_source/__init__.py (2)
3-3: Remove unused import. The `click` module is imported but not used in this file.

🔎 Proposed fix

```diff
-import click
```

36-59: Consider batching inserts for performance. Each call to `insert_downtime_record` commits separately due to the `@commit_or_rollback` decorator. For 100 iterations, this means 100 commits. For test code this is acceptable, but could be slow for larger datasets.

warehouses/accelerator/extract_load/opralogweb/tests/fake_source/opralogmodel.py (2)
44-56: Decorator commits after rollback, which is redundant. The `finally` block executes `commit()` even after a `rollback()` in the `except` block. While this doesn't cause data corruption (committing after rollback just commits an empty transaction), it's semantically incorrect. Use an `else` clause for the commit instead.

🔎 Proposed fix

```diff
 def commit_or_rollback(func):
     """Decorator to either commit a successful ORM session or rollback on exception"""

     def impl(session: Session, *args, **kwargs):
         try:
-            return func(session, *args, **kwargs)
+            result = func(session, *args, **kwargs)
         except Exception:
             session.rollback()
             raise
-        finally:
+        else:
             session.commit()
+            return result

     return impl
```

37-37: Remove unused constant or add usage comment. The `LOGBOOKS` dictionary is defined but never used. If it's intended for future use, consider adding a comment; otherwise, remove it.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge base: Disabled due to data retention organization setting
📒 Files selected for processing (29)
- .github/actions/run-pytest-with-uv/action.yml
- .github/workflows/elt-common_e2e_tests.yml
- .github/workflows/elt-common_unit_tests.yml
- .github/workflows/warehouses_e2e_tests.yml
- elt-common/pyproject.toml
- elt-common/src/elt_common/cli.py
- elt-common/src/elt_common/dlt_destinations/pyiceberg/__init__.py
- elt-common/src/elt_common/testing/__init__.py
- elt-common/src/elt_common/testing/dlt.py
- elt-common/src/elt_common/testing/fixtures.py
- elt-common/src/elt_common/testing/lakekeeper.py
- elt-common/tests/e2e_tests/.dlt/config.toml
- elt-common/tests/e2e_tests/conftest.py
- elt-common/tests/e2e_tests/elt_common/dlt_destinations/pyiceberg/conftest.py
- elt-common/tests/e2e_tests/elt_common/dlt_destinations/pyiceberg/test_pyiceberg_pipeline.py
- elt-common/tests/e2e_tests/elt_common/dlt_destinations/pyiceberg/utils.py
- elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py
- elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py
- infra/ansible-docker/playbooks/elt/templates/secrets/envvars.j2
- infra/local/env-local
- warehouses/accelerator/extract_load/opralogweb/.dlt/config.toml
- warehouses/accelerator/extract_load/opralogweb/.gitignore
- warehouses/accelerator/extract_load/opralogweb/extract_and_load.py
- warehouses/accelerator/extract_load/opralogweb/tests/__init__.py
- warehouses/accelerator/extract_load/opralogweb/tests/conftest.py
- warehouses/accelerator/extract_load/opralogweb/tests/fake_source/README.md
- warehouses/accelerator/extract_load/opralogweb/tests/fake_source/__init__.py
- warehouses/accelerator/extract_load/opralogweb/tests/fake_source/opralogmodel.py
- warehouses/accelerator/extract_load/opralogweb/tests/test_extract_and_load.py
💤 Files with no reviewable changes (1)
- elt-common/tests/e2e_tests/.dlt/config.toml
🧰 Additional context used
🧬 Code graph analysis (11)
elt-common/tests/e2e_tests/elt_common/dlt_destinations/pyiceberg/test_pyiceberg_pipeline.py (4)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/__init__.py (1): iceberg_catalog (10-14)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/pyiceberg.py (1): iceberg_catalog (61-62)
- elt-common/src/elt_common/testing/dlt.py (1): PyIcebergDestinationTestConfiguration (33-101)
- elt-common/src/elt_common/testing/lakekeeper.py (1): Warehouse (97-167)

elt-common/src/elt_common/dlt_destinations/pyiceberg/__init__.py (2)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/factory.py (1): pyiceberg (15-58)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/pyiceberg.py (1): PyIcebergClient (72-463)

elt-common/src/elt_common/testing/lakekeeper.py (5)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/factory.py (1): pyiceberg (15-58)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/helpers.py (1): create_catalog (50-58)
- elt-common/src/elt_common/testing/__init__.py (5): Endpoint (16-32), Settings (35-96), openid_provider_uri (70-73), lakekeeper_url (66-67), storage_config (75-96)
- elt-common/src/elt_common/testing/fixtures.py (4): settings (20-21), token_endpoint (25-28), warehouse (52-93), server (47-48)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/configuration.py (1): as_dict (26-46)

elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py (1)
- elt-common/src/elt_common/testing/lakekeeper.py (1): Warehouse (97-167)

elt-common/tests/e2e_tests/conftest.py (1)
- elt-common/src/elt_common/testing/dlt.py (1): configure_dlt_for_testing (19-29)

warehouses/accelerator/extract_load/opralogweb/tests/fake_source/__init__.py (1)
- warehouses/accelerator/extract_load/opralogweb/tests/fake_source/opralogmodel.py (5): get_latest_entry_id (119-121), metadata (40-41), insert_empty_downtime_logbook (125-134), insert_downtime_record (138-208), update_downtime_record (212-245)

elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py (1)
- elt-common/src/elt_common/testing/lakekeeper.py (1): Warehouse (97-167)

elt-common/src/elt_common/testing/fixtures.py (3)
- elt-common/src/elt_common/testing/dlt.py (2): PyIcebergDestinationTestConfiguration (33-101), clean_catalog (94-101)
- elt-common/src/elt_common/testing/__init__.py (3): Settings (35-96), openid_provider_uri (70-73), storage_config (75-96)
- elt-common/src/elt_common/testing/lakekeeper.py (6): Server (18-93), Warehouse (97-167), token_endpoint (26-27), create_warehouse (45-73), purge_warehouse (76-78), delete_warehouse (81-86)

elt-common/tests/e2e_tests/elt_common/dlt_destinations/pyiceberg/utils.py (3)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/factory.py (1): pyiceberg (15-58)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/__init__.py (1): iceberg_catalog (10-14)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/pyiceberg.py (1): iceberg_catalog (61-62)

elt-common/src/elt_common/testing/dlt.py (3)
- elt-common/src/elt_common/testing/lakekeeper.py (4): Warehouse (97-167), catalog_endpoint (29-34), token_endpoint (26-27), connect (103-113)
- elt-common/src/elt_common/testing/fixtures.py (4): warehouse (52-93), server (47-48), settings (20-21), token_endpoint (25-28)
- elt-common/tests/e2e_tests/elt_common/dlt_destinations/pyiceberg/test_pyiceberg_pipeline.py (1): pipeline_name (30-32)

warehouses/accelerator/extract_load/opralogweb/tests/test_extract_and_load.py (4)
- elt-common/src/elt_common/testing/lakekeeper.py (2): Warehouse (97-167), connect (103-113)
- elt-common/src/elt_common/testing/fixtures.py (3): destination_config (97-102), pipelines_dir (106-108), warehouse (52-93)
- elt-common/tests/e2e_tests/elt_common/dlt_destinations/pyiceberg/conftest.py (1): destination_config (6-11)
- elt-common/src/elt_common/dlt_destinations/pyiceberg/pyiceberg.py (1): run (64-69)
🪛 Ruff (0.14.10)
warehouses/accelerator/extract_load/opralogweb/tests/fake_source/__init__.py
- 70-72: Avoid specifying long messages outside the exception class (TRY003)

warehouses/accelerator/extract_load/opralogweb/tests/test_extract_and_load.py
- 34-34: `subprocess` call: check for execution of untrusted input (S603)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: warehouses end-to-end tests
- GitHub Check: elt-common end-to-end tests
🔇 Additional comments (36)
warehouses/accelerator/extract_load/opralogweb/.gitignore (1)
1-2: LGTM! The patterns appropriately exclude temporary SQLite and DuckDB database files created by test scaffolding, aligning with the fake Opralog database infrastructure introduced in this PR.

elt-common/src/elt_common/cli.py (1)

46-51: Defaulting `write-disposition` to `None` is correct for merge loading. This change allows the write disposition to be controlled by resource decorators rather than CLI defaults, which is necessary for the merge-style incremental load feature.

elt-common/tests/e2e_tests/elt_common/iceberg/conftest.py (2)

4-4: LGTM! Good refactoring to centralize test utilities. The import path update moves the `Warehouse` fixture to a centralized testing module, improving code organization and reusability across the test suite.

elt-common/pyproject.toml (1)

38-39: LGTM! The ruff configuration with a line length of 100 is a reasonable and widely-adopted standard.

elt-common/tests/e2e_tests/elt_common/iceberg/test_maintenance.py (1)

8-8: LGTM! The import path update correctly aligns with the refactored testing utilities location.

elt-common/tests/e2e_tests/conftest.py (1)

1-3: LGTM! Clean simplification that centralises the DLT testing configuration. Module-level invocation ensures the setup occurs before any tests run.

elt-common/tests/e2e_tests/elt_common/dlt_destinations/pyiceberg/test_pyiceberg_pipeline.py (1)

8-16: LGTM! Import paths correctly updated to use the centralised testing utilities from `elt_common.testing` and the new `iceberg_catalog` context manager from the pyiceberg module.

elt-common/tests/e2e_tests/elt_common/dlt_destinations/pyiceberg/utils.py (1)

17-18: LGTM! Clean refactoring that removes local duplication by importing the centralised `iceberg_catalog` context manager.

elt-common/src/elt_common/testing/dlt.py (1)

39-74: LGTM on environment setup logic. The distinction between direct assignment for essential credentials (URI, PROJECT_ID) and `setdefault` for others is a reasonable pattern that allows tests to override optional settings while ensuring core connectivity is always configured correctly.

elt-common/src/elt_common/testing/__init__.py (2)

8-12: LGTM! The retry configuration with exponential backoff and 5 attempts is appropriate for testing infrastructure that may have transient failures.

35-96: LGTM! The Settings class provides sensible defaults for local testing whilst allowing override via environment variables with the `tests_` prefix. Hard-coded test credentials are appropriate for development defaults.

elt-common/src/elt_common/testing/lakekeeper.py (5)
1-16: LGTM! Import organisation is clear and appropriate.

18-44: LGTM! The endpoint builders properly delegate to the Settings properties and construct versioned API paths correctly.

103-113: LGTM! The `connect()` method properly constructs PyIceberg catalog credentials from the server settings.

115-157: LGTM! The `create_test_tables()` context manager provides useful test scaffolding. The nested `append_snapshot` function and automatic cleanup in the finally block are appropriate for test utilities.

159-167: LGTM! The `purge()` method correctly handles cleanup order: views first, then tables, then namespaces.

elt-common/src/elt_common/testing/fixtures.py (4)

1-17: LGTM! Module docstring and imports are clear and appropriate.

19-48: LGTM! The authentication fixtures correctly implement the OpenID Connect client credentials flow. Session scope is appropriate for reusing authentication across tests.

96-102: LGTM! Function scope is appropriate for `destination_config` to ensure test isolation whilst reusing the session-scoped warehouse. The cleanup in finally ensures the catalog is purged after each test.

105-108: LGTM! The `pipelines_dir` fixture correctly uses `TemporaryDirectory` for automatic cleanup.

warehouses/accelerator/extract_load/opralogweb/extract_and_load.py (4)

15-16: LGTM! The added imports are appropriate for the new functionality: `Generator` for type hints, `datetime` for the epoch constant, `sql_table` for creating resources, and `Select` for the query adapter callback type hint.

Also applies to: 20-20, 23-23

27-30: LGTM! The constants are well-defined. `OPRALOG_EPOCH` aligns with the documented usage start date, and `SQL_TABLE_KWARGS` centralises common configuration for all table resources.

49-63: LGTM! The append-only tables are correctly configured with incremental cursors and `write_disposition="append"`.

72-86: LGTM! The `entries_table()` function correctly configures merge loading with `LastChangedDate` as the incremental cursor and applies appropriate transformations.

warehouses/accelerator/extract_load/opralogweb/tests/fake_source/README.md (1)

1-16: LGTM! The documentation clearly explains how to generate fake data and simulate updates for testing purposes. The usage examples are well-structured and easy to follow.
.github/workflows/elt-common_e2e_tests.yml (1)
46-46: LGTM! The change from `pyproject-directory` to `working-directory` is consistent with the unit tests workflow refactoring.

.github/workflows/elt-common_unit_tests.yml (1)

33-33: Parameter change is correctly supported in the action. The `.github/actions/run-pytest-with-uv/action.yml` action has been updated to accept the `working-directory` input parameter (defined at line 22 and used at lines 48 and 53), so this workflow configuration change is properly aligned.

.github/actions/run-pytest-with-uv/action.yml (1)

22-24: LGTM! The refactoring from `pyproject-directory` to `working-directory` with the addition of `install-args-extra` provides more flexibility. The structure is clean and the working directory is correctly applied to both the install and test steps.

Also applies to: 45-53

warehouses/accelerator/extract_load/opralogweb/tests/test_extract_and_load.py (2)

42-64: LGTM! The test comprehensively validates that the first run creates the expected namespace and populates all tables with correct row counts.

67-91: LGTM! The merge update test properly validates the incremental loading behaviour by running the script twice with an update in between, and verifying the expected row count changes.

warehouses/accelerator/extract_load/opralogweb/tests/fake_source/__init__.py (2)

62-100: LGTM! The update function correctly fetches the latest entry, validates existence, updates one record, and appends new records. The logic aligns with the test expectations (110 entries after update = 100 initial + 10 new).

103-110: LGTM! Clean wrapper functions that create engines and delegate to the appropriate helpers.

warehouses/accelerator/extract_load/opralogweb/tests/fake_source/opralogmodel.py (3)

59-117: LGTM! The ORM models are well-structured and correctly define the schema needed to simulate the Opralog database for testing purposes.

137-208: LGTM! The `insert_downtime_record` function correctly handles entry ID sequencing, inserts related records across multiple tables, and manages chapter creation when needed.

211-245: LGTM! The `update_downtime_record` function correctly updates the entry and related columns, and records the change in the changelog table.

.github/workflows/warehouses_e2e_tests.yml (1)

27-28: No action required. `actions/checkout@v6` is a valid and stable release with improved credential security compared to earlier versions. Ensure your GitHub Actions Runner is version 2.329.0 or later for full compatibility.

Likely an incorrect or invalid review comment.
It makes it easier to skip the value if it is not needed.
Ensures any edited entries are reloaded and merged with the current warehouse contents.
… refactor (#179)

Summary: After refactor in #177 …

Summary by CodeRabbit
- Chores: Restructured SQL credentials environment variable configuration for improved clarity and simplicity. The credential keys have been reorganised to use a flatter, more intuitive hierarchy. Users maintaining custom environment variable configurations should review the changes and update their setup accordingly to ensure continued compatibility.
Summary: Fix problems arising from removal of cli parameter in #177. Set write_dispositions explicitly on each resource.

Summary by CodeRabbit
- Documentation: Removed parameter documentation from CLI docstring.
- Refactor: Migrated write disposition configuration from global defaults to resource-level settings across data source connectors, enabling granular control over how data is written to targets (replace vs. merge operations).
Summary: Datetime handling has been overhauled and as a result databases such as mssql now properly reflect tz-naive columns. We add in a mapping for the tz-naive column to the Iceberg type. Fixes an error now raised for incremental loads in #177 for mssql dbs. See dlt-hub/dlt#3061.

Summary by CodeRabbit
- New Features: Added resource item limiting capability for enhanced testing and debugging flexibility.
- Bug Fixes: Improved timestamp handling to correctly support timezone-aware datetime values.
- Chores: Updated dlt dependency to version 1.20.0 for improved compatibility and performance.
Summary
Entries in Opralog can be updated after creation. These changes ensure both new and updated records are loaded into the warehouse.
A set of tests against a fake database have also been added.
Fixes #138
Summary by CodeRabbit — Release Notes: New Features, Tests, Chores.