feat: Improve sort memory resilience #19494
Conversation
```rust
let indices = lexsort_to_indices(&sort_columns, fetch)?;
let mut columns = take_arrays(batch.columns(), &indices, None)?;

// The columns may be larger than the unsorted columns in `batch` especially for variable length
```
This is not really needed, as `take` actually uses the offsets for variable-length columns.
At worst, the buffers themselves will be rounded up to a 64-byte multiple to optimize SIMD operations, but that is actually desirable and not something we want to mess with.
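The 64-byte rounding mentioned above can be sketched with plain bit arithmetic. This is a minimal illustration, assuming the alignment constant is 64 as in arrow-rs; it is not the library's actual allocation code.

```rust
/// Round a logical buffer length up to the next multiple of 64 bytes,
/// mirroring Arrow's buffer alignment (assumption: constant is 64).
fn round_up_to_64(len: usize) -> usize {
    (len + 63) & !63
}

fn main() {
    // A 1-byte buffer still occupies a full 64-byte allocation unit.
    assert_eq!(round_up_to_64(1), 64);
    // Exact multiples are unchanged.
    assert_eq!(round_up_to_64(128), 128);
    // Anything past a multiple rounds up to the next one.
    assert_eq!(round_up_to_64(65), 128);
    println!("ok");
}
```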
```rust
// Spilling to disk and reading back also ensures batch size is consistent
// rather than potentially having one significantly larger last batch.
self.spill()?; // TODO: use sort_batch_chunked instead
```
Will you address this in this PR?
I don't think so. I initially thought about this, but then I saw there's an immediate spill now rather than returning an in-memory stream, which feels like a deliberate decision, and I didn't want to add side effects in this PR.
```rust
/// This is calculated by adding the record batch's memory size
/// (which can be much larger than expected for sliced record batches)
/// with the sliced buffer sizes, as that is the amount that will be needed to create the new buffer.
/// The latter is rounded up to the nearest multiple of 64 based on the architecture,
/// as this is how arrow creates buffers.
```
```rust
/// Estimate how much memory is needed to sort a `RecordBatch`.
///
/// For sliced batches, `get_record_batch_memory_size` returns the size of the
/// underlying shared buffers (which may be larger than the logical data).
/// We add `get_sliced_size()` (the actual logical data size, rounded to 64 bytes)
/// because sorting will create new buffers containing only the referenced data.
///
/// Total = existing buffer size + new sorted buffer size
```
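The estimate described in that doc comment can be expressed as a small formula. The sketch below is illustrative only: the function names `round_up_to_64` and `estimated_sort_memory` are stand-ins, not the PR's actual identifiers.

```rust
/// Round a length up to the next 64-byte multiple, as Arrow buffer
/// allocation does (assumption: alignment constant is 64).
fn round_up_to_64(len: usize) -> usize {
    (len + 63) & !63
}

/// Total reservation = size of the existing shared buffers
/// + the 64-byte-rounded size of the logical (sliced) data
/// that sorting will copy into new buffers.
fn estimated_sort_memory(shared_buffer_size: usize, sliced_data_size: usize) -> usize {
    shared_buffer_size + round_up_to_64(sliced_data_size)
}

fn main() {
    // A batch sliced out of a 4096-byte shared buffer, logically 100 bytes:
    // the new sorted buffer needs 128 bytes (100 rounded up to 64-multiple).
    assert_eq!(estimated_sort_memory(4096, 100), 4096 + 128);
    println!("ok");
}
```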
Fine by me, updated.
Weijun-H left a comment:
Very impressive PR 👍 I left some suggestions for your reference.
Which issue does this PR close?
Closes #19493.
Rationale for this change
Greatly reduces the memory requested by ExternalSorter to perform sorts, adds much more granularity to the reservations, and tries to do this with minimal overhead by merging the splitting and sorting processes.
What changes are included in this PR?
The sort stream calculates the sort indices once, but the `take` is performed in chunks, producing `batch_size`-sized `RecordBatch`es whose `get_record_batch_memory_size` results are very close to their sliced sizes (if not exactly the same). This removes the need to reserve a huge amount of memory as a precaution before the merge sort, which means we can merge more streams at the same time, and so on.
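The compute-indices-once, take-in-chunks idea can be sketched with plain `Vec`s standing in for Arrow arrays. Here `take` is an assumption-level stand-in for Arrow's take kernel, and the index sort stands in for `lexsort_to_indices`; none of this is the PR's actual code.

```rust
/// Stand-in for Arrow's take kernel: gather values at the given indices.
fn take(values: &[i64], indices: &[usize]) -> Vec<i64> {
    indices.iter().map(|&i| values[i]).collect()
}

/// Sort by computing the permutation once, then materializing it in
/// `batch_size` chunks, so each output "batch" is close to its logical size.
fn sort_chunked(values: &[i64], batch_size: usize) -> Vec<Vec<i64>> {
    // Compute the sorted order once (the lexsort_to_indices step).
    let mut indices: Vec<usize> = (0..values.len()).collect();
    indices.sort_by_key(|&i| values[i]);
    // Materialize in batch_size chunks instead of one large take.
    indices
        .chunks(batch_size)
        .map(|chunk| take(values, chunk))
        .collect()
}

fn main() {
    let chunks = sort_chunked(&[5, 1, 4, 2, 3], 2);
    // Three output batches of at most 2 rows each, globally sorted.
    assert_eq!(chunks, vec![vec![1, 2], vec![3, 4], vec![5]]);
    println!("ok");
}
```

Each chunked take allocates buffers sized to its own output rather than to the whole batch, which is what keeps the per-batch reservations small.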
Are these changes tested?
Yes
Are there any user-facing changes?
There is a new `sort_batch_chunked` function, which returns a `Vec` of `RecordBatch` based on the provided `batch_size`.
Some docs are updated.