
Conversation

@itaigilo (Contributor) commented Dec 22, 2025

Closes #9872.

Change Description

To support and monitor long-running async tasks, this adds a heartbeat goroutine that updates a task's updated_at while the task is still running.
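
A minimal sketch of the pattern, using illustrative names only (not the actual lakeFS identifiers): a ticker-driven goroutine touches the task record on a fixed interval until the work finishes and its context is cancelled.

package heartbeat

import (
	"context"
	"time"
)

// runWithHeartbeat is an illustrative sketch, not the PR's code: it starts a
// background goroutine that calls touch (e.g. "bump the task's updated_at in
// the KV store") every interval, runs the task, and stops the heartbeat once
// the task returns.
func runWithHeartbeat(ctx context.Context, interval time.Duration, touch, task func(context.Context) error) error {
	heartbeatCtx, cancel := context.WithCancel(ctx)
	defer cancel() // stop the heartbeat when the task is done

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-heartbeatCtx.Done():
				return
			case <-ticker.C:
				_ = touch(heartbeatCtx) // best-effort: a missed heartbeat is not fatal
			}
		}
	}()

	return task(ctx)
}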

Testing Details

Tested locally (by manually increasing the commit time and validating that the heartbeat indeed updates the updated_at and stops when the task stops).

Added unit tests.

In addition, once the feature's Esti tests are added, they will also validate that this doesn't cause a regression.

@itaigilo added the include-changelog and minor-change labels on Dec 22, 2025
@github-actions bot added the area/cataloger label on Dec 22, 2025
@itaigilo removed the minor-change label on Dec 22, 2025
@github-actions bot added the area/testing label on Dec 22, 2025
@itaigilo marked this pull request as ready for review on December 22, 2025 17:16
@itaigilo requested review from a team, Annaseli and nopcoder on December 22, 2025 17:16
@N-o-Z (Member) commented Dec 22, 2025

I don't think this is the way we should go with this. There's no need to reinvent the wheel.
We already have a solid logic with Import.

  1. Server should not control task timeout - only client.
  2. We should implement a cancel operation.
  3. Client should call cancel when it decides to abort the operation (whether due to timeout or otherwise)

@itaigilo (Contributor Author)

I don't think this is the way we should go with this. There's no need to reinvent the wheel. We already have a solid logic with Import.

  1. Server should not control task timeout - only client.
  2. We should implement a cancel operation.
  3. Client should call cancel when it decides to abort the operation (whether due to timeout or otherwise)

@N-o-Z the requirements (and pain points) are described in the Enterprise PR - according to these, I don't think there's a current requirement for the client to abort the operation, hence it should be controlled by the server.

Having said that, can you please point out the part of the import code that implements something similar?

(and maybe @nopcoder has something to contribute to this discussion.)

@N-o-Z (Member) commented Dec 22, 2025

We already had this discussion and the PRD needs to be updated.
You can look at the Import endpoints in swagger

@itaigilo (Contributor Author)

We already had this discussion and the PRD needs to be updated. You can look at the Import endpoints in swagger

@N-o-Z Who took part in the (apparently undocumented) discussion you've mentioned, and can shed light on the details?

@N-o-Z (Member) commented Dec 22, 2025

We already had this discussion and the PRD needs to be updated. You can look at the Import endpoints in swagger

@N-o-Z Who took part in the (apparently undocumented) discussion you've mentioned, and can shed light on the details?

@nopcoder, @Annaseli, and I took part in the undocumented discussion. You can discuss this with @nopcoder as the product owner.

@nopcoder (Contributor)

  • Server should not control task timeout - only client.
  • We should implement a cancel operation.
  • Client should call cancel when it decides to abort the operation (whether due to timeout or otherwise)

The heartbeat mechanism is in place because, at this point, we do not provide a way to report progress or cancellation as part of commit/merge.

The async client has a hard timeout limit configured, but if the server reports status updates and that lets us reduce the timeout the client is configured with, it is better than the current state.
I agree that once each async operation works like import, this mechanism can be removed.

@N-o-Z (Member) commented Dec 22, 2025

  • Server should not control task timeout - only client.
  • We should implement a cancel operation.
  • Client should call cancel when it decides to abort the operation (whether due to timeout or otherwise)

The heartbeat mechanism is in place because, at this point, we do not provide a way to report progress or cancellation as part of commit/merge.

The async client has a hard timeout limit configured, but if the server reports status updates and that lets us reduce the timeout the client is configured with, it is better than the current state. I agree that once each async operation works like import, this mechanism can be removed.

So why replace one temporary mechanism with another? Implementing the cancel mechanism is relatively quick and will take the same effort as introducing a new temporary solution

@nopcoder (Contributor)

  • Server should not control task timeout - only client.
  • We should implement a cancel operation.
  • Client should call cancel when it decides to abort the operation (whether due to timeout or otherwise)

The heartbeat mechanism is in place because, at this point, we do not provide a way to report progress or cancellation as part of commit/merge.
The async client has a hard timeout limit configured, but if the server reports status updates and that lets us reduce the timeout the client is configured with, it is better than the current state. I agree that once each async operation works like import, this mechanism can be removed.

So why replace one temporary mechanism with another? Implementing the cancel mechanism is relatively quick and will take the same effort as introducing a new temporary solution

I don't think that progress and cancellation in the context of commit and merge is a small task.

// start heartbeat: a background goroutine to update the task status in the kv store every 5 seconds, until the task is done
cancelCtx, cancel := context.WithCancel(ctx)
currTaskStatus := proto.Clone(taskStatus) // deep copy of the task status, to avoid race conditions
go func() {
Contributor

Does this goroutine run in parallel with the goroutines that c.workPool.Submit starts? If so, maybe instead of writing this as a separate goroutine we can send it as an additional task in the steps that c.workPool.Submit gets?

Contributor Author

It's here because it needs the same context (it uses cancelCtx), so as far as I can tell, it can't be moved from here.

Anyway, this seems like a known Go pattern.

Contributor

I didn’t mean to move it elsewhere, but rather to include it as one of the steps that c.workPool.Submit runs. I was thinking that instead of launching this work in a separate go func(), we could add it as an additional task in the steps list.

The idea would be to extract the logic currently inside that go func() into a helper function, and then add it to steps before calling c.workPool.Submit, which already iterates over all the steps.

If you think it won't be a good idea to do this, I think we should at least move the go func() { ... } into a helper function, because RunBackgroundTaskSteps has become quite large.
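
As a rough illustration of that extraction (the helper name and the updateStatus callback below are hypothetical, not the PR's actual code; it uses only "context" and "time" from the standard library), the body of the go func() could become a named helper that RunBackgroundTaskSteps launches with cancelCtx:

// heartbeatLoop is a hypothetical helper holding the body of the go func():
// it refreshes the task status every interval until ctx is cancelled.
func heartbeatLoop(ctx context.Context, interval time.Duration, updateStatus func(context.Context) error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return // RunBackgroundTaskSteps cancels this context when the task finishes
		case <-ticker.C:
			_ = updateStatus(ctx) // best-effort heartbeat update
		}
	}
}

The call site would then shrink to go heartbeatLoop(cancelCtx, TaskHeartbeatInterval, ...), keeping the shared cancelCtx while moving most of the logic out of the already-large function.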

_, err = GetTaskStatus(ctx, kvStore, repository, taskID, &status)
require.NoError(t, err)
require.True(t, status.Task.Done)
require.NotEmpty(t, status.Task.ErrorMsg)
Contributor

If we use the actual ErrorToStatusCodeAndMsg (api.ErrorToStatusAndMsg) in the catalog initialization, we can check that the status_code updates correctly here as well.

I think it would be good to do a similar test but with a longer task that eventually returns an error, to check how the heartbeat mechanism deals with it - that it stopped updating after that.

@itaigilo (Contributor Author) left a comment

Thanks @Annaseli for your review,
Comments were addressed / fixed,
PTAL again.

// RunBackgroundTaskSteps updates the provided task status after filling the 'Task' field, and updates it for each step provided.
// The task status is updated after each step, and the task is marked as completed once the last step finishes.
// An initial update is made if the task is done before running the steps.
func (c *Catalog) RunBackgroundTaskSteps(repository *graveler.RepositoryRecord, taskID string, steps []TaskStep, taskStatus protoreflect.ProtoMessage) error {
Contributor Author

This is true,
But I prefer to keep this PR contained.
I can create an Issue and handle this right afterwards.


@Annaseli (Contributor) left a comment

Thanks! I have non-blocking comments.


// Verify timestamp was updated during execution
require.True(t, completionTime.After(timestampDuringExecution) || completionTime.Equal(timestampDuringExecution),
"completion timestamp (%v) should be after or equal to timestamp during execution (%v)",
Contributor

If this one is the case: || completionTime.Equal(timestampDuringExecution), we didn't actually check that the timestamp was updated during execution, right? Because in this case completionTime just equals timestampDuringExecution, which equals the first Task.UpdatedAt, since TaskHeartbeatInterval is 5 seconds.
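
A sketch of a stricter check, assuming the test also captures the task's initial updated_at before the task starts (initialUpdatedAt below is an illustrative name) and the task runs longer than TaskHeartbeatInterval:

// Require that the heartbeat bumped the timestamp while the task was running,
// instead of allowing equality with the completion time.
require.True(t, timestampDuringExecution.After(initialUpdatedAt),
	"timestamp during execution (%v) should be strictly after the initial updated_at (%v)",
	timestampDuringExecution, initialUpdatedAt)
require.False(t, completionTime.Before(timestampDuringExecution),
	"completion timestamp (%v) should not be before the timestamp during execution (%v)",
	completionTime, timestampDuringExecution)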

@itaigilo merged commit fc134f6 into master on Jan 5, 2026
42 checks passed
@itaigilo deleted the feature/add-heartbeat-for-catalog-async-tasks branch on January 5, 2026 10:35