From 816fcbd9199ac606614b1beaf33c54049b76f03f Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Thu, 20 Nov 2025 20:38:29 +1100 Subject: [PATCH 1/2] Only require any input parameters to be specified if they differ from the defaults --- docs/resources/fleet_integration_policy.md | 28 +- internal/fleet/integration_policy/acc_test.go | 68 +- internal/fleet/integration_policy/create.go | 8 +- .../fleet/integration_policy/input_type.go | 74 + .../fleet/integration_policy/input_value.go | 376 +++++ .../integration_policy/input_value_test.go | 275 +++ .../fleet/integration_policy/inputs_type.go | 2 +- .../fleet/integration_policy/inputs_value.go | 158 +- .../integration_policy/inputs_value_test.go | 32 +- internal/fleet/integration_policy/models.go | 49 +- .../integration_policy/models_defaults.go | 192 +++ .../models_defaults_test.go | 744 +++++++++ .../fleet/integration_policy/models_test.go | 2 +- internal/fleet/integration_policy/read.go | 8 +- .../resource-description.md | 4 - internal/fleet/integration_policy/schema.go | 96 +- .../fleet/integration_policy/schema_v1.go | 7 +- .../minimal/integration_policy.tf | 35 + .../unset/integration_policy.tf | 26 + .../integration_policy.tf | 67 + .../testdata/integration_kafka.json | 1481 +++++++++++++++++ internal/fleet/integration_policy/update.go | 8 +- 22 files changed, 3547 insertions(+), 193 deletions(-) create mode 100644 internal/fleet/integration_policy/input_type.go create mode 100644 internal/fleet/integration_policy/input_value.go create mode 100644 internal/fleet/integration_policy/input_value_test.go create mode 100644 internal/fleet/integration_policy/models_defaults.go create mode 100644 internal/fleet/integration_policy/models_defaults_test.go create mode 100644 internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/minimal/integration_policy.tf create mode 100644 internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/unset/integration_policy.tf create mode 100644 internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/update_logfile_tags_only/integration_policy.tf create mode 100644 internal/fleet/integration_policy/testdata/integration_kafka.json diff --git a/docs/resources/fleet_integration_policy.md b/docs/resources/fleet_integration_policy.md index 8e956b6de..3899fa62f 100644 --- a/docs/resources/fleet_integration_policy.md +++ b/docs/resources/fleet_integration_policy.md @@ -4,9 +4,6 @@ page_title: "elasticstack_fleet_integration_policy Resource - terraform-provider subcategory: "Fleet" description: |- Creates or updates a Fleet Integration Policy. - It is highly recommended that all inputs and streams are provided in the - Terraform plan, even if some are disabled. Otherwise, differences may appear - between what is in the plan versus what is returned by the Fleet API. The Kibana Fleet UI https://www.elastic.co/guide/en/fleet/current/add-integration-to-policy.html can be used as a reference for what data needs to be provided. Instead of saving a new integration configuration, the API request can be previewed, showing what @@ -17,10 +14,6 @@ description: |- Creates or updates a Fleet Integration Policy. -It is highly recommended that all inputs and streams are provided in the -Terraform plan, even if some are disabled. Otherwise, differences may appear -between what is in the plan versus what is returned by the Fleet API. 
- The [Kibana Fleet UI](https://www.elastic.co/guide/en/fleet/current/add-integration-to-policy.html) can be used as a reference for what data needs to be provided. Instead of saving a new integration configuration, the API request can be previewed, showing what @@ -124,6 +117,10 @@ Optional: - `streams` (Attributes Map) Input streams mapped by stream ID. (see [below for nested schema](#nestedatt--inputs--streams)) - `vars` (String, Sensitive) Input-level variables as JSON. +Read-Only: + +- `defaults` (Attributes) Input defaults. (see [below for nested schema](#nestedatt--inputs--defaults)) + ### Nested Schema for `inputs.streams` @@ -132,6 +129,23 @@ Optional: - `enabled` (Boolean) Enable the stream. - `vars` (String, Sensitive) Stream-level variables as JSON. + + +### Nested Schema for `inputs.defaults` + +Read-Only: + +- `streams` (Attributes Map) Stream-level defaults mapped by stream ID. (see [below for nested schema](#nestedatt--inputs--defaults--streams)) +- `vars` (String) Input-level variable defaults as JSON. + + +### Nested Schema for `inputs.defaults.streams` + +Read-Only: + +- `enabled` (Boolean) Default enabled state for the stream. +- `vars` (String) Stream-level variable defaults as JSON. + ## Import Import is supported using the following syntax: diff --git a/internal/fleet/integration_policy/acc_test.go b/internal/fleet/integration_policy/acc_test.go index 22146dc1a..3e119e7bb 100644 --- a/internal/fleet/integration_policy/acc_test.go +++ b/internal/fleet/integration_policy/acc_test.go @@ -276,9 +276,14 @@ func TestAccResourceIntegrationPolicySecrets(t *testing.T) { "policy_name": config.StringVariable(policyName), "secret_key": config.StringVariable("updated"), }, - ImportState: true, - ImportStateVerify: true, - ImportStateVerifyIgnore: []string{"vars_json", "space_ids"}, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "vars_json", + "space_ids", + "inputs.aws_logs-aws-cloudwatch.defaults", + "inputs.aws_logs-aws-s3.defaults", + }, Check: resource.ComposeTestCheckFunc( resource.TestMatchResourceAttr("elasticstack_fleet_integration_policy.test_policy", "vars_json", regexp.MustCompile(`{"access_key_id":{"id":"\S+","isSecretRef":true},"default_region":"us-east-2","endpoint":"endpoint","secret_access_key":{"id":"\S+","isSecretRef":true},"session_token":{"id":"\S+","isSecretRef":true}}`)), ), @@ -425,6 +430,63 @@ func TestAccIntegrationPolicyInputs(t *testing.T) { resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-kafka/metrics.streams.kafka.partition.enabled", "false"), ), }, + { + ProtoV6ProviderFactories: acctest.Providers, + SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionIntegrationPolicy), + ConfigDirectory: acctest.NamedTestCaseDirectory("update_logfile_tags_only"), + ConfigVariables: config.Variables{ + "policy_name": config.StringVariable(policyName), + }, + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "name", policyName), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "description", "Kafka Integration Policy - Logfile with tags only"), + // Check that the kafka-logfile input is enabled with only tags specified + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-logfile.enabled", "true"), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", 
"inputs.kafka-logfile.streams.kafka.log.enabled", "true"), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-logfile.streams.kafka.log.vars", `{"tags":["custom-tag-1","custom-tag-2"]}`), + // Check that the kafka/metrics input remains enabled + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-kafka/metrics.enabled", "true"), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-kafka/metrics.vars", `{"hosts":["localhost:9092"],"period":"10s","ssl.certificate_authorities":[]}`), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-kafka/metrics.streams.kafka.broker.enabled", "true"), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-kafka/metrics.streams.kafka.consumergroup.enabled", "true"), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-kafka/metrics.streams.kafka.consumergroup.vars", `{"topics":["don't mention the war, I mentioned it once but I think I got away with it"]}`), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-kafka/metrics.streams.kafka.partition.enabled", "false"), + ), + }, + { + ProtoV6ProviderFactories: acctest.Providers, + SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionIntegrationPolicy), + ConfigDirectory: acctest.NamedTestCaseDirectory("minimal"), + ConfigVariables: config.Variables{ + "policy_name": config.StringVariable(policyName), + }, + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "name", policyName), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "description", "Kafka Integration Policy - Minimal"), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "integration_name", "kafka"), + // Check specified inputs + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-logfile.enabled", "false"), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-kafka/metrics.enabled", "true"), + // Check unspecified, disabled by default input + resource.TestCheckNoResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-jolokia/metrics"), + ), + }, + { + ProtoV6ProviderFactories: acctest.Providers, + SkipFunc: versionutils.CheckIfVersionIsUnsupported(minVersionIntegrationPolicy), + ConfigDirectory: acctest.NamedTestCaseDirectory("unset"), + ConfigVariables: config.Variables{ + "policy_name": config.StringVariable(policyName), + }, + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "name", policyName), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "description", "Kafka Integration Policy - Minimal"), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "integration_name", "kafka"), + // Check previously specified inputs + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-logfile.enabled", "false"), + resource.TestCheckResourceAttr("elasticstack_fleet_integration_policy.test_policy", "inputs.kafka-kafka/metrics.enabled", "true"), + ), + }, }, }) } diff --git 
a/internal/fleet/integration_policy/create.go b/internal/fleet/integration_policy/create.go index 6a23039f1..01d2e86ef 100644 --- a/internal/fleet/integration_policy/create.go +++ b/internal/fleet/integration_policy/create.go @@ -67,7 +67,13 @@ func (r *integrationPolicyResource) Create(ctx context.Context, req resource.Cre // Remember if the user configured input in the plan planHadInput := utils.IsKnown(planModel.Inputs) && !planModel.Inputs.IsNull() && len(planModel.Inputs.Elements()) > 0 - diags = planModel.populateFromAPI(ctx, policy) + pkg, diags := fleet.GetPackage(ctx, client, policy.Package.Name, policy.Package.Version) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + diags = planModel.populateFromAPI(ctx, pkg, policy) resp.Diagnostics.Append(diags...) if resp.Diagnostics.HasError() { return diff --git a/internal/fleet/integration_policy/input_type.go b/internal/fleet/integration_policy/input_type.go new file mode 100644 index 000000000..4ef83fcc3 --- /dev/null +++ b/internal/fleet/integration_policy/input_type.go @@ -0,0 +1,74 @@ +package integration_policy + +import ( + "context" + + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" + "github.com/hashicorp/terraform-plugin-go/tftypes" +) + +var ( + _ basetypes.ObjectTypable = (*InputType)(nil) + _ basetypes.ObjectValuableWithSemanticEquals = (*InputValue)(nil) +) + +// InputType is a custom type for an individual input that supports semantic equality +type InputType struct { + basetypes.ObjectType +} + +// String returns a human readable string of the type name. +func (t InputType) String() string { + return "integration_policy.InputType" +} + +// ValueType returns the Value type. +func (t InputType) ValueType(ctx context.Context) attr.Value { + return InputValue{ + ObjectValue: basetypes.NewObjectUnknown(t.AttributeTypes()), + } +} + +// Equal returns true if the given type is equivalent. +func (t InputType) Equal(o attr.Type) bool { + other, ok := o.(InputType) + if !ok { + return false + } + return t.ObjectType.Equal(other.ObjectType) +} + +// ValueFromObject returns an ObjectValuable type given a basetypes.ObjectValue. +func (t InputType) ValueFromObject(ctx context.Context, in basetypes.ObjectValue) (basetypes.ObjectValuable, diag.Diagnostics) { + return InputValue{ + ObjectValue: in, + }, nil +} + +// ValueFromTerraform returns a Value given a tftypes.Value. 
+func (t InputType) ValueFromTerraform(ctx context.Context, in tftypes.Value) (attr.Value, error) { + attrValue, err := t.ObjectType.ValueFromTerraform(ctx, in) + if err != nil { + return nil, err + } + + objectValue, ok := attrValue.(basetypes.ObjectValue) + if !ok { + return nil, err + } + + return InputValue{ + ObjectValue: objectValue, + }, nil +} + +// NewInputType creates a new InputType with the given attribute types +func NewInputType(attrTypes map[string]attr.Type) InputType { + return InputType{ + ObjectType: basetypes.ObjectType{ + AttrTypes: attrTypes, + }, + } +} diff --git a/internal/fleet/integration_policy/input_value.go b/internal/fleet/integration_policy/input_value.go new file mode 100644 index 000000000..cdfb4303b --- /dev/null +++ b/internal/fleet/integration_policy/input_value.go @@ -0,0 +1,376 @@ +package integration_policy + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/elastic/terraform-provider-elasticstack/internal/utils" + "github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes" + "github.com/hashicorp/terraform-plugin-framework/attr" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/path" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" +) + +// InputValue is a custom value type for an individual input that implements semantic equality +// Semantic equality uses the defaults attribute to populate unspecified values before comparison +type InputValue struct { + basetypes.ObjectValue +} + +// Type returns an InputType. +func (v InputValue) Type(ctx context.Context) attr.Type { + return NewInputType(v.AttributeTypes(ctx)) +} + +// Equal returns true if the given value is equivalent. +func (v InputValue) Equal(o attr.Value) bool { + other, ok := o.(InputValue) + if !ok { + return false + } + return v.ObjectValue.Equal(other.ObjectValue) +} + +func (v InputValue) MaybeEnabled(ctx context.Context) (bool, diag.Diagnostics) { + if v.IsNull() || v.IsUnknown() { + return false, nil + } + + var input integrationPolicyInputsModel + diags := v.As(ctx, &input, basetypes.ObjectAsOptions{}) + if diags.HasError() { + return false, diags + } + + input, defaultDiags := applyDefaultsToInput(ctx, input, input.Defaults) + diags.Append(defaultDiags...) + if diags.HasError() { + return false, diags + } + + if !utils.IsKnown(input.Enabled) { + return true, diags + } + + // The input will be treated as disabled unless at least one stream is enabled + for _, stream := range input.Streams.Elements() { + streamModel := integrationPolicyInputStreamModel{} + d := stream.(types.Object).As(ctx, &streamModel, basetypes.ObjectAsOptions{}) + diags.Append(d...) + if diags.HasError() { + return false, diags + } + + if !utils.IsKnown(streamModel.Enabled) || streamModel.Enabled.ValueBool() { + return true, diags + } + } + + return false, nil +} + +// ObjectSemanticEquals returns true if the given object value is semantically equal to the current object value. +// Semantic equality applies defaults from the defaults attribute before comparing values. +func (v InputValue) ObjectSemanticEquals(ctx context.Context, newValuable basetypes.ObjectValuable) (bool, diag.Diagnostics) { + var diags diag.Diagnostics + + newValue, ok := newValuable.(InputValue) + if !ok { + diags.AddError( + "Semantic Equality Check Error", + "An unexpected value type was received while performing semantic equality checks. 
"+ + "Please report this to the provider developers.\n\n"+ + "Expected Value Type: "+fmt.Sprintf("%T", v)+"\n"+ + "Got Value Type: "+fmt.Sprintf("%T", newValuable), + ) + return false, diags + } + + // Convert both values to the model + var oldInput integrationPolicyInputsModel + d := v.As(ctx, &oldInput, basetypes.ObjectAsOptions{ + UnhandledNullAsEmpty: true, + UnhandledUnknownAsEmpty: true, + }) + diags.Append(d...) + if diags.HasError() { + return false, diags + } + + var newInput integrationPolicyInputsModel + d = newValue.As(ctx, &newInput, basetypes.ObjectAsOptions{ + UnhandledNullAsEmpty: true, + UnhandledUnknownAsEmpty: true, + }) + diags.Append(d...) + if diags.HasError() { + return false, diags + } + + defaults := oldInput.Defaults + if !utils.IsKnown(defaults) { + defaults = newInput.Defaults + } + + // Apply defaults to both inputs + oldInputWithDefaults, d := applyDefaultsToInput(ctx, oldInput, defaults) + diags.Append(d...) + if diags.HasError() { + return false, diags + } + + newInputWithDefaults, d := applyDefaultsToInput(ctx, newInput, defaults) + diags.Append(d...) + if diags.HasError() { + return false, diags + } + + // Ignore the disabled attribute in equality checks. + // Disabled inputs are handled at the InputsValue level. + + // Compare vars using semantic equality if both are known + if utils.IsKnown(oldInputWithDefaults.Vars) && utils.IsKnown(newInputWithDefaults.Vars) { + varsEqual, d := oldInputWithDefaults.Vars.StringSemanticEquals(ctx, newInputWithDefaults.Vars) + diags.Append(d...) + if diags.HasError() { + return false, diags + } + if !varsEqual { + return false, diags + } + } else if !oldInputWithDefaults.Vars.Equal(newInputWithDefaults.Vars) { + // If one is null/unknown, use regular equality + return false, diags + } + + // Compare streams + streamsEqual, d := compareStreams(ctx, oldInputWithDefaults, newInputWithDefaults) + diags.Append(d...) + if diags.HasError() { + return false, diags + } + if !streamsEqual { + return false, diags + } + + return true, diags +} + +// applyDefaultsToInput applies defaults from the defaults attribute to the input +func applyDefaultsToInput(ctx context.Context, input integrationPolicyInputsModel, defaultsObj types.Object) (integrationPolicyInputsModel, diag.Diagnostics) { + var diags diag.Diagnostics + + // If defaults is null or unknown, return input as-is + if !utils.IsKnown(defaultsObj) { + return input, diags + } + + // Extract defaults model + var defaults inputDefaultsModel + d := defaultsObj.As(ctx, &defaults, basetypes.ObjectAsOptions{}) + diags.Append(d...) + if diags.HasError() { + return input, diags + } + + result := input + + // Apply var defaults + varsWithDefaults, d := applyDefaultsToVars(input.Vars, defaults.Vars) + diags.Append(d...) + if diags.HasError() { + return input, diags + } + result.Vars = varsWithDefaults + + // Apply stream defaults + streamsWithDefaults, d := applyDefaultsToStreams(ctx, input.Streams, defaults.Streams) + diags.Append(d...) + if diags.HasError() { + return input, diags + } + result.Streams = streamsWithDefaults + + return result, diags +} + +func applyDefaultsToVars(vars jsontypes.Normalized, defaults jsontypes.Normalized) (jsontypes.Normalized, diag.Diagnostics) { + if !utils.IsKnown(defaults) { + return vars, nil + } + + if !utils.IsKnown(vars) { + return defaults, nil + } + + var varsMap map[string]interface{} + var defaultsMap map[string]interface{} + + diags := vars.Unmarshal(&varsMap) + d := defaults.Unmarshal(&defaultsMap) + diags.Append(d...) 
+ if diags.HasError() { + return vars, diags + } + + for key, defaultValue := range defaultsMap { + if _, exists := varsMap[key]; !exists { + varsMap[key] = defaultValue + } + } + + varsBytes, err := json.Marshal(varsMap) + if err != nil { + diags.AddError("Failed to marshal vars with defaults", err.Error()) + return vars, diags + } + + varsWithDefaults := jsontypes.NewNormalizedValue(string(varsBytes)) + return varsWithDefaults, diags +} + +// applyDefaultsToStreams applies defaults to streams +func applyDefaultsToStreams(ctx context.Context, streams basetypes.MapValue, defaultStreams map[string]inputDefaultsStreamModel) (basetypes.MapValue, diag.Diagnostics) { + if len(defaultStreams) == 0 { + return streams, nil + } + + // If streams is not known, create new streams from defaults + if !utils.IsKnown(streams) { + streamsMap := make(map[string]integrationPolicyInputStreamModel) + for streamID, streamDefaults := range defaultStreams { + streamsMap[streamID] = integrationPolicyInputStreamModel(streamDefaults) + } + return types.MapValueFrom(ctx, getInputStreamType(), streamsMap) + } + + // Convert streams to model + var diags diag.Diagnostics + streamsMap := utils.MapTypeAs[integrationPolicyInputStreamModel](ctx, streams, path.Root("streams"), &diags) + if diags.HasError() { + return streams, diags + } + + // Apply defaults to each stream + for streamID, streamDefaults := range defaultStreams { + stream, exists := streamsMap[streamID] + if !exists { + // Stream not configured, use defaults + streamsMap[streamID] = integrationPolicyInputStreamModel(streamDefaults) + continue + } + + // Apply defaults to existing stream + if !utils.IsKnown(stream.Enabled) && utils.IsKnown(streamDefaults.Enabled) { + stream.Enabled = streamDefaults.Enabled + } + varsWithDefaults, d := applyDefaultsToVars(stream.Vars, streamDefaults.Vars) + diags.Append(d...) 
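+		// Stream-level vars get the same treatment as input-level vars: defaults only fill unset keys.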
+ if diags.HasError() { + return streams, diags + } + stream.Vars = varsWithDefaults + streamsMap[streamID] = stream + } + + return types.MapValueFrom(ctx, getInputStreamType(), streamsMap) +} + +// compareStreams compares two inputs' streams after defaults have been applied +func compareStreams(ctx context.Context, oldInput, newInput integrationPolicyInputsModel) (bool, diag.Diagnostics) { + var diags diag.Diagnostics + + // Handle null/unknown cases + if oldInput.Streams.IsNull() && newInput.Streams.IsNull() { + return true, diags + } + if oldInput.Streams.IsUnknown() && newInput.Streams.IsUnknown() { + return true, diags + } + if oldInput.Streams.IsNull() != newInput.Streams.IsNull() || oldInput.Streams.IsUnknown() != newInput.Streams.IsUnknown() { + return false, diags + } + + // Convert both maps to model + oldStreamsMap := utils.MapTypeAs[integrationPolicyInputStreamModel](ctx, oldInput.Streams, path.Root("streams"), &diags) + if diags.HasError() { + return false, diags + } + + newStreamsMap := utils.MapTypeAs[integrationPolicyInputStreamModel](ctx, newInput.Streams, path.Root("streams"), &diags) + if diags.HasError() { + return false, diags + } + + // Filter out disabled streams from both maps + enabledOldStreams := filterEnabledStreams(oldStreamsMap) + enabledNewStreams := filterEnabledStreams(newStreamsMap) + + // Check if the number of enabled streams is the same + if len(enabledOldStreams) != len(enabledNewStreams) { + return false, diags + } + + // Compare each enabled stream + for streamID, oldStream := range enabledOldStreams { + newStream, exists := enabledNewStreams[streamID] + if !exists { + return false, diags + } + + // Compare enabled flags + if !oldStream.Enabled.Equal(newStream.Enabled) { + return false, diags + } + + // Compare vars using semantic equality if both are known + if utils.IsKnown(oldStream.Vars) && utils.IsKnown(newStream.Vars) { + varsEqual, d := oldStream.Vars.StringSemanticEquals(ctx, newStream.Vars) + diags.Append(d...) + if diags.HasError() { + return false, diags + } + if !varsEqual { + return false, diags + } + } else if !oldStream.Vars.Equal(newStream.Vars) { + // If one is null/unknown, use regular equality + return false, diags + } + } + + return true, diags +} + +// NewInputNull creates an InputValue with a null value. +func NewInputNull(attrTypes map[string]attr.Type) InputValue { + return InputValue{ + ObjectValue: basetypes.NewObjectNull(attrTypes), + } +} + +// NewInputUnknown creates an InputValue with an unknown value. +func NewInputUnknown(attrTypes map[string]attr.Type) InputValue { + return InputValue{ + ObjectValue: basetypes.NewObjectUnknown(attrTypes), + } +} + +// NewInputValue creates an InputValue with a known value. +func NewInputValue(attrTypes map[string]attr.Type, attributes map[string]attr.Value) (InputValue, diag.Diagnostics) { + objectValue, diags := basetypes.NewObjectValue(attrTypes, attributes) + return InputValue{ + ObjectValue: objectValue, + }, diags +} + +// NewInputValueFrom creates an InputValue from a Go value. 
+func NewInputValueFrom(ctx context.Context, attrTypes map[string]attr.Type, val any) (InputValue, diag.Diagnostics) { + objectValue, diags := basetypes.NewObjectValueFrom(ctx, attrTypes, val) + return InputValue{ + ObjectValue: objectValue, + }, diags +} diff --git a/internal/fleet/integration_policy/input_value_test.go b/internal/fleet/integration_policy/input_value_test.go new file mode 100644 index 000000000..910981fb3 --- /dev/null +++ b/internal/fleet/integration_policy/input_value_test.go @@ -0,0 +1,275 @@ +package integration_policy + +import ( + "context" + "testing" + + "github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestInputValue_ObjectSemanticEquals(t *testing.T) { + ctx := context.Background() + attrTypes := getInputsAttributeTypes() + + tests := []struct { + name string + value1 InputValue + value2 InputValue + expected bool + expectError bool + }{ + { + name: "both null values are equal", + value1: NewInputNull(attrTypes), + value2: NewInputNull(attrTypes), + expected: true, + }, + { + name: "both unknown values are equal", + value1: NewInputUnknown(attrTypes), + value2: NewInputUnknown(attrTypes), + expected: true, + }, + { + name: "null vs unknown are equal", + value1: NewInputNull(attrTypes), + value2: NewInputUnknown(attrTypes), + expected: true, + }, + { + name: "same inputs without defaults are equal", + value1: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: types.MapNull(getInputStreamType()), + }), + value2: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: types.MapNull(getInputStreamType()), + }), + expected: true, + }, + { + name: "different vars are not equal", + value1: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value1"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: types.MapNull(getInputStreamType()), + }), + value2: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value2"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: types.MapNull(getInputStreamType()), + }), + expected: false, + }, + { + name: "unset vars use defaults - equal when defaults match", + value1: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedNull(), + Defaults: mustNewInputDefaults(ctx, t, inputDefaultsModel{ + Vars: jsontypes.NewNormalizedValue(`{"key": "default_value"}`), + Streams: nil, + }), + Streams: types.MapNull(getInputStreamType()), + }), + value2: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "default_value"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: types.MapNull(getInputStreamType()), + }), + expected: true, + }, + { + name: "unset vars use defaults - not equal when different from defaults", + value1: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + 
Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedNull(), + Defaults: mustNewInputDefaults(ctx, t, inputDefaultsModel{ + Vars: jsontypes.NewNormalizedValue(`{"key": "default_value"}`), + Streams: nil, + }), + Streams: types.MapNull(getInputStreamType()), + }), + value2: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "other_value"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: types.MapNull(getInputStreamType()), + }), + expected: false, + }, + { + name: "both unset vars use same defaults - equal", + value1: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedNull(), + Defaults: mustNewInputDefaults(ctx, t, inputDefaultsModel{ + Vars: jsontypes.NewNormalizedValue(`{"key": "default_value"}`), + Streams: nil, + }), + Streams: types.MapNull(getInputStreamType()), + }), + value2: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedNull(), + Defaults: mustNewInputDefaults(ctx, t, inputDefaultsModel{ + Vars: jsontypes.NewNormalizedValue(`{"key": "default_value"}`), + Streams: nil, + }), + Streams: types.MapNull(getInputStreamType()), + }), + expected: true, + }, + { + name: "unset streams use defaults - equal when defaults match", + value1: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value"}`), + Defaults: mustNewInputDefaults(ctx, t, inputDefaultsModel{ + Vars: jsontypes.NewNormalizedNull(), + Streams: map[string]inputDefaultsStreamModel{ + "stream1": { + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"stream_key": "default_stream"}`), + }, + }, + }), + Streams: types.MapNull(getInputStreamType()), + }), + value2: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: mustNewStreamsMap(ctx, t, map[string]integrationPolicyInputStreamModel{ + "stream1": { + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"stream_key": "default_stream"}`), + }, + }), + }), + expected: true, + }, + { + name: "stream vars use defaults - equal when defaults match", + value1: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value"}`), + Defaults: mustNewInputDefaults(ctx, t, inputDefaultsModel{ + Vars: jsontypes.NewNormalizedNull(), + Streams: map[string]inputDefaultsStreamModel{ + "stream1": { + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"stream_key": "default_stream"}`), + }, + }, + }), + Streams: mustNewStreamsMap(ctx, t, map[string]integrationPolicyInputStreamModel{ + "stream1": { + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedNull(), + }, + }), + }), + value2: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: mustNewStreamsMap(ctx, t, map[string]integrationPolicyInputStreamModel{ + "stream1": { + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"stream_key": "default_stream"}`), + }, + }), + }), + expected: true, + }, + { + name: 
"disabled streams are ignored", + value1: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: mustNewStreamsMap(ctx, t, map[string]integrationPolicyInputStreamModel{ + "stream1": { + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"stream_key": "stream_value"}`), + }, + "stream2": { + Enabled: types.BoolValue(false), + Vars: jsontypes.NewNormalizedValue(`{"stream_key": "disabled_old"}`), + }, + }), + }), + value2: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: mustNewStreamsMap(ctx, t, map[string]integrationPolicyInputStreamModel{ + "stream1": { + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"stream_key": "stream_value"}`), + }, + "stream3": { + Enabled: types.BoolValue(false), + Vars: jsontypes.NewNormalizedValue(`{"stream_key": "disabled_new"}`), + }, + }), + }), + expected: true, + }, + { + name: "vars use semantic equality - whitespace differences ignored", + value1: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key":"value"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: types.MapNull(getInputStreamType()), + }), + value2: mustNewInputValue(ctx, t, integrationPolicyInputsModel{ + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"key": "value"}`), + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), + Streams: types.MapNull(getInputStreamType()), + }), + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, diags := tt.value1.ObjectSemanticEquals(ctx, tt.value2) + + if tt.expectError { + require.True(t, diags.HasError(), "Expected error but got none") + } else { + require.False(t, diags.HasError(), "Expected no error but got: %v", diags) + assert.Equal(t, tt.expected, result) + } + }) + } +} + +func mustNewInputValue(ctx context.Context, t *testing.T, input integrationPolicyInputsModel) InputValue { + t.Helper() + value, diags := NewInputValueFrom(ctx, getInputsAttributeTypes(), input) + require.False(t, diags.HasError(), "Failed to create InputValue: %v", diags) + return value +} + +func mustNewInputDefaults(ctx context.Context, t *testing.T, defaults inputDefaultsModel) types.Object { + t.Helper() + value, diags := types.ObjectValueFrom(ctx, getInputDefaultsAttrTypes(), defaults) + require.False(t, diags.HasError(), "Failed to create defaults object: %v", diags) + return value +} diff --git a/internal/fleet/integration_policy/inputs_type.go b/internal/fleet/integration_policy/inputs_type.go index 840ec3ac8..4f1529516 100644 --- a/internal/fleet/integration_policy/inputs_type.go +++ b/internal/fleet/integration_policy/inputs_type.go @@ -66,7 +66,7 @@ func (t InputsType) ValueFromTerraform(ctx context.Context, in tftypes.Value) (a } // NewInputsType creates a new InputsType with the given element type -func NewInputsType(elemType attr.Type) InputsType { +func NewInputsType(elemType InputType) InputsType { return InputsType{ MapType: basetypes.MapType{ ElemType: elemType, diff --git a/internal/fleet/integration_policy/inputs_value.go b/internal/fleet/integration_policy/inputs_value.go index 3af0db204..12db9de5d 100644 --- 
a/internal/fleet/integration_policy/inputs_value.go +++ b/internal/fleet/integration_policy/inputs_value.go @@ -4,10 +4,8 @@ import ( "context" "fmt" - "github.com/elastic/terraform-provider-elasticstack/internal/utils" "github.com/hashicorp/terraform-plugin-framework/attr" "github.com/hashicorp/terraform-plugin-framework/diag" - "github.com/hashicorp/terraform-plugin-framework/path" "github.com/hashicorp/terraform-plugin-framework/types/basetypes" ) @@ -19,7 +17,13 @@ type InputsValue struct { // Type returns an InputsType. func (v InputsValue) Type(ctx context.Context) attr.Type { - return NewInputsType(v.ElementType(ctx)) + elemType := v.ElementType(ctx) + inputType, ok := elemType.(InputType) + if !ok { + // Fallback for when ElementType is not InputType (shouldn't happen in practice) + return NewInputsType(NewInputType(getInputsAttributeTypes())) + } + return NewInputsType(inputType) } // Equal returns true if the given value is equivalent. @@ -57,143 +61,51 @@ func (v InputsValue) MapSemanticEquals(ctx context.Context, newValuable basetype return newValue.IsUnknown(), diags } - // Convert both maps to integrationPolicyInputsModel - oldInputsMap := utils.MapTypeAs[integrationPolicyInputsModel](ctx, v.MapValue, path.Root("inputs"), &diags) - if diags.HasError() { - return false, diags - } - - newInputsMap := utils.MapTypeAs[integrationPolicyInputsModel](ctx, newValue.MapValue, path.Root("inputs"), &diags) - if diags.HasError() { - return false, diags - } - - // Filter out disabled inputs from both maps - enabledOldInputs := filterEnabledInputs(ctx, oldInputsMap) - enabledNewInputs := filterEnabledInputs(ctx, newInputsMap) - - // Check if the number of enabled inputs is the same - if len(enabledOldInputs) != len(enabledNewInputs) { - return false, diags - } - - // Compare each enabled input - for inputID, oldInput := range enabledOldInputs { - newInput, exists := enabledNewInputs[inputID] + remainingNewInputs := newValue.Elements() + for inputID, oldInputValue := range v.Elements() { + oldInput := oldInputValue.(InputValue) + newInput, exists := remainingNewInputs[inputID] if !exists { - return false, diags - } - - // Compare enabled flags - if !oldInput.Enabled.Equal(newInput.Enabled) { - return false, diags - } - - // Compare vars using semantic equality if both are known - if utils.IsKnown(oldInput.Vars) && utils.IsKnown(newInput.Vars) { - varsEqual, d := oldInput.Vars.StringSemanticEquals(ctx, newInput.Vars) + // If the old input is disabled, we can ignore its absence in the new inputs + enabled, d := oldInput.MaybeEnabled(ctx) diags.Append(d...) if diags.HasError() { return false, diags } - if !varsEqual { - return false, diags + + if !enabled { + continue } - } else if !oldInput.Vars.Equal(newInput.Vars) { - // If one is null/unknown, use regular equality + return false, diags } - // Compare streams - streamsEqual, d := compareStreams(ctx, oldInput.Streams, newInput.Streams) + newInputValue := newInput.(InputValue) + + equals, d := oldInput.ObjectSemanticEquals(ctx, newInputValue) diags.Append(d...) 
if diags.HasError() { return false, diags } - if !streamsEqual { + if !equals { return false, diags } - } - - return true, diags -} - -// filterEnabledInputs returns a map of only the enabled inputs -func filterEnabledInputs(ctx context.Context, inputs map[string]integrationPolicyInputsModel) map[string]integrationPolicyInputsModel { - if inputs == nil { - return nil - } - - enabled := make(map[string]integrationPolicyInputsModel) - for inputID, input := range inputs { - // Only include inputs that are explicitly enabled or unknown - // Disabled inputs (enabled=false) are excluded - if input.Enabled.IsNull() || input.Enabled.IsUnknown() || input.Enabled.ValueBool() { - enabled[inputID] = input - } - } - return enabled -} - -// compareStreams compares two stream maps, ignoring disabled streams -func compareStreams(ctx context.Context, oldStreams, newStreams basetypes.MapValue) (bool, diag.Diagnostics) { - var diags diag.Diagnostics - - // Handle null/unknown cases - if oldStreams.IsNull() && newStreams.IsNull() { - return true, diags - } - if oldStreams.IsUnknown() && newStreams.IsUnknown() { - return true, diags - } - if oldStreams.IsNull() != newStreams.IsNull() || oldStreams.IsUnknown() != newStreams.IsUnknown() { - return false, diags - } - - // Convert both maps to integrationPolicyInputStreamModel - oldStreamsMap := utils.MapTypeAs[integrationPolicyInputStreamModel](ctx, oldStreams, path.Root("streams"), &diags) - if diags.HasError() { - return false, diags - } - newStreamsMap := utils.MapTypeAs[integrationPolicyInputStreamModel](ctx, newStreams, path.Root("streams"), &diags) - if diags.HasError() { - return false, diags + // Remove the processed input from remainingNewInputs + delete(remainingNewInputs, inputID) } - // Filter out disabled streams from both maps - enabledOldStreams := filterEnabledStreams(oldStreamsMap) - enabledNewStreams := filterEnabledStreams(newStreamsMap) - - // Check if the number of enabled streams is the same - if len(enabledOldStreams) != len(enabledNewStreams) { - return false, diags - } - - // Compare each enabled stream - for streamID, oldStream := range enabledOldStreams { - newStream, exists := enabledNewStreams[streamID] - if !exists { - return false, diags - } - - // Compare enabled flags - if !oldStream.Enabled.Equal(newStream.Enabled) { + // After processing all old inputs, check if there are any remaining new inputs + for _, newInputValue := range remainingNewInputs { + newInput := newInputValue.(InputValue) + // If the new input is enabled, it's a difference + enabled, d := newInput.MaybeEnabled(ctx) + diags.Append(d...) + if diags.HasError() { return false, diags } - // Compare vars using semantic equality if both are known - if utils.IsKnown(oldStream.Vars) && utils.IsKnown(newStream.Vars) { - varsEqual, d := oldStream.Vars.StringSemanticEquals(ctx, newStream.Vars) - diags.Append(d...) - if diags.HasError() { - return false, diags - } - if !varsEqual { - return false, diags - } - } else if !oldStream.Vars.Equal(newStream.Vars) { - // If one is null/unknown, use regular equality + if enabled { return false, diags } } @@ -219,21 +131,21 @@ func filterEnabledStreams(streams map[string]integrationPolicyInputStreamModel) } // NewInputsNull creates an InputsValue with a null value. -func NewInputsNull(elemType attr.Type) InputsValue { +func NewInputsNull(elemType InputType) InputsValue { return InputsValue{ MapValue: basetypes.NewMapNull(elemType), } } // NewInputsUnknown creates an InputsValue with an unknown value. 
-func NewInputsUnknown(elemType attr.Type) InputsValue { +func NewInputsUnknown(elemType InputType) InputsValue { return InputsValue{ MapValue: basetypes.NewMapUnknown(elemType), } } // NewInputsValue creates an InputsValue with a known value. -func NewInputsValue(elemType attr.Type, elements map[string]attr.Value) (InputsValue, diag.Diagnostics) { +func NewInputsValue(elemType InputType, elements map[string]attr.Value) (InputsValue, diag.Diagnostics) { mapValue, diags := basetypes.NewMapValue(elemType, elements) return InputsValue{ MapValue: mapValue, @@ -241,7 +153,7 @@ func NewInputsValue(elemType attr.Type, elements map[string]attr.Value) (InputsV } // NewInputsValueFrom creates an InputsValue from a map of Go values. -func NewInputsValueFrom(ctx context.Context, elemType attr.Type, elements any) (InputsValue, diag.Diagnostics) { +func NewInputsValueFrom(ctx context.Context, elemType InputType, elements any) (InputsValue, diag.Diagnostics) { mapValue, diags := basetypes.NewMapValueFrom(ctx, elemType, elements) return InputsValue{ MapValue: mapValue, diff --git a/internal/fleet/integration_policy/inputs_value_test.go b/internal/fleet/integration_policy/inputs_value_test.go index 135700591..d68cadd2f 100644 --- a/internal/fleet/integration_policy/inputs_value_test.go +++ b/internal/fleet/integration_policy/inputs_value_test.go @@ -149,29 +149,6 @@ func TestInputsValue_MapSemanticEquals(t *testing.T) { }), expected: false, }, - { - name: "different number of enabled inputs are not equal", - value1: mustNewInputsValue(ctx, t, map[string]integrationPolicyInputsModel{ - "input1": { - Enabled: types.BoolValue(true), - Vars: jsontypes.NewNormalizedValue(`{"key": "value1"}`), - Streams: types.MapNull(getInputStreamType()), - }, - }), - value2: mustNewInputsValue(ctx, t, map[string]integrationPolicyInputsModel{ - "input1": { - Enabled: types.BoolValue(true), - Vars: jsontypes.NewNormalizedValue(`{"key": "value1"}`), - Streams: types.MapNull(getInputStreamType()), - }, - "input2": { - Enabled: types.BoolValue(true), - Vars: jsontypes.NewNormalizedValue(`{"key": "value2"}`), - Streams: types.MapNull(getInputStreamType()), - }, - }), - expected: false, - }, { name: "disabled streams are ignored", value1: mustNewInputsValue(ctx, t, map[string]integrationPolicyInputsModel{ @@ -244,6 +221,15 @@ func TestInputsValue_MapSemanticEquals(t *testing.T) { func mustNewInputsValue(ctx context.Context, t *testing.T, inputs map[string]integrationPolicyInputsModel) InputsValue { t.Helper() + + // Set defaults to null for all inputs if not already set + for key, input := range inputs { + if input.Defaults.IsNull() || input.Defaults.IsUnknown() { + input.Defaults = types.ObjectNull(getInputDefaultsAttrTypes()) + inputs[key] = input + } + } + value, diags := NewInputsValueFrom(ctx, getInputsElementType(), inputs) require.False(t, diags.HasError(), "Failed to create InputsValue: %v", diags) return value diff --git a/internal/fleet/integration_policy/models.go b/internal/fleet/integration_policy/models.go index 4fc7b826d..90862a1b2 100644 --- a/internal/fleet/integration_policy/models.go +++ b/internal/fleet/integration_policy/models.go @@ -36,9 +36,10 @@ type integrationPolicyModel struct { } type integrationPolicyInputsModel struct { - Enabled types.Bool `tfsdk:"enabled"` - Vars jsontypes.Normalized `tfsdk:"vars"` - Streams types.Map `tfsdk:"streams"` //> integrationPolicyInputStreamModel + Enabled types.Bool `tfsdk:"enabled"` + Vars jsontypes.Normalized `tfsdk:"vars"` + Defaults types.Object `tfsdk:"defaults"` //> 
inputDefaultsModel + Streams types.Map `tfsdk:"streams"` //> integrationPolicyInputStreamModel } type integrationPolicyInputStreamModel struct { @@ -46,7 +47,7 @@ type integrationPolicyInputStreamModel struct { Vars jsontypes.Normalized `tfsdk:"vars"` } -func (model *integrationPolicyModel) populateFromAPI(ctx context.Context, data *kbapi.PackagePolicy) diag.Diagnostics { +func (model *integrationPolicyModel) populateFromAPI(ctx context.Context, pkg *kbapi.PackageInfo, data *kbapi.PackagePolicy) diag.Diagnostics { if data == nil { return nil } @@ -110,12 +111,12 @@ func (model *integrationPolicyModel) populateFromAPI(ctx context.Context, data * model.SpaceIds = types.SetNull(types.StringType) } // If originally set but API didn't return it, keep the original value - model.populateInputsFromAPI(ctx, data.Inputs, &diags) + model.populateInputsFromAPI(ctx, pkg, data.Inputs, &diags) return diags } -func (model *integrationPolicyModel) populateInputsFromAPI(ctx context.Context, inputs map[string]kbapi.PackagePolicyInput, diags *diag.Diagnostics) { +func (model *integrationPolicyModel) populateInputsFromAPI(ctx context.Context, pkg *kbapi.PackageInfo, inputs map[string]kbapi.PackagePolicyInput, diags *diag.Diagnostics) { // Handle input population based on context: // 1. If model.Inputs is unknown: we're importing or reading fresh state → populate from API // 2. If model.Inputs is known and null/empty: user explicitly didn't configure inputs → don't populate (avoid inconsistent state) @@ -136,6 +137,17 @@ func (model *integrationPolicyModel) populateInputsFromAPI(ctx context.Context, } // Case 3: Known and not null/empty - user configured inputs, populate from API (continue below) + // Fetch package info to get defaults + inputDefaults, defaultsDiags := packageInfoToDefaults(pkg) + diags.Append(defaultsDiags...) + if diags.HasError() { + return + } + + if inputDefaults == nil { + inputDefaults = make(map[string]inputDefaultsModel) + } + newInputs := make(map[string]integrationPolicyInputsModel) for inputID, inputData := range inputs { inputModel := integrationPolicyInputsModel{ @@ -162,6 +174,15 @@ func (model *integrationPolicyModel) populateInputsFromAPI(ctx context.Context, inputModel.Streams = types.MapNull(getInputStreamType()) } + // Populate defaults if available + if defaults, ok := inputDefaults[inputID]; ok { + defaultsObj, d := types.ObjectValueFrom(ctx, getInputDefaultsAttrTypes(), defaults) + diags.Append(d...) 
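+			// Defaults are stored on the input so semantic equality can fill unset values later.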
+ inputModel.Defaults = defaultsObj + } else { + inputModel.Defaults = types.ObjectNull(getInputDefaultsAttrTypes()) + } + newInputs[inputID] = inputModel } @@ -231,28 +252,24 @@ func (model integrationPolicyModel) toAPIModel(ctx context.Context, isUpdate boo body.Id = model.ID.ValueStringPointer() } - if utils.IsKnown(model.Inputs) && len(model.Inputs.Elements()) > 0 { - // Use the 'inputs' attribute (v2 format) - body.Inputs = utils.MapRef(model.toAPIInputsFromInputsAttribute(ctx, &diags)) - } - + body.Inputs = model.toAPIInputsFromInputsAttribute(ctx, &diags) // Note: space_ids is read-only for integration policies and inherited from the agent policy return body, diags } // toAPIInputsFromInputsAttribute converts the 'inputs' attribute to the API model format -func (model integrationPolicyModel) toAPIInputsFromInputsAttribute(ctx context.Context, diags *diag.Diagnostics) map[string]kbapi.PackagePolicyRequestInput { +func (model integrationPolicyModel) toAPIInputsFromInputsAttribute(ctx context.Context, diags *diag.Diagnostics) *map[string]kbapi.PackagePolicyRequestInput { + result := make(map[string]kbapi.PackagePolicyRequestInput, len(model.Inputs.Elements())) if !utils.IsKnown(model.Inputs.MapValue) { - return nil + return &result } inputsMap := utils.MapTypeAs[integrationPolicyInputsModel](ctx, model.Inputs.MapValue, path.Root("inputs"), diags) if inputsMap == nil { - return nil + return &result } - result := make(map[string]kbapi.PackagePolicyRequestInput, len(inputsMap)) for inputID, inputModel := range inputsMap { inputPath := path.Root("inputs").AtMapKey(inputID) @@ -279,5 +296,5 @@ func (model integrationPolicyModel) toAPIInputsFromInputsAttribute(ctx context.C result[inputID] = apiInput } - return result + return &result } diff --git a/internal/fleet/integration_policy/models_defaults.go b/internal/fleet/integration_policy/models_defaults.go new file mode 100644 index 000000000..f49b73c41 --- /dev/null +++ b/internal/fleet/integration_policy/models_defaults.go @@ -0,0 +1,192 @@ +package integration_policy + +import ( + "encoding/json" + "fmt" + + "github.com/elastic/terraform-provider-elasticstack/generated/kbapi" + "github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes" + "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/mitchellh/mapstructure" +) + +type inputDefaultsModel struct { + Vars jsontypes.Normalized `tfsdk:"vars"` + Streams map[string]inputDefaultsStreamModel `tfsdk:"streams"` +} + +type inputDefaultsStreamModel struct { + Enabled types.Bool `tfsdk:"enabled"` + Vars jsontypes.Normalized `tfsdk:"vars"` +} + +type apiPolicyTemplate struct { + Name string `json:"name"` + Inputs []apiPolicyTemplateInput `json:"inputs"` +} + +type apiPolicyTemplateInput struct { + Type string `json:"type"` + Vars apiVars `json:"vars"` +} + +func (policyTemplate *apiPolicyTemplate) defaults() (map[string]jsontypes.Normalized, diag.Diagnostics) { + defaults := map[string]jsontypes.Normalized{} + + if policyTemplate == nil { + return defaults, nil + } + + for _, inputTemplate := range policyTemplate.Inputs { + name := fmt.Sprintf("%s-%s", policyTemplate.Name, inputTemplate.Type) + varDefaults, diags := inputTemplate.Vars.defaults() + if diags.HasError() { + return nil, diags + } + + defaults[name] = varDefaults + } + + return defaults, nil +} + +type apiDatastreams []apiDatastream +type apiDatastream struct { + Type string `json:"type"` + Dataset string `json:"dataset"` + Streams []apiDatastreamStream 
`json:"streams"` +} + +type apiDatastreamStream struct { + Input string `json:"input"` + Vars apiVars `json:"vars"` + Enabled bool `json:"enabled"` +} + +func (dataStreams apiDatastreams) defaults() (map[string]map[string]inputDefaultsStreamModel, diag.Diagnostics) { + defaults := map[string]map[string]inputDefaultsStreamModel{} + + if dataStreams == nil { + return defaults, nil + } + + for _, dataStream := range dataStreams { + for _, stream := range dataStream.Streams { + varDefaults, diags := stream.Vars.defaults() + if diags.HasError() { + return nil, diags + } + + d := defaults[stream.Input] + if d == nil { + d = map[string]inputDefaultsStreamModel{} + } + d[dataStream.Dataset] = inputDefaultsStreamModel{ + Enabled: types.BoolValue(stream.Enabled), + Vars: varDefaults, + } + defaults[stream.Input] = d + } + } + + return defaults, nil +} + +type apiVars []apiVar +type apiVar struct { + Name string `json:"name"` + Default interface{} `json:"default"` + Multi bool `json:"multi"` +} + +func (v apiVars) defaults() (jsontypes.Normalized, diag.Diagnostics) { + varDefaults := map[string]interface{}{} + for _, inputVar := range v { + if inputVar.Default != nil { + varDefaults[inputVar.Name] = inputVar.Default + continue + } + + if inputVar.Multi { + varDefaults[inputVar.Name] = []interface{}{} + continue + } + } + + varDefaultsBytes, err := json.Marshal(varDefaults) + if err != nil { + var diags diag.Diagnostics + diags.AddError("Failed to marshal default vars for input", err.Error()) + return jsontypes.NewNormalizedNull(), diags + } + + return jsontypes.NewNormalizedValue(string(varDefaultsBytes)), nil +} + +func packageInfoToDefaults(pkg *kbapi.PackageInfo) (map[string]inputDefaultsModel, diag.Diagnostics) { + policyTemplate, datastreams, diags := policyTemplateAndDataStreamsFromPackageInfo(pkg) + if diags.HasError() { + return nil, diags + } + + defaultVarsByInput, inputVarsDiags := policyTemplate.defaults() + diags.Append(inputVarsDiags...) + + defaultStreamsByInput, streamsDiags := datastreams.defaults() + diags.Append(streamsDiags...) 
+ + if diags.HasError() { + return nil, diags + } + + defaults := map[string]inputDefaultsModel{} + for inputID, vars := range defaultVarsByInput { + defaults[inputID] = inputDefaultsModel{ + Vars: vars, + } + } + + for inputIDSuffix, streams := range defaultStreamsByInput { + inputID := fmt.Sprintf("%s-%s", policyTemplate.Name, inputIDSuffix) + inputDefaults, ok := defaults[inputID] + if !ok { + inputDefaults.Vars = jsontypes.NewNormalizedNull() + } + + inputDefaults.Streams = streams + defaults[inputID] = inputDefaults + } + + return defaults, diags +} + +func policyTemplateAndDataStreamsFromPackageInfo(pkg *kbapi.PackageInfo) (*apiPolicyTemplate, apiDatastreams, diag.Diagnostics) { + if pkg == nil { + return nil, nil, nil + } + + var diags diag.Diagnostics + + var policyTemplate apiPolicyTemplate + var dataStreams []apiDatastream + + if pkg.PolicyTemplates != nil && len(*pkg.PolicyTemplates) > 0 { + policyTemplateIf := (*pkg.PolicyTemplates)[0] + err := mapstructure.Decode(policyTemplateIf, &policyTemplate) + if err != nil { + diags.AddError("Failed to decode package policy template", err.Error()) + return nil, nil, diags + } + } + + if pkg.DataStreams != nil { + err := mapstructure.Decode(pkg.DataStreams, &dataStreams) + if err != nil { + diags.AddError("Failed to decode package data streams", err.Error()) + return nil, nil, diags + } + } + + return &policyTemplate, dataStreams, nil +} diff --git a/internal/fleet/integration_policy/models_defaults_test.go b/internal/fleet/integration_policy/models_defaults_test.go new file mode 100644 index 000000000..34abfd340 --- /dev/null +++ b/internal/fleet/integration_policy/models_defaults_test.go @@ -0,0 +1,744 @@ +package integration_policy + +import ( + _ "embed" + "encoding/json" + "testing" + + "github.com/elastic/terraform-provider-elasticstack/generated/kbapi" + "github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes" + "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +//go:embed testdata/integration_kafka.json +var kafkaIntegrationJSON []byte + +func TestApiVarsDefaults(t *testing.T) { + tests := []struct { + name string + vars apiVars + expectedJSON string + expectError bool + expectedIsNull bool + }{ + { + name: "nil vars returns empty object", + vars: nil, + expectedJSON: "{}", + expectedIsNull: false, + }, + { + name: "empty vars returns empty object", + vars: apiVars{}, + expectedJSON: "{}", + expectedIsNull: false, + }, + { + name: "single var with default", + vars: apiVars{ + { + Name: "hosts", + Default: []interface{}{"http://127.0.0.1:8778"}, + }, + }, + expectedJSON: `{"hosts":["http://127.0.0.1:8778"]}`, + }, + { + name: "single var without default is omitted", + vars: apiVars{ + { + Name: "username", + Default: nil, + }, + }, + expectedJSON: "{}", + }, + { + name: "multiple vars with mixed defaults", + vars: apiVars{ + { + Name: "hosts", + Default: []interface{}{"localhost:9092"}, + }, + { + Name: "period", + Default: "10s", + }, + { + Name: "username", + Default: nil, + }, + { + Name: "ssl.verification_mode", + Default: "none", + }, + }, + expectedJSON: `{"hosts":["localhost:9092"],"period":"10s","ssl.verification_mode":"none"}`, + }, + { + name: "var with complex default value", + vars: apiVars{ + { + Name: "headers", + Default: `# headers: +# Cookie: abcdef=123456 +# My-Custom-Header: my-custom-value +`, + }, + }, + expectedJSON: `{"headers":"# headers:\n# Cookie: abcdef=123456\n# My-Custom-Header: my-custom-value\n"}`, 
+ }, + { + name: "var with boolean default", + vars: apiVars{ + { + Name: "preserve_original_event", + Default: false, + }, + }, + expectedJSON: `{"preserve_original_event":false}`, + }, + { + name: "var with string array default", + vars: apiVars{ + { + Name: "tags", + Default: []interface{}{"kafka-log"}, + }, + }, + expectedJSON: `{"tags":["kafka-log"]}`, + }, + { + name: "var with multi-element array default", + vars: apiVars{ + { + Name: "paths", + Default: []interface{}{ + "/logs/controller.log*", + "/logs/server.log*", + "/logs/state-change.log*", + }, + }, + }, + expectedJSON: `{"paths":["/logs/controller.log*","/logs/server.log*","/logs/state-change.log*"]}`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, diags := tt.vars.defaults() + + if tt.expectError { + require.True(t, diags.HasError(), "Expected error but got none") + } else { + require.False(t, diags.HasError(), "Expected no error but got: %v", diags) + + if tt.expectedIsNull { + assert.True(t, result.IsNull(), "Expected null result") + } else { + assert.False(t, result.IsNull(), "Expected non-null result") + + // Normalize JSON for comparison + var expectedMap map[string]interface{} + err := json.Unmarshal([]byte(tt.expectedJSON), &expectedMap) + require.NoError(t, err, "Failed to unmarshal expected JSON") + + var actualMap map[string]interface{} + err = json.Unmarshal([]byte(result.ValueString()), &actualMap) + require.NoError(t, err, "Failed to unmarshal actual JSON") + + assert.Equal(t, expectedMap, actualMap, "JSON content mismatch") + } + } + }) + } +} + +func TestApiPolicyTemplateDefaults(t *testing.T) { + tests := []struct { + name string + template *apiPolicyTemplate + expectedKeys []string + expectError bool + }{ + { + name: "nil template returns empty map", + template: nil, + expectedKeys: []string{}, + }, + { + name: "template with no inputs returns empty map", + template: &apiPolicyTemplate{ + Inputs: []apiPolicyTemplateInput{}, + }, + expectedKeys: []string{}, + }, + { + name: "template with single input", + template: &apiPolicyTemplate{ + Name: "kafka", + Inputs: []apiPolicyTemplateInput{ + { + Type: "jolokia/metrics", + Vars: apiVars{ + { + Name: "hosts", + Default: []interface{}{"http://127.0.0.1:8778"}, + }, + }, + }, + }, + }, + expectedKeys: []string{"kafka-jolokia/metrics"}, + }, + { + name: "template with multiple input types", + template: &apiPolicyTemplate{ + Name: "kafka", + Inputs: []apiPolicyTemplateInput{ + { + Type: "jolokia/metrics", + Vars: apiVars{ + { + Name: "hosts", + Default: []interface{}{"http://127.0.0.1:8778"}, + }, + }, + }, + { + Type: "logfile", + Vars: apiVars{}, + }, + { + Type: "kafka/metrics", + Vars: apiVars{ + { + Name: "hosts", + Default: []interface{}{"localhost:9092"}, + }, + { + Name: "period", + Default: "10s", + }, + }, + }, + }, + }, + expectedKeys: []string{"kafka-jolokia/metrics", "kafka-logfile", "kafka-kafka/metrics"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, diags := tt.template.defaults() + + if tt.expectError { + require.True(t, diags.HasError(), "Expected error but got none") + } else { + require.False(t, diags.HasError(), "Expected no error but got: %v", diags) + assert.Len(t, result, len(tt.expectedKeys), "Unexpected number of input types") + + for _, key := range tt.expectedKeys { + assert.Contains(t, result, key, "Expected key %s not found", key) + assert.False(t, result[key].IsNull(), "Expected non-null value for key %s", key) + } + } + }) + } +} + +func 
TestApiDatastreamsDefaults(t *testing.T) { + tests := []struct { + name string + datastreams apiDatastreams + expectedInputKeys []string + expectedStreamCounts map[string]int + expectError bool + }{ + { + name: "nil datastreams returns empty map", + datastreams: nil, + expectedInputKeys: []string{}, + expectedStreamCounts: map[string]int{}, + }, + { + name: "empty datastreams returns empty map", + datastreams: apiDatastreams{}, + expectedInputKeys: []string{}, + expectedStreamCounts: map[string]int{}, + }, + { + name: "single datastream with single stream", + datastreams: apiDatastreams{ + { + Type: "metrics", + Dataset: "kafka.broker", + Streams: []apiDatastreamStream{ + { + Input: "kafka/metrics", + Enabled: true, + Vars: apiVars{ + { + Name: "jolokia_hosts", + Default: []interface{}{"localhost:8778"}, + }, + }, + }, + }, + }, + }, + expectedInputKeys: []string{"kafka/metrics"}, + expectedStreamCounts: map[string]int{ + "kafka/metrics": 1, + }, + }, + { + name: "multiple datastreams with different inputs", + datastreams: apiDatastreams{ + { + Type: "metrics", + Dataset: "kafka.consumer", + Streams: []apiDatastreamStream{ + { + Input: "jolokia/metrics", + Enabled: false, + Vars: apiVars{ + { + Name: "period", + Default: "60s", + }, + }, + }, + }, + }, + { + Type: "logs", + Dataset: "kafka.log", + Streams: []apiDatastreamStream{ + { + Input: "logfile", + Enabled: true, + Vars: apiVars{ + { + Name: "kafka_home", + Default: "/opt/kafka*", + }, + }, + }, + }, + }, + }, + expectedInputKeys: []string{"jolokia/metrics", "logfile"}, + expectedStreamCounts: map[string]int{ + "jolokia/metrics": 1, + "logfile": 1, + }, + }, + { + name: "multiple streams for same input", + datastreams: apiDatastreams{ + { + Type: "metrics", + Dataset: "kafka.broker", + Streams: []apiDatastreamStream{ + { + Input: "kafka/metrics", + Enabled: true, + Vars: apiVars{}, + }, + }, + }, + { + Type: "metrics", + Dataset: "kafka.partition", + Streams: []apiDatastreamStream{ + { + Input: "kafka/metrics", + Enabled: true, + Vars: apiVars{}, + }, + }, + }, + { + Type: "metrics", + Dataset: "kafka.consumergroup", + Streams: []apiDatastreamStream{ + { + Input: "kafka/metrics", + Enabled: true, + Vars: apiVars{}, + }, + }, + }, + }, + expectedInputKeys: []string{"kafka/metrics"}, + expectedStreamCounts: map[string]int{ + "kafka/metrics": 3, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, diags := tt.datastreams.defaults() + + if tt.expectError { + require.True(t, diags.HasError(), "Expected error but got none") + } else { + require.False(t, diags.HasError(), "Expected no error but got: %v", diags) + assert.Len(t, result, len(tt.expectedInputKeys), "Unexpected number of input types") + + for _, inputKey := range tt.expectedInputKeys { + assert.Contains(t, result, inputKey, "Expected input key %s not found", inputKey) + + streams := result[inputKey] + expectedCount := tt.expectedStreamCounts[inputKey] + assert.Len(t, streams, expectedCount, "Unexpected stream count for input %s", inputKey) + } + } + }) + } +} + +func TestApiDatastreamsDefaults_StreamProperties(t *testing.T) { + datastreams := apiDatastreams{ + { + Type: "metrics", + Dataset: "kafka.consumer", + Streams: []apiDatastreamStream{ + { + Input: "jolokia/metrics", + Enabled: false, + Vars: apiVars{ + { + Name: "jolokia_hosts", + Default: []interface{}{"localhost:8774"}, + }, + { + Name: "period", + Default: "60s", + }, + }, + }, + }, + }, + } + + result, diags := datastreams.defaults() + require.False(t, diags.HasError(), 
"Expected no error but got: %v", diags) + require.Contains(t, result, "jolokia/metrics", "Expected jolokia/metrics input") + + streams := result["jolokia/metrics"] + require.Contains(t, streams, "kafka.consumer", "Expected kafka.consumer stream") + + stream := streams["kafka.consumer"] + assert.Equal(t, types.BoolValue(false), stream.Enabled, "Stream enabled mismatch") + assert.False(t, stream.Vars.IsNull(), "Expected non-null vars") + + // Verify vars content + var varsMap map[string]interface{} + err := json.Unmarshal([]byte(stream.Vars.ValueString()), &varsMap) + require.NoError(t, err, "Failed to unmarshal vars") + + assert.Equal(t, []interface{}{"localhost:8774"}, varsMap["jolokia_hosts"]) + assert.Equal(t, "60s", varsMap["period"]) +} + +func TestPackageInfoToDefaults(t *testing.T) { + tests := []struct { + name string + pkg *kbapi.PackageInfo + expectedInputKeys []string + expectError bool + }{ + { + name: "nil package returns empty map", + pkg: nil, + expectedInputKeys: []string{}, + }, + { + name: "package with no policy templates or datastreams", + pkg: &kbapi.PackageInfo{}, + expectedInputKeys: []string{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, diags := packageInfoToDefaults(tt.pkg) + + if tt.expectError { + require.True(t, diags.HasError(), "Expected error but got none") + } else { + require.False(t, diags.HasError(), "Expected no error but got: %v", diags) + assert.Len(t, result, len(tt.expectedInputKeys), "Unexpected number of input types") + + for _, key := range tt.expectedInputKeys { + assert.Contains(t, result, key, "Expected input key %s not found", key) + } + } + }) + } +} + +func TestPackageInfoToDefaults_Kafka(t *testing.T) { + // Load the actual Kafka package JSON + var wrapper struct { + Item kbapi.PackageInfo `json:"item"` + } + err := json.Unmarshal(kafkaIntegrationJSON, &wrapper) + require.NoError(t, err, "Failed to unmarshal Kafka integration JSON") + + pkg := &wrapper.Item + + // Test with the full Kafka package + result, diags := packageInfoToDefaults(pkg) + require.False(t, diags.HasError(), "Expected no error but got: %v", diags) + + // Verify expected input types exist + expectedInputs := []string{"kafka-jolokia/metrics", "kafka-logfile", "kafka-kafka/metrics"} + for _, inputType := range expectedInputs { + assert.Contains(t, result, inputType, "Expected input type %s not found", inputType) + } + + // Verify jolokia/metrics input has expected vars + jolokiaInput := result["kafka-jolokia/metrics"] + assert.False(t, jolokiaInput.Vars.IsNull(), "Expected non-null vars for jolokia/metrics") + + var jolokiaVars map[string]interface{} + err = json.Unmarshal([]byte(jolokiaInput.Vars.ValueString()), &jolokiaVars) + require.NoError(t, err, "Failed to unmarshal jolokia/metrics vars") + + // Check some specific defaults from the Kafka package + assert.Contains(t, jolokiaVars, "hosts", "Expected 'hosts' var") + assert.Contains(t, jolokiaVars, "metrics_path", "Expected 'metrics_path' var") + assert.Contains(t, jolokiaVars, "http_method", "Expected 'http_method' var") + assert.Contains(t, jolokiaVars, "ssl.verification_mode", "Expected 'ssl.verification_mode' var") + + assert.Equal(t, []interface{}{"http://127.0.0.1:8778"}, jolokiaVars["hosts"]) + assert.Equal(t, "/jolokia", jolokiaVars["metrics_path"]) + assert.Equal(t, "GET", jolokiaVars["http_method"]) + assert.Equal(t, "none", jolokiaVars["ssl.verification_mode"]) + + // Verify kafka/metrics input has expected vars + kafkaInput := result["kafka-kafka/metrics"] + 
assert.False(t, kafkaInput.Vars.IsNull(), "Expected non-null vars for kafka-kafka/metrics") + + var kafkaVars map[string]interface{} + err = json.Unmarshal([]byte(kafkaInput.Vars.ValueString()), &kafkaVars) + require.NoError(t, err, "Failed to unmarshal kafka-kafka/metrics vars") + assert.Contains(t, kafkaVars, "hosts", "Expected 'hosts' var") + assert.Contains(t, kafkaVars, "period", "Expected 'period' var") + + assert.Equal(t, []interface{}{"localhost:9092"}, kafkaVars["hosts"]) + assert.Equal(t, "10s", kafkaVars["period"]) + + // Verify streams are populated correctly + assert.NotNil(t, jolokiaInput.Streams, "Expected streams for jolokia/metrics") + assert.NotEmpty(t, jolokiaInput.Streams, "Expected non-empty streams for jolokia/metrics") + + // Check specific stream - kafka.consumer + consumerStream, ok := jolokiaInput.Streams["kafka.consumer"] + require.True(t, ok, "Expected kafka.consumer stream") + assert.Equal(t, types.BoolValue(false), consumerStream.Enabled, "kafka.consumer should be disabled by default") + + var consumerVars map[string]interface{} + err = json.Unmarshal([]byte(consumerStream.Vars.ValueString()), &consumerVars) + require.NoError(t, err, "Failed to unmarshal kafka.consumer vars") + + assert.Contains(t, consumerVars, "jolokia_hosts", "Expected 'jolokia_hosts' var in stream") + assert.Contains(t, consumerVars, "period", "Expected 'period' var in stream") + + assert.Equal(t, []interface{}{"localhost:8774"}, consumerVars["jolokia_hosts"]) + assert.Equal(t, "60s", consumerVars["period"]) + + // Verify logfile input and log stream + logfileInput := result["kafka-logfile"] + assert.NotNil(t, logfileInput.Streams, "Expected streams for logfile") + + logStream, ok := logfileInput.Streams["kafka.log"] + require.True(t, ok, "Expected kafka.log stream") + assert.Equal(t, types.BoolValue(true), logStream.Enabled, "kafka.log should be enabled by default") + + var logVars map[string]interface{} + err = json.Unmarshal([]byte(logStream.Vars.ValueString()), &logVars) + require.NoError(t, err, "Failed to unmarshal kafka.log vars") + + assert.Contains(t, logVars, "kafka_home", "Expected 'kafka_home' var") + assert.Contains(t, logVars, "paths", "Expected 'paths' var") + assert.Contains(t, logVars, "tags", "Expected 'tags' var") + assert.Contains(t, logVars, "preserve_original_event", "Expected 'preserve_original_event' var") + + assert.Equal(t, "/opt/kafka*", logVars["kafka_home"]) + assert.Equal(t, []interface{}{ + "/logs/controller.log*", + "/logs/server.log*", + "/logs/state-change.log*", + "/logs/kafka-*.log*", + }, logVars["paths"]) + assert.Equal(t, []interface{}{"kafka-log"}, logVars["tags"]) + assert.Equal(t, false, logVars["preserve_original_event"]) + + // Verify kafka/metrics streams + kafkaStreams := kafkaInput.Streams + assert.Contains(t, kafkaStreams, "kafka.broker", "Expected kafka.broker stream") + assert.Contains(t, kafkaStreams, "kafka.partition", "Expected kafka.partition stream") + assert.Contains(t, kafkaStreams, "kafka.consumergroup", "Expected kafka.consumergroup stream") + + // Verify all streams have expected enabled state + assert.Equal(t, types.BoolValue(true), kafkaStreams["kafka.broker"].Enabled) + assert.Equal(t, types.BoolValue(true), kafkaStreams["kafka.partition"].Enabled) + assert.Equal(t, types.BoolValue(true), kafkaStreams["kafka.consumergroup"].Enabled) +} + +func TestPolicyTemplateAndDataStreamsFromPackageInfo(t *testing.T) { + tests := []struct { + name string + pkg *kbapi.PackageInfo + expectedPolicyTemplate bool + expectedDataStreams bool + 
expectError bool + }{ + { + name: "nil package returns nil", + pkg: nil, + expectedPolicyTemplate: false, + expectedDataStreams: false, + }, + { + name: "empty package returns empty", + pkg: &kbapi.PackageInfo{}, + expectedPolicyTemplate: true, // Returns empty struct, not nil + expectedDataStreams: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + policyTemplate, datastreams, diags := policyTemplateAndDataStreamsFromPackageInfo(tt.pkg) + + if tt.expectError { + require.True(t, diags.HasError(), "Expected error but got none") + } else { + require.False(t, diags.HasError(), "Expected no error but got: %v", diags) + + if tt.expectedPolicyTemplate { + assert.NotNil(t, policyTemplate, "Expected non-nil policy template") + } else { + assert.Nil(t, policyTemplate, "Expected nil policy template") + } + + if tt.expectedDataStreams { + assert.NotNil(t, datastreams, "Expected non-nil datastreams") + assert.NotEmpty(t, datastreams, "Expected non-empty datastreams") + } + } + }) + } +} + +func TestPolicyTemplateAndDataStreamsFromPackageInfo_Kafka(t *testing.T) { + // Load the actual Kafka package JSON + var wrapper struct { + Item kbapi.PackageInfo `json:"item"` + } + err := json.Unmarshal(kafkaIntegrationJSON, &wrapper) + require.NoError(t, err, "Failed to unmarshal Kafka integration JSON") + + pkg := &wrapper.Item + + policyTemplate, datastreams, diags := policyTemplateAndDataStreamsFromPackageInfo(pkg) + require.False(t, diags.HasError(), "Expected no error but got: %v", diags) + + // Verify policy template was extracted + require.NotNil(t, policyTemplate, "Expected non-nil policy template") + assert.Len(t, policyTemplate.Inputs, 3, "Expected 3 input types in policy template") + + // Verify input types + inputTypes := make([]string, 0, len(policyTemplate.Inputs)) + for _, input := range policyTemplate.Inputs { + inputTypes = append(inputTypes, input.Type) + } + assert.Contains(t, inputTypes, "jolokia/metrics") + assert.Contains(t, inputTypes, "logfile") + assert.Contains(t, inputTypes, "kafka/metrics") + + // Verify datastreams were extracted + require.NotNil(t, datastreams, "Expected non-nil datastreams") + assert.Len(t, datastreams, 13, "Expected 13 datastreams in Kafka package") + + // Verify some specific datastreams + datasetNames := make([]string, 0, len(datastreams)) + for _, ds := range datastreams { + datasetNames = append(datasetNames, ds.Dataset) + } + + expectedDatasets := []string{ + "kafka.broker", + "kafka.consumer", + "kafka.consumergroup", + "kafka.controller", + "kafka.jvm", + "kafka.log", + "kafka.log_manager", + "kafka.network", + "kafka.partition", + "kafka.producer", + "kafka.raft", + "kafka.replica_manager", + "kafka.topic", + } + + for _, expected := range expectedDatasets { + assert.Contains(t, datasetNames, expected, "Expected dataset %s not found", expected) + } + + // Verify a specific datastream has correct structure + var logDatastream *apiDatastream + for i := range datastreams { + if datastreams[i].Dataset == "kafka.log" { + logDatastream = &datastreams[i] + break + } + } + + require.NotNil(t, logDatastream, "Expected to find kafka.log datastream") + assert.Equal(t, "logs", logDatastream.Type) + assert.Len(t, logDatastream.Streams, 1, "Expected 1 stream in kafka.log datastream") + assert.Equal(t, "logfile", logDatastream.Streams[0].Input) + assert.True(t, logDatastream.Streams[0].Enabled) +} + +func TestInputDefaultsModel_Integration(t *testing.T) { + // This test ensures that the defaults model structure correctly represents + // 
the data needed for integration policies + + defaults := map[string]inputDefaultsModel{ + "kafka/metrics": { + Vars: jsontypes.NewNormalizedValue(`{"hosts":["localhost:9092"],"period":"10s"}`), + Streams: map[string]inputDefaultsStreamModel{ + "kafka.broker": { + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{"jolokia_hosts":["localhost:8778"]}`), + }, + "kafka.partition": { + Enabled: types.BoolValue(true), + Vars: jsontypes.NewNormalizedValue(`{}`), + }, + }, + }, + } + + // Verify structure + assert.Contains(t, defaults, "kafka/metrics") + + kafkaDefaults := defaults["kafka/metrics"] + assert.False(t, kafkaDefaults.Vars.IsNull()) + assert.Len(t, kafkaDefaults.Streams, 2) + + brokerStream := kafkaDefaults.Streams["kafka.broker"] + assert.Equal(t, types.BoolValue(true), brokerStream.Enabled) + assert.False(t, brokerStream.Vars.IsNull()) +} diff --git a/internal/fleet/integration_policy/models_test.go b/internal/fleet/integration_policy/models_test.go index 121e764c8..f4398f6cd 100644 --- a/internal/fleet/integration_policy/models_test.go +++ b/internal/fleet/integration_policy/models_test.go @@ -39,7 +39,7 @@ func TestOutputIdHandling(t *testing.T) { OutputId: &outputId, } - diags := model.populateFromAPI(context.Background(), data) + diags := model.populateFromAPI(context.Background(), nil, data) require.Empty(t, diags) require.Equal(t, "test-output-id", model.OutputID.ValueString()) }) diff --git a/internal/fleet/integration_policy/read.go b/internal/fleet/integration_policy/read.go index e650957d7..e10c7ddcf 100644 --- a/internal/fleet/integration_policy/read.go +++ b/internal/fleet/integration_policy/read.go @@ -59,7 +59,13 @@ func (r *integrationPolicyResource) Read(ctx context.Context, req resource.ReadR isImport := stateModel.PolicyID.ValueString() != "" && (stateModel.Name.IsNull() || stateModel.Name.IsUnknown()) - diags = stateModel.populateFromAPI(ctx, policy) + pkg, diags := fleet.GetPackage(ctx, client, policy.Package.Name, policy.Package.Version) + resp.Diagnostics.Append(diags...) + if resp.Diagnostics.HasError() { + return + } + + diags = stateModel.populateFromAPI(ctx, pkg, policy) resp.Diagnostics.Append(diags...) if resp.Diagnostics.HasError() { return diff --git a/internal/fleet/integration_policy/resource-description.md b/internal/fleet/integration_policy/resource-description.md index f809f4ea8..a90374d56 100644 --- a/internal/fleet/integration_policy/resource-description.md +++ b/internal/fleet/integration_policy/resource-description.md @@ -1,9 +1,5 @@ Creates or updates a Fleet Integration Policy. -It is highly recommended that all inputs and streams are provided in the -Terraform plan, even if some are disabled. Otherwise, differences may appear -between what is in the plan versus what is returned by the Fleet API. - The [Kibana Fleet UI](https://www.elastic.co/guide/en/fleet/current/add-integration-to-policy.html) can be used as a reference for what data needs to be provided. 
Instead of saving a new integration configuration, the API request can be previewed, showing what diff --git a/internal/fleet/integration_policy/schema.go b/internal/fleet/integration_policy/schema.go index 381d9d832..745be31ee 100644 --- a/internal/fleet/integration_policy/schema.go +++ b/internal/fleet/integration_policy/schema.go @@ -13,11 +13,14 @@ import ( "github.com/hashicorp/terraform-plugin-framework/resource" "github.com/hashicorp/terraform-plugin-framework/resource/schema" "github.com/hashicorp/terraform-plugin-framework/resource/schema/booldefault" - "github.com/hashicorp/terraform-plugin-framework/resource/schema/mapdefault" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/mapplanmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/objectdefault" "github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier" + "github.com/hashicorp/terraform-plugin-framework/resource/schema/setplanmodifier" "github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier" "github.com/hashicorp/terraform-plugin-framework/schema/validator" "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/hashicorp/terraform-plugin-framework/types/basetypes" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/logging" ) @@ -106,18 +109,27 @@ func getSchemaV2() schema.Schema { Computed: true, Optional: true, Sensitive: varsAreSensitive, + PlanModifiers: []planmodifier.String{ + stringplanmodifier.UseStateForUnknown(), + }, }, "space_ids": schema.SetAttribute{ Description: "The Kibana space IDs where this integration policy is available. When set, must match the space_ids of the referenced agent policy. If not set, will be inherited from the agent policy. Note: The order of space IDs does not matter as this is a set.", ElementType: types.StringType, Optional: true, Computed: true, + PlanModifiers: []planmodifier.Set{ + setplanmodifier.UseStateForUnknown(), + }, }, "inputs": schema.MapNestedAttribute{ - Description: "Integration inputs mapped by input ID.", - CustomType: NewInputsType(getInputsElementType()), - Computed: true, - Optional: true, + Description: "Integration inputs mapped by input ID.", + CustomType: NewInputsType(NewInputType(getInputsAttributeTypes())), + Computed: true, + Optional: true, + PlanModifiers: []planmodifier.Map{ + mapplanmodifier.UseStateForUnknown(), + }, NestedObject: getInputsNestedObject(varsAreSensitive), }, }, @@ -126,6 +138,7 @@ func getSchemaV2() schema.Schema { func getInputsNestedObject(varsAreSensitive bool) schema.NestedAttributeObject { return schema.NestedAttributeObject{ + CustomType: NewInputType(getInputsAttributeTypes()), Attributes: map[string]schema.Attribute{ "enabled": schema.BoolAttribute{ Description: "Enable the input.", @@ -139,11 +152,40 @@ func getInputsNestedObject(varsAreSensitive bool) schema.NestedAttributeObject { Optional: true, Sensitive: varsAreSensitive, }, + "defaults": schema.SingleNestedAttribute{ + Description: "Input defaults.", + Computed: true, + Default: objectdefault.StaticValue(basetypes.NewObjectNull( + getInputDefaultsAttrTypes(), + )), + Attributes: map[string]schema.Attribute{ + "vars": schema.StringAttribute{ + Description: "Input-level variable defaults as JSON.", + CustomType: jsontypes.NormalizedType{}, + Computed: true, + }, + "streams": schema.MapNestedAttribute{ + Description: "Stream-level defaults mapped by stream ID.", + Computed: true, + NestedObject: schema.NestedAttributeObject{ + Attributes: map[string]schema.Attribute{ + 
"enabled": schema.BoolAttribute{ + Description: "Default enabled state for the stream.", + Computed: true, + }, + "vars": schema.StringAttribute{ + Description: "Stream-level variable defaults as JSON.", + CustomType: jsontypes.NormalizedType{}, + Computed: true, + }, + }, + }, + }, + }, + }, "streams": schema.MapNestedAttribute{ Description: "Input streams mapped by stream ID.", Optional: true, - Computed: true, - Default: mapdefault.StaticValue(types.MapNull(getInputStreamType())), NestedObject: getInputStreamNestedObject(varsAreSensitive), }, }, @@ -169,10 +211,46 @@ func getInputStreamNestedObject(varsAreSensitive bool) schema.NestedAttributeObj } } -func getInputsElementType() attr.Type { - return getInputsNestedObject(false).Type() +func getInputsElementType() InputType { + return getInputsNestedObject(false).CustomType.(InputType) +} + +func getInputsAttributeTypes() map[string]attr.Type { + return map[string]attr.Type{ + "enabled": types.BoolType, + "vars": jsontypes.NormalizedType{}, + "defaults": types.ObjectType{ + AttrTypes: map[string]attr.Type{ + "vars": jsontypes.NormalizedType{}, + "streams": types.MapType{ + ElemType: types.ObjectType{ + AttrTypes: map[string]attr.Type{ + "enabled": types.BoolType, + "vars": jsontypes.NormalizedType{}, + }, + }, + }, + }, + }, + "streams": types.MapType{ + ElemType: types.ObjectType{ + AttrTypes: map[string]attr.Type{ + "enabled": types.BoolType, + "vars": jsontypes.NormalizedType{}, + }, + }, + }, + } } func getInputStreamType() attr.Type { return getInputStreamNestedObject(false).Type() } + +func getInputDefaultsType() attr.Type { + return getInputsAttributeTypes()["defaults"] +} + +func getInputDefaultsAttrTypes() map[string]attr.Type { + return getInputDefaultsType().(attr.TypeWithAttributeTypes).AttributeTypes() +} diff --git a/internal/fleet/integration_policy/schema_v1.go b/internal/fleet/integration_policy/schema_v1.go index 633398606..7b55a3a5d 100644 --- a/internal/fleet/integration_policy/schema_v1.go +++ b/internal/fleet/integration_policy/schema_v1.go @@ -79,9 +79,10 @@ func (m integrationPolicyModelV1) toV2(ctx context.Context) (integrationPolicyMo } inputsV2[id] = integrationPolicyInputsModel{ - Enabled: inputV1.Enabled, - Vars: inputV1.VarsJson, - Streams: streams, + Enabled: inputV1.Enabled, + Vars: inputV1.VarsJson, + Streams: streams, + Defaults: types.ObjectNull(getInputDefaultsAttrTypes()), } } diff --git a/internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/minimal/integration_policy.tf b/internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/minimal/integration_policy.tf new file mode 100644 index 000000000..0525d2731 --- /dev/null +++ b/internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/minimal/integration_policy.tf @@ -0,0 +1,35 @@ +variable "policy_name" { + type = string +} + +resource "elasticstack_fleet_agent_policy" "test_policy" { + name = var.policy_name + namespace = "default" + description = "TestAccIntegrationPolicyInputs Agent Policy" + monitor_logs = true + monitor_metrics = true + skip_destroy = false +} + +data "elasticstack_fleet_integration" "test" { + name = "kafka" +} + +resource "elasticstack_fleet_integration_policy" "test_policy" { + name = var.policy_name + namespace = "default" + agent_policy_id = elasticstack_fleet_agent_policy.test_policy.id + integration_name = "kafka" + integration_version = data.elasticstack_fleet_integration.test.version + description = "Kafka Integration Policy - Minimal" + inputs = { + "kafka-logfile" = { 
+ enabled = false + # Should not need to specify streams for disabled input + } + "kafka-kafka/metrics" = { + enabled = true + } + } +} + diff --git a/internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/unset/integration_policy.tf b/internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/unset/integration_policy.tf new file mode 100644 index 000000000..778ee6f75 --- /dev/null +++ b/internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/unset/integration_policy.tf @@ -0,0 +1,26 @@ +variable "policy_name" { + type = string +} + +resource "elasticstack_fleet_agent_policy" "test_policy" { + name = var.policy_name + namespace = "default" + description = "TestAccIntegrationPolicyInputs Agent Policy" + monitor_logs = true + monitor_metrics = true + skip_destroy = false +} + +data "elasticstack_fleet_integration" "test" { + name = "kafka" +} + +resource "elasticstack_fleet_integration_policy" "test_policy" { + name = var.policy_name + namespace = "default" + agent_policy_id = elasticstack_fleet_agent_policy.test_policy.id + integration_name = "kafka" + integration_version = data.elasticstack_fleet_integration.test.version + description = "Kafka Integration Policy - Minimal" +} + diff --git a/internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/update_logfile_tags_only/integration_policy.tf b/internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/update_logfile_tags_only/integration_policy.tf new file mode 100644 index 000000000..918b80ad0 --- /dev/null +++ b/internal/fleet/integration_policy/testdata/TestAccIntegrationPolicyInputs/update_logfile_tags_only/integration_policy.tf @@ -0,0 +1,67 @@ +variable "policy_name" { + type = string +} + +resource "elasticstack_fleet_agent_policy" "test_policy" { + name = var.policy_name + namespace = "default" + description = "TestAccIntegrationPolicyInputs Agent Policy" + monitor_logs = true + monitor_metrics = true + skip_destroy = false +} + +data "elasticstack_fleet_integration" "test" { + name = "kafka" +} + +resource "elasticstack_fleet_integration_policy" "test_policy" { + name = var.policy_name + namespace = "default" + agent_policy_id = elasticstack_fleet_agent_policy.test_policy.id + integration_name = "kafka" + integration_version = data.elasticstack_fleet_integration.test.version + description = "Kafka Integration Policy - Logfile with tags only" + + inputs = { + "kafka-logfile" = { + enabled = true + streams = { + "kafka.log" = { + enabled = true + vars = jsonencode({ + "tags" = [ + "custom-tag-1", + "custom-tag-2" + ] + }) + } + } + } + "kafka-kafka/metrics" = { + enabled = true + vars = jsonencode({ + hosts = ["localhost:9092"] + period = "10s" + "ssl.certificate_authorities" = [] + }) + streams = { + "kafka.broker" = { + enabled = true + vars = jsonencode({ + "jolokia_hosts" = ["localhost:8778"] + }) + } + "kafka.consumergroup" = { + enabled = true + vars = jsonencode({ + "topics" = ["don't mention the war, I mentioned it once but I think I got away with it"] + }) + } + "kafka.partition" = { + enabled = false + } + } + } + } +} diff --git a/internal/fleet/integration_policy/testdata/integration_kafka.json b/internal/fleet/integration_policy/testdata/integration_kafka.json new file mode 100644 index 000000000..3af72b4f3 --- /dev/null +++ b/internal/fleet/integration_policy/testdata/integration_kafka.json @@ -0,0 +1,1481 @@ +{ + "item": { + "name": "kafka", + "title": "Kafka", + "version": "1.26.0", + "release": "ga", + "description": "Collect logs 
and metrics from Kafka servers with Elastic Agent.", + "type": "integration", + "download": "/epr/kafka/kafka-1.26.0.zip", + "path": "/package/kafka/1.26.0", + "icons": [ + { + "src": "/img/logo_kafka.svg", + "path": "/package/kafka/1.26.0/img/logo_kafka.svg", + "title": "logo kafka", + "size": "32x32", + "type": "image/svg+xml" + } + ], + "conditions": { + "kibana": { + "version": "^8.19.6 || ^9.0.6" + }, + "elastic": { + "subscription": "basic" + } + }, + "owner": { + "type": "elastic", + "github": "elastic/obs-infraobs-integrations" + }, + "categories": [ + "stream_processing", + "observability" + ], + "signature_path": "/epr/kafka/kafka-1.26.0.zip.sig", + "format_version": "3.0.2", + "readme": "/package/kafka/1.26.0/docs/README.md", + "license": "basic", + "screenshots": [ + { + "src": "/img/filebeat-kafka-logs-overview.png", + "path": "/package/kafka/1.26.0/img/filebeat-kafka-logs-overview.png", + "title": "filebeat kafka logs overview", + "size": "3024x3106", + "type": "image/png" + }, + { + "src": "/img/metricbeat_kafka_dashboard.png", + "path": "/package/kafka/1.26.0/img/metricbeat_kafka_dashboard.png", + "title": "metricbeat kafka dashboard", + "size": "3024x3722", + "type": "image/png" + }, + { + "src": "/img/metricbeat-kafka-controller.png", + "path": "/package/kafka/1.26.0/img/metricbeat-kafka-controller.png", + "title": "Kafka controller dashboard", + "size": "3024x5570", + "type": "image/png" + }, + { + "src": "/img/metricbeat-kafka-jvm.png", + "path": "/package/kafka/1.26.0/img/metricbeat-kafka-jvm.png", + "title": "Kafka jvm dashboard", + "size": "3024x7586", + "type": "image/png" + }, + { + "src": "/img/metricbeat-kafka-log_manger.png", + "path": "/package/kafka/1.26.0/img/metricbeat-kafka-log_manger.png", + "title": "Kafka log manager dashboard", + "size": "3024x4090", + "type": "image/png" + }, + { + "src": "/img/metricbeat-kafka-network.png", + "path": "/package/kafka/1.26.0/img/metricbeat-kafka-network.png", + "title": "Kafka network dashboard", + "size": "3024x4898", + "type": "image/png" + }, + { + "src": "/img/metricbeat-kafka-raft.png", + "path": "/package/kafka/1.26.0/img/metricbeat-kafka-raft.png", + "title": "Kafka raft dashboard", + "size": "3024x3890", + "type": "image/png" + }, + { + "src": "/img/metricbeat-kafka-replica_manager.png", + "path": "/package/kafka/1.26.0/img/metricbeat-kafka-replica_manager.png", + "title": "Kafka replica manager dashboard", + "size": "3024x4426", + "type": "image/png" + }, + { + "src": "/img/metricbeat-kafka-topic.png", + "path": "/package/kafka/1.26.0/img/metricbeat-kafka-topic.png", + "title": "Kafka topic dashboard", + "size": "3024x5906", + "type": "image/png" + }, + { + "src": "/img/metricbeat-kafka-consumer.png", + "path": "/package/kafka/1.26.0/img/metricbeat-kafka-consumer.png", + "title": "Kafka consumer dashboard", + "size": "1625x1649", + "type": "image/png" + }, + { + "src": "/img/metricbeat-kafka-producer.png", + "path": "/package/kafka/1.26.0/img/metricbeat-kafka-producer.png", + "title": "Kafka producer dashboard", + "size": "1365x3329", + "type": "image/png" + } + ], + "assets": { + "kibana": { + "dashboard": [ + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-20094380-8713-487d-8036-5cbbb529f0ca.json", + "path": "kafka-1.26.0/kibana/dashboard/kafka-20094380-8713-487d-8036-5cbbb529f0ca.json" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-28d2bf78-a638-4a1f-a46f-1a794c99166b.json", + "path": 
"kafka-1.26.0/kibana/dashboard/kafka-28d2bf78-a638-4a1f-a46f-1a794c99166b.json" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-32a191b0-c8ac-4ea1-812d-0e3d406f9a41.json", + "path": "kafka-1.26.0/kibana/dashboard/kafka-32a191b0-c8ac-4ea1-812d-0e3d406f9a41.json" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-6a239518-50b3-4af5-bb5c-01748d2310c8.json", + "path": "kafka-1.26.0/kibana/dashboard/kafka-6a239518-50b3-4af5-bb5c-01748d2310c8.json" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-894682fd-555e-49c6-b469-f8fbb3f0b9dc.json", + "path": "kafka-1.26.0/kibana/dashboard/kafka-894682fd-555e-49c6-b469-f8fbb3f0b9dc.json" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-943caca0-87ee-11e7-ad9c-db80de0bf8d3.json", + "path": "kafka-1.26.0/kibana/dashboard/kafka-943caca0-87ee-11e7-ad9c-db80de0bf8d3.json" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-a0e91797-fa95-497f-a130-614baa5327eb.json", + "path": "kafka-1.26.0/kibana/dashboard/kafka-a0e91797-fa95-497f-a130-614baa5327eb.json" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-c2cb8091-e812-4a23-9b59-3d461c4898ce.json", + "path": "kafka-1.26.0/kibana/dashboard/kafka-c2cb8091-e812-4a23-9b59-3d461c4898ce.json" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-e3be4fe8-c7b4-4cea-bb2e-953032cc322a.json", + "path": "kafka-1.26.0/kibana/dashboard/kafka-e3be4fe8-c7b4-4cea-bb2e-953032cc322a.json" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-ea488d90-8e63-11e8-8fa2-3d5f811fbd0f.json", + "path": "kafka-1.26.0/kibana/dashboard/kafka-ea488d90-8e63-11e8-8fa2-3d5f811fbd0f.json" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "kibana", + "type": "dashboard", + "file": "kafka-ff622b8a-bbe2-4903-add2-ba3ad50806d2.json", + "path": "kafka-1.26.0/kibana/dashboard/kafka-ff622b8a-bbe2-4903-add2-ba3ad50806d2.json" + } + ] + }, + "elasticsearch": { + "ingest_pipeline": [ + { + "pkgkey": "kafka-1.26.0", + "service": "elasticsearch", + "type": "ingest_pipeline", + "file": "default.yml", + "dataset": "consumer", + "path": "kafka-1.26.0/data_stream/consumer/elasticsearch/ingest_pipeline/default.yml" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "elasticsearch", + "type": "ingest_pipeline", + "file": "default.yml", + "dataset": "controller", + "path": "kafka-1.26.0/data_stream/controller/elasticsearch/ingest_pipeline/default.yml" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "elasticsearch", + "type": "ingest_pipeline", + "file": "default.yml", + "dataset": "jvm", + "path": "kafka-1.26.0/data_stream/jvm/elasticsearch/ingest_pipeline/default.yml" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "elasticsearch", + "type": "ingest_pipeline", + "file": "default.yml", + "dataset": "log", + "path": "kafka-1.26.0/data_stream/log/elasticsearch/ingest_pipeline/default.yml" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "elasticsearch", + "type": "ingest_pipeline", + "file": "default.yml", + "dataset": "log_manager", + "path": "kafka-1.26.0/data_stream/log_manager/elasticsearch/ingest_pipeline/default.yml" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "elasticsearch", + "type": "ingest_pipeline", + "file": "default.yml", + "dataset": "network", + "path": 
"kafka-1.26.0/data_stream/network/elasticsearch/ingest_pipeline/default.yml" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "elasticsearch", + "type": "ingest_pipeline", + "file": "default.yml", + "dataset": "producer", + "path": "kafka-1.26.0/data_stream/producer/elasticsearch/ingest_pipeline/default.yml" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "elasticsearch", + "type": "ingest_pipeline", + "file": "default.yml", + "dataset": "raft", + "path": "kafka-1.26.0/data_stream/raft/elasticsearch/ingest_pipeline/default.yml" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "elasticsearch", + "type": "ingest_pipeline", + "file": "default.yml", + "dataset": "replica_manager", + "path": "kafka-1.26.0/data_stream/replica_manager/elasticsearch/ingest_pipeline/default.yml" + }, + { + "pkgkey": "kafka-1.26.0", + "service": "elasticsearch", + "type": "ingest_pipeline", + "file": "default.yml", + "dataset": "topic", + "path": "kafka-1.26.0/data_stream/topic/elasticsearch/ingest_pipeline/default.yml" + } + ] + } + }, + "policy_templates": [ + { + "name": "kafka", + "title": "Kafka logs and metrics", + "description": "Collect logs and metrics from Kafka brokers", + "inputs": [ + { + "type": "jolokia/metrics", + "vars": [ + { + "name": "hosts", + "type": "text", + "title": "Hosts", + "multi": true, + "required": true, + "show_user": true, + "default": [ + "http://127.0.0.1:8778" + ] + }, + { + "name": "metrics_path", + "type": "text", + "title": "Metrics Path", + "multi": false, + "required": true, + "show_user": true, + "default": "/jolokia" + }, + { + "name": "http_method", + "type": "text", + "title": "HTTP Method", + "multi": false, + "required": true, + "show_user": true, + "default": "GET" + }, + { + "name": "bearer_token_file", + "type": "text", + "title": "HTTP config options: bearer_token_file", + "description": "If defined, the contents of the file will be read once at initialization and then the value will be used in an HTTP Authorization header.", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "ssl.verification_mode", + "type": "text", + "title": "SSL Verification Mode", + "description": "SSL verification mode. See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": false, + "required": false, + "show_user": false, + "default": "none" + }, + { + "name": "ssl.certificate_authorities", + "type": "text", + "title": "SSL Certificate Authorities", + "description": "SSL certificate authorities. See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": true, + "required": false, + "show_user": true + }, + { + "name": "ssl.certificate", + "type": "text", + "title": "SSL Certificate", + "description": "SSL certificate. See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": false, + "required": false, + "show_user": true + }, + { + "name": "ssl.key", + "type": "text", + "title": "SSL Private Key", + "description": "SSL private key. See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": false, + "required": false, + "show_user": true + }, + { + "name": "ssl.key_passphrase", + "type": "password", + "title": "SSL Key Passphrase", + "description": "SSL key passphrase. 
See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": false, + "required": false, + "show_user": true + }, + { + "name": "ssl.ca_trusted_fingerprint", + "type": "text", + "title": "SSL CA Trusted Fingerprint", + "description": "SSL CA trusted fingerprint. See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": false, + "required": false, + "show_user": true + }, + { + "name": "username", + "type": "text", + "title": "HTTP config options: Username", + "description": "The username to use for basic authentication.", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "password", + "type": "password", + "title": "HTTP config options: Password", + "description": "The password to use for basic authentication.", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "connect_timeout", + "type": "text", + "title": "HTTP config options: connect_timeout", + "description": "Total time limit for an HTTP connection to be completed (Default 2 seconds)", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "timeout", + "type": "text", + "title": "HTTP config options: timeout", + "description": "Total time limit for HTTP requests made by the module (Default 10 seconds)", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "headers", + "type": "yaml", + "title": "HTTP config options: headers", + "description": "A list of headers to use with the HTTP request", + "multi": false, + "required": false, + "show_user": false, + "default": "# headers:\n# Cookie: abcdef=123456\n# My-Custom-Header: my-custom-value\n" + } + ], + "title": "Collect metrics from Kafka through the Jolokia endpoint", + "description": "Collect metrics from Kafka through the Jolokia endpoint." + }, + { + "type": "logfile", + "title": "Collect logs from Kafka brokers", + "description": "Collecting Kafka log logs" + }, + { + "type": "kafka/metrics", + "vars": [ + { + "name": "hosts", + "type": "text", + "title": "Kafka address", + "multi": true, + "required": true, + "show_user": true, + "default": [ + "localhost:9092" + ] + }, + { + "name": "period", + "type": "text", + "title": "Period", + "multi": false, + "required": true, + "show_user": true, + "default": "10s" + }, + { + "name": "ssl.certificate_authorities", + "type": "text", + "title": "SSL Certificate Authorities", + "description": "SSL certificate authorities. See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": true, + "required": false, + "show_user": true + }, + { + "name": "ssl.certificate", + "type": "text", + "title": "SSL Certificate", + "description": "SSL certificate. See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": false, + "required": false, + "show_user": true + }, + { + "name": "ssl.key", + "type": "text", + "title": "SSL Private Key", + "description": "SSL certificate key. See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": false, + "required": false, + "show_user": true + }, + { + "name": "ssl.key_passphrase", + "type": "password", + "title": "SSL Key Passphrase", + "description": "SSL certificate passphrase. 
See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": false, + "required": false, + "show_user": true + }, + { + "name": "ssl.verification_mode", + "type": "text", + "title": "SSL Verification Mode", + "description": "SSL verification mode. See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": false, + "required": false, + "show_user": true + }, + { + "name": "ssl.ca_trusted_fingerprint", + "type": "text", + "title": "SSL CA Trusted Fingerprint", + "description": "SSL CA trusted fingerprint. See [documentation](https://www.elastic.co/guide/en/fleet/current/elastic-agent-ssl-configuration.html) for details.", + "multi": false, + "required": false, + "show_user": true + } + ], + "title": "Collect metrics from Kafka brokers", + "description": "Collecting Kafka broker, consumergroup and partition metrics" + } + ], + "multiple": true + } + ], + "data_streams": [ + { + "type": "metrics", + "dataset": "kafka.broker", + "title": "Kafka broker metrics", + "release": "ga", + "streams": [ + { + "input": "kafka/metrics", + "vars": [ + { + "name": "jolokia_hosts", + "type": "text", + "title": "Address of Jolokia agent installed in Kafka", + "multi": true, + "required": true, + "show_user": true, + "default": [ + "localhost:8778" + ] + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the events are shipped. See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Kafka broker metrics", + "description": "Collect Kafka broker metrics", + "enabled": true, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": {}, + "path": "broker" + }, + { + "type": "metrics", + "dataset": "kafka.consumer", + "title": "Kafka consumer metrics", + "release": "beta", + "ingest_pipeline": "default", + "streams": [ + { + "input": "jolokia/metrics", + "vars": [ + { + "name": "jolokia_hosts", + "type": "text", + "title": "Address of Jolokia agent installed in Kafka", + "multi": true, + "required": true, + "show_user": true, + "default": [ + "localhost:8774" + ] + }, + { + "name": "period", + "type": "text", + "title": "Period", + "multi": false, + "required": false, + "show_user": true, + "default": "60s" + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the events are shipped. 
See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Kafka consumer metrics", + "description": "Collect Kafka consumer metrics", + "enabled": false, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": { + "ingest_pipeline.name": "default" + }, + "path": "consumer" + }, + { + "type": "metrics", + "dataset": "kafka.consumergroup", + "title": "Kafka consumergroup metrics", + "release": "ga", + "streams": [ + { + "input": "kafka/metrics", + "vars": [ + { + "name": "topics", + "type": "text", + "title": "List of topics to query metrics for", + "multi": true, + "required": false, + "show_user": true + }, + { + "name": "username", + "type": "text", + "title": "SASL username", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "password", + "type": "password", + "title": "SASL password", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "mechanism", + "type": "text", + "title": "SASL mechanism", + "description": "SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the events are shipped. See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Kafka consumergroup metrics", + "description": "Collect Kafka consumergroup metrics", + "enabled": true, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": { + "index_template.settings": { + "index": { + "mapping": { + "dimension_fields": { + "limit": 24 + } + } + } + } + }, + "path": "consumergroup" + }, + { + "type": "metrics", + "dataset": "kafka.controller", + "title": "Apache Kafka controller metrics", + "release": "beta", + "ingest_pipeline": "default", + "streams": [ + { + "input": "jolokia/metrics", + "vars": [ + { + "name": "period", + "type": "text", + "title": "Period", + "multi": false, + "required": false, + "show_user": true, + "default": "60s" + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. 
See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Apache Kafka controller metrics", + "description": "Collect Apache Kafka controller metrics using Jolokia agent.", + "enabled": false, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": { + "ingest_pipeline.name": "default" + }, + "path": "controller" + }, + { + "type": "metrics", + "dataset": "kafka.jvm", + "title": "Apache Kafka JVM metrics", + "release": "beta", + "ingest_pipeline": "default", + "streams": [ + { + "input": "jolokia/metrics", + "vars": [ + { + "name": "period", + "type": "text", + "title": "Period", + "multi": false, + "required": false, + "show_user": true, + "default": "60s" + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Apache Kafka JVM metrics", + "description": "Collect Apache Kafka JVM metrics using Jolokia agent.", + "enabled": false, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": { + "ingest_pipeline.name": "default" + }, + "path": "jvm" + }, + { + "type": "logs", + "dataset": "kafka.log", + "title": "Kafka log logs", + "release": "ga", + "ingest_pipeline": "default", + "streams": [ + { + "input": "logfile", + "vars": [ + { + "name": "kafka_home", + "type": "text", + "title": "Kafka home directory", + "multi": false, + "required": true, + "show_user": true, + "default": "/opt/kafka*" + }, + { + "name": "paths", + "type": "text", + "title": "Paths (inside home directory)", + "multi": true, + "required": true, + "show_user": true, + "default": [ + "/logs/controller.log*", + "/logs/server.log*", + "/logs/state-change.log*", + "/logs/kafka-*.log*" + ] + }, + { + "name": "tags", + "type": "text", + "title": "Tags", + "multi": true, + "required": true, + "show_user": false, + "default": [ + "kafka-log" + ] + }, + { + "name": "preserve_original_event", + "type": "bool", + "title": "Preserve original event", + "description": "Preserves a raw copy of the original event, added to the field `event.original`", + "multi": false, + "required": true, + "show_user": true, + "default": false + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. 
See [Processors](https://www.elastic.co/guide/en/beats/filebeat/current/filtering-and-enhancing-data.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "log.yml.hbs", + "title": "Kafka log logs (log)", + "description": "Collect Kafka log logs using log input", + "enabled": true, + "ingestion_method": "File" + } + ], + "package": "kafka", + "elasticsearch": { + "ingest_pipeline.name": "default" + }, + "path": "log" + }, + { + "type": "metrics", + "dataset": "kafka.log_manager", + "title": "Apache Kafka Log Manager metrics", + "release": "beta", + "ingest_pipeline": "default", + "streams": [ + { + "input": "jolokia/metrics", + "vars": [ + { + "name": "period", + "type": "text", + "title": "Period", + "multi": false, + "required": false, + "show_user": true, + "default": "60s" + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Apache Kafka Log Manager metrics", + "description": "Collect Apache Kafka Log Manager metrics using Jolokia agent.", + "enabled": false, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": { + "ingest_pipeline.name": "default" + }, + "path": "log_manager" + }, + { + "type": "metrics", + "dataset": "kafka.network", + "title": "Apache Kafka network metrics", + "release": "beta", + "ingest_pipeline": "default", + "streams": [ + { + "input": "jolokia/metrics", + "vars": [ + { + "name": "period", + "type": "text", + "title": "Period", + "multi": false, + "required": false, + "show_user": true, + "default": "60s" + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. 
See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Apache Kafka network metrics", + "description": "Collect Apache Kafka network metrics using Jolokia agent.", + "enabled": false, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": { + "index_template.settings": { + "index": { + "mapping": { + "dimension_fields": { + "limit": 32 + } + } + } + }, + "ingest_pipeline.name": "default" + }, + "path": "network" + }, + { + "type": "metrics", + "dataset": "kafka.partition", + "title": "Kafka partition metrics", + "release": "ga", + "streams": [ + { + "input": "kafka/metrics", + "vars": [ + { + "name": "topics", + "type": "text", + "title": "List of topics to query metrics for", + "multi": true, + "required": false, + "show_user": true + }, + { + "name": "username", + "type": "text", + "title": "SASL username", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "password", + "type": "password", + "title": "SASL password", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "mechanism", + "type": "text", + "title": "SASL mechanism", + "description": "SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512", + "multi": false, + "required": false, + "show_user": false + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the events are shipped. See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Kafka partition metrics", + "description": "Collect Kafka partition metrics", + "enabled": true, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": {}, + "path": "partition" + }, + { + "type": "metrics", + "dataset": "kafka.producer", + "title": "Kafka producer metrics", + "release": "beta", + "ingest_pipeline": "default", + "streams": [ + { + "input": "jolokia/metrics", + "vars": [ + { + "name": "jolokia_hosts", + "type": "text", + "title": "Address of Jolokia agent installed in Kafka", + "multi": true, + "required": true, + "show_user": true, + "default": [ + "localhost:8775" + ] + }, + { + "name": "period", + "type": "text", + "title": "Period", + "multi": false, + "required": false, + "show_user": true, + "default": "60s" + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the events are shipped. 
See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Kafka producer metrics", + "description": "Collect Kafka producer metrics", + "enabled": false, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": { + "ingest_pipeline.name": "default" + }, + "path": "producer" + }, + { + "type": "metrics", + "dataset": "kafka.raft", + "title": "Apache Kafka Raft metrics", + "release": "beta", + "ingest_pipeline": "default", + "streams": [ + { + "input": "jolokia/metrics", + "vars": [ + { + "name": "period", + "type": "text", + "title": "Period", + "multi": false, + "required": false, + "show_user": true, + "default": "60s" + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Apache Kafka Raft metrics", + "description": "Collect Apache Kafka Raft metrics using Jolokia agent.", + "enabled": false, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": { + "ingest_pipeline.name": "default" + }, + "path": "raft" + }, + { + "type": "metrics", + "dataset": "kafka.replica_manager", + "title": "Apache Kafka replica manager metrics", + "release": "beta", + "ingest_pipeline": "default", + "streams": [ + { + "input": "jolokia/metrics", + "vars": [ + { + "name": "period", + "type": "text", + "title": "Period", + "multi": false, + "required": false, + "show_user": true, + "default": "60s" + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Apache Kafka replica manager metrics", + "description": "Collect Apache Kafka replica manager metrics using Jolokia agent.", + "enabled": false, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": { + "ingest_pipeline.name": "default" + }, + "path": "replica_manager" + }, + { + "type": "metrics", + "dataset": "kafka.topic", + "title": "Apache Kafka topic metrics", + "release": "beta", + "ingest_pipeline": "default", + "streams": [ + { + "input": "jolokia/metrics", + "vars": [ + { + "name": "period", + "type": "text", + "title": "Period", + "multi": false, + "required": false, + "show_user": true, + "default": "60s" + }, + { + "name": "processors", + "type": "yaml", + "title": "Processors", + "description": "Processors are used to reduce the number of fields in the exported event or to enhance the event with metadata. This executes in the agent before the logs are parsed. 
See [Processors](https://www.elastic.co/guide/en/fleet/current/elastic-agent-processor-configuration.html) for details.\n", + "multi": false, + "required": false, + "show_user": false + } + ], + "template_path": "stream.yml.hbs", + "title": "Apache Kafka topic metrics", + "description": "Collect Apache Kafka topic metrics using Jolokia agent.", + "enabled": false, + "ingestion_method": "Jolokia" + } + ], + "package": "kafka", + "elasticsearch": { + "ingest_pipeline.name": "default" + }, + "path": "topic" + } + ], + "latestVersion": "1.26.0", + "licensePath": "/package/kafka/1.26.0/LICENSE.txt", + "keepPoliciesUpToDate": false, + "status": "installed", + "installationInfo": { + "created_at": "2025-11-17T10:45:25.683Z", + "updated_at": "2025-11-18T22:21:45.740Z", + "namespaces": [], + "type": "epm-packages", + "installed_kibana": [ + { + "id": "kafka-20094380-8713-487d-8036-5cbbb529f0ca", + "type": "dashboard" + }, + { + "id": "kafka-28d2bf78-a638-4a1f-a46f-1a794c99166b", + "type": "dashboard" + }, + { + "id": "kafka-32a191b0-c8ac-4ea1-812d-0e3d406f9a41", + "type": "dashboard" + }, + { + "id": "kafka-6a239518-50b3-4af5-bb5c-01748d2310c8", + "type": "dashboard" + }, + { + "id": "kafka-894682fd-555e-49c6-b469-f8fbb3f0b9dc", + "type": "dashboard" + }, + { + "id": "kafka-943caca0-87ee-11e7-ad9c-db80de0bf8d3", + "type": "dashboard" + }, + { + "id": "kafka-a0e91797-fa95-497f-a130-614baa5327eb", + "type": "dashboard" + }, + { + "id": "kafka-c2cb8091-e812-4a23-9b59-3d461c4898ce", + "type": "dashboard" + }, + { + "id": "kafka-e3be4fe8-c7b4-4cea-bb2e-953032cc322a", + "type": "dashboard" + }, + { + "id": "kafka-ea488d90-8e63-11e8-8fa2-3d5f811fbd0f", + "type": "dashboard" + }, + { + "id": "kafka-ff622b8a-bbe2-4903-add2-ba3ad50806d2", + "type": "dashboard" + } + ], + "installed_kibana_space_id": "default", + "installed_es": [ + { + "id": "metrics-kafka.broker-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.consumer-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.consumergroup-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.controller-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.jvm-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "logs-kafka.log-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.log_manager-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.network-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.partition-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.producer-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.raft-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.replica_manager-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.topic-1.26.0", + "type": "ingest_pipeline" + }, + { + "id": "metrics-kafka.broker", + "type": "index_template" + }, + { + "id": "metrics-kafka.broker@package", + "type": "component_template" + }, + { + "id": "metrics@custom", + "type": "component_template" + }, + { + "id": "kafka@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.broker@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.consumer", + "type": "index_template" + }, + { + "id": "metrics-kafka.consumer@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.consumer@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.consumergroup", + "type": "index_template" + }, + { + "id": 
"metrics-kafka.consumergroup@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.consumergroup@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.controller", + "type": "index_template" + }, + { + "id": "metrics-kafka.controller@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.controller@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.jvm", + "type": "index_template" + }, + { + "id": "metrics-kafka.jvm@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.jvm@custom", + "type": "component_template" + }, + { + "id": "logs-kafka.log", + "type": "index_template" + }, + { + "id": "logs-kafka.log@package", + "type": "component_template" + }, + { + "id": "logs@custom", + "type": "component_template" + }, + { + "id": "logs-kafka.log@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.log_manager", + "type": "index_template" + }, + { + "id": "metrics-kafka.log_manager@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.log_manager@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.network", + "type": "index_template" + }, + { + "id": "metrics-kafka.network@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.network@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.partition", + "type": "index_template" + }, + { + "id": "metrics-kafka.partition@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.partition@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.producer", + "type": "index_template" + }, + { + "id": "metrics-kafka.producer@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.producer@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.raft", + "type": "index_template" + }, + { + "id": "metrics-kafka.raft@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.raft@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.replica_manager", + "type": "index_template" + }, + { + "id": "metrics-kafka.replica_manager@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.replica_manager@custom", + "type": "component_template" + }, + { + "id": "metrics-kafka.topic", + "type": "index_template" + }, + { + "id": "metrics-kafka.topic@package", + "type": "component_template" + }, + { + "id": "metrics-kafka.topic@custom", + "type": "component_template" + } + ], + "install_status": "installed", + "install_source": "registry", + "name": "kafka", + "version": "1.26.0", + "verification_status": "verified", + "verification_key_id": "d27d666cd88e42b4", + "latest_install_failed_attempts": [], + "rolled_back": false + } + } +} \ No newline at end of file diff --git a/internal/fleet/integration_policy/update.go b/internal/fleet/integration_policy/update.go index 890cd75b4..197b957b6 100644 --- a/internal/fleet/integration_policy/update.go +++ b/internal/fleet/integration_policy/update.go @@ -76,7 +76,13 @@ func (r *integrationPolicyResource) Update(ctx context.Context, req resource.Upd // Remember the input configuration from state stateHadInput := utils.IsKnown(stateModel.Inputs) && !stateModel.Inputs.IsNull() && len(stateModel.Inputs.Elements()) > 0 - diags = planModel.populateFromAPI(ctx, policy) + pkg, diags := fleet.GetPackage(ctx, client, policy.Package.Name, policy.Package.Version) + resp.Diagnostics.Append(diags...) 
+ if resp.Diagnostics.HasError() { + return + } + + diags = planModel.populateFromAPI(ctx, pkg, policy) resp.Diagnostics.Append(diags...) if resp.Diagnostics.HasError() { return From 5c9049e7ad7adef1bdd782131a1d9968dc5cb3ef Mon Sep 17 00:00:00 2001 From: Toby Brain Date: Wed, 24 Dec 2025 04:54:04 +1100 Subject: [PATCH 2/2] Update internal/fleet/integration_policy/input_value.go Co-authored-by: Dmitry Onishchenko <8962171+dimuon@users.noreply.github.com> --- internal/fleet/integration_policy/input_value.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/fleet/integration_policy/input_value.go b/internal/fleet/integration_policy/input_value.go index cdfb4303b..98fb53d75 100644 --- a/internal/fleet/integration_policy/input_value.go +++ b/internal/fleet/integration_policy/input_value.go @@ -35,7 +35,7 @@ func (v InputValue) Equal(o attr.Value) bool { } func (v InputValue) MaybeEnabled(ctx context.Context) (bool, diag.Diagnostics) { - if v.IsNull() || v.IsUnknown() { + if !utils.IsKnown(v) { return false, nil }
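
The `MaybeEnabled` change in the final hunk relies on `utils.IsKnown` collapsing the explicit null/unknown checks into a single call. Assuming the provider's helper follows the usual terraform-plugin-framework convention (a value is "known" when it is neither null nor unknown), a minimal sketch of the equivalence the hunk depends on looks like this; the package name and placement are illustrative only, the real helper lives in the provider's internal utils package:

    package utils

    import "github.com/hashicorp/terraform-plugin-framework/attr"

    // IsKnown reports whether an attribute value is neither null nor unknown,
    // so `!IsKnown(v)` is equivalent to `v.IsNull() || v.IsUnknown()`.
    func IsKnown(v attr.Value) bool {
        return !v.IsNull() && !v.IsUnknown()
    }

Under that assumption the rewritten guard `if !utils.IsKnown(v)` preserves the original early-return behaviour of `MaybeEnabled` while matching the check used elsewhere in the resource (for example the `stateHadInput` guard in Update).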