1use std::collections::VecDeque;
10use std::net::Ipv4Addr;
11use std::ops::RangeInclusive;
12use std::time::{Duration, Instant};
13
14use crate::nic::Iface;
15use crate::stats::StreamStatsAccumulator;
16use bytes::{Buf, Bytes, BytesMut};
17use thiserror::Error;
18use tracing::{debug, warn};
19
/// GVSP payload-type identifier for an uncompressed image payload.
const PAYLOAD_TYPE_IMAGE: u8 = 0x01;

/// Size in bytes of the standard (non-extended) GVSP packet header.
const GVSP_HEADER_SIZE: usize = 8;

/// Destination endpoint for an outgoing GVSP stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamDest {
    /// Point-to-point delivery to a single receiver.
    Unicast {
        /// Receiver IPv4 address.
        dst_ip: Ipv4Addr,
        /// Receiver UDP port.
        dst_port: u16,
    },
    /// Delivery to an IPv4 multicast group.
    Multicast {
        /// Multicast group address.
        group: Ipv4Addr,
        /// UDP port used for the group.
        port: u16,
        /// Whether multicast loopback is enabled on the sending socket
        /// (presumably mapped to the socket option by the caller — confirm).
        loopback: bool,
        /// Multicast TTL (hop limit) for outgoing packets.
        ttl: u32,
    },
}
52
53impl StreamDest {
54 pub fn port(&self) -> u16 {
56 match self {
57 StreamDest::Unicast { dst_port, .. } => *dst_port,
58 StreamDest::Multicast { port, .. } => *port,
59 }
60 }
61
62 pub fn addr(&self) -> Ipv4Addr {
64 match self {
65 StreamDest::Unicast { dst_ip, .. } => *dst_ip,
66 StreamDest::Multicast { group, .. } => *group,
67 }
68 }
69
70 pub fn is_multicast(&self) -> bool {
72 matches!(self, StreamDest::Multicast { .. })
73 }
74}
75
/// Configuration for a single GVSP stream channel.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Where packets are sent (unicast host or multicast group).
    pub dest: StreamDest,
    /// Network interface the stream is bound to.
    pub iface: Iface,
    /// Negotiated GVSP packet size in bytes, when configured.
    pub packet_size: Option<u32>,
    /// Inter-packet delay, when configured (units not visible here —
    /// presumably the device's SCPD register ticks; confirm at call site).
    pub packet_delay: Option<u32>,
    /// When set, presumably restricts accepted traffic to this source
    /// address — confirm against the socket setup code.
    pub source_filter: Option<Ipv4Addr>,
    /// Whether packet-resend requests are honored for this stream.
    pub resend_enabled: bool,
}
92
/// Errors produced while parsing or servicing GVSP packets.
#[derive(Debug, Error)]
pub enum GvspError {
    /// Packet was recognized but uses a feature this implementation does not handle.
    #[error("unsupported packet type: {0}")]
    Unsupported(&'static str),
    /// Packet is malformed (truncated or inconsistent fields).
    #[error("invalid packet: {0}")]
    Invalid(&'static str),
    /// A resend exchange timed out before the data arrived.
    #[error("resend timeout")]
    ResendTimeout,
}
103
/// A single raw chunk record parsed from trailing GVSP chunk data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRaw {
    /// Chunk identifier from the chunk header.
    pub id: u16,
    /// Chunk payload bytes (copied out of the packet buffer).
    pub data: Bytes,
}
110
111pub fn parse_chunks(mut payload: &[u8]) -> Vec<ChunkRaw> {
113 let mut chunks = Vec::new();
114 while !payload.is_empty() {
115 if payload.len() < 8 {
116 warn!(remaining = payload.len(), "chunk header truncated");
117 break;
118 }
119 let mut cursor = payload;
120 let id = cursor.get_u16();
121 let _reserved = cursor.get_u16();
122 let length = cursor.get_u32() as usize;
123 let total = 8 + length;
124 if payload.len() < total {
125 warn!(
126 chunk_id = format_args!("{:#06x}", id),
127 len = payload.len(),
128 expected = total,
129 "chunk data truncated"
130 );
131 break;
132 }
133 let data = Bytes::copy_from_slice(&payload[8..total]);
134 debug!(
135 chunk_id = format_args!("{:#06x}", id),
136 len = length,
137 "parsed chunk"
138 );
139 chunks.push(ChunkRaw { id, data });
140 payload = &payload[total..];
141 }
142 chunks
143}
144
/// A decoded GVSP stream packet.
#[derive(Debug, Clone)]
pub enum GvspPacket {
    /// Leader packet announcing a new block (frame).
    Leader {
        /// Block (frame) identifier.
        block_id: u64,
        /// Packet identifier within the block.
        packet_id: u32,
        /// Payload type read from the leader body.
        payload_type: u8,
        /// Timestamp field from the leader body.
        timestamp: u64,
        /// Image width in pixels.
        width: u32,
        /// Image height in pixels.
        height: u32,
        /// Pixel-format code from the leader body.
        pixel_format: u32,
    },
    /// Payload packet carrying a slice of the block's data.
    Payload {
        block_id: u64,
        packet_id: u32,
        /// Raw payload bytes for this packet.
        data: Bytes,
    },
    /// Trailer packet closing a block.
    Trailer {
        block_id: u64,
        packet_id: u32,
        /// Status code from the trailer body.
        status: u16,
        /// Trailing chunk data; empty when the trailer carries none.
        chunk_data: Bytes,
    },
}
176
/// Size in bytes of the extended-ID GVSP packet header
/// (64-bit block id + 32-bit packet id).
const GVSP_EXTENDED_HEADER_SIZE: usize = 20;

/// Flag bit in header byte 4 that marks an extended-ID packet.
const EXTENDED_ID_FLAG: u8 = 0x80;
193
194pub fn parse_packet(payload: &[u8]) -> Result<GvspPacket, GvspError> {
195 if payload.len() < GVSP_HEADER_SIZE {
196 return Err(GvspError::Invalid("GVSP header truncated"));
197 }
198
199 let packet_format_byte = payload[4];
200 let extended = (packet_format_byte & EXTENDED_ID_FLAG) != 0;
201 let packet_format = packet_format_byte & 0x0F;
202
203 let (block_id, packet_id, data_offset) = if extended {
204 if payload.len() < GVSP_EXTENDED_HEADER_SIZE {
212 return Err(GvspError::Invalid("extended GVSP header truncated"));
213 }
214 let block_id = u64::from_be_bytes([
215 payload[8],
216 payload[9],
217 payload[10],
218 payload[11],
219 payload[12],
220 payload[13],
221 payload[14],
222 payload[15],
223 ]);
224 let packet_id = u32::from_be_bytes([payload[16], payload[17], payload[18], payload[19]]);
225 (block_id, packet_id, GVSP_EXTENDED_HEADER_SIZE)
226 } else {
227 let block_id = u16::from_be_bytes([payload[2], payload[3]]) as u64;
229 let packet_id = u32::from_be_bytes([0, payload[5], payload[6], payload[7]]);
230 (block_id, packet_id, GVSP_HEADER_SIZE)
231 };
232
233 let payload_type = (u16::from_be_bytes([payload[0], payload[1]]) >> 4) as u8;
234
235 match packet_format {
236 0x01 => parse_leader(packet_id, block_id, payload_type, &payload[data_offset..]),
237 0x03 => parse_payload(packet_id, block_id, &payload[data_offset..]),
238 0x02 => parse_trailer(packet_id, block_id, &payload[data_offset..]),
239 _ => Err(GvspError::Unsupported("packet format")),
240 }
241}
242
243fn parse_leader(
256 packet_id: u32,
257 block_id: u64,
258 _payload_type_header: u8,
259 payload: &[u8],
260) -> Result<GvspPacket, GvspError> {
261 if payload.len() < 24 {
262 return Err(GvspError::Invalid("leader payload truncated"));
263 }
264 let mut cursor = payload;
265 let _reserved = cursor.get_u16();
266 let payload_type = cursor.get_u16() as u8;
267 if payload_type != PAYLOAD_TYPE_IMAGE {
268 return Err(GvspError::Unsupported("payload type"));
269 }
270 let timestamp = cursor.get_u64();
271 let pixel_format = cursor.get_u32();
272 let width = cursor.get_u32();
273 let height = cursor.get_u32();
274 Ok(GvspPacket::Leader {
275 block_id,
276 packet_id,
277 payload_type,
278 timestamp,
279 width,
280 height,
281 pixel_format,
282 })
283}
284
285fn parse_payload(packet_id: u32, block_id: u64, payload: &[u8]) -> Result<GvspPacket, GvspError> {
286 Ok(GvspPacket::Payload {
287 block_id,
288 packet_id,
289 data: Bytes::copy_from_slice(payload),
290 })
291}
292
293fn parse_trailer(packet_id: u32, block_id: u64, payload: &[u8]) -> Result<GvspPacket, GvspError> {
294 if payload.len() < 2 {
295 return Err(GvspError::Invalid("trailer truncated"));
296 }
297 let mut cursor = payload;
298 let status = cursor.get_u16();
299 let chunk_data = if payload.len() > 2 {
300 Bytes::copy_from_slice(&payload[2..])
301 } else {
302 Bytes::new()
303 };
304 Ok(GvspPacket::Trailer {
305 block_id,
306 packet_id,
307 status,
308 chunk_data,
309 })
310}
311
/// Fixed-size presence bitmap tracking which packets of a block have arrived.
#[derive(Debug, Clone)]
pub struct PacketBitmap {
    words: Vec<u64>,
    received: usize,
    total: usize,
}

impl PacketBitmap {
    /// Creates a bitmap for `total` packets, all initially missing.
    pub fn new(total: usize) -> Self {
        Self {
            words: vec![0u64; total.div_ceil(64)],
            received: 0,
            total,
        }
    }

