Skip to content

Rework config#15

Merged
dieproht merged 2 commits into
mainfrom
rework-config
Jul 3, 2026
Merged

Rework config#15
dieproht merged 2 commits into
mainfrom
rework-config

Conversation

@dieproht

@dieproht dieproht commented Jul 1, 2026

Copy link
Copy Markdown
Collaborator

No description provided.

@dieproht dieproht force-pushed the rework-config branch 2 times, most recently from 3740296 to b692fc2 Compare July 2, 2026 14:21
@dieproht dieproht marked this pull request as ready for review July 3, 2026 07:36
@wangyk414 wangyk414 requested a review from Copilot July 3, 2026 07:38

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR reworks Anthology’s configuration model and pipeline terminology by renaming “domain/aggregate” concepts to “channel/message” and updating the end-to-end workflow accordingly (including config loading, Kafka serialization, and state-store integration).

Changes:

  • Replaces DomainConfigs/DomainRelations/Credentials with domain.channels + domain.relations and AdditionalKafkaPropertiesLoader.
  • Renames core data types and pipeline stages from Aggregate* to Message* across filtering, transformation, persistence, linking, triggering, composition/inlining, and Kafka IO.
  • Updates tests and example configs/resources to match the new config structure and naming.

Reviewed changes

Copilot reviewed 68 out of 68 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
src/test/scala/de/otto/anthology/transformation/DomainTransformationStageTest.scala Updates transformation stage tests to channel/message terminology and configs.
src/test/scala/de/otto/anthology/transformation/CodomainTransformationStageTest.scala Updates codomain transformation tests to message-based API.
src/test/scala/de/otto/anthology/transformation/AggregateIdTransformerTest.scala Switches to MessageIdTransformer usage (test suite naming still references aggregate).
src/test/scala/de/otto/anthology/TestUtils.scala Adjusts Kafka record helpers to use MessageId/Message.
src/test/scala/de/otto/anthology/TestData.scala Renames test fixtures to channel/message-format relations and message payloads.
src/test/scala/de/otto/anthology/statestore/RocksDBStateStoreTest.scala Updates state store test to use MessageId.
src/test/scala/de/otto/anthology/MessageDeserializerTest.scala Renames/describes deserializer tests for message deserialization.
src/test/scala/de/otto/anthology/KafkaSourceTest.scala Updates consumer deserializers to message-based serializers.
src/test/scala/de/otto/anthology/KafkaSinkTest.scala Updates sink test to emit codomain messages with new serializers.
src/test/scala/de/otto/anthology/headerpropagation/HeaderPropagationStageTest.scala Updates header propagation test to MessageId.
src/test/scala/de/otto/anthology/filtering/DomainFilteringStageTest.scala Updates domain filtering tests to channel/message configs and IDs.
src/test/scala/de/otto/anthology/filtering/CodomainFilteringStageTest.scala Updates codomain filtering tests to message pipeline types.
src/test/scala/de/otto/anthology/DomainSourceTest.scala Updates source recognition and types to channel/message format configs.
src/test/scala/de/otto/anthology/DomainPersistenceStageTest.scala Updates persistence tests to message IDs and new state-store keys.
src/test/scala/de/otto/anthology/config/DomainRelationConfigsTest.scala Updates relation config tests to new RelationConfigs model.
src/test/scala/de/otto/anthology/config/CredentialsLoaderTest.scala Removes credentials loader tests (loader removed).
src/test/scala/de/otto/anthology/config/AnthologyConfigTest.scala Updates config loading assertions for new nested domain section and headers.
src/test/scala/de/otto/anthology/config/AdditionalKafkaPropertiesLoaderTest.scala Adds tests for parsing additional Kafka properties JSON.
src/test/scala/de/otto/anthology/CodomainTriggeringStageTest.scala Updates triggering pipeline tests to message identifiers and relations.
src/test/scala/de/otto/anthology/CodomainPersistenceStageTest.scala Updates codomain persistence tests to message types.
src/test/scala/de/otto/anthology/CodomainInliningStageTest.scala Updates inlining tests to new message-based stages and relations types.
src/test/scala/de/otto/anthology/CodomainDeduplicationStageTest.scala Updates deduplication tests to message IDs and qualified message IDs.
src/test/scala/de/otto/anthology/CodomainCompositionStageTest.scala Updates composition tests to message-based pipeline.
src/test/scala/de/otto/anthology/AppTest.scala Updates E2E test CLI args and expected JSON paths to new config naming.
src/test/resources/e2e/application-minimal.yaml Migrates minimal E2E config to domain.channels and domain.relations.
src/test/resources/application.yaml Migrates example config to new structure + updated header-propagation examples.
src/test/resources/additional-props.env.example Updates env var name and description for additional Kafka properties.
src/main/scala/de/otto/anthology/transformation/MessageTransformer.scala Renames transformer implementation to MessageTransformer.
src/main/scala/de/otto/anthology/transformation/MessageIdTransformer.scala Renames ID transformer to MessageIdTransformer and updates types/docs.
src/main/scala/de/otto/anthology/transformation/DomainTransformationStage.scala Refactors domain transformation stages to message/channel config APIs.
src/main/scala/de/otto/anthology/transformation/CodomainTransformationStage.scala Refactors codomain transformation stage to message payloads.
src/main/scala/de/otto/anthology/statestore/StateStoreSection.scala Updates section docs to “messages”.
src/main/scala/de/otto/anthology/QualifiedMessageId.scala Introduces QualifiedMessageId (replaces QualifiedAggregateId).
src/main/scala/de/otto/anthology/QualifiedAggregateId.scala Removes QualifiedAggregateId.
src/main/scala/de/otto/anthology/package.scala Replaces Aggregate* opaque types with Message* and channel/message-format naming.
src/main/scala/de/otto/anthology/KafkaSink.scala Renames sink extension + switches to message serializers and additional properties.
src/main/scala/de/otto/anthology/kafka.scala Renames Kafka serializers/deserializers and passthrough types to message-based.
src/main/scala/de/otto/anthology/headerpropagation/HeaderPropagationStage.scala Updates header propagation stage to message payload types.
src/main/scala/de/otto/anthology/filtering/FilterChain.scala Updates filter chain to work with Message instead of Aggregate.
src/main/scala/de/otto/anthology/filtering/DomainFilteringStage.scala Updates domain filtering stage to channel/message configs and qualified IDs.
src/main/scala/de/otto/anthology/filtering/CodomainFilteringStage.scala Updates codomain filtering stage to message payload types.
src/main/scala/de/otto/anthology/DomainSources.scala Updates domain sources aggregation to channel-based configs.
src/main/scala/de/otto/anthology/DomainSource.scala Updates source recognition and output to qualified message IDs.
src/main/scala/de/otto/anthology/DomainPersistenceStage.scala Updates persistence to store domain messages keyed by qualified message ID.
src/main/scala/de/otto/anthology/DomainLinkingStage.scala Refactors linking/backlinking logic to channel/message IDs and relation configs.
src/main/scala/de/otto/anthology/config/RelationConfigs.scala Renames and refactors relation config container for channels/message formats.
src/main/scala/de/otto/anthology/config/RelationConfig.scala Introduces new relation config ADT for PureConfig loading.
src/main/scala/de/otto/anthology/config/MessageFormatConfig.scala Introduces new message-format config model (replaces aggregate config).
src/main/scala/de/otto/anthology/config/KafkaClusterConfig.scala Renames credentials to additionalProperties in cluster settings.
src/main/scala/de/otto/anthology/config/DomainRelationConfig.scala Removes old domain relation config ADT.
src/main/scala/de/otto/anthology/config/DomainConfigs.scala Removes old domain configs container.
src/main/scala/de/otto/anthology/config/DomainConfig.scala Replaces per-domain config with new DomainConfig(channels, relations) structure.
src/main/scala/de/otto/anthology/config/CredentialsLoader.scala Removes credentials loader implementation.
src/main/scala/de/otto/anthology/config/CliConf.scala Replaces CLI arg for credentials with additional Kafka properties.
src/main/scala/de/otto/anthology/config/ChannelConfigs.scala Adds channel configs container for runtime lookups.
src/main/scala/de/otto/anthology/config/ChannelConfig.scala Adds channel config model holding Kafka source config and message formats.
src/main/scala/de/otto/anthology/config/AnthologyConfig.scala Changes root config to hold a single domain section instead of flat lists.
src/main/scala/de/otto/anthology/config/AggregateConfig.scala Removes old aggregate config model.
src/main/scala/de/otto/anthology/config/AdditionalKafkaPropertiesLoader.scala Adds loader for additional Kafka client properties (CLI/env).
src/main/scala/de/otto/anthology/CodomainTriggeringStage.scala Refactors triggering stage to message-based qualified IDs and relations.
src/main/scala/de/otto/anthology/CodomainPersistenceStage.scala Refactors persistence stage to codomain messages.
src/main/scala/de/otto/anthology/CodomainInliningStage.scala Refactors inlining stage to message IDs and message JSON nodes.
src/main/scala/de/otto/anthology/CodomainDeduplicationStage.scala Refactors deduplication stage types to message IDs.
src/main/scala/de/otto/anthology/CodomainCompositionStage.scala Refactors composition/staging to message-based keys and IDs.
src/main/scala/de/otto/anthology/AppWorkflow.scala Updates the main workflow wiring to the renamed stages and new config types.
src/main/scala/de/otto/anthology/App.scala Switches config parsing, cluster settings loading, and consumer setup to new models.
Dockerfile Updates runtime env var documentation to additional Kafka properties.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +87 to 88
if logCnt % 100 == 0 then logger.info("Published and committed 100 batches...")
logCnt += 1
Comment on lines 51 to +54
val clusterSettings: Map[ClusterName, KafkaClusterSettings] =
config.kafkaClusters.map(cc => cc.name -> KafkaClusterSettings(cc, credentials(cc.name))).toMap
config.kafkaClusters
.map(cc => cc.name -> KafkaClusterSettings(cc, additionalKafkaProps(cc.name)))
.toMap
Comment on lines +18 to +21
given ConfigReader[(ChannelName, MessageFormatName)] =
ConfigReader[String].map: c2mStr =>
val c2m = c2mStr.split("/")
(ChannelName(c2m(0)), MessageFormatName(c2m(1)))
Comment on lines 9 to 12
class AggregateIdTransformerTest extends AnyFlatSpec, Matchers, Diagrams:

"AggregateIdTransformer" should "fail if the id is empty" in:
assertThrows[IllegalArgumentException]:

@wangyk414 wangyk414 left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

großartig

@dieproht dieproht merged commit a1306a4 into main Jul 3, 2026
2 checks passed
@dieproht dieproht deleted the rework-config branch July 3, 2026 08:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants