# Copyright (c) 2025-2026 Long Horizon Observatory
# Licensed under the Apache License, Version 2.0. See LICENSE file for details.
import logging
from pathlib import Path
from typing import Any
import msgpack
import numpy as np
import pandas as pd
import zmq
from ..core.types import SimulationConfig
[docs]
class ZMQPublisher:
"""
Publishes sensor data and simulation state via ZeroMQ.
Connects to the NVIDIA Omniverse extension using MessagePack.
"""
def __init__(self, config: SimulationConfig, debug_level: int = 0) -> None:
self.config = config
self.debug_level = debug_level
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
# Optimize socket for high throughput / low latency
# LINGER: 0 to discard messages on close immediately
self.socket.setsockopt(zmq.LINGER, 0)
# SNDHWM: High Water Mark (limit queue size)
self.socket.setsockopt(zmq.SNDHWM, 10000)
self.address = f"tcp://{self.config.zmq.host}:{self.config.zmq.port}"
self.socket.bind(self.address)
self.logger = logging.getLogger(__name__)
self.logger.info(f"ZMQ Publisher bound to {self.address} (using MessagePack)")
# Load metadata for auto-correction of tag_id/species
self._id_to_species: dict[str, str] = {}
self._load_metadata()
def _load_metadata(self) -> None:
"""Loads species mapping from biologger_meta.csv if available."""
# The SimulationConfig might be the pydantic model or a nested structure
# Depending on mode (LAB vs SIMULATION)
meta_path = None
if hasattr(self.config, "simulation") and self.config.simulation:
meta_path = getattr(self.config.simulation, "meta_file", None)
elif hasattr(self.config, "meta_file"):
meta_path = getattr(self.config, "meta_file", None)
if not meta_path:
return
# Handle relative paths from config
if not Path(meta_path).is_absolute():
# Assume relative to project root or config location
# For now just try to find it
search_paths = [
Path(meta_path),
Path("datasets/biologger_meta.csv"),
Path("data/biologger_meta.csv"),
]
for p in search_paths:
if p.exists():
meta_path = p
break
try:
p = Path(meta_path)
if p.exists():
df = pd.read_csv(p)
for _, row in df.iterrows():
tid = row.get("tag_id")
sp = row.get("species")
if tid and sp:
self._id_to_species[str(tid).lower().strip()] = sp
self.logger.info(
f"ZMQ Publisher loaded {len(self._id_to_species)} metadata entries"
)
except Exception as e:
self.logger.warning(f"ZMQ Publisher failed to load metadata: {e}")
def _default_converter(self, o: Any) -> Any:
"""Fallback for numpy types during MessagePack packing."""
if isinstance(o, np.integer):
return int(o)
if isinstance(o, np.floating):
return float(o)
if isinstance(o, np.ndarray):
return o.tolist()
if hasattr(o, "isoformat"): # datetime
return o.isoformat()
return str(o)
[docs]
def publish(self, topic: str, data: dict[str, Any]) -> None:
"""
Publishes a message to a specific topic using MessagePack.
Args:
topic: The topic string (e.g., "sensor/imu", "sim/state").
data: The data dictionary to publish.
"""
try:
packed = msgpack.packb(data, use_bin_type=True)
except TypeError:
packed = msgpack.packb(data, default=self._default_converter, use_bin_type=True)
self.socket.send_multipart([topic.encode(), packed])
[docs]
def publish_state(
self, eid: int, sim_id: str, tag_id: str | None, state: dict[str, Any]
) -> None:
"""
Publishes the simulation state to Omniverse in the expected format.
Sends Euler angles (degrees) for the receiver to handle rotation.
Args:
eid: Integer entity identifier.
sim_id: String simulation identifier (e.g., "SF_RED001_v2").
state: Dictionary containing 'pitch_degrees', 'roll_degrees', 'heading_degrees',
'X_Dynamic', 'Y_Dynamic', 'Z_Dynamic', 'VeDBA'.
"""
# Explicit float conversion for speed
roll = float(state.get("roll_degrees", 0.0) or 0.0)
pitch = float(state.get("pitch_degrees", 0.0) or 0.0)
heading = float(state.get("heading_degrees", 0.0) or 0.0)
if np.isnan(roll):
roll = 0.0
if np.isnan(pitch):
pitch = 0.0
if np.isnan(heading):
heading = 0.0
# Physics extraction with explicit casting
x_dyn = float(state.get("X_Dynamic", 0.0) or 0.0)
y_dyn = float(state.get("Y_Dynamic", 0.0) or 0.0)
z_dyn = float(state.get("Z_Dynamic", 0.0) or 0.0)
x_static = float(state.get("X_Static", 0.0) or 0.0)
y_static = float(state.get("Y_Static", 0.0) or 0.0)
z_static = float(state.get("Z_Static", 0.0) or 0.0)
vedba = float(state.get("VeDBA", 0.0) or 0.0)
odba = float(state.get("ODBA", 0.0) or 0.0)
depth = float(state.get("Depth", 0.0) or 0.0)
velocity = float(state.get("velocity", 0.0) or 0.0)
v_velocity = float(state.get("vertical_velocity", 0.0) or 0.0)
pseudo_x = float(state.get("X_Track", state.get("pseudo_x", 0.0)) or 0.0)
pseudo_y = float(state.get("Y_Track", state.get("pseudo_y", 0.0)) or 0.0)
timestamp = float(state.get("timestamp", 0.0) or 0.0)
clock_drift_sec = float(state.get("clock_drift_sec", 0.0) or 0.0)
# Construct efficient payload (short keys for msgpack optimization)
# Auto-resolve species if tag_id is valid
species = "unknown"
if tag_id:
species = self._id_to_species.get(str(tag_id).lower().strip(), "unknown")
payload = {
"eid": eid,
"sim_id": sim_id,
"tag_id": tag_id,
"sp": species,
"ts": timestamp,
"rot": [roll, pitch, heading],
"phys": {
"dacc": [x_dyn, y_dyn, z_dyn],
"sacc": [x_static, y_static, z_static],
"vedba": vedba,
"odba": odba,
"d": depth,
"v": velocity,
"vv": v_velocity,
"px": pseudo_x,
"py": pseudo_y,
"cd": clock_drift_sec,
},
}
packed = msgpack.packb(payload, use_bin_type=True)
topic = self.config.zmq.topic
if self.debug_level >= 2:
self.logger.debug(f"ZMQ Send [eid={eid}]: {payload}")
self.socket.send_multipart([topic.encode(), packed])
[docs]
def close(self) -> None:
"""Clean up ZMQ resources."""
self.socket.close()
self.context.term()