    /// Word index and single-bit mask addressing `packet_id`.
    fn mask_for(&self, packet_id: usize) -> (usize, u64) {
        (packet_id / 64, 1u64 << (packet_id % 64))
    }

    /// Marks `packet_id` as received. Returns `true` only the first time a
    /// valid id is set; duplicates and out-of-range ids return `false`.
    pub fn set(&mut self, packet_id: usize) -> bool {
        if packet_id >= self.total {
            return false;
        }
        let (word, mask) = self.mask_for(packet_id);
        let already = self.words[word] & mask != 0;
        if !already {
            self.words[word] |= mask;
            self.received += 1;
        }
        !already
    }

    /// True once every expected packet has been marked received.
    pub fn is_complete(&self) -> bool {
        self.received == self.total
    }

    /// Inclusive ranges of packet ids still missing, in ascending order.
    pub fn missing_ranges(&self) -> Vec<RangeInclusive<u32>> {
        let mut ranges = Vec::new();
        let mut open: Option<(u32, u32)> = None;
        for idx in 0..self.total {
            let (word, mask) = self.mask_for(idx);
            if self.words[word] & mask == 0 {
                // Start a new gap or extend the one in progress.
                open = match open {
                    None => Some((idx as u32, idx as u32)),
                    Some((start, _)) => Some((start, idx as u32)),
                };
            } else if let Some((start, end)) = open.take() {
                ranges.push(start..=end);
            }
        }
        if let Some((start, end)) = open {
            ranges.push(start..=end);
        }
        ranges
    }
}
381
/// In-progress reassembly state for a single GVSP block (one frame).
///
/// Payload packets are written at slot `packet_id * packet_payload` in a
/// shared buffer; per-packet lengths are tracked so short packets can be
/// compacted in `finish`.
#[derive(Debug)]
pub struct FrameAssembly {
    // Block id this assembly belongs to.
    block_id: u64,
    // Number of payload packets expected for the block.
    expected_packets: usize,
    // Maximum data bytes carried by a single payload packet.
    packet_payload: usize,
    // Which packet ids have been received.
    bitmap: PacketBitmap,
    // Frame bytes, slot-indexed by packet id.
    buffer: BytesMut,
    // Actual payload length received for each packet id (0 = not yet seen).
    lengths: Vec<usize>,
    // Instant after which the block is considered timed out.
    deadline: Instant,
}

impl FrameAssembly {
    /// Creates an empty assembly for `expected_packets` packets of at most
    /// `packet_payload` bytes each, reusing `buffer` as backing storage.
    pub fn new(
        block_id: u64,
        expected_packets: usize,
        packet_payload: usize,
        buffer: BytesMut,
        deadline: Instant,
    ) -> Self {
        Self {
            block_id,
            expected_packets,
            packet_payload,
            bitmap: PacketBitmap::new(expected_packets),
            buffer,
            lengths: vec![0; expected_packets],
            deadline,
        }
    }

