1use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
/// Smoothing factor for the exponentially weighted moving averages in this
/// file: each new sample moves the running average by this fraction of the
/// difference between the sample and the current average.
const EWMA_ALPHA: f64 = 0.2;
8
/// Point-in-time copy of the counters and moving averages kept for a stream.
///
/// Every field is a plain value, so a snapshot is `Copy` and can be compared
/// with `==`. The derived [`Default`] is the all-zero snapshot: every counter
/// is `0`, every duration is [`Duration::ZERO`], every average is `0.0` and
/// `avg_latency_ms` is `None`.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct StreamStats {
    /// Number of frames recorded.
    pub frames: u64,
    /// Sum of the byte counts passed along with each recorded frame.
    pub bytes: u64,
    /// Total drops. As filled in by `StreamStatsAccumulator::snapshot`, this
    /// already includes `backpressure_drops`.
    pub drops: u64,
    /// Number of resend events recorded.
    pub resends: u64,
    /// Most recent non-zero interval measured between two consecutive frames;
    /// zero until such an interval has been observed.
    pub last_frame_dt: Duration,
    /// Exponentially weighted moving average of the frame rate, in frames per
    /// second.
    pub avg_fps: f64,
    /// Exponentially weighted moving average of the throughput, in megabits
    /// per second (10^6 bits).
    pub avg_mbps: f64,
    /// Exponentially weighted moving average of the frame latency in
    /// milliseconds, or `None` if no latency sample has been recorded.
    pub avg_latency_ms: Option<f64>,
    /// Number of packets recorded.
    pub packets: u64,
    /// Sum of all resend-range amounts recorded.
    pub resend_ranges: u64,
    /// Number of drops recorded as caused by backpressure.
    pub backpressure_drops: u64,
    /// Number of late frames recorded.
    pub late_frames: u64,
    /// Number of pool exhaustions recorded.
    pub pool_exhaustions: u64,
    /// Time between creation of the accumulator and the moment the snapshot
    /// was taken.
    pub elapsed: Duration,
    /// Lifetime average packet rate: `packets` divided by `elapsed` in
    /// seconds, or `0.0` when no time has elapsed.
    pub packets_per_second: f64,
}
50
/// Mutable bookkeeping behind the stream statistics accumulator.
///
/// Holds the raw counters, the running averages and the two timestamps needed
/// to derive rates.
#[derive(Debug)]
struct StatsState {
    /// Frames recorded so far.
    frames: u64,
    /// Running sum of the byte counts of recorded frames.
    bytes: u64,
    /// Packets recorded so far.
    packets: u64,
    /// Resend events recorded so far.
    resends: u64,
    /// Running sum of recorded resend-range amounts.
    resend_ranges: u64,
    /// Drops recorded so far, not counting backpressure drops.
    drops: u64,
    /// Backpressure drops recorded so far.
    backpressure_drops: u64,
    /// Late frames recorded so far.
    late_frames: u64,
    /// Pool exhaustions recorded so far.
    pool_exhaustions: u64,
    /// Last non-zero interval measured between consecutive frames.
    last_frame_dt: Duration,
    /// Moving average of the frame rate; `0.0` means "no sample yet".
    avg_fps: f64,
    /// Moving average of the throughput; `0.0` means "no sample yet".
    avg_mbps: f64,
    /// Moving average of the latency in milliseconds, once one was recorded.
    avg_latency_ms: Option<f64>,
    /// When the most recent frame was recorded, if any.
    last_frame_instant: Option<Instant>,
    /// When this state was created; the reference point for elapsed time.
    start: Instant,
}

