[dbsp] Adaptive joins #5287
base: main
Conversation
mihaibudiu
left a comment
I think I get a general picture.
So the repartitioning is blocking for the current step?
  /// Read all incoming messages for `receiver`.
  ///
- /// Values are passed to callback function `cb`.
+ /// Values are passed to callback function `cb` in the order of worker indexes.
is there a debug_assert for that?
  //! # use csv::Reader;
  //! # use dbsp::utils::{Tup2, Tup3};
- //! # use dbsp::{OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight};
+ //! # use dbsp::{OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight, IndexedZSetReader};
isn't it strange that this is not used anywhere?
      Ok(Ok(resp)) => resp,
  };

  // Receive responses.
This comment is not very informative
      }
  }

  /// A sink operator that consumes two input streams, but does not produce
three input streams?
  /// The storage directory supplied does not match the runtime circuit.
  IncompatibleStorage,
  /// Error deserializing checkpointed state.
  CheckpointParseError(String),
how come this wasn't needed before?
  }

  /// Total weight of records across all shards.
  pub fn total_records(&self) -> ZWeight {
This could be maintained incrementally.
  /// The policy that was used to accumulate the integral before the rebalancing.
  /// This is the policy used during the previous transaction.
  trace_policy: PartitioningPolicy,
Suggestion: rename to `previous_trace_policy`.
  /// Assumes that the cursor was partitioned using Policy::Shard or Policy::Balance, i.e., partitions
  /// in different workers don't overlap.
  ///
  /// Returns after exhausting the cursor or when one of the builders reaches chunk_size entries,
What happens if the builder becomes full?
Who continues the work and when?
where is the check for chunk_size?
  /// Partition cursor based on the current policy.
  ///
  /// Assumes that the cursor was partitioned using Policy::Broadcast, i.e., all partitions contain identical data.
  /// The implementation takes advantage of this by having each worker only send data that belongs to the same worker
If they already have the data, why is sending even needed? Don't they just have to delete some of the data they already have?
Are you doing this because it's cheaper to retract everything and send what is needed because the data structures are immutable? Still, you don't need to "send" anything, the data is already there, every worker should be able to compute what to keep.
  // Retract the contents of the integral by sending it with negated weights to the accumulator
  // in the same worker.
  while integral_cursor.key_valid() {
Is this the loop which continues sending data after builders become full?
so this does not clear the integral, it just sends the negation to the consumers?
It seems that the job of the SQL compiler will be minimal: just create the right type of join (adaptive or non-adaptive), where adaptive joins are allowed only in the top-level circuit.

Sorry, pressed the wrong button.
Pull request overview
This PR implements adaptive joins in DBSP, introducing a balancing system that dynamically selects optimal partitioning policies for join operations based on runtime characteristics like data skew and size. The implementation includes a MaxSAT solver for policy selection, metadata exchange between workers, and comprehensive test coverage.
Key Changes:
- Introduces a `Balancer` component that manages partitioning policies for joins across the circuit
- Adds an `accumulate_trace_balanced` operation that combines exchange, accumulation, and integration with dynamic rebalancing
- Implements a MaxSAT solver for selecting optimal balancing policies under hard constraints
Reviewed changes
Copilot reviewed 66 out of 69 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| crates/dbsp/src/utils/graph.rs | New graph utilities for computing connected components |
| crates/dbsp/src/operator/dynamic/balance/*.rs | Core balancing infrastructure including MaxSAT solver and balancer logic |
| crates/dbsp/src/operator/dynamic/join.rs | Added balanced join variants |
| crates/dbsp/src/operator/dynamic/outer_join.rs | Added balanced left join variants |
| crates/dbsp/src/circuit/circuit_builder.rs | Circuit API extensions for balancer integration |
| crates/dbsp/src/circuit/runtime.rs | Added metadata broadcast mechanism |
| crates/dbsp/src/typed_batch.rs | Moved iter() method to trait for broader accessibility |
Comments suppressed due to low confidence (3)

crates/dbsp/src/operator/dynamic/balance/test.rs:1
- The first argument should be `false` to match the function name 'no_checkpoints'. Currently passing `true` for the checkpoints parameter contradicts the test name.

crates/dbsp/src/operator/dynamic/balance/test.rs:1
- The first argument should be `false` to match the function name 'no_checkpoints'. Currently passing `true` for the checkpoints parameter contradicts the test name.

crates/dbsp/src/operator/dynamic/balance/test.rs:1
- The second argument should be `true` to match the function calling a left join test, and the test expects checkpoints. The arguments appear to be incorrect.
      }
  }

  // TODO: use a better type thank serde_json::Value.
Copilot AI (Dec 17, 2025)
Corrected spelling of 'thank' to 'than'
- // TODO: use a better type thank serde_json::Value.
+ // TODO: use a better type than serde_json::Value.
Force-pushed bbfa8ae to 653e816.
This fixes the following deadlock:
- Output connector initialization fails.
- Before exiting, the controller drops DBSPHandle, which waits for all worker threads and aux threads to exit. Aux threads are used with connectors that have output buffers.
- However, since nothing wakes up the aux thread, it remains parked.

The fix is for DBSPHandle::kill to unpark the aux thread. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
We had two functions that initialized logging for testing purposes in the tree. We unify them under `utils`, so we can use logging in new tests as well. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Fixes #4632. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
…t metric Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
Fix tooltip styles to not take up full height regardless of content Made table headers sticky Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
Force-pushed 653e816 to 23c4ad6.
Fixes #4632.
Fixes #5333
Details coming up