1use std::collections::VecDeque;
10use std::net::Ipv4Addr;
11use std::ops::RangeInclusive;
12use std::time::{Duration, Instant};
13
14use crate::nic::Iface;
15use crate::stats::StreamStatsAccumulator;
16use bytes::{Buf, Bytes, BytesMut};
17use thiserror::Error;
18use tracing::{debug, warn};
19
/// Payload-type code identifying image data; compared against the high byte
/// of the packet status word carried in a leader packet.
const PAYLOAD_TYPE_IMAGE: u8 = 0x01;

/// Size in bytes of the fixed GVSP packet header. Only the first six bytes
/// (status, block id, packet id) are decoded by `parse_packet`; the remaining
/// two are skipped.
const GVSP_HEADER_SIZE: usize = 8;
29
/// Destination endpoint for an outgoing GVSP stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamDest {
    /// Stream to a single receiver.
    Unicast {
        /// Receiver IPv4 address.
        dst_ip: Ipv4Addr,
        /// Receiver UDP port.
        dst_port: u16,
    },
    /// Stream to a multicast group.
    Multicast {
        /// Multicast group address.
        group: Ipv4Addr,
        /// Destination UDP port.
        port: u16,
        /// Whether multicast loopback is requested.
        loopback: bool,
        /// Multicast time-to-live value.
        ttl: u32,
    },
}

impl StreamDest {
    /// The destination UDP port, regardless of variant.
    pub fn port(&self) -> u16 {
        match *self {
            StreamDest::Unicast { dst_port, .. } => dst_port,
            StreamDest::Multicast { port, .. } => port,
        }
    }

    /// The destination IPv4 address (peer address or multicast group).
    pub fn addr(&self) -> Ipv4Addr {
        match *self {
            StreamDest::Unicast { dst_ip, .. } => dst_ip,
            StreamDest::Multicast { group, .. } => group,
        }
    }

