Added a timeout on the producer flush call in KafkaMirrorMakerConnectorTask#957
Added a timeout on the producer flush call in KafkaMirrorMakerConnectorTask#957
Conversation
...java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java
Show resolved
Hide resolved
shrinandthakkar
left a comment
There was a problem hiding this comment.
why shouldn't we just use the producer config offset.flush.timeout.ms instead of creating a newer config to wait on ?
ref: https://kafka.apache.org/21/documentation.html#producerconfigs
@shrinandthakkar the problem is that the the tasks were found to be stuck on |
@jzakaryan Do you think if we should rather try to reconfigure that value for MM clusters ? |
Even in flushness mode BMM's tasks do a flush call on the producer when shutting down. We have observed that producer flush call tends to get indefinitely stuck and this keeps the tasks from shutting down gracefully. The code change in this PR addresses this by wrapping the producer flush call in a future and blocking on that future with a timeout.
If the producer flush doesn't complete in the given timeout window, the task will proceed to committing safe offsets and shutting down. The timeout window is exposed through a configuration property.