
Advanced Stream Handling

Prerequisite: Read ReactiveX Fundamentals first for Observable basics.

Backpressure and Parallel Subscribers to Hardware

In robotics, we deal with hardware that produces data at its own pace: a camera outputs 30 fps whether you're ready or not, and we can't tell it to slow down. We also often have multiple consumers: one module wants every frame for recording, while another runs slow ML inference and only needs the latest frame.

The problem: a fast producer can overwhelm a slow consumer, causing memory buildup or dropped frames, and several subscribers to the same hardware may each run at a different speed.

The solution: the backpressure() wrapper handles this by:
  1. Sharing the source - the camera runs once and all subscribers share its stream
  2. Per-subscriber speed - fast subscribers get every frame; slow ones get the latest frame when they're ready
  3. No blocking - slow subscribers never block the source or each other
```python
import time
import reactivex as rx
from reactivex import operators as ops
from reactivex.scheduler import ThreadPoolScheduler
from dimos.utils.reactive import backpressure

# We need this scaffolding here. Normally DimOS handles this.
scheduler = ThreadPoolScheduler(max_workers=4)

# Simulate fast source
source = rx.interval(0.05).pipe(ops.take(20))
safe = backpressure(source, scheduler=scheduler)

fast_results = []
slow_results = []

safe.subscribe(lambda x: fast_results.append(x))

def slow_handler(x):
    time.sleep(0.15)
    slow_results.append(x)

safe.subscribe(slow_handler)

time.sleep(1.5)
print(f"fast got {len(fast_results)} items: {fast_results[:5]}...")
print(f"slow got {len(slow_results)} items (skipped {len(fast_results) - len(slow_results)})")
scheduler.executor.shutdown(wait=True)
```

```
fast got 20 items: [0, 1, 2, 3, 4]...
slow got 7 items (skipped 13)
```

How it works

The LATEST strategy means: when the slow subscriber finishes processing, it gets whatever the most recent value is, skipping any values that arrived while it was busy.
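To make the LATEST idea concrete, here is a minimal sketch in plain Python threading. It is not DimOS's implementation of `backpressure()`, and `LatestMailbox` is a hypothetical name. Each subscriber gets a one-slot buffer that the producer overwrites without waiting. A worker thread hands the slot's current value to the handler whenever the handler is free, so values that arrive while it is busy are simply replaced.

```python
import threading
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LatestMailbox(Generic[T]):
    """Hypothetical sketch of a LATEST buffer: one slot, newer values overwrite older ones."""

    def __init__(self, handler: Callable[[T], None]) -> None:
        self._handler = handler
        self._cond = threading.Condition()
        self._value: Optional[T] = None
        self._has_value = False
        self._closed = False
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def put(self, value: T) -> None:
        # Producer side: overwrite the slot and return immediately (never waits on the handler).
        with self._cond:
            self._value = value
            self._has_value = True
            self._cond.notify()

    def close(self) -> None:
        # Stop the worker after it has handled whatever value is still pending.
        with self._cond:
            self._closed = True
            self._cond.notify()
        self._worker.join()

    def _run(self) -> None:
        while True:
            with self._cond:
                while not self._has_value and not self._closed:
                    self._cond.wait()
                if not self._has_value:
                    return  # closed and nothing left to handle
                value = self._value
                self._has_value = False
            self._handler(value)  # called outside the lock, so put() is never blocked


fast: list = []
slow: list = []


def slow_handler(x: int) -> None:
    time.sleep(0.15)  # simulated slow ML inference
    slow.append(x)


fast_box = LatestMailbox(fast.append)
slow_box = LatestMailbox(slow_handler)
for i in range(20):  # 20 "frames" at 50 ms intervals
    fast_box.put(i)
    slow_box.put(i)
    time.sleep(0.05)
fast_box.close()
slow_box.close()

print(f"fast got {len(fast)} items, slow got {len(slow)}: {slow}")
```

The slow handler sees a strictly increasing subset of frames that always ends with the newest one, while the producer loop never waits on it.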

Usage in modules

Most module streams offer backpressured observables. An input stream such as In[Image] can be consumed in three ways:
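```python
from dimos.core.module import Module
from dimos.core.stream import In
from dimos.msgs.sensor_msgs import Image

class MLModel(Module):
    color_image: In[Image]

    def start(self):
        # The three subscription styles below are alternatives; a real module would use one.

        # no reactivex, simple callback
        self.color_image.subscribe(self.on_image)

        # backpressured
        self.color_image.observable().subscribe(self.on_image)

        # non-backpressured - will pile up queue
        self.color_image.pure_observable().subscribe(self.on_image)

    def on_image(self, image: Image) -> None:
        # hypothetical handler: run inference on the frame here
        ...
```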

Getting Values Synchronously

Sometimes you don't want a stream; you just want to call a function and get the latest value. If you're doing this periodically as part of a processing loop, your code will very likely be cleaner and safer as an actual reactivex pipeline, so check our reactivex quick guide and the official docs first. The illustrative (non-executable) snippet below shows a detector-driven control loop written as a single pipeline:
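```python
import reactivex as rx
from reactivex import operators as ops

# Inside a Module's start(); quality_barrier, detect_person and
# detection2d_to_twist are assumed to be defined elsewhere.
twists = self.color_image.observable().pipe(
    # takes the best image from a stream every 200ms,
    # ensuring we are feeding our detector with highest quality frames
    quality_barrier(lambda x: x["quality"], target_frequency=0.2),

    # converts Image into Person detections
    ops.map(detect_person),

    # converts Detection2D to Twist pointing in the direction of a detection
    ops.map(detection2d_to_twist),
)

# ops.sample(0.05) would only emit when a new Twist arrived since the last
# tick (about every 200ms here). To run the control loop at a steady 20hz,
# tick every 50ms and re-publish the latest Twist on each tick.
rx.interval(0.05).pipe(
    ops.with_latest_from(twists),
    ops.map(lambda pair: pair[1]),  # drop the tick index, keep the Twist
).subscribe(self.twist.publish)  # shoots off the Twist out of the module
```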
If you'd still like to fetch values synchronously, we provide two approaches: getter_hot() and getter_cold().
| | getter_hot() | getter_cold() |
|---|---|---|
| Subscription | Stays active in background | Fresh subscription each call |
| Read speed | Instant (value already cached) | Slower (waits for value) |
| Resources | Keeps connection open | Opens/closes each call |
| Use when | Frequent reads, need latest | Occasional reads, save resources |
Prefer getter_cold() when you can afford to wait and warmup isn't expensive. It's simpler (no cleanup needed) and doesn't hold resources. Only use getter_hot() when you need instant reads or the source is expensive to start.

getter_hot() - Background subscription, instant reads

Subscribes immediately and keeps updating in the background. Each call returns the cached latest value instantly.
```python
import time
import reactivex as rx
from reactivex import operators as ops
from dimos.utils.reactive import getter_hot

source = rx.interval(0.1).pipe(ops.take(10))

get_val = getter_hot(source, timeout=5.0)  # blocks until the first message, with a 5s timeout
# alternatively, don't block (get_val() may then return None):
# get_val = getter_hot(source, nonblocking=True)

print("first call:", get_val())  # instant - value already there
time.sleep(0.35)
print("after 350ms:", get_val())  # instant - returns cached latest
time.sleep(0.35)
print("after 700ms:", get_val())

get_val.dispose()  # Don't forget to clean up!
```

```
first call: 0
after 350ms: 3
after 700ms: 6
```

getter_cold() - Fresh subscription each call

Each call creates a new subscription, waits for one value, and cleans up. Slower but doesn’t hold resources:
```python
import reactivex as rx
from dimos.utils.reactive import getter_cold

source = rx.of(0, 1, 2, 3, 4)
get_val = getter_cold(source, timeout=5.0)

# Each call creates a fresh subscription and returns its first value
print("call 1:", get_val())  # subscribes, gets 0, disposes
print("call 2:", get_val())  # subscribes again, gets 0, disposes
print("call 3:", get_val())  # subscribes again, gets 0, disposes
```

```
call 1: 0
call 2: 0
call 3: 0
```