Skip to content

Commit 7f31c59

Browse files
Merge pull request #147 from OSGP/feature/FDP-2161-allowing-kafka-tombstones
FDP-2161 - Enabling Kafka tombstones for compact topics
2 parents 88e2d4d + 0820d63 commit 7f31c59

File tree

7 files changed

+76
-22
lines changed

7 files changed

+76
-22
lines changed

.github/dependabot.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ updates:
2222
- package-ecosystem: github-actions
2323
directory: /
2424
schedule:
25-
interval: weekly
25+
interval: "weekly"
2626
groups:
2727
all:
2828
patterns:

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ mockk = "1.14.9"
1212
msal4j = "1.24.0"
1313
sonarqube = "7.2.3.7755"
1414
spotless = "8.3.0"
15-
springBoot = "4.0.3"
15+
springBoot = "4.0.5"
1616
springmockk = "5.0.1"
1717

1818
[libraries]

kafka-avro/build.gradle.kts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,50 @@ plugins {
55
alias(libs.plugins.avro)
66
}
77

8+
buildscript {
9+
configurations.all { resolutionStrategy { force("org.kohsuke:github-api:1.330") } }
10+
dependencies {
11+
// override the avro dependencies (1.12.1) of the avro plugin (that contain vulnerabilities)
12+
classpath(libs.avro)
13+
}
14+
15+
configurations {
16+
classpath {
17+
resolutionStrategy {
18+
// Temporary: check regularly if still necessary to override plugin classpath dependencies, and remove
19+
// when no longer needed
20+
eachDependency {
21+
if (requested.group == "com.fasterxml.jackson.core" && requested.name == "jackson-core") {
22+
useVersion("2.21.2")
23+
because(
24+
"Override plugin classpath to use non-vulnerable com.fasterxml.jackson.core:jackson-core 2.21.2",
25+
)
26+
}
27+
}
28+
}
29+
}
30+
}
31+
}
32+
33+
configurations.all {
34+
resolutionStrategy {
35+
// Temporary: check regularly if still necessary to override dependency versions, and remove when no longer
36+
// needed
37+
eachDependency {
38+
if (requested.group == "com.fasterxml.jackson.core" && requested.name == "jackson-core") {
39+
useVersion("2.21.2")
40+
because(
41+
"Override BOM and all constraints to use non-vulnerable com.fasterxml.jackson.core:jackson-core 2.21.2",
42+
)
43+
}
44+
}
45+
}
46+
}
47+
848
dependencies {
9-
implementation(libs.kafkaClients)
1049
implementation(libs.avro)
50+
implementation(libs.kafkaClients)
51+
implementation(libs.kotlinLoggingJvm)
1152

1253
implementation(libs.slf4jApi)
1354

kafka-avro/src/main/kotlin/com/gxf/utilities/kafka/avro/AvroDeserializer.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@
33
// SPDX-License-Identifier: Apache-2.0
44
package com.gxf.utilities.kafka.avro
55

6+
import io.github.oshai.kotlinlogging.KotlinLogging
67
import org.apache.avro.Schema
78
import org.apache.avro.message.BinaryMessageDecoder
89
import org.apache.avro.specific.SpecificData
910
import org.apache.avro.specific.SpecificRecordBase
1011
import org.apache.kafka.common.errors.SerializationException
1112
import org.apache.kafka.common.serialization.Deserializer
12-
import org.slf4j.LoggerFactory
1313

1414
class AvroDeserializer(deserializerSchemas: List<Schema>) : Deserializer<SpecificRecordBase> {
1515
companion object {
16-
private val logger = LoggerFactory.getLogger(AvroDeserializer::class.java)
16+
private val logger = KotlinLogging.logger {}
1717
}
1818

1919
private val decoder = BinaryMessageDecoder<SpecificRecordBase>(SpecificData(), null)
@@ -26,7 +26,7 @@ class AvroDeserializer(deserializerSchemas: List<Schema>) : Deserializer<Specifi
2626
/** Deserializes a Byte Array to an Avro SpecificRecordBase */
2727
override fun deserialize(topic: String, payload: ByteArray): SpecificRecordBase {
2828
try {
29-
logger.trace("Deserializing for {}", topic)
29+
logger.trace { "Deserializing for $topic" }
3030
return decoder.decode(payload)
3131
} catch (ex: Exception) {
3232
throw SerializationException("Error deserializing Avro message for topic: $topic", ex)

kafka-avro/src/main/kotlin/com/gxf/utilities/kafka/avro/AvroEncoder.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@
33
// SPDX-License-Identifier: Apache-2.0
44
package com.gxf.utilities.kafka.avro
55

6+
import io.github.oshai.kotlinlogging.KotlinLogging
67
import org.apache.avro.message.BinaryMessageEncoder
78
import org.apache.avro.specific.SpecificData
89
import org.apache.avro.specific.SpecificRecordBase
9-
import org.slf4j.LoggerFactory
1010
import java.io.IOException
1111
import java.io.OutputStream
1212
import kotlin.reflect.KClass
1313

1414
object AvroEncoder {
1515
val encoders: HashMap<KClass<out SpecificRecordBase>, BinaryMessageEncoder<SpecificRecordBase>> = HashMap()
1616

17-
private val logger = LoggerFactory.getLogger(AvroEncoder::class.java)
17+
private val logger = KotlinLogging.logger {}
1818

1919
@Throws(IOException::class)
2020
fun encode(message: SpecificRecordBase): ByteArray {
@@ -38,7 +38,7 @@ object AvroEncoder {
3838
return existingEncoder
3939
}
4040

41-
logger.info("New encoder created for Avro schema {}", message::class)
41+
logger.info { "New encoder created for Avro schema ${message::class}" }
4242
val newEncoder = BinaryMessageEncoder<SpecificRecordBase>(SpecificData(), message.schema)
4343
encoders[message::class] = newEncoder
4444
return newEncoder

kafka-avro/src/main/kotlin/com/gxf/utilities/kafka/avro/AvroSerializer.kt

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,26 @@
33
// SPDX-License-Identifier: Apache-2.0
44
package com.gxf.utilities.kafka.avro
55

6+
import io.github.oshai.kotlinlogging.KotlinLogging
67
import org.apache.avro.specific.SpecificRecordBase
78
import org.apache.kafka.common.errors.SerializationException
89
import org.apache.kafka.common.serialization.Serializer
9-
import org.slf4j.LoggerFactory
1010
import java.io.ByteArrayOutputStream
1111

1212
class AvroSerializer : Serializer<SpecificRecordBase> {
13-
companion object {
14-
private val logger = LoggerFactory.getLogger(AvroSerializer::class.java)
15-
}
13+
private val logger = KotlinLogging.logger {}
1614

17-
/** Serializes a Byte Array to an Avro specific record */
18-
override fun serialize(topic: String?, data: SpecificRecordBase): ByteArray {
15+
/** Serializes an Avro specific record to a ByteArray, returns null if data is null (Kafka Tombstone) */
16+
override fun serialize(topic: String?, data: SpecificRecordBase?): ByteArray? {
1917
try {
20-
logger.trace("Serializing for {}", topic)
21-
val outputStream = ByteArrayOutputStream()
22-
AvroEncoder.encode(data, outputStream)
23-
return outputStream.toByteArray()
18+
return if (data == null) {
19+
null
20+
} else {
21+
logger.trace { "Serializing for $topic" }
22+
val outputStream = ByteArrayOutputStream()
23+
AvroEncoder.encode(data, outputStream)
24+
outputStream.toByteArray()
25+
}
2426
} catch (ex: Exception) {
2527
throw SerializationException("Error serializing Avro message for topic: $topic", ex)
2628
}

kafka-avro/src/test/kotlin/com/gxf/utilities/kafka/avro/AvroSerializerTest.kt

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,28 @@ import org.assertj.core.api.Assertions.assertThat
99
import org.junit.jupiter.api.Test
1010

1111
class AvroSerializerTest {
12+
val topicName = "topic-name"
13+
1214
@Test
13-
fun testEncodersCache() {
15+
fun `should fill encoder cache`() {
1416
val message1 = AvroSchema1("field no 1", "field no 2")
1517
val message2 = AvroSchema2("message in a bottle")
1618
val serializer = AvroSerializer()
1719

18-
serializer.serialize("", message1)
19-
serializer.serialize("", message2)
20+
serializer.serialize(topicName, message1)
21+
serializer.serialize(topicName, message2)
2022

2123
assertThat(AvroEncoder.encoders).containsKeys(AvroSchema1::class)
2224
assertThat(AvroEncoder.encoders).containsKeys(AvroSchema2::class)
2325
assertThat(AvroEncoder.encoders.size).isEqualTo(2)
2426
}
27+
28+
@Test
29+
fun `should allow a tombstone message`() {
30+
val serializer = AvroSerializer()
31+
32+
val result = serializer.serialize(topicName, null)
33+
34+
assertThat(result).isNull()
35+
}
2536
}

0 commit comments

Comments
 (0)