Rework config#15
Merged
Merged
Conversation
3740296 to
b692fc2
Compare
There was a problem hiding this comment.
Pull request overview
This PR reworks Anthology’s configuration model and pipeline terminology by renaming “domain/aggregate” concepts to “channel/message” and updating the end-to-end workflow accordingly (including config loading, Kafka serialization, and state-store integration).
Changes:
- Replaces
DomainConfigs/DomainRelations/Credentialswithdomain.channels + domain.relationsandAdditionalKafkaPropertiesLoader. - Renames core data types and pipeline stages from
Aggregate*toMessage*across filtering, transformation, persistence, linking, triggering, composition/inlining, and Kafka IO. - Updates tests and example configs/resources to match the new config structure and naming.
Reviewed changes
Copilot reviewed 68 out of 68 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| src/test/scala/de/otto/anthology/transformation/DomainTransformationStageTest.scala | Updates transformation stage tests to channel/message terminology and configs. |
| src/test/scala/de/otto/anthology/transformation/CodomainTransformationStageTest.scala | Updates codomain transformation tests to message-based API. |
| src/test/scala/de/otto/anthology/transformation/AggregateIdTransformerTest.scala | Switches to MessageIdTransformer usage (test suite naming still references aggregate). |
| src/test/scala/de/otto/anthology/TestUtils.scala | Adjusts Kafka record helpers to use MessageId/Message. |
| src/test/scala/de/otto/anthology/TestData.scala | Renames test fixtures to channel/message-format relations and message payloads. |
| src/test/scala/de/otto/anthology/statestore/RocksDBStateStoreTest.scala | Updates state store test to use MessageId. |
| src/test/scala/de/otto/anthology/MessageDeserializerTest.scala | Renames/describes deserializer tests for message deserialization. |
| src/test/scala/de/otto/anthology/KafkaSourceTest.scala | Updates consumer deserializers to message-based serializers. |
| src/test/scala/de/otto/anthology/KafkaSinkTest.scala | Updates sink test to emit codomain messages with new serializers. |
| src/test/scala/de/otto/anthology/headerpropagation/HeaderPropagationStageTest.scala | Updates header propagation test to MessageId. |
| src/test/scala/de/otto/anthology/filtering/DomainFilteringStageTest.scala | Updates domain filtering tests to channel/message configs and IDs. |
| src/test/scala/de/otto/anthology/filtering/CodomainFilteringStageTest.scala | Updates codomain filtering tests to message pipeline types. |
| src/test/scala/de/otto/anthology/DomainSourceTest.scala | Updates source recognition and types to channel/message format configs. |
| src/test/scala/de/otto/anthology/DomainPersistenceStageTest.scala | Updates persistence tests to message IDs and new state-store keys. |
| src/test/scala/de/otto/anthology/config/DomainRelationConfigsTest.scala | Updates relation config tests to new RelationConfigs model. |
| src/test/scala/de/otto/anthology/config/CredentialsLoaderTest.scala | Removes credentials loader tests (loader removed). |
| src/test/scala/de/otto/anthology/config/AnthologyConfigTest.scala | Updates config loading assertions for new nested domain section and headers. |
| src/test/scala/de/otto/anthology/config/AdditionalKafkaPropertiesLoaderTest.scala | Adds tests for parsing additional Kafka properties JSON. |
| src/test/scala/de/otto/anthology/CodomainTriggeringStageTest.scala | Updates triggering pipeline tests to message identifiers and relations. |
| src/test/scala/de/otto/anthology/CodomainPersistenceStageTest.scala | Updates codomain persistence tests to message types. |
| src/test/scala/de/otto/anthology/CodomainInliningStageTest.scala | Updates inlining tests to new message-based stages and relations types. |
| src/test/scala/de/otto/anthology/CodomainDeduplicationStageTest.scala | Updates deduplication tests to message IDs and qualified message IDs. |
| src/test/scala/de/otto/anthology/CodomainCompositionStageTest.scala | Updates composition tests to message-based pipeline. |
| src/test/scala/de/otto/anthology/AppTest.scala | Updates E2E test CLI args and expected JSON paths to new config naming. |
| src/test/resources/e2e/application-minimal.yaml | Migrates minimal E2E config to domain.channels and domain.relations. |
| src/test/resources/application.yaml | Migrates example config to new structure + updated header-propagation examples. |
| src/test/resources/additional-props.env.example | Updates env var name and description for additional Kafka properties. |
| src/main/scala/de/otto/anthology/transformation/MessageTransformer.scala | Renames transformer implementation to MessageTransformer. |
| src/main/scala/de/otto/anthology/transformation/MessageIdTransformer.scala | Renames ID transformer to MessageIdTransformer and updates types/docs. |
| src/main/scala/de/otto/anthology/transformation/DomainTransformationStage.scala | Refactors domain transformation stages to message/channel config APIs. |
| src/main/scala/de/otto/anthology/transformation/CodomainTransformationStage.scala | Refactors codomain transformation stage to message payloads. |
| src/main/scala/de/otto/anthology/statestore/StateStoreSection.scala | Updates section docs to “messages”. |
| src/main/scala/de/otto/anthology/QualifiedMessageId.scala | Introduces QualifiedMessageId (replaces QualifiedAggregateId). |
| src/main/scala/de/otto/anthology/QualifiedAggregateId.scala | Removes QualifiedAggregateId. |
| src/main/scala/de/otto/anthology/package.scala | Replaces Aggregate* opaque types with Message* and channel/message-format naming. |
| src/main/scala/de/otto/anthology/KafkaSink.scala | Renames sink extension + switches to message serializers and additional properties. |
| src/main/scala/de/otto/anthology/kafka.scala | Renames Kafka serializers/deserializers and passthrough types to message-based. |
| src/main/scala/de/otto/anthology/headerpropagation/HeaderPropagationStage.scala | Updates header propagation stage to message payload types. |
| src/main/scala/de/otto/anthology/filtering/FilterChain.scala | Updates filter chain to work with Message instead of Aggregate. |
| src/main/scala/de/otto/anthology/filtering/DomainFilteringStage.scala | Updates domain filtering stage to channel/message configs and qualified IDs. |
| src/main/scala/de/otto/anthology/filtering/CodomainFilteringStage.scala | Updates codomain filtering stage to message payload types. |
| src/main/scala/de/otto/anthology/DomainSources.scala | Updates domain sources aggregation to channel-based configs. |
| src/main/scala/de/otto/anthology/DomainSource.scala | Updates source recognition and output to qualified message IDs. |
| src/main/scala/de/otto/anthology/DomainPersistenceStage.scala | Updates persistence to store domain messages keyed by qualified message ID. |
| src/main/scala/de/otto/anthology/DomainLinkingStage.scala | Refactors linking/backlinking logic to channel/message IDs and relation configs. |
| src/main/scala/de/otto/anthology/config/RelationConfigs.scala | Renames and refactors relation config container for channels/message formats. |
| src/main/scala/de/otto/anthology/config/RelationConfig.scala | Introduces new relation config ADT for PureConfig loading. |
| src/main/scala/de/otto/anthology/config/MessageFormatConfig.scala | Introduces new message-format config model (replaces aggregate config). |
| src/main/scala/de/otto/anthology/config/KafkaClusterConfig.scala | Renames credentials to additionalProperties in cluster settings. |
| src/main/scala/de/otto/anthology/config/DomainRelationConfig.scala | Removes old domain relation config ADT. |
| src/main/scala/de/otto/anthology/config/DomainConfigs.scala | Removes old domain configs container. |
| src/main/scala/de/otto/anthology/config/DomainConfig.scala | Replaces per-domain config with new DomainConfig(channels, relations) structure. |
| src/main/scala/de/otto/anthology/config/CredentialsLoader.scala | Removes credentials loader implementation. |
| src/main/scala/de/otto/anthology/config/CliConf.scala | Replaces CLI arg for credentials with additional Kafka properties. |
| src/main/scala/de/otto/anthology/config/ChannelConfigs.scala | Adds channel configs container for runtime lookups. |
| src/main/scala/de/otto/anthology/config/ChannelConfig.scala | Adds channel config model holding Kafka source config and message formats. |
| src/main/scala/de/otto/anthology/config/AnthologyConfig.scala | Changes root config to hold a single domain section instead of flat lists. |
| src/main/scala/de/otto/anthology/config/AggregateConfig.scala | Removes old aggregate config model. |
| src/main/scala/de/otto/anthology/config/AdditionalKafkaPropertiesLoader.scala | Adds loader for additional Kafka client properties (CLI/env). |
| src/main/scala/de/otto/anthology/CodomainTriggeringStage.scala | Refactors triggering stage to message-based qualified IDs and relations. |
| src/main/scala/de/otto/anthology/CodomainPersistenceStage.scala | Refactors persistence stage to codomain messages. |
| src/main/scala/de/otto/anthology/CodomainInliningStage.scala | Refactors inlining stage to message IDs and message JSON nodes. |
| src/main/scala/de/otto/anthology/CodomainDeduplicationStage.scala | Refactors deduplication stage types to message IDs. |
| src/main/scala/de/otto/anthology/CodomainCompositionStage.scala | Refactors composition/staging to message-based keys and IDs. |
| src/main/scala/de/otto/anthology/AppWorkflow.scala | Updates the main workflow wiring to the renamed stages and new config types. |
| src/main/scala/de/otto/anthology/App.scala | Switches config parsing, cluster settings loading, and consumer setup to new models. |
| Dockerfile | Updates runtime env var documentation to additional Kafka properties. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+87
to
88
| if logCnt % 100 == 0 then logger.info("Published and committed 100 batches...") | ||
| logCnt += 1 |
Comment on lines
51
to
+54
| val clusterSettings: Map[ClusterName, KafkaClusterSettings] = | ||
| config.kafkaClusters.map(cc => cc.name -> KafkaClusterSettings(cc, credentials(cc.name))).toMap | ||
| config.kafkaClusters | ||
| .map(cc => cc.name -> KafkaClusterSettings(cc, additionalKafkaProps(cc.name))) | ||
| .toMap |
Comment on lines
+18
to
+21
| given ConfigReader[(ChannelName, MessageFormatName)] = | ||
| ConfigReader[String].map: c2mStr => | ||
| val c2m = c2mStr.split("/") | ||
| (ChannelName(c2m(0)), MessageFormatName(c2m(1))) |
Comment on lines
9
to
12
| class AggregateIdTransformerTest extends AnyFlatSpec, Matchers, Diagrams: | ||
|
|
||
| "AggregateIdTransformer" should "fail if the id is empty" in: | ||
| assertThrows[IllegalArgumentException]: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.