added kafka subscriber to observability package#2863
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Plus Run ID: ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (7)
✅ Files skipped from review due to trivial changes (1)
📝 WalkthroughWalkthroughAdds Kafka observability output: config schema (ObservabilityKafkaConfig), Sarama async kafkaAdapter with error-drain and graceful shutdown, InitializeKafkaAdapter wiring, a KafkaSubscriber to republish events to Kafka, and comprehensive unit tests. ChangesKafka Observability Output Sink
Sequence DiagramssequenceDiagram
participant Adapter as kafkaAdapter
participant Producer as sarama.AsyncProducer
participant Errors as Producer.Errors()
participant Drain as ErrorDrainGoroutine
Adapter->>Producer: NewAsyncProducer(brokers, config)
Adapter->>Drain: start error drain goroutine
Drain->>Errors: range over Errors()
Note over Adapter,Producer: runtime
Adapter->>Producer: Input() <- ProducerMessage (topic, bytes)
Errors->>Drain: send error (if any)
Drain->>Adapter: log error with topic
Note over Adapter,Producer: shutdown
Adapter->>Producer: AsyncClose()
Drain->>Drain: exit when Errors() closed
Adapter->>Adapter: wait for done or timeout
sequenceDiagram
participant Client as Client Code
participant Subscriber as KafkaSubscriber
participant Formatter as Formatter
participant Adapter as kafkaAdapter
participant Kafka as Kafka Broker
Client->>Subscriber: Initialize()
Subscriber->>Subscriber: validate brokers & topic
Subscriber->>Adapter: InitializeKafkaAdapter(cfg)
Subscriber->>Subscriber: generate UUIDv7 ID
Client->>Subscriber: OnEvent(evt)
Subscriber->>Formatter: format evt -> bytes
Subscriber->>Adapter: Write(bytes) (topic)
Adapter->>Kafka: async publish
Client->>Subscriber: Close()
Subscriber->>Adapter: Flush()
Subscriber->>Adapter: Close()
Adapter->>Kafka: AsyncClose producer
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related issues
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Comment |
Dependency Validation ResultsDependency name: golang.org/x/crypto Dependency name: golang.org/x/text |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/internal/system/observability/adapter/kafka_adapter.go`:
- Around line 103-115: The Write method can race with Close and panic by sending
to the producer Input after the adapter is marked closed; to fix it, make the
closed check and the send atomic by keeping ka.mu locked across the check and
the send: in kafkaAdapter.Write, remove the early ka.mu.Unlock() and perform the
ka.producer.Input() <- ... while still holding ka.mu, then unlock; reference the
kafkaAdapter.Write method, ka.mu, ka.closed and ka.producer.Input() so reviewers
can locate and update the code (this guarantees the check and send are
serialized with Close which calls AsyncClose()).
In `@backend/internal/system/observability/subscriber/kafka_subscriber.go`:
- Around line 128-143: KafkaSubscriber.Close uses ks.logger without checking for
nil which can panic if Close is called before Initialize; update Close (method
KafkaSubscriber.Close) to guard all logger calls by checking ks.logger != nil
before calling ks.logger.Info or ks.logger.Error, and ensure the early adapter
flush/close logic still runs (keep the existing ks.adapter != nil checks) but
replace direct logger usage at the start and end of the method (and inside error
branches) with conditional logging so Close is safe when ks.logger is nil (do
not change adapter behavior or return values).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 801ec88a-a6e5-48f6-93d9-7192897056a1
⛔ Files ignored due to path filters (1)
backend/go.sumis excluded by!**/*.sum
📒 Files selected for processing (7)
backend/go.modbackend/internal/system/config/config.gobackend/internal/system/observability/adapter/init.gobackend/internal/system/observability/adapter/kafka_adapter.gobackend/internal/system/observability/adapter/kafka_adapter_test.gobackend/internal/system/observability/subscriber/kafka_subscriber.gobackend/internal/system/observability/subscriber/kafka_subscriber_test.go
Dependency Validation ResultsDependency name: golang.org/x/crypto Dependency name: golang.org/x/text |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/internal/system/config/config.go`:
- Around line 325-326: Add documentation for the new Kafka observability config
and the new agent.store option: document observability.output.kafka and all its
keys (enabled, brokers, topic, client_id, format, retries, timeout, categories)
with example YAML and explain initialization requirements (must have non-empty
brokers and topic), failure/close semantics, and category filtering behavior in
the observability config guide (e.g.,
docs/content/guides/getting-started/configuration.mdx or observability guide);
also document the new top-level agent.store accepted values and fallback
semantics alongside other *.store config docs so users know precedence and
defaults. Reference the new config struct ObservabilityKafkaConfig and the
agent.store option in the docs, and ensure a single consolidated docs update
covers the listed changed sections.
In `@backend/internal/system/observability/adapter/kafka_adapter.go`:
- Around line 103-115: In kafkaAdapter.Write, avoid blocking forever on
ka.producer.Input() by replacing the direct send with a timeout-based select:
check ka.closed under ka.mu as you already do, then use a select that attempts
to send the *sarama.ProducerMessage to ka.producer.Input(), a case for
<-time.After(timeout) (use cfg.Timeout if appropriate, otherwise introduce a
clear constant like producerEnqueueTimeout), and a case to detect if the adapter
was closed (e.g., a non-blocking check of ka.closed under ka.mu or a separate
closed channel). On timeout return a descriptive error (e.g., "kafka producer
input enqueue timed out"); keep the mutex usage minimal (only for checking
closed) and do not hold it while blocking on the select.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 88c10d7f-8b8f-40b3-b157-87eea22bd528
⛔ Files ignored due to path filters (1)
backend/go.sumis excluded by!**/*.sum
📒 Files selected for processing (7)
backend/go.modbackend/internal/system/config/config.gobackend/internal/system/observability/adapter/init.gobackend/internal/system/observability/adapter/kafka_adapter.gobackend/internal/system/observability/adapter/kafka_adapter_test.gobackend/internal/system/observability/subscriber/kafka_subscriber.gobackend/internal/system/observability/subscriber/kafka_subscriber_test.go
| func (ka *kafkaAdapter) Write(data []byte) error { | ||
| ka.mu.Lock() | ||
| if ka.closed { | ||
| ka.mu.Unlock() | ||
| return fmt.Errorf("kafka adapter is closed") | ||
| } | ||
| ka.mu.Unlock() | ||
|
|
||
| ka.producer.Input() <- &sarama.ProducerMessage{ | ||
| Topic: ka.topic, | ||
| Value: sarama.ByteEncoder(data), | ||
| } | ||
| return nil |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
For github.com/IBM/sarama v1.49.0 AsyncProducer, does sending to producer.Input() block when the channel is full, and what is the recommended way to avoid indefinite blocking in producer write paths?
💡 Result:
Yes, sending a message to the AsyncProducer.Input channel in Sarama will block if the channel is full [1][2]. The Input channel is essentially an unbuffered channel [2], and the producer's internal processing must be able to keep up with your ingestion rate to prevent blocking [3][2]. To avoid indefinite blocking in your producer write paths, you should implement one of the following strategies: 1. Non-blocking Writes with Select: Use a select statement with a default case when sending to the Input channel. This allows you to handle cases where the producer is overloaded by dropping the message or logging a warning instead of blocking your application [2]. select { case producer.Input <- msg: // Message successfully queued default: // Channel is full; handle as needed (e.g., drop, buffer, or retry) } 2. Timeout-based Writes: Instead of an immediate default, use a time.After or context.WithTimeout within a select block. This allows the application to wait for a short, defined period for space to become available before giving up, providing a compromise between immediate dropping and indefinite blocking [2]. 3. Monitor and Drain Channels: Deadlocks can occur if you do not actively drain the Errors and Successes channels [1]. Even if you do not need to process successes, the Successes channel must be consumed if Return.Successes is enabled in your configuration [1]. Similarly, the Errors channel must be drained to prevent the producer from deadlocking when the internal error buffer fills up [1]. It is also important to note that performance bottlenecks often stem from misconfiguration or improper handling of these return channels [1][3]. Always ensure that you are calling Close or AsyncClose on the producer to prevent memory leaks and ensure that all in-flight messages are flushed correctly [1].
Citations:
- 1: https://github.com/IBM/sarama/blob/main/async_producer.go
- 2: Async producer Input() channel full under load IBM/sarama#3369
- 3: Async producer can overflow itself at high rate IBM/sarama#805
🏁 Script executed:
cat -n backend/internal/system/observability/adapter/kafka_adapter.goRepository: thunder-id/thunderid
Length of output: 5020
🏁 Script executed:
grep -A2 "github.com/IBM/sarama" go.modRepository: thunder-id/thunderid
Length of output: 104
🏁 Script executed:
find . -name "go.mod" -type fRepository: thunder-id/thunderid
Length of output: 138
🏁 Script executed:
grep "github.com/IBM/sarama" backend/go.modRepository: thunder-id/thunderid
Length of output: 107
Add timeout to producer input write to prevent indefinite blocking on channel saturation.
The ka.producer.Input() <- ... send blocks indefinitely when the async producer input channel is saturated (e.g., due to broker/network degradation). Per Sarama v1.49.0 documentation, the Input channel blocks when full; you must implement a timeout-based select pattern to avoid stalling caller request paths.
While error draining is already correctly implemented (line 84), the Write path needs bounded enqueue behavior. Use cfg.Timeout (which exists but is currently only applied to network timeouts) or a new constant for the Input channel write timeout.
💡 Proposed change (timeout-based enqueue)
type kafkaAdapter struct {
producer sarama.AsyncProducer
topic string
logger *log.Logger
done chan struct{}
+ writeTimeout time.Duration
mu sync.Mutex
closed bool
}
@@
ka := &kafkaAdapter{
producer: producer,
topic: cfg.Topic,
logger: logger,
done: make(chan struct{}),
+ writeTimeout: func() time.Duration {
+ if cfg.Timeout > 0 {
+ return cfg.Timeout
+ }
+ return 5 * time.Second
+ }(),
}
@@
- ka.producer.Input() <- &sarama.ProducerMessage{
- Topic: ka.topic,
- Value: sarama.ByteEncoder(data),
- }
- return nil
+ msg := &sarama.ProducerMessage{
+ Topic: ka.topic,
+ Value: sarama.ByteEncoder(data),
+ }
+
+ select {
+ case ka.producer.Input() <- msg:
+ return nil
+ case <-time.After(ka.writeTimeout):
+ return fmt.Errorf("timed out enqueueing kafka message")
+ }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/internal/system/observability/adapter/kafka_adapter.go` around lines
103 - 115, In kafkaAdapter.Write, avoid blocking forever on ka.producer.Input()
by replacing the direct send with a timeout-based select: check ka.closed under
ka.mu as you already do, then use a select that attempts to send the
*sarama.ProducerMessage to ka.producer.Input(), a case for <-time.After(timeout)
(use cfg.Timeout if appropriate, otherwise introduce a clear constant like
producerEnqueueTimeout), and a case to detect if the adapter was closed (e.g., a
non-blocking check of ka.closed under ka.mu or a separate closed channel). On
timeout return a descriptive error (e.g., "kafka producer input enqueue timed
out"); keep the mutex usage minimal (only for checking closed) and do not hold
it while blocking on the select.
There was a problem hiding this comment.
We don't want to drop published audits/logs/events so its ok for it to block. Will revisit if we decide to change it.
There was a problem hiding this comment.
@sacrana0, understood — backpressure semantics make sense here since losing audit/observability events would be worse than momentary caller blocking. I'll keep this in mind for future reviews of this adapter.
(˳˘ ɜ˘) 🐇
✏️ Learnings added
Learnt from: sacrana0
Repo: thunder-id/thunderid PR: 2863
File: backend/internal/system/observability/adapter/kafka_adapter.go:103-115
Timestamp: 2026-05-22T09:13:41.634Z
Learning: In `backend/internal/system/observability/adapter/kafka_adapter.go`, the `Write` method intentionally blocks on `ka.producer.Input() <- msg` without a timeout. This is by design: the team prefers backpressure over dropping audit/log/observability events. Do not flag this as a bug or suggest a timeout-based select pattern for this file.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: KaveeshaPiumini
Repo: thunder-id/thunderid PR: 2835
File: backend/internal/agent/declarative_resource.go:21-39
Timestamp: 2026-05-20T05:39:33.413Z
Learning: Do not treat importing a package named "testing" in production Go files as an anti-pattern in this codebase. For .go files that are not *_test.go, this convention is used to expose *ForTest constructors (e.g., NewAgentExporterForTest, NewApplicationExporterForTest) that are guarded by a call like testing.Testing() and panic when not running under tests. When reviewing, only flag importing "testing" in non-test files if it is not part of this established *ForTest/guard+panic convention.
Dependency Validation ResultsDependency name: golang.org/x/crypto Dependency name: golang.org/x/text |
Signed-off-by: Sachin Rana <sacrana324@gmail.com>
Dependency Validation ResultsDependency name: golang.org/x/crypto Dependency name: golang.org/x/text |
Purpose
Added support for a subscriber to publish events to Kafka.
Approach
Related Issues
Related PRs
Checklist
breaking changelabel added.Security checks
Summary by CodeRabbit