diff --git a/compute/kubernetes/backend.go b/compute/kubernetes/backend.go
index 7f5ebff4f..0b5b70c03 100644
--- a/compute/kubernetes/backend.go
+++ b/compute/kubernetes/backend.go
@@ -353,6 +353,115 @@ func (b *Backend) cleanResources(ctx context.Context, taskId string) error {
 	return errs
 }
 
+// terminalWaitingReasons lists container waiting reasons that are treated as a
+// permanent failure by hasTerminalContainerWaitingError.
+//
+// NOTE(review): the kubelet keeps retrying containers in these states, so a
+// reason such as CreateContainerConfigError can presumably clear if the missing
+// Secret/ConfigMap is created later. Treating them as terminal is a fail-fast
+// policy choice; confirm that is intended.
+var terminalWaitingReasons = []string{
+	"CreateContainerConfigError",
+	"InvalidImageName",
+	"CreateContainerError",
+}
+
+// isTerminalWaitingReason reports whether reason is listed in
+// terminalWaitingReasons.
+func isTerminalWaitingReason(reason string) bool {
+	for _, terminal := range terminalWaitingReasons {
+		if reason == terminal {
+			return true
+		}
+	}
+	return false
+}
+
+// hasTerminalContainerWaitingError reports whether any pod belonging to the
+// given job has a container (regular or init) in a waiting state whose reason
+// is listed in terminalWaitingReasons. Such a task should be failed early
+// rather than waiting for the Job's backoff limit to be exhausted.
+//
+// The string result describes the first match: "Reason: Message", or just
+// "Reason" when the container reported no message. It is empty when nothing
+// matched. A failure to list pods is logged and reported as (false, "").
+func (b *Backend) hasTerminalContainerWaitingError(ctx context.Context, jobName string) (bool, string) {
+	pods, err := b.client.CoreV1().Pods(b.conf.Kubernetes.JobsNamespace).List(ctx, metav1.ListOptions{
+		LabelSelector: fmt.Sprintf("job-name=%s", jobName),
+	})
+	if err != nil {
+		b.log.Error("reconcile: listing pods for job", "taskID", jobName, "error", err)
+		return false, ""
+	}
+	for _, pod := range pods.Items {
+		// Regular containers are inspected before init containers. The
+		// three-index slice caps capacity so append copies instead of writing
+		// into the pod's own ContainerStatuses backing array.
+		regular := pod.Status.ContainerStatuses
+		statuses := append(regular[:len(regular):len(regular)], pod.Status.InitContainerStatuses...)
+		for _, cs := range statuses {
+			waiting := cs.State.Waiting
+			if waiting == nil || !isTerminalWaitingReason(waiting.Reason) {
+				continue
+			}
+			if waiting.Message == "" {
+				// Avoid the redundant "Reason: Reason" form.
+				return true, waiting.Reason
+			}
+			return true, fmt.Sprintf("%s: %s", waiting.Reason, waiting.Message)
+		}
+	}
+	return false, ""
+}
+
+// podWarningEventReasons lists the pod event reasons (from `kubectl describe pod`)
+// that are safe to surface to users as system log entries. These are all
+// informational failure signals with no risk of leaking sensitive runtime internals.
+var podWarningEventReasons = []string{
+	"Failed",           // image pull failures, container start failures
+	"BackOff",          // back-off restarting / pulling
+	"ErrImagePull",     // explicit image-pull error
+	"ImagePullBackOff", // image pull back-off
+	"StartError",       // OCI runtime / entrypoint errors
+}
+
+// fetchPodWarningEvents returns Warning events for pods belonging to jobName
+// whose reason is in podWarningEventReasons. The messages are deduplicated and
+// returned as a newline-joined string. An empty string is returned when nothing
+// useful is found. This surfaces the human-readable detail that appears in
+// `kubectl describe pod` (e.g. "Error: secret \"foo\" not found") into the
+// TES task system logs.
+//
+// Only events whose involved object is a Pod are considered, so an object of
+// another kind that happens to share a pod's name cannot contribute messages.
+// Failures to list pods or events are logged and skipped (best-effort).
+func (b *Backend) fetchPodWarningEvents(ctx context.Context, jobName string) string {
+	namespace := b.conf.Kubernetes.JobsNamespace
+	pods, err := b.client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: fmt.Sprintf("job-name=%s", jobName),
+	})
+	if err != nil {
+		b.log.Error("reconcile: listing pods for warning events", "taskID", jobName, "error", err)
+		return ""
+	}
+
+	// surfaced reports whether an event reason is on the allow-list.
+	surfaced := func(reason string) bool {
+		for _, allowed := range podWarningEventReasons {
+			if reason == allowed {
+				return true
+			}
+		}
+		return false
+	}
+
+	seen := make(map[string]struct{})
+	var messages []string
+	for _, pod := range pods.Items {
+		evList, err := b.client.CoreV1().Events(namespace).List(ctx, metav1.ListOptions{
+			FieldSelector: fmt.Sprintf("involvedObject.kind=Pod,involvedObject.name=%s,type=Warning", pod.Name),
+		})
+		if err != nil {
+			b.log.Error("reconcile: listing events for pod", "taskID", jobName, "pod", pod.Name, "error", err)
+			continue
+		}
+		for _, ev := range evList.Items {
+			if !surfaced(ev.Reason) {
+				continue
+			}
+			// Deduplicate on the exact line that would be emitted.
+			line := fmt.Sprintf("%s: %s", ev.Reason, ev.Message)
+			if _, dup := seen[line]; dup {
+				continue
+			}
+			seen[line] = struct{}{}
+			messages = append(messages, line)
+		}
+	}
+	return strings.Join(messages, "\n")
+}
+
 // isJobSchedulingTimedOut returns true if all pods for the given job have been
 // stuck in Pending (with a scheduling condition) for longer than timeout.
 // It returns false if any pod has been scheduled, or if pod status cannot be determined.
