Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion slice/internal/controller/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,12 @@ func (r *WorkloadReconciler) updateJobSetBeforeUnsuspend(ctx context.Context, wl
return err
}
patchJobSet := core.BaseSSAJobSet(jobSet)

for i := range jobSet.Spec.ReplicatedJobs {
rj := &jobSet.Spec.ReplicatedJobs[i]
if rj.Template.Spec.Template.Spec.NodeSelector[core.TPUTopologyAnnotation] != "" {
// the NodeSelector with topology annotation has already been copied, nothing to do
return nil
}
replicaJob := core.BaseSSAReplicatedJob(rj.Name)
if topology := rj.Template.Spec.Template.Annotations[core.TPUSliceTopologyAnnotation]; topology != "" {
log.V(5).Info("Copying topology annotation as nodeSelector", "topology", topology, "replicatedJobName", rj.Name)
Expand Down
117 changes: 117 additions & 0 deletions slice/test/e2e/jobset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,5 +1299,122 @@ var _ = ginkgo.Describe("JobSet", func() {
utils.ExpectObjectToBeDeleted(ctx, k8sClient, createdWorkload, false)
})
})

// Verifies the recovery path when the Slice of a running Workload is deleted:
// a new Slice (different UID) must appear under the same key, the admission
// check must report Pending while that Slice is in the CREATED state, and it
// must return to Ready once the new Slice is marked ready. The spec finishes by
// deleting the JobSet and checking that the Slice and the Workload go away.
ginkgo.It("should recreate Slice if it is deleted while Workload is running", func() {
	jobSet := testingjobsjobset.MakeJobSet("jobset", ns.Name).
		Queue(lq.Name).
		ReplicatedJobs(
			testingjobsjobset.ReplicatedJobRequirements{
				Name:        "rj1",
				Image:       utils.E2eTestAgnHostImage,
				Args:        utils.BehaviorWaitForDeletion,
				Replicas:    1,
				Parallelism: 16,
				Completions: 16,
				PodAnnotations: map[string]string{
					core.TPUSliceTopologyAnnotation: "4x4x4",
				},
				NodeSelector: map[string]string{
					"cloud.google.com/gke-tpu-accelerator": string(slice.TypeTpu7x),
				},
			},
		).
		RequestAndLimit("rj1", core.TPUResourceName, "4").
		Obj()

	ginkgo.By("Creating a JobSet", func() {
		utils.MustCreate(ctx, k8sClient, jobSet)
	})

	createdWorkload := &kueue.Workload{}
	wlKey := types.NamespacedName{
		Name:      jobsetcontroller.GetWorkloadNameForJobSet(jobSet.Name, jobSet.UID),
		Namespace: ns.Name,
	}

	ginkgo.By("Waiting for Admission of the Workload", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(k8sClient.Get(ctx, wlKey, createdWorkload)).Should(gomega.Succeed())
			g.Expect(createdWorkload.Status.Admission).ShouldNot(gomega.BeNil())
		}, utils.Timeout, utils.Interval).Should(gomega.Succeed())
	})

	createdSlice := &slice.Slice{}
	sliceKey := core.SliceKeyFromWorkload(createdWorkload, "rj1", 0)
	// The recreated Slice is looked up with the same key, so the UID of the
	// first one is remembered to tell the two objects apart.
	var oldSliceUID types.UID

	ginkgo.By("Checking that Slice is created", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(k8sClient.Get(ctx, sliceKey, createdSlice)).To(gomega.Succeed())
			oldSliceUID = createdSlice.GetUID()
		}, utils.Timeout, utils.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("Adding Ready condition", func() {
		utils.SetSliceReady(ctx, k8sClient, sliceKey, "4x4x4")
	})

	ginkgo.By("Checking that the Workload is admitted", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(k8sClient.Get(ctx, wlKey, createdWorkload)).Should(gomega.Succeed())
			g.Expect(workload.IsAdmitted(createdWorkload)).Should(gomega.BeTrue())
			g.Expect(createdWorkload.Status.AdmissionChecks).Should(gomega.BeComparableTo([]kueue.AdmissionCheckState{{
				Name:    kueue.AdmissionCheckReference(ac.Name),
				State:   kueue.CheckStateReady,
				Message: `Slices are in states: 1 ACTIVE`,
			}}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime", "PodSetUpdates")))
			// Only the overall deadline is extended for this step; keep polling at
			// the regular interval so a missed first attempt is retried promptly.
		}, utils.LongTimeout, utils.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("Deleting the Slice", func() {
		gomega.Expect(k8sClient.Delete(ctx, createdSlice)).To(gomega.Succeed())
	})

	ginkgo.By("Checking that a new Slice is created", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(k8sClient.Get(ctx, sliceKey, createdSlice)).To(gomega.Succeed())
			// A different UID proves this is a new object, not the one just deleted.
			g.Expect(createdSlice.GetUID()).ShouldNot(gomega.Equal(oldSliceUID))
		}, utils.Timeout, utils.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("Checking that the Admission Check state is pending", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(k8sClient.Get(ctx, wlKey, createdWorkload)).Should(gomega.Succeed())
			g.Expect(createdWorkload.Status.AdmissionChecks).Should(gomega.BeComparableTo([]kueue.AdmissionCheckState{{
				Name:    kueue.AdmissionCheckReference(ac.Name),
				State:   kueue.CheckStatePending,
				Message: `Slices are in states: 1 CREATED`,
			}}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime", "PodSetUpdates")))
		}, utils.Timeout, utils.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("Adding Ready condition to the new Slice", func() {
		utils.SetSliceReady(ctx, k8sClient, sliceKey, "4x4x4")
	})

	ginkgo.By("Checking that the Workload is admitted again", func() {
		gomega.Eventually(func(g gomega.Gomega) {
			g.Expect(k8sClient.Get(ctx, wlKey, createdWorkload)).Should(gomega.Succeed())
			g.Expect(workload.IsAdmitted(createdWorkload)).Should(gomega.BeTrue())
			g.Expect(createdWorkload.Status.AdmissionChecks).Should(gomega.BeComparableTo([]kueue.AdmissionCheckState{{
				Name:    kueue.AdmissionCheckReference(ac.Name),
				State:   kueue.CheckStateReady,
				Message: `Slices are in states: 1 ACTIVE`,
			}}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime", "PodSetUpdates")))
			// Same as the first admission check: long deadline, regular poll interval.
		}, utils.LongTimeout, utils.Interval).Should(gomega.Succeed())
	})

	ginkgo.By("Deleting JobSet", func() {
		utils.ExpectObjectToBeDeleted(ctx, k8sClient, jobSet, true)
	})

	ginkgo.By("Checking that Slice is deleted", func() {
		utils.ExpectObjectToBeDeleted(ctx, k8sClient, createdSlice, false)
	})

	ginkgo.By("Checking that Workload is deleted", func() {
		utils.ExpectObjectToBeDeleted(ctx, k8sClient, createdWorkload, false)
	})
})
})
})
Loading