# Load the pre-built global map shipped as a data asset.
# NOTE: pickle.loads executes arbitrary code on malicious input; only load
# map files that come from a trusted source.
map_file = get_data("unitree_go2_bigoffice_map.pickle")
global_map = pickle.loads(map_file.read_bytes())

drawing = Space()

# The global map is optional; it just gives the drawing a nice backdrop.
drawing.add(global_map)

# Draw the color_image stream on top of the map and write the result out.
drawing.add(store.streams.color_image)
drawing.to_svg("assets/color_image.svg")
Our drawing system applies the turbo color scheme to timestamps by default.

We can create new streams by querying existing streams, and we can save, further transform, or draw those.
Python
# Derive a smoothed speed stream from the color_image observations.
speed_stream = (
    store.streams.color_image
    # speed in m/s, from the distance between poses and the timestamps
    # of consecutive observations
    .transform(speed())
    # rolling window average
    .transform(smooth(50))
)

drawing = Space()
drawing.add(global_map)
drawing.add(speed_stream)
drawing.to_svg("assets/speed.svg")
We can do all kinds of things with this, for example, map out the room lighting:
Python
# Build a stream whose data is the brightness of each sampled image.
brightness_stream = (
    store.streams.color_image
    # Take only 4fps here: the brightness calculation loads the actual image.
    # Accessing observation.data triggers another db query to fetch the data;
    # otherwise observations only hold positions and timestamps.
    .transform(throttle(0.25))
    # replace each observation's data with its brightness
    .map(lambda obs: obs.derive(data=obs.data.brightness))
)

drawing = Space()
drawing.add(global_map)
drawing.add(brightness_stream)
drawing.to_svg("assets/brightness.svg")
Knowing the above, we can create embeddings for the full stream:
Python
from dimos.models.embedding.clip import CLIPModel
from dimos.msgs.sensor_msgs.Image import Image
from dimos.memory2.transform import QualityWindow
from dimos.memory2.embed import EmbedImages

# Target stream that the embedded observations are saved into.
embedded = store.stream("color_image_embedded", Image)
clip = CLIPModel()

# Filter dark images, downsample to 2Hz, then embed.
pipeline = (
    store.streams.color_image
    # drop images whose brightness is 0.1 or lower
    .filter(lambda obs: obs.data.brightness > 0.1)
    # score frames by sharpness over a 0.5 window (the 2Hz downsample)
    .transform(QualityWindow(lambda img: img.sharpness, window=0.5))
    # compute CLIP embeddings for the remaining images
    .transform(EmbedImages(clip))
    .save(embedded)
)

# Printing shows the pipeline description.
print(pipeline)
This pipeline is lazy: it is ready to execute, but nothing runs until we iterate over it or call `.drain()`.
skip
# Iterating over the pipeline is what executes it. `count` is a 1-based
# running index of the observations produced so far; enumerate binds it so
# the f-string below does not hit an undefined name.
for count, obs in enumerate(pipeline, start=1):
    print(f" [{count}] ts={obs.ts:.2f} pose={obs.pose}")
Let’s query it!
Python
from dimos.models.embedding.clip import CLIPModel

# Embed the text query with CLIP so it can be compared against the
# stored image embeddings.
clip = CLIPModel()
search_vector = clip.embed_text("shop")

# Search the embedded stream with the text vector.
shop_matches = store.streams.color_image_embedded.search(search_vector)

drawing = Space()
drawing.add(global_map)
drawing.add(shop_matches)
drawing.to_svg("assets/embedding.svg")
We don’t really have to deal with the whole global map, actually; let’s get the top 30 embedding matches and render only the lidar around those.
Python
from dimos.models.embedding.clip import CLIPModel
from dimos.mapping.voxels import VoxelMapTransformer

drawing = Space()

# This only defines the query; it is not executed yet.
# (search_vector is the text embedding computed in the previous snippet.)
matches = store.streams.color_image_embedded.search(search_vector, k=30)
print(matches)  # Stream("color_image_embedded") | vector_search(k=30)

# Here we execute it once: for every match we take the lidar observation at
# the match's timestamp, feed those into a global mapper, and draw the
# resulting map.
drawing.add(
    matches.map(lambda obs: store.streams.lidar.at(obs.ts).last())
    .transform(VoxelMapTransformer())
    .last()
    .data
)

# Then we add the matches themselves on top of the map.
drawing.add(matches)
drawing.to_svg("assets/embedding_focused.svg")
Stream("color_image_embedded") | vector_search(k=30)13:15:15.190 [inf][dimos/mapping/voxels.py ] VoxelGrid using device: CUDA:0