Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ jobs:
TITILER_PGSTAC_API_CUSTOM_DOMAIN_NAME: ${{ vars.TITILER_PGSTAC_API_CUSTOM_DOMAIN_NAME }}
USER_STAC_ITEM_GEN_ROLE_ARN: ${{ vars.USER_STAC_ITEM_GEN_ROLE_ARN }}
USER_STAC_INBOUND_TOPIC_ARNS: ${{ vars.USER_STAC_INBOUND_TOPIC_ARNS }}
USER_STAC_COLLECTION_ID_REGISTRY: ${{ vars.USER_STAC_COLLECTION_ID_REGISTRY }}
USER_STAC_STAC_API_CUSTOM_DOMAIN_NAME: ${{ vars.USER_STAC_STAC_API_CUSTOM_DOMAIN_NAME }}
USER_STAC_TITILER_PGSTAC_API_CUSTOM_DOMAIN_NAME: ${{ vars.USER_STAC_TITILER_PGSTAC_API_CUSTOM_DOMAIN_NAME }}
WEB_ACL_ARN: ${{ vars.WEB_ACL_ARN }}
Expand Down
4 changes: 4 additions & 0 deletions cdk/PgStacInfra.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ export class PgStacInfra extends Stack {
subnetSelection: apiSubnetSelection,
batchSize: 500,
lambdaTimeoutSeconds: 300,
maxBatchingWindowMinutes: 5,
environment: {
CREATE_COLLECTIONS_IF_MISSING: "TRUE",
},
Expand Down Expand Up @@ -503,6 +504,8 @@ export class PgStacInfra extends Stack {
itemLoadTopicArn: stacLoader.topic.topicArn,
roleArn: dpsStacItemGenConfig.itemGenRoleArn,
inboundTopicArns: dpsStacItemGenConfig.inboundTopicArns,
userStacCollectionIdRegistry:
dpsStacItemGenConfig.userStacCollectionIdRegistry,
vpc,
subnetSelection: apiSubnetSelection,
stage,
Expand Down Expand Up @@ -662,6 +665,7 @@ export interface Props extends StackProps {
dpsStacItemGenConfig?: {
itemGenRoleArn: string;
inboundTopicArns?: string[];
userStacCollectionIdRegistry?: Record<string, string[]>;
};
addStactoolsItemGenerator?: boolean | undefined;
}
2 changes: 2 additions & 0 deletions cdk/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
tags,
titilerDataAccessRoleArn,
titilerPgStacApiCustomDomainName,
userStacCollectionIdRegistry,
userStacInboundTopicArns,
userStacItemGenRoleArn,
userStacStacApiCustomDomainName,
Expand Down Expand Up @@ -124,6 +125,7 @@ const userInfrastructure = new PgStacInfra(app, buildStackName("userSTAC"), {
dpsStacItemGenConfig: {
itemGenRoleArn: userStacItemGenRoleArn,
inboundTopicArns: userStacInboundTopicArns,
userStacCollectionIdRegistry,
},
}),
terminationProtection: false,
Expand Down
16 changes: 16 additions & 0 deletions cdk/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export class Config {
readonly webAclArn: string;
readonly userStacItemGenRoleArn: string;
readonly userStacInboundTopicArns: string[] | undefined;
readonly userStacCollectionIdRegistry: Record<string, string[]> | undefined;
readonly userStacStacApiCustomDomainName: string | undefined;
readonly userStacTitilerPgStacApiCustomDomainName: string | undefined;

Expand Down Expand Up @@ -133,6 +134,21 @@ export class Config {
} else {
this.userStacInboundTopicArns = undefined;
}

if (process.env.USER_STAC_COLLECTION_ID_REGISTRY) {
try {
this.userStacCollectionIdRegistry = JSON.parse(
process.env.USER_STAC_COLLECTION_ID_REGISTRY,
) as Record<string, string[]>;
} catch (error) {
throw new Error(
`Invalid JSON format for USER_STAC_COLLECTION_ID_REGISTRY: ${error}. ` +
`Expected format: {"collection-id": ["user1", "user2"]}`
);
}
} else {
this.userStacCollectionIdRegistry = undefined;
}
}

/**
Expand Down
20 changes: 20 additions & 0 deletions cdk/constructs/DpsStacItemGenerator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,21 @@ export interface DpsStacItemGeneratorProps {
readonly inboundTopicArns?: string[];
readonly roleArn: string;

/**
* Registry mapping collection ID patterns to lists of authorized usernames.
*
* When a DPS job's STAC items already carry a collection ID and the
* submitting user is listed as authorized for that collection ID pattern,
* the item's collection ID is preserved instead of being replaced with the
* deterministic ID. Keys support glob wildcards (e.g. "maap-prefix-*").
*
* @example
* { "my-collection": ["user1", "user2"], "maap-*": ["user3"] }
*
* @default {} (all items receive the deterministic collection ID)
*/
readonly userStacCollectionIdRegistry?: Record<string, string[]>;

/**
* Deployment stage for naming resources and exports.
*
Expand Down Expand Up @@ -187,6 +202,11 @@ export class DpsStacItemGenerator extends Construct {
environment: {
ITEM_LOAD_TOPIC_ARN: props.itemLoadTopicArn,
LOG_LEVEL: "INFO",
...(props.userStacCollectionIdRegistry && {
USER_STAC_COLLECTION_ID_REGISTRY: JSON.stringify(
props.userStacCollectionIdRegistry,
),
}),
...props.environment,
},
});
Expand Down
1 change: 1 addition & 0 deletions cdk/constructs/DpsStacItemGenerator/runtime/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies = [

[dependency-groups]
dev = [
"boto3>=1.42.89",
"httpx>=0.28.1",
"pytest>=8.3.5",
"pytest-mock>=3.14.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,31 @@
logger.addHandler(log_handler)


def _load_collection_id_registry(raw: str) -> dict[str, list[str]]:
"""Parse USER_STAC_COLLECTION_ID_REGISTRY JSON string into a registry dict.

Args:
raw: JSON string mapping collection ID patterns to lists of authorized
usernames. An empty JSON object ("{}") disables all overrides.

Returns:
Parsed registry dict, or an empty dict if parsing fails.
"""
try:
return json.loads(raw)
except json.JSONDecodeError:
logger.warning(
"Failed to parse USER_STAC_COLLECTION_ID_REGISTRY, using empty map: %s",
raw,
)
return {}


COLLECTION_ID_REGISTRY: dict[str, list[str]] = _load_collection_id_registry(
os.environ.get("USER_STAC_COLLECTION_ID_REGISTRY", "{}")
)


def get_topic_arn() -> str:
item_load_topic_arn = os.environ.get("ITEM_LOAD_TOPIC_ARN")
if not item_load_topic_arn:
Expand Down Expand Up @@ -119,7 +144,10 @@ def handler(
logger.debug(f"[{message_id}] SNS Message content: {message_str}")

catalog_json_key = get_catalog_json_key(message_str)
for stac_item in get_stac_items(catalog_json_key):
for stac_item in get_stac_items(
catalog_json_key,
collection_id_registry=COLLECTION_ID_REGISTRY,
):
stac_item_json = stac_item.model_dump_json()

item_load_topic_arn = get_topic_arn()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import fnmatch
import json
import logging
import re
Expand Down Expand Up @@ -77,10 +78,49 @@ def load_met_json(bucket: str, job_output_prefix: str) -> Optional[Dict[str, str
)


def get_stac_items(catalog_json_key: str) -> Generator[Item, Any, Any]:
def is_authorized(
    username: str,
    collection_id: str,
    registry: dict[str, list[str]],
) -> bool:
    """Return True if username is authorized to publish to collection_id.

    Each key in registry is a collection ID pattern (exact string or glob
    wildcard using fnmatch syntax). A user is authorized when their username
    appears in the list for any pattern that matches collection_id.

    Matching is case-sensitive on every platform (``fnmatch.fnmatchcase``),
    because collection IDs are identifiers, not file names.

    Args:
        username: The DPS job submitter's username.
        collection_id: The collection ID the item declares.
        registry: Mapping of collection ID patterns to authorized usernames.

    Returns:
        True if the username is authorized for the given collection ID.
        Always False for an empty username, and registry entries whose value
        is not a list/tuple/set of usernames are ignored.
    """
    # A missing username must never be authorized, regardless of what the
    # registry contains.
    if not username:
        return False

    for pattern, authorized_users in registry.items():
        # Skip malformed entries. In particular a bare string value would
        # turn the membership test below into a substring match.
        if not isinstance(authorized_users, (list, tuple, set, frozenset)):
            continue
        if username in authorized_users and fnmatch.fnmatchcase(
            collection_id, pattern
        ):
            return True
    return False


def get_stac_items(
catalog_json_key: str,
collection_id_registry: dict[str, list[str]] | None = None,
) -> Generator[Item, Any, Any]:
"""Yield STAC items out of a catalog.json.

If collection_id_registry is provided, items whose existing collection ID
is authorized for the submitting user are published as-is. All other items
receive a deterministic collection ID derived from DPS job metadata.

Args:
catalog_json_key: S3 URI of the catalog.json file.
collection_id_registry: Optional mapping of collection ID patterns to
lists of authorized usernames. When omitted, all items receive the
deterministic collection ID.
"""
registry = collection_id_registry or {}

job_output_prefix = get_dps_output_prefix(catalog_json_key)
if not job_output_prefix:
raise ValueError(
Expand All @@ -95,15 +135,25 @@ def get_stac_items(catalog_json_key: str) -> Generator[Item, Any, Any]:
f"could not locate the .met.json file with the DPS job outputs in {job_output_prefix}"
)

collection_id = slugify(
deterministic_collection_id = slugify(
COLLECTION_ID_FORMAT.format(**job_metadata), regex_pattern=r"[/\?#%& ]+"
)
username = job_metadata.get("username", "")

catalog = pystac.Catalog.from_file(catalog_json_key)
catalog.make_all_asset_hrefs_absolute()

for item in catalog.get_all_items():
item_dict = item.to_dict()
item_dict["collection"] = collection_id
item_collection_id = item_dict.get("collection")

if item_collection_id and is_authorized(username, item_collection_id, registry):
logger.info(
"Preserving user-specified collection %s for user %s",
item_collection_id,
username,
)
else:
item_dict["collection"] = deterministic_collection_id

yield Item(**item_dict)
Loading