Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions components/profile-controller/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.22 as builder
FROM golang:1.23 as builder
Comment thread
kimwnasptd marked this conversation as resolved.

WORKDIR /workspace
# Copy the Go Modules manifests
Expand Down Expand Up @@ -34,7 +34,6 @@ WORKDIR /
COPY --from=builder /workspace/dash /bin/dash
COPY third_party third_party
COPY --from=builder /workspace/manager .
COPY --from=builder /go/pkg/mod/github.com/hashicorp third_party/library/

USER 65532:65532

Expand Down
242 changes: 229 additions & 13 deletions components/profile-controller/controllers/profile_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"gopkg.in/fsnotify.v1"
"gopkg.in/yaml.v2"
istioSecurity "istio.io/api/security/v1beta1"
istioApi "istio.io/api/type/v1beta1"
istioSecurityClient "istio.io/client-go/pkg/apis/security/v1beta1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand All @@ -46,6 +47,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

const AUTHZPOLICYISTIO = "ns-owner-access-istio"
Expand Down Expand Up @@ -91,6 +93,10 @@ type ProfileReconciler struct {
UserIdPrefix string
WorkloadIdentity string
DefaultNamespaceLabelsPath string
ServiceMeshMode string
WaypointName string
WaypointNamespace string
CreateWaypoint bool
}

// +kubebuilder:rbac:groups=core,resources=namespaces,verbs="*"
Comment thread
kimwnasptd marked this conversation as resolved.
Expand Down Expand Up @@ -127,14 +133,13 @@ func (r *ProfileReconciler) Reconcile(ctx context.Context, request ctrl.Request)
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"owner": instance.Spec.Owner.Name},
// inject istio sidecar to all pods in target namespace by default.
Labels: map[string]string{
istioInjectionLabel: "enabled",
},
Name: instance.Name,
Labels: map[string]string{},
Name: instance.Name,
},
}
setNamespaceLabels(ns, defaultKubeflowNamespaceLabels)

