1use std::collections::VecDeque;
4use std::convert::TryInto;
5use std::time::{Duration, Instant, SystemTime};
6
7use async_trait::async_trait;
8use thiserror::Error;
9use tracing::{debug, trace};
10
11use crate::gvcp::GigeError;
12
13pub const REG_TIMESTAMP_CONTROL: u64 = 0x0900_0100;
15pub const REG_TIMESTAMP_VALUE: u64 = 0x0900_0104;
17pub const REG_TIMESTAMP_TICK_FREQUENCY: u64 = 0x0900_010C;
19pub const TIMESTAMP_LATCH_BIT: u32 = 0x0000_0002;
21pub const TIMESTAMP_RESET_BIT: u32 = 0x0000_0001;
23const MAX_TIME_WINDOW: usize = 32;
25
26#[derive(Debug, Error)]
28pub enum TimeError {
29 #[error("control: {0}")]
30 Control(#[from] GigeError),
31 #[error("protocol: {0}")]
32 Protocol(&'static str),
33}
34
35#[async_trait]
37pub trait ControlChannel: Send + Sync {
38 async fn read_register(&self, addr: u64, len: usize) -> Result<Vec<u8>, TimeError>;
39 async fn write_register(&self, addr: u64, data: &[u8]) -> Result<(), TimeError>;
40}
41
42fn write_u32_be(value: u32) -> [u8; 4] {
43 value.to_be_bytes()
44}
45
46fn parse_u64_be(data: &[u8]) -> Result<u64, TimeError> {
47 if data.len() != 8 {
48 return Err(TimeError::Protocol("unexpected register size"));
49 }
50 Ok(u64::from_be_bytes(
51 data.try_into().expect("slice length checked"),
52 ))
53}
54
55pub async fn timestamp_reset<C: ControlChannel>(ctrl: &C) -> Result<(), TimeError> {
57 trace!("triggering timestamp reset");
58 ctrl.write_register(REG_TIMESTAMP_CONTROL, &write_u32_be(TIMESTAMP_RESET_BIT))
59 .await
60}
61
62pub async fn timestamp_latch<C: ControlChannel>(ctrl: &C) -> Result<(), TimeError> {
64 trace!("triggering timestamp latch");
65 ctrl.write_register(REG_TIMESTAMP_CONTROL, &write_u32_be(TIMESTAMP_LATCH_BIT))
66 .await
67}
68
69pub async fn read_timestamp_value<C: ControlChannel>(ctrl: &C) -> Result<u64, TimeError> {
71 let bytes = ctrl.read_register(REG_TIMESTAMP_VALUE, 8).await?;
72 parse_u64_be(&bytes)
73}
74
75pub async fn read_tick_frequency<C: ControlChannel>(ctrl: &C) -> Result<u64, TimeError> {
77 let bytes = ctrl.read_register(REG_TIMESTAMP_TICK_FREQUENCY, 8).await?;
78 parse_u64_be(&bytes)
79}
80
81#[derive(Debug, Clone)]
83pub struct TimeSync {
84 pub(crate) a: f64,
85 pub(crate) b: f64,
86 window: VecDeque<(u64, Instant)>,
87 anchor_host: Instant,
88 anchor_system: SystemTime,
89}
90
91impl TimeSync {
92 pub fn new() -> Self {
94 let anchor_host = Instant::now();
95 let anchor_system = SystemTime::now();
96 Self {
97 a: 1.0,
98 b: 0.0,
99 window: VecDeque::new(),
100 anchor_host,
101 anchor_system,
102 }
103 }
104
105 pub fn coefficients(&self) -> (f64, f64) {
107 (self.a, self.b)
108 }
109
110 fn recompute(&mut self) {
111 if self.window.len() < 2 {
112 return;
113 }
114 let n = self.window.len() as f64;
115 let mut sum_x = 0f64;
116 let mut sum_y = 0f64;
117 let mut sum_xx = 0f64;
118 let mut sum_xy = 0f64;
119 for (dev, host) in &self.window {
120 let x = *dev as f64;
121 let y = if *host >= self.anchor_host {
122 host.duration_since(self.anchor_host).as_secs_f64()
123 } else {
124 0.0
125 };
126 sum_x += x;
127 sum_y += y;
128 sum_xx += x * x;
129 sum_xy += x * y;
130 }
131 let denom = n * sum_xx - sum_x * sum_x;
132 if denom.abs() < f64::EPSILON {
133 return;
134 }
135 let slope = (n * sum_xy - sum_x * sum_y) / denom;
136 let intercept = (sum_y - slope * sum_x) / n;
137 self.a = slope;
138 self.b = intercept;
139 debug!(
140 slope = self.a,
141 intercept = self.b,
142 "recomputed time mapping"
143 );
144 }
145
146 pub fn update(&mut self, dev_ts: u64, host_instant: Instant) {
148 if self.window.is_empty() {
149 self.anchor_host = host_instant;
150 self.anchor_system = SystemTime::now();
151 }
152 if self.window.len() == MAX_TIME_WINDOW {
153 self.window.pop_front();
154 }
155 self.window.push_back((dev_ts, host_instant));
156 self.recompute();
157 }
158
159 pub fn to_host_time(&self, dev_ts: u64) -> SystemTime {
161 let seconds = self.a * dev_ts as f64 + self.b;
162 if seconds.is_finite() && seconds >= 0.0 {
163 let duration = Duration::from_secs_f64(seconds);
164 self.anchor_system + duration
165 } else {
166 self.anchor_system
167 }
168 }
169}
170
171impl Default for TimeSync {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177#[async_trait]
178impl ControlChannel for tokio::sync::Mutex<crate::gvcp::GigeDevice> {
179 async fn read_register(&self, addr: u64, len: usize) -> Result<Vec<u8>, TimeError> {
180 let mut guard = self.lock().await;
181 guard.read_mem(addr, len).await.map_err(TimeError::from)
182 }
183
184 async fn write_register(&self, addr: u64, data: &[u8]) -> Result<(), TimeError> {
185 let mut guard = self.lock().await;
186 guard.write_mem(addr, data).await.map_err(TimeError::from)
187 }
188}
189
190#[cfg(test)]
191mod tests {
192 use super::*;
193
194 #[test]
195 fn regression_tracks_linear_relation() {
196 let mut sync = TimeSync::new();
197 let start = Instant::now();
198 for i in 0..16u64 {
199 let dev = i * 1000;
200 let host = start + Duration::from_millis(i * 16);
201 sync.update(dev, host);
202 }
203 let mapped = sync.to_host_time(64_000);
204 let mapped_secs = mapped
205 .duration_since(sync.anchor_system)
206 .unwrap()
207 .as_secs_f64();
208 let expected_secs = Duration::from_millis(1024).as_secs_f64();
209 assert!((mapped_secs - expected_secs).abs() < 0.1);
210 }
211}