Ingestion module for foundational DP #276

Open
Aashutosh-cognite wants to merge 1 commit into foundational-dp-cleanup from foundational-dp-ingestion

Aashutosh-cognite commented May 20, 2026

Adds the Layer 2 orchestration module for dp:foundation. This module owns the ingestion workflow, all transformation definitions, and the auth groups needed to run them.

The module implements a two-phase workflow:

  • Phase 1 (Population) — transformation tasks for PI, OPC-UA, and SAP run in parallel, landing data into the ISA Manufacturing Extension DM views (ISATimeSeries, ISAAsset, Equipment, WorkOrder, Operation).
  • Phase 2 (Contextualization) — relationship transforms run after population completes, setting Equipment.asset and Operation.workOrder properties.

Which phases and tasks are included is driven entirely by config flags (enabledSources, enabledContextualization, dataModelVariant) — no YAML editing required when toggling a source on or off.
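As a rough illustration of flag-driven task selection, the sketch below builds a task list from a config mapping. It assumes `enabledSources` is a list of source keys and `enabledContextualization` is a boolean; the task ids, the `POPULATION_TASKS` and `CONTEXTUALIZATION_TASKS` tables, and the `select_tasks` helper are hypothetical and are not taken from the module.

```python
# Hypothetical mapping from source key to population task ids (illustrative names only).
POPULATION_TASKS = {
    "pi": ["populate_pi_timeseries"],
    "opcua": ["populate_opcua_timeseries"],
    "sap": [
        "populate_sap_assets",
        "populate_sap_equipment",
        "populate_sap_workorders",
        "populate_sap_operations",
    ],
}

# Hypothetical contextualization task ids, keyed by data model variant.
CONTEXTUALIZATION_TASKS = {
    "isa_manufacturing_extension": ["ctx_equipment_to_asset", "ctx_operation_to_order"],
}


def select_tasks(config: dict) -> list[dict]:
    """Return tasks for the enabled sources, with phase 2 depending on phase 1."""
    population = [
        task_id
        for source in config.get("enabledSources", [])
        for task_id in POPULATION_TASKS.get(source, [])
    ]
    # Phase 1 tasks have no dependencies, so they can run in parallel.
    tasks = [{"externalId": task_id, "dependsOn": []} for task_id in population]

    if config.get("enabledContextualization", False):
        variant = config.get("dataModelVariant", "")
        for task_id in CONTEXTUALIZATION_TASKS.get(variant, []):
            # Phase 2 tasks wait for every phase 1 task to finish.
            tasks.append({"externalId": task_id, "dependsOn": list(population)})
    return tasks
```

With this shape, turning a source off only changes the config mapping; the generated task list shrinks accordingly.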

Key files:

  • scripts/build_workflow.py — generates wf_ingestion_v1.WorkflowVersion.yaml from per-task snippets based on the active config. Run with --check in CI to detect drift.
  • transformations/ — 8 scaffold SQL + YAML pairs (population: PI, OPC-UA, SAP assets/equipment/orders/operations; contextualization: equipment-to-asset, operation-to-order). SQL follows naming convention tr_{source}_{location}_to_{target}.
  • auth/ — two self-contained groups: workflow service account (execute + transform) and workflow user (read-only monitoring).
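One way a `--check` mode can detect drift is to render the file in memory and compare it with what is on disk, without writing. The sketch below is an assumption about how such a mode could work, not the script's actual code: `check_or_write`, the `--output` flag, and the placeholder rendering are hypothetical.

```python
import argparse
import sys
from pathlib import Path


def check_or_write(target: Path, rendered: str, check: bool) -> int:
    """Write `rendered` to `target`, or in check mode report drift without writing.

    Returns an exit code: 0 when the file is up to date or was written, 1 on drift.
    """
    current = target.read_text() if target.exists() else None
    if check:
        if current != rendered:
            print(f"{target} is out of date; re-run the generator.", file=sys.stderr)
            return 1
        return 0
    target.write_text(rendered)
    return 0


def main(argv=None) -> int:
    parser = argparse.ArgumentParser(description="Generate the workflow version file.")
    parser.add_argument("--check", action="store_true", help="fail if the file on disk differs")
    # Hypothetical flag, added here so the sketch can be pointed at any path.
    parser.add_argument("--output", type=Path, default=Path("wf_ingestion_v1.WorkflowVersion.yaml"))
    args = parser.parse_args(argv)
    rendered = "tasks: []\n"  # placeholder for the real rendering step
    return check_or_write(args.output, rendered, args.check)
```

In CI, a non-zero exit code from the check run would fail the job whenever the committed workflow file no longer matches the generator output.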

@gemini-code-assist

Using Gemini Code Assist

The full guide for Gemini Code Assist is on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | /gemini review | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments. |
| Help | /gemini help | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

@Aashutosh-cognite

/gemini review

gemini-code-assist (bot) left a comment

Code Review

This pull request introduces the cdf_ingestion_foundation module, which provides a framework for orchestrating two-phase ingestion workflows (population and contextualization) across PI, OPC-UA, and SAP source systems. It includes a Python generator script to build workflow versions from task snippets, various SQL transformations, and authorization group definitions. Review feedback primarily addresses Python style guide violations in the build script—such as import sorting, the need for typed data structures (dataclasses/Pydantic), and proper logging—as well as security recommendations to restrict overly broad wildcard scopes in the authorization group capabilities.

Comment on lines +16 to +18
import argparse
import sys
from pathlib import Path

high

Sort standard library imports alphabetically.

Suggested change
- import argparse
- import sys
- from pathlib import Path
+ import argparse
+ from pathlib import Path
+ import sys
References
  1. Sort alphabetically within groups. (link)

Comment on lines +29 to +30
def load_config() -> dict:
    return yaml.safe_load(CONFIG_FILE.read_text())

high

Use dataclasses or Pydantic models to represent the configuration instead of untyped dictionaries. This ensures type safety and adheres to the project's coding standards.

References
  1. Prefer dataclasses or Pydantic models over untyped dictionaries. Always parse file content into typed structures. (link)
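A minimal sketch of what a typed config could look like, assuming the three keys named in the PR description (`enabledSources`, `enabledContextualization`, `dataModelVariant`). The class name `IngestionConfig`, the `from_dict` helper, the default values, and the choice to raise on an unknown variant are all assumptions; the script under review warns instead of raising.

```python
from dataclasses import dataclass

# Variant names as listed in the script's own warning message.
SUPPORTED_VARIANTS = ("isa_manufacturing_extension", "cfihos_oil_and_gas")


@dataclass(frozen=True)
class IngestionConfig:
    """Typed view of the generator config (hypothetical class, defaults are assumed)."""

    enabled_sources: tuple[str, ...] = ()
    enabled_contextualization: bool = False
    data_model_variant: str = "isa_manufacturing_extension"

    @classmethod
    def from_dict(cls, raw: dict) -> "IngestionConfig":
        """Validate a parsed YAML mapping and convert it to a typed config."""
        variant = raw.get("dataModelVariant", "isa_manufacturing_extension")
        if variant not in SUPPORTED_VARIANTS:
            raise ValueError(f"Unknown dataModelVariant {variant!r}")
        sources = raw.get("enabledSources", [])
        if not isinstance(sources, list) or not all(isinstance(s, str) for s in sources):
            raise TypeError("enabledSources must be a list of strings")
        return cls(
            enabled_sources=tuple(sources),
            enabled_contextualization=bool(raw.get("enabledContextualization", False)),
            data_model_variant=variant,
        )
```

`load_config` could then return `IngestionConfig.from_dict(yaml.safe_load(CONFIG_FILE.read_text()))`, so malformed input fails at load time and not deep inside task selection.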

Comment on lines +67 to +68
print(f"WARNING: Unknown dataModelVariant '{dm}'. No contextualization tasks added.")
print("Supported variants: isa_manufacturing_extension | cfihos_oil_and_gas")

high

Use the logging module instead of print for warnings and informational messages.

References
  1. Use the logging module with appropriate levels. (link)
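The two `print` calls could be replaced along these lines. This is a sketch only: the logger name `build_workflow`, the `SUPPORTED_VARIANTS` constant, and the `warn_unknown_variant` helper are assumptions, while the message text mirrors the lines quoted above.

```python
import logging

# Hypothetical logger name; a module would more commonly use logging.getLogger(__name__).
logger = logging.getLogger("build_workflow")

SUPPORTED_VARIANTS = ("isa_manufacturing_extension", "cfihos_oil_and_gas")


def warn_unknown_variant(dm: str) -> None:
    """Log the same two messages the print calls emit, at WARNING level."""
    # Lazy %-style arguments let logging skip formatting when the level is disabled.
    logger.warning("Unknown dataModelVariant '%s'. No contextualization tasks added.", dm)
    logger.warning("Supported variants: %s", " | ".join(SUPPORTED_VARIANTS))
```

The script's entry point would still need to configure output once, for example with `logging.basicConfig(level=logging.INFO)`.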

    return tasks


def load_tasks(filenames: list[str]) -> list[dict]:

high

Missing docstring for load_tasks. Functions must have concise docstrings using the Args/Returns format.

Suggested change
- def load_tasks(filenames: list[str]) -> list[dict]:
+ def load_tasks(filenames: list[str]) -> list[dict]:
+     """
+     Load task snippets from YAML files.
+     Args:
+         filenames: list of task snippet filenames.
+     Returns:
+         list of task definitions.
+     """
References
  1. Use concise docstrings with Args/Returns format. (link)

- transformationsAcl:
    actions: [READ, RUN]
    scope:
      all: {}

security-high high

The transformationsAcl uses a wildcard scope (all: {}). This is overly broad for a service account. Restrict the scope to specific datasets or IDs where possible.

References
  1. Flag overly broad capabilities (e.g. wildcards '*' for dataSetScope, idScope, ACLs) in group definitions unless clearly justified. (link)

- datasetsAcl:
    actions: [READ]
    scope:
      all: {}

security-high high

The datasetsAcl uses a wildcard scope (all: {}). Consider restricting this to the specific datasets required by the workflow.

References
  1. Flag overly broad capabilities (e.g. wildcards '*' for dataSetScope, idScope, ACLs) in group definitions unless clearly justified. (link)
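Both wildcard findings above could also be caught mechanically. The sketch below assumes a group's capabilities have been parsed into a list of single-key mappings shaped like the YAML quoted in this review; `find_wildcard_scopes` is a hypothetical helper and the dataset id in the example is made up.

```python
def find_wildcard_scopes(capabilities: list[dict]) -> list[str]:
    """Return the ACL names whose scope is the unrestricted `all: {}` form."""
    flagged = []
    for capability in capabilities:
        for acl_name, acl in capability.items():
            scope = acl.get("scope", {}) if isinstance(acl, dict) else {}
            if "all" in scope:
                flagged.append(acl_name)
    return flagged


# Example input: one wildcard scope and one scope restricted to a made-up dataset id.
capabilities = [
    {"transformationsAcl": {"actions": ["READ", "RUN"], "scope": {"all": {}}}},
    {"datasetsAcl": {"actions": ["READ"], "scope": {"idScope": {"ids": ["ds_ingestion"]}}}},
]
flagged = find_wildcard_scopes(capabilities)  # only transformationsAcl is flagged
```

A check like this only reports candidates; whether a broad scope is justified for a given group remains a review decision.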

Aashutosh-cognite force-pushed the foundational-dp-cleanup branch from 7e0e65e to 7425757 on May 21, 2026 05:07
Aashutosh-cognite force-pushed the foundational-dp-ingestion branch from 0347e26 to c6ed831 on May 21, 2026 05:08