Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@

logger = logging.getLogger(__name__)

# Engine tier preference for dialect auto-detection (fastest → slowest).
# When no dialect is specified, build_metrics_sql probes tiers in order:
# DRUID – served from a materialized cube (single Druid datasource scan)
# TRINO – served from pre-agg or source tables via Trino
# SPARK – served from pre-agg or source tables via Spark (default fallback)
# Full tier resolution (including Trino catalog engine lookup) is handled by
# resolve_dialect_and_engine_for_metrics in cube_matcher.py; the auto-detect
# logic here covers the common DRUID-vs-SPARK split.
_ENGINE_TIER_PREFERENCE = [Dialect.DRUID, Dialect.TRINO, Dialect.SPARK]


def _normalize_query_param_value(param: str, value: ast.Value | Any) -> ast.Value:
"""Normalize a Python value to an AST value node for query parameter substitution."""
Expand Down Expand Up @@ -471,9 +481,32 @@ async def build_metrics_sql(
Layer 3: Derived Metrics
Computes derived metrics that reference other metrics.
"""
# Default to SPARK dialect if not specified
# Auto-detect dialect when none specified: probe fastest available engine tier.
# See _ENGINE_TIER_PREFERENCE for priority ordering (DRUID > TRINO > SPARK).
# Trino resolution requires a catalog engine lookup; that is handled by
# resolve_dialect_and_engine_for_metrics. Here we cover the DRUID-vs-SPARK split.
if dialect is None:
dialect = Dialect.SPARK
if use_materialized:
# Probe Druid tier: look for a matching materialized cube.
probe_cube = (
matched_cube
if matched_cube is not None
else await find_matching_cube(
session,
metrics,
dimensions,
require_availability=True,
)
)
if probe_cube:
dialect = Dialect.DRUID
matched_cube = probe_cube # reuse below, avoids second DB round-trip
else:
dialect = (
Dialect.SPARK
) # no cube; Trino tier needs catalog engine lookup
else:
dialect = Dialect.SPARK

# Setup context (loads nodes, decomposes metrics, adds dimensions from expressions)
ctx = await setup_build_context(
Expand All @@ -485,9 +518,9 @@ async def build_metrics_sql(
use_materialized=use_materialized,
)

# Use materialized cube if available.
# Use materialized cube when dialect is DRUID (explicit or auto-detected above).
# Use pre-resolved cube if available (avoids duplicate find_matching_cube call).
if use_materialized:
if use_materialized and dialect == Dialect.DRUID:
cube = (
matched_cube
if matched_cube is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from copy import deepcopy
from typing import Optional

from datajunction_server.construction.build_v3.filters import extract_subscript_role
from datajunction_server.construction.build_v3.materialization import (
get_table_reference_parts_with_materialization,
should_use_materialized_table,
Expand Down Expand Up @@ -185,7 +186,7 @@ def replace_component_refs_in_ast(
component_aliases: Mapping from component name to (table_alias, column_name)
e.g., {"unit_price_sum_abc123": ("gg0", "sum_unit_price")}
"""
for col in expr_ast.find_all(ast.Column):
for col in list(expr_ast.find_all(ast.Column)):
# Get the column name (might be in name.name or just name)
col_name = col.name.name if col.name else None
if not col_name: # pragma: no cover
Expand All @@ -194,7 +195,6 @@ def replace_component_refs_in_ast(
# Check if this column name matches a component
if col_name in component_aliases: # pragma: no branch
table_alias, actual_col = component_aliases[col_name]
# Replace with qualified column reference
col.name = ast.Name(actual_col)
# Only set table if alias is non-empty (empty = no CTE prefix)
col._table = ast.Table(ast.Name(table_alias)) if table_alias else None
Expand Down Expand Up @@ -261,15 +261,7 @@ def replace_dimension_refs_in_ast(
if not base_col_name: # pragma: no cover
continue

# Get the role from the index (e.g., "order")
role = None
if isinstance(subscript.index, ast.Column):
role = subscript.index.name.name if subscript.index.name else None
elif isinstance(subscript.index, ast.Name): # pragma: no cover
role = subscript.index.name # pragma: no cover
elif hasattr(subscript.index, "name"): # pragma: no cover
role = str(subscript.index.name) # type: ignore

role = extract_subscript_role(subscript)
if not role: # pragma: no cover
continue

Expand Down Expand Up @@ -1019,15 +1011,14 @@ def process_metric_combiner_expression(
"""
Process a metric combiner expression for final output.

This function applies the same transformations used in generate_metrics_sql
(specifically build_derived_metric_expr) to ensure consistency between
SQL generation and stored metric expressions.
Transforms a raw combiner AST into the final SQL expression by replacing
component, metric, and dimension references with qualified column refs.

Used by:
- build_derived_metric_expr in generate_metrics_sql
- cube materialization for storing metric_expression in config

Transformations applied (in order, matching build_derived_metric_expr):
Transformations applied (in order):
1. Replace metric references (e.g., "v3.total_revenue" -> column ref)
2. Replace component references (e.g., "revenue_sum_abc123" -> column ref)
3. Replace dimension references (e.g., "v3.date.dateint" -> column ref)
Expand Down Expand Up @@ -1056,7 +1047,6 @@ def process_metric_combiner_expression(
expr_ast = deepcopy(combiner_ast)

# Replace metric references (for derived metrics referencing other metrics)
# This must happen first, matching build_derived_metric_expr order
if metric_refs:
replace_metric_refs_in_ast(expr_ast, metric_refs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,23 +391,74 @@ async def build_sql_from_cube(
return build_sql_from_cube_impl(ctx, cube, ctx.decomposed_metrics)


def _build_mat_col_lookup(cube: NodeRevision) -> dict[str, str]:
"""
Build a mapping from short column name -> physical column name by reading
the cube's materialization config columns.

Example entry in config["columns"]:
{
"name": "common_DOT_dimensions_DOT_time_DOT_date_DOT_dateint", # physical Druid col
"column": "dateint", # short col name
"semantic_entity": "common.dimensions.time.date.dateint",
"semantic_type": "dimension",
...
}

We key on ``column`` (the short name) because that is what
parse_dimension_ref().column_name returns, and it is stable across
different namespace / path representations.

Returns {} when no materialization config is available (e.g. in tests that
set availability directly without going through the materialization pipeline),
in which case callers fall back to the short column name unchanged.
"""
lookup: dict[str, str] = {}
for mat in cube.materializations or []:
for combiner in (mat.config or {}).get("combiners") or []:
for col_data in (combiner or {}).get("columns") or []:
short_name = col_data.get("column")
physical_name = col_data.get("name")
if short_name and physical_name:
lookup[short_name] = physical_name
return lookup


def build_synthetic_grain_group(
ctx: BuildContext,
decomposed_metrics: dict[str, DecomposedMetricInfo],
cube: NodeRevision,
) -> GrainGroupSQL:
"""
Collect components from base metrics only (not derived).
V3 cube column naming always uses component.name (the hashed name) for consistency.
Build a synthetic GrainGroupSQL that reads from the cube's materialized Druid table.

Physical column names are resolved from the cube's materialization config
(``materialization.config["columns"]``). Each entry there carries a
``column`` key (the short column name, e.g. ``dateint``) and a ``name`` key
(the physical column name as it exists in the Druid table, e.g.
``common_DOT_dimensions_DOT_time_DOT_date_DOT_dateint``). We key on the
short column name because that is what parse_dimension_ref().column_name
returns. When a match is found the physical name is used; otherwise we fall
back to the short name (which is correct for new-style materializations).
"""
all_components = []
component_aliases: dict[str, str] = {}

avail = cube.availability
if not avail: # pragma: no cover
raise ValueError(f"Cube {cube.name} has no availability")
table_parts = [p for p in [avail.catalog, avail.schema_, avail.table] if p]
table_name = ".".join(table_parts)
# Druid tables are referenced by the table name only (schema/catalog are not part of the ref).
# For other engines (e.g. Iceberg/Spark) we use the full catalog.schema.table path.
if ctx.dialect == Dialect.DRUID:
table_name = avail.table
else:
table_name = ".".join(
p for p in [avail.catalog, avail.schema_, avail.table] if p
)

# short_col_name -> physical column name from the materialization config.
# Empty when no materialization config is present (tests / direct calls).
mat_col_lookup = _build_mat_col_lookup(cube)

for metric_name, decomposed in decomposed_metrics.items():
# Only process BASE metrics for component alias mapping
Expand All @@ -418,40 +469,46 @@ def build_synthetic_grain_group(

for comp in decomposed.components:
if comp.name not in component_aliases: # pragma: no branch
# Always use component.name for consistency - no special case for single-component
cube_col_name = comp.name

cube_col_name = mat_col_lookup.get(comp.name, comp.name)
component_aliases[comp.name] = cube_col_name
all_components.append(comp)

# Build column metadata for the synthetic grain group
grain_group_columns: list[ColumnMetadata] = []

# Build mapping from dimension ref to short column name for filter resolution
# Build mapping from dimension ref to physical column name for filter resolution.
# ctx.dimensions includes both requested dimensions AND filter-only dimensions
# (filter-only dimensions were added by add_dimensions_from_filters() in setup_build_context)
dimension_aliases: dict[str, str] = {}

# Add all dimensions (requested + filter-only). We need all dimensions
# in the cube SELECT for proper filter resolution
# in the cube SELECT for proper filter resolution.
# dim_short_names holds the alias (short name) used everywhere outside the CTE.
# dim_physical_names holds the actual column name in the Druid table (may differ).
dim_short_names = []
dim_physical_names = []
for dim_ref in ctx.dimensions:
parsed_dim = parse_dimension_ref(dim_ref)
col_name = parsed_dim.column_name
short_name = parsed_dim.column_name
if parsed_dim.role:
col_name = f"{col_name}_{parsed_dim.role}"
dim_short_names.append(col_name)
dimension_aliases[dim_ref] = col_name
short_name = f"{short_name}_{parsed_dim.role}"
physical_name = mat_col_lookup.get(parsed_dim.column_name, short_name)
dim_short_names.append(short_name)
dim_physical_names.append(physical_name)
# Use the physical column name for WHERE clause resolution:
# the WHERE is applied directly on the cube table, so we must reference
# the physical column (e.g. common_DOT_..._DOT_dateint) not the alias.
dimension_aliases[dim_ref] = physical_name
grain_group_columns.append(
ColumnMetadata(
name=col_name,
name=short_name,
semantic_name=dim_ref,
type="string", # Will be refined by generate_metrics_sql
semantic_type="dimension",
),
)

# Add component columns (using cube column names from component_aliases)
# Add component columns (always use the short/hash name as both physical and alias)
for comp in all_components:
cube_col_name = component_aliases[comp.name]
grain_group_columns.append(
Expand All @@ -466,9 +523,13 @@ def build_synthetic_grain_group(
# Build the synthetic query: SELECT dims, components FROM cube_table WHERE filters
projection: list[ast.Column] = []

# Add all dimension columns (requested + filter-only)
for dim_col in dim_short_names:
projection.append(ast.Column(name=ast.Name(dim_col)))
# Add all dimension columns. When the physical name differs from the short alias,
# emit "physical_name AS short_name" so the rest of the query can use the short name.
for short_name, physical_name in zip(dim_short_names, dim_physical_names):
col = ast.Column(name=ast.Name(physical_name))
if physical_name != short_name:
col = col.set_alias(ast.Name(short_name)) # type: ignore[assignment]
projection.append(col)

# Add component columns (using cube column names)
for comp in all_components:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,29 @@ def parse_filter(filter_str: str) -> ast.Expression:
return query.select.where


def extract_subscript_role(subscript: ast.Subscript) -> str | None:
    """
    Pull the role string out of a subscript's index node.

    The index of a subscript can arrive in three shapes:
    - ast.Column: plain role, e.g. "v3.date.year[order]"
    - ast.Name: plain role, when the parser yields a Name instead of a Column
    - ast.Lambda: multi-hop role, e.g. "v3.user[customer->home]"

    Returns the role string, or None when the index matches none of these forms.
    """
    index = subscript.index
    # Plain role parsed as a Column node, e.g. "dim.attr[order]"
    if isinstance(index, ast.Column):
        return index.name.name if index.name else None
    # Plain role parsed directly as a Name node (parser fallback)
    if isinstance(index, ast.Name):  # pragma: no cover
        return index.name
    # Multi-hop role, e.g. "dim.attr[customer->home]"; Lambda.__str__ yields
    # the canonical role string.
    if isinstance(index, ast.Lambda):
        return str(index)
    return None  # pragma: no cover


def resolve_filter_references(
filter_ast: ast.Expression,
column_aliases: dict[str, str],
Expand Down Expand Up @@ -81,17 +104,7 @@ def resolve_filter_references(
if not base_col_ref:
continue # pragma: no cover

# Extract the role from the subscript index
role = None
if isinstance(subscript.index, ast.Column):
role = subscript.index.name.name if subscript.index.name else None
elif isinstance(subscript.index, ast.Name): # pragma: no cover
role = subscript.index.name
elif isinstance(subscript.index, ast.Lambda):
# Multi-hop role notation like "customer->home" is parsed as a Lambda node.
# Lambda.__str__ returns the canonical role string (e.g., "customer->home").
role = str(subscript.index)

role = extract_subscript_role(subscript)
if not role:
continue # pragma: no cover

Expand Down
Loading
Loading