149 changes: 149 additions & 0 deletions compute/kubernetes/backend.go
@@ -353,6 +353,63 @@ func (b *Backend) cleanResources(ctx context.Context, taskId string) error {
return errs
}

// terminalWaitingReasons lists container waiting reasons that will never
// self-resolve and should be treated as permanent failures.
var terminalWaitingReasons = []string{
"CreateContainerConfigError",
"InvalidImageName",
"CreateContainerError",
}

// hasTerminalContainerWaitingError reports whether any pod belonging to the
// given job has a container stuck in a waiting state whose reason is known to
// be permanent (e.g. CreateContainerConfigError), along with a string that
// describes the failure. These pods will never transition to a running state
// on their own, so the task must be failed early rather than waiting for the
// Job's backoff limit to be exhausted.
func (b *Backend) hasTerminalContainerWaitingError(ctx context.Context, jobName string) (bool, string) {
pods, err := b.client.CoreV1().Pods(b.conf.Kubernetes.JobsNamespace).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", jobName),
})
if err != nil {
b.log.Error("reconcile: listing pods for job", "taskID", jobName, "error", err)
return false, ""
}
for _, pod := range pods.Items {
for _, cs := range pod.Status.ContainerStatuses {
if cs.State.Waiting == nil {
continue
}
reason := cs.State.Waiting.Reason
for _, terminal := range terminalWaitingReasons {
if reason == terminal {
msg := cs.State.Waiting.Message
if msg == "" {
return true, reason
}
return true, fmt.Sprintf("%s: %s", reason, msg)
}
}
}
// Also check init containers
for _, cs := range pod.Status.InitContainerStatuses {
if cs.State.Waiting == nil {
continue
}
reason := cs.State.Waiting.Reason
for _, terminal := range terminalWaitingReasons {
if reason == terminal {
msg := cs.State.Waiting.Message
if msg == "" {
return true, reason
}
return true, fmt.Sprintf("%s: %s", reason, msg)
}
}
}
}
return false, ""
}
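
// Illustrative sketch, not part of this change: one way the terminal
// waiting-reason detection could be exercised against client-go's fake
// clientset (it would live in something like backend_test.go). The namespace
// "funnel", the job name "task-abc123", and the test name are assumptions made
// for the example, and the scan mirrors hasTerminalContainerWaitingError
// rather than calling it on a fully constructed Backend.
//
//    import (
//        "context"
//        "testing"
//
//        corev1 "k8s.io/api/core/v1"
//        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
//        "k8s.io/client-go/kubernetes/fake"
//    )
//
//    func TestTerminalWaitingReasonIsDetected(t *testing.T) {
//        // A pod whose container is stuck waiting on a missing ConfigMap.
//        stuck := &corev1.Pod{
//            ObjectMeta: metav1.ObjectMeta{
//                Name:      "task-abc123-worker",
//                Namespace: "funnel",
//                Labels:    map[string]string{"job-name": "task-abc123"},
//            },
//            Status: corev1.PodStatus{
//                ContainerStatuses: []corev1.ContainerStatus{{
//                    State: corev1.ContainerState{
//                        Waiting: &corev1.ContainerStateWaiting{
//                            Reason:  "CreateContainerConfigError",
//                            Message: `configmap "missing" not found`,
//                        },
//                    },
//                }},
//            },
//        }
//        client := fake.NewSimpleClientset(stuck)
//
//        // Same query the backend performs: pods labelled with the job name.
//        pods, err := client.CoreV1().Pods("funnel").List(context.Background(),
//            metav1.ListOptions{LabelSelector: "job-name=task-abc123"})
//        if err != nil {
//            t.Fatal(err)
//        }
//
//        found := false
//        for _, pod := range pods.Items {
//            for _, cs := range pod.Status.ContainerStatuses {
//                if cs.State.Waiting != nil && cs.State.Waiting.Reason == "CreateContainerConfigError" {
//                    found = true
//                }
//            }
//        }
//        if !found {
//            t.Fatal("expected a terminal container waiting error to be detected")
//        }
//    }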

// isJobSchedulingTimedOut returns true if all pods for the given job have been
// stuck in Pending (with a scheduling condition) for longer than timeout.
// It returns false if any pod has been scheduled, or if pod status cannot be determined.
@@ -384,6 +441,31 @@ func (b *Backend) isJobSchedulingTimedOut(ctx context.Context, jobName string, t
return false
}

// hasJobFailedCreateEvent returns true if the Kubernetes Job has emitted at
// least one FailedCreate event, meaning the Job controller tried to create a
// pod but the request was rejected before the pod object was ever persisted
// (e.g. due to Pod Security Admission enforcement). In that case there are no
// pod objects to inspect, so hasTerminalContainerWaitingError cannot detect
// the failure. The most recent event message is returned as the reason string.
func (b *Backend) hasJobFailedCreateEvent(ctx context.Context, jobName string) (bool, string) {
evList, err := b.client.CoreV1().Events(b.conf.Kubernetes.JobsNamespace).List(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("involvedObject.name=%s,reason=FailedCreate", jobName),
})
if err != nil {
b.log.Error("reconcile: listing events for job", "taskID", jobName, "error", err)
b.log.Debug("assuming no FailedCreate events due to error listing events", "taskID", jobName)
return false, ""
}
if len(evList.Items) == 0 {
b.log.Debug("no FailedCreate events found for job", "taskID", jobName)
return false, ""
}
// Return the message from the most recent event.
latest := evList.Items[len(evList.Items)-1]
b.log.Debug("found FailedCreate event for job", "taskID", jobName, "reason", latest.Message)
return true, latest.Message
}
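
// For reference, the equivalent manual check for this situation (the namespace
// and job name below are example values, not taken from this change):
//
//    kubectl -n funnel get events \
//        --field-selector involvedObject.name=task-abc123,reason=FailedCreate
//
// When Pod Security Admission blocks the pod, the event message typically looks
// like `pods "task-abc123-..." is forbidden: violates PodSecurity
// "restricted:latest": ...`, and that text is what the reason string returned
// here carries into the task's system log.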

// Reconcile loops through tasks and checks the status from Funnel's database
// against the status reported by Kubernetes. This allows the backend to report
// system errors that prevented the worker process from running.
@@ -402,6 +484,7 @@ func (b *Backend) isJobSchedulingTimedOut(ctx context.Context, jobName string, t
//
// This loop is also used to cleanup successful jobs.
func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableCleanup bool) {
fmt.Println("DEBUG: Starting Kubernetes backend reconciler loop with rate", rate)
// Clears all resources that still exist from jobs that have run before this server started.
// This handles two cases:
// 1. Completed jobs (Succeeded/Failed) that were not cleaned up before the server restarted.
@@ -453,6 +536,7 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea
case <-ctx.Done():
return
case <-ticker.C:
fmt.Println("DEBUG: Running Kubernetes backend reconciler loop!")

// List worker jobs only (label selector excludes executor jobs and unrelated jobs).
// Bug: If the K8s Job has not been created by the time the reconciler runs, the TES Task will be prematurely marked as SYSTEM_ERROR.
@@ -474,6 +558,7 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea
// List non-terminal tasks from Funnel's database
states := []tes.State{tes.State_QUEUED, tes.State_INITIALIZING, tes.State_RUNNING}
for _, s := range states {
fmt.Println("DEBUG: Reconciling tasks with state", s)
pageToken := ""
for {
lresp, err := b.database.ListTasks(ctx, &tes.ListTasksRequest{
@@ -489,6 +574,7 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea

// Compare Funnel Tasks against K8s Jobs
for _, task := range lresp.Tasks {
fmt.Println("DEBUG: Reconciling task", task.Id, "with state", task.State)
taskID := task.Id

// If the job exists, check its current status (Active, Succeeded, Failed)
@@ -504,8 +590,49 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea

jobName := j.Name
status := j.Status
fmt.Println("DEBUG: Job status for task", taskID, "is Active:", status.Active, "Succeeded:", status.Succeeded, "Failed:", status.Failed)
switch {
case status.Active > 0:
// Check for container waiting errors that will never self-resolve
// (e.g. CreateContainerConfigError). These keep the Job Active
// indefinitely, so we must detect and fail them explicitly.
if terminal, reason := b.hasTerminalContainerWaitingError(ctx, jobName); terminal {
b.log.Debug("reconcile: worker pod has terminal container waiting error", "taskID", jobName, "reason", reason)
b.event.WriteEvent(ctx, events.NewState(jobName, tes.SystemError))
b.event.WriteEvent(ctx, events.NewSystemLog(
jobName, 0, 0, "error",
"Kubernetes worker pod has a terminal container waiting error",
map[string]string{"error": reason},
))
if !disableCleanup {
if err := b.cleanResources(ctx, jobName); err != nil {
b.log.Error("failed to clean resources", "taskID", jobName, "error", err)
}
}
continue
}

// Check for FailedCreate events on the Job itself. This catches
// cases where pod creation is rejected before a pod object is
// ever persisted (e.g. Pod Security Admission enforcement blocks
// the pod), so there are no pod container statuses to inspect.
b.log.Debug("checking for FailedCreate events on job", "taskID", jobName)
if failed, reason := b.hasJobFailedCreateEvent(ctx, jobName); failed {
b.log.Debug("reconcile: worker job has FailedCreate event", "taskID", jobName, "reason", reason)
b.event.WriteEvent(ctx, events.NewState(jobName, tes.SystemError))
b.event.WriteEvent(ctx, events.NewSystemLog(
jobName, 0, 0, "error",
"Kubernetes worker job failed to create pod",
map[string]string{"error": reason},
))
if !disableCleanup {
if err := b.cleanResources(ctx, jobName); err != nil {
b.log.Error("failed to clean resources", "taskID", jobName, "error", err)
}
}
continue
}

// If a scheduling timeout is configured, check whether the worker
// pod has been stuck in Pending beyond that duration. This catches
// scheduling failures (bad NodeSelector, insufficient resources, etc.)
@@ -574,6 +701,28 @@ func (b *Backend) reconcile(ctx context.Context, rate time.Duration, disableClea
continue
}
delete(failedJobEvents, jobName)

default:
// All status counters are zero: the Job controller has not yet
// recorded any Active/Succeeded/Failed pods. This happens when
// every pod creation attempt is rejected before Kubernetes
// persists a pod object (e.g. Pod Security Admission blocks the
// pod). Check for FailedCreate events, which are the only signal
// available in this state.
if failed, reason := b.hasJobFailedCreateEvent(ctx, jobName); failed {
b.log.Debug("reconcile: worker job has FailedCreate event (zero-status)", "taskID", jobName, "reason", reason)
b.event.WriteEvent(ctx, events.NewState(jobName, tes.SystemError))
b.event.WriteEvent(ctx, events.NewSystemLog(
jobName, 0, 0, "error",
"Kubernetes worker job failed to create pod",
map[string]string{"error": reason},
))
if !disableCleanup {
if err := b.cleanResources(ctx, jobName); err != nil {
b.log.Error("failed to clean resources", "taskID", jobName, "error", err)
}
}
}
}
}
