diff --git a/docs/flows.md b/docs/flows.md index 9d42583..eb53890 100644 --- a/docs/flows.md +++ b/docs/flows.md @@ -63,6 +63,7 @@ destruction: replicaSpace: optional-name exported: true +sequential: true parameters: parameterName1: @@ -195,7 +196,7 @@ my-flow -> -> pod/pod-$arg ``` will create two pods: `pod-a` and `pod-b`. -## Replication of flows +## Replication Flow replication is an AppController feature that makes specified number of flow graph copies, each one with a unique name and then merges them into a single graph. Because each replica name may be used in some of resource @@ -234,6 +235,96 @@ If there were 7 of them, 4 replicas would be deleted.\ `kubeac run my-flow` if there are no replicas exist, create one, otherwise validate status of resources of existing replicas. +### Replication of dependencies + +With commandline parameters one can create a number of flow replicas. But sometimes there is a need to have a flow +that creates several replicas of another flow, or just several resources with the same specification that differ +only in name. + +One possible solution is to utilize the technique shown above: make the parameter value be part of the resource name and +then duplicate the dependency that leads to this resource and pass a different parameter value along each of +dependencies. This works well for a small and fixed number of replicas. But if the number goes big, it becomes hard +to manage such a number of dependency objects. Moreover, if the number itself is not fixed but rather passed as a +parameter, replicating the resource by manual replication of dependencies becomes impossible. + +Luckily, the dependencies can be automatically replicated. This is done through the `generateFor` field of the +`Dependency` object. `generateFor` is a map where keys are argument names and values are list expressions. Each list +expression is a comma-separated list of values. If the value has a form of `number..number`, it is expanded into a +list of integers in the given range. 
For example `"1..3, 10..11, abc"` will turn into `["1", "2", "3", "10", "11", "abc"]`. +Then the dependency is going to be replicated automatically with each replica getting one of the list values as an +additional argument. There can be several `generateFor` arguments. In this case there is going to be one dependency +for each combination of the list values. For example, + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency +parent: pod/podName +child: flow/flowName-$x-$y +generateFor: + x: 1..2 + y: a, b +``` + +has the same effect as + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency1 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 1 + y: a +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency2 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 2 + y: a +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency3 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 1 + y: b +--- +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency4 +parent: pod/podName +child: flow/flowName-$x-$y +args: + x: 2 + y: b +``` + +Besides simplifying the dependency graph, dependency replication makes it possible to have a dynamic number of replicas +by using a parameter value right inside the list expressions: + +```YAML +apiVersion: appcontroller.k8s/v1alpha1 +kind: Dependency +metadata: + name: dependency +parent: pod/podName +child: flow/flowName-$index +generateFor: + index: 1..$replicaCount +``` + +### Replica-spaces and contexts Replica-space, is a tag that all replicas of the flow share. 
When new `Replica` object for the flow is created, @@ -261,6 +352,14 @@ another flow will "see" only its own replicas so the `Flow` resource can always However, when the flow is run independently, it will not have any context and thus query replicas based on replica-space alone, which means it will get all the replicas from all contexts. +### Sequential flows + +By default, if a flow has more than one replica, the generated dependency graph would have each replica subgraph attached +to the graph root vertex (the `Flow` vertex). When deployed, resources of all replicas are going to be created in +parallel. However, in some cases it is desired that replicas be deployed sequentially, one by one. This can be achieved +by setting the `sequential` attribute of the `Flow` to `true`. For sequential flows each replica's roots get attached to the +leaf vertices of the previous one. + ## Scheduling flow deployments When user runs `kubeac run something` the deployment does not happen immediately (unless there is also a `--deploy` diff --git a/examples/etcd/README.md b/examples/etcd/README.md index cd945de..22fe40c 100644 --- a/examples/etcd/README.md +++ b/examples/etcd/README.md @@ -26,8 +26,7 @@ If omitted, `etcd` name is used by default. `kubectl exec k8s-appcontroller kubeac run etcd-scale -n +1 --arg clusterName=my-cluster` `-n +1` - adds one node to the cluster. Use `-n -1` to scale the cluster down by one node. In this case the last -added node is going to be deleted. At the moment it is only possible to scale cluster up by one node at a time. -However, any number of nodes can be removed. Note, that this can also remove nodes created upon initial deployment. +added node is going to be deleted. This flow can also remove nodes created upon initial deployment. `--arg clusterName=my-cluster` - name of the cluster to scale (`etcd` if not specified). 
diff --git a/examples/etcd/resdefs/scale-flow.yaml b/examples/etcd/resdefs/scale-flow.yaml index 4394b4b..4fee091 100644 --- a/examples/etcd/resdefs/scale-flow.yaml +++ b/examples/etcd/resdefs/scale-flow.yaml @@ -4,6 +4,8 @@ metadata: name: etcd-scale exported: true +sequential: true + construction: flow: etcd-scale destruction: diff --git a/pkg/client/dependencies.go b/pkg/client/dependencies.go index 075b48d..5b72158 100644 --- a/pkg/client/dependencies.go +++ b/pkg/client/dependencies.go @@ -41,6 +41,9 @@ type Dependency struct { // Arguments passed to dependent resource Args map[string]string `json:"args,omitempty"` + + // map of variable name -> list expression. New dependencies are generated by replication and iteration over those lists + GenerateFor map[string]string `json:"generateFor,omitempty"` } // DependencyList is a k8s object representing list of dependencies diff --git a/pkg/client/flows.go b/pkg/client/flows.go index 04b4251..55df049 100644 --- a/pkg/client/flows.go +++ b/pkg/client/flows.go @@ -46,6 +46,9 @@ type Flow struct { // can only be triggered by other flows (including DEFAULT flow which is exported by-default) Exported bool `json:"exported,omitempty"` + // Flow replicas must be deployed sequentially, one by one + Sequential bool `json:"sequential,omitempty"` + // Parameters that the flow can accept (i.e. 
valid inputs for the flow) Parameters map[string]FlowParameter `json:"parameters,omitempty"` diff --git a/pkg/interfaces/interfaces.go b/pkg/interfaces/interfaces.go index 384c010..929d627 100644 --- a/pkg/interfaces/interfaces.go +++ b/pkg/interfaces/interfaces.go @@ -82,7 +82,6 @@ type GraphContext interface { Scheduler() Scheduler GetArg(string) string Graph() DependencyGraph - Dependency() *client.Dependency } // DependencyGraphOptions contains all the input required to build a dependency graph diff --git a/pkg/resources/flow.go b/pkg/resources/flow.go index 564801f..9b11f84 100644 --- a/pkg/resources/flow.go +++ b/pkg/resources/flow.go @@ -15,9 +15,7 @@ package resources import ( - "fmt" "log" - "strings" "github.com/Mirantis/k8s-AppController/pkg/client" "github.com/Mirantis/k8s-AppController/pkg/interfaces" @@ -29,7 +27,6 @@ type flow struct { flow *client.Flow context interfaces.GraphContext originalName string - instanceName string currentGraph interfaces.DependencyGraph } @@ -52,19 +49,12 @@ func (flowTemplateFactory) Kind() string { func (flowTemplateFactory) New(def client.ResourceDefinition, c client.Interface, gc interfaces.GraphContext) interfaces.Resource { newFlow := parametrizeResource(def.Flow, gc, []string{"*"}).(*client.Flow) - dep := gc.Dependency() - var depName string - if dep != nil { - depName = strings.Replace(dep.Name, dep.GenerateName, "", 1) - } - return report.SimpleReporter{ BaseResource: &flow{ Base: Base{def.Meta}, flow: newFlow, context: gc, originalName: def.Flow.Name, - instanceName: fmt.Sprintf("%s%s", depName, gc.GetArg("AC_NAME")), }} } @@ -88,20 +78,13 @@ func (f *flow) buildDependencyGraph(replicaCount int, silent bool) (interfaces.D args[arg] = val } } - fixedNumberOfReplicas := false - if replicaCount > 0 { - fixedNumberOfReplicas = f.context.Graph().Options().FixedNumberOfReplicas - } else if replicaCount == 0 { - fixedNumberOfReplicas = true - replicaCount = -1 - } options := interfaces.DependencyGraphOptions{ FlowName: 
f.originalName, Args: args, - FlowInstanceName: f.instanceName, + FlowInstanceName: f.context.GetArg("AC_ID"), ReplicaCount: replicaCount, Silent: silent, - FixedNumberOfReplicas: fixedNumberOfReplicas, + FixedNumberOfReplicas: true, } graph, err := f.context.Scheduler().BuildDependencyGraph(options) @@ -141,7 +124,7 @@ func (f *flow) Create() error { // Delete is called during dlow destruction which can happen only once while Create ensures that at least one flow // replica exists, and as such can be called any number of times func (f flow) Delete() error { - graph, err := f.buildDependencyGraph(-1, false) + graph, err := f.buildDependencyGraph(0, false) if err != nil { return err } @@ -155,7 +138,7 @@ func (f flow) Status(meta map[string]string) (interfaces.ResourceStatus, error) graph := f.currentGraph if graph == nil { var err error - graph, err = f.buildDependencyGraph(0, true) + graph, err = f.buildDependencyGraph(-1, true) if err != nil { return interfaces.ResourceError, err } diff --git a/pkg/scheduler/dependency_graph.go b/pkg/scheduler/dependency_graph.go index 8dab597..30308bd 100644 --- a/pkg/scheduler/dependency_graph.go +++ b/pkg/scheduler/dependency_graph.go @@ -19,6 +19,7 @@ import ( "fmt" "log" "sort" + "strconv" "strings" "time" @@ -40,12 +41,12 @@ type dependencyGraph struct { } type graphContext struct { - args map[string]string - graph *dependencyGraph - scheduler *scheduler - flow *client.Flow - dependency *client.Dependency - replica string + args map[string]string + graph *dependencyGraph + scheduler *scheduler + flow *client.Flow + id string + replica string } var _ interfaces.GraphContext = &graphContext{} @@ -62,6 +63,8 @@ func (gc graphContext) GetArg(name string) string { return gc.replica case "AC_FLOW_NAME": return gc.flow.Name + case "AC_ID": + return gc.id default: val, ok := gc.args[name] if ok { @@ -84,11 +87,6 @@ func (gc graphContext) Graph() interfaces.DependencyGraph { return gc.graph } -// Dependency returns Dependency for 
which child is the resource being created with this context -func (gc graphContext) Dependency() *client.Dependency { - return gc.dependency -} - // newScheduledResourceFor returns new scheduled resource for given resource in init state func newScheduledResourceFor(r interfaces.Resource, suffix string, context *graphContext, existing bool) *ScheduledResource { return &ScheduledResource{ @@ -159,7 +157,9 @@ func groupDependencies(dependencies []client.Dependency, defaultFlow = []client.Dependency{} addResource := func(name string) { if !strings.HasPrefix(name, "flow/") && !isDependant[name] { - defaultFlow = append(defaultFlow, client.Dependency{Parent: defaultFlowName, Child: name}) + dep := client.Dependency{Parent: defaultFlowName, Child: name} + dep.Name = name + defaultFlow = append(defaultFlow, dep) isDependant[name] = true } } @@ -328,11 +328,11 @@ func getArgFunc(gc interfaces.GraphContext) func(string) string { func (sched *scheduler) prepareContext(parentContext *graphContext, dependency *client.Dependency, replica string) *graphContext { context := &graphContext{ - scheduler: sched, - graph: parentContext.graph, - flow: parentContext.flow, - replica: replica, - dependency: dependency, + scheduler: sched, + graph: parentContext.graph, + flow: parentContext.flow, + replica: replica, + id: getVertexID(dependency, replica), } context.args = make(map[string]string) @@ -344,6 +344,15 @@ func (sched *scheduler) prepareContext(parentContext *graphContext, dependency * return context } +func getVertexID(dependency *client.Dependency, replica string) string { + var depName string + if dependency != nil { + depName = strings.Replace(dependency.Name, dependency.GenerateName, "", 1) + } + depName += replica + return depName +} + func (sched *scheduler) updateContext(context, parentContext *graphContext, dependency client.Dependency) { for key, value := range dependency.Args { context.args[key] = copier.EvaluateString(value, parentContext.GetArg) @@ -661,29 +670,130 @@ 
func (sched *scheduler) BuildDependencyGraph(options interfaces.DependencyGraphO return depGraph, nil } +func listDependencies(dependencies map[string][]client.Dependency, parent string, flow *client.Flow, + useDestructionSelector bool, context *graphContext) []client.Dependency { + + deps := filterDependencies(dependencies, parent, flow, useDestructionSelector) + var result []client.Dependency + for _, dep := range deps { + if len(dep.GenerateFor) == 0 { + result = append(result, dep) + continue + } + + var keys []string + for k := range dep.GenerateFor { + keys = append(keys, k) + } + sort.Strings(keys) + lists := make([][]string, len(dep.GenerateFor)) + for i, key := range keys { + lists[i] = expandListExpression(copier.EvaluateString(dep.GenerateFor[key], getArgFunc(context))) + } + for n, combination := range permute(lists) { + newArgs := make(map[string]string, len(dep.Args)+len(keys)) + for k, v := range dep.Args { + newArgs[k] = v + } + for i, key := range keys { + newArgs[key] = combination[i] + } + depCopy := dep + depCopy.Args = newArgs + depCopy.Name += strconv.Itoa(n + 1) + result = append(result, depCopy) + } + } + return result +} + +func permute(variants [][]string) [][]string { + switch len(variants) { + case 0: + return variants + case 1: + var result [][]string + for _, v := range variants[0] { + result = append(result, []string{v}) + } + return result + default: + var result [][]string + for _, tail := range variants[len(variants)-1] { + for _, p := range permute(variants[:len(variants)-1]) { + result = append(result, append(p, tail)) + } + } + return result + } +} + +func expandListExpression(expr string) []string { + var result []string + for _, part := range strings.Split(expr, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + + isRange := true + var from, to int + + rangeParts := strings.SplitN(part, "..", 2) + if len(rangeParts) != 2 { + isRange = false + } + + var err error + if isRange { + from, err = 
strconv.Atoi(rangeParts[0]) + if err != nil { + isRange = false + } + } + if isRange { + to, err = strconv.Atoi(rangeParts[1]) + if err != nil { + isRange = false + } + } + + if isRange { + for i := from; i <= to; i++ { + result = append(result, strconv.Itoa(i)) + } + } else { + result = append(result, part) + } + } + return result +} + +type interimGraphVertex struct { + dependency client.Dependency + scheduledResource *ScheduledResource + parentContext *graphContext +} + func (sched *scheduler) fillDependencyGraph(rootContext *graphContext, resDefs map[string]client.ResourceDefinition, dependencies map[string][]client.Dependency, flow *client.Flow, replicas []client.Replica, useDestructionSelector bool) error { - type Block struct { - dependency client.Dependency - scheduledResource *ScheduledResource - parentContext *graphContext - } - blocks := map[string][]*Block{} + var vertices [][]interimGraphVertex silent := rootContext.graph.Options().Silent for _, replica := range replicas { + var replicaVertices []interimGraphVertex replicaName := replica.ReplicaName() replicaContext := sched.prepareContext(rootContext, nil, replicaName) queue := list.New() - queue.PushFront(&Block{dependency: client.Dependency{Child: "flow/" + flow.Name}}) + queue.PushFront(interimGraphVertex{dependency: client.Dependency{Child: "flow/" + flow.Name}}) for e := queue.Front(); e != nil; e = e.Next() { - parent := e.Value.(*Block) + parent := e.Value.(interimGraphVertex) - deps := filterDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector) + deps := listDependencies(dependencies, parent.dependency.Child, flow, useDestructionSelector, replicaContext) for _, dep := range deps { if parent.scheduledResource != nil && strings.HasPrefix(parent.scheduledResource.Key(), "flow/") { @@ -707,44 +817,149 @@ func (sched *scheduler) fillDependencyGraph(rootContext *graphContext, } sr.usedInReplicas = []string{replicaName} - block := &Block{ + vertex := interimGraphVertex{ 
scheduledResource: sr, dependency: dep, parentContext: parentContext, } - - blocks[dep.Child] = append(blocks[dep.Child], block) + replicaVertices = append(replicaVertices, vertex) if parent.scheduledResource != nil { sr.Requires = append(sr.Requires, parent.scheduledResource.Key()) parent.scheduledResource.RequiredBy = append(parent.scheduledResource.RequiredBy, sr.Key()) sr.Meta[parent.dependency.Child] = dep.Meta } - queue.PushBack(block) + queue.PushBack(vertex) } } - for _, block := range blocks { - for _, entry := range block { - key := entry.scheduledResource.Key() - existingSr := rootContext.graph.graph[key] - if existingSr == nil { - if !silent { - log.Printf("Adding resource %s to the dependency graph flow %s", key, flow.Name) - } - rootContext.graph.graph[key] = entry.scheduledResource - } else { - sched.updateContext(existingSr.context, entry.parentContext, entry.dependency) - existingSr.Requires = append(existingSr.Requires, entry.scheduledResource.Requires...) - existingSr.RequiredBy = append(existingSr.RequiredBy, entry.scheduledResource.RequiredBy...) - existingSr.usedInReplicas = append(existingSr.usedInReplicas, entry.scheduledResource.usedInReplicas...) 
- for metaKey, metaValue := range entry.scheduledResource.Meta { - existingSr.Meta[metaKey] = metaValue - } + vertices = append(vertices, replicaVertices) + } + + if flow.Sequential { + sched.concatenateReplicas(vertices, rootContext, rootContext.graph.Options()) + } else { + sched.mergeReplicas(vertices, rootContext, rootContext.graph.Options()) + } + return nil +} + +func (sched *scheduler) mergeReplicas(vertices [][]interimGraphVertex, gc *graphContext, + options interfaces.DependencyGraphOptions) { + + for _, replicaVertices := range vertices { + sched.mergeInterimGraphVertices(replicaVertices, gc.graph.graph, options) + } +} + +func (sched *scheduler) concatenateReplicas(vertices [][]interimGraphVertex, gc *graphContext, + options interfaces.DependencyGraphOptions) { + graph := gc.graph.graph + var previousReplicaGraph map[string]*ScheduledResource + for i, replicaVertices := range vertices { + replicaGraph := map[string]*ScheduledResource{} + sched.mergeInterimGraphVertices(replicaVertices, replicaGraph, options) + + if i > 0 { + correctDuplicateResources(graph, replicaGraph, i) + + for _, leafName := range getLeafs(previousReplicaGraph) { + for _, rootName := range getRoots(replicaGraph) { + root := replicaGraph[rootName] + leaf := previousReplicaGraph[leafName] + root.Requires = append(root.Requires, leafName) + leaf.RequiredBy = append(leaf.RequiredBy, rootName) } } } + previousReplicaGraph = replicaGraph + for key, value := range replicaGraph { + graph[key] = value + } + } +} + +func correctDuplicateResources(existingGraph, newGraph map[string]*ScheduledResource, index int) { + toReplace := map[string]*ScheduledResource{} + for key, sr := range newGraph { + if existingGraph[key] != nil { + toReplace[key] = sr + } + } + for key, sr := range toReplace { + sr.context.id = existingGraph[key].context.id + j := index + 1 + suffix := sr.suffix + for { + sr.suffix = fmt.Sprintf("%s #%d", suffix, j) + if existingGraph[sr.Key()] == nil { + break + } + j++ + } + for 
_, rKey := range sr.RequiredBy { + requires := newGraph[rKey].Requires + for i, rKey2 := range requires { + if rKey2 == key { + requires[i] = sr.Key() + break + } + } + } + for _, rKey := range sr.Requires { + requiredBy := newGraph[rKey].RequiredBy + for i, rKey2 := range requiredBy { + if rKey2 == key { + requiredBy[i] = sr.Key() + break + } + } + } + delete(newGraph, key) + newGraph[sr.Key()] = sr + } +} + +func getRoots(graph map[string]*ScheduledResource) []string { + var result []string + for key, sr := range graph { + if len(sr.Requires) == 0 { + result = append(result, key) + } + } + return result +} + +func getLeafs(graph map[string]*ScheduledResource) []string { + var result []string + for key, sr := range graph { + if len(sr.RequiredBy) == 0 { + result = append(result, key) + } + } + return result +} + +func (sched *scheduler) mergeInterimGraphVertices(vertices []interimGraphVertex, graph map[string]*ScheduledResource, + options interfaces.DependencyGraphOptions) { + + for _, entry := range vertices { + key := entry.scheduledResource.Key() + existingSr := graph[key] + if existingSr == nil { + if !options.Silent { + log.Printf("Adding resource %s to the dependency graph flow %s", key, options.FlowName) + } + graph[key] = entry.scheduledResource + } else { + sched.updateContext(existingSr.context, entry.parentContext, entry.dependency) + existingSr.Requires = append(existingSr.Requires, entry.scheduledResource.Requires...) + existingSr.RequiredBy = append(existingSr.RequiredBy, entry.scheduledResource.RequiredBy...) + existingSr.usedInReplicas = append(existingSr.usedInReplicas, entry.scheduledResource.usedInReplicas...) 
+ for metaKey, metaValue := range entry.scheduledResource.Meta { + existingSr.Meta[metaKey] = metaValue + } + } } - return nil } // getResourceDestructors builds a list of functions, each of them delete one of replica resources diff --git a/pkg/scheduler/dependency_graph_test.go b/pkg/scheduler/dependency_graph_test.go index 9cc7068..07ae319 100644 --- a/pkg/scheduler/dependency_graph_test.go +++ b/pkg/scheduler/dependency_graph_test.go @@ -15,6 +15,7 @@ package scheduler import ( + "strings" "testing" "github.com/Mirantis/k8s-AppController/pkg/client" @@ -319,3 +320,76 @@ func TestDependencyToFlowMatching(t *testing.T) { } } } + +// TestPermute tests permute function +func TestPermute(t *testing.T) { + alphabets := [][]string{ + {"1", "2", "3"}, + {"+", "-"}, + {"a", "b"}, + {"="}, + } + + expected := map[string]bool{ + "1+a=": true, + "1+b=": true, + "1-a=": true, + "1-b=": true, + "2+a=": true, + "2+b=": true, + "2-a=": true, + "2-b=": true, + "3+a=": true, + "3+b=": true, + "3-a=": true, + "3-b=": true, + } + permutations := permute(alphabets) + for _, combination := range permutations { + combinationStr := strings.Join(combination, "") + if !expected[combinationStr] { + t.Errorf("unexpected combination %s", combinationStr) + } else { + delete(expected, combinationStr) + } + } + if len(expected) != 0 { + t.Error("not all combinations were generated") + } + + alphabets = append(alphabets, make([]string, 0)) + if len(permute(alphabets)) != 0 { + t.Error("empty alphabet didin't result in empty permutation list") + } +} + +// TestExpendListExpression tests list expression translation to list of strings +func TestExpendListExpression(t *testing.T) { + table := map[string][]string{ + "1": {"1"}, + "1..5": {"1", "2", "3", "4", "5"}, + "2..-1": {}, + "a, b": {"a", "b"}, + "a, b, 2..4": {"a", "b", "2", "3", "4"}, + "-1..1, 2..4, x": {"-1", "0", "1", "2", "3", "4", "x"}, + "a..b": {"a..b"}, + "..": {".."}, + "1...3": {"1...3"}, + "1..b": {"1..b"}, + "a..b, 1..3": 
{"a..b", "1", "2", "3"}, + "a..b, c..d": {"a..b", "c..d"}, + "": {}, + } + for expr, expected := range table { + result := expandListExpression(expr) + if len(result) != len(expected) { + t.Errorf("unexpected result length for expression %s: %d != %d", expr, len(result), len(expected)) + } else { + for i := range expected { + if expected[i] != result[i] { + t.Errorf("invalid entry %d for expression %s: %s != %s", i, expr, expected[i], result[i]) + } + } + } + } +} diff --git a/pkg/scheduler/flows_test.go b/pkg/scheduler/flows_test.go index 464865c..1bc48aa 100644 --- a/pkg/scheduler/flows_test.go +++ b/pkg/scheduler/flows_test.go @@ -1367,3 +1367,184 @@ func TestSyncOnVoidResource(t *testing.T) { depGraph.Deploy(stopChan) ensureReplicas(c, t, replicaCount, replicaCount) } + +// TestConsumeReplicatedFlow tests case, where each replica of the outer flow consumes N replicas of another flow +// by replicating dependency which leads to the consumed flow +func TestConsumeReplicatedFlow(t *testing.T) { + dep := mocks.MakeDependency("flow/outer", "flow/inner/$AC_NAME-$i", "flow=outer") + dep.GenerateFor = map[string]string{"i": "1..3"} + + c := mocks.NewClient( + mocks.MakeFlow("inner"), + mocks.MakeFlow("outer"), + mocks.MakeResourceDefinition("job/ready-$AC_NAME"), + dep, + mocks.MakeDependency("flow/inner", "job/ready-$AC_NAME", "flow=inner"), + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 2, FlowName: "outer"}) + if err != nil { + t.Fatal(err) + } + stopChan := make(chan struct{}) + depGraph.Deploy(stopChan) + + ensureReplicas(c, t, 2*3, 3*2+2) +} + +// TestComplexDependencyReplication tests complex dependency generation over two list expressions +func TestComplexDependencyReplication(t *testing.T) { + dep := mocks.MakeDependency("flow/test", "job/ready-$x-$y", "flow=test") + dep.GenerateFor = map[string]string{ + "x": "1..3, 8..9", + "y": "a, b", + } + + c := mocks.NewClient( + mocks.MakeFlow("test"), + 
mocks.MakeResourceDefinition("job/ready-$x-$y"), + dep, + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 1, FlowName: "test"}) + if err != nil { + t.Fatal(err) + } + stopChan := make(chan struct{}) + depGraph.Deploy(stopChan) + + expectedJobNames := map[string]bool{ + "ready-1-a": true, + "ready-2-a": true, + "ready-3-a": true, + "ready-8-a": true, + "ready-9-a": true, + "ready-1-b": true, + "ready-2-b": true, + "ready-3-b": true, + "ready-8-b": true, + "ready-9-b": true, + } + jobs := ensureReplicas(c, t, len(expectedJobNames), 1) + for _, j := range jobs { + if !expectedJobNames[j.Name] { + t.Errorf("unexpected job %s", j.Name) + } else { + delete(expectedJobNames, j.Name) + } + } + if len(expectedJobNames) != 0 { + t.Error("not all jobs were found") + } +} + +// TestDynamicDependencyReplication tests that variables can be used in list expressions used for dependency replication +func TestDynamicDependencyReplication(t *testing.T) { + flow := mocks.MakeFlow("test") + flow.Flow.Parameters = map[string]client.FlowParameter{ + "replicaCount": mocks.MakeFlowParameter("1"), + } + + dep := mocks.MakeDependency("flow/test", "job/ready-$index", "flow=test") + dep.GenerateFor = map[string]string{ + "index": "1..$replicaCount", + } + + c := mocks.NewClient( + flow, + mocks.MakeResourceDefinition("job/ready-$index"), + dep, + ) + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: 1, FlowName: "test", + Args: map[string]string{"replicaCount": "7"}}) + if err != nil { + t.Fatal(err) + } + stopChan := make(chan struct{}) + depGraph.Deploy(stopChan) + + ensureReplicas(c, t, 7, 1) +} + +// TestSequentialReplication tests that resources of sequentially replicated flows create in right order +func TestSequentialReplication(t *testing.T) { + replicaCount := 3 + flow := mocks.MakeFlow("test") + flow.Flow.Sequential = true + + c, fake := mocks.NewClientWithFake( + flow, + 
mocks.MakeResourceDefinition("pod/ready-$AC_NAME"), + mocks.MakeResourceDefinition("secret/secret"), + mocks.MakeResourceDefinition("job/ready-$AC_NAME"), + mocks.MakeDependency("flow/test", "pod/ready-$AC_NAME", "flow=test"), + mocks.MakeDependency("pod/ready-$AC_NAME", "secret/secret", "flow=test"), + mocks.MakeDependency("secret/secret", "job/ready-$AC_NAME", "flow=test"), + ) + + stopChan := make(chan struct{}) + var deployed []string + fake.PrependReactor("create", "*", + func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + resource := action.GetResource().Resource + if resource != "replica" { + deployed = append(deployed, resource) + } + + return false, nil, nil + }) + + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: replicaCount, FlowName: "test"}) + if err != nil { + t.Fatal(err) + } + + graph := depGraph.(*dependencyGraph).graph + if len(graph) != 3*replicaCount { + t.Error("wrong dependency graph length") + } + + depGraph.Deploy(stopChan) + expected := []string{"pods", "secrets", "jobs", "pods", "jobs", "pods", "jobs"} + if len(deployed) != len(expected) { + t.Fatal("invalid resource sequence", deployed) + } + for i, r := range deployed { + if expected[i] != r { + t.Fatal("invalid resource sequence") + } + } + + ensureReplicas(c, t, replicaCount, replicaCount) +} + +// TestSequentialReplicationWithSharedFlow tests that flow consumed as a resource shared by replicas of +// sequentially replicated flow deployed only once +func TestSequentialReplicationWithSharedFlow(t *testing.T) { + replicaCount := 3 + flow := mocks.MakeFlow("outer") + flow.Flow.Sequential = true + + c := mocks.NewClient( + flow, + mocks.MakeFlow("inner"), + mocks.MakeResourceDefinition("job/ready-a$AC_NAME"), + mocks.MakeResourceDefinition("job/ready-b$AC_NAME"), + mocks.MakeDependency("flow/outer", "flow/inner", "flow=outer"), + mocks.MakeDependency("flow/inner", "job/ready-a$AC_NAME", "flow=outer"), + 
mocks.MakeDependency("flow/inner", "job/ready-b$AC_NAME", "flow=inner"), + ) + + stopChan := make(chan struct{}) + + depGraph, err := New(c, nil, 0).BuildDependencyGraph( + interfaces.DependencyGraphOptions{ReplicaCount: replicaCount, FlowName: "outer"}) + if err != nil { + t.Fatal(err) + } + + depGraph.Deploy(stopChan) + ensureReplicas(c, t, replicaCount+1, replicaCount+1) +}