Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import dataclasses
import json
import logging
from typing import TYPE_CHECKING, Any, Callable, Mapping, MutableMapping, Sequence, cast
from typing import TYPE_CHECKING, Any, Callable, Mapping, MutableMapping, Sequence, Union, cast
from urllib.parse import parse_qs

from pydantic import BaseModel
from typing_extensions import get_args, get_origin

from aws_lambda_powertools.event_handler.middlewares import BaseMiddlewareHandler
from aws_lambda_powertools.event_handler.openapi.compat import (
Expand All @@ -25,6 +26,7 @@
ResponseValidationError,
)
from aws_lambda_powertools.event_handler.openapi.params import Param
from aws_lambda_powertools.event_handler.openapi.types import UnionType

if TYPE_CHECKING:
from pydantic.fields import FieldInfo
Expand Down Expand Up @@ -431,9 +433,41 @@ def _handle_missing_field_value(
values[field.name] = field.get_default()


def _is_or_contains_sequence(annotation: Any) -> bool:
"""
Check if annotation is a sequence or Union/RootModel containing a sequence.

This function handles complex type annotations like:
- List[Model] - direct sequence
- Union[Model, List[Model]] - checks if any Union member is a sequence
- Optional[List[Model]] - Union[List[Model], None]
- RootModel[List[Model]] - checks if the RootModel wraps a sequence
- Optional[RootModel[List[Model]]] - Union member that is a RootModel
- RootModel[Union[Model, List[Model]]] - RootModel wrapping a Union with a sequence
"""
# Direct sequence check
if field_annotation_is_sequence(annotation):
return True

# Check Union members — recurse so we catch RootModel inside Union
origin = get_origin(annotation)
if origin is Union or origin is UnionType:
for arg in get_args(annotation):
if _is_or_contains_sequence(arg):
return True

# Check if it's a RootModel wrapping a sequence (or Union containing a sequence)
if lenient_issubclass(annotation, BaseModel) and getattr(annotation, "__pydantic_root_model__", False):
if hasattr(annotation, "model_fields") and "root" in annotation.model_fields:
root_annotation = annotation.model_fields["root"].annotation
return _is_or_contains_sequence(root_annotation)

return False


def _normalize_field_value(value: Any, field_info: FieldInfo) -> Any:
"""Normalize field value, converting lists to single values for non-sequence fields."""
if field_annotation_is_sequence(field_info.annotation):
if _is_or_contains_sequence(field_info.annotation):
return value
elif isinstance(value, list) and value:
return value[0]
Expand Down
Loading
Loading