    /// True when the destination is a multicast group.
    pub fn is_multicast(&self) -> bool {
        match self {
            StreamDest::Multicast { .. } => true,
            StreamDest::Unicast { .. } => false,
        }
    }
}
75
/// Configuration for one GVSP stream channel.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Where the stream's packets are sent.
    pub dest: StreamDest,
    /// Network interface the stream is bound to.
    pub iface: Iface,
    /// Requested GVSP packet size override — NOTE(review): semantics of `None`
    /// (negotiated vs. default) not visible here; confirm with caller.
    pub packet_size: Option<u32>,
    /// Inter-packet delay override — NOTE(review): units not visible here
    /// (likely device timestamp ticks); confirm against the device contract.
    pub packet_delay: Option<u32>,
    /// When set, only traffic from this source address is accepted —
    /// TODO confirm against the receive path.
    pub source_filter: Option<Ipv4Addr>,
    /// Whether packet-resend requests are enabled for this stream.
    pub resend_enabled: bool,
}
92
/// Errors produced while parsing or servicing GVSP packets.
#[derive(Debug, Error)]
pub enum GvspError {
    /// The packet is well-formed but uses a feature this parser does not handle.
    #[error("unsupported packet type: {0}")]
    Unsupported(&'static str),
    /// The packet is malformed (e.g. truncated header or payload).
    #[error("invalid packet: {0}")]
    Invalid(&'static str),
    /// A resend request was not satisfied in time.
    /// NOTE(review): not constructed anywhere in this file — verify callers.
    #[error("resend timeout")]
    ResendTimeout,
}
103
/// One raw chunk as parsed by `parse_chunks` (typically from a trailer's
/// chunk data): a 16-bit id plus its payload bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRaw {
    /// Chunk identifier.
    pub id: u16,
    /// Chunk payload bytes (copied out of the source buffer).
    pub data: Bytes,
}
110
111pub fn parse_chunks(mut payload: &[u8]) -> Vec<ChunkRaw> {
113 let mut chunks = Vec::new();
114 while !payload.is_empty() {
115 if payload.len() < 8 {
116 warn!(remaining = payload.len(), "chunk header truncated");
117 break;
118 }
119 let mut cursor = payload;
120 let id = cursor.get_u16();
121 let _reserved = cursor.get_u16();
122 let length = cursor.get_u32() as usize;
123 let total = 8 + length;
124 if payload.len() < total {
125 warn!(
126 chunk_id = format_args!("{:#06x}", id),
127 len = payload.len(),
128 expected = total,
129 "chunk data truncated"
130 );
131 break;
132 }
133 let data = Bytes::copy_from_slice(&payload[8..total]);
134 debug!(
135 chunk_id = format_args!("{:#06x}", id),
136 len = length,
137 "parsed chunk"
138 );
139 chunks.push(ChunkRaw { id, data });
140 payload = &payload[total..];
141 }
142 chunks
143}
144
/// A parsed GVSP packet, discriminated by the packet-format byte (the low
/// byte of the status word — see `parse_packet`).
#[derive(Debug, Clone)]
pub enum GvspPacket {
    /// Block leader (format 0x01): announces a frame.
    Leader {
        /// Block (frame) identifier.
        block_id: u16,
        /// Packet identifier within the block.
        packet_id: u16,
        /// Payload-type code; only `PAYLOAD_TYPE_IMAGE` is accepted.
        payload_type: u8,
        /// Timestamp from the leader payload — units not visible here.
        timestamp: u64,
        /// Image width as carried in the leader.
        width: u32,
        /// Image height as carried in the leader.
        height: u32,
        /// Raw 32-bit pixel-format code.
        pixel_format: u32,
    },
    /// Payload packet (format 0x02): one slice of frame data.
    Payload {
        block_id: u16,
        packet_id: u16,
        /// Raw payload bytes, copied out of the datagram.
        data: Bytes,
    },
    /// Block trailer (format 0x03): ends a frame.
    Trailer {
        block_id: u16,
        packet_id: u16,
        /// Trailer status word.
        status: u16,
        /// Trailing chunk data; empty when the trailer carries none.
        chunk_data: Bytes,
    },
}
172
173pub fn parse_packet(payload: &[u8]) -> Result<GvspPacket, GvspError> {
175 if payload.len() < GVSP_HEADER_SIZE {
176 return Err(GvspError::Invalid("GVSP header truncated"));
177 }
178 let mut cursor = payload;
179 let status = cursor.get_u16();
180 let block_id = cursor.get_u16();
181 let packet_id = cursor.get_u16();
182 let payload_type = (status >> 8) as u8;
183 let packet_format = status & 0x00FF;
184
185 match packet_format {
186 0x01 => parse_leader(
187 packet_id,
188 block_id,
189 payload_type,
190 &payload[GVSP_HEADER_SIZE..],
191 ),
192 0x02 => parse_payload(packet_id, block_id, &payload[GVSP_HEADER_SIZE..]),
193 0x03 => parse_trailer(packet_id, block_id, &payload[GVSP_HEADER_SIZE..]),
194 _ => Err(GvspError::Unsupported("packet format")),
195 }
196}
197
198fn parse_leader(
199 packet_id: u16,
200 block_id: u16,
201 payload_type: u8,
202 payload: &[u8],
203) -> Result<GvspPacket, GvspError> {
204 if payload_type != PAYLOAD_TYPE_IMAGE {
205 return Err(GvspError::Unsupported("payload type"));
206 }
207 if payload.len() < 24 {
208 return Err(GvspError::Invalid("leader payload truncated"));
209 }
210 let mut cursor = payload;
211 let timestamp = cursor.get_u64();
212 let width = cursor.get_u32();
213 let height = cursor.get_u32();
214 let pixel_format = cursor.get_u32();
215 Ok(GvspPacket::Leader {
216 block_id,
217 packet_id,
218 payload_type,
219 timestamp,
220 width,
221 height,
222 pixel_format,
223 })
224}
225
226fn parse_payload(packet_id: u16, block_id: u16, payload: &[u8]) -> Result<GvspPacket, GvspError> {
227 Ok(GvspPacket::Payload {
228 block_id,
229 packet_id,
230 data: Bytes::copy_from_slice(payload),
231 })
232}
233
234fn parse_trailer(packet_id: u16, block_id: u16, payload: &[u8]) -> Result<GvspPacket, GvspError> {
235 if payload.len() < 2 {
236 return Err(GvspError::Invalid("trailer truncated"));
237 }
238 let mut cursor = payload;
239 let status = cursor.get_u16();
240 let chunk_data = if payload.len() > 2 {
241 Bytes::copy_from_slice(&payload[2..])
242 } else {
243 Bytes::new()
244 };
245 Ok(GvspPacket::Trailer {
246 block_id,
247 packet_id,
248 status,
249 chunk_data,
250 })
251}
252
/// Fixed-size bitmap tracking which packet ids of a block have arrived.
#[derive(Debug, Clone)]
pub struct PacketBitmap {
    // One bit per packet id, packed into 64-bit words.
    words: Vec<u64>,
    // Count of distinct packet ids marked so far.
    received: usize,
    // Number of packets expected for the block.
    total: usize,
}

impl PacketBitmap {
    /// Creates an empty bitmap sized for `total` packets.
    pub fn new(total: usize) -> Self {
        Self {
            words: vec![0u64; total.div_ceil(64)],
            received: 0,
            total,
        }
    }

