Source code for biologger_sim.io.zmq_publisher

# Copyright (c) 2025-2026 Long Horizon Observatory
# Licensed under the Apache License, Version 2.0. See LICENSE file for details.

import logging
from pathlib import Path
from typing import Any

import msgpack
import numpy as np
import pandas as pd
import zmq

from ..core.types import SimulationConfig


[docs] class ZMQPublisher: """ Publishes sensor data and simulation state via ZeroMQ. Connects to the NVIDIA Omniverse extension using MessagePack. """ def __init__(self, config: SimulationConfig, debug_level: int = 0) -> None: self.config = config self.debug_level = debug_level self.context = zmq.Context() self.socket = self.context.socket(zmq.PUB) # Optimize socket for high throughput / low latency # LINGER: 0 to discard messages on close immediately self.socket.setsockopt(zmq.LINGER, 0) # SNDHWM: High Water Mark (limit queue size) self.socket.setsockopt(zmq.SNDHWM, 10000) self.address = f"tcp://{self.config.zmq.host}:{self.config.zmq.port}" self.socket.bind(self.address) self.logger = logging.getLogger(__name__) self.logger.info(f"ZMQ Publisher bound to {self.address} (using MessagePack)") # Load metadata for auto-correction of tag_id/species self._id_to_species: dict[str, str] = {} self._load_metadata() def _load_metadata(self) -> None: """Loads species mapping from biologger_meta.csv if available.""" # The SimulationConfig might be the pydantic model or a nested structure # Depending on mode (LAB vs SIMULATION) meta_path = None if hasattr(self.config, "simulation") and self.config.simulation: meta_path = getattr(self.config.simulation, "meta_file", None) elif hasattr(self.config, "meta_file"): meta_path = getattr(self.config, "meta_file", None) if not meta_path: return # Handle relative paths from config if not Path(meta_path).is_absolute(): # Assume relative to project root or config location # For now just try to find it search_paths = [ Path(meta_path), Path("datasets/biologger_meta.csv"), Path("data/biologger_meta.csv"), ] for p in search_paths: if p.exists(): meta_path = p break try: p = Path(meta_path) if p.exists(): df = pd.read_csv(p) for _, row in df.iterrows(): tid = row.get("tag_id") sp = row.get("species") if tid and sp: self._id_to_species[str(tid).lower().strip()] = sp self.logger.info( f"ZMQ Publisher loaded {len(self._id_to_species)} metadata entries" ) except Exception as e: self.logger.warning(f"ZMQ Publisher failed to load metadata: {e}") def _default_converter(self, o: Any) -> Any: """Fallback for numpy types during MessagePack packing.""" if isinstance(o, np.integer): return int(o) if isinstance(o, np.floating): return float(o) if isinstance(o, np.ndarray): return o.tolist() if hasattr(o, "isoformat"): # datetime return o.isoformat() return str(o)
[docs] def publish(self, topic: str, data: dict[str, Any]) -> None: """ Publishes a message to a specific topic using MessagePack. Args: topic: The topic string (e.g., "sensor/imu", "sim/state"). data: The data dictionary to publish. """ try: packed = msgpack.packb(data, use_bin_type=True) except TypeError: packed = msgpack.packb(data, default=self._default_converter, use_bin_type=True) self.socket.send_multipart([topic.encode(), packed])
[docs] def publish_state( self, eid: int, sim_id: str, tag_id: str | None, state: dict[str, Any] ) -> None: """ Publishes the simulation state to Omniverse in the expected format. Sends Euler angles (degrees) for the receiver to handle rotation. Args: eid: Integer entity identifier. sim_id: String simulation identifier (e.g., "SF_RED001_v2"). state: Dictionary containing 'pitch_degrees', 'roll_degrees', 'heading_degrees', 'X_Dynamic', 'Y_Dynamic', 'Z_Dynamic', 'VeDBA'. """ # Explicit float conversion for speed roll = float(state.get("roll_degrees", 0.0) or 0.0) pitch = float(state.get("pitch_degrees", 0.0) or 0.0) heading = float(state.get("heading_degrees", 0.0) or 0.0) if np.isnan(roll): roll = 0.0 if np.isnan(pitch): pitch = 0.0 if np.isnan(heading): heading = 0.0 # Physics extraction with explicit casting x_dyn = float(state.get("X_Dynamic", 0.0) or 0.0) y_dyn = float(state.get("Y_Dynamic", 0.0) or 0.0) z_dyn = float(state.get("Z_Dynamic", 0.0) or 0.0) x_static = float(state.get("X_Static", 0.0) or 0.0) y_static = float(state.get("Y_Static", 0.0) or 0.0) z_static = float(state.get("Z_Static", 0.0) or 0.0) vedba = float(state.get("VeDBA", 0.0) or 0.0) odba = float(state.get("ODBA", 0.0) or 0.0) depth = float(state.get("Depth", 0.0) or 0.0) velocity = float(state.get("velocity", 0.0) or 0.0) v_velocity = float(state.get("vertical_velocity", 0.0) or 0.0) pseudo_x = float(state.get("X_Track", state.get("pseudo_x", 0.0)) or 0.0) pseudo_y = float(state.get("Y_Track", state.get("pseudo_y", 0.0)) or 0.0) timestamp = float(state.get("timestamp", 0.0) or 0.0) clock_drift_sec = float(state.get("clock_drift_sec", 0.0) or 0.0) # Construct efficient payload (short keys for msgpack optimization) # Auto-resolve species if tag_id is valid species = "unknown" if tag_id: species = self._id_to_species.get(str(tag_id).lower().strip(), "unknown") payload = { "eid": eid, "sim_id": sim_id, "tag_id": tag_id, "sp": species, "ts": timestamp, "rot": [roll, pitch, heading], "phys": { "dacc": [x_dyn, y_dyn, z_dyn], "sacc": [x_static, y_static, z_static], "vedba": vedba, "odba": odba, "d": depth, "v": velocity, "vv": v_velocity, "px": pseudo_x, "py": pseudo_y, "cd": clock_drift_sec, }, } packed = msgpack.packb(payload, use_bin_type=True) topic = self.config.zmq.topic if self.debug_level >= 2: self.logger.debug(f"ZMQ Send [eid={eid}]: {payload}") self.socket.send_multipart([topic.encode(), packed])
[docs] def close(self) -> None: """Clean up ZMQ resources.""" self.socket.close() self.context.term()