Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,15 @@ func (c ExternalListenerConfig) GetIngressControllerTargetPort() int32 {
return *c.IngressControllerTargetPort
}

// GetIngressController returns the effective ingress controller for this external listener.
// When IngressController is non-empty it is used; otherwise the cluster-level spec.GetIngressController() is returned.
func (c ExternalListenerConfig) GetIngressController(spec *KafkaClusterSpec) string {
if c.IngressController != "" {
return c.IngressController
}
return spec.GetIngressController()
}

// GetBrokerPort - When TLS is enabled AnyCastPort is enough since hostname based multiplexing
// is used and not port based one
func (c ExternalListenerConfig) GetBrokerPort(brokerId int32) int32 {
Expand Down Expand Up @@ -724,6 +733,10 @@ type ExternalListenerConfig struct {
// NodePort should be used in Kubernetes environments with no support for provisioning Load Balancers.
// +optional
AccessMethod corev1.ServiceType `json:"accessMethod,omitempty"`
// +kubebuilder:validation:Enum=envoy;contour;istioingress
// IngressController specifies the ingress controller type for this external listener. When empty, the cluster-level spec.ingressController is used.
// +optional
IngressController string `json:"ingressController,omitempty"`
// Config allows to specify ingress controller configuration per external listener
// if set, it overrides the default `KafkaClusterSpec.IstioIngressConfig` or `KafkaClusterSpec.EnvoyConfig` for this external listener.
// +optional
Expand Down
9 changes: 9 additions & 0 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23467,6 +23467,15 @@ spec:
In case of external listeners using NodePort access method the broker instead of node public IP (see "brokerConfig.nodePortExternalIP")
is advertised on the address having the following format: <kafka-cluster-name>-<broker-id>.<namespace><value-specified-in-hostnameOverride-field>
type: string
ingressController:
description: IngressController specifies the ingress controller
type for this external listener. When empty, the cluster-level
spec.ingressController is used.
enum:
- envoy
- contour
- istioingress
type: string
ingressControllerTargetPort:
description: |-
IngressControllerTargetPort defines the container port that the ingress controller uses for handling external traffic.
Expand Down
9 changes: 9 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23467,6 +23467,15 @@ spec:
In case of external listeners using NodePort access method the broker instead of node public IP (see "brokerConfig.nodePortExternalIP")
is advertised on the address having the following format: <kafka-cluster-name>-<broker-id>.<namespace><value-specified-in-hostnameOverride-field>
type: string
ingressController:
description: IngressController specifies the ingress controller
type for this external listener. When empty, the cluster-level
spec.ingressController is used.
enum:
- envoy
- contour
- istioingress
type: string
ingressControllerTargetPort:
description: |-
IngressControllerTargetPort defines the container port that the ingress controller uses for handling external traffic.
Expand Down
305 changes: 305 additions & 0 deletions config/samples/simplekafkacluster_with_per_listener_envoy_tls.yaml

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions controllers/tests/kafkacluster_controller_contour_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -106,6 +107,26 @@ var _ = Describe("KafkaClusterWithContourIngressController", Label("contour"), f
expectContour(ctx, kafkaCluster)
})
})
When("listener removed and RemoveUnusedIngressResources=true", func() {
It("should remove contour resources for that listener", func(ctx SpecContext) {
expectContour(ctx, kafkaCluster)
By("updating listener to use envoy and enabling RemoveUnusedIngressResources")
Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: kafkaCluster.Name}, kafkaCluster)).To(Succeed())
kafkaCluster.Spec.RemoveUnusedIngressResources = true
el := kafkaCluster.Spec.ListenersConfig.ExternalListeners[0]
kafkaCluster.Spec.ListenersConfig.ExternalListeners = []v1beta1.ExternalListenerConfig{}
Expect(k8sClient.Update(ctx, kafkaCluster)).To(Succeed())
waitForClusterRunningState(ctx, kafkaCluster, namespace)
By("expecting contour HTTPProxy and ClusterIP services for that listener to be deleted")
ingressConfigName := "ingress1"
serviceName := fmt.Sprintf(contourutils.ContourServiceNameWithScope, el.Name, ingressConfigName, kafkaCluster.GetName())
var svc corev1.Service
Eventually(ctx, func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: serviceName}, &svc)
return err != nil
}).WithTimeout(30 * time.Second).Should(BeTrue())
})
})
})

