Source code for biologger_sim.processors.streaming

# Copyright (c) 2025-2026 Long Horizon Observatory
# Licensed under the Apache License, Version 2.0. See LICENSE file for details.

import logging
import math
from collections import deque
from typing import Any

from scipy.signal import butter, lfilter, lfilter_zi

from biologger_sim.core.ins import INSSolution
from biologger_sim.core.numeric_utils import safe_float
from biologger_sim.core.processor_interface import BiologgerProcessor
from biologger_sim.functions.depth_smoother import DepthSmoother
from biologger_sim.io.zmq_publisher import ZMQPublisher


[docs] class StreamingProcessor(BiologgerProcessor): """ Causal (real-time) streaming processor for digital twin and on-tag simulation. """ def __init__( self, filt_len: int = 48, freq: int = 16, debug_level: int = 0, locked_attachment_roll_deg: float | None = None, locked_attachment_pitch_deg: float | None = None, locked_mag_offset_x: float | None = None, locked_mag_offset_y: float | None = None, locked_mag_offset_z: float | None = None, locked_mag_sphere_radius: float | None = None, ema_fast_alpha: float = 0.2, ema_slow_alpha: float = 0.02, ema_cross_threshold: float = 0.5, zmq_publisher: ZMQPublisher | None = None, eid: int | None = None, sim_id: str = "default", tag_id: str = "unknown", dead_reckoning_speed_model: str = "odba_scaled", dead_reckoning_constant_speed: float = 1.0, dead_reckoning_odba_factor: float = 2.0, highpass_cutoff: float = 0.1, **kwargs: Any, ) -> None: self.filt_len = filt_len self.freq = freq self.dt = 1.0 / freq self.debug_level = debug_level self.zmq_publisher = zmq_publisher self.eid = eid self.sim_id = sim_id self.tag_id = tag_id self.ema_fast_alpha = ema_fast_alpha self.ema_slow_alpha = ema_slow_alpha self.ema_cross_threshold = ema_cross_threshold self.fast_ema_odba = 0.0 self.slow_ema_odba = 0.0 self.logging_state = "STEADY" # Locked Calibration self.locked_attachment_roll_rad = ( math.radians(locked_attachment_roll_deg) if locked_attachment_roll_deg is not None else None ) self.locked_attachment_pitch_rad = ( math.radians(locked_attachment_pitch_deg) if locked_attachment_pitch_deg is not None else None ) # Lock mag offset if all values provided, else None self.locked_mag_offset: tuple[float, float, float, float] | None = None if all( x is not None for x in [ locked_mag_offset_x, locked_mag_offset_y, locked_mag_offset_z, locked_mag_sphere_radius, ] ): self.locked_mag_offset = ( float(locked_mag_offset_x or 0), float(locked_mag_offset_y or 0), float(locked_mag_offset_z or 0), float(locked_mag_sphere_radius or 1), ) self.accel_buffer: deque[tuple[float, float, float]] = deque(maxlen=filt_len) self.accel_sum = [0.0, 0.0, 0.0] self.gravity = 9.81 # INSSolution provides full Kalman filter depth estimation (match reference) self.ins_solution = INSSolution(sample_rate=freq, biological_constraints=None) self.current_time = 0.0 # Accumulated time for INSSolution self.dead_reckoning_speed_model = dead_reckoning_speed_model self.dead_reckoning_constant_speed = dead_reckoning_constant_speed self.dead_reckoning_odba_factor = dead_reckoning_odba_factor nyquist = 0.5 * freq self.highpass_b, self.highpass_a = butter(2, highpass_cutoff / nyquist, btype="high") self.hp_s1 = 0.0 self.hp_s2 = 0.0 self.highpass_zi_init = False self.record_count = 0 self.pseudo_x = 0.0 self.pseudo_y = 0.0 self.last_timestamp: float | None = None self.logger = logging.getLogger(__name__) if debug_level > 0: self.logger.setLevel(logging.DEBUG) self.depth_smoother = DepthSmoother(freq=freq) self.logger.info(f"StreamingProcessor initialized: filt_len={filt_len}, freq={freq}Hz")
[docs] def reset(self) -> None: self.accel_buffer.clear() self.accel_sum = [0.0, 0.0, 0.0] self.fast_ema_odba = 0.0 self.slow_ema_odba = 0.0 self.record_count = 0 self.pseudo_x = 0.0 self.pseudo_y = 0.0 self.hp_s1 = 0.0 self.hp_s2 = 0.0 self.highpass_zi_init = False self.ins_solution.reset() self.current_time = 0.0
[docs] def process(self, record: dict[str, Any] | Any) -> dict[str, Any]: self.record_count += 1 def get_field(record: dict[str, Any], q: str, u: str) -> float: return safe_float(record.get(q, record.get(u, 0.0))) # 1. Input Acquisition # Data is in 0.1g units ax_m = get_field(record, '"int aX"', "int aX") ay_m = get_field(record, '"int aY"', "int aY") az_m = get_field(record, '"int aZ"', "int aZ") if ax_m is None or ay_m is None or az_m is None: return {} # Ensure data is valid for math operations ax_m = float(ax_m) ay_m = float(ay_m) az_m = float(az_m) depth_raw = get_field(record, '"Depth"', "Depth") ts = record.get("timestamp", 0.0) # 2. Attachment Correction (X then Y) if self.locked_attachment_roll_rad is not None: assert self.locked_attachment_roll_rad is not None assert self.locked_attachment_pitch_rad is not None sr_a, cr_a = ( math.sin(self.locked_attachment_roll_rad), math.cos(self.locked_attachment_roll_rad), ) sp_a, cp_a = ( math.sin(self.locked_attachment_pitch_rad), math.cos(self.locked_attachment_pitch_rad), ) # Xb(roll): y' = y*c - z*s, z' = y*s + z*c ay_r = ay_m * cr_a - az_m * sr_a az_r = ay_m * sr_a + az_m * cr_a # Yb(pitch): x' = x*c - z*s, z' = x*s + z*c ax_att = ax_m * cp_a - az_r * sp_a ay_att = ay_r az_att = ax_m * sp_a + az_r * cp_a else: ax_att, ay_att, az_att = ax_m, ay_m, az_m # 3. Window Management (O(1)) if len(self.accel_buffer) == self.filt_len: old = self.accel_buffer[0] self.accel_sum[0] -= old[0] self.accel_sum[1] -= old[1] self.accel_sum[2] -= old[2] self.accel_buffer.append((ax_att, ay_att, az_att)) self.accel_sum[0] += ax_att self.accel_sum[1] += ay_att self.accel_sum[2] += az_att # Dead Reckoning Timing actual_dt = self.dt if self.last_timestamp is None else ts - self.last_timestamp if actual_dt <= 0: actual_dt = self.dt self.last_timestamp = ts # 4. Gsep & Static Components # During warmup: use raw accel as static approximation, 0 for dynamic # After warmup: proper Gsep separation if len(self.accel_buffer) < self.filt_len: # Warmup: use raw acceleration as static estimate (assumes low motion at start) static_x, static_y, static_z = ax_att, ay_att, az_att # No dynamic separation yet - use 0 (reasonable for deployment start) dyn_x, dyn_y, dyn_z = 0.0, 0.0, 0.0 odba_g = 0.0 else: div = self.filt_len static_x, static_y, static_z = ( self.accel_sum[0] / div, self.accel_sum[1] / div, self.accel_sum[2] / div, ) dyn_x, dyn_y, dyn_z = ax_att - static_x, ay_att - static_y, az_att - static_z odba_g = (abs(dyn_x) + abs(dyn_y) + abs(dyn_z)) / 10.0 # 5. Orientation (R-style) # static_x/y/z are always valid: raw accel during warmup, Gsep-averaged after ax_g, ay_g, az_g = static_x / 10.0, static_y / 10.0, static_z / 10.0 mag_yz = math.sqrt(ay_g**2 + az_g**2) pitch_rad = -math.atan2(ax_g, mag_yz) roll_rad = math.atan2(ay_g, az_g) if abs(az_g) > 1e-6 or abs(ay_g) > 1e-6 else 0.0 sp, cp = math.sin(pitch_rad), math.cos(pitch_rad) sr, cr = math.sin(roll_rad), math.cos(roll_rad) # 6. World-frame vertical acceleration (matching pseudotrack exactly) # Rotation: -sin(p)*ax + cos(p)*sin(r)*ay + cos(p)*cos(r)*az accel_world_z = -sp * ax_g + cp * sr * ay_g + cp * cr * az_g # Remove gravity: For level sensor, accel_world_z ≈ 1.0g, so subtract 1g accel_z_no_gravity = accel_world_z * self.gravity - self.gravity # 7. High-pass Filter (bias removal) # Use lfilter with zi state to support 4th order filter correctly # (Manual implementation was truncated to 2nd order) if not self.highpass_zi_init: # Initialize filter state matching reference # Use accel_z_no_gravity for initialization (assumes steady state at start) self.highpass_zi = lfilter_zi(self.highpass_b, self.highpass_a) * accel_z_no_gravity self.highpass_zi_init = True accel_z_filtered_array, self.highpass_zi = lfilter( self.highpass_b, self.highpass_a, [accel_z_no_gravity], zi=self.highpass_zi ) accel_z_filtered = accel_z_filtered_array[0] # 8. Depth Processing via INSSolution (full Kalman predict+update cycle) # Update accumulated time for INSSolution self.current_time += actual_dt # During warmup (ODBA is NaN), use 0.0 vertical accel to avoid unreliable predictions # INS still runs to build up state, just without integrating unreliable acceleration import numpy as np if math.isnan(odba_g): # Warmup: run INS but don't integrate unreliable vertical acceleration accel_world = np.array([0.0, 0.0, 0.0]) else: # Post-warmup: full INS pipeline with filtered vertical acceleration accel_world = np.array([0.0, 0.0, accel_z_filtered]) depth_measurement = depth_raw if not math.isnan(depth_raw) else None ins_result = self.ins_solution.update( accel_world, depth_measurement, actual_dt, self.current_time ) depth_kalman_filtered = ins_result["depth_kalman_filtered"] # Apply multi-scale depth smoothing to KF output # Pass ODBA in 0.1g units to match pseudotrack's DepthSmoother thresholds odba_01g = abs(dyn_x) + abs(dyn_y) + abs(dyn_z) final_depth = self.depth_smoother.update(depth_kalman_filtered, odba_01g) # 9. Magnetometer & Heading (R-style) heading_deg = float("nan") if self.locked_mag_offset is not None: # Unpack with explicit None->0.0 fallback for type safety ox = self.locked_mag_offset[0] or 0.0 oy = self.locked_mag_offset[1] or 0.0 oz = self.locked_mag_offset[2] or 0.0 orad = self.locked_mag_offset[3] or 1.0 # Avoid div by zero mx_val = get_field(record, '"int mX"', "int mX") my_val = get_field(record, '"int mY"', "int mY") mz_val = get_field(record, '"int mZ"', "int mZ") mx_n = (mx_val - ox) / orad my_n = (my_val - oy) / orad mz_n = (mz_val - oz) / orad # Att correction (X then Y) roll_att = self.locked_attachment_roll_rad pitch_att = self.locked_attachment_pitch_rad if roll_att is not None and pitch_att is not None: sr_a, cr_a = math.sin(roll_att), math.cos(roll_att) sp_a, cp_a = math.sin(pitch_att), math.cos(pitch_att) # Xb(roll): y' = y*c - z*s, z' = y*s + z*c my_r = my_n * cr_a - mz_n * sr_a mz_r = my_n * sr_a + mz_n * cr_a # Yb(pitch): x' = x*c - z*s, z' = x*s + z*c mx_att_m = mx_n * cp_a - mz_r * sp_a my_att_m = my_r mz_att_m = mx_n * sp_a + mz_r * cp_a else: mx_att_m, my_att_m, mz_att_m = mx_n, my_n, mz_n # World Rotation (v @ Yb(-p) @ Xb(r)) # Yb(-p): angle = -p. sin(-p) = -sp. # x' = x*c - z*(-s) = x*c + z*s # z' = x*(-s) + z*c = -x*s + z*c mx_m_p = mx_att_m * cp + mz_att_m * sp mz_m_p = -mx_att_m * sp + mz_att_m * cp my_m_w = my_att_m * cr - mz_m_p * sr mx_m_w = mx_m_p heading_deg = math.degrees(math.atan2(-my_m_w, mx_m_w)) # 10. Dead Reckoning Integration speed = ( max(0.0, odba_g * self.dead_reckoning_odba_factor) if self.dead_reckoning_speed_model == "odba_scaled" else self.dead_reckoning_constant_speed ) dist = speed * actual_dt self.pseudo_x += ( dist * math.cos(math.radians(heading_deg)) if not math.isnan(heading_deg) else 0.0 ) self.pseudo_y += ( dist * math.sin(math.radians(heading_deg)) if not math.isnan(heading_deg) else 0.0 ) # 11. Derived Metrics & Publishing vedba_g = math.sqrt(dyn_x**2 + dyn_y**2 + dyn_z**2) / 10.0 # Helper to sanitize NaN values for ZMQ (extension can't handle NaN in trail rendering) def nan_to_zero(x: float) -> float: return 0.0 if math.isnan(x) else x result = { "record_count": self.record_count, "timestamp": ts, "Depth": nan_to_zero(final_depth), "roll_degrees": nan_to_zero(math.degrees(roll_rad)), "pitch_degrees": nan_to_zero(math.degrees(pitch_rad)), "heading_degrees": nan_to_zero(heading_deg), "pseudo_x": self.pseudo_x, "pseudo_y": self.pseudo_y, "ODBA": nan_to_zero(odba_g), "VeDBA": nan_to_zero(vedba_g), "velocity": nan_to_zero(speed), "vertical_velocity": 0.0, # TODO: Expose from KF "X_Dynamic": nan_to_zero(dyn_x / 10.0), "Y_Dynamic": nan_to_zero(dyn_y / 10.0), "Z_Dynamic": nan_to_zero(dyn_z / 10.0), "X_Static": nan_to_zero(static_x / 10.0), "Y_Static": nan_to_zero(static_y / 10.0), "Z_Static": nan_to_zero(static_z / 10.0), } if self.zmq_publisher: assert self.eid is not None self.zmq_publisher.publish_state(self.eid, self.sim_id, self.tag_id, result) return result
[docs] def calibrate_from_batch_data(self) -> None: pass
[docs] def get_performance_summary(self) -> dict[str, Any]: return {"processor_type": "StreamingProcessor", "records_processed": self.record_count}
[docs] def update_config(self, config_updates: dict[str, Any]) -> None: pass
[docs] def get_current_state(self) -> dict[str, Any]: return { "record_count": self.record_count, "X_Track": self.pseudo_x, "Y_Track": self.pseudo_y, }