tl_gige/
stats.rs

1//! Streaming statistics helpers.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
/// Smoothing factor for the exponentially weighted moving averages kept by
/// `StreamStatsAccumulator::record_frame` (frame rate, throughput, latency).
///
/// Each new sample moves the running average by this fraction of the
/// difference: `avg + EWMA_ALPHA * (sample - avg)`.
const EWMA_ALPHA: f64 = 0.2;
8
/// Immutable view of streaming statistics suitable for UI overlays.
///
/// Values are plain copies taken by `StreamStatsAccumulator::snapshot`.
/// The [`Default`] value has every counter and rate at zero, both durations
/// at [`Duration::ZERO`] and no latency estimate.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct StreamStats {
    /// Number of fully received frames recorded so far.
    pub frames: u64,
    /// Sum of the byte sizes of all recorded frames.
    pub bytes: u64,
    /// Total dropped frames; the accumulator reports generic drops plus
    /// backpressure drops here.
    pub drops: u64,
    /// Number of resend requests recorded.
    pub resends: u64,
    /// Interval between the two most recently recorded frames
    /// (zero until two frames have been seen).
    pub last_frame_dt: Duration,
    /// Exponentially smoothed frame rate, in frames per second.
    pub avg_fps: f64,
    /// Exponentially smoothed throughput, in megabits per second.
    pub avg_mbps: f64,
    /// Exponentially smoothed frame latency in milliseconds, or `None` when
    /// no frame has been recorded with a latency value.
    pub avg_latency_ms: Option<f64>,
    /// Number of packets recorded.
    pub packets: u64,
    /// Sum of the packet ranges covered by recorded resend requests.
    pub resend_ranges: u64,
    /// Drops attributed to application backpressure (also counted in `drops`).
    pub backpressure_drops: u64,
    /// Frames that missed their presentation deadline.
    pub late_frames: u64,
    /// Number of times the frame buffer pool was reported exhausted.
    pub pool_exhaustions: u64,
    /// Time elapsed since the accumulator was created.
    pub elapsed: Duration,
    /// `packets` divided by `elapsed` in seconds (zero when `elapsed` is zero).
    pub packets_per_second: f64,
}
50
/// Mutable bookkeeping guarded by the accumulator's mutex.
///
/// Holds the raw counters, the smoothed rates and the timestamps needed to
/// derive them.
#[derive(Debug)]
struct StatsState {
    frames: u64,
    bytes: u64,
    packets: u64,
    resends: u64,
    resend_ranges: u64,
    drops: u64,
    backpressure_drops: u64,
    late_frames: u64,
    pool_exhaustions: u64,
    last_frame_dt: Duration,
    avg_fps: f64,
    avg_mbps: f64,
    avg_latency_ms: Option<f64>,
    /// Arrival time of the most recently recorded frame, if any.
    last_frame_instant: Option<Instant>,
    /// Moment this state was created; reference point for elapsed time.
    start: Instant,
}

