Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ linters:
- unused
- usestdlibvars
- whitespace
- wsl_v5
settings:
depguard:
rules:
Expand Down
3 changes: 3 additions & 0 deletions cmd/addon-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func main() {
kpApp.Action(func(_ *kingpin.ParseContext) error {
klogtolog.InitAdapter(shapp.DebugKubernetesAPI, logger.Named("klog"))
stdliblogtolog.InitAdapter(logger)

return nil
})

Expand Down Expand Up @@ -85,6 +86,7 @@ func start(logger *log.Logger) func(_ *kingpin.ParseContext) error {
if os.Getenv("ADDON_OPERATOR_HA") == "true" {
operator.Logger.Info("Addon-operator is starting in HA mode")
runHAMode(ctx, operator)

return nil
}

Expand Down Expand Up @@ -179,6 +181,7 @@ func runHAMode(ctx context.Context, operator *addon_operator.AddonOperator) {
go func() {
<-ctx.Done()
log.Info("Context canceled received")

if err := syscall.Kill(1, syscall.SIGUSR2); err != nil {
operator.Logger.Fatal("Couldn't shutdown addon-operator", log.Err(err))
}
Expand Down
1 change: 1 addition & 0 deletions cmd/post-renderer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func main() {
fmt.Fprintf(os.Stderr, "couldn't read input from stdin: %s", err)
os.Exit(1)
}

buf := bytes.NewBuffer(inputBytes)

renderer := post_renderer.NewPostRenderer(map[string]string{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type podSpecFilteredObj v1.PodSpec

func ObjFilter(obj *unstructured.Unstructured) (gohook.FilterResult, error) {
pod := &v1.Pod{}

err := sdk.FromUnstructured(obj, pod)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions pkg/addon-operator/admission_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (as *AdmissionServer) start(ctx context.Context) {

go func() {
cert := path.Join(as.certsDir, "tls.crt")

key := path.Join(as.certsDir, "tls.key")
if err := srv.ListenAndServeTLS(cert, key); err != nil {
if errors.Is(err, http.ErrServerClosed) {
Expand All @@ -72,10 +73,12 @@ func (as *AdmissionServer) start(ctx context.Context) {
<-ctx.Done()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

defer func() {
// extra handling here
cancel()
}()

if err := srv.Shutdown(ctx); err != nil {
log.Fatal("Server Shutdown Failed", log.Err(err))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/addon-operator/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (op *AddonOperator) bootstrap() error {
// Initialize the debug server for troubleshooting and monitoring
// TODO: rewrite shapp global variables to the addon-operator one
var err error

op.DebugServer, err = shell_operator.RunDefaultDebugServer(shapp.DebugUnixSocket, shapp.DebugHttpServerAddr, op.Logger.Named("debug-server"))
if err != nil {
log.Error("Fatal: start Debug server", log.Err(err))
Expand Down
8 changes: 8 additions & 0 deletions pkg/addon-operator/converge/converge.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (cs *ConvergeState) SetOnConvergeFinish(callback func()) {
func (cs *ConvergeState) SetFirstRunPhase(ph FirstConvergePhase) {
cs.phaseMu.Lock()
defer cs.phaseMu.Unlock()

cs.firstRunPhase = ph
if ph == FirstDone {
close(cs.FirstRunDoneC)
Expand All @@ -70,12 +71,14 @@ func (cs *ConvergeState) SetFirstRunPhase(ph FirstConvergePhase) {
func (cs *ConvergeState) GetFirstRunPhase() FirstConvergePhase {
cs.phaseMu.RLock()
defer cs.phaseMu.RUnlock()

return cs.firstRunPhase
}

func (cs *ConvergeState) SetPhase(ph ConvergePhase) {
cs.phaseMu.Lock()
defer cs.phaseMu.Unlock()

cs.phase = ph

if ph == RunBeforeAll && cs.onConvergeStart != nil {
Expand All @@ -90,6 +93,7 @@ func (cs *ConvergeState) SetPhase(ph ConvergePhase) {
func (cs *ConvergeState) GetPhase() ConvergePhase {
cs.phaseMu.RLock()
defer cs.phaseMu.RUnlock()

return cs.phase
}

Expand Down Expand Up @@ -131,6 +135,7 @@ func IsConvergeTask(t sh_task.Task) bool {
return true
}
}

return false
}

Expand All @@ -140,6 +145,7 @@ func IsFirstConvergeTask(t sh_task.Task) bool {
case task.ModulePurge, task.DiscoverHelmReleases, task.GlobalHookEnableKubernetesBindings, task.GlobalHookEnableScheduleBindings:
return true
}

return false
}

Expand All @@ -152,6 +158,7 @@ func NewConvergeModulesTask(description string, convergeEvent ConvergeEvent, log
}).
WithQueuedAt(time.Now())
convergeTask.SetProp(ConvergeEventProp, convergeEvent)

return convergeTask
}

Expand All @@ -164,5 +171,6 @@ func NewApplyKubeConfigValuesTask(description string, logLabels map[string]strin
GlobalValuesChanged: globalValuesChanged,
}).
WithQueuedAt(time.Now())

return convergeTask
}
13 changes: 13 additions & 0 deletions pkg/addon-operator/debug_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (op *AddonOperator) RegisterDebugGlobalRoutes(dbgSrv *debug.Server) {
func(_ *http.Request) (any, error) {
kubeHookNames := op.ModuleManager.GetGlobalHooksInOrder(types.OnKubernetesEvent)
snapshots := make(map[string]any)

for _, hName := range kubeHookNames {
h := op.ModuleManager.GetGlobalHook(hName)
snapshots[hName] = h.GetHookController().SnapshotsDump()
Expand All @@ -68,26 +69,30 @@ func (op *AddonOperator) RegisterDebugGraphRoutes(dbgSrv *debug.Server) {
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))

return
}

w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(dotDesc)

return
}

image, err := op.ModuleManager.GetGraphImage(req.Context())
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = fmt.Fprintf(w, "couldn't get graph's image: %s", err)

return
}

buf := new(bytes.Buffer)
if err = png.Encode(buf, image); err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(fmt.Errorf("couldn't encode png graph's image").Error()))

return
}

Expand All @@ -101,6 +106,7 @@ func (op *AddonOperator) RegisterDebugModuleRoutes(dbgSrv *debug.Server) {
dbgSrv.RegisterHandler(http.MethodGet, "/module/list.{format:(json|yaml|text)}", func(_ *http.Request) (any, error) {
mods := op.ModuleManager.GetEnabledModuleNames()
sort.Strings(mods)

return map[string][]string{"enabledModules": mods}, nil
})

Expand All @@ -110,6 +116,7 @@ func (op *AddonOperator) RegisterDebugModuleRoutes(dbgSrv *debug.Server) {

withGlobal := false
withGlobalStr := r.URL.Query().Get("global")

v, err := strconv.ParseBool(withGlobalStr)
if err == nil {
withGlobal = v
Expand Down Expand Up @@ -149,11 +156,13 @@ func (op *AddonOperator) RegisterDebugModuleRoutes(dbgSrv *debug.Server) {

dbgSrv.RegisterHandler(http.MethodGet, "/module/{name}/render", func(r *http.Request) (any, error) {
modName := chi.URLParam(r, "name")

debugMode, err := strconv.ParseBool(r.URL.Query().Get("debug"))
if err != nil {
// if empty or unparsable - set false
debugMode = false
}

diffMode, err := strconv.ParseBool(r.URL.Query().Get("diff"))
if err != nil {
// if empty or unparsable - set false
Expand Down Expand Up @@ -228,6 +237,7 @@ func (op *AddonOperator) RegisterDebugModuleRoutes(dbgSrv *debug.Server) {
defer differ.TearDown()

const maxRetries = 4

buffer := new(bytes.Buffer)
printer := diff.Printer{}
diffProgram := &diff.DiffProgram{
Expand All @@ -250,6 +260,7 @@ func (op *AddonOperator) RegisterDebugModuleRoutes(dbgSrv *debug.Server) {
if !apierrors.IsNotFound(err) {
return err
}

info.Object = nil
}

Expand Down Expand Up @@ -325,6 +336,7 @@ func (op *AddonOperator) RegisterDebugModuleRoutes(dbgSrv *debug.Server) {
}

mHooks := m.GetHooks()

snapshots := make(map[string]any)
for _, h := range mHooks {
snapshots[h.GetName()] = h.GetHookController().SnapshotsDump()
Expand All @@ -347,6 +359,7 @@ func (op *AddonOperator) RegisterDiscoveryRoute(dbgSrv *debug.Server) {
_, _ = fmt.Fprintf(buf, "%s %s\n", method, route)
return nil
}

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/addon-operator/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func NewDiffer(from, to string) (*Differ, error) {
if err != nil {
return nil, err
}

return &Differ{Differ: d}, nil
}

Expand Down Expand Up @@ -60,6 +61,7 @@ func (ow *objectWrapper) Merged() (runtime.Object, error) {
if err != nil {
return nil, err
}

return ow.preprocessObject(obj), nil
}

Expand Down Expand Up @@ -97,7 +99,9 @@ func (p *Printer) Print(obj runtime.Object, w io.Writer) error {
if err != nil {
return err
}

_, err = w.Write(data)

return err
}

Expand All @@ -106,9 +110,11 @@ func toUnstructured(obj runtime.Object) (*unstructured.Unstructured, error) {
if unstr, ok := obj.(*unstructured.Unstructured); ok {
return unstr, nil
}

data, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, err
}

return &unstructured.Unstructured{Object: data}, nil
}
1 change: 1 addition & 0 deletions pkg/addon-operator/ensure_crds.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (op *AddonOperator) EnsureCRDs(module *modules.BasicModule) ([]string, erro
if cp == nil {
return nil, nil
}

if err := cp.Run(context.TODO()); err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/addon-operator/handler_module_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
func (op *AddonOperator) StartModuleManagerEventHandler() {
go func() {
logEntry := op.Logger.With(pkg.LogKeyOperatorComponent, "handleManagerEvents")

for {
select {
case schedulerEvent := <-op.ModuleManager.SchedulerEventCh():
Expand Down Expand Up @@ -70,6 +71,7 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
pkg.LogKeyEventSource: "KubeConfigExtenderChanged",
}
eventLogEntry := utils.EnrichLoggerWithLabels(logEntry, logLabels)

switch event.Type {
case config.KubeConfigInvalid:
op.ModuleManager.SetKubeConfigValid(false)
Expand All @@ -81,6 +83,7 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
slog.Any(pkg.LogKeyModuleValuesChanged, event.ModuleValuesChanged),
slog.Any(pkg.LogKeyModuleEnabledStateChanged, event.ModuleEnabledStateChanged),
slog.Any(pkg.LogKeyModuleMaintenanceChanged, event.ModuleMaintenanceChanged))

if !op.ModuleManager.GetKubeConfigValid() {
eventLogEntry.Info("KubeConfig become valid")
}
Expand Down Expand Up @@ -113,7 +116,9 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
op.engine.TaskQueues.GetMain().CancelTaskDelay()
op.logTaskAdd(eventLogEntry, "KubeConfigExtender is updated, put first", kubeConfigTask)
}

eventLogEntry.Info("Kube config modification detected, ignore until starting first converge")

break
}

Expand Down Expand Up @@ -146,6 +151,7 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
if kubeConfigTask != nil {
op.engine.TaskQueues.GetMain().AddFirst(kubeConfigTask)
}

logEntry.Info("ConvergeModules: kube config modification detected, rerun all modules required")
op.engine.TaskQueues.GetMain().AddLast(convergeTask)
}
Expand All @@ -169,10 +175,12 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
if kubeConfigTask != nil {
reloadTasks := op.CreateReloadModulesTasks(modulesToRerun, kubeConfigTask.GetLogLabels(), "KubeConfig-Changed-Modules")
op.engine.TaskQueues.GetMain().AddFirst(kubeConfigTask)

if len(reloadTasks) > 0 {
for i := len(reloadTasks) - 1; i >= 0; i-- {
op.engine.TaskQueues.GetMain().AddAfter(kubeConfigTask.GetId(), reloadTasks[i])
}

logEntry.Info("ConvergeModules: kube config modification detected, append tasks to rerun modules",
slog.Int(pkg.LogKeyCount, len(reloadTasks)),
slog.Any(pkg.LogKeyModules, modulesToRerun))
Expand All @@ -199,6 +207,7 @@ func (op *AddonOperator) StartModuleManagerEventHandler() {
// helm reslease in unexpected state event
if HelmReleaseStatusEvent.UnexpectedStatus {
op.engine.MetricStorage.CounterAdd(metrics.ModulesHelmReleaseRedeployedTotal, 1.0, map[string]string{pkg.MetricKeyModule: HelmReleaseStatusEvent.ModuleName})

eventDescription = "HelmReleaseUnexpectedStatus"
additionalDescription = "unexpected helm release status"
} else {
Expand Down
6 changes: 6 additions & 0 deletions pkg/addon-operator/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (op *AddonOperator) checkLeaderReadiness(w http.ResponseWriter) {
if leader == "" {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("HA mode is enabled but no leader is elected\n"))

return
}

Expand All @@ -66,20 +67,23 @@ func (op *AddonOperator) checkLeaderReadiness(w http.ResponseWriter) {
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("HA mode is enabled but couldn't craft a request to the leader\n"))

return
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("HA mode is enabled but couldn't send a request to the leader\n"))

return
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte("HA mode is enabled but the leader's status response code isn't OK\n"))

return
}

Expand Down Expand Up @@ -129,8 +133,10 @@ func (op *AddonOperator) handleConvergeStatus(writer http.ResponseWriter, reques

if request.URL.Query().Get("output") == "json" {
writer.Header().Set("Content-Type", "application/json")

response := generateConvergeJSON(op.ConvergeState.GetFirstRunPhase(), convergeTasks)
_ = json.NewEncoder(writer).Encode(response)

return
}

Expand Down
Loading
Loading