# Copyright (c) 2025-2026 Long Horizon Observatory
# Licensed under the Apache License, Version 2.0. See LICENSE file for details.
import logging
import math
from collections import deque
from typing import Any
from scipy.signal import butter, lfilter, lfilter_zi
from biologger_sim.core.ins import INSSolution
from biologger_sim.core.numeric_utils import safe_float
from biologger_sim.core.processor_interface import BiologgerProcessor
from biologger_sim.functions.depth_smoother import DepthSmoother
from biologger_sim.io.zmq_publisher import ZMQPublisher
[docs]
class StreamingProcessor(BiologgerProcessor):
"""
Causal (real-time) streaming processor for digital twin and on-tag simulation.
"""
def __init__(
self,
filt_len: int = 48,
freq: int = 16,
debug_level: int = 0,
locked_attachment_roll_deg: float | None = None,
locked_attachment_pitch_deg: float | None = None,
locked_mag_offset_x: float | None = None,
locked_mag_offset_y: float | None = None,
locked_mag_offset_z: float | None = None,
locked_mag_sphere_radius: float | None = None,
ema_fast_alpha: float = 0.2,
ema_slow_alpha: float = 0.02,
ema_cross_threshold: float = 0.5,
zmq_publisher: ZMQPublisher | None = None,
eid: int | None = None,
sim_id: str = "default",
tag_id: str = "unknown",
dead_reckoning_speed_model: str = "odba_scaled",
dead_reckoning_constant_speed: float = 1.0,
dead_reckoning_odba_factor: float = 2.0,
highpass_cutoff: float = 0.1,
**kwargs: Any,
) -> None:
self.filt_len = filt_len
self.freq = freq
self.dt = 1.0 / freq
self.debug_level = debug_level
self.zmq_publisher = zmq_publisher
self.eid = eid
self.sim_id = sim_id
self.tag_id = tag_id
self.ema_fast_alpha = ema_fast_alpha
self.ema_slow_alpha = ema_slow_alpha
self.ema_cross_threshold = ema_cross_threshold
self.fast_ema_odba = 0.0
self.slow_ema_odba = 0.0
self.logging_state = "STEADY"
# Locked Calibration
self.locked_attachment_roll_rad = (
math.radians(locked_attachment_roll_deg)
if locked_attachment_roll_deg is not None
else None
)
self.locked_attachment_pitch_rad = (
math.radians(locked_attachment_pitch_deg)
if locked_attachment_pitch_deg is not None
else None
)
# Lock mag offset if all values provided, else None
self.locked_mag_offset: tuple[float, float, float, float] | None = None
if all(
x is not None
for x in [
locked_mag_offset_x,
locked_mag_offset_y,
locked_mag_offset_z,
locked_mag_sphere_radius,
]
):
self.locked_mag_offset = (
float(locked_mag_offset_x or 0),
float(locked_mag_offset_y or 0),
float(locked_mag_offset_z or 0),
float(locked_mag_sphere_radius or 1),
)
self.accel_buffer: deque[tuple[float, float, float]] = deque(maxlen=filt_len)
self.accel_sum = [0.0, 0.0, 0.0]
self.gravity = 9.81
# INSSolution provides full Kalman filter depth estimation (match reference)
self.ins_solution = INSSolution(sample_rate=freq, biological_constraints=None)
self.current_time = 0.0 # Accumulated time for INSSolution
self.dead_reckoning_speed_model = dead_reckoning_speed_model
self.dead_reckoning_constant_speed = dead_reckoning_constant_speed
self.dead_reckoning_odba_factor = dead_reckoning_odba_factor
nyquist = 0.5 * freq
self.highpass_b, self.highpass_a = butter(2, highpass_cutoff / nyquist, btype="high")
self.hp_s1 = 0.0
self.hp_s2 = 0.0
self.highpass_zi_init = False
self.record_count = 0
self.pseudo_x = 0.0
self.pseudo_y = 0.0
self.last_timestamp: float | None = None
self.logger = logging.getLogger(__name__)
if debug_level > 0:
self.logger.setLevel(logging.DEBUG)
self.depth_smoother = DepthSmoother(freq=freq)
self.logger.info(f"StreamingProcessor initialized: filt_len={filt_len}, freq={freq}Hz")
[docs]
def reset(self) -> None:
self.accel_buffer.clear()
self.accel_sum = [0.0, 0.0, 0.0]
self.fast_ema_odba = 0.0
self.slow_ema_odba = 0.0
self.record_count = 0
self.pseudo_x = 0.0
self.pseudo_y = 0.0
self.hp_s1 = 0.0
self.hp_s2 = 0.0
self.highpass_zi_init = False
self.ins_solution.reset()
self.current_time = 0.0
[docs]
def process(self, record: dict[str, Any] | Any) -> dict[str, Any]:
self.record_count += 1
def get_field(record: dict[str, Any], q: str, u: str) -> float:
return safe_float(record.get(q, record.get(u, 0.0)))
# 1. Input Acquisition
# Data is in 0.1g units
ax_m = get_field(record, '"int aX"', "int aX")
ay_m = get_field(record, '"int aY"', "int aY")
az_m = get_field(record, '"int aZ"', "int aZ")
if ax_m is None or ay_m is None or az_m is None:
return {}
# Ensure data is valid for math operations
ax_m = float(ax_m)
ay_m = float(ay_m)
az_m = float(az_m)
depth_raw = get_field(record, '"Depth"', "Depth")
ts = record.get("timestamp", 0.0)
# 2. Attachment Correction (X then Y)
if self.locked_attachment_roll_rad is not None:
assert self.locked_attachment_roll_rad is not None
assert self.locked_attachment_pitch_rad is not None
sr_a, cr_a = (
math.sin(self.locked_attachment_roll_rad),
math.cos(self.locked_attachment_roll_rad),
)
sp_a, cp_a = (
math.sin(self.locked_attachment_pitch_rad),
math.cos(self.locked_attachment_pitch_rad),
)
# Xb(roll): y' = y*c - z*s, z' = y*s + z*c
ay_r = ay_m * cr_a - az_m * sr_a
az_r = ay_m * sr_a + az_m * cr_a
# Yb(pitch): x' = x*c - z*s, z' = x*s + z*c
ax_att = ax_m * cp_a - az_r * sp_a
ay_att = ay_r
az_att = ax_m * sp_a + az_r * cp_a
else:
ax_att, ay_att, az_att = ax_m, ay_m, az_m
# 3. Window Management (O(1))
if len(self.accel_buffer) == self.filt_len:
old = self.accel_buffer[0]
self.accel_sum[0] -= old[0]
self.accel_sum[1] -= old[1]
self.accel_sum[2] -= old[2]
self.accel_buffer.append((ax_att, ay_att, az_att))
self.accel_sum[0] += ax_att
self.accel_sum[1] += ay_att
self.accel_sum[2] += az_att
# Dead Reckoning Timing
actual_dt = self.dt if self.last_timestamp is None else ts - self.last_timestamp
if actual_dt <= 0:
actual_dt = self.dt
self.last_timestamp = ts
# 4. Gsep & Static Components
# During warmup: use raw accel as static approximation, 0 for dynamic
# After warmup: proper Gsep separation
if len(self.accel_buffer) < self.filt_len:
# Warmup: use raw acceleration as static estimate (assumes low motion at start)
static_x, static_y, static_z = ax_att, ay_att, az_att
# No dynamic separation yet - use 0 (reasonable for deployment start)
dyn_x, dyn_y, dyn_z = 0.0, 0.0, 0.0
odba_g = 0.0
else:
div = self.filt_len
static_x, static_y, static_z = (
self.accel_sum[0] / div,
self.accel_sum[1] / div,
self.accel_sum[2] / div,
)
dyn_x, dyn_y, dyn_z = ax_att - static_x, ay_att - static_y, az_att - static_z
odba_g = (abs(dyn_x) + abs(dyn_y) + abs(dyn_z)) / 10.0
# 5. Orientation (R-style)
# static_x/y/z are always valid: raw accel during warmup, Gsep-averaged after
ax_g, ay_g, az_g = static_x / 10.0, static_y / 10.0, static_z / 10.0
mag_yz = math.sqrt(ay_g**2 + az_g**2)
pitch_rad = -math.atan2(ax_g, mag_yz)
roll_rad = math.atan2(ay_g, az_g) if abs(az_g) > 1e-6 or abs(ay_g) > 1e-6 else 0.0
sp, cp = math.sin(pitch_rad), math.cos(pitch_rad)
sr, cr = math.sin(roll_rad), math.cos(roll_rad)
# 6. World-frame vertical acceleration (matching pseudotrack exactly)
# Rotation: -sin(p)*ax + cos(p)*sin(r)*ay + cos(p)*cos(r)*az
accel_world_z = -sp * ax_g + cp * sr * ay_g + cp * cr * az_g
# Remove gravity: For level sensor, accel_world_z ≈ 1.0g, so subtract 1g
accel_z_no_gravity = accel_world_z * self.gravity - self.gravity
# 7. High-pass Filter (bias removal)
# Use lfilter with zi state to support 4th order filter correctly
# (Manual implementation was truncated to 2nd order)
if not self.highpass_zi_init:
# Initialize filter state matching reference
# Use accel_z_no_gravity for initialization (assumes steady state at start)
self.highpass_zi = lfilter_zi(self.highpass_b, self.highpass_a) * accel_z_no_gravity
self.highpass_zi_init = True
accel_z_filtered_array, self.highpass_zi = lfilter(
self.highpass_b, self.highpass_a, [accel_z_no_gravity], zi=self.highpass_zi
)
accel_z_filtered = accel_z_filtered_array[0]
# 8. Depth Processing via INSSolution (full Kalman predict+update cycle)
# Update accumulated time for INSSolution
self.current_time += actual_dt
# During warmup (ODBA is NaN), use 0.0 vertical accel to avoid unreliable predictions
# INS still runs to build up state, just without integrating unreliable acceleration
import numpy as np
if math.isnan(odba_g):
# Warmup: run INS but don't integrate unreliable vertical acceleration
accel_world = np.array([0.0, 0.0, 0.0])
else:
# Post-warmup: full INS pipeline with filtered vertical acceleration
accel_world = np.array([0.0, 0.0, accel_z_filtered])
depth_measurement = depth_raw if not math.isnan(depth_raw) else None
ins_result = self.ins_solution.update(
accel_world, depth_measurement, actual_dt, self.current_time
)
depth_kalman_filtered = ins_result["depth_kalman_filtered"]
# Apply multi-scale depth smoothing to KF output
# Pass ODBA in 0.1g units to match pseudotrack's DepthSmoother thresholds
odba_01g = abs(dyn_x) + abs(dyn_y) + abs(dyn_z)
final_depth = self.depth_smoother.update(depth_kalman_filtered, odba_01g)
# 9. Magnetometer & Heading (R-style)
heading_deg = float("nan")
if self.locked_mag_offset is not None:
# Unpack with explicit None->0.0 fallback for type safety
ox = self.locked_mag_offset[0] or 0.0
oy = self.locked_mag_offset[1] or 0.0
oz = self.locked_mag_offset[2] or 0.0
orad = self.locked_mag_offset[3] or 1.0 # Avoid div by zero
mx_val = get_field(record, '"int mX"', "int mX")
my_val = get_field(record, '"int mY"', "int mY")
mz_val = get_field(record, '"int mZ"', "int mZ")
mx_n = (mx_val - ox) / orad
my_n = (my_val - oy) / orad
mz_n = (mz_val - oz) / orad
# Att correction (X then Y)
roll_att = self.locked_attachment_roll_rad
pitch_att = self.locked_attachment_pitch_rad
if roll_att is not None and pitch_att is not None:
sr_a, cr_a = math.sin(roll_att), math.cos(roll_att)
sp_a, cp_a = math.sin(pitch_att), math.cos(pitch_att)
# Xb(roll): y' = y*c - z*s, z' = y*s + z*c
my_r = my_n * cr_a - mz_n * sr_a
mz_r = my_n * sr_a + mz_n * cr_a
# Yb(pitch): x' = x*c - z*s, z' = x*s + z*c
mx_att_m = mx_n * cp_a - mz_r * sp_a
my_att_m = my_r
mz_att_m = mx_n * sp_a + mz_r * cp_a
else:
mx_att_m, my_att_m, mz_att_m = mx_n, my_n, mz_n
# World Rotation (v @ Yb(-p) @ Xb(r))
# Yb(-p): angle = -p. sin(-p) = -sp.
# x' = x*c - z*(-s) = x*c + z*s
# z' = x*(-s) + z*c = -x*s + z*c
mx_m_p = mx_att_m * cp + mz_att_m * sp
mz_m_p = -mx_att_m * sp + mz_att_m * cp
my_m_w = my_att_m * cr - mz_m_p * sr
mx_m_w = mx_m_p
heading_deg = math.degrees(math.atan2(-my_m_w, mx_m_w))
# 10. Dead Reckoning Integration
speed = (
max(0.0, odba_g * self.dead_reckoning_odba_factor)
if self.dead_reckoning_speed_model == "odba_scaled"
else self.dead_reckoning_constant_speed
)
dist = speed * actual_dt
self.pseudo_x += (
dist * math.cos(math.radians(heading_deg)) if not math.isnan(heading_deg) else 0.0
)
self.pseudo_y += (
dist * math.sin(math.radians(heading_deg)) if not math.isnan(heading_deg) else 0.0
)
# 11. Derived Metrics & Publishing
vedba_g = math.sqrt(dyn_x**2 + dyn_y**2 + dyn_z**2) / 10.0
# Helper to sanitize NaN values for ZMQ (extension can't handle NaN in trail rendering)
def nan_to_zero(x: float) -> float:
return 0.0 if math.isnan(x) else x
result = {
"record_count": self.record_count,
"timestamp": ts,
"Depth": nan_to_zero(final_depth),
"roll_degrees": nan_to_zero(math.degrees(roll_rad)),
"pitch_degrees": nan_to_zero(math.degrees(pitch_rad)),
"heading_degrees": nan_to_zero(heading_deg),
"pseudo_x": self.pseudo_x,
"pseudo_y": self.pseudo_y,
"ODBA": nan_to_zero(odba_g),
"VeDBA": nan_to_zero(vedba_g),
"velocity": nan_to_zero(speed),
"vertical_velocity": 0.0, # TODO: Expose from KF
"X_Dynamic": nan_to_zero(dyn_x / 10.0),
"Y_Dynamic": nan_to_zero(dyn_y / 10.0),
"Z_Dynamic": nan_to_zero(dyn_z / 10.0),
"X_Static": nan_to_zero(static_x / 10.0),
"Y_Static": nan_to_zero(static_y / 10.0),
"Z_Static": nan_to_zero(static_z / 10.0),
}
if self.zmq_publisher:
assert self.eid is not None
self.zmq_publisher.publish_state(self.eid, self.sim_id, self.tag_id, result)
return result
[docs]
def calibrate_from_batch_data(self) -> None:
pass
[docs]
def update_config(self, config_updates: dict[str, Any]) -> None:
pass
[docs]
def get_current_state(self) -> dict[str, Any]:
return {
"record_count": self.record_count,
"X_Track": self.pseudo_x,
"Y_Track": self.pseudo_y,
}