Add an option to continue processing on deserialization errors#514
Conversation
…out shutting down Kafka Streams
loicgreffier
left a comment
There was a problem hiding this comment.
@AnkurSinhaaa Here's my review with some minor code refactoring.
BTW I'm wondering if we should also add a property to resume without forwarding anything to DLQ. We might want to continue and simply drop the records sometimes.
| } | ||
|
|
||
| return Response.fail(); | ||
|
|
There was a problem hiding this comment.
Remove this extra-line
|
|
||
| boolean shouldResume = shouldResume(exception); | ||
|
|
||
| if (shouldResume) { |
There was a problem hiding this comment.
I think the final return logic in this handler could be simplified as follows:
boolean shouldResumeWithDlq = shouldResume(exception);
return shouldResumeWithDlq ? resumeWithDlqRecord(consumerRecord, value) : Response.fail();If the continueOnUnhandledErrors verification is included in the shouldResume function (see my comment https://github.com/michelin/kstreamplify/pull/514/changes#r3086483677)
…out shutting down Kafka Streams
|
Thanks for the suggestions @loicgreffier I’ve moved return shouldResume(exception) This centralizes the decision logic and removes duplication. Also agreed on the idea of supporting a "drop without DLQ" mode — that would be a useful addition, But the question is DLQ usage ? My personal opinion is to keep traces of dropped records for monitoring and debugging. I suggest introducing it as a separate configuration (e.g. |
Agree, resume and send to DLQ should remain the default. A "resume and drop" behavior should be controlled by a separate property |
feat(dlq): add continue-on-unhandled-errors for deserialization handler
dlq.deserialization-handler.continue-on-unhandled-errorsRefactoring:
Tests: