diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index 0e03045fe..2ee265ecd 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -610,6 +610,15 @@ func (c ExternalListenerConfig) GetIngressControllerTargetPort() int32 { return *c.IngressControllerTargetPort } +// GetIngressController returns the effective ingress controller for this external listener. +// When IngressController is non-empty it is used; otherwise the cluster-level spec.GetIngressController() is returned. +func (c ExternalListenerConfig) GetIngressController(spec *KafkaClusterSpec) string { + if c.IngressController != "" { + return c.IngressController + } + return spec.GetIngressController() +} + // GetBrokerPort - When TLS is enabled AnyCastPort is enough since hostname based multiplexing // is used and not port based one func (c ExternalListenerConfig) GetBrokerPort(brokerId int32) int32 { @@ -724,6 +733,10 @@ type ExternalListenerConfig struct { // NodePort should be used in Kubernetes environments with no support for provisioning Load Balancers. // +optional AccessMethod corev1.ServiceType `json:"accessMethod,omitempty"` + // +kubebuilder:validation:Enum=envoy;contour;istioingress + // IngressController specifies the ingress controller type for this external listener. When empty, the cluster-level spec.ingressController is used. + // +optional + IngressController string `json:"ingressController,omitempty"` // Config allows to specify ingress controller configuration per external listener // if set, it overrides the default `KafkaClusterSpec.IstioIngressConfig` or `KafkaClusterSpec.EnvoyConfig` for this external listener. // +optional diff --git a/charts/kafka-operator/crds/kafkaclusters.yaml b/charts/kafka-operator/crds/kafkaclusters.yaml index ff4636057..baa57b328 100644 --- a/charts/kafka-operator/crds/kafkaclusters.yaml +++ b/charts/kafka-operator/crds/kafkaclusters.yaml @@ -23467,6 +23467,15 @@ spec: In case of external listeners using NodePort access method the broker instead of node public IP (see "brokerConfig.nodePortExternalIP") is advertised on the address having the following format: -. type: string + ingressController: + description: IngressController specifies the ingress controller + type for this external listener. When empty, the cluster-level + spec.ingressController is used. + enum: + - envoy + - contour + - istioingress + type: string ingressControllerTargetPort: description: |- IngressControllerTargetPort defines the container port that the ingress controller uses for handling external traffic. diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index ff4636057..baa57b328 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -23467,6 +23467,15 @@ spec: In case of external listeners using NodePort access method the broker instead of node public IP (see "brokerConfig.nodePortExternalIP") is advertised on the address having the following format: -. type: string + ingressController: + description: IngressController specifies the ingress controller + type for this external listener. When empty, the cluster-level + spec.ingressController is used. + enum: + - envoy + - contour + - istioingress + type: string ingressControllerTargetPort: description: |- IngressControllerTargetPort defines the container port that the ingress controller uses for handling external traffic. 
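For reference, here is a minimal, standalone sketch of the defaulting semantics the new helper introduces. The stub types below only mirror the relevant fields; the real definitions live in `api/v1beta1/kafkacluster_types.go` above.

```go
package main

import "fmt"

// Stubs mirroring only the fields relevant to the defaulting rule.
type KafkaClusterSpec struct{ IngressController string }

func (s *KafkaClusterSpec) GetIngressController() string { return s.IngressController }

type ExternalListenerConfig struct{ IngressController string }

// GetIngressController prefers the per-listener value and falls back to the
// cluster-level default when the listener leaves the field empty.
func (c ExternalListenerConfig) GetIngressController(spec *KafkaClusterSpec) string {
	if c.IngressController != "" {
		return c.IngressController
	}
	return spec.GetIngressController()
}

func main() {
	spec := &KafkaClusterSpec{IngressController: "envoy"}
	fmt.Println(ExternalListenerConfig{}.GetIngressController(spec))                             // "envoy" (cluster default)
	fmt.Println(ExternalListenerConfig{IngressController: "contour"}.GetIngressController(spec)) // "contour" (listener override)
}
```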
diff --git a/config/samples/simplekafkacluster_with_per_listener_envoy_tls.yaml b/config/samples/simplekafkacluster_with_per_listener_envoy_tls.yaml new file mode 100644 index 000000000..867a273af --- /dev/null +++ b/config/samples/simplekafkacluster_with_per_listener_envoy_tls.yaml @@ -0,0 +1,305 @@ +apiVersion: kafka.banzaicloud.io/v1beta1 +kind: KafkaCluster +metadata: + labels: + controller-tools.k8s.io: "1.0" + name: kafka +spec: + removeUnusedIngressResources: false + kRaft: false + monitoringConfig: + jmxImage: "ghcr.io/adobe/koperator/jmx-javaagent:1.4.0" + headlessServiceEnabled: true + zkAddresses: + - "zookeeper-server-client.zookeeper:2181" + propagateLabels: false + oneBrokerPerNode: false + clusterImage: "ghcr.io/adobe/koperator/kafka:2.13-3.9.1" + envoyConfig: + image: "envoyproxy/envoy:v1.37.1" + replicas: 1 + readOnlyConfig: | + auto.create.topics.enable=false + cruise.control.metrics.topic.auto.create=true + cruise.control.metrics.topic.num.partitions=1 + cruise.control.metrics.topic.replication.factor=2 + brokerConfigGroups: + default: + storageConfigs: + - mountPath: "/kafka-logs" + pvcSpec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi + brokerAnnotations: + prometheus.io/scrape: "true" + prometheus.io/port: "9020" + brokers: + - id: 0 + brokerConfigGroup: "default" + brokerConfig: + brokerIngressMapping: + - external + - corp + - id: 1 + brokerConfig: + brokerIngressMapping: + - external + - corp + brokerConfigGroup: "default" + - id: 2 + brokerConfig: + brokerIngressMapping: + - external + - corp + brokerConfigGroup: "default" + rollingUpgradeConfig: + failureThreshold: 1 + listenersConfig: + internalListeners: + - type: "plaintext" + name: "internal" + containerPort: 29092 + usedForInnerBrokerCommunication: true + - type: "plaintext" + name: "controller" + containerPort: 29093 + usedForInnerBrokerCommunication: false + usedForControllerCommunication: true + externalListeners: + - type: "plaintext" + name: "envoy" + containerPort: 29094 + ingressController: "envoy" + externalStartingPort: 9090 + - accessMethod: ClusterIP + ingressController: "contour" + anyCastPort: 443 + config: + defaultIngressConfig: "" + ingressConfig: + external: + contourIngressConfig: + brokerFQDNTemplate: m-%id-kafka.local.dev + tlsSecretName: projectcontour/envoycert + hostnameOverride: kafka.local.dev + containerPort: 29095 + externalStartingPort: -1 + name: external + type: plaintext + usedForInnerBrokerCommunication: false + - accessMethod: ClusterIP + ingressController: "contour" + anyCastPort: 443 + config: + defaultIngressConfig: "" + ingressConfig: + corp: + contourIngressConfig: + brokerFQDNTemplate: c-%id-kafka.local.dev + tlsSecretName: projectcontour/envoycert + hostnameOverride: kafka-corp.local.dev + containerPort: 29096 + externalStartingPort: -1 + name: corp + type: plaintext + usedForInnerBrokerCommunication: false + cruiseControlConfig: + cruiseControlTaskSpec: + RetryDurationMinutes: 5 + topicConfig: + partitions: 12 + replicationFactor: 3 + image: "adobe/cruise-control:3.0.3-adbe-20250804" + config: | + # Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + # + # This is an example property file for Kafka Cruise Control. See KafkaCruiseControlConfig for more details. + # Configuration for the metadata client. + # ======================================= + # The maximum interval in milliseconds between two metadata refreshes. 
+ #metadata.max.age.ms=300000 + # Client id for the Cruise Control. It is used for the metadata client. + #client.id=kafka-cruise-control + # The size of TCP send buffer bytes for the metadata client. + #send.buffer.bytes=131072 + # The size of TCP receive buffer size for the metadata client. + #receive.buffer.bytes=131072 + # The time to wait before disconnect an idle TCP connection. + #connections.max.idle.ms=540000 + # The time to wait before reconnect to a given host. + #reconnect.backoff.ms=50 + # The time to wait for a response from a host after sending a request. + #request.timeout.ms=30000 + # Configurations for the load monitor + # ======================================= + # The number of metric fetcher thread to fetch metrics for the Kafka cluster + num.metric.fetchers=1 + # The metric sampler class + metric.sampler.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler + # Configurations for CruiseControlMetricsReporterSampler + metric.reporter.topic.pattern=__CruiseControlMetrics + # The sample store class name + sample.store.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore + # The config for the Kafka sample store to save the partition metric samples + partition.metric.sample.store.topic=__KafkaCruiseControlPartitionMetricSamples + # The config for the Kafka sample store to save the model training samples + broker.metric.sample.store.topic=__KafkaCruiseControlModelTrainingSamples + # The replication factor of Kafka metric sample store topic + sample.store.topic.replication.factor=2 + # The config for the number of Kafka sample store consumer threads + num.sample.loading.threads=8 + # The partition assignor class for the metric samplers + metric.sampler.partition.assignor.class=com.linkedin.kafka.cruisecontrol.monitor.sampling.DefaultMetricSamplerPartitionAssignor + # The metric sampling interval in milliseconds + metric.sampling.interval.ms=120000 + metric.anomaly.detection.interval.ms=180000 + # The partition metrics window size in milliseconds + partition.metrics.window.ms=300000 + # The number of partition metric windows to keep in memory + num.partition.metrics.windows=1 + # The minimum partition metric samples required for a partition in each window + min.samples.per.partition.metrics.window=1 + # The broker metrics window size in milliseconds + broker.metrics.window.ms=300000 + # The number of broker metric windows to keep in memory + num.broker.metrics.windows=20 + # The minimum broker metric samples required for a partition in each window + min.samples.per.broker.metrics.window=1 + # The configuration for the BrokerCapacityConfigFileResolver (supports JBOD and non-JBOD broker capacities) + capacity.config.file=config/capacity.json + #capacity.config.file=config/capacityJBOD.json + # Configurations for the analyzer + # ======================================= + # The list of goals to optimize the Kafka cluster for with pre-computed proposals + 
default.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal + # The list of supported goals + goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PotentialNwOutGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerDiskUsageDistributionGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.PreferredLeaderElectionGoal + # The list of supported hard goals + hard.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal + # The minimum percentage of well monitored partitions out of all the partitions + min.monitored.partition.percentage=0.95 + # The balance threshold for CPU + cpu.balance.threshold=1.1 + # The balance threshold for disk + disk.balance.threshold=1.1 + # The balance threshold for network inbound utilization + network.inbound.balance.threshold=1.1 + # The balance threshold for network outbound utilization + network.outbound.balance.threshold=1.1 + # The balance threshold for the replica count + replica.count.balance.threshold=1.1 + # The capacity threshold for CPU in percentage + cpu.capacity.threshold=0.8 + # The capacity threshold for disk in percentage + disk.capacity.threshold=0.8 + # The capacity threshold for network inbound utilization in percentage + network.inbound.capacity.threshold=0.8 + # The capacity threshold for network outbound utilization in percentage + network.outbound.capacity.threshold=0.8 + # The threshold to define the cluster to be in a low CPU utilization state + 
cpu.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low disk utilization state + disk.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low network inbound utilization state + network.inbound.low.utilization.threshold=0.0 + # The threshold to define the cluster to be in a low network outbound utilization state + network.outbound.low.utilization.threshold=0.0 + # The metric anomaly percentile upper threshold + metric.anomaly.percentile.upper.threshold=90.0 + # The metric anomaly percentile lower threshold + metric.anomaly.percentile.lower.threshold=10.0 + # How often should the cached proposal be expired and recalculated if necessary + proposal.expiration.ms=60000 + # The maximum number of replicas that can reside on a broker at any given time. + max.replicas.per.broker=10000 + # The number of threads to use for proposal candidate precomputing. + num.proposal.precompute.threads=1 + # the topics that should be excluded from the partition movement. + #topics.excluded.from.partition.movement + # Configurations for the executor + # ======================================= + # The max number of partitions to move in/out on a given broker at a given time. + num.concurrent.partition.movements.per.broker=10 + # The interval between two execution progress checks. + execution.progress.check.interval.ms=10000 + # Configurations for anomaly detector + # ======================================= + # The goal violation notifier class + anomaly.notifier.class=com.linkedin.kafka.cruisecontrol.detector.notifier.SelfHealingNotifier + # The metric anomaly finder class + metric.anomaly.finder.class=com.linkedin.kafka.cruisecontrol.detector.KafkaMetricAnomalyFinder + # The anomaly detection interval + anomaly.detection.interval.ms=10000 + # The goal violation to detect. + anomaly.detection.goals=com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal + # The interested metrics for metric anomaly analyzer. + metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_MAX,BROKER_PRODUCE_LOCAL_TIME_MS_MEAN,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MAX,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MAX,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_MEAN,BROKER_LOG_FLUSH_TIME_MS_MAX,BROKER_LOG_FLUSH_TIME_MS_MEAN + ## Adjust accordingly if your metrics reporter is an older version and does not produce these metrics. + #metric.anomaly.analyzer.metrics=BROKER_PRODUCE_LOCAL_TIME_MS_50TH,BROKER_PRODUCE_LOCAL_TIME_MS_999TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_50TH,BROKER_CONSUMER_FETCH_LOCAL_TIME_MS_999TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_50TH,BROKER_FOLLOWER_FETCH_LOCAL_TIME_MS_999TH,BROKER_LOG_FLUSH_TIME_MS_50TH,BROKER_LOG_FLUSH_TIME_MS_999TH + # The zk path to store failed broker information. + failed.brokers.zk.path=/CruiseControlBrokerList + # Topic config provider class + topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaTopicConfigProvider + # The cluster configurations for the KafkaTopicConfigProvider + cluster.configs.file=config/clusterConfigs.json + # The maximum time in milliseconds to store the response and access details of a completed user task.
+ completed.user.task.retention.time.ms=21600000 + # The maximum time in milliseconds to retain the demotion history of brokers. + demotion.history.retention.time.ms=86400000 + # The maximum number of completed user tasks for which the response and access details will be cached. + max.cached.completed.user.tasks=500 + # The maximum number of user tasks for concurrently running in async endpoints across all users. + max.active.user.tasks=25 + # Enable self healing for all anomaly detectors, unless the particular anomaly detector is explicitly disabled + self.healing.enabled=true + # Enable self healing for broker failure detector + #self.healing.broker.failure.enabled=true + # Enable self healing for goal violation detector + #self.healing.goal.violation.enabled=true + # Enable self healing for metric anomaly detector + #self.healing.metric.anomaly.enabled=true + # configurations for the webserver + # ================================ + # HTTP listen port + webserver.http.port=9090 + # HTTP listen address + webserver.http.address=0.0.0.0 + # Whether CORS support is enabled for API or not + webserver.http.cors.enabled=false + # Value for Access-Control-Allow-Origin + webserver.http.cors.origin=http://localhost:8080/ + # Value for Access-Control-Request-Method + webserver.http.cors.allowmethods=OPTIONS,GET,POST + # Headers that should be exposed to the Browser (Webapp) + # This is a special header that is used by the + # User Tasks subsystem and should be explicitly + # Enabled when CORS mode is used as part of the + # Admin Interface + webserver.http.cors.exposeheaders=User-Task-ID + # REST API default prefix + # (dont forget the ending *) + webserver.api.urlprefix=/kafkacruisecontrol/* + # Location where the Cruise Control frontend is deployed + webserver.ui.diskpath=./cruise-control-ui/dist/ + # URL path prefix for UI + # (dont forget the ending *) + webserver.ui.urlprefix=/* + # Time After which request is converted to Async + webserver.request.maxBlockTimeMs=10000 + # Default Session Expiry Period + webserver.session.maxExpiryTimeMs=60000 + # Session cookie path + webserver.session.path=/ + # Server Access Logs + webserver.accesslog.enabled=true + # Location of HTTP Request Logs + webserver.accesslog.path=access.log + # HTTP Request Log retention days + webserver.accesslog.retention.days=14 + clusterConfig: | + { + "min.insync.replicas": 3 + } diff --git a/controllers/tests/kafkacluster_controller_contour_test.go b/controllers/tests/kafkacluster_controller_contour_test.go index 329e67c21..71c566443 100644 --- a/controllers/tests/kafkacluster_controller_contour_test.go +++ b/controllers/tests/kafkacluster_controller_contour_test.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "sync/atomic" + "time" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" @@ -106,6 +107,26 @@ var _ = Describe("KafkaClusterWithContourIngressController", Label("contour"), f expectContour(ctx, kafkaCluster) }) }) + When("listener removed and RemoveUnusedIngressResources=true", func() { + It("should remove contour resources for that listener", func(ctx SpecContext) { + expectContour(ctx, kafkaCluster) + By("updating listener to use envoy and enabling RemoveUnusedIngressResources") + Expect(k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: kafkaCluster.Name}, kafkaCluster)).To(Succeed()) + kafkaCluster.Spec.RemoveUnusedIngressResources = true + el := kafkaCluster.Spec.ListenersConfig.ExternalListeners[0] + kafkaCluster.Spec.ListenersConfig.ExternalListeners = []v1beta1.ExternalListenerConfig{} + Expect(k8sClient.Update(ctx, kafkaCluster)).To(Succeed()) + waitForClusterRunningState(ctx, kafkaCluster, namespace) + By("expecting contour HTTPProxy and ClusterIP services for that listener to be deleted") + ingressConfigName := "ingress1" + serviceName := fmt.Sprintf(contourutils.ContourServiceNameWithScope, el.Name, ingressConfigName, kafkaCluster.GetName()) + var svc corev1.Service + Eventually(ctx, func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{Namespace: kafkaCluster.Namespace, Name: serviceName}, &svc) + return err != nil + }).WithTimeout(30 * time.Second).Should(BeTrue()) + }) + }) }) func expectContourClusterIpAnycastSvc(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, eListener v1beta1.ExternalListenerConfig) { diff --git a/controllers/tests/kafkacluster_controller_ingresscontroller_override_test.go b/controllers/tests/kafkacluster_controller_ingresscontroller_override_test.go new file mode 100644 index 000000000..2b7c9e4a6 --- /dev/null +++ b/controllers/tests/kafkacluster_controller_ingresscontroller_override_test.go @@ -0,0 +1,110 @@ +// Copyright © 2020 Cisco Systems, Inc. and/or its affiliates +// Copyright 2025 Adobe. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tests + +import ( + "fmt" + "sync/atomic" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/banzaicloud/koperator/api/v1beta1" + "github.com/banzaicloud/koperator/pkg/util" +) + +var _ = Describe("KafkaClusterWithIngressControllerOverride", Label("contour"), func() { + var ( + count uint64 = 0 + namespace string + namespaceObj *corev1.Namespace + kafkaCluster *v1beta1.KafkaCluster + ) + + BeforeEach(func() { + atomic.AddUint64(&count, 1) + namespace = fmt.Sprintf("kafkacontouroverridetest-%v", count) + namespaceObj = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespace, + }, + } + + kafkaCluster = createMinimalKafkaClusterCR(fmt.Sprintf("kafkacluster-%d", count), namespace) + + // Cluster default is envoy; listener overrides to contour. 
+ kafkaCluster.Spec.IngressController = "envoy" + + kafkaCluster.Spec.ListenersConfig.ExternalListeners = []v1beta1.ExternalListenerConfig{ + { + CommonListenerSpec: v1beta1.CommonListenerSpec{ + Name: "ingress1", + Type: v1beta1.SecurityProtocolPlaintext, + ContainerPort: 29098, + }, + IngressController: "contour", + AccessMethod: corev1.ServiceTypeClusterIP, + ExternalStartingPort: -1, + AnyCastPort: util.Int32Pointer(8443), + Config: &v1beta1.Config{ + DefaultIngressConfig: "", + IngressConfig: map[string]v1beta1.IngressConfig{ + "ingress1": { + IngressServiceSettings: v1beta1.IngressServiceSettings{ + HostnameOverride: "kafka.cluster.local", + }, + ContourIngressConfig: &v1beta1.ContourIngressConfig{ + TLSSecretName: "test-tls-secret", + BrokerFQDNTemplate: "broker-%id.kafka.cluster.local", + }, + }, + }, + }, + }, + } + + kafkaCluster.Spec.Brokers[0].BrokerConfig = &v1beta1.BrokerConfig{BrokerIngressMapping: []string{"ingress1"}} + kafkaCluster.Spec.Brokers[1].BrokerConfig = &v1beta1.BrokerConfig{BrokerIngressMapping: []string{"ingress1"}} + kafkaCluster.Spec.Brokers[2].BrokerConfig = &v1beta1.BrokerConfig{BrokerIngressMapping: []string{"ingress1"}} + }) + + JustBeforeEach(func(ctx SpecContext) { + By("creating namespace " + namespace) + err := k8sClient.Create(ctx, namespaceObj) + Expect(err).NotTo(HaveOccurred()) + + By("creating kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) + err = k8sClient.Create(ctx, kafkaCluster) + Expect(err).NotTo(HaveOccurred()) + + waitForClusterRunningState(ctx, kafkaCluster, namespace) + + }) + + JustAfterEach(func(ctx SpecContext) { + By("deleting Kafka cluster object " + kafkaCluster.Name + " in namespace " + namespace) + err := k8sClient.Delete(ctx, kafkaCluster) + Expect(err).NotTo(HaveOccurred()) + + kafkaCluster = nil + }) + + It("creates contour resources when a listener overrides the cluster ingress controller", func(ctx SpecContext) { + expectContour(ctx, kafkaCluster) + }) +}) diff --git a/openspec/changes/per-listener-ingress-controller/.openspec.yaml b/openspec/changes/per-listener-ingress-controller/.openspec.yaml new file mode 100644 index 000000000..e331c975d --- /dev/null +++ b/openspec/changes/per-listener-ingress-controller/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-02-25 diff --git a/openspec/changes/per-listener-ingress-controller/design.md b/openspec/changes/per-listener-ingress-controller/design.md new file mode 100644 index 000000000..e5ba4e5b8 --- /dev/null +++ b/openspec/changes/per-listener-ingress-controller/design.md @@ -0,0 +1,70 @@ +# Design: Per-Listener Ingress Controller + +## Context + +KafkaCluster currently has a single cluster-wide `spec.ingressController` (`envoy` | `contour` | `istioingress`). All external listeners use that type. Each ingress reconciler (envoy, contour, istioingress) checks whether `Spec.GetIngressController()` matches its own controller name and, if so, reconciles all external listeners that match the required access method (LoadBalancer for envoy/istio, ClusterIP for contour). Config selection (`util.GetIngressConfigs`) and service lookup (`getServiceFromExternalListener`) also use the cluster-level value. The goal is to allow each external listener to optionally specify its own ingress controller type, with the cluster default as fallback, so one cluster can mix controller types (e.g. one listener via Envoy, another via Contour).
+ +## Goals / Non-Goals + +**Goals:** + +- Each external listener MAY specify an optional `ingressController`; when unset, the listener uses `KafkaCluster.Spec.IngressController`. +- Reconcilers create/update/delete ingress resources only for listeners whose effective controller matches that reconciler. +- Status, config, and service lookup use the effective controller per listener. Cleanup (RemoveUnusedIngressResources) removes resources when a listener no longer uses that controller type. +- Backward compatible: existing CRs without the new field behave exactly as today. + +**Non-Goals:** + +- Changing the coupling between controller type and access method (Envoy/Istio → LoadBalancer, Contour → ClusterIP). Per-listener controller does not introduce new access-method combinations. +- Supporting multiple controller types for a single listener (e.g. same listener on both Envoy and Contour). Each listener has exactly one effective controller. + +## Decisions + +### 1. API: optional field on ExternalListenerConfig + +- **Choice:** Add `IngressController string` (optional, empty means use cluster default) on `ExternalListenerConfig`. +- **Rationale:** Keeps API simple and backward compatible. No pointer needed if empty string is treated as "use default"; existing CRs have no field (zero value) and continue to use cluster default. Alternatively a `*string` could make "unset" explicit; we prefer `string` with empty = default to avoid pointer proliferation and match existing style (e.g. `AccessMethod`). +- **Alternative:** Required per-listener field. Rejected because it would force every existing listener to be updated. + +### 2. Effective controller helper + +- **Choice:** Introduce a helper that returns the effective ingress controller for a listener, e.g. `GetIngressController(spec *KafkaClusterSpec, eListener ExternalListenerConfig) string`. If `eListener.IngressController != ""`, return it; else return `spec.GetIngressController()`. +- **Rationale:** Single place for defaulting; all call sites (reconcilers, GetIngressConfigs, status, getServiceFromExternalListener) use this instead of reading cluster spec only. +- **Alternative:** Inline logic at each call site. Rejected to avoid drift and bugs. + +### 3. GetIngressConfigs signature + +- **Choice:** Keep signature `GetIngressConfigs(spec, eListener)` and derive effective controller inside the function via the new helper. Use that effective value to choose the envoy/contour/istio branch and to merge the correct cluster-level config (EnvoyConfig, ContourIngressConfig, or IstioIngressConfig). +- **Rationale:** No API change for callers; behavior change is internal. Callers already pass both spec and listener. +- **Alternative:** Add an explicit `effectiveController string` parameter. Rejected as redundant when it can be derived from spec + listener. + +### 4. Reconciler logic + +- **Choice:** For each external listener, compute effective controller. If it matches this reconciler’s controller and access method, reconcile that listener (create/update resources). Else if RemoveUnusedIngressResources is set, treat as "this listener does not use this controller" and delete any existing resources for this (listener, controller) pair. +- **Rationale:** Aligns with current per-listener loop; only the condition changes from cluster-level to per-listener effective controller. +- **Alternative:** Separate "reconcile" and "cleanup" passes. Current code already does both in one loop; no need to split. + +### 5. 
getServiceFromExternalListener and status + +- **Choice:** These functions currently take cluster + listener name + ingress config name and use `cluster.Spec.GetIngressController()` to pick the service name template. Change them to accept the effective controller for that listener (or the listener config itself) and use it for the switch. Callers (e.g. status creation) already iterate listeners and call GetIngressConfigs; they can compute effective controller once per listener and pass it (or pass listener so the helper can be used). +- **Rationale:** Status and service lookup must use the same controller type that was used to create the service, i.e. the effective controller for that listener. + +### 6. Validation + +- **Choice:** In the KafkaCluster webhook, validate that `ExternalListenerConfig.IngressController` (when non-empty) is one of `envoy`, `contour`, `istioingress`. If any listener’s effective controller is `istioingress`, require `Spec.IstioControlPlane != nil` (same rule as today, evaluated for the set of effective controllers in use). +- **Rationale:** Prevents invalid enum values and ensures Istio config is present when any listener uses Istio. + +## Risks / Trade-offs + +- **[Risk]** Call sites that use `GetIngressController()` in a listener context might be missed, leading to wrong controller type (e.g. status pointing at wrong service). **Mitigation:** Grep for all uses of `GetIngressController()` and ensure listener-scoped paths use the effective controller helper; add tests for mixed listener types and status/service lookup. +- **[Risk]** Defaulting empty string to cluster default could be confused with "no ingress" if we ever add that. **Mitigation:** Document clearly; today there is no "no ingress" option, so empty = default is unambiguous. +- **[Trade-off]** Keeping controller–access-method coupling (Contour = ClusterIP only, Envoy/Istio = LoadBalancer only) means we do not support e.g. Contour with LoadBalancer. **Mitigation:** Out of scope for this change; document in design and proposal. + +## Migration Plan + +- **Deploy:** No migration required. New field is optional; existing CRs unchanged. +- **Rollback:** Revert the operator; CRs with per-listener `ingressController` set will be ignored by older operator (field unknown), and cluster-level default will be used for all listeners. If users had mixed controllers, reverting would effectively force all listeners back to the cluster default until CRs are edited. Document this in release notes. + +## Open Questions + +- None at design time. Validation and helper placement (API type vs util package) can be decided during implementation; recommendation is helper in `api/v1beta1` or `pkg/util` and used from both API and reconcilers. diff --git a/openspec/changes/per-listener-ingress-controller/proposal.md b/openspec/changes/per-listener-ingress-controller/proposal.md new file mode 100644 index 000000000..3d6d82944 --- /dev/null +++ b/openspec/changes/per-listener-ingress-controller/proposal.md @@ -0,0 +1,35 @@ +# Per-Listener Ingress Controller + +## Why + +Today, a KafkaCluster has a single cluster-wide ingress controller type (`envoy`, `contour`, or `istioingress`). All external listeners use that same type. Operators cannot expose one listener via Envoy (e.g. public LoadBalancer) and another via Contour (e.g. internal TLS) on the same cluster without running multiple clusters or custom tooling. 
Allowing each external listener to specify its own ingress controller (with a cluster default) removes this limitation and supports mixed ingress environments. + +## What Changes + +- Add optional `ingressController` field on `ExternalListenerConfig`. When unset, the listener uses `KafkaCluster.Spec.IngressController` (current behavior). +- Reconcilers (envoy, contour, istioingress) will create/update/delete resources only for external listeners whose effective ingress controller matches that reconciler. +- `util.GetIngressConfigs` and all call sites that today use cluster-level `GetIngressController()` in a listener context will use the effective controller for that listener (via a helper that derives it from the listener and the cluster spec). +- Status and service lookup (e.g. `getServiceFromExternalListener`) will use the effective ingress controller per listener when resolving service names. +- `RemoveUnusedIngressResources` cleanup will remain per-listener and will remove resources when a listener no longer uses that controller type. +- Validation: per-listener value restricted to same enum as cluster (`envoy` | `contour` | `istioingress`). If any listener uses `istioingress`, `Spec.IstioControlPlane` must be set (unchanged rule). + +No breaking changes: existing CRs omit the new field and keep current behavior (all listeners use cluster default). + +## Capabilities + +### New Capabilities + +- `per-listener-ingress-controller`: External listeners can optionally specify which ingress controller type to use (envoy, contour, istioingress). When unspecified, the cluster-level `spec.ingressController` is used. The operator reconciles ingress resources per listener based on this effective controller and cleans up when a listener switches or is removed. + +### Modified Capabilities + +- (None — no existing specs in openspec/specs/; this is a new API/behavior capability.) + +## Impact + +- **API**: `api/v1beta1/kafkacluster_types.go` — `ExternalListenerConfig` gains optional `IngressController string`. +- **Util**: `pkg/util/util.go` — `GetIngressConfigs` and new helper for effective ingress controller per listener. +- **Reconcilers**: `pkg/resources/contouringress/contour.go`, `pkg/resources/envoy/envoy.go`, `pkg/resources/istioingress/istioingress.go` — gate on effective controller per listener. +- **Kafka/config/status**: `pkg/resources/kafka/kafka.go` — status creation and `getServiceFromExternalListener` use effective controller per listener. +- **Webhooks**: `pkg/webhooks/kafkacluster_validator.go` — validate per-listener enum and istio control plane when any listener uses istioingress. +- **Tests**: Controller and util tests for mixed listener controller types and defaulting. diff --git a/openspec/changes/per-listener-ingress-controller/specs/per-listener-ingress-controller/spec.md b/openspec/changes/per-listener-ingress-controller/specs/per-listener-ingress-controller/spec.md new file mode 100644 index 000000000..f73f41497 --- /dev/null +++ b/openspec/changes/per-listener-ingress-controller/specs/per-listener-ingress-controller/spec.md @@ -0,0 +1,68 @@ +# Spec: Per-Listener Ingress Controller + +## ADDED Requirements + +### Requirement: External listener may specify ingress controller type + +The system SHALL allow an external listener to optionally specify which ingress controller type (`envoy`, `contour`, or `istioingress`) to use. When not specified, the listener SHALL use the cluster-level `KafkaCluster.Spec.IngressController` value.
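To make the reconciler-side consequence of this requirement concrete before the scenarios, here is a hedged, self-contained sketch of the per-listener gate. The types and names are stubs; the real checks are the `eListener.GetIngressController(&r.KafkaCluster.Spec)` conditions in the reconciler diffs below.

```go
package main

import "fmt"

type spec struct{ ingressController string }
type listener struct{ name, ingressController string }

// effective resolves the controller for a listener: the per-listener value if
// set, otherwise the cluster-level default.
func effective(l listener, s spec) string {
	if l.ingressController != "" {
		return l.ingressController
	}
	return s.ingressController
}

func main() {
	s := spec{ingressController: "envoy"}
	listeners := []listener{
		{name: "public"},                             // no override: falls back to envoy
		{name: "corp", ingressController: "contour"}, // override: contour only
	}
	// Each reconciler loops over all listeners but acts only on the ones it owns.
	for _, reconciler := range []string{"envoy", "contour", "istioingress"} {
		for _, l := range listeners {
			if effective(l, s) == reconciler {
				fmt.Printf("%s reconciler handles listener %q\n", reconciler, l.name)
			}
		}
	}
}
```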
+ +#### Scenario: Listener with explicit ingress controller + +- **WHEN** an external listener has `ingressController` set to `contour` +- **THEN** only the Contour reconciler SHALL create or update ingress resources for that listener, and Envoy and Istio reconcilers SHALL not create resources for it + +#### Scenario: Listener without explicit ingress controller + +- **WHEN** an external listener does not set `ingressController` (empty or omitted) +- **THEN** the effective controller for that listener SHALL be `KafkaCluster.Spec.IngressController`, and reconcilers SHALL behave as before (all listeners use cluster default) + +#### Scenario: Mixed controllers on same cluster + +- **WHEN** one external listener has `ingressController: envoy` and another has `ingressController: contour` +- **THEN** the Envoy reconciler SHALL create resources only for the first listener and the Contour reconciler SHALL create resources only for the second listener + +### Requirement: Effective controller drives config and status + +The system SHALL use the effective ingress controller for a listener (per-listener value if set, else cluster default) when resolving ingress config, service names, and listener status for that listener. + +#### Scenario: GetIngressConfigs uses effective controller + +- **WHEN** `GetIngressConfigs(spec, eListener)` is called and the listener has `ingressController: contour` +- **THEN** the returned config SHALL be the Contour branch (ContourIngressConfig merge) regardless of `spec.IngressController` + +#### Scenario: Service lookup uses effective controller + +- **WHEN** status or service lookup resolves the ingress service for a listener that uses Contour +- **THEN** the system SHALL use the Contour service name template for that listener, not the Envoy or Istio template + +### Requirement: Cleanup when listener no longer uses a controller + +When `RemoveUnusedIngressResources` is true and an external listener’s effective controller is not a given type, the system SHALL remove any existing ingress resources (services, HTTPProxies, etc.) that were created for that (listener, controller) pair. + +#### Scenario: Listener switches from Envoy to Contour + +- **WHEN** a listener is updated from `ingressController: envoy` (or cluster default envoy) to `ingressController: contour` and `RemoveUnusedIngressResources` is true +- **THEN** Envoy resources for that listener SHALL be deleted and Contour resources SHALL be created for that listener + +### Requirement: Validation of per-listener ingress controller + +The system SHALL validate that when `ExternalListenerConfig.IngressController` is non-empty, it SHALL be one of `envoy`, `contour`, or `istioingress`. When any listener’s effective controller is `istioingress`, the system SHALL require `KafkaCluster.Spec.IstioControlPlane` to be set. 
+ +#### Scenario: Invalid per-listener value rejected + +- **WHEN** an external listener has `ingressController: nginx` +- **THEN** the admission webhook SHALL reject the KafkaCluster with a validation error + +#### Scenario: Istio without control plane rejected + +- **WHEN** any external listener has effective controller `istioingress` and `Spec.IstioControlPlane` is nil +- **THEN** the admission webhook SHALL reject the KafkaCluster (same rule as today, applied to effective controllers in use) + +### Requirement: Backward compatibility + +Existing KafkaCluster resources that do not set `ingressController` on any external listener SHALL behave identically to before: all listeners SHALL use `Spec.IngressController` and all reconcilers SHALL behave as they do today. + +#### Scenario: Existing CR unchanged + +- **WHEN** a KafkaCluster has no `ingressController` field on any external listener +- **THEN** every listener’s effective controller SHALL be `Spec.IngressController` and resource creation/cleanup SHALL match current (pre-feature) behavior diff --git a/openspec/changes/per-listener-ingress-controller/tasks.md b/openspec/changes/per-listener-ingress-controller/tasks.md new file mode 100644 index 000000000..2fd558322 --- /dev/null +++ b/openspec/changes/per-listener-ingress-controller/tasks.md @@ -0,0 +1,36 @@ +# Tasks: Per-Listener Ingress Controller + +## 1. API and effective controller helper + +- [x] 1.1 Add optional `IngressController string` to `ExternalListenerConfig` in `api/v1beta1/kafkacluster_types.go` with json tag and kubebuilder validation enum `envoy;contour;istioingress` +- [x] 1.2 Add `GetIngressController(spec *KafkaClusterSpec) string` on `ExternalListenerConfig` (or equivalent helper in util) that returns `eListener.IngressController` if non-empty, else `spec.GetIngressController()` +- [x] 1.3 Run code generation (`make generate`) and ensure deepcopy includes the new field + +## 2. GetIngressConfigs and util + +- [x] 2.1 In `pkg/util/util.go`, update `GetIngressConfigs` to derive effective controller per listener via the new helper and use it to select the envoy/contour/istio branch and merge the correct cluster-level config +- [x] 2.2 Add unit tests for `GetIngressConfigs` with per-listener override (e.g. listener contour + spec envoy returns contour config) + +## 3. Ingress reconcilers (Contour, Envoy, Istio) + +- [x] 3.1 In `pkg/resources/contouringress/contour.go`, replace cluster-level `GetIngressController() == contour` check with per-listener effective controller; only reconcile listeners whose effective controller is contour; cleanup branch uses same effective check +- [x] 3.2 In `pkg/resources/envoy/envoy.go`, replace cluster-level check with per-listener effective controller; only reconcile listeners whose effective controller is envoy; cleanup branch uses same effective check +- [x] 3.3 In `pkg/resources/istioingress/istioingress.go`, replace cluster-level check with per-listener effective controller; only reconcile listeners whose effective controller is istioingress; cleanup branch uses same effective check + +## 4. 
Kafka status and service lookup + +- [x] 4.1 Update `getServiceFromExternalListener` in `pkg/resources/kafka/kafka.go` to accept effective controller for the listener (or listener + spec) and use it in the switch for service name template +- [x] 4.2 Update `createExternalListenerStatuses` and any callers that resolve ingress services to pass effective controller per listener when calling `getServiceFromExternalListener` +- [x] 4.3 Find and update any other listener-scoped use of `GetIngressController()` in `pkg/resources/kafka/kafka.go` (e.g. broker config) to use effective controller for that listener + +## 5. Webhook validation + +- [x] 5.1 In `pkg/webhooks/kafkacluster_validator.go`, add validation that each external listener’s `IngressController` (when non-empty) is one of envoy, contour, istioingress +- [x] 5.2 Ensure validation that when any listener’s effective controller is istioingress, `Spec.IstioControlPlane` is required (extend or reuse existing rule) +- [x] 5.3 Add or extend webhook tests for per-listener enum and Istio control plane + +## 6. Tests + +- [x] 6.1 Add controller test(s) for mixed listener controller types (e.g. one listener envoy, one contour) and verify only expected resources created per reconciler +- [x] 6.2 Add test for defaulting: no per-listener field, all listeners use cluster default (backward compatibility) +- [x] 6.3 Add test for cleanup when listener switches controller type with RemoveUnusedIngressResources true diff --git a/openspec/config.yaml b/openspec/config.yaml new file mode 100644 index 000000000..392946c67 --- /dev/null +++ b/openspec/config.yaml @@ -0,0 +1,20 @@ +schema: spec-driven + +# Project context (optional) +# This is shown to AI when creating artifacts. +# Add your tech stack, conventions, style guides, domain knowledge, etc. +# Example: +# context: | +# Tech stack: TypeScript, React, Node.js +# We use conventional commits +# Domain: e-commerce platform + +# Per-artifact rules (optional) +# Add custom rules for specific artifacts. 
+# Example: +# rules: +# proposal: +# - Keep proposals under 500 words +# - Always include a "Non-goals" section +# tasks: +# - Break tasks into chunks of max 2 hours diff --git a/pkg/resources/contouringress/contour.go b/pkg/resources/contouringress/contour.go index 48a0ba3a3..83e9bf903 100644 --- a/pkg/resources/contouringress/contour.go +++ b/pkg/resources/contouringress/contour.go @@ -19,11 +19,11 @@ import ( "context" "fmt" "reflect" - "strings" "emperror.dev/errors" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -68,7 +68,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { var reconcileObjects []runtime.Object // create ClusterIP services for discovery service and brokers for _, eListener := range r.KafkaCluster.Spec.ListenersConfig.ExternalListeners { - if r.KafkaCluster.Spec.GetIngressController() == contourutils.IngressControllerName && eListener.GetAccessMethod() == corev1.ServiceTypeClusterIP { + if eListener.GetIngressController(&r.KafkaCluster.Spec) == contourutils.IngressControllerName && eListener.GetAccessMethod() == corev1.ServiceTypeClusterIP { // create per ingressConfig services ClusterIP ingressConfigs, defaultControllerName, err := util.GetIngressConfigs(r.KafkaCluster.Spec, eListener) if err != nil { @@ -104,46 +104,55 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { return err } } - } else if r.KafkaCluster.Spec.RemoveUnusedIngressResources { - // Cleaning up unused contour resources when ingress controller is not contour or externalListener access method is not ClusterIP - deletionCounter := 0 - ctx := context.Background() - contourResourcesGVK := []schema.GroupVersionKind{ - { - Version: corev1.SchemeGroupVersion.Version, - Group: corev1.SchemeGroupVersion.Group, - Kind: reflect.TypeOf(corev1.Service{}).Name(), - }, - { - Version: corev1.SchemeGroupVersion.Version, - Group: corev1.SchemeGroupVersion.Group, - Kind: reflect.TypeOf(contour.HTTPProxy{}).Name(), - }, + } + } + + // Clean up contour resources that are no longer desired (outside the loop so we only remove resources not in reconcileObjects) + if r.KafkaCluster.Spec.RemoveUnusedIngressResources { + desiredNames := make(map[string]struct{}) + for _, obj := range reconcileObjects { + metaObj, err := meta.Accessor(obj) + if err != nil { + return errors.Wrap(err, "failed to get object meta for desired contour resources") } - var contourResources unstructured.UnstructuredList - for _, gvk := range contourResourcesGVK { - contourResources.SetGroupVersionKind(gvk) + desiredNames[metaObj.GetName()] = struct{}{} + } - if err := r.List(ctx, &contourResources, client.InNamespace(r.KafkaCluster.GetNamespace()), - client.MatchingLabels(labelsForContourIngressWithoutEListenerName(r.KafkaCluster.Name))); err != nil { - return errors.Wrap(err, "error when getting list of envoy ingress resources for deletion") - } + ctx := context.Background() + contourResourcesGVK := []schema.GroupVersionKind{ + { + Version: corev1.SchemeGroupVersion.Version, + Group: corev1.SchemeGroupVersion.Group, + Kind: reflect.TypeOf(corev1.Service{}).Name(), + }, + { + Version: contour.SchemeGroupVersion.Version, + Group: contour.SchemeGroupVersion.Group, + Kind: reflect.TypeOf(contour.HTTPProxy{}).Name(), + }, + } - for _, removeObject := range contourResources.Items { - if !strings.Contains(removeObject.GetLabels()[util.ExternalListenerLabelNameKey], 
eListener.Name) || - util.ObjectManagedByClusterRegistry(&removeObject) || - !removeObject.GetDeletionTimestamp().IsZero() { - continue - } - if err := r.Delete(ctx, &removeObject); client.IgnoreNotFound(err) != nil { - return errors.Wrap(err, "error when removing contour ingress resources") - } - log.V(1).Info(fmt.Sprintf("Deleted contour ingress '%s' resource '%s' for externalListener '%s'", gvk.Kind, removeObject.GetName(), eListener.Name)) - deletionCounter++ - } + var contourResources unstructured.UnstructuredList + for _, gvk := range contourResourcesGVK { + contourResources.SetGroupVersionKind(gvk) + + if err := r.List(ctx, &contourResources, client.InNamespace(r.KafkaCluster.GetNamespace()), + client.MatchingLabels(labelsForContourIngressWithoutEListenerName(r.KafkaCluster.Name))); err != nil { + return errors.Wrap(err, "error when getting list of contour ingress resources for deletion") } - if deletionCounter > 0 { - log.Info(fmt.Sprintf("Removed '%d' resources for contour ingress", deletionCounter)) + + for _, removeObject := range contourResources.Items { + if _, desired := desiredNames[removeObject.GetName()]; desired || + util.ObjectManagedByClusterRegistry(&removeObject) || + !removeObject.GetDeletionTimestamp().IsZero() { + log.V(2).Info(fmt.Sprintf("Skipping deletion of resource '%s' (kind: %s): either managed by cluster registry or marked for deletion", + removeObject.GetName(), gvk.Kind)) + continue + } + if err := r.Delete(ctx, &removeObject); client.IgnoreNotFound(err) != nil { + return errors.Wrap(err, "error when removing contour ingress resources") + } + log.Info(fmt.Sprintf("Deleted contour ingress '%s' resource '%s'", gvk.Kind, removeObject.GetName())) } } } diff --git a/pkg/resources/envoy/envoy.go b/pkg/resources/envoy/envoy.go index 318205599..3c5f9f56f 100644 --- a/pkg/resources/envoy/envoy.go +++ b/pkg/resources/envoy/envoy.go @@ -19,13 +19,13 @@ import ( "context" "fmt" "reflect" - "strings" "emperror.dev/errors" "github.com/go-logr/logr" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" @@ -68,8 +68,9 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { log = log.WithValues("component", envoyutils.ComponentName) log.V(1).Info("Reconciling") + desiredNames := make(map[string]struct{}) for _, eListener := range r.KafkaCluster.Spec.ListenersConfig.ExternalListeners { - if r.KafkaCluster.Spec.GetIngressController() == envoyutils.IngressControllerName && eListener.GetAccessMethod() == corev1.ServiceTypeLoadBalancer { + if eListener.GetIngressController(&r.KafkaCluster.Spec) == envoyutils.IngressControllerName && eListener.GetAccessMethod() == corev1.ServiceTypeLoadBalancer { ingressConfigs, defaultControllerName, err := util.GetIngressConfigs(r.KafkaCluster.Spec, eListener) if err != nil { return err @@ -91,62 +92,63 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { for _, res := range externalListenerResources { o := res(log, eListener, ingressConfig, name, defaultControllerName) - err := k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster) + metaObj, err := meta.Accessor(o) + if err != nil { + return errors.Wrap(err, "failed to get object meta for desired envoy resources") + } + desiredNames[metaObj.GetName()] = struct{}{} + err = k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster) if err != nil { return err } } } - } else if 
r.KafkaCluster.Spec.RemoveUnusedIngressResources { - // Cleaning up unused envoy resources when ingress controller is not envoy or externalListener access method is not LoadBalancer - deletionCounter := 0 - ctx := context.Background() - envoyResourcesGVK := []schema.GroupVersionKind{ - { - Version: corev1.SchemeGroupVersion.Version, - Group: corev1.SchemeGroupVersion.Group, - Kind: reflect.TypeOf(corev1.Service{}).Name(), - }, - { - Version: corev1.SchemeGroupVersion.Version, - Group: corev1.SchemeGroupVersion.Group, - Kind: reflect.TypeOf(corev1.ConfigMap{}).Name(), - }, - { - Version: appsv1.SchemeGroupVersion.Version, - Group: appsv1.SchemeGroupVersion.Group, - Kind: reflect.TypeOf(appsv1.Deployment{}).Name(), - }, - { - Version: policyv1.SchemeGroupVersion.Version, - Group: policyv1.SchemeGroupVersion.Group, - Kind: reflect.TypeOf(policyv1.PodDisruptionBudget{}).Name(), - }, + } + } + + if r.KafkaCluster.Spec.RemoveUnusedIngressResources { + ctx := context.Background() + envoyResourcesGVK := []schema.GroupVersionKind{ + { + Version: corev1.SchemeGroupVersion.Version, + Group: corev1.SchemeGroupVersion.Group, + Kind: reflect.TypeOf(corev1.Service{}).Name(), + }, + { + Version: corev1.SchemeGroupVersion.Version, + Group: corev1.SchemeGroupVersion.Group, + Kind: reflect.TypeOf(corev1.ConfigMap{}).Name(), + }, + { + Version: appsv1.SchemeGroupVersion.Version, + Group: appsv1.SchemeGroupVersion.Group, + Kind: reflect.TypeOf(appsv1.Deployment{}).Name(), + }, + { + Version: policyv1.SchemeGroupVersion.Version, + Group: policyv1.SchemeGroupVersion.Group, + Kind: reflect.TypeOf(policyv1.PodDisruptionBudget{}).Name(), + }, + } + var envoyResources unstructured.UnstructuredList + for _, gvk := range envoyResourcesGVK { + envoyResources.SetGroupVersionKind(gvk) + + if err := r.List(ctx, &envoyResources, client.InNamespace(r.KafkaCluster.GetNamespace()), + client.MatchingLabels(labelsForEnvoyIngressWithoutEListenerName(r.KafkaCluster.Name))); err != nil { + return errors.Wrap(err, "error when getting list of envoy ingress resources for deletion") } - var envoyResources unstructured.UnstructuredList - for _, gvk := range envoyResourcesGVK { - envoyResources.SetGroupVersionKind(gvk) - if err := r.List(ctx, &envoyResources, client.InNamespace(r.KafkaCluster.GetNamespace()), - client.MatchingLabels(labelsForEnvoyIngressWithoutEListenerName(r.KafkaCluster.Name))); err != nil { - return errors.Wrap(err, "error when getting list of envoy ingress resources for deletion") + for _, removeObject := range envoyResources.Items { + if _, desired := desiredNames[removeObject.GetName()]; desired || + util.ObjectManagedByClusterRegistry(&removeObject) || + !removeObject.GetDeletionTimestamp().IsZero() { + continue } - - for _, removeObject := range envoyResources.Items { - if !strings.Contains(removeObject.GetLabels()[util.ExternalListenerLabelNameKey], eListener.Name) || - util.ObjectManagedByClusterRegistry(&removeObject) || - !removeObject.GetDeletionTimestamp().IsZero() { - continue - } - if err := r.Delete(ctx, &removeObject); client.IgnoreNotFound(err) != nil { - return errors.Wrap(err, "error when removing envoy ingress resources") - } - log.V(1).Info(fmt.Sprintf("Deleted envoy ingress '%s' resource '%s' for externalListener '%s'", gvk.Kind, removeObject.GetName(), eListener.Name)) - deletionCounter++ + if err := r.Delete(ctx, &removeObject); client.IgnoreNotFound(err) != nil { + return errors.Wrap(err, "error when removing envoy ingress resources") } - } - if deletionCounter > 0 { - 
-			log.Info(fmt.Sprintf("Removed '%d' resources for envoy ingress", deletionCounter))
+				log.Info(fmt.Sprintf("Deleted envoy ingress '%s' resource '%s'", gvk.Kind, removeObject.GetName()))
 			}
 		}
 	}
diff --git a/pkg/resources/istioingress/istioingress.go b/pkg/resources/istioingress/istioingress.go
index 8e1c6b48f..61cd00070 100644
--- a/pkg/resources/istioingress/istioingress.go
+++ b/pkg/resources/istioingress/istioingress.go
@@ -82,9 +82,10 @@ func New(client client.Client, cluster *v1beta1.KafkaCluster) *Reconciler {
 func (r *Reconciler) Reconcile(log logr.Logger) error {
 	log = log.WithValues("component", componentName)
 	log.V(1).Info("Reconciling")
+	desiredNames := make(map[string]struct{})
 
 	for _, eListener := range r.KafkaCluster.Spec.ListenersConfig.ExternalListeners {
-		if r.KafkaCluster.Spec.GetIngressController() == istioingress.IngressControllerName && eListener.GetAccessMethod() == corev1.ServiceTypeLoadBalancer {
+		if eListener.GetIngressController(&r.KafkaCluster.Spec) == istioingress.IngressControllerName && eListener.GetAccessMethod() == corev1.ServiceTypeLoadBalancer {
 			if r.KafkaCluster.Spec.IstioControlPlane == nil {
 				log.Error(errors.NewPlain("reference to Istio Control Plane is missing"), "skip external listener reconciliation", "external listener", eListener.Name)
 				continue
@@ -107,57 +108,58 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
 					r.virtualService,
 				} {
 					o := res(log, eListener, ingressConfig, name, defaultControllerName, istioRevision)
-					err := k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster)
+					metaObj, err := apimeta.Accessor(o)
+					if err != nil {
+						return errors.Wrap(err, "failed to get object meta for desired istio resources")
+					}
+					desiredNames[metaObj.GetName()] = struct{}{}
+					err = k8sutil.Reconcile(log, r.Client, o, r.KafkaCluster)
 					if err != nil {
 						return err
 					}
 				}
 			}
-		} else if r.KafkaCluster.Spec.RemoveUnusedIngressResources {
-			// Cleaning up unused istio resources when ingress controller is not istioingress or externalListener access method is not LoadBalancer
-			deletionCounter := 0
-			ctx := context.Background()
-			istioResourcesGVK := []schema.GroupVersionKind{
-				{
-					Version: istioOperatorApi.GroupVersion.Version,
-					Group:   istioOperatorApi.GroupVersion.Group,
-					Kind:    reflect.TypeOf(istioOperatorApi.IstioMeshGateway{}).Name(),
-				},
-				{
-					Version: istioclientv1beta1.SchemeGroupVersion.Version,
-					Group:   istioclientv1beta1.SchemeGroupVersion.Group,
-					Kind:    reflect.TypeOf(istioclientv1beta1.Gateway{}).Name(),
-				},
-				{
-					Version: istioclientv1beta1.SchemeGroupVersion.Version,
-					Group:   istioclientv1beta1.SchemeGroupVersion.Group,
-					Kind:    reflect.TypeOf(istioclientv1beta1.VirtualService{}).Name(),
-				},
+		}
+	}
+
+	if r.KafkaCluster.Spec.RemoveUnusedIngressResources {
+		ctx := context.Background()
+		istioResourcesGVK := []schema.GroupVersionKind{
+			{
+				Version: istioOperatorApi.GroupVersion.Version,
+				Group:   istioOperatorApi.GroupVersion.Group,
+				Kind:    reflect.TypeOf(istioOperatorApi.IstioMeshGateway{}).Name(),
+			},
+			{
+				Version: istioclientv1beta1.SchemeGroupVersion.Version,
+				Group:   istioclientv1beta1.SchemeGroupVersion.Group,
+				Kind:    reflect.TypeOf(istioclientv1beta1.Gateway{}).Name(),
+			},
+			{
+				Version: istioclientv1beta1.SchemeGroupVersion.Version,
+				Group:   istioclientv1beta1.SchemeGroupVersion.Group,
+				Kind:    reflect.TypeOf(istioclientv1beta1.VirtualService{}).Name(),
+			},
+		}
+		var istioResources unstructured.UnstructuredList
+		for _, gvk := range istioResourcesGVK {
+			istioResources.SetGroupVersionKind(gvk)
+
+			if err := r.List(ctx, &istioResources,
+				client.InNamespace(r.KafkaCluster.GetNamespace()),
+				client.MatchingLabels(labelsForIstioIngressWithoutEListenerName(r.KafkaCluster.Name, ""))); err != nil && !apimeta.IsNoMatchError(err) {
+				return errors.Wrap(err, "error when getting list of istio ingress resources for deletion")
 			}
-			var istioResources unstructured.UnstructuredList
-			for _, gvk := range istioResourcesGVK {
-				istioResources.SetGroupVersionKind(gvk)
-				if err := r.List(ctx, &istioResources, client.InNamespace(r.KafkaCluster.GetNamespace()),
-					client.MatchingLabels(labelsForIstioIngressWithoutEListenerName(r.KafkaCluster.Name, ""))); err != nil && !apimeta.IsNoMatchError(err) {
-					return errors.Wrap(err, "error when getting list of istio ingress resources for deletion")
+			for _, removeObject := range istioResources.Items {
+				if _, desired := desiredNames[removeObject.GetName()]; desired ||
+					util.ObjectManagedByClusterRegistry(&removeObject) ||
+					!removeObject.GetDeletionTimestamp().IsZero() {
+					continue
 				}
-
-				for _, removeObject := range istioResources.Items {
-					if !strings.Contains(removeObject.GetLabels()[util.ExternalListenerLabelNameKey], eListener.Name) ||
-						util.ObjectManagedByClusterRegistry(&removeObject) ||
-						!removeObject.GetDeletionTimestamp().IsZero() {
-						continue
-					}
-					if err := r.Delete(ctx, &removeObject); client.IgnoreNotFound(err) != nil {
-						return errors.Wrap(err, "error when removing istio ingress resources")
-					}
-					log.V(1).Info(fmt.Sprintf("Deleted istio ingress '%s' resource '%s' for externalListener '%s'", gvk.Kind, removeObject.GetName(), eListener.Name))
-					deletionCounter++
+				if err := r.Delete(ctx, &removeObject); client.IgnoreNotFound(err) != nil {
+					return errors.Wrap(err, "error when removing istio ingress resources")
 				}
-			}
-			if deletionCounter > 0 {
-				log.Info(fmt.Sprintf("Removed '%d' resources for istio ingress", deletionCounter))
+				log.Info(fmt.Sprintf("Deleted istio ingress '%s' resource '%s'", gvk.Kind, removeObject.GetName()))
 			}
 		}
 	}
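The cleanup above no longer matches on the external listener name label inside the per-listener loop; it runs once after the loop and deletes any istio-labeled resource whose name was not recorded in desiredNames during this reconciliation pass. A dependency-free sketch of that mark-and-sweep shape, with hypothetical names standing in for the client.List call and the cluster-registry and deletion-timestamp guards in the diff:

package main

import "fmt"

// sweep keeps anything still desired or otherwise protected and reports
// everything else for deletion; the reconciler calls r.Delete(ctx, obj) where
// this sketch appends to the result slice.
func sweep(desired map[string]struct{}, listed []string, protected func(string) bool) []string {
	var deleted []string
	for _, name := range listed {
		if _, ok := desired[name]; ok || protected(name) {
			continue // still desired or protected: keep it
		}
		deleted = append(deleted, name)
	}
	return deleted
}

func main() {
	desired := map[string]struct{}{"gw-external-kafka": {}}
	listed := []string{"gw-external-kafka", "gw-removed-listener-kafka"}
	fmt.Println(sweep(desired, listed, func(string) bool { return false }))
	// prints: [gw-removed-listener-kafka]
}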
diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go
index 9aa91808f..177dedd39 100644
--- a/pkg/resources/kafka/kafka.go
+++ b/pkg/resources/kafka/kafka.go
@@ -614,7 +614,7 @@ func (r *Reconciler) reconcileKafkaPodDelete(ctx context.Context, log logr.Logge
 	if err != nil {
 		return errors.WrapIfWithDetails(err, "could not delete broker", "id", broker.Labels[banzaiv1beta1.BrokerIdLabelKey])
 	}
-	log.Info("broker pod deleted", banzaiv1beta1.BrokerIdLabelKey, broker.Labels[banzaiv1beta1.BrokerIdLabelKey], "pod", broker.GetName())
+	log.Info("broker pod deleted, no longer present in spec", banzaiv1beta1.BrokerIdLabelKey, broker.Labels[banzaiv1beta1.BrokerIdLabelKey], "pod", broker.GetName())
 	configMapName := fmt.Sprintf(brokerConfigTemplate+"-%s", r.KafkaCluster.Name, broker.Labels[banzaiv1beta1.BrokerIdLabelKey])
 	err = r.Delete(context.TODO(), &corev1.ConfigMap{ObjectMeta: templates.ObjectMeta(configMapName, apiutil.LabelsForKafka(r.KafkaCluster.Name), r.KafkaCluster)})
 	if err != nil {
@@ -963,8 +963,15 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
 		r.KafkaCluster.Status.BrokersState[currentPod.Labels[banzaiv1beta1.BrokerIdLabelKey]].ConfigurationState == banzaiv1beta1.ConfigInSync &&
 		!k8sutil.IsPodContainsEvictedContainer(currentPod) &&
 		!k8sutil.IsPodContainsShutdownContainer(currentPod) {
-		log.V(1).Info("resource is in sync")
+		log.V(1).Info("broker pod is in sync", "pod", currentPod.GetName(), banzaiv1beta1.BrokerIdLabelKey, currentPod.Labels[banzaiv1beta1.BrokerIdLabelKey])
 		return nil
+	} else {
+		log.V(1).Info("broker pod is in crashloop or not in sync",
+			"pod", currentPod.GetName(),
+			"configSync", r.KafkaCluster.Status.BrokersState[currentPod.Labels[banzaiv1beta1.BrokerIdLabelKey]].ConfigurationState == banzaiv1beta1.ConfigInSync,
+			"evicted", k8sutil.IsPodContainsEvictedContainer(currentPod),
+			"shutdown", k8sutil.IsPodContainsShutdownContainer(currentPod),
+			"terminated", k8sutil.IsPodContainsTerminatedContainer(currentPod))
 	}
 default:
 	log.V(1).Info("kafka pod resource diffs",
@@ -1425,8 +1432,9 @@ func (r *Reconciler) getBrokerHost(log logr.Logger, defaultHost string, broker b
 func (r *Reconciler) createExternalListenerStatuses(log logr.Logger) (map[string]banzaiv1beta1.ListenerStatusList, error) {
 	extListenerStatuses := make(map[string]banzaiv1beta1.ListenerStatusList, len(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners))
 	for _, eListener := range r.KafkaCluster.Spec.ListenersConfig.ExternalListeners {
+		effectiveController := eListener.GetIngressController(&r.KafkaCluster.Spec)
 		// in case if external listener uses loadbalancer type of service and istioControlPlane is not specified than we skip this listener from status update. In this way this external listener will not be in the configmap.
-		if eListener.GetAccessMethod() == corev1.ServiceTypeLoadBalancer && r.KafkaCluster.Spec.GetIngressController() == istioingressutils.IngressControllerName && r.KafkaCluster.Spec.IstioControlPlane == nil {
+		if eListener.GetAccessMethod() == corev1.ServiceTypeLoadBalancer && effectiveController == istioingressutils.IngressControllerName && r.KafkaCluster.Spec.IstioControlPlane == nil {
 			continue
 		}
 		var host string
@@ -1444,7 +1452,7 @@ func (r *Reconciler) createExternalListenerStatuses(log logr.Logger) (map[string
 			if iConfig.HostnameOverride != "" {
 				host = iConfig.HostnameOverride
 			} else if eListener.GetAccessMethod() == corev1.ServiceTypeLoadBalancer {
-				foundLBService, err = getServiceFromExternalListener(r.Client, r.KafkaCluster, eListener.Name, iConfigName)
+				foundLBService, err = getServiceFromExternalListener(r.Client, r.KafkaCluster, eListener.Name, iConfigName, effectiveController)
 				if err != nil {
 					return nil, errors.WrapIfWithDetails(err, "could not get service corresponding to the external listener", "externalListenerName", eListener.Name)
 				}
@@ -1458,7 +1466,7 @@ func (r *Reconciler) createExternalListenerStatuses(log logr.Logger) (map[string
 		// optionally add all brokers service to the top of the list
 		if eListener.GetAccessMethod() != corev1.ServiceTypeNodePort {
 			if foundLBService == nil {
-				foundLBService, err = getServiceFromExternalListener(r.Client, r.KafkaCluster, eListener.Name, iConfigName)
+				foundLBService, err = getServiceFromExternalListener(r.Client, r.KafkaCluster, eListener.Name, iConfigName, effectiveController)
 				if err != nil {
 					return nil, errors.WrapIfWithDetails(err, "could not get service corresponding to the external listener", "externalListenerName", eListener.Name)
 				}
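The effectiveController resolved at the top of the loop also drives the early skip above; condensed to a predicate it reads as follows. The helper name and the standalone framing are illustrative, the condition itself is the one in the hunk.

package kafka // sketch only, alongside the reconciler above

import (
	banzaiv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
	istioingressutils "github.com/banzaicloud/koperator/pkg/util/istioingress"
	corev1 "k8s.io/api/core/v1"
)

// skipListenerStatus is a hypothetical name for the condition under which an
// external listener is omitted from status (and therefore from the generated
// configmap): LoadBalancer access, effective controller istioingress, but no
// IstioControlPlane reference to satisfy it.
func skipListenerStatus(spec *banzaiv1beta1.KafkaClusterSpec, l banzaiv1beta1.ExternalListenerConfig) bool {
	return l.GetAccessMethod() == corev1.ServiceTypeLoadBalancer &&
		l.GetIngressController(spec) == istioingressutils.IngressControllerName &&
		spec.IstioControlPlane == nil
}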
@@ -1621,10 +1629,10 @@ func (r *Reconciler) getBrokerAz(pod *corev1.Pod, kafkaBrokerAvailabilityZoneMap
 }
 
 func getServiceFromExternalListener(client client.Client, cluster *banzaiv1beta1.KafkaCluster,
-	eListenerName string, ingressConfigName string) (*corev1.Service, error) {
+	eListenerName string, ingressConfigName string, effectiveController string) (*corev1.Service, error) {
 	foundLBService := &corev1.Service{}
 	var iControllerServiceName string
-	switch cluster.Spec.GetIngressController() {
+	switch effectiveController {
 	case istioingressutils.IngressControllerName:
 		if ingressConfigName == util.IngressConfigGlobalName {
 			iControllerServiceName = fmt.Sprintf(istioingressutils.MeshGatewayNameTemplate, eListenerName, cluster.GetName())
diff --git a/pkg/resources/nodeportexternalaccess/nodeportExternalAccess.go b/pkg/resources/nodeportexternalaccess/nodeportExternalAccess.go
index 61dc83109..4aa1e5bb8 100644
--- a/pkg/resources/nodeportexternalaccess/nodeportExternalAccess.go
+++ b/pkg/resources/nodeportexternalaccess/nodeportExternalAccess.go
@@ -68,12 +68,21 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
 				return err
 			}
 		} else if r.KafkaCluster.Spec.RemoveUnusedIngressResources {
-			// Cleaning up unused nodeport services
+			// Cleaning up unused nodeport services: only delete if the existing service is NodePort type.
+			// Other reconcilers (e.g. contour) use the same name template for ClusterIP services; deleting
+			// those would cause an infinite create/delete loop.
 			removeService := service.(client.Object)
-			if err := r.Delete(context.Background(), removeService); client.IgnoreNotFound(err) != nil {
-				return errors.Wrap(err, "error when removing unused nodeport services")
+			existing := &corev1.Service{}
+			err := r.Get(context.Background(), client.ObjectKeyFromObject(removeService), existing)
+			if client.IgnoreNotFound(err) != nil {
+				return errors.Wrap(err, "error when checking for unused nodeport service")
+			}
+			if err == nil && existing.Spec.Type == corev1.ServiceTypeNodePort {
+				if err := r.Delete(context.Background(), existing); client.IgnoreNotFound(err) != nil {
+					return errors.Wrap(err, "error when removing unused nodeport services")
+				}
+				log.V(1).Info(fmt.Sprintf("Deleted nodePort service '%s' for external listener '%s'", existing.GetName(), eListener.Name))
 			}
-			log.V(1).Info(fmt.Sprintf("Deleted nodePort service '%s' for external listener '%s'", removeService.GetName(), eListener.Name))
 		}
 	}
 }
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 6b3007201..0ec2c4569 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -271,7 +271,7 @@ func ShouldIncludeBroker(brokerConfig *v1beta1.BrokerConfig, status v1beta1.Kafk
 	defaultIngressConfigName, ingressConfigName string) bool {
 	// in KRaft mode, controller only are excluded
 	if brokerConfig != nil && (len(brokerConfig.Roles) == 0 || brokerConfig.IsBrokerNode()) {
-		if len(brokerConfig.BrokerIngressMapping) == 0 && (ingressConfigName == defaultIngressConfigName || defaultIngressConfigName == "") ||
+		if (len(brokerConfig.BrokerIngressMapping) == 0 || defaultIngressConfigName == "") && (ingressConfigName == defaultIngressConfigName || defaultIngressConfigName == "") ||
 			apiutil.StringSliceContains(brokerConfig.BrokerIngressMapping, ingressConfigName) {
 			return true
 		}
 	}
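The regrouped condition above is subtle enough that the regression tests below spell it out: under the old grouping, a broker whose brokerIngressMapping named only another listener's configs was dropped from a global (config-less) listener, where both config names are empty strings. A worked evaluation of exactly that case, with contains standing in for apiutil.StringSliceContains:

package main

import "fmt"

func contains(ss []string, s string) bool {
	for _, v := range ss {
		if v == s {
			return true
		}
	}
	return false
}

func main() {
	// Regression case from the tests below: mapping was set for a Contour
	// listener, while the Envoy listener under evaluation is global, so both
	// config names are empty.
	mapping := []string{"external"}
	defaultName, ingressName := "", ""

	old := len(mapping) == 0 && (ingressName == defaultName || defaultName == "") ||
		contains(mapping, ingressName)
	fixed := (len(mapping) == 0 || defaultName == "") && (ingressName == defaultName || defaultName == "") ||
		contains(mapping, ingressName)

	fmt.Println(old, fixed) // false true: the fix keeps the broker on the global listener
}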
@@ -289,8 +289,9 @@ func GetIngressConfigs(kafkaClusterSpec v1beta1.KafkaClusterSpec,
 	eListenerConfig v1beta1.ExternalListenerConfig) (map[string]v1beta1.IngressConfig, string, error) {
 	var ingressConfigs map[string]v1beta1.IngressConfig
 	var defaultIngressConfigName string
-	// Merge specific external listener configuration with the global one if none specified
-	switch kafkaClusterSpec.GetIngressController() {
+	// Use effective controller for this listener (per-listener override or cluster default)
+	effectiveController := eListenerConfig.GetIngressController(&kafkaClusterSpec)
+	switch effectiveController {
 	case envoyutils.IngressControllerName:
 		if eListenerConfig.Config != nil {
 			defaultIngressConfigName = eListenerConfig.Config.DefaultIngressConfig
@@ -376,7 +377,7 @@ func GetIngressConfigs(kafkaClusterSpec v1beta1.KafkaClusterSpec,
 		}
 	}
 	default:
-		return nil, "", errors.NewWithDetails("not supported ingress type", "name", kafkaClusterSpec.GetIngressController())
+		return nil, "", errors.NewWithDetails("not supported ingress type", "name", effectiveController)
 	}
 	return ingressConfigs, defaultIngressConfigName, nil
 }
@@ -604,10 +605,6 @@ func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
 	return RetryOnError(backoff, fn, apierrors.IsConflict)
 }
 
-func GetExternalPortForBroker(externalStartingPort, brokerId int32) int32 {
-	return externalStartingPort + brokerId
-}
-
 // Generage MD5 hash for a given string
 func GetMD5Hash(text string) string {
 	hash := md5.Sum([]byte(text))
diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go
index db84482ed..e0ae7924a 100644
--- a/pkg/util/util_test.go
+++ b/pkg/util/util_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/banzaicloud/koperator/api/v1beta1"
+	"github.com/banzaicloud/koperator/pkg/util/contour"
 	"github.com/banzaicloud/koperator/pkg/util/istioingress"
 
 	"gotest.tools/assert"
@@ -297,6 +298,24 @@ func TestGetIngressConfigs(t *testing.T) {
 		},
 	}
 
+	defaultKafkaClusterWithContour := &v1beta1.KafkaClusterSpec{
+		IngressController: contour.IngressControllerName,
+		ContourIngressConfig: v1beta1.ContourIngressConfig{
+			TLSSecretName:      "contour-tls",
+			BrokerFQDNTemplate: "broker-%id.contour.example.com",
+		},
+	}
+
+	// Cluster default envoy but has ContourIngressConfig for per-listener override test
+	clusterEnvoyWithContourConfig := &v1beta1.KafkaClusterSpec{
+		IngressController: "envoy",
+		EnvoyConfig:       defaultKafkaClusterWithEnvoy.EnvoyConfig,
+		ContourIngressConfig: v1beta1.ContourIngressConfig{
+			TLSSecretName:      "contour-tls",
+			BrokerFQDNTemplate: "broker-%id.contour.example.com",
+		},
+	}
+
 	testCases := []struct {
 		globalConfig                     v1beta1.KafkaClusterSpec
 		externalListenerSpecifiedConfigs v1beta1.ExternalListenerConfig
@@ -473,6 +492,41 @@ func TestGetIngressConfigs(t *testing.T) {
 				},
 			},
 		},
+		// Per-listener ingress override: cluster default envoy, listener specifies contour -> contour config
+		{
+			*clusterEnvoyWithContourConfig,
+			v1beta1.ExternalListenerConfig{
+				CommonListenerSpec: v1beta1.CommonListenerSpec{
+					Type:          "plaintext",
+					Name:          "external",
+					ContainerPort: 9094,
+				},
+				ExternalStartingPort: 19090,
+				IngressController:    contour.IngressControllerName,
+			},
+			map[string]v1beta1.IngressConfig{
+				IngressConfigGlobalName: {
+					ContourIngressConfig: &clusterEnvoyWithContourConfig.ContourIngressConfig,
+				},
+			},
+		},
+		// Cluster default contour, listener has no override -> contour config (backward compatibility)
+		{
+			*defaultKafkaClusterWithContour,
+			v1beta1.ExternalListenerConfig{
+				CommonListenerSpec: v1beta1.CommonListenerSpec{
+					Type:          "plaintext",
+					Name:          "external",
+					ContainerPort: 9094,
+				},
+				ExternalStartingPort: 19090,
+			},
+			map[string]v1beta1.IngressConfig{
+				IngressConfigGlobalName: {
+					ContourIngressConfig: &defaultKafkaClusterWithContour.ContourIngressConfig,
+				},
+			},
+		},
 	}
 	for _, testCase := range testCases {
 		ingressConfigs, _, err := GetIngressConfigs(testCase.globalConfig, testCase.externalListenerSpecifiedConfigs)
@@ -822,6 +876,123 @@ func TestFilterControllerOnlyNodes(t *testing.T) {
 	}
 }
 
+func TestShouldIncludeBroker(t *testing.T) {
+	testCases := []struct {
+		testName                 string
+		brokerConfig             *v1beta1.BrokerConfig
+		status                   v1beta1.KafkaClusterStatus
+		brokerID                 int
+		defaultIngressConfigName string
+		ingressConfigName        string
+		expected                 bool
+	}{
+		{
+			testName:                 "global Envoy listener, empty BrokerIngressMapping",
+			brokerConfig:             &v1beta1.BrokerConfig{},
+			status:                   v1beta1.KafkaClusterStatus{},
+			brokerID:                 0,
+			defaultIngressConfigName: "",
+			ingressConfigName:        "",
+			expected:                 true,
+		},
+		{
+			testName: "global Envoy listener, BrokerIngressMapping set to Contour config names (regression)",
+			brokerConfig: &v1beta1.BrokerConfig{
+				BrokerIngressMapping: []string{"external"},
+			},
+			status: v1beta1.KafkaClusterStatus{
+				BrokersState: map[string]v1beta1.BrokerState{
+					"0": {ExternalListenerConfigNames: []string{"external"}},
+				},
+			},
+			brokerID:                 0,
+			defaultIngressConfigName: "",
+			ingressConfigName:        "",
+			expected:                 true,
+		},
+		{
+			testName: "global Envoy listener, BrokerIngressMapping set to multiple Contour config names (regression)",
+			brokerConfig: &v1beta1.BrokerConfig{
+				BrokerIngressMapping: []string{"external", "corp"},
+			},
+			status:                   v1beta1.KafkaClusterStatus{},
+			brokerID:                 1,
+			defaultIngressConfigName: "",
+			ingressConfigName:        "",
+			expected:                 true,
+		},
+		{
+			testName: "named Contour listener, BrokerIngressMapping matches",
+			brokerConfig: &v1beta1.BrokerConfig{
+				BrokerIngressMapping: []string{"external"},
+			},
+			status:                   v1beta1.KafkaClusterStatus{},
+			brokerID:                 0,
+			defaultIngressConfigName: "external",
+			ingressConfigName:        "external",
+			expected:                 true,
+		},
+		{
+			testName: "named Contour listener, BrokerIngressMapping does not match",
+			brokerConfig: &v1beta1.BrokerConfig{
+				BrokerIngressMapping: []string{"external"},
+			},
+			status:                   v1beta1.KafkaClusterStatus{},
+			brokerID:                 0,
+			defaultIngressConfigName: "external",
+			ingressConfigName:        "corp",
+			expected:                 false,
+		},
+		{
+			testName:                 "named Contour listener, empty BrokerIngressMapping uses default",
+			brokerConfig:             &v1beta1.BrokerConfig{},
+			status:                   v1beta1.KafkaClusterStatus{},
+			brokerID:                 0,
+			defaultIngressConfigName: "external",
+			ingressConfigName:        "external",
+			expected:                 true,
+		},
+		{
+			testName: "KRaft controller-only node excluded",
+			brokerConfig: &v1beta1.BrokerConfig{
+				Roles: []string{"controller"},
+			},
+			status:                   v1beta1.KafkaClusterStatus{},
+			brokerID:                 0,
+			defaultIngressConfigName: "",
+			ingressConfigName:        "",
+			expected:                 false,
+		},
+		{
+			testName: "KRaft combined broker+controller node included",
+			brokerConfig: &v1beta1.BrokerConfig{
+				Roles: []string{"broker", "controller"},
+			},
+			status:                   v1beta1.KafkaClusterStatus{},
+			brokerID:                 0,
+			defaultIngressConfigName: "",
+			ingressConfigName:        "",
+			expected:                 true,
+		},
+		{
+			testName:                 "nil brokerConfig excluded",
+			brokerConfig:             nil,
+			status:                   v1beta1.KafkaClusterStatus{},
+			brokerID:                 0,
+			defaultIngressConfigName: "",
+			ingressConfigName:        "",
+			expected:                 false,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.testName, func(t *testing.T) {
+			result := ShouldIncludeBroker(tc.brokerConfig, tc.status, tc.brokerID, tc.defaultIngressConfigName, tc.ingressConfigName)
+			require.Equal(t, tc.expected, result)
+		})
+	}
+}
+
 func TestConstructEListenerLabelName(t *testing.T) {
 	tests := []struct {
 		ingressConfigName string
diff --git a/pkg/webhooks/kafkacluster_validator.go b/pkg/webhooks/kafkacluster_validator.go
index d9f7b8932..5ff6ad7e7 100644
--- a/pkg/webhooks/kafkacluster_validator.go
+++ b/pkg/webhooks/kafkacluster_validator.go
@@ -30,7 +30,6 @@ import (
 	"github.com/go-logr/logr"
 
 	banzaicloudv1beta1 "github.com/banzaicloud/koperator/api/v1beta1"
-	"github.com/banzaicloud/koperator/pkg/util"
 )
 
 type KafkaClusterValidator struct {
@@ -99,13 +98,47 @@ func checkInternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpe
 func checkExternalListeners(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList {
 	var allErrs field.ErrorList
 
+	allErrs = append(allErrs, checkExternalListenerIngressController(kafkaClusterSpec)...)
+	allErrs = append(allErrs, checkIstioControlPlaneRequiredWhenIstioIngress(kafkaClusterSpec)...)
 	allErrs = append(allErrs, checkExternalListenerStartingPort(kafkaClusterSpec)...)
-
 	allErrs = append(allErrs, checkTargetPortsCollisionForEnvoy(kafkaClusterSpec)...)
 
 	return allErrs
 }
 
+// checkExternalListenerIngressController validates that each external listener's IngressController (when non-empty) is envoy, contour, or istioingress.
+func checkExternalListenerIngressController(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList {
+	var allErrs field.ErrorList
+	valid := map[string]bool{"envoy": true, "contour": true, "istioingress": true}
+	for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners {
+		if extListener.IngressController != "" && !valid[extListener.IngressController] {
+			fldErr := field.Invalid(
+				field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(i).Child("ingressController"),
+				extListener.IngressController,
+				"ingressController must be one of: envoy, contour, istioingress")
+			allErrs = append(allErrs, fldErr)
+		}
+	}
+	return allErrs
+}
+
+// checkIstioControlPlaneRequiredWhenIstioIngress ensures Spec.IstioControlPlane is set when any listener's effective controller is istioingress.
+func checkIstioControlPlaneRequiredWhenIstioIngress(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList {
+	for _, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners {
+		if extListener.GetIngressController(kafkaClusterSpec) == "istioingress" {
+			if kafkaClusterSpec.IstioControlPlane == nil {
+				return field.ErrorList{
+					field.Required(
+						field.NewPath("spec").Child("istioControlPlane"),
+						"istioControlPlane must be set when any external listener uses ingressController istioingress"),
+				}
+			}
+			return nil
+		}
+	}
+	return nil
+}
+
 // checkUniqueListenerContainerPort checks for duplicate containerPort numbers across both internal and external listeners
 // which would subsequently generate a "Duplicate value" error when creating a Service which accumulates all these ports.
 // The first time a port number is found will not be reported as duplicate; only subsequent instances using that port are.
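One property of checkIstioControlPlaneRequiredWhenIstioIngress worth making explicit: it returns on the first listener whose effective controller is istioingress, so at most one field.Required error is reported however many listeners depend on istio. A sketch of a table-style assertion for that property, in the spirit of the tests further down (the test name is hypothetical; aliases follow the test file):

func TestIstioPrerequisiteReportedOnce(t *testing.T) { // hypothetical sketch
	spec := v1beta1.KafkaClusterSpec{
		// two istioingress listeners, no IstioControlPlane: still one error
		ListenersConfig: v1beta1.ListenersConfig{
			ExternalListeners: []v1beta1.ExternalListenerConfig{
				{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext1"}, ExternalStartingPort: 19090, IngressController: "istioingress"},
				{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext2"}, ExternalStartingPort: 29090, IngressController: "istioingress"},
			},
		},
	}
	require.Len(t, checkIstioControlPlaneRequiredWhenIstioIngress(&spec), 1)
}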
@@ -144,7 +177,7 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk
 	for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners {
 		var outOfRangeBrokerIDs, collidingPortsBrokerIDs []int32
 		for _, broker := range kafkaClusterSpec.Brokers {
-			externalPort := util.GetExternalPortForBroker(extListener.ExternalStartingPort, broker.Id)
+			externalPort := extListener.GetBrokerPort(broker.Id)
 			if externalPort < 1 || externalPort > maxPort {
 				outOfRangeBrokerIDs = append(outOfRangeBrokerIDs, broker.Id)
 			}
@@ -153,7 +186,7 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk
 				collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id)
 			}
 
-			if kafkaClusterSpec.GetIngressController() == "envoy" {
+			if extListener.GetIngressController(kafkaClusterSpec) == "envoy" {
 				if externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort() || externalPort == kafkaClusterSpec.EnvoyConfig.GetEnvoyHealthCheckPort() {
 					collidingPortsBrokerIDs = append(collidingPortsBrokerIDs, broker.Id)
 				}
@@ -177,12 +210,8 @@ func checkExternalListenerStartingPort(kafkaClusterSpec *banzaicloudv1beta1.Kafk
 	return allErrs
 }
 
-// checkTargetPortsCollisionForEnvoy checks if the IngressControllerTargetPort collides with the other container ports for envoy deployment
+// checkTargetPortsCollisionForEnvoy checks if the IngressControllerTargetPort collides with the other container ports for envoy deployment (per listener when effective controller is envoy).
 func checkTargetPortsCollisionForEnvoy(kafkaClusterSpec *banzaicloudv1beta1.KafkaClusterSpec) field.ErrorList {
-	if kafkaClusterSpec.GetIngressController() != "envoy" {
-		return nil
-	}
-
 	var allErrs field.ErrorList
 
 	ap := kafkaClusterSpec.EnvoyConfig.GetEnvoyAdminPort()
@@ -196,6 +225,9 @@ func checkTargetPortsCollisionForEnvoy(kafkaClusterSpec *banzaicloudv1beta1.Kafk
 
 	if kafkaClusterSpec.ListenersConfig.ExternalListeners != nil {
 		for i, extListener := range kafkaClusterSpec.ListenersConfig.ExternalListeners {
+			if extListener.GetIngressController(kafkaClusterSpec) != "envoy" {
+				continue
+			}
 			// the ingress controller target port only has impact while using LoadBalancer to access the Kafka cluster
 			if extListener.GetAccessMethod() != corev1.ServiceTypeLoadBalancer {
 				continue
diff --git a/pkg/webhooks/kafkacluster_validator_test.go b/pkg/webhooks/kafkacluster_validator_test.go
index 952c3f389..e6dbc70d6 100644
--- a/pkg/webhooks/kafkacluster_validator_test.go
+++ b/pkg/webhooks/kafkacluster_validator_test.go
@@ -248,6 +248,26 @@ func TestCheckExternalListenerStartingPort(t *testing.T) {
 				"test-external2", int32(8081), int32(8080), int32(29092), []int32{11})),
 			),
 		},
+		{
+			testName: "valid config: valid ports when TLS is enabled",
+			kafkaClusterSpec: v1beta1.KafkaClusterSpec{
+				Brokers: []v1beta1.Broker{{Id: 0}, {Id: 1}, {Id: 2}},
+				ListenersConfig: v1beta1.ListenersConfig{
+					ExternalListeners: []v1beta1.ExternalListenerConfig{
+						{
+							CommonListenerSpec: v1beta1.CommonListenerSpec{
+								Name:          "test-external1",
+								ContainerPort: 29095,
+								Type:          "plaintext",
+							},
+							ExternalStartingPort: -1,
+							AnyCastPort:          util.Int32Pointer(9097),
+						},
+					},
+				},
+			},
+			expected: nil,
+		},
 	}
 
 	for _, testCase := range testCases {
@@ -385,3 +405,137 @@ func TestCheckTargetPortsCollisionForEnvoy(t *testing.T) {
 		})
 	}
 }
+
+func TestCheckExternalListenerIngressController(t *testing.T) {
+	testCases := []struct {
+		testName         string
+		kafkaClusterSpec v1beta1.KafkaClusterSpec
+		expected         field.ErrorList
+	}{
+		{
+			testName: "valid: empty per-listener ingressController (use cluster default)",
+			kafkaClusterSpec: v1beta1.KafkaClusterSpec{
+				ListenersConfig: v1beta1.ListenersConfig{
+					ExternalListeners: []v1beta1.ExternalListenerConfig{
+						{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext1"}, ExternalStartingPort: 19090},
+					},
+				},
+			},
+			expected: nil,
+		},
+		{
+			testName: "valid: per-listener envoy",
+			kafkaClusterSpec: v1beta1.KafkaClusterSpec{
+				ListenersConfig: v1beta1.ListenersConfig{
+					ExternalListeners: []v1beta1.ExternalListenerConfig{
+						{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext1"}, ExternalStartingPort: 19090, IngressController: "envoy"},
+					},
+				},
+			},
+			expected: nil,
+		},
+		{
+			testName: "valid: per-listener contour and istioingress",
+			kafkaClusterSpec: v1beta1.KafkaClusterSpec{
+				ListenersConfig: v1beta1.ListenersConfig{
+					ExternalListeners: []v1beta1.ExternalListenerConfig{
+						{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext1"}, ExternalStartingPort: 19090, IngressController: "contour"},
+						{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext2"}, ExternalStartingPort: 29090, IngressController: "istioingress"},
+					},
+				},
+			},
+			expected: nil,
+		},
+		{
+			testName: "invalid: per-listener ingressController nginx",
+			kafkaClusterSpec: v1beta1.KafkaClusterSpec{
+				ListenersConfig: v1beta1.ListenersConfig{
+					ExternalListeners: []v1beta1.ExternalListenerConfig{
+						{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext1"}, ExternalStartingPort: 19090, IngressController: "nginx"},
+					},
+				},
+			},
+			expected: field.ErrorList{
+				field.Invalid(
+					field.NewPath("spec").Child("listenersConfig").Child("externalListeners").Index(0).Child("ingressController"),
+					"nginx",
+					"ingressController must be one of: envoy, contour, istioingress"),
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.testName, func(t *testing.T) {
+			got := checkExternalListenerIngressController(&tc.kafkaClusterSpec)
+			require.Equal(t, tc.expected, got)
+		})
+	}
+}
+
+func TestCheckIstioControlPlaneRequiredWhenIstioIngress(t *testing.T) {
+	testCases := []struct {
+		testName         string
+		kafkaClusterSpec v1beta1.KafkaClusterSpec
+		expected         field.ErrorList
+	}{
+		{
+			testName: "valid: no listener uses istioingress",
+			kafkaClusterSpec: v1beta1.KafkaClusterSpec{
+				ListenersConfig: v1beta1.ListenersConfig{
+					ExternalListeners: []v1beta1.ExternalListenerConfig{
+						{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext1"}, ExternalStartingPort: 19090, IngressController: "envoy"},
+					},
+				},
+			},
+			expected: nil,
+		},
+		{
+			testName: "valid: listener uses istioingress and IstioControlPlane is set",
+			kafkaClusterSpec: v1beta1.KafkaClusterSpec{
+				IstioControlPlane: &v1beta1.IstioControlPlaneReference{Name: "cp", Namespace: "istio-system"},
+				ListenersConfig: v1beta1.ListenersConfig{
+					ExternalListeners: []v1beta1.ExternalListenerConfig{
+						{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext1"}, ExternalStartingPort: 19090, IngressController: "istioingress"},
+					},
+				},
+			},
+			expected: nil,
+		},
+		{
+			testName: "invalid: listener effective controller istioingress but IstioControlPlane nil",
+			kafkaClusterSpec: v1beta1.KafkaClusterSpec{
+				ListenersConfig: v1beta1.ListenersConfig{
+					ExternalListeners: []v1beta1.ExternalListenerConfig{
+						{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext1"}, ExternalStartingPort: 19090, IngressController: "istioingress"},
+					},
+				},
+			},
+			expected: field.ErrorList{
+				field.Required(
+					field.NewPath("spec").Child("istioControlPlane"),
+					"istioControlPlane must be set when any external listener uses ingressController istioingress"),
+			},
+		},
+		{
+			testName: "invalid: cluster default istioingress, no per-listener override, IstioControlPlane nil",
+			kafkaClusterSpec: v1beta1.KafkaClusterSpec{
+				IngressController: "istioingress",
+				ListenersConfig: v1beta1.ListenersConfig{
+					ExternalListeners: []v1beta1.ExternalListenerConfig{
+						{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "ext1"}, ExternalStartingPort: 19090},
+					},
+				},
+			},
+			expected: field.ErrorList{
+				field.Required(
+					field.NewPath("spec").Child("istioControlPlane"),
+					"istioControlPlane must be set when any external listener uses ingressController istioingress"),
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.testName, func(t *testing.T) {
+			got := checkIstioControlPlaneRequiredWhenIstioIngress(&tc.kafkaClusterSpec)
+			require.Equal(t, tc.expected, got)
+		})
+	}
+}
diff --git a/tests/e2e/uninstall.go b/tests/e2e/uninstall.go
index 5ab3f6f06..6f8dd8cbb 100644
--- a/tests/e2e/uninstall.go
+++ b/tests/e2e/uninstall.go
@@ -239,7 +239,7 @@ func requireRemoveCertManagerCRDs(kubectlOptions k8s.KubectlOptions) {
 	})
 }
 
 func requireUninstallingContour(kubectlOptions k8s.KubectlOptions) {
-	ginkgo.When("Uninstalling zookeeper-operator", func() {
+	ginkgo.When("Uninstalling Project Contour", func() {
 		requireUninstallingContourHelmChart(kubectlOptions)
 		requireRemoveContourCRDs(kubectlOptions)
 		requireRemoveNamespace(kubectlOptions, contourIngressControllerHelmDescriptor.Namespace)
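Taken together, the per-listener field, the resolution helper, the validators, and the tests in this diff give the override the following end-to-end behavior. A small sketch resolving the effective controller per listener (package main, illustrative values; the printed output reflects the documented semantics of the field, where an empty per-listener value falls back to the cluster default):

package main

import (
	"fmt"

	"github.com/banzaicloud/koperator/api/v1beta1"
)

func main() {
	spec := v1beta1.KafkaClusterSpec{
		IngressController: "envoy", // cluster-level default
		ListenersConfig: v1beta1.ListenersConfig{
			ExternalListeners: []v1beta1.ExternalListenerConfig{
				{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "envoy"}},                                  // no override: cluster default
				{CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "external"}, IngressController: "contour"}, // per-listener override
			},
		},
	}
	for _, l := range spec.ListenersConfig.ExternalListeners {
		fmt.Println(l.Name, "->", l.GetIngressController(&spec))
	}
	// envoy -> envoy
	// external -> contour
}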