@@ -384,6 +493,67 @@ func (b *Backend) isJobSchedulingTimedOut(ctx context.Context, jobName string, t
 	return false
 }
 
+// getFailedPodInfo returns a human-readable summary of why the most recently
+// terminated pod for jobName failed: "exit code N (Reason): Message". It is
+// best-effort; an empty string is returned when no useful information is found.
+//
+// Both init and regular containers are inspected, so a pod that failed during
+// initialisation is still reported. The ": Message" suffix is omitted when the
+// container recorded no termination message, and Reason falls back to
+// "ExitError" when the container recorded none.
+func (b *Backend) getFailedPodInfo(ctx context.Context, jobName string) string {
+	pods, err := b.client.CoreV1().Pods(b.conf.Kubernetes.JobsNamespace).List(ctx, metav1.ListOptions{
+		LabelSelector: fmt.Sprintf("job-name=%s", jobName),
+	})
+	if err != nil {
+		b.log.Error("reconcile: listing pods for failed job", "taskID", jobName, "error", err)
+		return ""
+	}
+
+	var latestFinish metav1.Time
+	var result string
+	for _, pod := range pods.Items {
+		// The three-index slice caps capacity so append copies rather than
+		// writing into the pod's own InitContainerStatuses backing array.
+		initStatuses := pod.Status.InitContainerStatuses
+		statuses := append(initStatuses[:len(initStatuses):len(initStatuses)], pod.Status.ContainerStatuses...)
+		for _, cs := range statuses {
+			t := cs.State.Terminated
+			if t == nil || t.ExitCode == 0 {
+				continue
+			}
+			// Keep only the termination with the latest finish time.
+			if !latestFinish.IsZero() && !t.FinishedAt.After(latestFinish.Time) {
+				continue
+			}
+			latestFinish = t.FinishedAt
+			reason := t.Reason
+			if reason == "" {
+				reason = "ExitError"
+			}
+			result = fmt.Sprintf("exit code %d (%s)", t.ExitCode, reason)
+			if t.Message != "" {
+				result += ": " + t.Message
+			}
+		}
+	}
+	return result
+}
+
+// hasJobFailedCreateEvent returns true if the Kubernetes Job has emitted at
+// least one FailedCreate event — meaning the Job controller tried to create a
+// pod but was rejected before the pod object was ever persisted (e.g. due to
+// Pod Security Admission enforcement). In that case there are no pod objects
+// to inspect, so hasTerminalContainerWaitingError cannot detect the failure.
+// The most recent event message is returned as the reason string.
+//
+// "Most recent" is decided by event timestamp (LastTimestamp, then EventTime,
+// then CreationTimestamp) rather than by list position, because list order is
+// not guaranteed to be chronological. Ties go to the later list element. Only
+// events whose involved object is a Job are considered. A failure to list
+// events is logged and reported as (false, "").
+//
+// NOTE(review): FailedCreate may also be emitted for rejections the Job
+// controller later retries past (e.g. temporary quota pressure). Callers that
+// treat any hit as terminal should confirm that is intended.
+func (b *Backend) hasJobFailedCreateEvent(ctx context.Context, jobName string) (bool, string) {
+	evList, err := b.client.CoreV1().Events(b.conf.Kubernetes.JobsNamespace).List(ctx, metav1.ListOptions{
+		FieldSelector: fmt.Sprintf("involvedObject.kind=Job,involvedObject.name=%s,reason=FailedCreate", jobName),
+	})
+	if err != nil {
+		b.log.Error("reconcile: listing events for job", "taskID", jobName, "error", err)
+		return false, ""
+	}
+	if len(evList.Items) == 0 {
+		b.log.Debug("no FailedCreate events found for job", "taskID", jobName)
+		return false, ""
+	}
+
+	// occurredAt returns the best available timestamp for the i-th event.
+	occurredAt := func(i int) time.Time {
+		ev := &evList.Items[i]
+		switch {
+		case !ev.LastTimestamp.IsZero():
+			return ev.LastTimestamp.Time
+		case !ev.EventTime.IsZero():
+			return ev.EventTime.Time
+		default:
+			return ev.CreationTimestamp.Time
+		}
+	}
+	latest := 0
+	for i := 1; i < len(evList.Items); i++ {
+		if !occurredAt(i).Before(occurredAt(latest)) {
+			latest = i
+		}
+	}
+
+	msg := evList.Items[latest].Message
+	b.log.Debug("found FailedCreate event for job", "taskID", jobName, "reason", msg)
+	return true, msg
+}
+
 // Reconcile loops through tasks and checks the status from Funnel's database
 // against the status reported by Kubernetes. This allows the backend to report
 // system error's that prevented the worker process from running.