    /// Block id this assembly is collecting.
    pub fn block_id(&self) -> u64 {
        self.block_id
    }

    /// True once `now` has reached the assembly's deadline.
    pub fn is_expired(&self, now: Instant) -> bool {
        now >= self.deadline
    }

    /// Stores one payload packet at its slot.
    ///
    /// Returns `false` for out-of-range ids or oversized payloads. Returns
    /// `true` for newly accepted packets and also for duplicates, which are
    /// left untouched (first copy wins).
    pub fn ingest(&mut self, packet_id: usize, payload: &[u8]) -> bool {
        if packet_id >= self.expected_packets || payload.len() > self.packet_payload {
            return false;
        }
        if !self.bitmap.set(packet_id) {
            return true;
        }
        self.lengths[packet_id] = payload.len();
        let offset = packet_id * self.packet_payload;
        // Grow the buffer lazily, only as far as this packet's slot needs.
        if self.buffer.len() < offset + payload.len() {
            self.buffer.resize(offset + payload.len(), 0);
        }
        self.buffer[offset..offset + payload.len()].copy_from_slice(payload);
        true
    }

    /// Consumes the assembly and returns the contiguous frame, or `None`
    /// when packets are still missing.
    pub fn finish(self) -> Option<Bytes> {
        if !self.bitmap.is_complete() {
            return None;
        }

        // Fast path: every packet except (possibly) the last is exactly
        // `packet_payload` bytes, so the slotted buffer is already
        // contiguous and only needs truncating.
        let full_sized_prefix = if self.expected_packets > 0 {
            self.lengths
                .iter()
                .take(self.expected_packets.saturating_sub(1))
                .all(|&len| len == self.packet_payload)
        } else {
            true
        };

        if full_sized_prefix {
            let last_len = *self.lengths.last().unwrap_or(&0);
            let used = self
                .packet_payload
                .saturating_mul(self.expected_packets.saturating_sub(1))
                + last_len;
            let mut buf = self.buffer;
            if buf.len() > used {
                buf.truncate(used);
            }
            return Some(buf.freeze());
        }

        // Slow path: a mid-frame packet was short, leaving gaps between
        // slots; compact the received byte ranges into a fresh buffer.
        let total: usize = self.lengths.iter().sum();
        let mut out = BytesMut::with_capacity(total);
        for (i, &len) in self.lengths.iter().enumerate() {
            if len == 0 {
                continue;
            }
            let start = i * self.packet_payload;
            let end = start + len;
            out.extend_from_slice(&self.buffer[start..end]);
        }
        Some(out.freeze())
    }
}
487
/// Retry accounting and pacing for packet-resend requests.
#[derive(Debug, Clone)]
pub struct ResendPlanner {
    retries: u32,
    max_retries: u32,
    base_delay: Duration,
    next_deadline: Instant,
}

impl ResendPlanner {
    /// Creates a planner allowing up to `max_retries` attempts, spaced by
    /// multiples of `base_delay`. The first attempt is allowed immediately.
    pub fn new(max_retries: u32, base_delay: Duration) -> Self {
        Self {
            retries: 0,
            max_retries,
            base_delay,
            next_deadline: Instant::now(),
        }
    }