    /// Word index and bit mask addressing `packet_id`.
    fn mask_for(&self, packet_id: usize) -> (usize, u64) {
        (packet_id / 64, 1u64 << (packet_id % 64))
    }

    /// Marks `packet_id` as received.
    ///
    /// Returns `true` only the first time a valid id is marked; duplicates
    /// and out-of-range ids return `false`.
    pub fn set(&mut self, packet_id: usize) -> bool {
        if packet_id >= self.total {
            return false;
        }
        let (word, mask) = self.mask_for(packet_id);
        if self.words[word] & mask != 0 {
            false
        } else {
            self.words[word] |= mask;
            self.received += 1;
            true
        }
    }

    /// True once every expected packet has been marked.
    pub fn is_complete(&self) -> bool {
        self.received == self.total
    }

    /// Inclusive, ascending ranges of packet ids still missing.
    ///
    /// Packet ids are cast to `u16`, matching GVSP's 16-bit packet id field.
    pub fn missing_ranges(&self) -> Vec<RangeInclusive<u16>> {
        let mut ranges = Vec::new();
        let mut run_start: Option<usize> = None;
        for idx in 0..self.total {
            let (word, mask) = self.mask_for(idx);
            if self.words[word] & mask == 0 {
                // Open (or continue) the current run of missing ids.
                run_start.get_or_insert(idx);
            } else if let Some(start) = run_start.take() {
                // A present id closes the run at the previous index.
                ranges.push(start as u16..=(idx - 1) as u16);
            }
        }
        if let Some(start) = run_start {
            ranges.push(start as u16..=(self.total - 1) as u16);
        }
        ranges
    }
}
322
/// In-progress reassembly of a single GVSP block into one contiguous frame.
#[derive(Debug)]
pub struct FrameAssembly {
    // Block id this assembly belongs to.
    block_id: u16,
    // Number of payload packets expected for the block.
    expected_packets: usize,
    // Fixed byte stride at which each packet is stored in `buffer`.
    packet_payload: usize,
    // Tracks which packet ids have arrived.
    bitmap: PacketBitmap,
    // Backing storage; packet `i` is written at offset `i * packet_payload`.
    buffer: BytesMut,
    // Actual payload length of each received packet (0 until received).
    lengths: Vec<usize>,
    // Instant after which the assembly is considered expired.
    deadline: Instant,
}
334
impl FrameAssembly {
    /// Creates an assembly for `expected_packets` packets of at most
    /// `packet_payload` bytes each, backed by `buffer` and expiring at
    /// `deadline`.
    pub fn new(
        block_id: u16,
        expected_packets: usize,
        packet_payload: usize,
        buffer: BytesMut,
        deadline: Instant,
    ) -> Self {
        Self {
            block_id,
            expected_packets,
            packet_payload,
            bitmap: PacketBitmap::new(expected_packets),
            buffer,
            lengths: vec![0; expected_packets],
            deadline,
        }
    }

