1use std::cmp::Ordering;
10use std::collections::VecDeque;
11use std::convert::TryInto;
12use std::time::{Duration, Instant, SystemTime};
13
14use async_trait::async_trait;
15use thiserror::Error;
16use tracing::trace;
17
18use crate::gvcp::GigeError;
19
20pub const REG_TIMESTAMP_CONTROL: u64 = 0x0900_0100;
22pub const REG_TIMESTAMP_VALUE: u64 = 0x0900_0104;
24pub const REG_TIMESTAMP_TICK_FREQUENCY: u64 = 0x0900_010C;
26pub const TIMESTAMP_LATCH_BIT: u32 = 0x0000_0002;
28pub const TIMESTAMP_RESET_BIT: u32 = 0x0000_0001;
30pub const DEFAULT_TIME_WINDOW: usize = 32;
32
33#[derive(Debug, Error)]
35pub enum TimeError {
36 #[error("control: {0}")]
37 Control(#[from] GigeError),
38 #[error("protocol: {0}")]
39 Protocol(&'static str),
40}
41
42#[async_trait]
44pub trait ControlChannel: Send + Sync {
45 async fn read_register(&self, addr: u64, len: usize) -> Result<Vec<u8>, TimeError>;
46 async fn write_register(&self, addr: u64, data: &[u8]) -> Result<(), TimeError>;
47}
48
49fn write_u32_be(value: u32) -> [u8; 4] {
50 value.to_be_bytes()
51}
52
53fn parse_u64_be(data: &[u8]) -> Result<u64, TimeError> {
54 if data.len() != 8 {
55 return Err(TimeError::Protocol("unexpected register size"));
56 }
57 Ok(u64::from_be_bytes(
58 data.try_into().expect("slice length checked"),
59 ))
60}
61
62pub async fn timestamp_reset<C: ControlChannel>(ctrl: &C) -> Result<(), TimeError> {
64 trace!("triggering timestamp reset");
65 ctrl.write_register(REG_TIMESTAMP_CONTROL, &write_u32_be(TIMESTAMP_RESET_BIT))
66 .await
67}
68
69pub async fn timestamp_latch<C: ControlChannel>(ctrl: &C) -> Result<(), TimeError> {
71 trace!("triggering timestamp latch");
72 ctrl.write_register(REG_TIMESTAMP_CONTROL, &write_u32_be(TIMESTAMP_LATCH_BIT))
73 .await
74}
75
76pub async fn read_timestamp_value<C: ControlChannel>(ctrl: &C) -> Result<u64, TimeError> {
78 let bytes = ctrl.read_register(REG_TIMESTAMP_VALUE, 8).await?;
79 parse_u64_be(&bytes)
80}
81
82pub async fn read_tick_frequency<C: ControlChannel>(ctrl: &C) -> Result<u64, TimeError> {
84 let bytes = ctrl.read_register(REG_TIMESTAMP_TICK_FREQUENCY, 8).await?;
85 parse_u64_be(&bytes)
86}
87
88#[derive(Debug, Clone)]
96pub struct TimeSync {
97 a: f64,
99 b: f64,
101 window: VecDeque<(u64, Instant)>,
103 cap: usize,
105 origin_instant: Option<Instant>,
107 origin_system: Option<SystemTime>,
109 freq_hz: Option<f64>,
111 auto_fit: bool,
113 trim_outliers: bool,
115}
116
117impl TimeSync {
118 pub fn new() -> Self {
120 Self::with_capacity(DEFAULT_TIME_WINDOW)
121 }
122
123 pub fn with_capacity(cap: usize) -> Self {
125 Self {
126 a: 0.0,
127 b: 0.0,
128 window: VecDeque::with_capacity(cap),
129 cap,
130 origin_instant: None,
131 origin_system: None,
132 freq_hz: None,
133 auto_fit: true,
134 trim_outliers: false,
135 }
136 }
137
138 pub fn set_auto_fit(&mut self, enabled: bool) -> &mut Self {
142 self.auto_fit = enabled;
143 self
144 }
145
146 pub fn set_trim_outliers(&mut self, enabled: bool) -> &mut Self {
151 self.trim_outliers = enabled;
152 self
153 }
154
155 pub fn coefficients(&self) -> (f64, f64) {
157 (self.a, self.b)
158 }
159
160 pub fn len(&self) -> usize {
162 self.window.len()
163 }
164
165 pub fn is_empty(&self) -> bool {
167 self.window.is_empty()
168 }
169
170 pub fn samples(&self) -> impl Iterator<Item = (u64, Instant)> + '_ {
172 self.window.iter().copied()
173 }
174
175 pub fn capacity(&self) -> usize {
177 self.cap
178 }
179
180 pub fn origin_instant(&self) -> Option<Instant> {
182 self.origin_instant
183 }
184
185 pub fn origin_system(&self) -> Option<SystemTime> {
187 self.origin_system
188 }
189
190 pub fn sample_bounds(&self) -> Option<((u64, Instant), (u64, Instant))> {
192 let first = *self.window.front()?;
193 let last = *self.window.back()?;
194 Some((first, last))
195 }
196
197 pub fn freq_hz(&self) -> Option<f64> {
199 self.freq_hz
200 }
201
202 pub fn set_freq_hz(&mut self, freq: f64) {
204 self.freq_hz = Some(freq);
205 }
206
207 pub fn update(&mut self, dev_ts: u64, host_instant: Instant) {
211 if self.origin_instant.is_none() {
212 self.origin_instant = Some(host_instant);
213 self.origin_system = Some(SystemTime::now());
214 }
215 if self.window.len() == self.cap {
216 self.window.pop_front();
217 }
218 self.window.push_back((dev_ts, host_instant));
219 if self.auto_fit {
220 self.recompute();
221 }
222 }
223
224 pub fn fit(&mut self, freq_hz: Option<f64>) -> Option<(f64, f64)> {
229 if let Some(freq) = freq_hz {
230 self.freq_hz = Some(freq);
231 }
232 self.recompute();
233 if self.window.len() >= 2 {
234 Some((self.a, self.b))
235 } else {
236 None
237 }
238 }
239
240 fn recompute(&mut self) {
241 if self.window.len() < 2 {
242 return;
243 }
244 let origin = match self.origin_instant {
245 Some(o) => o,
246 None => return,
247 };
248 let base_tick = match self.window.front() {
249 Some((t, _)) => *t as f64,
250 None => return,
251 };
252
253 let samples: Vec<(f64, f64)> = self
254 .window
255 .iter()
256 .map(|(ticks, host)| {
257 let x = (*ticks as f64) - base_tick;
258 let y = host.duration_since(origin).as_secs_f64();
259 (x, y)
260 })
261 .collect();
262
263 let (mut slope, mut intercept_rel) = match compute_fit(&samples) {
264 Some((s, i)) => (s, i),
265 None => return,
266 };
267
268 if self.trim_outliers && samples.len() >= 10 {
270 let mut residuals: Vec<(usize, f64)> = samples
271 .iter()
272 .enumerate()
273 .map(|(idx, (x, y))| {
274 let predicted = slope * *x + intercept_rel;
275 (idx, y - predicted)
276 })
277 .collect();
278 residuals.sort_by(|a, b| match a.1.partial_cmp(&b.1) {
279 Some(order) => order,
280 None => Ordering::Equal,
281 });
282 let trim = ((residuals.len() as f64) * 0.1).floor() as usize;
283 if trim > 0 && residuals.len() > trim * 2 {
284 let trimmed_samples: Vec<(f64, f64)> = residuals[trim..residuals.len() - trim]
285 .iter()
286 .map(|(idx, _)| samples[*idx])
287 .collect();
288 if let Some((s, i)) = compute_fit(&trimmed_samples) {
289 slope = s;
290 intercept_rel = i;
291 }
292 }
293 }
294
295 let intercept = intercept_rel - slope * base_tick;
296 self.a = slope;
297 self.b = intercept;
298
299 trace!(
300 slope = self.a,
301 intercept = self.b,
302 samples = self.window.len(),
303 "recomputed time mapping"
304 );
305 }
306
307 pub fn to_host_time(&self, dev_ts: u64) -> SystemTime {
309 let origin = match self.origin_system {
310 Some(o) => o,
311 None => return SystemTime::now(),
312 };
313 let seconds = self.a * dev_ts as f64 + self.b;
314 if seconds.is_finite() && seconds >= 0.0 {
315 match Duration::try_from_secs_f64(seconds) {
316 Ok(duration) => origin + duration,
317 Err(_) => origin,
318 }
319 } else {
320 origin
321 }
322 }
323}
324
325impl Default for TimeSync {
326 fn default() -> Self {
327 Self::new()
328 }
329}
330
331fn compute_fit(samples: &[(f64, f64)]) -> Option<(f64, f64)> {
333 if samples.len() < 2 {
334 return None;
335 }
336 let mut sum_x = 0.0;
337 let mut sum_y = 0.0;
338 for (x, y) in samples {
339 sum_x += x;
340 sum_y += y;
341 }
342 let n = samples.len() as f64;
343 let mean_x = sum_x / n;
344 let mean_y = sum_y / n;
345 let mut denom = 0.0;
346 let mut numer = 0.0;
347 for (x, y) in samples {
348 let dx = x - mean_x;
349 let dy = y - mean_y;
350 denom += dx * dx;
351 numer += dx * dy;
352 }
353 if denom.abs() < f64::EPSILON {
354 return None;
355 }
356 let slope = numer / denom;
357 let intercept = mean_y - slope * mean_x;
358 Some((slope, intercept))
359}
360
361#[async_trait]
362impl ControlChannel for tokio::sync::Mutex<crate::gvcp::GigeDevice> {
363 async fn read_register(&self, addr: u64, len: usize) -> Result<Vec<u8>, TimeError> {
364 let mut guard = self.lock().await;
365 guard.read_mem(addr, len).await.map_err(TimeError::from)
366 }
367
368 async fn write_register(&self, addr: u64, data: &[u8]) -> Result<(), TimeError> {
369 let mut guard = self.lock().await;
370 guard.write_mem(addr, data).await.map_err(TimeError::from)
371 }
372}
373
374#[cfg(test)]
375mod tests {
376 use super::*;
377
378 #[test]
379 fn regression_tracks_linear_relation() {
380 let mut sync = TimeSync::new();
381 let start = Instant::now();
382 for i in 0..16u64 {
383 let dev = i * 1000;
384 let host = start + Duration::from_millis(i * 16);
385 sync.update(dev, host);
386 }
387 let mapped = sync.to_host_time(64_000);
388 let origin = sync.origin_system().unwrap();
389 let mapped_secs = mapped.duration_since(origin).unwrap().as_secs_f64();
390 let expected_secs = Duration::from_millis(1024).as_secs_f64();
391 assert!((mapped_secs - expected_secs).abs() < 0.1);
392 }
393
394 #[test]
395 fn with_capacity_and_manual_fit() {
396 let mut sync = TimeSync::with_capacity(8);
397 sync.set_auto_fit(false);
398 let start = Instant::now();
399 for i in 0..8u64 {
400 let dev = i * 1000;
401 let host = start + Duration::from_millis(i * 10);
402 sync.update(dev, host);
403 }
404 let (a, _) = sync.coefficients();
406 assert_eq!(a, 0.0);
407
408 let result = sync.fit(Some(100_000.0));
410 assert!(result.is_some());
411 let (a, _) = sync.coefficients();
412 assert!(a > 0.0);
413 assert_eq!(sync.freq_hz(), Some(100_000.0));
414 }
415
416 #[test]
417 fn outlier_trimming() {
418 let mut sync = TimeSync::with_capacity(32);
419 sync.set_trim_outliers(true).set_auto_fit(false);
420 let start = Instant::now();
421 for i in 0..20u64 {
423 let dev = i * 1000;
424 let jitter = if i == 10 { 50 } else { 0 }; let host = start + Duration::from_millis(i * 10 + jitter);
426 sync.update(dev, host);
427 }
428 sync.fit(None);
429 assert_eq!(sync.len(), 20);
431 }
432}