Skip to content

Commit d51de7c

Browse files
committed
Support selecting wildcards in node queries
1 parent 2e75ac1 commit d51de7c

File tree

8 files changed

+441
-111
lines changed

8 files changed

+441
-111
lines changed

datajunction-server/datajunction_server/internal/namespaces.py

Lines changed: 17 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@
88

99
from sqlalchemy import or_, select
1010
from sqlalchemy.ext.asyncio import AsyncSession
11-
from sqlalchemy.orm import joinedload
11+
from sqlalchemy.orm import joinedload, selectinload
1212

1313
from datajunction_server.api.helpers import get_node_namespace
1414
from datajunction_server.database.history import ActivityType, EntityType, History
@@ -26,6 +26,7 @@
2626
)
2727
from datajunction_server.models.node import NodeMinimumDetail
2828
from datajunction_server.models.node_type import NodeType
29+
from datajunction_server.sql.dag import topological_sort
2930
from datajunction_server.typing import UTCDatetime
3031
from datajunction_server.utils import SEPARATOR
3132

@@ -229,10 +230,10 @@ async def hard_delete_namespace(
229230
"""
230231
Hard delete a node namespace.
231232
"""
232-
node_names = (
233+
nodes = (
233234
(
234235
await session.execute(
235-
select(Node.name)
236+
select(Node)
236237
.where(
237238
or_(
238239
Node.namespace.like(
@@ -241,27 +242,33 @@ async def hard_delete_namespace(
241242
Node.namespace == namespace,
242243
),
243244
)
244-
.order_by(Node.name),
245+
.order_by(Node.name)
246+
.options(
247+
joinedload(Node.current).options(
248+
selectinload(NodeRevision.parents),
249+
),
250+
),
245251
)
246252
)
253+
.unique()
247254
.scalars()
248255
.all()
249256
)
250257

251-
if not cascade and node_names:
258+
if not cascade and nodes:
252259
raise DJActionNotAllowedException(
253260
message=(
254261
f"Cannot hard delete namespace `{namespace}` as there are still the "
255-
f"following nodes under it: `{node_names}`. Set `cascade` to true to "
256-
"additionally hard delete the above nodes in this namespace. WARNING:"
262+
f"following nodes under it: `{[node.name for node in nodes]}`. Set `cascade` to "
263+
"true to additionally hard delete the above nodes in this namespace. WARNING:"
257264
" this action cannot be undone."
258265
),
259266
)
260267

261268
impacts = {}
262-
for node_name in node_names:
263-
impacts[node_name] = await hard_delete_node(
264-
node_name,
269+
for node in reversed(topological_sort(nodes)):
270+
impacts[node.name] = await hard_delete_node(
271+
node.name,
265272
session,
266273
current_user=current_user,
267274
)

datajunction-server/datajunction_server/internal/nodes.py

Lines changed: 57 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -868,7 +868,7 @@ async def propagate_update_downstream( # pylint: disable=too-many-locals
868868
)
869869

870870
# The downstreams need to be sorted topologically in order for the updates to be done
871-
# in the right order. Otherwise it is possible for a leaf node like a metric to be updated
871+
# in the right order. Otherwise, it is possible for a leaf node like a metric to be updated
872872
# before its upstreams are updated.
873873
for downstream in downstreams:
874874
original_node_revision = downstream.current
@@ -1866,18 +1866,17 @@ async def revalidate_node( # pylint: disable=too-many-locals,too-many-statement
18661866

18671867
# Check if any columns have been updated
18681868
existing_columns = {col.name: col for col in node.current.columns} # type: ignore
1869-
updated_columns = False
1869+
updated_columns = len(current_node_revision.columns) != len(node_validator.columns)
18701870
for col in node_validator.columns:
18711871
if existing_col := existing_columns.get(col.name):
18721872
if existing_col.type != col.type:
18731873
existing_col.type = col.type
18741874
updated_columns = True
18751875
else:
1876-
node.current.columns.append(col) # type: ignore # pragma: no cover
18771876
updated_columns = True # pragma: no cover
18781877

18791878
# Only create a new revision if the columns have been updated
1880-
if updated_columns: # type: ignore
1879+
if updated_columns and node_validator.updated_columns: # type: ignore
18811880
new_revision = copy_existing_node_revision(node.current) # type: ignore
18821881
new_revision.version = str(
18831882
Version.parse(node.current.version).next_major_version(), # type: ignore
@@ -1893,16 +1892,16 @@ async def revalidate_node( # pylint: disable=too-many-locals,too-many-statement
18931892
node_validator.updated_columns = node_validator.modified_columns(
18941893
new_revision, # type: ignore
18951894
)
1896-
new_revision.columns = node_validator.columns
18971895

18981896
# Save the new revision of the child
1897+
new_revision.columns = node_validator.columns
18991898
node.current_version = new_revision.version # type: ignore
19001899
new_revision.node_id = node.id # type: ignore
1901-
session.add(node)
19021900
session.add(new_revision)
1903-
await session.commit()
1904-
await session.refresh(node.current) # type: ignore
1905-
await session.refresh(node, ["current"])
1901+
session.add(node)
1902+
await session.commit()
1903+
await session.refresh(node.current) # type: ignore
1904+
await session.refresh(node, ["current"])
19061905
return node_validator
19071906

19081907

@@ -1918,7 +1917,14 @@ async def hard_delete_node(
19181917
node = await Node.get_by_name(
19191918
session,
19201919
name,
1921-
options=[joinedload(Node.current), joinedload(Node.revisions)],
1920+
options=[
1921+
joinedload(Node.current),
1922+
joinedload(Node.revisions).options(
1923+
selectinload(NodeRevision.columns).options(
1924+
joinedload(Column.attributes),
1925+
),
1926+
),
1927+
],
19221928
include_inactive=True,
19231929
raise_if_not_exists=False,
19241930
)
@@ -1946,42 +1952,50 @@ async def hard_delete_node(
19461952
user=current_user.username if current_user else None,
19471953
),
19481954
)
1949-
node_validator = await revalidate_node(
1950-
name=node.name,
1951-
session=session,
1952-
current_user=current_user,
1953-
)
1954-
impact.append(
1955-
{
1956-
"name": node.name,
1957-
"status": node_validator.status,
1958-
"effect": "downstream node is now invalid",
1959-
},
1960-
)
1955+
try:
1956+
node_validator = await revalidate_node(
1957+
name=node.name,
1958+
session=session,
1959+
current_user=current_user,
1960+
)
1961+
impact.append(
1962+
{
1963+
"name": node.name,
1964+
"status": node_validator.status,
1965+
"effect": "downstream node is now invalid",
1966+
},
1967+
)
1968+
except DJNodeNotFound:
1969+
_logger.warning("Node not found %s", node.name)
19611970

19621971
# Revalidate all linked nodes
19631972
for node in linked_nodes:
1964-
session.add( # Capture this in the downstream node's history
1965-
History(
1966-
entity_type=EntityType.LINK,
1967-
entity_name=name,
1968-
node=node.name,
1969-
activity_type=ActivityType.DELETE,
1970-
user=current_user.username if current_user else None,
1971-
),
1972-
)
1973-
node_validator = await revalidate_node(
1974-
name=node.name,
1975-
session=session,
1976-
current_user=current_user,
1977-
)
1978-
impact.append(
1979-
{
1980-
"name": node.name,
1981-
"status": node_validator.status,
1982-
"effect": "broken link",
1983-
},
1984-
)
1973+
if node:
1974+
session.add( # Capture this in the downstream node's history
1975+
History(
1976+
entity_type=EntityType.LINK,
1977+
entity_name=name,
1978+
node=node.name,
1979+
activity_type=ActivityType.DELETE,
1980+
user=current_user.username if current_user else None,
1981+
),
1982+
)
1983+
try:
1984+
node_validator = await revalidate_node(
1985+
name=node.name,
1986+
session=session,
1987+
current_user=current_user,
1988+
# update=False,
1989+
)
1990+
impact.append(
1991+
{
1992+
"name": node.name,
1993+
"status": node_validator.status,
1994+
"effect": "broken link",
1995+
},
1996+
)
1997+
except DJNodeNotFound:
1998+
_logger.warning("Node not found %s", node.name)
19851999
session.add( # Capture this in the downstream node's history
19862000
History(
19872001
entity_type=EntityType.NODE,

datajunction-server/datajunction_server/internal/validation.py

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,12 +1,12 @@
11
"""Node validation functions."""
22
from dataclasses import dataclass, field
3-
from typing import Dict, List, Set, Union
3+
from typing import Dict, List, Optional, Set, Union
44

55
from sqlalchemy.exc import MissingGreenlet
66
from sqlalchemy.ext.asyncio import AsyncSession
77

88
from datajunction_server.api.helpers import find_bound_dimensions
9-
from datajunction_server.database import Column, Node, NodeRevision
9+
from datajunction_server.database import Column, ColumnAttribute, Node, NodeRevision
1010
from datajunction_server.errors import DJError, DJException, ErrorCode
1111
from datajunction_server.models.base import labelize
1212
from datajunction_server.models.node import NodeRevisionBase, NodeStatus
@@ -22,6 +22,7 @@ class NodeValidator: # pylint: disable=too-many-instance-attributes
2222
Node validation
2323
"""
2424

25+
query_ast: Optional[ast.Query] = None
2526
status: NodeStatus = NodeStatus.VALID
2627
columns: List[Column] = field(default_factory=list)
2728
required_dimensions: List[Column] = field(default_factory=list)
@@ -128,7 +129,12 @@ async def validate_node_data( # pylint: disable=too-many-locals,too-many-statem
128129
name=column_name,
129130
display_name=labelize(column_name),
130131
type=column_type,
131-
attributes=existing_column.attributes if existing_column else [],
132+
attributes=[
133+
ColumnAttribute(attribute_type=col_attr.attribute_type)
134+
for col_attr in existing_column.attributes
135+
]
136+
if existing_column
137+
else [],
132138
dimension=existing_column.dimension if existing_column else None,
133139
order=idx,
134140
)

0 commit comments

Comments
 (0)