ReactiveX (RxPY) Quick Reference
RxPY provides composable asynchronous data streams. This is a practical guide focused on common patterns in this codebase.
Quick Start: Using an Observable
Given a function that returns an Observable, here’s how to use it:
The .pipe() Pattern
Chain operators using .pipe():
Common Operators
Transform: map
Filter: filter
Limit emissions: take
Flatten nested observables: flat_map
Rate Limiting
sample(interval) - Emit latest value every N seconds
Takes the most recent value at each interval. Good for continuous streams where you want the freshest data.
throttle_first(interval) - Emit first, then block for N seconds
Emits the first value, then ignores subsequent values until the interval has elapsed. Good for suppressing bursts of user input, such as repeated button presses.
Difference Between sample and throttle_first
What is an Observable?
An Observable is like a list, but instead of holding all values at once, it produces values over time.

| | List | Iterator | Observable |
|---|---|---|---|
| Values | All exist now | Generated on demand | Arrive over time |
| Control | You pull (for x in) | You pull (next()) | Pushed to you |
| Size | Finite | Can be infinite | Can be infinite |
| Async | No | Yes (with asyncio) | Yes |
| Cancel | N/A | Stop calling next() | .dispose() |
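The key difference from an iterator: values are pushed to you when they arrive, rather than pulled by calling next().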
Observables are lazy. An Observable is just a description of work to be done - it sits there doing nothing until you call .subscribe(). That’s when it “wakes up” and starts producing values.
This means you can build complex pipelines, pass them around, and nothing happens until someone subscribes.
The three things an Observable can tell you:
- “Here’s a value” (on_next) - A new value arrived
- “Something went wrong” (on_error) - An error occurred, stream stops
- “I’m done” (on_completed) - No more values coming
Here’s the full subscribe signature with all three callbacks:
Disposables: Cancelling Subscriptions
When you subscribe, you get back a Disposable. This is your “cancel button”:
Disposing matters because:
- Observables can be infinite (sensor feeds, websockets, timers)
- Without disposing, you leak memory and keep processing values forever
- Disposing also cleans up any resources the Observable opened (connections, file handles, etc.)

When you no longer need a subscription, call disposable.dispose().
In dimos modules: every Module has a self._disposables (a CompositeDisposable) that automatically disposes everything when the module closes.
Creating Observables
There are two common callback patterns in APIs. Use the appropriate helper:

| Pattern | Example | Helper |
|---|---|---|
| Register/unregister with same callback | sensor.register(cb) / sensor.unregister(cb) | callback_to_observable |
| Subscribe returns unsub function | unsub = pubsub.subscribe(cb) | to_observable |
From register/unregister APIs
Use callback_to_observable when the API has separate register and unregister functions that take the same callback reference:
From subscribe-returns-unsub APIs
Use to_observable when the subscribe function returns an unsubscribe callable:
From scratch with rx.create
CompositeDisposable
As covered under Disposables, disposing a subscription when you are done with it prevents leaks. To manage several subscriptions together, collect them in a CompositeDisposable:
Reference
| Operator | Purpose | Example |
|---|---|---|
| map(fn) | Transform each value | ops.map(lambda x: x * 2) |
| filter(pred) | Keep values matching predicate | ops.filter(lambda x: x > 0) |
| take(n) | Take first n values | ops.take(10) |
| first() | Take first value only | ops.first() |
| sample(sec) | Emit latest every interval | ops.sample(0.5) |
| throttle_first(sec) | Emit first, block for interval | ops.throttle_first(0.5) |
| flat_map(fn) | Map + flatten nested observables | ops.flat_map(lambda x: rx.of(x, x)) |
| observe_on(sched) | Switch scheduler | ops.observe_on(pool_scheduler) |
| replay(n) | Cache last n values for late subscribers | ops.replay(buffer_size=1) |
| timeout(sec) | Error if no value within timeout | ops.timeout(5.0) |
