feat: Re-spill sort stream if unable to reserve for 2 streams#22945
feat: Re-spill sort stream if unable to reserve for 2 streams#22945EmilyMatt wants to merge 8 commits into
Conversation
|
Could you please add a test in memory constraint env fuzz testing we have? |
|
I'm not sure what the supposed API change is, as all the functions I've modified are private 🤔 and the "details" tab shows unrelated stuff |
2010YOUY01
left a comment
There was a problem hiding this comment.
It would be great to construct a end-to-end reproducer (ideally at SQL level) for the following reasons.
- I think this implementation makes sense to me, but it inevatiably introduced some complexity inside operator, if we can know the end-to-end goal from the specific workload, we can try to think about is there any easier alternative.
- The UTs are very low-level, if we decide to refactor the implementation someday, this coverage is very likely to get lost, an e2e test would be more robust to the rapidly moving codebase.
There was a problem hiding this comment.
Another optimization ideas (not in this PR):
scenerio
Let's say you have 8 files:
M,L,M,M,M,M,M,M
(M is file with medium max batch size, L is file with large max batch size)
in your implementation: you see that you can't merge the first 2 files since L is very large so you spilt L batch size by half and try again
so after spilt you have:
M,L_Split,M,M,M,M,M,M
and now you can merge the first 4 streams: M,L_Split,M,M
and continue as usual.
but what you did was changing batch size for everything (which is required so you don't go back to the same large batch in the worst case scenerio)
but this also harm the performance of the entire multi level merge since you now spill and merge in smaller batches.
My idea is that you can delay the split of L to last so all the sort and spill files before it will use the old batch size but only the last one will use the smaller batch size and this will increase performance.
There was a problem hiding this comment.
Yes you are very smart Raz ;)
| if number_of_spills_to_read_for_current_phase == 0 { | ||
| // We couldn't even reserve a single stream - one record batch | ||
| // is larger than the whole merge budget. That's the lone-batch | ||
| // case, not the 2-stream merge skew we rescue here - surface it. | ||
| return Err(err); | ||
| } |
There was a problem hiding this comment.
You can can possibly still split batch size into 2, the calculation for amount of memory for merge stream is more than what is needed for only splitting a batch in 2, so you can try to check if have enough memory for splitting in 2
There was a problem hiding this comment.
Worst case is still 2
There was a problem hiding this comment.
Or aspiring to 2 on a level where it doesn't matter(1 huge string with everything else nulls, for example)
| @@ -182,7 +191,17 @@ impl MultiLevelMergeBuilder { | |||
|
|
|||
| async fn create_stream(mut self) -> Result<SendableRecordBatchStream> { | |||
There was a problem hiding this comment.
Can you please try to revert the batch size back to the original value for the last stream if it is possible to stay in the limit?
so if the original very large batch was spread across many rows that are now in different batches or files we may still keep the batch size as one last try
There was a problem hiding this comment.
I prefer not to as this complicates the code, I don't really have a guarantee that the last stream is not a part of a merge with the results of the first streams, as there can be multiple levels, I don't wanna fuck with that
| /// next attempt can seat both streams. One stream's worth of memory is reserved | ||
| /// for the duration and freed afterwards. Makes the merge resilient to skew. | ||
| async fn split_spill_file_in_half(&mut self, index: usize) -> Result<()> { | ||
| let target = self.sorted_spill_files.remove(index); |
There was a problem hiding this comment.
to avoid shifting everything can we swap the index with the last index, then pop, create a new file, push and swap again.
There was a problem hiding this comment.
Applied against my better judgment, as this is both negligible compared to the work done here and makes the code more complex.
(and besides the point may even be optimized out by the compiler, but that is untested)
I believe I've added an e2e test, but let me know if it is not what you intended. can confirm it fails without this implementation |
| return resources_err!( | ||
| "Cannot merge sorted runs: a single record batch of {old_max} bytes \ | ||
| exceeds the available merge memory and cannot be split further" | ||
| ); |
There was a problem hiding this comment.
I think it would be useful (not for now, or even at all) if we could print the largest row size to help the user with understanding the issue
|
|
||
| return Err(err); | ||
| // buffer_len == 1 and we still can't seat the minimum of 2 streams. | ||
| if number_of_spills_to_read_for_current_phase == 0 { |
There was a problem hiding this comment.
this can happen if we have 1 sorted stream and the first spill file is very large, why can't we split it in those cases? (but propagate the error when minimum_number_of_required_streams is greater than 1)
There was a problem hiding this comment.
It would be a good expansion but a bigger refactor than I want to do right now, this feature was unobtrusive because the code paths are separated.
Perhaps a good solution would even be to spill the sorted streams and only then call this function, at a small perf cost in exchange for resilience.
| /// Proves the fix: two sorted runs whose largest batches are too big to both | ||
| /// be seated in the merge budget at once are re-spilled (halved) until they | ||
| /// fit, and the merge then completes with fully sorted, complete output. | ||
| /// Before the fix this returned `ResourcesExhausted` instead of merging. |
| /// which lowers the per-stream merge reservation so the | ||
| /// next attempt can seat both streams. One stream's worth of memory is reserved | ||
| /// for the duration and freed afterwards. Makes the merge resilient to skew. | ||
| async fn split_spill_file_in_half(&mut self, index: usize) -> Result<()> { |
There was a problem hiding this comment.
Can you please add log::debug! when this happens
rluvaton
left a comment
There was a problem hiding this comment.
LGTM, great idea!
this will make sort even more resilient
Rationale for this change
I've encountered several cases where the merge reservation cannot acquire the minimum reservation needed(2 streams, with a buffer size of 1), we currently error in these cases but that's not a necessity.
I've implemented a simple re-spill mechanism for when the first 2 streams cannot be reserved: we take the larger of those streams, and we re-spill it with all its batches split in half(done using slice() so no copying happens at that stage and we'll have a smaller memory peak)
This converges until we have enough memory to perform the merge(because max_record_batch_size is halved, ideally)
I've encountered this in situations where there is heavy skew, so maybe in the future might be worth it running this in general whenever one stream has a max_record_batch_size that is far above the other streams, could greatly improve performance of the entire merge stream at the cost of re-spilling once.
What changes are included in this PR?
Aforementioned implementation
Are these changes tested?
Yes
Are there any user-facing changes?
No