impl StatsState {
    /// Build an empty state: all counters and rates at zero, no frame seen
    /// yet, and `start` set to the moment of the call.
    fn new() -> Self {
        let start = Instant::now();
        Self {
            // Timestamps.
            start,
            last_frame_instant: None,
            last_frame_dt: Duration::ZERO,
            // Smoothed rates.
            avg_fps: 0.0,
            avg_mbps: 0.0,
            avg_latency_ms: None,
            // Event counters.
            frames: 0,
            bytes: 0,
            packets: 0,
            resends: 0,
            resend_ranges: 0,
            drops: 0,
            backpressure_drops: 0,
            late_frames: 0,
            pool_exhaustions: 0,
        }
    }
}
91
92#[derive(Debug, Clone)]
93pub struct StreamStatsAccumulator {
94    inner: Arc<StatsInner>,
95}
96
97#[derive(Debug)]
98struct StatsInner {
99    state: Mutex<StatsState>,
100}
101
102impl StreamStatsAccumulator {
103    /// Create a new statistics accumulator.
104    pub fn new() -> Self {
105        Self {
106            inner: Arc::new(StatsInner {
107                state: Mutex::new(StatsState::new()),
108            }),
109        }
110    }
111
112    /// Record a received packet.
113    pub fn record_packet(&self) {
114        let mut state = self.inner.state.lock().expect("stats mutex poisoned");
115        state.packets += 1;
116    }
117
118    /// Record a resend request.
119    pub fn record_resend(&self) {
120        let mut state = self.inner.state.lock().expect("stats mutex poisoned");
121        state.resends += 1;
122    }
123
124    /// Record the number of packet ranges covered by a resend request.
125    pub fn record_resend_ranges(&self, ranges: u64) {
126        if ranges == 0 {
127            return;
128        }
129        let mut state = self.inner.state.lock().expect("stats mutex poisoned");
130        state.resend_ranges += ranges;
131    }
132
133    /// Record a dropped frame event.
134    pub fn record_drop(&self) {
135        let mut state = self.inner.state.lock().expect("stats mutex poisoned");
136        state.drops += 1;
137    }
138
139    /// Record a drop caused by application backpressure.
140    pub fn record_backpressure_drop(&self) {
141        let mut state = self.inner.state.lock().expect("stats mutex poisoned");
142        state.backpressure_drops += 1;
143    }
144
145    /// Record a frame that missed its presentation deadline.
146    pub fn record_late_frame(&self) {
147        let mut state = self.inner.state.lock().expect("stats mutex poisoned");
148        state.late_frames += 1;
149    }
150
151    /// Record an exhausted frame buffer pool event.
152    pub fn record_pool_exhaustion(&self) {
153        let mut state = self.inner.state.lock().expect("stats mutex poisoned");
154        state.pool_exhaustions += 1;
155    }
156
157    /// Update metrics for a fully received frame.
158    pub fn record_frame(&self, bytes: usize, latency: Option<Duration>) {
159        let now = Instant::now();
160        let mut state = self.inner.state.lock().expect("stats mutex poisoned");
161        state.frames += 1;
162        state.bytes += bytes as u64;
163
164        if let Some(prev) = state.last_frame_instant.replace(now) {
165            let dt = now.saturating_duration_since(prev);
166            if dt > Duration::ZERO {
167                state.last_frame_dt = dt;
168                let fps = 1.0 / dt.as_secs_f64();
169                state.avg_fps = if state.avg_fps == 0.0 {
170                    fps
171                } else {
172                    state.avg_fps + EWMA_ALPHA * (fps - state.avg_fps)
173                };
174                let mbps = (bytes as f64 * 8.0) / 1_000_000.0 / dt.as_secs_f64();
175                state.avg_mbps = if state.avg_mbps == 0.0 {
176                    mbps
177                } else {
178                    state.avg_mbps + EWMA_ALPHA * (mbps - state.avg_mbps)
179                };
180            }
181        } else {
182            state.last_frame_dt = Duration::ZERO;
183        }
184
185        if let Some(latency) = latency {
186            let ms = latency.as_secs_f64() * 1_000.0;
187            state.avg_latency_ms = Some(match state.avg_latency_ms {
188                Some(prev) => prev + EWMA_ALPHA * (ms - prev),
189                None => ms,
190            });
191        }
192    }
193
194    /// Produce a snapshot of the accumulated statistics.
195    pub fn snapshot(&self) -> StreamStats {
196        let state = self.inner.state.lock().expect("stats mutex poisoned");
197        let elapsed = state.start.elapsed();
198        let packets_per_second = if elapsed > Duration::ZERO {
199            state.packets as f64 / elapsed.as_secs_f64()
200        } else {
201            0.0
202        };
203
204        StreamStats {
205            frames: state.frames,
206            bytes: state.bytes,
207            drops: state.drops + state.backpressure_drops,
208            resends: state.resends,
209            last_frame_dt: state.last_frame_dt,
210            avg_fps: state.avg_fps,
211            avg_mbps: state.avg_mbps,
212            avg_latency_ms: state.avg_latency_ms,
213            packets: state.packets,
214            resend_ranges: state.resend_ranges,
215            backpressure_drops: state.backpressure_drops,
216            late_frames: state.late_frames,
217            pool_exhaustions: state.pool_exhaustions,
218            elapsed,
219            packets_per_second,
220        }
221    }
222}
223
224impl Default for StreamStatsAccumulator {
225    fn default() -> Self {
226        Self::new()
227    }
228}
229
/// Event channel statistics.
///
/// Counters are atomics, so they can be bumped through a shared reference.
#[derive(Debug)]
pub struct EventStats {
    received: AtomicU64,
    malformed: AtomicU64,
    filtered: AtomicU64,
    /// Creation time; reference point for the snapshot's `elapsed` field.
    start: Instant,
}