func expectContourClusterIpAnycastSvc(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, eListener v1beta1.ExternalListenerConfig) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright © 2020 Cisco Systems, Inc. and/or its affiliates
// Copyright 2025 Adobe. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tests

import (
"fmt"
"sync/atomic"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/util"
)

var _ = Describe("KafkaClusterWithIngressControllerOverride", Label("contour"), func() {
var (
count uint64 = 0
namespace string
namespaceObj *corev1.Namespace
kafkaCluster *v1beta1.KafkaCluster
)

BeforeEach(func() {
atomic.AddUint64(&count, 1)
namespace = fmt.Sprintf("kafkacontouroverridetest-%v", count)
namespaceObj = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
},
}

kafkaCluster = createMinimalKafkaClusterCR(fmt.Sprintf("kafkacluster-%d", count), namespace)

// Cluster default is envoy; listener overrides to contour.
kafkaCluster.Spec.IngressController = "envoy"

kafkaCluster.Spec.ListenersConfig.ExternalListeners = []v1beta1.ExternalListenerConfig{
{
CommonListenerSpec: v1beta1.CommonListenerSpec{
Name: "ingress1",
Type: v1beta1.SecurityProtocolPlaintext,
ContainerPort: 29098,
},
IngressController: "contour",
AccessMethod: corev1.ServiceTypeClusterIP,
ExternalStartingPort: -1,
AnyCastPort: util.Int32Pointer(8443),
Config: &v1beta1.Config{
DefaultIngressConfig: "",
IngressConfig: map[string]v1beta1.IngressConfig{
"ingress1": {
IngressServiceSettings: v1beta1.IngressServiceSettings{
HostnameOverride: "kafka.cluster.local",
},
ContourIngressConfig: &v1beta1.ContourIngressConfig{
TLSSecretName: "test-tls-secret",
BrokerFQDNTemplate: "broker-%id.kafka.cluster.local",
},
},
},
},
},
}

kafkaCluster.Spec.Brokers[0].BrokerConfig = &v1beta1.BrokerConfig{BrokerIngressMapping: []string{"ingress1"}}
kafkaCluster.Spec.Brokers[1].BrokerConfig = &v1beta1.BrokerConfig{BrokerIngressMapping: []string{"ingress1"}}
kafkaCluster.Spec.Brokers[2].BrokerConfig = &v1beta1.BrokerConfig{BrokerIngressMapping: []string{"ingress1"}}
})

JustBeforeEach(func(ctx SpecContext) {
By("creating namespace " + namespace)
err := k8sClient.Create(ctx, namespaceObj)
Expect(err).NotTo(HaveOccurred())

By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace)
err = k8sClient.Create(ctx, kafkaCluster)
Expect(err).NotTo(HaveOccurred())

waitForClusterRunningState(ctx, kafkaCluster, namespace)

})