    /// Block id this assembly was created for.
    pub fn block_id(&self) -> u16 {
        self.block_id
    }

    /// True once `now` has reached the assembly's deadline.
    pub fn is_expired(&self, now: Instant) -> bool {
        now >= self.deadline
    }

    /// Stores one packet payload at its strided position.
    ///
    /// Returns `false` for an out-of-range id or an oversized payload;
    /// returns `true` for accepted packets, including duplicates (which are
    /// ignored without rewriting the buffer or recounting).
    pub fn ingest(&mut self, packet_id: usize, payload: &[u8]) -> bool {
        if packet_id >= self.expected_packets || payload.len() > self.packet_payload {
            return false;
        }
        // `set` returns false for a duplicate: treat as accepted, do nothing.
        if !self.bitmap.set(packet_id) {
            return true;
        }
        self.lengths[packet_id] = payload.len();
        // Fixed stride lets packets land in any arrival order.
        let offset = packet_id * self.packet_payload;
        if self.buffer.len() < offset + payload.len() {
            self.buffer.resize(offset + payload.len(), 0);
        }
        self.buffer[offset..offset + payload.len()].copy_from_slice(payload);
        true
    }

    /// Consumes the assembly, yielding the frame bytes, or `None` while
    /// packets are still missing.
    pub fn finish(self) -> Option<Bytes> {
        if !self.bitmap.is_complete() {
            return None;
        }

        // Fast path applies when every packet except (possibly) the last is
        // exactly `packet_payload` bytes: the strided buffer is then already
        // contiguous and only needs trimming.
        let full_sized_prefix = if self.expected_packets > 0 {
            self.lengths
                .iter()
                .take(self.expected_packets.saturating_sub(1))
                .all(|&len| len == self.packet_payload)
        } else {
            true
        };

        if full_sized_prefix {
            let last_len = *self.lengths.last().unwrap_or(&0);
            let used = self
                .packet_payload
                .saturating_mul(self.expected_packets.saturating_sub(1))
                + last_len;
            let mut buf = self.buffer;
            if buf.len() > used {
                buf.truncate(used);
            }
            return Some(buf.freeze());
        }

        // Slow path: at least one interior packet was short, so the strided
        // buffer has gaps; compact the received spans into a fresh buffer.
        let total: usize = self.lengths.iter().sum();
        let mut out = BytesMut::with_capacity(total);
        for (i, &len) in self.lengths.iter().enumerate() {
            if len == 0 {
                continue;
            }
            let start = i * self.packet_payload;
            let end = start + len;
            out.extend_from_slice(&self.buffer[start..end]);
        }
        Some(out.freeze())
    }
}
428
/// Linear-backoff schedule for packet resend requests.
#[derive(Debug, Clone)]
pub struct ResendPlanner {
    // Attempts made so far.
    retries: u32,
    // Maximum attempts before giving up.
    max_retries: u32,
    // Backoff unit; attempt `n` waits `n * base_delay` plus jitter.
    base_delay: Duration,
    // Earliest instant at which the next resend may be issued.
    next_deadline: Instant,
}

impl ResendPlanner {
    /// Creates a planner whose first attempt is allowed immediately.
    pub fn new(max_retries: u32, base_delay: Duration) -> Self {
        Self {
            retries: 0,
            max_retries,
            base_delay,
            next_deadline: Instant::now(),
        }
    }

    /// True when attempts remain and the backoff deadline has passed.
    pub fn should_resend(&self, now: Instant) -> bool {
        !self.is_exhausted() && now >= self.next_deadline
    }

    /// Records one attempt at `now` and pushes the deadline out by
    /// `retries * base_delay + jitter` (falling back to `base_delay` if the
    /// multiplication overflows).
    pub fn record_attempt(&mut self, now: Instant, jitter: Duration) {
        self.retries += 1;
        let backoff = match self.base_delay.checked_mul(self.retries) {
            Some(delay) => delay,
            None => self.base_delay,
        };
        self.next_deadline = now + backoff + jitter;
    }

