
Conversation


@ryzhyk ryzhyk commented Dec 15, 2025

Fixes #4632.
Fixes #5333.

Details coming up


@mihaibudiu mihaibudiu left a comment

I think I get a general picture.
So the repartitioning is blocking for the current step?

/// Read all incoming messages for `receiver`.
///
/// Values are passed to callback function `cb`.
/// Values are passed to callback function `cb` in the order of worker indexes.

Is there a `debug_assert` for that?
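
For illustration, a minimal sketch of such a check, assuming a simplified receiver loop; `recv_all` and its signature are hypothetical, not the actual DBSP API:

```rust
// Hypothetical sketch: assert that values reach the callback in worker-index order.
fn recv_all<M>(incoming: impl Iterator<Item = (usize, M)>, mut cb: impl FnMut(usize, M)) {
    let mut last_worker: Option<usize> = None;
    for (worker, msg) in incoming {
        // Worker indexes must be non-decreasing, as the doc comment promises.
        debug_assert!(last_worker.map_or(true, |w| w <= worker));
        last_worker = Some(worker);
        cb(worker, msg);
    }
}
```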

//! # use csv::Reader;
//! # use dbsp::utils::{Tup2, Tup3};
//! # use dbsp::{OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight};
//! # use dbsp::{OrdIndexedZSet, OutputHandle, RootCircuit, ZSetHandle, ZWeight, IndexedZSetReader};

Isn't it strange that this is not used anywhere?

Ok(Ok(resp)) => resp,
};

// Receive responses.

This comment is not very informative

}
}

/// A sink operator that consumes two input streams, but does not produce

three input streams?

/// The storage directory supplied does not match the runtime circuit.
IncompatibleStorage,
/// Error deserializing checkpointed state.
CheckpointParseError(String),

How come this wasn't needed before?

}

/// Total weight of records across all shards.
pub fn total_records(&self) -> ZWeight {

This could be maintained incrementally.
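
A minimal sketch of the suggestion, with `i64` standing in for `ZWeight`; the struct and field names are illustrative, not the PR's code:

```rust
// Illustrative only: keep a running total instead of summing shards on demand.
struct ShardStats {
    records_per_shard: Vec<i64>,
    total: i64, // maintained incrementally on every update
}

impl ShardStats {
    fn add(&mut self, shard: usize, weight: i64) {
        self.records_per_shard[shard] += weight;
        self.total += weight; // O(1) update instead of an O(#shards) sum
    }

    /// Total weight of records across all shards, now a constant-time read.
    fn total_records(&self) -> i64 {
        self.total
    }
}
```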


/// The policy that was used to accumulate the integral before the rebalancing.
/// This is the policy used during the previous transaction.
trace_policy: PartitioningPolicy,

`previous_trace_policy`

/// Assumes that the cursor was partitioned using Policy::Shard or Policy::Balance, i.e., partitions
/// in different workers don't overlap.
///
/// Returns after exhausting the cursor or when one of the builders reaches chunk_size entries,

What happens if the builder becomes full?
Who continues the work and when?
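
One plausible reading of the contract in the doc comment, sketched under assumed names (`drain_chunk`, the iterator-based cursor, and the `Vec` builders are all illustrative): the function returns early once a builder holds `chunk_size` entries, and the caller flushes the chunks and resumes from the same cursor on the next call.

```rust
// Illustrative chunked drain: returns `true` when the cursor is exhausted,
// `false` when it stopped early because some builder reached `chunk_size`.
fn drain_chunk<K>(
    cursor: &mut impl Iterator<Item = (usize, K)>, // (destination worker, key)
    builders: &mut [Vec<K>],
    chunk_size: usize,
) -> bool {
    for (worker, key) in cursor.by_ref() {
        builders[worker].push(key);
        if builders[worker].len() >= chunk_size {
            return false; // caller ships the chunk, then calls drain_chunk again
        }
    }
    true
}
```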


Where is the check for `chunk_size`?

/// Partition cursor based on the current policy.
///
/// Assumes that the cursor was partitioned using Policy::Broadcast, i.e., all partitions contain identical data.
/// The implementation takes advantage of this by having each worker only send data that belongs to the same worker

If they already have the data, why is sending even needed? Don't they just have to delete some of the data they already have?


Are you doing this because it's cheaper to retract everything and send what is needed, since the data structures are immutable? Still, you don't need to "send" anything; the data is already there, and every worker should be able to compute what to keep.
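
A minimal sketch of this suggestion, assuming weighted `(key, weight)` pairs and a hypothetical `shard_of` hash: under Policy::Broadcast every worker already holds a full copy, so each one can locally keep its own shard and emit retractions for the rest, without exchanging any data.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: split a broadcast copy into the shard this worker keeps
// and negated-weight retractions for everything else; no exchange needed.
fn rebalance_broadcast<K: Hash>(
    local_copy: Vec<(K, i64)>, // identical in every worker under Broadcast
    this_worker: usize,
    num_workers: usize,
) -> (Vec<(K, i64)>, Vec<(K, i64)>) {
    let (mut keep, mut retract) = (Vec::new(), Vec::new());
    for (key, weight) in local_copy {
        if shard_of(&key, num_workers) == this_worker {
            keep.push((key, weight));
        } else {
            retract.push((key, -weight)); // cancels the local copy of other shards
        }
    }
    (keep, retract)
}

// Hypothetical shard assignment; the real sharding function may differ.
fn shard_of<K: Hash>(key: &K, num_workers: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % num_workers
}
```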


// Retract the contents of the integral by sending it with negated weights to the accumulator
// in the same worker.
while integral_cursor.key_valid() {

Is this the loop which continues sending data after builders become full?


So this does not clear the integral; it just sends the negation to the consumers?
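
For concreteness, a small sketch of the retract-by-negation pattern the code comment describes, with a `HashMap` standing in for the integral and the accumulator; the real operator works over batches, so this is illustrative only:

```rust
use std::collections::HashMap;

// Emit every record of the integral with negated weight. Feeding this delta to
// the accumulator cancels the old contents; the integral itself is not mutated.
fn retract_all<K: Clone + Eq + std::hash::Hash>(integral: &HashMap<K, i64>) -> Vec<(K, i64)> {
    integral.iter().map(|(k, w)| (k.clone(), -w)).collect()
}

fn apply<K: Eq + std::hash::Hash>(acc: &mut HashMap<K, i64>, delta: Vec<(K, i64)>) {
    for (k, w) in delta {
        let new_w = acc.get(&k).copied().unwrap_or(0) + w;
        if new_w == 0 {
            acc.remove(&k); // weights cancel: the record disappears
        } else {
            acc.insert(k, new_w);
        }
    }
}
```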

@mihaibudiu

It seems that the job of the SQL compiler will be minimal: just create the right type of join (adaptive or non-adaptive), where adaptive joins are allowed only in the top-level circuit.

@mihaibudiu mihaibudiu marked this pull request as ready for review December 17, 2025 00:48
Copilot AI review requested due to automatic review settings December 17, 2025 00:48
@mihaibudiu mihaibudiu marked this pull request as draft December 17, 2025 00:48
@mihaibudiu

Sorry, pressed the wrong button.


Copilot AI left a comment


Pull request overview

This PR implements adaptive joins in DBSP, introducing a balancing system that dynamically selects optimal partitioning policies for join operations based on runtime characteristics like data skew and size. The implementation includes a MaxSAT solver for policy selection, metadata exchange between workers, and comprehensive test coverage.

Key Changes:

  • Introduces a Balancer component that manages partitioning policies for joins across the circuit
  • Adds accumulate_trace_balanced operation that combines exchange, accumulation, and integration with dynamic rebalancing
  • Implements MaxSAT solver for selecting optimal balancing policies under hard constraints
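
To make the moving parts concrete, an illustrative skeleton of a partitioning policy and a selection heuristic; the variant names follow the policies mentioned in this PR (Shard, Broadcast, Balance), but the stats struct and thresholds are assumptions, and the real Balancer drives the choice through the MaxSAT solver rather than an if-chain:

```rust
// Illustrative only; not the PR's actual Balancer code.
enum PartitioningPolicy {
    Shard,     // hash-partition by key; worker partitions don't overlap
    Broadcast, // every worker holds a full copy of the data
    Balance,   // non-overlapping partitions, rebalanced to even out skew
}

struct SideStats {
    total_records: i64,     // total weight across all workers
    max_shard_records: i64, // heaviest worker's share
    num_workers: i64,
}

fn pick_policy(stats: &SideStats, broadcast_threshold: i64) -> PartitioningPolicy {
    if stats.total_records <= broadcast_threshold {
        // Small join side: cheaper to replicate it everywhere.
        PartitioningPolicy::Broadcast
    } else if stats.max_shard_records * stats.num_workers > 2 * stats.total_records {
        // Skewed: one worker holds far more than its fair share.
        PartitioningPolicy::Balance
    } else {
        PartitioningPolicy::Shard
    }
}
```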

Reviewed changes

Copilot reviewed 66 out of 69 changed files in this pull request and generated 1 comment.

| File | Description |
| --- | --- |
| crates/dbsp/src/utils/graph.rs | New graph utilities for computing connected components |
| crates/dbsp/src/operator/dynamic/balance/*.rs | Core balancing infrastructure including MaxSAT solver and balancer logic |
| crates/dbsp/src/operator/dynamic/join.rs | Added balanced join variants |
| crates/dbsp/src/operator/dynamic/outer_join.rs | Added balanced left join variants |
| crates/dbsp/src/circuit/circuit_builder.rs | Circuit API extensions for balancer integration |
| crates/dbsp/src/circuit/runtime.rs | Added metadata broadcast mechanism |
| crates/dbsp/src/typed_batch.rs | Moved iter() method to trait for broader accessibility |
Comments suppressed due to low confidence (3)

crates/dbsp/src/operator/dynamic/balance/test.rs:1
  • The first argument should be false to match the function name 'no_checkpoints'. Currently passing true for checkpoints parameter contradicts the test name.

crates/dbsp/src/operator/dynamic/balance/test.rs:1
  • The first argument should be false to match the function name 'no_checkpoints'. Currently passing true for checkpoints parameter contradicts the test name.

crates/dbsp/src/operator/dynamic/balance/test.rs:1
  • The second argument should be true to match the function calling a left join test, and the test expects checkpoints. The arguments appear to be incorrect.

}
}

// TODO: use a better type thank serde_json::Value.

Copilot AI Dec 17, 2025


Corrected spelling of 'thank' to 'than'

Suggested change
// TODO: use a better type thank serde_json::Value.
// TODO: use a better type than serde_json::Value.

ryzhyk and others added 8 commits December 24, 2025 15:47
This fixes the following deadlock:

- Output connector initialization fails.
- Before exiting, the controller drops the DBSPHandle, which waits for
  all worker threads and aux threads to exit. Aux threads are used with
  connectors that have output buffers.
- However, since nothing wakes up the aux thread, it remains parked.

The fix is for DBSPHandle::kill to unpark the aux thread.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
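
A minimal, self-contained sketch of the park/unpark pattern behind this deadlock, using only the standard library; the shutdown flag and thread structure are illustrative, not the controller's actual code:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let shutdown = Arc::new(AtomicBool::new(false));
    let flag = shutdown.clone();
    // Stand-in for the aux thread: parks until there is work or a shutdown.
    let aux = thread::spawn(move || {
        while !flag.load(Ordering::Acquire) {
            thread::park(); // stays parked until another thread unparks it
        }
    });
    // The fix: set the flag *and* unpark. Without the unpark, the aux thread
    // never re-checks the flag and the join() below blocks forever.
    shutdown.store(true, Ordering::Release);
    aux.thread().unpark();
    aux.join().unwrap();
}
```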
We had two functions that initialized logging for testing purposes in the tree.
We unify them under `utils`, so we can use logging in new tests as well.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Fixes #4632.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
…t metric

Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
Fix tooltip styles to not take up full height regardless of content

Made table headers sticky

Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
@Karakatiza666

  • There was an issue where the pinned "top nodes" tooltip would not update when choosing another metric. I fixed it.
  • Fixed an issue where the tooltip took up the entire height of the diagram regardless of content
  • Made table headers and category headers in the tooltip table sticky in web-console
  • Made sure the fixes to "top nodes" button work in profiler-app


Labels

DBSP core: Related to the core DBSP library


Development

Successfully merging this pull request may close these issues.

[profiler] The top-20 feature only works for the CPU% metric
[RFC] Adaptive joins