impl StatsState {
    /// Builds an empty state: all counters and averages are zero, no frame or
    /// latency has been seen, and the clock starts at the moment of the call.
    fn new() -> Self {
        let start = Instant::now();
        Self {
            start,
            last_frame_instant: None,
            avg_latency_ms: None,
            avg_mbps: 0.0,
            avg_fps: 0.0,
            last_frame_dt: Duration::ZERO,
            pool_exhaustions: 0,
            late_frames: 0,
            backpressure_drops: 0,
            drops: 0,
            resend_ranges: 0,
            resends: 0,
            packets: 0,
            bytes: 0,
            frames: 0,
        }
    }
}
91
92#[derive(Debug, Clone)]
93pub struct StreamStatsAccumulator {
94 inner: Arc<StatsInner>,
95}
96
97#[derive(Debug)]
98struct StatsInner {
99 state: Mutex<StatsState>,
100}
101
102impl StreamStatsAccumulator {
103 pub fn new() -> Self {
105 Self {
106 inner: Arc::new(StatsInner {
107 state: Mutex::new(StatsState::new()),
108 }),
109 }
110 }
111
112 pub fn record_packet(&self) {
114 let mut state = self.inner.state.lock().expect("stats mutex poisoned");
115 state.packets += 1;
116 }
117
118 pub fn record_resend(&self) {
120 let mut state = self.inner.state.lock().expect("stats mutex poisoned");
121 state.resends += 1;
122 }
123
124 pub fn record_resend_ranges(&self, ranges: u64) {
126 if ranges == 0 {
127 return;
128 }
129 let mut state = self.inner.state.lock().expect("stats mutex poisoned");
130 state.resend_ranges += ranges;
131 }
132
133 pub fn record_drop(&self) {
135 let mut state = self.inner.state.lock().expect("stats mutex poisoned");
136 state.drops += 1;
137 }
138
139 pub fn record_backpressure_drop(&self) {
141 let mut state = self.inner.state.lock().expect("stats mutex poisoned");
142 state.backpressure_drops += 1;
143 }
144
145 pub fn record_late_frame(&self) {
147 let mut state = self.inner.state.lock().expect("stats mutex poisoned");
148 state.late_frames += 1;
149 }
150
151 pub fn record_pool_exhaustion(&self) {
153 let mut state = self.inner.state.lock().expect("stats mutex poisoned");
154 state.pool_exhaustions += 1;
155 }
156
157 pub fn record_frame(&self, bytes: usize, latency: Option<Duration>) {
159 let now = Instant::now();
160 let mut state = self.inner.state.lock().expect("stats mutex poisoned");
161 state.frames += 1;
162 state.bytes += bytes as u64;
163
164 if let Some(prev) = state.last_frame_instant.replace(now) {
165 let dt = now.saturating_duration_since(prev);
166 if dt > Duration::ZERO {
167 state.last_frame_dt = dt;
168 let fps = 1.0 / dt.as_secs_f64();
169 state.avg_fps = if state.avg_fps == 0.0 {
170 fps
171 } else {
172 state.avg_fps + EWMA_ALPHA * (fps - state.avg_fps)
173 };
174 let mbps = (bytes as f64 * 8.0) / 1_000_000.0 / dt.as_secs_f64();
175 state.avg_mbps = if state.avg_mbps == 0.0 {
176 mbps
177 } else {
178 state.avg_mbps + EWMA_ALPHA * (mbps - state.avg_mbps)
179 };
180 }
181 } else {
182 state.last_frame_dt = Duration::ZERO;
183 }
184
185 if let Some(latency) = latency {
186 let ms = latency.as_secs_f64() * 1_000.0;
187 state.avg_latency_ms = Some(match state.avg_latency_ms {
188 Some(prev) => prev + EWMA_ALPHA * (ms - prev),
189 None => ms,
190 });
191 }
192 }
193
194 pub fn snapshot(&self) -> StreamStats {
196 let state = self.inner.state.lock().expect("stats mutex poisoned");
197 let elapsed = state.start.elapsed();
198 let packets_per_second = if elapsed > Duration::ZERO {
199 state.packets as f64 / elapsed.as_secs_f64()
200 } else {
201 0.0
202 };
203
204 StreamStats {
205 frames: state.frames,
206 bytes: state.bytes,
207 drops: state.drops + state.backpressure_drops,
208 resends: state.resends,
209 last_frame_dt: state.last_frame_dt,
210 avg_fps: state.avg_fps,
211 avg_mbps: state.avg_mbps,
212 avg_latency_ms: state.avg_latency_ms,
213 packets: state.packets,
214 resend_ranges: state.resend_ranges,
215 backpressure_drops: state.backpressure_drops,
216 late_frames: state.late_frames,
217 pool_exhaustions: state.pool_exhaustions,
218 elapsed,
219 packets_per_second,
220 }
221 }
222}
223
224impl Default for StreamStatsAccumulator {
225 fn default() -> Self {
226 Self::new()
227 }
228}
229
230#[derive(Debug)]
232pub struct EventStats {
233 received: AtomicU64,
234 malformed: AtomicU64,
235 filtered: AtomicU64,
236 start: Instant,
237}
238
239impl EventStats {
240 pub fn new() -> Self {
242 Self {
243 received: AtomicU64::new(0),
244 malformed: AtomicU64::new(0),
245 filtered: AtomicU64::new(0),
246 start: Instant::now(),
247 }
248 }
249
250 pub fn record_event(&self) {
252 self.received.fetch_add(1, Ordering::Relaxed);
253 }
254
255 pub fn record_malformed(&self) {
257 self.malformed.fetch_add(1, Ordering::Relaxed);
258 }
259
260 pub fn record_filtered(&self) {
262 self.filtered.fetch_add(1, Ordering::Relaxed);
263 }
264
265 pub fn snapshot(&self) -> EventSnapshot {
267 EventSnapshot {
268 received: self.received.load(Ordering::Relaxed),
269 malformed: self.malformed.load(Ordering::Relaxed),
270 filtered: self.filtered.load(Ordering::Relaxed),
271 elapsed: self.start.elapsed().as_secs_f32(),
272 }
273 }
274}
275
276impl Default for EventStats {
277 fn default() -> Self {
278 Self::new()
279 }
280}
281
/// Plain-value copy of the event counters at one moment.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct EventSnapshot {
    /// Events counted as received.
    pub received: u64,
    /// Events counted as malformed.
    pub malformed: u64,
    /// Events counted as filtered.
    pub filtered: u64,
    /// Elapsed time in seconds; `EventStats::snapshot` fills this with the
    /// time since the `EventStats` was created.
    pub elapsed: f32,
}
290
291#[derive(Debug)]
293pub struct ActionStats {
294 sent: AtomicU64,
295 acknowledgements: AtomicU64,
296 failures: AtomicU64,
297}
298
299impl ActionStats {
300 pub fn new() -> Self {
302 Self {
303 sent: AtomicU64::new(0),
304 acknowledgements: AtomicU64::new(0),
305 failures: AtomicU64::new(0),
306 }
307 }
308
309 pub fn record_send(&self) {
311 self.sent.fetch_add(1, Ordering::Relaxed);
312 }
313
314 pub fn record_ack(&self) {
316 self.acknowledgements.fetch_add(1, Ordering::Relaxed);
317 }
318
319 pub fn record_failure(&self) {
321 self.failures.fetch_add(1, Ordering::Relaxed);
322 }
323
324 pub fn snapshot(&self) -> ActionSnapshot {
326 ActionSnapshot {
327 sent: self.sent.load(Ordering::Relaxed),
328 acknowledgements: self.acknowledgements.load(Ordering::Relaxed),
329 failures: self.failures.load(Ordering::Relaxed),
330 }
331 }
332}
333
334impl Default for ActionStats {
335 fn default() -> Self {
336 Self::new()
337 }
338}
339
/// Plain-value copy of the action counters at one moment.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ActionSnapshot {
    /// Actions counted as sent.
    pub sent: u64,
    /// Acknowledgements counted.
    pub acknowledgements: u64,
    /// Failures counted.
    pub failures: u64,
}
347
348#[derive(Debug)]
350pub struct TimeStats {
351 samples: AtomicU64,
352 latches: AtomicU64,
353 resets: AtomicU64,
354}
355
356impl TimeStats {
357 pub fn new() -> Self {
359 Self {
360 samples: AtomicU64::new(0),
361 latches: AtomicU64::new(0),
362 resets: AtomicU64::new(0),
363 }
364 }
365
366 pub fn record_sample(&self) {
368 self.samples.fetch_add(1, Ordering::Relaxed);
369 }
370
371 pub fn record_latch(&self) {
373 self.latches.fetch_add(1, Ordering::Relaxed);
374 }
375
376 pub fn record_reset(&self) {
378 self.resets.fetch_add(1, Ordering::Relaxed);
379 }
380
381 pub fn snapshot(&self) -> TimeSnapshot {
383 TimeSnapshot {
384 samples: self.samples.load(Ordering::Relaxed),
385 latches: self.latches.load(Ordering::Relaxed),
386 resets: self.resets.load(Ordering::Relaxed),
387 }
388 }
389}
390
391impl Default for TimeStats {
392 fn default() -> Self {
393 Self::new()
394 }
395}
396
/// Plain-value copy of the time counters at one moment.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TimeSnapshot {
    /// Samples counted.
    pub samples: u64,
    /// Latches counted.
    pub latches: u64,
    /// Resets counted.
    pub resets: u64,
}