
Temporal Message Alignment

Robots have multiple sensors emitting data at different rates and latencies. A camera might run at 30 fps while a lidar scans at 10 Hz, and each has its own processing delay. For perception tasks such as projecting 2D detections into 3D pointclouds, we need to match messages from these streams by timestamp. align_timestamped does this by buffering messages and pairing those whose timestamps fall within a time tolerance.
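To make the matching rule concrete, here is a minimal offline sketch in plain Python. It is not the align_timestamped implementation, which works on live streams: for each primary timestamp it looks up the nearest secondary timestamp with bisect and keeps the pair only if the gap is within the tolerance. The function name match_nearest and the sample timestamps (a ~30 fps camera and a 10 Hz lidar) are hypothetical.

```python
from bisect import bisect_left


def match_nearest(primary_ts, secondary_ts, tolerance):
    """Pair each primary timestamp with the nearest secondary timestamp.

    Hypothetical helper for illustration. secondary_ts must be sorted.
    Returns (primary, secondary) pairs whose gap is <= tolerance (seconds).
    """
    pairs = []
    for t in primary_ts:
        i = bisect_left(secondary_ts, t)
        # The nearest secondary is either just before t or at/after t.
        candidates = secondary_ts[max(i - 1, 0):i + 1]
        if not candidates:
            continue
        nearest = min(candidates, key=lambda s: abs(s - t))
        if abs(nearest - t) <= tolerance:
            pairs.append((t, nearest))
    return pairs


# Camera at ~30 fps, lidar at 10 Hz (timestamps in seconds).
camera = [0.000, 0.033, 0.067, 0.100, 0.133, 0.167, 0.200]
lidar = [0.005, 0.105, 0.205]
print(match_nearest(camera, lidar, tolerance=0.025))
# → [(0.0, 0.005), (0.1, 0.105), (0.2, 0.205)]
```

Frames that fall between scans find no partner and are dropped, so the number of pairs depends on how often the two streams' timestamps line up, not only on the frame rate.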

Basic Usage

Below, we replay real camera and lidar data recorded on a Unitree Go2 robot. The replay setup is not shown here, but you can check it if you're interested. On a real robot, these streams would arrive in your module through In inputs; detection/module3D.py is a good example. Here we assume the streams already exist and align them.
```python
# Align video (primary) with lidar (secondary)
# match_tolerance: max time difference for a match (seconds)
# buffer_size: how long to keep messages waiting for matches (seconds)
aligned_pairs = align_timestamped(
    video_stream,
    lidar_stream,
    match_tolerance=0.025,  # 25ms tolerance
    buffer_size=5.0,  # how long to wait for a match
).pipe(ops.to_list()).run()

print(f"Video: {len(video_frames)} frames, Lidar: {len(lidar_scans)} scans")
print(f"Aligned pairs: {len(aligned_pairs)} out of {len(video_frames)} video frames")

# Show a matched pair
if aligned_pairs:
    img, pc = aligned_pairs[0]
    dt = abs(img.ts - pc.ts)
    print(f"\nFirst matched pair: Δ{dt*1000:.1f}ms")
```
```
Video: 29 frames, Lidar: 15 scans
Aligned pairs: 11 out of 29 video frames

First matched pair: Δ11.3ms
```
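Why only 11 of 29 frames? A rough back-of-the-envelope model helps. Assume, hypothetically, that the lidar scans at a steady rate f and that each video frame lands at a random phase relative to the scans. A frame then finds a partner only if it falls within ±match_tolerance of a scan, so the expected match fraction is about min(1, 2 · match_tolerance · f). The 7.5 Hz rate used below is an assumption: it corresponds to 15 scans over a 2-second window, the window length used in the quality-filter example further down. The function name is hypothetical.

```python
def expected_match_fraction(match_tolerance, secondary_rate_hz):
    """Rough fraction of primary messages expected to find a secondary partner.

    Assumes secondary messages arrive at a steady rate and primary messages land
    at uniformly random phases between them. Each secondary message "covers" a
    window of 2 * match_tolerance seconds around its timestamp.
    """
    return min(1.0, 2.0 * match_tolerance * secondary_rate_hz)


lidar_rate_hz = 15 / 2.0  # assumption: 15 scans over a 2-second window
num_video_frames = 29     # from the output above

fraction = expected_match_fraction(0.025, lidar_rate_hz)
print(f"expected fraction: {fraction:.3f}")                  # expected fraction: 0.375
print(f"expected pairs: {fraction * num_video_frames:.1f}")  # expected pairs: 10.9
```

That is close to the 11 pairs observed. The same model predicts that doubling the tolerance roughly doubles the match fraction. It ignores jitter and any fixed latency offset between the sensors, so treat it as a sanity check rather than a prediction.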
```python
plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, 'assets/alignment_timeline.png')
```
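If we loosen the match tolerance, more video frames find a partner, and several of them may be paired with the same lidar scan. The frame and scan counts printed below are exactly double those above, most likely because video_frames and lidar_scans keep accumulating across runs; if so, this run covers the same 29 frames and 15 scans, and 23 of those 29 frames are matched.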
```python
aligned_pairs = align_timestamped(
    video_stream,
    lidar_stream,
    match_tolerance=0.05,  # 50ms tolerance
    buffer_size=5.0,  # how long to wait for a match
).pipe(ops.to_list()).run()

print(f"Video: {len(video_frames)} frames, Lidar: {len(lidar_scans)} scans")
print(f"Aligned pairs: {len(aligned_pairs)} out of {len(video_frames)} video frames")
```
```
Video: 58 frames, Lidar: 30 scans
Aligned pairs: 23 out of 58 video frames
```
```python
plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, 'assets/alignment_timeline2.png')
```
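To check whether a looser tolerance really reuses lidar scans, one option is to count how often each secondary message appears across the matched pairs. The sketch below assumes only what the examples above already use: each pair is a tuple whose elements have a .ts attribute. The helper reused_secondaries and the Msg stand-in class are hypothetical, and keying by timestamp assumes distinct scans have distinct timestamps.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass(frozen=True)
class Msg:
    """Hypothetical stand-in for a timestamped message (e.g. an image or pointcloud)."""
    ts: float


def reused_secondaries(pairs):
    """Return {secondary_ts: count} for secondaries matched more than once.

    pairs: iterable of (primary, secondary) tuples whose elements have .ts.
    """
    counts = Counter(secondary.ts for _, secondary in pairs)
    return {ts: n for ts, n in counts.items() if n > 1}


scan_a, scan_b = Msg(0.005), Msg(0.138)
# With a loose tolerance, several frames can pair with the same scan.
pairs = [
    (Msg(0.000), scan_a),
    (Msg(0.033), scan_a),
    (Msg(0.100), scan_b),
    (Msg(0.133), scan_b),
    (Msg(0.166), scan_b),
]
print(reused_secondaries(pairs))  # {0.005: 2, 0.138: 3}
```

On the replayed data, calling reused_secondaries(aligned_pairs) after each run would show whether the 50 ms tolerance pairs several frames with one scan while the 25 ms tolerance does not.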

Combine Frame Alignment with a Quality Filter

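Alignment composes with other stream operators. Below, sharpness_barrier first drops blurry video frames, and only the surviving frames are aligned with lidar. More on quality filtering here.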
```python
from dimos.msgs.sensor_msgs.Image import Image, sharpness_barrier

# Lists to collect items as they flow through streams
video_frames = []
lidar_scans = []

video_stream = video_replay.stream(from_timestamp=seek_ts, duration=2.0).pipe(
    sharpness_barrier(3.0),  # quality filter on video frames
    ops.do_action(lambda x: video_frames.append(x))
)

lidar_stream = lidar_replay.stream(from_timestamp=seek_ts, duration=2.0).pipe(
    ops.do_action(lambda x: lidar_scans.append(x))
)

aligned_pairs = align_timestamped(
    video_stream,
    lidar_stream,
    match_tolerance=0.025,  # 25ms tolerance
    buffer_size=5.0,  # how long to wait for a match
).pipe(ops.to_list()).run()

print(f"Video: {len(video_frames)} frames, Lidar: {len(lidar_scans)} scans")
print(f"Aligned pairs: {len(aligned_pairs)} out of {len(video_frames)} video frames")
```

```
Video: 6 frames, Lidar: 15 scans
Aligned pairs: 1 out of 6 video frames
```
```python
plot_alignment_timeline(video_frames, lidar_scans, aligned_pairs, 'assets/alignment_timeline3.png')
```
We are very picky, so only one pair survives, but its data is high quality: the best frame, paired with the closest lidar match in this window.
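We don't know exactly how sharpness_barrier selects frames, but the output above (6 frames from a 2-second window with an argument of 3.0) is consistent with keeping the sharpest frame in each 1/3-second window. The sketch below implements that interpretation as a hypothetical helper, sharpest_per_window, over (timestamp, sharpness score) tuples. It is not the dimos implementation, the scoring itself is left out, and the sample scores are illustrative.

```python
def sharpest_per_window(frames, rate_hz):
    """Keep the highest-scoring frame in each 1 / rate_hz second window.

    Hypothetical helper. frames: iterable of (ts, sharpness) tuples with
    non-negative ts in seconds. Returns kept frames in timestamp order.
    """
    best = {}
    for ts, score in frames:
        window = int(ts * rate_hz)  # index of the time window this frame falls in
        if window not in best or score > best[window][1]:
            best[window] = (ts, score)
    return [best[w] for w in sorted(best)]


frames = [(0.00, 0.2), (0.10, 0.9), (0.20, 0.4),   # window 0: 0.00-0.33 s
          (0.40, 0.7), (0.50, 0.3), (0.60, 0.8)]   # window 1: 0.33-0.67 s
print(sharpest_per_window(frames, rate_hz=3.0))  # [(0.1, 0.9), (0.6, 0.8)]
```

Filtering before alignment shrinks the primary stream, so fewer frames are available to land within match_tolerance of a scan. That trade-off is why only one of the six sharp frames gets a lidar partner.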

How It Works

The primary stream (first argument) drives emissions. When a primary message arrives:
  1. Immediate match: if every secondary stream already has a buffered message within match_tolerance, emit the aligned tuple immediately.
  2. Deferred match: if any secondary is still missing, buffer the primary and wait.
When a secondary message arrives:
  1. Add it to the buffer for future primary matches.
  2. Check the buffered primaries; if this message completes a match, emit it.
Messages that stay unmatched for longer than buffer_size seconds are dropped.
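The steps above can be sketched as a small, synchronous Python class. This is a simplified model under stated assumptions, not the dimos source: messages are plain (ts, payload) tuples, time is taken from message timestamps rather than a wall clock, a primary is paired with the closest-in-time buffered message from every secondary stream within match_tolerance, and on each arrival anything older than buffer_size seconds relative to the newest timestamp seen is pruned. The class and method names are hypothetical.

```python
class TimestampAligner:
    """Hypothetical sketch of primary-driven timestamp alignment."""

    def __init__(self, num_secondaries, match_tolerance=0.1, buffer_size=1.0):
        self.tol = match_tolerance
        self.buffer_size = buffer_size
        self.secondaries = [[] for _ in range(num_secondaries)]  # buffered (ts, payload)
        self.pending = []  # primaries still waiting for secondaries
        self.latest_ts = float("-inf")

    def _prune(self, ts):
        # Drop messages older than buffer_size relative to the newest timestamp seen.
        self.latest_ts = max(self.latest_ts, ts)
        cutoff = self.latest_ts - self.buffer_size
        self.pending = [m for m in self.pending if m[0] >= cutoff]
        for i, buf in enumerate(self.secondaries):
            self.secondaries[i] = [m for m in buf if m[0] >= cutoff]

    def _try_match(self, primary):
        # Return (primary, sec_1, ..., sec_n) if every secondary has a match, else None.
        matched = []
        for buf in self.secondaries:
            candidates = [m for m in buf if abs(m[0] - primary[0]) <= self.tol]
            if not candidates:
                return None
            matched.append(min(candidates, key=lambda m: abs(m[0] - primary[0])))
        return (primary, *matched)

    def on_primary(self, msg):
        self._prune(msg[0])
        result = self._try_match(msg)  # 1. immediate match
        if result is None:
            self.pending.append(msg)   # 2. deferred match: wait for secondaries
            return []
        return [result]

    def on_secondary(self, index, msg):
        self._prune(msg[0])
        self.secondaries[index].append(msg)  # 1. buffer for future primaries
        emitted, still_pending = [], []
        for primary in self.pending:         # 2. re-check waiting primaries
            result = self._try_match(primary)
            if result is None:
                still_pending.append(primary)
            else:
                emitted.append(result)
        self.pending = still_pending
        return emitted


aligner = TimestampAligner(num_secondaries=1, match_tolerance=0.025)
print(aligner.on_secondary(0, (0.005, "scan0")))  # []
print(aligner.on_primary((0.000, "frame0")))      # [((0.0, 'frame0'), (0.005, 'scan0'))]
print(aligner.on_primary((0.100, "frame3")))      # [] (deferred)
print(aligner.on_secondary(0, (0.105, "scan1")))  # [((0.1, 'frame3'), (0.105, 'scan1'))]
```

Note that scan0 stays buffered after its first match, so a later frame within tolerance could reuse it; this is the same effect the looser-tolerance example shows.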

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| primary_observable | Observable[T] | required | Primary stream that drives output timing |
| *secondary_observables | Observable[S]... | required | One or more secondary streams to align |
| match_tolerance | float | 0.1 | Max time difference for a match (seconds) |
| buffer_size | float | 1.0 | How long to buffer unmatched messages (seconds) |

Usage in Modules

Every module In port exposes an .observable() method that returns a backpressured stream of incoming messages, which makes it easy to align inputs from multiple sensors. The following excerpt from detection/module3D.py projects 2D detections into 3D pointclouds:
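```python
class Detection3DModule(Detection2DModule):
    color_image: In[Image]
    pointcloud: In[PointCloud2]

    def start(self):
        # Align 2D detections (primary) with pointcloud data (secondary)
        self.detection_stream_3d = align_timestamped(
            backpressure(self.detection_stream_2d()),  # primary: drives output timing
            self.pointcloud.observable(),  # secondary: raw lidar pointclouds
            match_tolerance=0.25,  # looser than in the replay examples above
            buffer_size=20.0,  # long buffer to absorb variable ML inference times
        ).pipe(ops.map(detection2d_to_3d))  # convert each (detection, pointcloud) pair to 3D
```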
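The 2D detection stream (camera image plus ML model) is the primary and is matched against raw lidar pointclouds. The long buffer_size=20.0 accounts for variable ML inference times: a detection can arrive well after the pointcloud it should be paired with, so pointclouds must stay buffered until it does.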