Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/registry/api/handlers/v0/crud/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type PerKindHooks struct {
// PostDeletes run after a successful DELETE; see
// resource.Config.PostDelete. Mirrors PostUpserts above.
PostDeletes map[string]func(ctx context.Context, obj v1alpha1.Object) error
// CreateStager optionally intercepts validated creates before the
// row reaches production storage. Enterprise approval mode wires this.
CreateStager func(ctx context.Context, in resource.CreateStagerInput) (resource.CreateStagerResult, error)
}

// Register wires the namespace-scoped + cross-namespace list endpoints
Expand Down Expand Up @@ -83,6 +86,7 @@ func Register(
ListFilter: perKind.ListFilters[kind],
PostUpsert: perKind.PostUpserts[kind],
PostDelete: perKind.PostDeletes[kind],
CreateStager: perKind.CreateStager,
}, true
}

Expand Down
34 changes: 32 additions & 2 deletions internal/registry/api/router/v0.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/agentregistry-dev/agentregistry/pkg/importer"
"github.com/agentregistry-dev/agentregistry/pkg/registry/resource"
"github.com/agentregistry-dev/agentregistry/pkg/registry/v1alpha1store"
"github.com/agentregistry-dev/agentregistry/pkg/types"
"github.com/danielgtaylor/huma/v2"
)

Expand Down Expand Up @@ -76,8 +77,20 @@ type RouteOptions struct {
// short-circuits OCI).
RegistryValidator v1alpha1.RegistryValidatorFunc

// CreateStager optionally intercepts validated creates before the row
// reaches production storage. Enterprise approval mode wires this.
CreateStager func(ctx context.Context, in resource.CreateStagerInput) (resource.CreateStagerResult, error)

// ResolverWrapper optionally decorates the shared ResourceRef resolver
// before CRUD/apply handlers capture it.
ResolverWrapper func(v1alpha1.ResolverFunc) v1alpha1.ResolverFunc

// Optional callback for integration-owned route registration.
ExtraRoutes func(api huma.API, pathPrefix string)

// Optional callback for integration-owned routes that need the finalized
// v1alpha1 store/resolver/hook context.
ExtraResourceRoutes func(api huma.API, pathPrefix string, ctx types.ResourceRouteContext)
}