impl EventStats {
    /// Create a new accumulator for GVCP events with all counters at zero.
    pub fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            received: zero(),
            malformed: zero(),
            filtered: zero(),
            start: Instant::now(),
        }
    }

    /// Add one to `counter` using relaxed ordering.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a successfully parsed event packet.
    pub fn record_event(&self) {
        Self::bump(&self.received);
    }

    /// Record a dropped or malformed event packet.
    pub fn record_malformed(&self) {
        Self::bump(&self.malformed);
    }

    /// Record an event filtered out by the application.
    pub fn record_filtered(&self) {
        Self::bump(&self.filtered);
    }

    /// Snapshot the collected counters together with the time since creation.
    pub fn snapshot(&self) -> EventSnapshot {
        let received = self.received.load(Ordering::Relaxed);
        let malformed = self.malformed.load(Ordering::Relaxed);
        let filtered = self.filtered.load(Ordering::Relaxed);
        let elapsed = self.start.elapsed().as_secs_f32();
        EventSnapshot {
            received,
            malformed,
            filtered,
            elapsed,
        }
    }
}

impl Default for EventStats {
    fn default() -> Self {
        Self::new()
    }
}

/// Immutable view of event statistics.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct EventSnapshot {
    /// Successfully parsed event packets.
    pub received: u64,
    /// Dropped or malformed event packets.
    pub malformed: u64,
    /// Events filtered out by the application.
    pub filtered: u64,
    /// Seconds elapsed since the `EventStats` was created.
    pub elapsed: f32,
}
290
/// Action command dispatch statistics.
///
/// Counters are atomics, so they can be bumped through a shared reference.
#[derive(Debug, Default)]
pub struct ActionStats {
    sent: AtomicU64,
    acknowledgements: AtomicU64,
    failures: AtomicU64,
}

impl ActionStats {
    /// Create a new accumulator for action command metrics (all counters zero).
    pub fn new() -> Self {
        Self::default()
    }

    /// Add one to `counter` using relaxed ordering.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a dispatched action.
    pub fn record_send(&self) {
        Self::bump(&self.sent);
    }

    /// Record a received acknowledgement.
    pub fn record_ack(&self) {
        Self::bump(&self.acknowledgements);
    }

    /// Record a failure while dispatching or waiting for acknowledgements.
    pub fn record_failure(&self) {
        Self::bump(&self.failures);
    }

    /// Snapshot the collected counters.
    pub fn snapshot(&self) -> ActionSnapshot {
        let sent = self.sent.load(Ordering::Relaxed);
        let acknowledgements = self.acknowledgements.load(Ordering::Relaxed);
        let failures = self.failures.load(Ordering::Relaxed);
        ActionSnapshot {
            sent,
            acknowledgements,
            failures,
        }
    }
}

/// Immutable view of action statistics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ActionSnapshot {
    /// Actions dispatched.
    pub sent: u64,
    /// Acknowledgements received.
    pub acknowledgements: u64,
    /// Failures while dispatching or waiting for acknowledgements.
    pub failures: u64,
}
347
/// Timestamp synchronisation statistics.
///
/// Counters are atomics, so they can be bumped through a shared reference.
#[derive(Debug, Default)]
pub struct TimeStats {
    samples: AtomicU64,
    latches: AtomicU64,
    resets: AtomicU64,
}

impl TimeStats {
    /// Create a new accumulator for timestamp operations (all counters zero).
    pub fn new() -> Self {
        Self::default()
    }

    /// Add one to `counter` using relaxed ordering.
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a calibration sample.
    pub fn record_sample(&self) {
        Self::bump(&self.samples);
    }

    /// Record a timestamp latch request.
    pub fn record_latch(&self) {
        Self::bump(&self.latches);
    }

    /// Record a timestamp reset operation.
    pub fn record_reset(&self) {
        Self::bump(&self.resets);
    }

    /// Snapshot the current counters.
    pub fn snapshot(&self) -> TimeSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        let samples = read(&self.samples);
        let latches = read(&self.latches);
        let resets = read(&self.resets);
        TimeSnapshot {
            samples,
            latches,
            resets,
        }
    }
}

/// Immutable view of timestamp statistics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeSnapshot {
    /// Calibration samples recorded.
    pub samples: u64,
    /// Timestamp latch requests recorded.
    pub latches: u64,
    /// Timestamp reset operations recorded.
    pub resets: u64,
}