Skip to content

Commit 093684e

Browse files
authored
feat: add USER_STAC_COLLECTION_ID_REGISTRY for custom collection IDs (#95)
* feat: add USER_STAC_COLLECTION_ID_REGISTRY for custom collection IDs * chore: add integration test * fix: widen the batch window for the StacLoader lambda
1 parent c21217c commit 093684e

File tree

11 files changed

+418
-87
lines changed

11 files changed

+418
-87
lines changed

.github/workflows/deploy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ jobs:
4545
TITILER_PGSTAC_API_CUSTOM_DOMAIN_NAME: ${{ vars.TITILER_PGSTAC_API_CUSTOM_DOMAIN_NAME }}
4646
USER_STAC_ITEM_GEN_ROLE_ARN: ${{ vars.USER_STAC_ITEM_GEN_ROLE_ARN }}
4747
USER_STAC_INBOUND_TOPIC_ARNS: ${{ vars.USER_STAC_INBOUND_TOPIC_ARNS }}
48+
USER_STAC_COLLECTION_ID_REGISTRY: ${{ vars.USER_STAC_COLLECTION_ID_REGISTRY }}
4849
USER_STAC_STAC_API_CUSTOM_DOMAIN_NAME: ${{ vars.USER_STAC_STAC_API_CUSTOM_DOMAIN_NAME }}
4950
USER_STAC_TITILER_PGSTAC_API_CUSTOM_DOMAIN_NAME: ${{ vars.USER_STAC_TITILER_PGSTAC_API_CUSTOM_DOMAIN_NAME }}
5051
WEB_ACL_ARN: ${{ vars.WEB_ACL_ARN }}

cdk/PgStacInfra.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ export class PgStacInfra extends Stack {
462462
subnetSelection: apiSubnetSelection,
463463
batchSize: 500,
464464
lambdaTimeoutSeconds: 300,
465+
maxBatchingWindowMinutes: 5,
465466
environment: {
466467
CREATE_COLLECTIONS_IF_MISSING: "TRUE",
467468
},
@@ -503,6 +504,8 @@ export class PgStacInfra extends Stack {
503504
itemLoadTopicArn: stacLoader.topic.topicArn,
504505
roleArn: dpsStacItemGenConfig.itemGenRoleArn,
505506
inboundTopicArns: dpsStacItemGenConfig.inboundTopicArns,
507+
userStacCollectionIdRegistry:
508+
dpsStacItemGenConfig.userStacCollectionIdRegistry,
506509
vpc,
507510
subnetSelection: apiSubnetSelection,
508511
stage,
@@ -662,6 +665,7 @@ export interface Props extends StackProps {
662665
dpsStacItemGenConfig?: {
663666
itemGenRoleArn: string;
664667
inboundTopicArns?: string[];
668+
userStacCollectionIdRegistry?: Record<string, string[]>;
665669
};
666670
addStactoolsItemGenerator?: boolean | undefined;
667671
}

cdk/app.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ const {
2727
tags,
2828
titilerDataAccessRoleArn,
2929
titilerPgStacApiCustomDomainName,
30+
userStacCollectionIdRegistry,
3031
userStacInboundTopicArns,
3132
userStacItemGenRoleArn,
3233
userStacStacApiCustomDomainName,
@@ -124,6 +125,7 @@ const userInfrastructure = new PgStacInfra(app, buildStackName("userSTAC"), {
124125
dpsStacItemGenConfig: {
125126
itemGenRoleArn: userStacItemGenRoleArn,
126127
inboundTopicArns: userStacInboundTopicArns,
128+
userStacCollectionIdRegistry,
127129
},
128130
}),
129131
terminationProtection: false,

cdk/config.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export class Config {
2222
readonly webAclArn: string;
2323
readonly userStacItemGenRoleArn: string;
2424
readonly userStacInboundTopicArns: string[] | undefined;
25+
readonly userStacCollectionIdRegistry: Record<string, string[]> | undefined;
2526
readonly userStacStacApiCustomDomainName: string | undefined;
2627
readonly userStacTitilerPgStacApiCustomDomainName: string | undefined;
2728

@@ -133,6 +134,21 @@ export class Config {
133134
} else {
134135
this.userStacInboundTopicArns = undefined;
135136
}
137+
138+
if (process.env.USER_STAC_COLLECTION_ID_REGISTRY) {
139+
try {
140+
this.userStacCollectionIdRegistry = JSON.parse(
141+
process.env.USER_STAC_COLLECTION_ID_REGISTRY,
142+
) as Record<string, string[]>;
143+
} catch (error) {
144+
throw new Error(
145+
`Invalid JSON format for USER_STAC_COLLECTION_ID_REGISTRY: ${error}. ` +
146+
`Expected format: {"collection-id": ["user1", "user2"]}`
147+
);
148+
}
149+
} else {
150+
this.userStacCollectionIdRegistry = undefined;
151+
}
136152
}
137153

138154
/**

cdk/constructs/DpsStacItemGenerator/index.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,21 @@ export interface DpsStacItemGeneratorProps {
102102
readonly inboundTopicArns?: string[];
103103
readonly roleArn: string;
104104

105+
/**
106+
* Registry mapping collection ID patterns to lists of authorized usernames.
107+
*
108+
* When a DPS job's STAC items already carry a collection ID and the
109+
* submitting user is listed as authorized for that collection ID pattern,
110+
* the item's collection ID is preserved instead of being replaced with the
111+
* deterministic ID. Keys support glob wildcards (e.g. "maap-prefix-*").
112+
*
113+
* @example
114+
* { "my-collection": ["user1", "user2"], "maap-*": ["user3"] }
115+
*
116+
* @default {} (all items receive the deterministic collection ID)
117+
*/
118+
readonly userStacCollectionIdRegistry?: Record<string, string[]>;
119+
105120
/**
106121
* Deployment stage for naming resources and exports.
107122
*
@@ -187,6 +202,11 @@ export class DpsStacItemGenerator extends Construct {
187202
environment: {
188203
ITEM_LOAD_TOPIC_ARN: props.itemLoadTopicArn,
189204
LOG_LEVEL: "INFO",
205+
...(props.userStacCollectionIdRegistry && {
206+
USER_STAC_COLLECTION_ID_REGISTRY: JSON.stringify(
207+
props.userStacCollectionIdRegistry,
208+
),
209+
}),
190210
...props.environment,
191211
},
192212
});

cdk/constructs/DpsStacItemGenerator/runtime/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ dependencies = [
1616

1717
[dependency-groups]
1818
dev = [
19+
"boto3>=1.42.89",
1920
"httpx>=0.28.1",
2021
"pytest>=8.3.5",
2122
"pytest-mock>=3.14.0",

cdk/constructs/DpsStacItemGenerator/runtime/src/dps_stac_item_generator/handler.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,31 @@
3131
logger.addHandler(log_handler)
3232

3333

34+
def _load_collection_id_registry(raw: str) -> dict[str, list[str]]:
35+
"""Parse USER_STAC_COLLECTION_ID_REGISTRY JSON string into a registry dict.
36+
37+
Args:
38+
raw: JSON string mapping collection ID patterns to lists of authorized
39+
usernames. An empty JSON object ("{}") disables all overrides.
40+
41+
Returns:
42+
Parsed registry dict, or an empty dict if parsing fails.
43+
"""
44+
try:
45+
return json.loads(raw)
46+
except json.JSONDecodeError:
47+
logger.warning(
48+
"Failed to parse USER_STAC_COLLECTION_ID_REGISTRY, using empty map: %s",
49+
raw,
50+
)
51+
return {}
52+
53+
54+
COLLECTION_ID_REGISTRY: dict[str, list[str]] = _load_collection_id_registry(
55+
os.environ.get("USER_STAC_COLLECTION_ID_REGISTRY", "{}")
56+
)
57+
58+
3459
def get_topic_arn() -> str:
3560
item_load_topic_arn = os.environ.get("ITEM_LOAD_TOPIC_ARN")
3661
if not item_load_topic_arn:
@@ -119,7 +144,10 @@ def handler(
119144
logger.debug(f"[{message_id}] SNS Message content: {message_str}")
120145

121146
catalog_json_key = get_catalog_json_key(message_str)
122-
for stac_item in get_stac_items(catalog_json_key):
147+
for stac_item in get_stac_items(
148+
catalog_json_key,
149+
collection_id_registry=COLLECTION_ID_REGISTRY,
150+
):
123151
stac_item_json = stac_item.model_dump_json()
124152

125153
item_load_topic_arn = get_topic_arn()

cdk/constructs/DpsStacItemGenerator/runtime/src/dps_stac_item_generator/item.py

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import fnmatch
12
import json
23
import logging
34
import re
@@ -77,10 +78,49 @@ def load_met_json(bucket: str, job_output_prefix: str) -> Optional[Dict[str, str
7778
)
7879

7980

80-
def get_stac_items(catalog_json_key: str) -> Generator[Item, Any, Any]:
81+
def is_authorized(
82+
username: str,
83+
collection_id: str,
84+
registry: dict[str, list[str]],
85+
) -> bool:
86+
"""Return True if username is authorized to publish to collection_id.
87+
88+
Each key in registry is a collection ID pattern (exact string or glob
89+
wildcard using fnmatch syntax). A user is authorized when their username
90+
appears in the list for any pattern that matches collection_id.
91+
92+
Args:
93+
username: The DPS job submitter's username.
94+
collection_id: The collection ID the item declares.
95+
registry: Mapping of collection ID patterns to authorized usernames.
96+
97+
Returns:
98+
True if the username is authorized for the given collection ID.
8199
"""
82-
Yield STAC items out of a catalog.json
100+
for pattern, authorized_users in registry.items():
101+
if fnmatch.fnmatch(collection_id, pattern) and username in authorized_users:
102+
return True
103+
return False
104+
105+
106+
def get_stac_items(
107+
catalog_json_key: str,
108+
collection_id_registry: dict[str, list[str]] | None = None,
109+
) -> Generator[Item, Any, Any]:
110+
"""Yield STAC items out of a catalog.json.
111+
112+
If collection_id_registry is provided, items whose existing collection ID
113+
is authorized for the submitting user are published as-is. All other items
114+
receive a deterministic collection ID derived from DPS job metadata.
115+
116+
Args:
117+
catalog_json_key: S3 URI of the catalog.json file.
118+
collection_id_registry: Optional mapping of collection ID patterns to
119+
lists of authorized usernames. When omitted, all items receive the
120+
deterministic collection ID.
83121
"""
122+
registry = collection_id_registry or {}
123+
84124
job_output_prefix = get_dps_output_prefix(catalog_json_key)
85125
if not job_output_prefix:
86126
raise ValueError(
@@ -95,15 +135,25 @@ def get_stac_items(catalog_json_key: str) -> Generator[Item, Any, Any]:
95135
f"could not locate the .met.json file with the DPS job outputs in {job_output_prefix}"
96136
)
97137

98-
collection_id = slugify(
138+
deterministic_collection_id = slugify(
99139
COLLECTION_ID_FORMAT.format(**job_metadata), regex_pattern=r"[/\?#%& ]+"
100140
)
141+
username = job_metadata.get("username", "")
101142

102143
catalog = pystac.Catalog.from_file(catalog_json_key)
103144
catalog.make_all_asset_hrefs_absolute()
104145

105146
for item in catalog.get_all_items():
106147
item_dict = item.to_dict()
107-
item_dict["collection"] = collection_id
148+
item_collection_id = item_dict.get("collection")
149+
150+
if item_collection_id and is_authorized(username, item_collection_id, registry):
151+
logger.info(
152+
"Preserving user-specified collection %s for user %s",
153+
item_collection_id,
154+
username,
155+
)
156+
else:
157+
item_dict["collection"] = deterministic_collection_id
108158

109159
yield Item(**item_dict)

0 commit comments

Comments
 (0)