Advanced Stream Handling
Prerequisite: Read ReactiveX Fundamentals first for Observable basics.
Backpressure and Parallel Subscribers to Hardware
In robotics, we deal with hardware that produces data at its own pace - a camera outputs 30fps whether you’re ready or not. We can’t tell the camera to slow down. And we often have multiple consumers: one module wants every frame for recording, another runs slow ML inference and only needs the latest frame. The problem: A fast producer can overwhelm a slow consumer, causing memory buildup or dropped frames. We might have multiple subscribers to the same hardware that operate at different speeds.backpressure() wrapper handles this by:
- Sharing the source - Camera runs once, all subscribers share the stream
- Per-subscriber speed - Fast subscribers get every frame, slow ones get the latest when ready
- No blocking - Slow subscribers never block the source or each other
session=bp
How it works
LATEST strategy means: when the slow subscriber finishes processing, it gets whatever the most recent value is, skipping any values that arrived while it was busy.
Usage in modules
Most module streams offer backpressured observables.session=bp
Getting Values Synchronously
Sometimes you don’t want a stream, you just want to call a function and get the latest value. If you are doing this periodically as a part of a processing loop, it is very likely that your code will be much cleaner and safer using actual reactivex pipeline. So bias towards checking our reactivex quick guide and official docs (TODO we should actually make this example actually executable)skip
getter_hot() and getter_cold()
getter_hot() | getter_cold() | |
|---|---|---|
| Subscription | Stays active in background | Fresh subscription each call |
| Read speed | Instant (value already cached) | Slower (waits for value) |
| Resources | Keeps connection open | Opens/closes each call |
| Use when | Frequent reads, need latest | Occasional reads, save resources |
getter_cold() when you can afford to wait and warmup isn’t expensive. It’s simpler (no cleanup needed) and doesn’t hold resources. Only use getter_hot() when you need instant reads or the source is expensive to start.
getter_hot() - Background subscription, instant reads
Subscribes immediately and keeps updating in the background. Each call returns the cached latest value instantly.
session=sync
getter_cold() - Fresh subscription each call
Each call creates a new subscription, waits for one value, and cleans up. Slower but doesn’t hold resources:
session=sync