JustAfterEach(func(ctx SpecContext) {
By("deleting Kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace)
err := k8sClient.Delete(ctx, kafkaCluster)
Expect(err).NotTo(HaveOccurred())

kafkaCluster = nil
})

It("creates contour resources when a listener overrides the cluster ingress controller", func(ctx SpecContext) {
expectContour(ctx, kafkaCluster)
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-02-25
70 changes: 70 additions & 0 deletions openspec/changes/per-listener-ingress-controller/design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Design: Per-Listener Ingress Controller

## Context

KafkaCluster currently has a single cluster-wide `spec.ingressController` (`envoy` | `contour` | `istioingress`). All external listeners use that type. Each ingress reconciler (envoy, contour, istioingress) checks `Spec.GetIngressController() == <this controller>` and, if true, reconciles all external listeners that match the required access method (LoadBalancer for envoy/istio, ClusterIP for contour). Config selection (`util.GetIngressConfigs`) and service lookup (`getServiceFromExternalListener`) also use the cluster-level value. The goal is to allow each external listener to optionally specify its own ingress controller type, with the cluster default as fallback, so one cluster can mix controller types (e.g. one listener via Envoy, another via Contour).

## Goals / Non-Goals

**Goals:**

- Each external listener MAY specify an optional `ingressController`; when unset, the listener uses `KafkaCluster.Spec.IngressController`.
- Reconcilers create/update/delete ingress resources only for listeners whose effective controller matches that reconciler.
- Status, config, and service lookup use the effective controller per listener. Cleanup (RemoveUnusedIngressResources) removes resources when a listener no longer uses that controller type.
- Backward compatible: existing CRs without the new field behave exactly as today.

**Non-Goals:**

- Changing the coupling between controller type and access method (Envoy/Istio → LoadBalancer, Contour → ClusterIP). Per-listener controller does not introduce new access-method combinations.
- Supporting multiple controller types for a single listener (e.g. same listener on both Envoy and Contour). Each listener has exactly one effective controller.

## Decisions

### 1. API: optional field on ExternalListenerConfig

- **Choice:** Add `IngressController string` (optional, empty means use cluster default) on `ExternalListenerConfig`.
- **Rationale:** Keeps API simple and backward compatible. No pointer needed if empty string is treated as "use default"; existing CRs have no field (zero value) and continue to use cluster default. Alternatively a `*string` could make "unset" explicit; we prefer `string` with empty = default to avoid pointer proliferation and match existing style (e.g. `AccessMethod`).
- **Alternative:** Required per-listener field. Rejected because it would force every existing listener to be updated.

### 2. Effective controller helper

- **Choice:** Introduce a helper that returns the effective ingress controller for a listener, e.g. `GetIngressController(spec *KafkaClusterSpec, eListener ExternalListenerConfig) string`. If `eListener.IngressController != ""`, return it; else return `spec.GetIngressController()`.
- **Rationale:** Single place for defaulting; all call sites (reconcilers, GetIngressConfigs, status, getServiceFromExternalListener) use this instead of reading cluster spec only.
- **Alternative:** Inline logic at each call site. Rejected to avoid drift and bugs.

### 3. GetIngressConfigs signature

- **Choice:** Keep signature `GetIngressConfigs(spec, eListener)` and derive effective controller inside the function via the new helper. Use that effective value to choose the envoy/contour/istio branch and to merge the correct cluster-level config (EnvoyConfig, ContourIngressConfig, or IstioIngressConfig).
- **Rationale:** No API change for callers; behavior change is internal. Callers already pass both spec and listener.
- **Alternative:** Add an explicit `effectiveController string` parameter. Rejected as redundant when it can be derived from spec + listener.

### 4. Reconciler logic

- **Choice:** For each external listener, compute effective controller. If it matches this reconciler’s controller and access method, reconcile that listener (create/update resources). Else if RemoveUnusedIngressResources is set, treat as "this listener does not use this controller" and delete any existing resources for this (listener, controller) pair.
- **Rationale:** Aligns with current per-listener loop; only the condition changes from cluster-level to per-listener effective controller.
- **Alternative:** Separate "reconcile" and "cleanup" passes. Current code already does both in one loop; no need to split.

### 5. getServiceFromExternalListener and status

- **Choice:** These functions currently take cluster + listener name + ingress config name and use `cluster.Spec.GetIngressController()` to pick the service name template. Change them to accept the effective controller for that listener (or the listener config itself) and use it for the switch. Callers (e.g. status creation) already iterate listeners and call GetIngressConfigs; they can compute effective controller once per listener and pass it (or pass listener so the helper can be used).
- **Rationale:** Status and service lookup must use the same controller type that was used to create the service, i.e. the effective controller for that listener.

### 6. Validation

- **Choice:** In the KafkaCluster webhook, validate that `ExternalListenerConfig.IngressController` (when non-empty) is one of `envoy`, `contour`, `istioingress`. If any listener’s effective controller is `istioingress`, require `Spec.IstioControlPlane != nil` (same rule as today, evaluated for the set of effective controllers in use).
- **Rationale:** Prevents invalid enum values and ensures Istio config is present when any listener uses Istio.

## Risks / Trade-offs

- **[Risk]** Call sites that use `GetIngressController()` in a listener context might be missed, leading to wrong controller type (e.g. status pointing at wrong service). **Mitigation:** Grep for all uses of `GetIngressController()` and ensure listener-scoped paths use the effective controller helper; add tests for mixed listener types and status/service lookup.
- **[Risk]** Defaulting empty string to cluster default could be confused with "no ingress" if we ever add that. **Mitigation:** Document clearly; today there is no "no ingress" option, so empty = default is unambiguous.
- **[Trade-off]** Keeping controller–access-method coupling (Contour = ClusterIP only, Envoy/Istio = LoadBalancer only) means we do not support e.g. Contour with LoadBalancer. **Mitigation:** Out of scope for this change; document in design and proposal.

## Migration Plan

- **Deploy:** No migration required. New field is optional; existing CRs unchanged.
- **Rollback:** Revert the operator; CRs with per-listener `ingressController` set will be ignored by older operator (field unknown), and cluster-level default will be used for all listeners. If users had mixed controllers, reverting would effectively force all listeners back to the cluster default until CRs are edited. Document this in release notes.

## Open Questions

- None at design time. Validation and helper placement (API type vs util package) can be decided during implementation; recommendation is helper in `api/v1beta1` or `pkg/util` and used from both API and reconcilers.
35 changes: 35 additions & 0 deletions openspec/changes/per-listener-ingress-controller/proposal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Per-Listener Ingress Controller

## Why

Today, a KafkaCluster has a single cluster-wide ingress controller type (`envoy`, `contour`, or `istioingress`). All external listeners use that same type. Operators cannot expose one listener via Envoy (e.g. public LoadBalancer) and another via Contour (e.g. internal TLS) on the same cluster without running multiple clusters or custom tooling. Allowing each external listener to specify its own ingress controller (with a cluster default) removes this limitation and supports mixed ingress environments.

## What Changes

- Add optional `ingressController` field on `ExternalListenerConfig`. When unset, the listener uses `KafkaCluster.Spec.IngressController` (current behavior).
- Reconcilers (envoy, contour, istioingress) will create/update/delete resources only for external listeners whose effective ingress controller matches that reconciler.
- `util.GetIngressConfigs` and all call sites that today use cluster-level `GetIngressController()` in a listener context will use the effective controller for that listener (helper: e.g. effective controller from listener + spec).
- Status and service lookup (e.g. `getServiceFromExternalListener`) will use the effective ingress controller per listener when resolving service names.
- `RemoveUnusedIngressResources` cleanup will remain per-listener and will remove resources when a listener no longer uses that controller type.
- Validation: per-listener value restricted to same enum as cluster (`envoy` | `contour` | `istioingress`). If any listener uses `istioingress`, `Spec.IstioControlPlane` must be set (unchanged rule).

No breaking changes: existing CRs omit the new field and keep current behavior (all listeners use cluster default).

## Capabilities

### New Capabilities

- `per-listener-ingress-controller`: External listeners can optionally specify which ingress controller type to use (envoy, contour, istioingress). When unspecified, the cluster-level `spec.ingressController` is used. The operator reconciles ingress resources per listener based on this effective controller and cleans up when a listener switches or is removed.

### Modified Capabilities

- (None — no existing specs in openspec/specs/; this is a new API/behavior capability.)

## Impact

- **API**: `api/v1beta1/kafkacluster_types.go` — `ExternalListenerConfig` gains optional `IngressController string`.
- **Util**: `pkg/util/util.go` — `GetIngressConfigs` and new helper for effective ingress controller per listener.
- **Reconcilers**: `pkg/resources/contouringress/contour.go`, `pkg/resources/envoy/envoy.go`, `pkg/resources/istioingress/istioingress.go` — gate on effective controller per listener.
- **Kafka/config/status**: `pkg/resources/kafka/kafka.go` — status creation and `getServiceFromExternalListener` use effective controller per listener.
- **Webhooks**: `pkg/webhooks/kafkacluster_validator.go` — validate per-listener enum and istio control plane when any listener uses istioingress.
- **Tests**: Controller and util tests for mixed listener controller types and defaulting.
Loading
Loading