    /// True once every allowed attempt has been used.
    pub fn is_exhausted(&self) -> bool {
        self.retries >= self.max_retries
    }
}
468
/// A fully reassembled frame ready for consumers.
#[derive(Debug, Clone)]
pub struct CompletedFrame {
    /// GVSP block id the frame was assembled from.
    pub block_id: u16,
    /// Timestamp attached to the frame — NOTE(review): producer not visible
    /// here; confirm whether this is arrival or completion time.
    pub timestamp: Instant,
    /// Frame payload bytes.
    pub payload: Bytes,
}
476
477#[derive(Debug)]
480pub struct FrameQueue {
481 inner: VecDeque<CompletedFrame>,
482 capacity: usize,
483}
484
485impl FrameQueue {
486 pub fn new(capacity: usize) -> Self {
487 Self {
488 inner: VecDeque::with_capacity(capacity),
489 capacity,
490 }
491 }
492
493 pub fn push(&mut self, frame: CompletedFrame, stats: &StreamStatsAccumulator) {
494 if self.inner.len() == self.capacity {
495 self.inner.pop_front();
496 stats.record_backpressure_drop();
497 }
498 self.inner.push_back(frame);
499 }
500
501 pub fn pop(&mut self) -> Option<CompletedFrame> {
502 self.inner.pop_front()
503 }
504}
505
506pub fn coalesce_missing(bitmap: &PacketBitmap, max_range: usize) -> Vec<RangeInclusive<u16>> {
508 bitmap
509 .missing_ranges()
510 .into_iter()
511 .flat_map(|range| split_range(range, max_range))
512 .collect()
513}
514
/// Splits an inclusive range into consecutive pieces of at most `max_len`
/// elements each; a `max_len` of 0 returns the range unchanged.
fn split_range(range: RangeInclusive<u16>, max_len: usize) -> Vec<RangeInclusive<u16>> {
    if max_len == 0 {
        return vec![range];
    }
    // Work in usize to avoid u16 overflow at the upper boundary.
    let (lo, hi) = (*range.start() as usize, *range.end() as usize);
    let mut pieces = Vec::new();
    let mut cursor = lo;
    while cursor <= hi {
        let top = hi.min(cursor + max_len - 1);
        pieces.push(cursor as u16..=top as u16);
        cursor = top + 1;
    }
    pieces
}
530
/// Drives frame reassembly for one stream; at most one block is in flight.
#[derive(Debug)]
pub struct Reassembler {
    // Block currently being assembled, if any.
    active: Option<FrameAssembly>,
    // Fixed per-packet payload stride applied to every block.
    packet_payload: usize,
    // Accumulator receiving per-packet statistics.
    stats: StreamStatsAccumulator,
}
538
539impl Reassembler {
540 pub fn new(packet_payload: usize, stats: StreamStatsAccumulator) -> Self {
541 Self {
542 active: None,
543 packet_payload,
544 stats,
545 }
546 }
547
548 pub fn start_block(&mut self, block_id: u16, expected_packets: usize, buffer: BytesMut) {
550 let deadline = Instant::now() + Duration::from_millis(50);
551 self.active = Some(FrameAssembly::new(
552 block_id,
553 expected_packets,
554 self.packet_payload,
555 buffer,
556 deadline,
557 ));
558 }
559
560 pub fn push_packet(&mut self, packet_id: usize, payload: &[u8]) {
562 if let Some(assembly) = self.active.as_mut() {
563 if assembly.ingest(packet_id, payload) {
564 self.stats.record_packet();
565 }
566 }
567 }
568
569 pub fn finish_block(&mut self) -> Option<Bytes> {
571 self.active.take().and_then(FrameAssembly::finish)
572 }
573}
574
// Unit tests for chunk parsing, bitmap gap tracking, and frame reassembly.
#[cfg(test)]
mod tests {
    use super::*;