    /// True when another resend may be issued: retry budget remains and
    /// the current backoff deadline has passed.
    pub fn should_resend(&self, now: Instant) -> bool {
        if self.retries >= self.max_retries {
            return false;
        }
        now >= self.next_deadline
    }

    /// Records one attempt and schedules the next deadline using linear
    /// backoff (`base_delay * retries`) plus the supplied `jitter`.
    pub fn record_attempt(&mut self, now: Instant, jitter: Duration) {
        self.retries += 1;
        // On multiplication overflow, fall back to a single base delay.
        let backoff = self
            .base_delay
            .checked_mul(self.retries)
            .unwrap_or(self.base_delay);
        self.next_deadline = now + backoff + jitter;
    }

    /// True once the retry budget is spent.
    pub fn is_exhausted(&self) -> bool {
        self.retries >= self.max_retries
    }
}
527
/// A fully reassembled frame ready for delivery to consumers.
#[derive(Debug, Clone)]
pub struct CompletedFrame {
    /// GVSP block id the frame was assembled from.
    pub block_id: u64,
    /// Host-clock timestamp — presumably set when assembly completed;
    /// confirm at the construction site.
    pub timestamp: Instant,
    /// Contiguous frame payload.
    pub payload: Bytes,
}
535
536#[derive(Debug)]
539pub struct FrameQueue {
540 inner: VecDeque<CompletedFrame>,
541 capacity: usize,
542}
543
544impl FrameQueue {
545 pub fn new(capacity: usize) -> Self {
546 Self {
547 inner: VecDeque::with_capacity(capacity),
548 capacity,
549 }
550 }
551
552 pub fn push(&mut self, frame: CompletedFrame, stats: &StreamStatsAccumulator) {
553 if self.inner.len() == self.capacity {
554 self.inner.pop_front();
555 stats.record_backpressure_drop();
556 }
557 self.inner.push_back(frame);
558 }
559
560 pub fn pop(&mut self) -> Option<CompletedFrame> {
561 self.inner.pop_front()
562 }
563}
564
565pub fn coalesce_missing(bitmap: &PacketBitmap, max_range: usize) -> Vec<RangeInclusive<u32>> {
567 bitmap
568 .missing_ranges()
569 .into_iter()
570 .flat_map(|range| split_range(range, max_range))
571 .collect()
572}
573
/// Splits an inclusive range into consecutive sub-ranges of at most
/// `max_len` elements. A `max_len` of 0 returns the range unchanged.
fn split_range(range: RangeInclusive<u32>, max_len: usize) -> Vec<RangeInclusive<u32>> {
    if max_len == 0 {
        return vec![range];
    }
    let end = *range.end() as usize;
    let mut pieces = Vec::new();
    let mut lo = *range.start() as usize;
    while lo <= end {
        let hi = end.min(lo + max_len - 1);
        pieces.push(lo as u32..=hi as u32);
        lo = hi + 1;
    }
    pieces
}
589
590#[derive(Debug)]
592pub struct Reassembler {
593 active: Option<FrameAssembly>,
594 packet_payload: usize,
595 stats: StreamStatsAccumulator,
596}
597
598impl Reassembler {
599 pub fn new(packet_payload: usize, stats: StreamStatsAccumulator) -> Self {
600 Self {
601 active: None,
602 packet_payload,
603 stats,
604 }
605 }
606
607 pub fn start_block(&mut self, block_id: u64, expected_packets: usize, buffer: BytesMut) {
609 let deadline = Instant::now() + Duration::from_millis(50);
610 self.active = Some(FrameAssembly::new(
611 block_id,
612 expected_packets,
613 self.packet_payload,
614 buffer,
615 deadline,
616 ));
617 }
618
619 pub fn push_packet(&mut self, packet_id: usize, payload: &[u8]) {
621 if let Some(assembly) = self.active.as_mut()
622 && assembly.ingest(packet_id, payload)
623 {
624 self.stats.record_packet();
625 }
626 }
627
628 pub fn finish_block(&mut self) -> Option<Bytes> {
630 self.active.take().and_then(FrameAssembly::finish)
631 }
632}
633
634#[cfg(test)]
635mod tests {
636 use super::*;
637
638 #[test]
639 fn parse_multiple_chunks() {
640 let mut payload = Vec::new();
641 payload.extend_from_slice(&0x0001u16.to_be_bytes());
642 payload.extend_from_slice(&0u16.to_be_bytes());
643 payload.extend_from_slice(&4u32.to_be_bytes());
644 payload.extend_from_slice(&[1, 2, 3, 4]);
645 payload.extend_from_slice(&0x0002u16.to_be_bytes());
646 payload.extend_from_slice(&0u16.to_be_bytes());
647 payload.extend_from_slice(&2u32.to_be_bytes());
648 payload.extend_from_slice(&[5, 6]);
649 let chunks = parse_chunks(&payload);
650 assert_eq!(chunks.len(), 2);
651 assert_eq!(chunks[0].id, 0x0001);
652 assert_eq!(chunks[0].data.as_ref(), &[1, 2, 3, 4]);
653 assert_eq!(chunks[1].id, 0x0002);
654 assert_eq!(chunks[1].data.as_ref(), &[5, 6]);
655 }
656
657 #[test]
658 fn truncated_chunk_is_ignored() {
659 let payload = vec![0u8; 6];
660 let chunks = parse_chunks(&payload);
661 assert!(chunks.is_empty());
662 }
663
664 #[test]
665 fn parse_chunks_tolerates_padding() {
666 for _ in 0..128 {
667 let count = fastrand::usize(..6);
668 let mut payload = Vec::new();
669 let mut entries = Vec::new();
670 for _ in 0..count {
671 let id = fastrand::u16(..);
672 let len = fastrand::usize(..16);
673 let mut data = vec![0u8; len];
674 for byte in &mut data {
675 *byte = fastrand::u8(..);
676 }
677 payload.extend_from_slice(&id.to_be_bytes());
678 payload.extend_from_slice(&0u16.to_be_bytes());
679 payload.extend_from_slice(&(data.len() as u32).to_be_bytes());
680 payload.extend_from_slice(&data);
681 entries.push((id, data));
682 }
683 let padding_len = fastrand::usize(..8);
684 for _ in 0..padding_len {
685 payload.push(fastrand::u8(..));
686 }
687 let parsed = parse_chunks(&payload);
688 assert!(parsed.len() <= entries.len());
689 for (idx, chunk) in parsed.iter().enumerate() {
690 assert_eq!(chunk.id, entries[idx].0);
691 assert_eq!(chunk.data.as_ref(), entries[idx].1.as_slice());
692 }
693 }
694 }
695
696 #[test]
697 fn bitmap_missing_ranges_coalesce() {
698 let mut bitmap = PacketBitmap::new(10);
699 for &idx in &[0usize, 1, 5, 6, 9] {
700 bitmap.set(idx);
701 }
702 let ranges = bitmap.missing_ranges();
703 assert_eq!(ranges.len(), 2);
704 assert_eq!(ranges[0], 2..=4);
705 assert_eq!(ranges[1], 7..=8);
706 }
707
708 #[test]
709 fn coalesce_splits_large_ranges() {
710 let mut bitmap = PacketBitmap::new(20);
711 for idx in [0usize, 1, 2, 18, 19] {
712 bitmap.set(idx);
713 }
714 let ranges = coalesce_missing(&bitmap, 4);
715 assert_eq!(ranges, vec![3..=6, 7..=10, 11..=14, 15..=17]);
716 }
717
718 #[test]
719 fn reassembler_finishes_frame() {
720 let stats = StreamStatsAccumulator::new();
721 let mut reassembler = Reassembler::new(4, stats);
722 reassembler.start_block(1, 3, BytesMut::with_capacity(12));
723 reassembler.push_packet(0, &[1, 2, 3]);
724 reassembler.push_packet(1, &[4, 5, 6]);
725 reassembler.push_packet(2, &[7, 8, 9]);
726 let frame = reassembler.finish_block().expect("frame");
727 assert_eq!(frame.as_ref(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
728 }
729}