-
Notifications
You must be signed in to change notification settings - Fork 421
Add heartbeat to catalog background tasks #9868
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add heartbeat to catalog background tasks #9868
Conversation
|
I don't think this is the way we should go with this. There's no need to reinvent the wheel.
|
@N-o-Z the requirements (and pain points) are described in the Enterprise PR - according to these, I don't think there's a current requirement for client aborting the operation, hence it should be controlled by the server. Having said that, can you please point out the part of the import code that implements something similar? (and maybe @nopcoder has something to contribute to this discussion.) |
|
We already had this discussion and the PRD needs to be updated. |
@N-o-Z Who took part in the (apparently undocumented) discussion you've mentioned, and can shed light on the details? |
@nopcoder @Annaseli and myself took part of the undocumented discussion. You can discuss this with @nopcoder as the product owner |
The heart beat mechanism is in place because at this point we do not provide a way to report progress or cancellation as part of commit/merge. The async client have hard limit configured, but if we can reduce the timeout the client is configured because the server to report status update is it better than the current state. |
So why replace one temporary mechanism with another? Implementing the cancel mechanism is relatively quick and will take the same effort as introducing a new temporary solution |
I don't think that progress and cancellation in the context of change of commit and merge is a small task. |
| // start heartbeat: a background goroutine to update the task status in the kv store every 5 seconds, until the task is done | ||
| cancelCtx, cancel := context.WithCancel(ctx) | ||
| currTaskStatus := proto.Clone(taskStatus) // deep copy of the task status, to avoid race conditions | ||
| go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this go routine runs in parallel to the go routines that the c.workPool.Submit starts? if so, maybe instead of writing this as a separate go routine we can send it as an additional task to the steps that c.workPool.Submit gets?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's here because it needs the same context (it uses cancelCtx), so as far as I can tell, it can't be moved from here.
Anyway, this seems like a known go pattern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn’t mean to move it elsewhere, but rather to include it as one of the steps that c.workPool.Submit runs. I was thinking that instead of launching this work in a separate go func(), we could add it as an additional task in the steps list.
The idea would be to extract the logic currently inside that go func() into a helper function, and then add it to steps before calling c.workPool.Submit, which already iterates over all the steps.
If you think it won't be a good ideat to do this, I think at least we should move the go func() { ... } into a helper function, because RunBackgroundTaskSteps has become quite large.
| _, err = GetTaskStatus(ctx, kvStore, repository, taskID, &status) | ||
| require.NoError(t, err) | ||
| require.True(t, status.Task.Done) | ||
| require.NotEmpty(t, status.Task.ErrorMsg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we use the actual ErrorToStatusCodeAndMsg: api.ErrorToStatusAndMsg, in the catalog initialization, we can check that the status_code updates here correctly as well.
I think it will be good to do a similar test but with a longer task that eventually returns an error to check how the heartbeat mechanism deals with it - that it stoped updating after that.
itaigilo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Annaseli for your review,
Comments were addressed / fixed,
PTAL again.
pkg/catalog/catalog.go
Outdated
| // RunBackgroundTaskSteps update task status provided after filling the 'Task' field and update for each step provided. | ||
| // the task status is updated after each step, and the task is marked as completed if the step is the last one. | ||
| // initial update if the task is done before running the steps. | ||
| func (c *Catalog) RunBackgroundTaskSteps(repository *graveler.RepositoryRecord, taskID string, steps []TaskStep, taskStatus protoreflect.ProtoMessage) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is true,
But I prefer to keep this PR contained.
I can create an Issue and handle this right afterwards.
| // start heartbeat: a background goroutine to update the task status in the kv store every 5 seconds, until the task is done | ||
| cancelCtx, cancel := context.WithCancel(ctx) | ||
| currTaskStatus := proto.Clone(taskStatus) // deep copy of the task status, to avoid race conditions | ||
| go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's here because it needs the same context (it uses cancelCtx), so as far as I can tell, it can't be moved from here.
Anyway, this seems like a known go pattern.
Annaseli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Have non blocking comments.
|
|
||
| // Verify timestamp was updated during execution | ||
| require.True(t, completionTime.After(timestampDuringExecution) || completionTime.Equal(timestampDuringExecution), | ||
| "completion timestamp (%v) should be after or equal to timestamp during execution (%v)", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this one is the case: || completionTime.Equal(timestampDuringExecution), we didn't actually checked that timestamp was updated during execution right? because in this case the completionTime just equals to the timestampDuringExecution that equals to the first Task.UpdatedAt since TaskHeartbeatInterval is 5 seconds.
Closes #9872.
Change Description
In order to support long lasting async tasks and monitor them, adding a heartbeat goroutine that updates the
updated_atof a task while it's still running.Testing Details
Tested locally (by manually increasing the commit time and validating that the heartbeat indeed updates the
updated_atand stops when the task stops).Added unit tests.
In addition, once the feature's Esti tests will be added, it will also validate this don't cause regression.