    // Two back-to-back chunks parse into two distinct entries.
    #[test]
    fn parse_multiple_chunks() {
        let mut payload = Vec::new();
        payload.extend_from_slice(&0x0001u16.to_be_bytes());
        payload.extend_from_slice(&0u16.to_be_bytes());
        payload.extend_from_slice(&4u32.to_be_bytes());
        payload.extend_from_slice(&[1, 2, 3, 4]);
        payload.extend_from_slice(&0x0002u16.to_be_bytes());
        payload.extend_from_slice(&0u16.to_be_bytes());
        payload.extend_from_slice(&2u32.to_be_bytes());
        payload.extend_from_slice(&[5, 6]);
        let chunks = parse_chunks(&payload);
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].id, 0x0001);
        assert_eq!(chunks[0].data.as_ref(), &[1, 2, 3, 4]);
        assert_eq!(chunks[1].id, 0x0002);
        assert_eq!(chunks[1].data.as_ref(), &[5, 6]);
    }

    // A buffer shorter than one chunk header yields no chunks.
    #[test]
    fn truncated_chunk_is_ignored() {
        let payload = vec![0u8; 6];
        let chunks = parse_chunks(&payload);
        assert!(chunks.is_empty());
    }

    // Randomized round-trip: padding (< 8 bytes) after valid chunks must not
    // corrupt or reorder the chunks parsed before it.
    #[test]
    fn parse_chunks_tolerates_padding() {
        for _ in 0..128 {
            let count = fastrand::usize(..6);
            let mut payload = Vec::new();
            let mut entries = Vec::new();
            for _ in 0..count {
                let id = fastrand::u16(..);
                let len = fastrand::usize(..16);
                let mut data = vec![0u8; len];
                for byte in &mut data {
                    *byte = fastrand::u8(..);
                }
                payload.extend_from_slice(&id.to_be_bytes());
                payload.extend_from_slice(&0u16.to_be_bytes());
                payload.extend_from_slice(&(data.len() as u32).to_be_bytes());
                payload.extend_from_slice(&data);
                entries.push((id, data));
            }
            let padding_len = fastrand::usize(..8);
            for _ in 0..padding_len {
                payload.push(fastrand::u8(..));
            }
            let parsed = parse_chunks(&payload);
            assert!(parsed.len() <= entries.len());
            for (idx, chunk) in parsed.iter().enumerate() {
                assert_eq!(chunk.id, entries[idx].0);
                assert_eq!(chunk.data.as_ref(), entries[idx].1.as_slice());
            }
        }
    }

    // Adjacent missing ids are reported as single inclusive ranges.
    #[test]
    fn bitmap_missing_ranges_coalesce() {
        let mut bitmap = PacketBitmap::new(10);
        for &idx in &[0usize, 1, 5, 6, 9] {
            bitmap.set(idx);
        }
        let ranges = bitmap.missing_ranges();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 2..=4);
        assert_eq!(ranges[1], 7..=8);
    }

    // A long gap is split into pieces of at most `max_range` ids.
    #[test]
    fn coalesce_splits_large_ranges() {
        let mut bitmap = PacketBitmap::new(20);
        for idx in [0usize, 1, 2, 18, 19] {
            bitmap.set(idx);
        }
        let ranges = coalesce_missing(&bitmap, 4);
        assert_eq!(ranges, vec![3..=6, 7..=10, 11..=14, 15..=17]);
    }

    // Short packets (3 bytes at a 4-byte stride) exercise the compaction
    // path in FrameAssembly::finish.
    #[test]
    fn reassembler_finishes_frame() {
        let stats = StreamStatsAccumulator::new();
        let mut reassembler = Reassembler::new(4, stats);
        reassembler.start_block(1, 3, BytesMut::with_capacity(12));
        reassembler.push_packet(0, &[1, 2, 3]);
        reassembler.push_packet(1, &[4, 5, 6]);
        reassembler.push_packet(2, &[7, 8, 9]);
        let frame = reassembler.finish_block().expect("frame");
        assert_eq!(frame.as_ref(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }
}