rxRust is a zero-cost, type-safe Rust implementation of Reactive Extensions. It brings Functional Reactive Programming (FRP) to Rust, enabling a declarative coding style for handling asynchronous events, stream processing, and concurrency.
It leverages Rust's ownership system to ensure memory safety and efficient resource usage, making it ideal for build robust event-driven applications.
- Zero-Cost Abstractions: Heavily relies on generic specialization and monomorphization to compile down to efficient code, ensuring minimal overhead for your asynchronous streams.
- Pay-for-what-you-need: Choose the right tool for the job. Use
Local(Rc/RefCell) for single-threaded performance without locking overhead, orShared(Arc/Mutex) when thread synchronization is actually required. - Unified Logic, Adaptive Context: Write your stream logic once. The same operator chains adapt seamlessly to different environments based on the context they run in.
- Interoperability: Seamlessly integrates with Rust
Futures, streams, andasync/await. - Wasm Support: Works out of the box on WebAssembly.
Add this to your Cargo.toml:
[dependencies]
rxrust = "1.0.0-rc.0"rxRust v1.0 unifies its API and implementation logic, while the Context (environment) provides the most suitable resource types at compile-time. The API remains consistent regardless of whether you are in a single-threaded or multi-threaded environment.
use rxrust::prelude::*;
fn main() {
// π’ Local Context: No locks, high performance for single thread
Local::from_iter(0..10)
.filter(|v| v % 2 == 0)
.subscribe(|v| println!("Local Even: {}", v));
// π΅ Shared Context: Thread-safe, ready for concurrency
Shared::from_iter(0..10)
.map(|v| v * 2)
.subscribe(|v| println!("Shared Doubled: {}", v));
}The Context determines the execution strategy and memory management:
Local: No Locking. UsesRcandRefCell. Ideal for UI threads, WASM, or single-threaded event loops. The compiler ensures these types don't leak across threads.Shared: Thread Safe. UsesArcandMutex. Required when streams need to jump across threads or share state globally.
Control when and where work happens using schedulers. By default, rxRust schedulers rely on tokio. You can also disable the default scheduler feature to implement a custom scheduler adapted to your own runtime.
use rxrust::prelude::*;
#[tokio::main(flavor = "local")]
async fn main() {
// Emit a value after 100ms
Local::timer(Duration::from_millis(100))
.subscribe(|_| println!("Tick!"));
// Throttle a busy stream
Local::interval(Duration::from_millis(10))
.throttle_time(Duration::from_millis(100), ThrottleEdge::leading())
.subscribe(|v| println!("Throttled: {}", v));
}Note: Local context schedulers require running within a LocalSet or a compatible LocalRuntime.
Subjects allow you to multicast events to multiple subscribers. The Subject type automatically adapts its internal storage (Rc<RefCell<...>> vs Arc<Mutex<...>>) based on the context used to create it.
use rxrust::prelude::*;
// Created in a Local context, this Subject uses RefCell internally (no Mutex)
let mut subject = Local::subject();
subject.clone().subscribe(|v| println!("Observer A: {}", v));
subject.clone().subscribe(|v| println!("Observer B: {}", v));
subject.next(1);
subject.next(2);For a deeper dive into core concepts, advanced architecture, and cookbooks, check out our rxRust Online Guide & Documentation.
rxRust targets stable Rust by default.
For advanced use-cases involving lifetime-dependent mapped outputs (e.g. &'a mut T -> &'a U), map provides an experimental implementation behind the Cargo feature nightly.
- Enable:
rxrust = { version = "1.0.0-rc.0", features = ["nightly"] } - Build with:
cargo +nightly test --features nightly
Once the relevant language support (like GATs stabilization in specific patterns) is mature, we plan to expand this support.
We welcome contributions! rxRust is a community-driven project.
- Check out Missing Features to see what operators are needed.
- Look for "Good First Issue" tags on GitHub.
- Improve documentation or add examples.
MIT License