// Set namespace labels and service mesh labels in one call
r.setNamespaceLabelsAndServiceMesh(ns, instance, defaultKubeflowNamespaceLabels)
logger.Info("List of labels to be added to namespace", "labels", ns.Labels)
if err := controllerutil.SetControllerReference(instance, ns, r.Scheme); err != nil {
IncRequestErrorCounter("error setting ControllerReference", SEVERITY_MAJOR)
Expand Down Expand Up @@ -178,8 +183,10 @@ func (r *ProfileReconciler) Reconcile(ctx context.Context, request ctrl.Request)
for k, v := range foundNs.Labels {
oldLabels[k] = v
}
setNamespaceLabels(foundNs, defaultKubeflowNamespaceLabels)
logger.Info("List of labels to be added to found namespace", "labels", ns.Labels)

// Apply namespace labels and service mesh mode labels to existing namespace
r.setNamespaceLabelsAndServiceMesh(foundNs, instance, defaultKubeflowNamespaceLabels)
logger.Info("List of labels to be added to found namespace", "labels", foundNs.Labels)
if !reflect.DeepEqual(oldLabels, foundNs.Labels) {
err = r.Update(ctx, foundNs)
if err != nil {
Expand All @@ -205,6 +212,23 @@ func (r *ProfileReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return reconcile.Result{}, err
}

// Create waypoint and L4 AuthorizationPolicy in ambient mode
if r.ServiceMeshMode == "istio-ambient" {
if r.CreateWaypoint {
if err = r.createWaypoint(instance); err != nil {
logger.Error(err, "error creating waypoint", "namespace", instance.Name)
IncRequestErrorCounter("error creating waypoint", SEVERITY_MAJOR)
return reconcile.Result{}, err
}
}

if err = r.updateL4AuthorizationPolicy(instance); err != nil {
logger.Error(err, "error updating L4 AuthorizationPolicy", "namespace", instance.Name)
IncRequestErrorCounter("error updating L4 AuthorizationPolicy", SEVERITY_MAJOR)
return reconcile.Result{}, err
}
}

// Update service accounts
// Create service account "default-editor" in target namespace.
// "default-editor" would have kubeflowEdit permission: edit all resources in target namespace except rbac.
Expand Down Expand Up @@ -409,14 +433,18 @@ func (r *ProfileReconciler) SetupWithManager(mgr ctrl.Manager) error {
handler.EnqueueRequestsFromMapFunc(r.mapEventToRequest),
)

if r.ServiceMeshMode == "istio-ambient" {
c.Owns(&gatewayv1beta1.Gateway{})
}

err = c.Complete(r)
if err != nil {
return err
}
return nil
}
Comment thread
kimwnasptd marked this conversation as resolved.

func (r *ProfileReconciler) getAuthorizationPolicy(profileIns *profilev1.Profile) istioSecurity.AuthorizationPolicy {
func (r *ProfileReconciler) getAuthorizationPolicy(profileIns *profilev1.Profile) *istioSecurity.AuthorizationPolicy {
nbControllerPrincipal := GetEnvDefault(
"NOTEBOOK_CONTROLLER_PRINCIPAL",
"cluster.local/ns/kubeflow/sa/notebook-controller-service-account")
Expand All @@ -429,7 +457,11 @@ func (r *ProfileReconciler) getAuthorizationPolicy(profileIns *profilev1.Profile
"KFP_UI_PRINCIPAL",
"cluster.local/ns/kubeflow/sa/ml-pipeline-ui")

return istioSecurity.AuthorizationPolicy{
katibControllerPrincipal := GetEnvDefault(
"KATIB_CONTROLLER_PRINCIPAL",
"cluster.local/ns/kubeflow/sa/katib-controller")

policy := &istioSecurity.AuthorizationPolicy{
Action: istioSecurity.AuthorizationPolicy_ALLOW,
// Empty selector == match all workloads in namespace
Selector: nil,
Expand Down Expand Up @@ -480,6 +512,16 @@ func (r *ProfileReconciler) getAuthorizationPolicy(profileIns *profilev1.Profile
},
},
},
{
// allow katib-controller to talk to suggestion server
From: []*istioSecurity.Rule_From{{
Source: &istioSecurity.Source{
Principals: []string{
katibControllerPrincipal,
},
},
}},
},
Comment thread
kimwnasptd marked this conversation as resolved.
{
// allow the notebook-controller in the kubeflow namespace to
// access the api/kernels endpoint of the notebook servers.
Expand All @@ -501,6 +543,20 @@ func (r *ProfileReconciler) getAuthorizationPolicy(profileIns *profilev1.Profile
},
},
}

if r.ServiceMeshMode == "istio-ambient" {
targetRefs := []*istioApi.PolicyTargetReference{
{
Kind: "Gateway",
Group: "gateway.networking.k8s.io",
Name: r.WaypointName,
},
}

policy.TargetRefs = targetRefs
Comment thread
kimwnasptd marked this conversation as resolved.
}
Comment thread
kimwnasptd marked this conversation as resolved.

return policy
}

// updateIstioAuthorizationPolicy create or update Istio AuthorizationPolicy
Expand All @@ -515,7 +571,7 @@ func (r *ProfileReconciler) updateIstioAuthorizationPolicy(profileIns *profilev1
Name: AUTHZPOLICYISTIO,
Namespace: profileIns.Name,
},
Spec: r.getAuthorizationPolicy(profileIns),
Spec: *r.getAuthorizationPolicy(profileIns),
}

if err := controllerutil.SetControllerReference(profileIns, istioAuth, r.Scheme); err != nil {
Expand Down Expand Up @@ -751,12 +807,14 @@ func removeString(slice []string, s string) (result []string) {
return
}

func setNamespaceLabels(ns *corev1.Namespace, newLabels map[string]string) {
// setNamespaceLabelsAndServiceMesh applies the default Kubeflow namespace labels and then sets the service mesh labels for the configured mode
Comment thread
kimwnasptd marked this conversation as resolved.
func (r *ProfileReconciler) setNamespaceLabelsAndServiceMesh(ns *corev1.Namespace, profileIns *profilev1.Profile, defaultLabels map[string]string) {
if ns.Labels == nil {
ns.Labels = make(map[string]string)
}

for k, v := range newLabels {
// Apply default Kubeflow namespace labels first
for k, v := range defaultLabels {
_, ok := ns.Labels[k]
if len(v) == 0 {
// When there is an empty value, k should be removed.
Expand All @@ -770,6 +828,29 @@ func setNamespaceLabels(ns *corev1.Namespace, newLabels map[string]string) {
}
}
}

// Apply service mesh specific labels
if r.ServiceMeshMode == "istio-ambient" {
// In ambient mode, disable sidecar injection but enable ambient mesh
ns.Labels[istioInjectionLabel] = "disabled"
ns.Labels["istio.io/dataplane-mode"] = "ambient"
// Add waypoint labels for ambient mode
waypointNamespace := r.WaypointNamespace
if waypointNamespace == "" {
waypointNamespace = profileIns.Name
}
ns.Labels["istio.io/use-waypoint"] = r.WaypointName
ns.Labels["istio.io/use-waypoint-namespace"] = waypointNamespace
ns.Labels["istio.io/ingress-use-waypoint"] = "true"
} else {
// In sidecar mode (default), inject istio sidecar to all pods in target namespace
ns.Labels[istioInjectionLabel] = "enabled"
// Remove ambient mode labels if they exist
delete(ns.Labels, "istio.io/dataplane-mode")
delete(ns.Labels, "istio.io/use-waypoint")
delete(ns.Labels, "istio.io/use-waypoint-namespace")
delete(ns.Labels, "istio.io/ingress-use-waypoint")
}
}

func (r *ProfileReconciler) readDefaultLabelsFromFile(path string) map[string]string {
Comment thread
kimwnasptd marked this conversation as resolved.
Expand All @@ -789,6 +870,141 @@ func (r *ProfileReconciler) readDefaultLabelsFromFile(path string) map[string]st
return labels
}

// createWaypoint creates a waypoint proxy in the profile namespace for ambient mode
func (r *ProfileReconciler) createWaypoint(profileIns *profilev1.Profile) error {
logger := r.Log.WithValues("profile", profileIns.Name)

waypointNamespace := r.WaypointNamespace
if waypointNamespace == "" {
waypointNamespace = profileIns.Name
}

// Create waypoint using Gateway API with waypoint gateway class
// This creates an Istio waypoint proxy that handles L7 policies in ambient mode
gatewayClassName := "istio-waypoint"
Comment thread
kimwnasptd marked this conversation as resolved.

waypoint := &gatewayv1beta1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Name: r.WaypointName,
Namespace: waypointNamespace,
Labels: map[string]string{
"gateway.istio.io/managed": "Istio",
},
},
Spec: gatewayv1beta1.GatewaySpec{
GatewayClassName: gatewayv1beta1.ObjectName(gatewayClassName),
Listeners: []gatewayv1beta1.Listener{
{
Name: "mesh",
Port: 15008,
Protocol: "HBONE",
},
},
Comment thread
kimwnasptd marked this conversation as resolved.
},
}

// Only set controller reference when the waypoint is not cross-namespace.
// Cross-namespace ownerReferences are not supported by Kubernetes and will cause
// the API server to reject the object. When using a shared/central waypoint
// (waypointNamespace != profileIns.Name), we intentionally skip setting the
// controller reference.
if waypointNamespace == "" || waypointNamespace == profileIns.Name {
Comment thread
kimwnasptd marked this conversation as resolved.
if err := controllerutil.SetControllerReference(profileIns, waypoint, r.Scheme); err != nil {
return err
}
}

// Check if the waypoint already exists
foundWaypoint := &gatewayv1beta1.Gateway{}
err := r.Get(context.TODO(), types.NamespacedName{Name: waypoint.Name, Namespace: waypoint.Namespace}, foundWaypoint)
if err != nil {
if apierrors.IsNotFound(err) {
logger.Info("Creating waypoint", "waypoint", waypoint.Name, "namespace", waypoint.Namespace)
err = r.Create(context.TODO(), waypoint)
if err != nil {
return fmt.Errorf("failed to create waypoint: %w", err)
}
} else {
return fmt.Errorf("failed to get waypoint: %w", err)
}
} else {
// Waypoint already exists, check if update is needed
if !reflect.DeepEqual(waypoint.Spec, foundWaypoint.Spec) {
logger.Info("Updating waypoint", "waypoint", waypoint.Name, "namespace", waypoint.Namespace)
foundWaypoint.Spec = waypoint.Spec
err = r.Update(context.TODO(), foundWaypoint)
if err != nil {
return fmt.Errorf("failed to update waypoint: %w", err)
}
}
}

logger.Info("Waypoint reconciled successfully", "waypoint", r.WaypointName, "namespace", waypointNamespace)
return nil
}

// updateL4AuthorizationPolicy creates L4 AuthorizationPolicy to allow traffic from waypoint to services
func (r *ProfileReconciler) updateL4AuthorizationPolicy(profileIns *profilev1.Profile) error {
logger := r.Log.WithValues("profile", profileIns.Name)

waypointNamespace := r.WaypointNamespace
if waypointNamespace == "" {
waypointNamespace = profileIns.Name
}

waypointPrincipal := fmt.Sprintf("cluster.local/ns/%s/sa/%s", waypointNamespace, r.WaypointName)

l4Policy := &istioSecurityClient.AuthorizationPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "waypoint-l4-access",
Namespace: profileIns.Name,
},
Comment thread
kimwnasptd marked this conversation as resolved.
Spec: istioSecurity.AuthorizationPolicy{
Action: istioSecurity.AuthorizationPolicy_ALLOW,
Selector: nil, // Match all workloads in namespace
Rules: []*istioSecurity.Rule{
{
From: []*istioSecurity.Rule_From{
{
Source: &istioSecurity.Source{
Principals: []string{waypointPrincipal},
},
},
},
},
},
},
}

if err := controllerutil.SetControllerReference(profileIns, l4Policy, r.Scheme); err != nil {
return err
}

foundL4Policy := &istioSecurityClient.AuthorizationPolicy{}
err := r.Get(context.TODO(), types.NamespacedName{Name: l4Policy.Name, Namespace: l4Policy.Namespace}, foundL4Policy)
if err != nil {
if apierrors.IsNotFound(err) {
logger.Info("Creating L4 AuthorizationPolicy", "namespace", l4Policy.Namespace, "name", l4Policy.Name)
err = r.Create(context.TODO(), l4Policy)
if err != nil {
return err
}
} else {
return err
}
} else {
if !reflect.DeepEqual(*l4Policy.Spec.DeepCopy(), *foundL4Policy.Spec.DeepCopy()) {
foundL4Policy.Spec = *l4Policy.Spec.DeepCopy()
logger.Info("Updating L4 AuthorizationPolicy", "namespace", l4Policy.Namespace, "name", l4Policy.Name)
err = r.Update(context.TODO(), foundL4Policy)
if err != nil {
return err
}
}
}
return nil
}

func GetEnvDefault(variable string, defaultVal string) string {
envVar := os.Getenv(variable)
if len(envVar) == 0 {
Expand Down
Loading
Loading