Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ spec:
app.kubernetes.io/name: {{ include "kafka-operator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/component: operator
app: prometheus
component: alertmanager
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
Expand Down
31 changes: 31 additions & 0 deletions charts/kafka-operator/templates/podmonitor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{{- if .Values.prometheusMetrics.podMonitor.enabled }}
kind: PodMonitor
apiVersion: monitoring.coreos.com/v1
metadata:
name: {{ include "kafka-operator.fullname" . }}
namespace: {{ .Release.Namespace | quote }}
labels:
helm.sh/chart: {{ include "kafka-operator.chart" . }}
app.kubernetes.io/name: {{ include "kafka-operator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/version: {{ .Chart.AppVersion }}
app.kubernetes.io/component: operator
{{- with .Values.operator.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
namespaceSelector:
matchNames:
- {{ .Release.Namespace }}
selector:
matchLabels:
app.kubernetes.io/name: {{ include "kafka-operator.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
app.kubernetes.io/component: operator
endpoints:
- interval: {{ .Values.prometheusMetrics.podMonitor.interval }}
port: metrics
path: /metrics
{{- end }}
4 changes: 4 additions & 0 deletions charts/kafka-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ prometheusMetrics:
create: true
# -- ServiceAccount used by prometheus auth proxy
name: kafka-operator-authproxy
podMonitor:
# -- If true, create a PodMonitor for Prometheus metrics
enabled: false
interval: 30s

# -- Health probes configuration
healthProbes: {}
Expand Down
4 changes: 3 additions & 1 deletion pkg/jmxextractor/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io"
"net/http"
"regexp"
"time"

"github.com/banzaicloud/koperator/api/v1beta1"
"github.com/banzaicloud/koperator/pkg/errorfactory"
Expand Down Expand Up @@ -90,7 +91,8 @@ func (exp *jmxExtractor) ExtractDockerImageAndVersion(brokerId int32, brokerConf
requestURL = fmt.Sprintf(serviceJMXTemplate, exp.clusterName, brokerId,
exp.clusterNamespace, exp.kubernetesClusterDomain, 9020)
}
rsp, err := http.Get(requestURL)
client := &http.Client{Timeout: 30 * time.Second}
rsp, err := client.Get(requestURL)
if err != nil {
exp.log.Error(err, fmt.Sprintf("error during talking to broker-%d", brokerId))
return nil, errorfactory.New(errorfactory.BrokersNotReady{}, err, "unable to talk to ...")
Expand Down
49 changes: 34 additions & 15 deletions pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
reorderedBrokers := reorderBrokers(runningBrokers, boundPersistentVolumeClaims, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, controllerID, log)

allBrokerDynamicConfigSucceeded := true
brokerStatus := make(map[int32]*banzaiv1beta1.BrokerConfig)
for _, broker := range reorderedBrokers {
brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec)
if err != nil {
Expand Down Expand Up @@ -449,9 +450,8 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
if err != nil {
return err
}
if err = r.updateStatusWithDockerImageAndVersion(broker.Id, brokerConfig, log); err != nil {
return err
}
brokerStatus[broker.Id] = brokerConfig

// If dynamic configs can not be set then let the loop continue to the next broker,
// after the loop we return error. This solves that case when other brokers could get healthy,
// but the loop exits too soon because dynamic configs can not be set.
Expand All @@ -474,6 +474,10 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
return err
}

if err := r.updateStatusWithDockerImageAndVersion(brokerStatus, log); err != nil {
return err
}

// in case HeadlessServiceEnabled is changed, delete the service that was created by the previous
// reconcile flow. The services must be deleted at the end of the reconcile flow after the new services
// were created and broker configurations reflecting the new services otherwise the Kafka brokers
Expand Down Expand Up @@ -917,21 +921,36 @@ func (r *Reconciler) reconcileKafkaPod(log logr.Logger, desiredPod *corev1.Pod,
return nil
}

func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokerId int32, brokerConfig *banzaiv1beta1.BrokerConfig,
log logr.Logger) error {
jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(),
r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log)
type brokerVersionResult struct {
brokerID int32
kafkaVersion *banzaiv1beta1.KafkaVersion
err error
}

kafkaVersion, err := jmxExp.ExtractDockerImageAndVersion(brokerId, brokerConfig,
r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled)
if err != nil {
return err
func (r *Reconciler) updateStatusWithDockerImageAndVersion(brokers map[int32]*banzaiv1beta1.BrokerConfig, log logr.Logger) error {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Goroutines have no cancellation path. If the controller shuts down mid-reconcile, these goroutines run to completion (or until JMX times out). Worth passing a context.Context from the reconcile call if JMX extraction can be long-running.

ch := make(chan brokerVersionResult, len(brokers))
for brokerID, brokerConfig := range brokers {
go func(id int32, cfg *banzaiv1beta1.BrokerConfig) {
jmxExp := jmxextractor.NewJMXExtractor(r.KafkaCluster.GetNamespace(), r.KafkaCluster.Spec.GetKubernetesClusterDomain(), r.KafkaCluster.GetName(), log)
kv, err := jmxExp.ExtractDockerImageAndVersion(id, cfg, r.KafkaCluster.Spec.GetClusterImage(), r.KafkaCluster.Spec.HeadlessServiceEnabled)
if err != nil {
ch <- brokerVersionResult{brokerID: id, err: err}
return
}
ch <- brokerVersionResult{brokerID: id, kafkaVersion: kv}
}(brokerID, brokerConfig)
}
err = k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(brokerId))}, r.KafkaCluster,
*kafkaVersion, log)
if err != nil {
return err

for range brokers {
result := <-ch
if result.err != nil {
return result.err
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drain the hole channel first then update the broker status so all or none are updated.

if err := k8sutil.UpdateBrokerStatus(r.Client, []string{strconv.Itoa(int(result.brokerID))}, r.KafkaCluster, *result.kafkaVersion, log); err != nil {
return err
}
}

return nil
}

Expand Down
Loading