// RegisterRoutes registers all API routes under /v0. Required
Expand Down Expand Up @@ -107,14 +120,19 @@ func RegisterRoutes(
// v1alpha1 generic routes. Cross-kind dangling-ref detection uses
// a Store-backed resolver. Deployment reconciliation hooks plug in
// when the coordinator is supplied.
registerKindRoutes(
resourceCtx := registerKindRoutes(
api,
pathPrefix,
opts.Stores,
opts.DeploymentCoordinator,
opts.PerKindHooks,
opts.RegistryValidator,
opts.CreateStager,
opts.ResolverWrapper,
)
if opts.ExtraResourceRoutes != nil {
opts.ExtraResourceRoutes(api, pathPrefix, resourceCtx)
}

// POST /v0/import — runs decoded manifests through the enrichment
// pipeline (validate + scanners + findings-write) before Upsert.
Expand Down Expand Up @@ -146,8 +164,11 @@ func RegisterRoutes(
// When coord is non-nil, Deployment PUT/DELETE fire
// coord.Apply/coord.Remove after the row is persisted so the platform
// adapter converges runtime state synchronously with the API call.
func registerKindRoutes(api huma.API, basePrefix string, stores Stores, coord *deploymentsvc.Coordinator, perKind crud.PerKindHooks, registryValidator v1alpha1.RegistryValidatorFunc) {
func registerKindRoutes(api huma.API, basePrefix string, stores Stores, coord *deploymentsvc.Coordinator, perKind crud.PerKindHooks, registryValidator v1alpha1.RegistryValidatorFunc, createStager func(ctx context.Context, in resource.CreateStagerInput) (resource.CreateStagerResult, error), resolverWrapper func(v1alpha1.ResolverFunc) v1alpha1.ResolverFunc) types.ResourceRouteContext {
resolver := internaldb.NewResolver(stores)
if resolverWrapper != nil {
resolver = resolverWrapper(resolver)
}
if registryValidator == nil {
registryValidator = registries.Dispatcher
}
Expand Down Expand Up @@ -184,6 +205,7 @@ func registerKindRoutes(api huma.API, basePrefix string, stores Stores, coord *d

// Per-kind CRUD endpoints — one call per built-in kind, hidden
// inside crud.Register.
perKind.CreateStager = createStager
crud.Register(api, basePrefix, stores, resolver, registryValidator, perKind)

// Deployment-specific endpoints: logs stream (cancel is subsumed
Expand All @@ -209,5 +231,13 @@ func registerKindRoutes(api huma.API, basePrefix string, stores Stores, coord *d
Authorizers: perKind.Authorizers,
PostUpserts: perKind.PostUpserts,
PostDeletes: perKind.PostDeletes,
CreateStager: createStager,
})
return types.ResourceRouteContext{
Stores: stores,
Resolver: resolver,
RegistryValidator: registryValidator,
PostUpserts: perKind.PostUpserts,
PostDeletes: perKind.PostDeletes,
}
}
13 changes: 8 additions & 5 deletions internal/registry/registry_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,14 @@ func buildRouteOptions(
adapters map[string]types.DeploymentAdapter,
) *router.RouteOptions {
routeOpts := &router.RouteOptions{
ExtraRoutes: options.ExtraRoutes,
Stores: stores,
Importer: importer,
PerKindHooks: crudPerKindHooks(options),
RegistryValidator: options.RegistryValidator,
ExtraRoutes: options.ExtraRoutes,
Stores: stores,
Importer: importer,
PerKindHooks: crudPerKindHooks(options),
RegistryValidator: options.RegistryValidator,
CreateStager: options.CreateStager,
ResolverWrapper: options.ResolverWrapper,
ExtraResourceRoutes: options.ExtraResourceRoutes,
}

if stores != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/v0/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ type ApplyResult struct {
Namespace string `json:"namespace,omitempty"`
Name string `json:"name"`
Version string `json:"version,omitempty"`
// Status is one of: created, configured, unchanged, deleted,
// dry-run, failed. Matches kubectl-style apply output.
// Status is one of: created, configured, unchanged, staged,
// deleted, dry-run, failed. Matches kubectl-style apply output.
Status string `json:"status"`
// Generation is the server-managed generation after the apply.
// Populated for internal callers that need the reconciler-
Expand All @@ -26,6 +26,7 @@ const (
ApplyStatusCreated = "created"
ApplyStatusConfigured = "configured"
ApplyStatusUnchanged = "unchanged"
ApplyStatusStaged = "staged"
ApplyStatusDeleted = "deleted"
ApplyStatusDryRun = "dry-run"
ApplyStatusFailed = "failed"
Expand Down
8 changes: 8 additions & 0 deletions pkg/registry/resource/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type ApplyConfig struct {
// soft-deletes the row but never tears down the platform adapter
// state.
PostDeletes map[string]func(ctx context.Context, obj v1alpha1.Object) error

// CreateStager optionally intercepts validated create attempts before
// production Upsert. Enterprise builds use this to stage non-admin
// creates for approval while leaving OSS behavior unchanged.
CreateStager func(ctx context.Context, in CreateStagerInput) (CreateStagerResult, error)
}

// applyInput receives a raw multi-doc YAML stream. RawBody keeps bytes
Expand Down Expand Up @@ -165,6 +170,7 @@ func applyOne(ctx context.Context, cfg ApplyConfig, obj v1alpha1.Object, dryRun
Resolver: cfg.Resolver,
RegistryValidator: cfg.RegistryValidator,
PostUpsert: cfg.PostUpserts[obj.GetKind()],
CreateStager: cfg.CreateStager,
}, dryRun)
if ae != nil {
return failResult(res, ae)
Expand All @@ -175,6 +181,8 @@ func applyOne(ctx context.Context, cfg ApplyConfig, obj v1alpha1.Object, dryRun
return res
}
switch {
case up.Staged:
res.Status = arv0.ApplyStatusStaged
case up.Created:
res.Status = arv0.ApplyStatusCreated
case up.SpecChanged:
Expand Down
42 changes: 42 additions & 0 deletions pkg/registry/resource/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,45 @@ spec:
_, err := mcps.Get(t.Context(), "default", "should-be-denied", "v1")
require.Error(t, err, "fail-closed must short-circuit before Upsert")
}

func TestRegisterApply_CreateStagerShortCircuitsProductionUpsert(t *testing.T) {
pool := v1alpha1store.NewTestPool(t)
agents := v1alpha1store.NewStore(pool, "v1alpha1.agents")
var staged resource.CreateStagerInput

_, api := humatest.New(t)
resource.RegisterApply(api, resource.ApplyConfig{
BasePrefix: "/v0",
Stores: map[string]*v1alpha1store.Store{
v1alpha1.KindAgent: agents,
},
CreateStager: func(_ context.Context, in resource.CreateStagerInput) (resource.CreateStagerResult, error) {
staged = in
return resource.CreateStagerResult{Staged: true}, nil
},
})

yaml := []byte(`apiVersion: ar.dev/v1alpha1
kind: Agent
metadata:
namespace: default
name: pending
version: v1
spec:
title: Pending
`)
resp := api.Post("/v0/apply", "Content-Type: application/yaml", strings.NewReader(string(yaml)))
require.Equal(t, http.StatusOK, resp.Code, resp.Body.String())

var out struct {
Results []arv0.ApplyResult `json:"results"`
}
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &out))
require.Len(t, out.Results, 1)
require.Equal(t, arv0.ApplyStatusStaged, out.Results[0].Status)
require.Equal(t, "pending", staged.Name)
require.Equal(t, v1alpha1.KindAgent, staged.Kind)

_, err := agents.Get(t.Context(), "default", "pending", "v1")
require.Error(t, err, "staged apply must not write the production table")
}
41 changes: 41 additions & 0 deletions pkg/registry/resource/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,38 @@ type applyOpts struct {
Resolver v1alpha1.ResolverFunc
RegistryValidator v1alpha1.RegistryValidatorFunc
PostUpsert func(ctx context.Context, obj v1alpha1.Object) error
CreateStager func(ctx context.Context, in CreateStagerInput) (CreateStagerResult, error)
}

// upsertResult is the outcome of a successful applyCore call.
type upsertResult struct {
Created bool
Staged bool
SpecChanged bool
Generation int64
}

// CreateStagerInput is handed to an optional enterprise create-approval
// hook after auth/validation/ref checks and before production Upsert.
// The hook may inspect the production Store to decide whether this apply
// is a create and, if policy requires it, persist the object somewhere
// outside the production v1alpha1 table.
type CreateStagerInput struct {
Kind string
Namespace string
Name string
Version string
Object v1alpha1.Object
Store *v1alpha1store.Store
}

// CreateStagerResult reports whether the hook handled the apply by
// staging it. When Staged is true, applyCore short-circuits before the
// production Upsert and does not run PostUpsert.
type CreateStagerResult struct {
Staged bool
}

// applyStage tags which step of the pipeline produced an error so
// callers can shape their error response (huma 4xx vs ApplyResult.Error)
// without re-classifying the underlying err.
Expand All @@ -38,6 +61,7 @@ const (
stageValidation applyStage = "validation"
stageRefs applyStage = "refs"
stageRegistries applyStage = "registries"
stageApproval applyStage = "approval"
stageMarshal applyStage = "marshal"
stageUpsert applyStage = "upsert"
stagePostUpsert applyStage = "post-upsert"
Expand Down Expand Up @@ -121,6 +145,23 @@ func applyCore(
return upsertResult{}, nil
}

if opts.CreateStager != nil {
staged, err := opts.CreateStager(ctx, CreateStagerInput{
Kind: kind,
Namespace: meta.Namespace,
Name: meta.Name,
Version: meta.Version,
Object: obj,
Store: store,
})
if err != nil {
return upsertResult{}, &applyError{Stage: stageApproval, Err: err}
}
if staged.Staged {
return upsertResult{Staged: true}, nil
}
}

specJSON, err := obj.MarshalSpec()
if err != nil {
return upsertResult{}, &applyError{Stage: stageMarshal, Err: err}
Expand Down
16 changes: 14 additions & 2 deletions pkg/registry/resource/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ type Config struct {
// and writes the terminal Removed condition.
PostDelete func(ctx context.Context, obj v1alpha1.Object) error

// CreateStager optionally intercepts validated create attempts before
// production Upsert. Enterprise builds use this for approval staging.
// nil preserves the normal OSS direct-write behavior.
CreateStager func(ctx context.Context, in CreateStagerInput) (CreateStagerResult, error)

// Authorize is optional; when set, every read and write handler
// (get / list / apply / delete) invokes it as an access gate before
// touching the store. Return nil to allow; return a huma error
Expand Down Expand Up @@ -395,14 +400,19 @@ func Register[T v1alpha1.Object](api huma.API, cfg Config, newObj func() T) {
meta.Version = version
body.SetMetadata(*meta)

if _, ae := applyCore(ctx, cfg.Store, body, applyOpts{
up, ae := applyCore(ctx, cfg.Store, body, applyOpts{
Authorize: cfg.Authorize,
Resolver: cfg.Resolver,
RegistryValidator: cfg.RegistryValidator,
PostUpsert: cfg.PostUpsert,
}, false); ae != nil {
CreateStager: cfg.CreateStager,
}, false)
if ae != nil {
return nil, mapApplyErrorToHuma(ae, kind, ns, name, version)
}
if up.Staged {
return &bodyOutput[T]{Body: body}, nil
}

// Read back so the response reflects the stored identity (assigned
// generation, default'd metadata) plus any status / annotation
Expand Down Expand Up @@ -485,6 +495,8 @@ func mapApplyErrorToHuma(ae *applyError, kind, ns, name, version string) error {
return huma.Error400BadRequest("refs: " + ae.Err.Error())
case stageRegistries:
return huma.Error400BadRequest("registries: " + ae.Err.Error())
case stageApproval:
return ae.Err
case stageMarshal:
return huma.Error400BadRequest("marshal spec: " + ae.Err.Error())
case stageUpsert:
Expand Down
41 changes: 41 additions & 0 deletions pkg/registry/resource/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ func registerAgent(api huma.API, store *v1alpha1store.Store) {
}, func() *v1alpha1.Agent { return &v1alpha1.Agent{} })
}

func registerAgentWithConfig(api huma.API, cfg resource.Config) {
cfg.Kind = v1alpha1.KindAgent
cfg.BasePrefix = "/v0"
resource.Register[*v1alpha1.Agent](api, cfg, func() *v1alpha1.Agent { return &v1alpha1.Agent{} })
}

// newTestPool is defined in database/store_v1alpha1_testutil.go. Each test
// gets its own isolated DB.
func TestResourceRegister_AgentCRUD(t *testing.T) {
Expand Down Expand Up @@ -158,6 +164,41 @@ func TestResourceRegister_AgentCRUD(t *testing.T) {
require.Empty(t, list.Items)
}

func TestResourceRegister_CreateStagerShortCircuitsProductionUpsert(t *testing.T) {
pool := v1alpha1store.NewTestPool(t)
store := v1alpha1store.NewStore(pool, "v1alpha1.agents")
var staged resource.CreateStagerInput

_, api := humatest.New(t)
registerAgentWithConfig(api, resource.Config{
Store: store,
CreateStager: func(_ context.Context, in resource.CreateStagerInput) (resource.CreateStagerResult, error) {
staged = in
return resource.CreateStagerResult{Staged: true}, nil
},
})

body := v1alpha1.Agent{
TypeMeta: v1alpha1.TypeMeta{APIVersion: v1alpha1.GroupVersion, Kind: v1alpha1.KindAgent},
Metadata: v1alpha1.ObjectMeta{
Namespace: "default",
Name: "pending",
Version: "v1",
},
Spec: v1alpha1.AgentSpec{Title: "Pending"},
}
resp := api.Put("/v0/agents/pending/v1", body)
require.Equal(t, http.StatusOK, resp.Code, resp.Body.String())

var got v1alpha1.Agent
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &got))
require.Equal(t, "pending", got.Metadata.Name)
require.Equal(t, "pending", staged.Name)

_, err := store.Get(t.Context(), "default", "pending", "v1")
require.Error(t, err, "staged PUT must not write the production table")
}

func TestResourceRegister_AgentNamespaceIsolation(t *testing.T) {
pool := v1alpha1store.NewTestPool(t)
store := v1alpha1store.NewStore(pool, "v1alpha1.agents")
Expand Down
Loading
Loading