@@ -402,6 +572,7 @@ func (b *Backend) isJobSchedulingTimedOut(ctx context.Context, jobName string, t
 //
 // This loop is also used to cleanup successful jobs.
 func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableCleanup bool) {
+	b.log.Debug("reconcile: starting loop", "rate", rate)
 	// Clears all resources that still exist from jobs that have run before this server started.
 	// This handles two cases:
 	// 1. Completed jobs (Succeeded/Failed) that were not cleaned up before the server restarted.
@@ -453,6 +624,7 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
+			b.log.Debug("reconcile: tick")
 			// List worker jobs only (label selector excludes executor jobs and unrelated jobs).
// Bug: If K8s Job is not created by the time reconciler runs, then the TES Task itself will be prematurely marked as SYSTEM_ERROR @@ -474,6 +646,7 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea // List non-terminal tasks from Funnel's database states := []tes.State{tes.State_QUEUED, tes.State_INITIALIZING, tes.State_RUNNING} for _, s := range states { + fmt.Println("DEBUG: Reconciling tasks with state", s) pageToken := "" for { lresp, err := b.database.ListTasks(ctx, &tes.ListTasksRequest{ @@ -489,6 +662,7 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea // Compare Funnel Tasks against K8s Jobs for _, task := range lresp.Tasks { + fmt.Println("DEBUG: Reconciling task", task.Id, "with state", task.State) taskID := task.Id // If the job exists, check its current status (Active, Succeeded, Failed) @@ -504,8 +678,53 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea jobName := j.Name status := j.Status + fmt.Println("DEBUG: Job status for task", taskID, "is Active:", status.Active, "Succeeded:", status.Succeeded, "Failed:", status.Failed) switch { case status.Active > 0: + // Check for container waiting errors that will never self-resolve + // (e.g. CreateContainerConfigError). These keep the Job Active + // indefinitely, so we must detect and fail them explicitly. 
+ if terminal, reason := b.hasTerminalContainerWaitingError(ctx, jobName); terminal { + b.log.Debug("reconcile: worker pod has terminal container waiting error", "taskID", jobName, "reason", reason) + b.event.WriteEvent(ctx, events.NewState(jobName, tes.SystemError)) + errDetail := reason + if podEvents := b.fetchPodWarningEvents(ctx, jobName); podEvents != "" { + errDetail = fmt.Sprintf("%s\n%s", reason, podEvents) + } + b.event.WriteEvent(ctx, events.NewSystemLog( + jobName, 0, 0, "error", + "Kubernetes worker pod has a terminal container waiting error", + map[string]string{"error": errDetail}, + )) + if !disableCleanup { + if err := b.cleanResources(ctx, jobName); err != nil { + b.log.Error("failed to clean resources", "taskID", jobName, "error", err) + } + } + continue + } + + // Check for FailedCreate events on the Job itself. This catches + // cases where pod creation is rejected before a pod object is + // ever persisted (e.g. Pod Security Admission enforcement blocks + // the pod), so there are no pod container statuses to inspect. + b.log.Debug("checking for FailedCreate events on job", "taskID", jobName) + if failed, reason := b.hasJobFailedCreateEvent(ctx, jobName); failed { + b.log.Debug("reconcile: worker job has FailedCreate event", "taskID", jobName, "reason", reason) + b.event.WriteEvent(ctx, events.NewState(jobName, tes.SystemError)) + b.event.WriteEvent(ctx, events.NewSystemLog( + jobName, 0, 0, "error", + "Kubernetes worker job failed to create pod", + map[string]string{"error": reason}, + )) + if !disableCleanup { + if err := b.cleanResources(ctx, jobName); err != nil { + b.log.Error("failed to clean resources", "taskID", jobName, "error", err) + } + } + continue + } + // If a scheduling timeout is configured, check whether the worker // pod has been stuck in Pending beyond that duration. This catches // scheduling failures (bad NodeSelector, insufficient resources, etc.) 
@@ -553,13 +772,18 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea b.log.Error("reconcile: marshal failed job conditions", "taskID", jobName, "error", err) } + errDetails := map[string]string{"error": string(conds)} + if podInfo := b.getFailedPodInfo(ctx, jobName); podInfo != "" { + errDetails["executor_error"] = podInfo + } + b.event.WriteEvent(ctx, events.NewState(jobName, tes.SystemError)) b.event.WriteEvent( ctx, events.NewSystemLog( jobName, 0, 0, "error", "Kubernetes job in FAILED state", - map[string]string{"error": string(conds)}, + errDetails, ), ) @@ -574,6 +798,28 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea continue } delete(failedJobEvents, jobName) + + default: + // All status counters are zero: the Job controller has not yet + // recorded any Active/Succeeded/Failed pods. This happens when + // every pod creation attempt is rejected before Kubernetes + // persists a pod object (e.g. Pod Security Admission blocks the + // pod). Check for FailedCreate events which are the only signal + // available in this state. 
+ if failed, reason := b.hasJobFailedCreateEvent(ctx, jobName); failed { + b.log.Debug("reconcile: worker job has FailedCreate event (zero-status)", "taskID", jobName, "reason", reason) + b.event.WriteEvent(ctx, events.NewState(jobName, tes.SystemError)) + b.event.WriteEvent(ctx, events.NewSystemLog( + jobName, 0, 0, "error", + "Kubernetes worker job failed to create pod", + map[string]string{"error": reason}, + )) + if !disableCleanup { + if err := b.cleanResources(ctx, jobName); err != nil { + b.log.Error("failed to clean resources", "taskID", jobName, "error", err) + } + } + } } } diff --git a/compute/kubernetes/backend_test.go b/compute/kubernetes/backend_test.go index 6436c6966..86b7b849e 100644 --- a/compute/kubernetes/backend_test.go +++ b/compute/kubernetes/backend_test.go @@ -5,7 +5,10 @@ import ( "context" "fmt" "reflect" + "strings" + "sync" "testing" + "time" "github.com/ohsu-comp-bio/funnel/config" "github.com/ohsu-comp-bio/funnel/events" @@ -389,3 +392,604 @@ func TestCancel_SAInUse(t *testing.T) { t.Errorf("expected SA to remain while pod is still running, got: %v", err) } } + +// TestHasTerminalContainerWaitingError verifies that hasTerminalContainerWaitingError +// correctly detects pods stuck in terminal waiting states (e.g. CreateContainerConfigError). 
+func TestHasTerminalContainerWaitingError(t *testing.T) { + const ns = "test-namespace" + + tests := []struct { + name string + containerState corev1.ContainerState + initState corev1.ContainerState + wantTerminal bool + wantReasonPart string + }{ + { + name: "CreateContainerConfigError is terminal", + containerState: corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{ + Reason: "CreateContainerConfigError", + Message: "secret not found", + }, + }, + wantTerminal: true, + wantReasonPart: "CreateContainerConfigError", + }, + { + name: "InvalidImageName is terminal", + containerState: corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{ + Reason: "InvalidImageName", + Message: "bad image", + }, + }, + wantTerminal: true, + wantReasonPart: "InvalidImageName", + }, + { + name: "CreateContainerError is terminal", + containerState: corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{ + Reason: "CreateContainerError", + Message: "failed to create container", + }, + }, + wantTerminal: true, + wantReasonPart: "CreateContainerError", + }, + { + name: "init container with CreateContainerConfigError is terminal", + initState: corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{ + Reason: "CreateContainerConfigError", + Message: "configmap not found", + }, + }, + wantTerminal: true, + wantReasonPart: "CreateContainerConfigError", + }, + { + name: "ContainerCreating is not terminal", + containerState: corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{ + Reason: "ContainerCreating", + }, + }, + wantTerminal: false, + }, + { + name: "ImagePullBackOff is not in the terminal list", + containerState: corev1.ContainerState{ + Waiting: &corev1.ContainerStateWaiting{ + Reason: "ImagePullBackOff", + }, + }, + wantTerminal: false, + }, + { + name: "running container is not terminal", + containerState: corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{}, + }, + wantTerminal: false, + }, + } + + for _, tc := range 
tests { + t.Run(tc.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + ctx := context.Background() + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: ns, + Labels: map[string]string{"job-name": "test-job"}, + }, + } + + if tc.containerState != (corev1.ContainerState{}) { + pod.Status.ContainerStatuses = []corev1.ContainerStatus{ + {Name: "main", State: tc.containerState}, + } + } + if tc.initState != (corev1.ContainerState{}) { + pod.Status.InitContainerStatuses = []corev1.ContainerStatus{ + {Name: "init", State: tc.initState}, + } + } + + if _, err := fakeClient.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatalf("creating pod: %v", err) + } + + conf := config.DefaultConfig() + conf.Kubernetes.JobsNamespace = ns + b := &Backend{ + client: fakeClient, + log: logger.NewLogger("test", logger.DefaultConfig()), + conf: conf, + } + + got, reason := b.hasTerminalContainerWaitingError(ctx, "test-job") + if got != tc.wantTerminal { + t.Errorf("hasTerminalContainerWaitingError() = %v, want %v (reason=%q)", got, tc.wantTerminal, reason) + } + if tc.wantTerminal && tc.wantReasonPart != "" { + if !strings.Contains(reason, tc.wantReasonPart) { + t.Errorf("reason %q does not contain %q", reason, tc.wantReasonPart) + } + } + }) + } +} + +func TestHasJobFailedCreateEvent(t *testing.T) { + const ns = "test-ns" + const jobName = "test-job" + + psaMessage := `pods "test-job-abc" is forbidden: violates PodSecurity "restricted:latest": ` + + `allowPrivilegeEscalation != false, runAsNonRoot != true` + + cases := []struct { + name string + events []corev1.Event + wantFailed bool + wantReasonPart string + }{ + { + name: "no events → no failure", + events: nil, + wantFailed: false, + }, + { + name: "unrelated event reason → no failure", + events: []corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{Name: "ev1", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: jobName}, + Reason: 
"Scheduled", + Message: "Successfully assigned pod", + }, + }, + wantFailed: false, + }, + { + name: "FailedCreate event from PSA enforcement → failure detected", + events: []corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{Name: "ev-fc", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: jobName}, + Reason: "FailedCreate", + Message: psaMessage, + }, + }, + wantFailed: true, + wantReasonPart: "violates PodSecurity", + }, + { + name: "FailedCreate for missing service account → failure detected", + events: []corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{Name: "ev-sa", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: jobName}, + Reason: "FailedCreate", + Message: `pods "test-job-" is forbidden: error looking up service account jobs/funnel-worker-sa: serviceaccount "funnel-worker-sa" not found`, + }, + }, + wantFailed: true, + wantReasonPart: "serviceaccount", + }, + { + name: "multiple events, last is FailedCreate → failure detected with last message", + events: []corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{Name: "ev1", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: jobName}, + Reason: "Scheduled", + Message: "first message", + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "ev2", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: jobName}, + Reason: "FailedCreate", + Message: psaMessage, + }, + }, + wantFailed: true, + wantReasonPart: "violates PodSecurity", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + + // Build fake client pre-populated with events. + var objs []runtime.Object + for i := range tc.events { + objs = append(objs, &tc.events[i]) + } + fakeClient := fake.NewSimpleClientset(objs...) + + // The fake client's field selector support is limited; we intercept + // the List call and filter manually to simulate the FieldSelector + // used by hasJobFailedCreateEvent. 
+ fakeClient.PrependReactor("list", "events", func(action k8stesting.Action) (bool, runtime.Object, error) { + la := action.(k8stesting.ListAction) + fs := la.GetListRestrictions().Fields.String() + + all, err := fakeClient.Tracker().List( + corev1.SchemeGroupVersion.WithResource("events"), + corev1.SchemeGroupVersion.WithKind("Event"), + ns, + ) + if err != nil { + return true, nil, err + } + evList := all.(*corev1.EventList) + var filtered []corev1.Event + for _, ev := range evList.Items { + nameMatch := strings.Contains(fs, fmt.Sprintf("involvedObject.name=%s", ev.InvolvedObject.Name)) + reasonMatch := strings.Contains(fs, fmt.Sprintf("reason=%s", ev.Reason)) + if nameMatch && reasonMatch { + filtered = append(filtered, ev) + } + } + return true, &corev1.EventList{Items: filtered}, nil + }) + + conf := config.DefaultConfig() + conf.Kubernetes.JobsNamespace = ns + b := &Backend{ + client: fakeClient, + log: logger.NewLogger("test", logger.DefaultConfig()), + conf: conf, + } + + got, reason := b.hasJobFailedCreateEvent(ctx, jobName) + if got != tc.wantFailed { + t.Errorf("hasJobFailedCreateEvent() = %v, want %v (reason=%q)", got, tc.wantFailed, reason) + } + if tc.wantFailed && tc.wantReasonPart != "" { + if !strings.Contains(reason, tc.wantReasonPart) { + t.Errorf("reason %q does not contain %q", reason, tc.wantReasonPart) + } + } + }) + } +} + +// mockReadOnlyServer implements tes.ReadOnlyServer for reconciler tests. 
+type mockReadOnlyServer struct { + mu sync.Mutex + tasks []*tes.Task +} + +func (m *mockReadOnlyServer) ListTasks(_ context.Context, req *tes.ListTasksRequest) (*tes.ListTasksResponse, error) { + m.mu.Lock() + defer m.mu.Unlock() + var out []*tes.Task + for _, t := range m.tasks { + if t.State == req.State { + out = append(out, t) + } + } + return &tes.ListTasksResponse{Tasks: out}, nil +} + +func (m *mockReadOnlyServer) GetTask(_ context.Context, req *tes.GetTaskRequest) (*tes.Task, error) { + m.mu.Lock() + defer m.mu.Unlock() + for _, t := range m.tasks { + if t.Id == req.Id { + return t, nil + } + } + return nil, fmt.Errorf("task %s not found", req.Id) +} + +func (m *mockReadOnlyServer) Close() {} + +// capturingEventWriter records all events written to it. +type capturingEventWriter struct { + mu sync.Mutex + events []*events.Event +} + +func (c *capturingEventWriter) WriteEvent(_ context.Context, ev *events.Event) error { + c.mu.Lock() + defer c.mu.Unlock() + c.events = append(c.events, ev) + return nil +} + +func (c *capturingEventWriter) Close() {} + +func (c *capturingEventWriter) hasSystemError(taskID string) bool { + c.mu.Lock() + defer c.mu.Unlock() + for _, ev := range c.events { + if ev.Id == taskID && ev.Type == events.Type_TASK_STATE { + if ev.GetState() == tes.State_SYSTEM_ERROR { + return true + } + } + } + return false +} + +// TestReconcile_ZeroStatusFailedCreate verifies that a Job with all-zero +// status counters (Active=0, Succeeded=0, Failed=0) but with a FailedCreate +// event — as produced by Pod Security Admission enforcement — is detected by +// the reconciler and transitions the task to SYSTEM_ERROR. +func TestReconcile_ZeroStatusFailedCreate(t *testing.T) { + const ns = "test-ns" + const taskID = "test-task-psa" + + psaMsg := `pods "test-task-psa-abc" is forbidden: violates PodSecurity "restricted:latest": runAsNonRoot != true` + + // Build a Job with all-zero status counters (what we see with PSA blocking). 
+ job := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: taskID, + Namespace: ns, + Labels: map[string]string{"app": "funnel-worker"}, + }, + Status: batchv1.JobStatus{ + Active: 0, + Succeeded: 0, + Failed: 0, + }, + } + + // FailedCreate event on the Job (emitted by the Job controller). + failedCreateEvent := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{Name: "ev-fc", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: taskID}, + Reason: "FailedCreate", + Message: psaMsg, + } + + fakeClient := fake.NewSimpleClientset(job, failedCreateEvent) + + // Intercept event List calls to apply field-selector filtering manually, + // since the fake client does not support server-side field selectors. + fakeClient.PrependReactor("list", "events", func(action k8stesting.Action) (bool, runtime.Object, error) { + la := action.(k8stesting.ListAction) + fs := la.GetListRestrictions().Fields.String() + + all, err := fakeClient.Tracker().List( + corev1.SchemeGroupVersion.WithResource("events"), + corev1.SchemeGroupVersion.WithKind("Event"), + ns, + ) + if err != nil { + return true, nil, err + } + evList := all.(*corev1.EventList) + var filtered []corev1.Event + for _, ev := range evList.Items { + nameMatch := strings.Contains(fs, fmt.Sprintf("involvedObject.name=%s", ev.InvolvedObject.Name)) + reasonMatch := strings.Contains(fs, fmt.Sprintf("reason=%s", ev.Reason)) + if nameMatch && reasonMatch { + filtered = append(filtered, ev) + } + } + return true, &corev1.EventList{Items: filtered}, nil + }) + + db := &mockReadOnlyServer{ + tasks: []*tes.Task{ + {Id: taskID, State: tes.State_QUEUED}, + }, + } + evWriter := &capturingEventWriter{} + + conf := config.DefaultConfig() + conf.Kubernetes.JobsNamespace = ns + + b := &Backend{ + client: fakeClient, + event: evWriter, + database: db, + log: logger.NewLogger("test", logger.DefaultConfig()), + conf: conf, + } + + // Run a single reconcile tick (context cancels after the first tick fires). 
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + b.reconcile(ctx, 100*time.Millisecond, true /* disableCleanup */) + + if !evWriter.hasSystemError(taskID) { + t.Errorf("expected SYSTEM_ERROR event for task %s, got events: %+v", taskID, evWriter.events) + } +} + +func TestFetchPodWarningEvents(t *testing.T) { + const ns = "test-ns" + const jobName = "test-job" + + cases := []struct { + name string + podName string + events []corev1.Event + wantContain []string // substrings that must appear in result + wantEmpty bool + }{ + { + name: "no events → empty string", + podName: "pod-1", + events: nil, + wantEmpty: true, + }, + { + name: "non-warning event type → ignored", + podName: "pod-1", + events: []corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{Name: "ev1", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: "pod-1"}, + Type: "Normal", + Reason: "Pulled", + Message: "Successfully pulled image", + }, + }, + wantEmpty: true, + }, + { + name: "unallowed warning reason → ignored", + podName: "pod-1", + events: []corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{Name: "ev2", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: "pod-1"}, + Type: "Warning", + Reason: "Evicted", + Message: "node ran out of memory", + }, + }, + wantEmpty: true, + }, + { + name: "CreateContainerConfigError → secret not found in pod event", + podName: "pod-1", + events: []corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{Name: "ev3", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: "pod-1"}, + Type: "Warning", + Reason: "Failed", + Message: `Error: secret "example-secret" not found`, + }, + }, + wantContain: []string{`Failed: Error: secret "example-secret" not found`}, + }, + { + name: "ErrImagePull → image not found", + podName: "pod-1", + events: []corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{Name: "ev4", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: "pod-1"}, + Type: "Warning", + Reason: 
"ErrImagePull", + Message: `Failed to pull image "xyz": not found`, + }, + }, + wantContain: []string{`ErrImagePull: Failed to pull image "xyz": not found`}, + }, + { + name: "StartError → bad entrypoint", + podName: "pod-1", + events: []corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{Name: "ev5", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: "pod-1"}, + Type: "Warning", + Reason: "StartError", + Message: `exec: "badcmd": executable file not found in $PATH`, + }, + }, + wantContain: []string{`StartError: exec: "badcmd": executable file not found in $PATH`}, + }, + { + name: "duplicate events → deduplicated", + podName: "pod-1", + events: []corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{Name: "ev6a", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: "pod-1"}, + Type: "Warning", + Reason: "Failed", + Message: `Error: secret "s" not found`, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "ev6b", Namespace: ns}, + InvolvedObject: corev1.ObjectReference{Name: "pod-1"}, + Type: "Warning", + Reason: "Failed", + Message: `Error: secret "s" not found`, + }, + }, + wantContain: []string{`Failed: Error: secret "s" not found`}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + + // Pre-populate the fake client with the pod and events. + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.podName, + Namespace: ns, + Labels: map[string]string{"job-name": jobName}, + }, + } + objs := []runtime.Object{pod} + for i := range tc.events { + objs = append(objs, &tc.events[i]) + } + fakeClient := fake.NewSimpleClientset(objs...) + + // Intercept event List calls to filter by involvedObject.name and type, + // since the fake client does not support server-side field selectors. 
+ fakeClient.PrependReactor("list", "events", func(action k8stesting.Action) (bool, runtime.Object, error) { + la := action.(k8stesting.ListAction) + fs := la.GetListRestrictions().Fields.String() + + all, err := fakeClient.Tracker().List( + corev1.SchemeGroupVersion.WithResource("events"), + corev1.SchemeGroupVersion.WithKind("Event"), + ns, + ) + if err != nil { + return true, nil, err + } + evList := all.(*corev1.EventList) + var filtered []corev1.Event + for _, ev := range evList.Items { + nameMatch := strings.Contains(fs, fmt.Sprintf("involvedObject.name=%s", ev.InvolvedObject.Name)) + typeMatch := !strings.Contains(fs, "type=") || strings.Contains(fs, fmt.Sprintf("type=%s", ev.Type)) + if nameMatch && typeMatch { + filtered = append(filtered, ev) + } + } + return true, &corev1.EventList{Items: filtered}, nil + }) + + conf := config.DefaultConfig() + conf.Kubernetes.JobsNamespace = ns + b := &Backend{ + client: fakeClient, + log: logger.NewLogger("test", logger.DefaultConfig()), + conf: conf, + } + + got := b.fetchPodWarningEvents(ctx, jobName) + + if tc.wantEmpty { + if got != "" { + t.Errorf("expected empty string, got %q", got) + } + return + } + for _, want := range tc.wantContain { + if !strings.Contains(got, want) { + t.Errorf("result %q does not contain %q", got, want) + } + } + // Deduplication check: count occurrences of the first wantContain + if len(tc.wantContain) > 0 { + count := strings.Count(got, tc.wantContain[0]) + if count > 1 { + t.Errorf("expected %q to appear once, got %d times in %q", tc.wantContain[0], count, got) + } + } + }) + } +} diff --git a/worker/kubernetes.go b/worker/kubernetes.go index 859863f31..5fd0faf01 100644 --- a/worker/kubernetes.go +++ b/worker/kubernetes.go @@ -57,8 +57,12 @@ type K8sSystemErr struct { } func (e *K8sExecutorErr) Error() string { + reason := e.Reason + if reason == "" { + reason = "ExitError" + } return fmt.Sprintf("executor job %s failed with exit code %d (%s): %s", - e.JobName, e.ExitCode, e.Reason, 
e.Message) + e.JobName, e.ExitCode, reason, e.Message) } func (e *K8sSystemErr) Error() string {