diff --git a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml index 0de92a3ed..c4801b225 100644 --- a/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml +++ b/charts/kafka-operator/templates/operator-deployment-with-webhook.yaml @@ -133,8 +133,6 @@ spec: app.kubernetes.io/name: {{ include "kafka-operator.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} app.kubernetes.io/component: operator - app: prometheus - component: alertmanager spec: {{- with .Values.imagePullSecrets }} imagePullSecrets: diff --git a/charts/kafka-operator/templates/podmonitor.yaml b/charts/kafka-operator/templates/podmonitor.yaml new file mode 100644 index 000000000..06b8b87a7 --- /dev/null +++ b/charts/kafka-operator/templates/podmonitor.yaml @@ -0,0 +1,31 @@ +{{- if .Values.prometheusMetrics.podMonitor.enabled }} +kind: PodMonitor +apiVersion: monitoring.coreos.com/v1 +metadata: + name: {{ include "kafka-operator.fullname" . }} + namespace: {{ .Release.Namespace | quote }} + labels: + helm.sh/chart: {{ include "kafka-operator.chart" . }} + app.kubernetes.io/name: {{ include "kafka-operator.name" . }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/version: {{ .Chart.AppVersion }} + app.kubernetes.io/component: operator + {{- with .Values.operator.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +spec: + namespaceSelector: + matchNames: + - {{ .Release.Namespace }} + selector: + matchLabels: + app.kubernetes.io/name: {{ include "kafka-operator.name" . 
}} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/component: operator + endpoints: + - interval: {{ .Values.prometheusMetrics.podMonitor.interval }} + port: metrics + path: /metrics +{{- end }} diff --git a/charts/kafka-operator/values.yaml b/charts/kafka-operator/values.yaml index 1073280f8..a7966a9e8 100644 --- a/charts/kafka-operator/values.yaml +++ b/charts/kafka-operator/values.yaml @@ -103,6 +103,10 @@ prometheusMetrics: create: true # -- ServiceAccount used by prometheus auth proxy name: kafka-operator-authproxy + podMonitor: + # -- If true, create a PodMonitor for Prometheus metrics + enabled: false + interval: 30s # -- Health probes configuration healthProbes: {} diff --git a/pkg/jmxextractor/extractor.go b/pkg/jmxextractor/extractor.go index 583e06dd7..d06af1491 100644 --- a/pkg/jmxextractor/extractor.go +++ b/pkg/jmxextractor/extractor.go @@ -20,6 +20,7 @@ import ( "io" "net/http" "regexp" + "time" "github.com/banzaicloud/koperator/api/v1beta1" "github.com/banzaicloud/koperator/pkg/errorfactory" @@ -90,7 +91,8 @@ func (exp *jmxExtractor) ExtractDockerImageAndVersion(brokerId int32, brokerConf requestURL = fmt.Sprintf(serviceJMXTemplate, exp.clusterName, brokerId, exp.clusterNamespace, exp.kubernetesClusterDomain, 9020) } - rsp, err := http.Get(requestURL) + client := &http.Client{Timeout: 30 * time.Second} + rsp, err := client.Get(requestURL) if err != nil { exp.log.Error(err, fmt.Sprintf("error during talking to broker-%d", brokerId)) return nil, errorfactory.New(errorfactory.BrokersNotReady{}, err, "unable to talk to ...") diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index 9aa91808f..934d5b312 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -409,6 +409,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { reorderedBrokers := reorderBrokers(runningBrokers, boundPersistentVolumeClaims, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, controllerID, 
log) allBrokerDynamicConfigSucceeded := true + brokerConfigs := make(map[int32]*banzaiv1beta1.BrokerConfig) for _, broker := range reorderedBrokers { brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec) if err != nil { @@ -449,9 +450,8 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if err != nil { return err } - if err = r.updateStatusWithDockerImageAndVersion(broker.Id, brokerConfig, log); err != nil { - return err - } + brokerConfigs[broker.Id] = brokerConfig + // If dynamic configs can not be set then let the loop continue to the next broker, // after the loop we return error. This solves that case when other brokers could get healthy, // but the loop exits too soon because dynamic configs can not be set. @@ -474,6 +474,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { return err } + if err := r.updateStatusWithDockerImageAndVersion(brokerConfigs, log); err != nil { + return err + } + // in case HeadlessServiceEnabled is changed, delete the service that was created by the previous // reconcile flow. 
The services must be deleted at the end of the reconcile flow after the new services // were created and broker configurations reflecting the new services otherwise the Kafka brokers @@ -917,21 +921,36 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod, return nil } -func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokerId int32, brokerConfig *banzaiv1beta1.BrokerConfig, - log logr.Logger) error { - jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), - r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) +type brokerVersionResult struct { + brokerID int32 + kafkaVersion *banzaiv1beta1.KafkaVersion + err error +} - kafkaVersion, err := jmxExp.ExtractDockerImageAndVersion(brokerId, brokerConfig, - r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) - if err != nil { - return err +func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokers map[int32]*banzaiv1beta1.BrokerConfig, log logr.Logger) error { + ch := make(chan brokerVersionResult, len(brokers)) + for brokerID, brokerConfig := range brokers { + go func(id int32, cfg *banzaiv1beta1.BrokerConfig) { + jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log) + kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg, r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled) + if err != nil { + ch <- brokerVersionResult{brokerID: id, err: err} + return + } + ch <- brokerVersionResult{brokerID: id, kafkaVersion: kv} + }(brokerID, brokerConfig) } - err = k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(brokerId))}, r.KafkaCluster, - *kafkaVersion, log) - if err != nil { - return err + + for range brokers { + result := <-ch + if result.err != nil { + return result.err + } + if err := k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(result.brokerID))}, 
r.KafkaCluster, *result.kafkaVersion, log); err != nil { + return err + } } + return nil }