viva_gige/
gvsp.rs

1//! GVSP packet parsing, reassembly and resend orchestration.
2//!
3//! The GigE Vision Streaming Protocol delivers image data over UDP. Packets can
4//! arrive out of order or be dropped entirely; this module reconstructs complete
5//! frames while coordinating resend requests over GVCP. The implementation keeps
6//! copies to a minimum by writing directly into pooled [`BytesMut`] buffers that
7//! are subsequently frozen into [`Bytes`] once a frame is ready.
8
9use std::collections::VecDeque;
10use std::net::Ipv4Addr;
11use std::ops::RangeInclusive;
12use std::time::{Duration, Instant};
13
14use crate::nic::Iface;
15use crate::stats::StreamStatsAccumulator;
16use bytes::{Buf, Bytes, BytesMut};
17use thiserror::Error;
18use tracing::{debug, warn};
19
/// GVSP payload type for image data as defined by the specification (Table
/// 36). Leaders announcing any other payload type are rejected by
/// `parse_leader` with [`GvspError::Unsupported`], so image is the only
/// payload type that reaches the reassembler.
const PAYLOAD_TYPE_IMAGE: u8 = 0x01;

/// Size in bytes of the standard (non-extended) GVSP header preceding the
/// packet body. `parse_packet` uses it both as the minimum datagram length and
/// as the body offset for standard-ID packets.
///
/// NOTE(review): the original comment says the reassembler also uses this
/// value when sizing buffers; that use is not visible in this file — confirm.
const GVSP_HEADER_SIZE: usize = 8;
29
/// Where the camera delivers the GVSP packets of a stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamDest {
    /// Unicast delivery to exactly one host.
    Unicast {
        /// IPv4 address of the receiving host as configured on the camera.
        dst_ip: Ipv4Addr,
        /// UDP port the stream is sent to.
        dst_port: u16,
    },
    /// Multicast delivery to every host that joined the group.
    Multicast {
        /// IPv4 address of the multicast group.
        group: Ipv4Addr,
        /// UDP port the stream is sent to.
        port: u16,
        /// Whether multicast loopback is enabled on the local socket.
        loopback: bool,
        /// Time-to-live applied to outbound multicast packets.
        ttl: u32,
    },
}

impl StreamDest {
    /// UDP port of the destination, regardless of delivery mode.
    pub fn port(&self) -> u16 {
        match *self {
            Self::Unicast { dst_port: port, .. } | Self::Multicast { port, .. } => port,
        }
    }

    /// IPv4 address of the destination: the host address for unicast, the
    /// group address for multicast.
    pub fn addr(&self) -> Ipv4Addr {
        match *self {
            Self::Unicast { dst_ip: ip, .. } | Self::Multicast { group: ip, .. } => ip,
        }
    }

    /// Returns `true` only for the [`StreamDest::Multicast`] variant.
    pub fn is_multicast(&self) -> bool {
        match self {
            Self::Multicast { .. } => true,
            Self::Unicast { .. } => false,
        }
    }
}
75
/// Stream configuration shared between the control plane and GVSP receiver.
///
/// This is a plain data carrier; nothing in this file reads its fields, so the
/// notes below about `None` handling are assumptions to confirm against the
/// consumers.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Destination configuration for the GVSP stream.
    pub dest: StreamDest,
    /// Interface used for receiving packets and multicast subscription.
    pub iface: Iface,
    /// Override for GVSP packet size determined via control plane.
    /// Presumably `None` keeps the negotiated value — TODO confirm; units are
    /// not established here.
    pub packet_size: Option<u32>,
    /// Override for GVSP packet delay determined via control plane.
    /// Presumably `None` keeps the device default — TODO confirm; units are
    /// not established here.
    pub packet_delay: Option<u32>,
    /// Optional source filter restricting packets to the configured IPv4 address.
    pub source_filter: Option<Ipv4Addr>,
    /// Whether GVCP resend requests should be issued when drops are detected.
    pub resend_enabled: bool,
}
92
/// Errors raised while handling GVSP packets.
#[derive(Debug, Error)]
pub enum GvspError {
    /// The packet is structurally readable but uses something this module does
    /// not handle. The string names what was unsupported; `parse_packet` uses
    /// `"packet format"` and `parse_leader` uses `"payload type"`.
    #[error("unsupported packet type: {0}")]
    Unsupported(&'static str),
    /// The packet is malformed, e.g. shorter than the header or body the
    /// parser requires. The string describes which part was truncated.
    #[error("invalid packet: {0}")]
    Invalid(&'static str),
    /// A resend did not complete in time.
    ///
    /// NOTE(review): not constructed by the code visible in this file;
    /// presumably raised by the resend orchestration elsewhere — confirm.
    #[error("resend timeout")]
    ResendTimeout,
}
103
/// Raw GVSP chunk extracted from a payload or trailer block.
///
/// Produced by `parse_chunks`, which does not interpret the chunk contents.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkRaw {
    /// 16-bit chunk identifier, read big-endian from the first two bytes of
    /// the chunk header.
    pub id: u16,
    /// Owned copy of the chunk's data bytes (the bytes following the 8-byte
    /// chunk header); may be empty when the declared length is zero.
    pub data: Bytes,
}
110
111/// Parse a chunk payload following the `[id][reserved][length][data...]` layout.
112pub fn parse_chunks(mut payload: &[u8]) -> Vec<ChunkRaw> {
113    let mut chunks = Vec::new();
114    while !payload.is_empty() {
115        if payload.len() < 8 {
116            warn!(remaining = payload.len(), "chunk header truncated");
117            break;
118        }
119        let mut cursor = payload;
120        let id = cursor.get_u16();
121        let _reserved = cursor.get_u16();
122        let length = cursor.get_u32() as usize;
123        let total = 8 + length;
124        if payload.len() < total {
125            warn!(
126                chunk_id = format_args!("{:#06x}", id),
127                len = payload.len(),
128                expected = total,
129                "chunk data truncated"
130            );
131            break;
132        }
133        let data = Bytes::copy_from_slice(&payload[8..total]);
134        debug!(
135            chunk_id = format_args!("{:#06x}", id),
136            len = length,
137            "parsed chunk"
138        );
139        chunks.push(ChunkRaw { id, data });
140        payload = &payload[total..];
141    }
142    chunks
143}
144
/// Representation of a GVSP packet.
///
/// Block IDs are `u64` and packet IDs are `u32` to support both standard
/// (16-bit block / 24-bit packet) and extended ID mode (64-bit block /
/// 32-bit packet) as defined in GigE Vision 2.0+.
#[derive(Debug, Clone)]
pub enum GvspPacket {
    /// Start-of-frame leader packet with metadata.
    Leader {
        /// Block (frame) identifier taken from the GVSP header.
        block_id: u64,
        /// Packet identifier taken from the GVSP header.
        packet_id: u32,
        /// Low 8 bits of the 16-bit payload type field in the leader body.
        payload_type: u8,
        /// 64-bit timestamp from the leader body; units are not established
        /// by this file.
        timestamp: u64,
        /// Image width from the leader body.
        width: u32,
        /// Image height from the leader body.
        height: u32,
        /// Pixel format code from the leader body.
        pixel_format: u32,
    },
    /// Payload data packet carrying pixel bytes.
    Payload {
        /// Block (frame) identifier taken from the GVSP header.
        block_id: u64,
        /// Packet identifier taken from the GVSP header.
        packet_id: u32,
        /// Owned copy of every byte following the GVSP header.
        data: Bytes,
    },
    /// End-of-frame trailer packet.
    Trailer {
        /// Block (frame) identifier taken from the GVSP header.
        block_id: u64,
        /// Packet identifier taken from the GVSP header.
        packet_id: u32,
        /// First 16 bits of the trailer body, read big-endian.
        status: u16,
        /// Bytes following the 16-bit status field; empty when there are none.
        chunk_data: Bytes,
    },
}
176
177/// Parse a raw UDP payload into a GVSP packet.
178/// Parse a GVSP packet from raw bytes.
179///
180/// GVSP header layout (8 bytes):
181///
182/// | Offset | Size | Field         |
183/// |--------|------|---------------|
184/// |      0 |    2 | Status        |
185/// |      2 |    2 | Block ID      |
186/// |      4 |    1 | Packet format |
187/// |      5 |    3 | Packet ID     |
188/// Size of the extended GVSP header (GigE Vision 2.0+).
189const GVSP_EXTENDED_HEADER_SIZE: usize = 20;
190
191/// Extended ID flag: bit 7 of the packet_format byte.
192const EXTENDED_ID_FLAG: u8 = 0x80;
193
194pub fn parse_packet(payload: &[u8]) -> Result<GvspPacket, GvspError> {
195    if payload.len() < GVSP_HEADER_SIZE {
196        return Err(GvspError::Invalid("GVSP header truncated"));
197    }
198
199    let packet_format_byte = payload[4];
200    let extended = (packet_format_byte & EXTENDED_ID_FLAG) != 0;
201    let packet_format = packet_format_byte & 0x0F;
202
203    let (block_id, packet_id, data_offset) = if extended {
204        // Extended ID header (20 bytes):
205        // [0-1]  status
206        // [2-3]  block_id low 16 (backward compat)
207        // [4]    packet_format | 0x80
208        // [5-7]  packet_id low 24
209        // [8-15] block_id 64-bit
210        // [16-19] packet_id 32-bit
211        if payload.len() < GVSP_EXTENDED_HEADER_SIZE {
212            return Err(GvspError::Invalid("extended GVSP header truncated"));
213        }
214        let block_id = u64::from_be_bytes([
215            payload[8],
216            payload[9],
217            payload[10],
218            payload[11],
219            payload[12],
220            payload[13],
221            payload[14],
222            payload[15],
223        ]);
224        let packet_id = u32::from_be_bytes([payload[16], payload[17], payload[18], payload[19]]);
225        (block_id, packet_id, GVSP_EXTENDED_HEADER_SIZE)
226    } else {
227        // Standard header (8 bytes)
228        let block_id = u16::from_be_bytes([payload[2], payload[3]]) as u64;
229        let packet_id = u32::from_be_bytes([0, payload[5], payload[6], payload[7]]);
230        (block_id, packet_id, GVSP_HEADER_SIZE)
231    };
232
233    let payload_type = (u16::from_be_bytes([payload[0], payload[1]]) >> 4) as u8;
234
235    match packet_format {
236        0x01 => parse_leader(packet_id, block_id, payload_type, &payload[data_offset..]),
237        0x03 => parse_payload(packet_id, block_id, &payload[data_offset..]),
238        0x02 => parse_trailer(packet_id, block_id, &payload[data_offset..]),
239        _ => Err(GvspError::Unsupported("packet format")),
240    }
241}
242
243/// Parse a GVSP Data Leader packet.
244///
245/// Leader payload layout:
246///
247/// | Offset | Size | Field        |
248/// |--------|------|--------------|
249/// |      0 |    2 | Reserved     |
250/// |      2 |    2 | Payload type |
251/// |      4 |    8 | Timestamp    |
252/// |     12 |    4 | Pixel format |
253/// |     16 |    4 | Width        |
254/// |     20 |    4 | Height       |
255fn parse_leader(
256    packet_id: u32,
257    block_id: u64,
258    _payload_type_header: u8,
259    payload: &[u8],
260) -> Result<GvspPacket, GvspError> {
261    if payload.len() < 24 {
262        return Err(GvspError::Invalid("leader payload truncated"));
263    }
264    let mut cursor = payload;
265    let _reserved = cursor.get_u16();
266    let payload_type = cursor.get_u16() as u8;
267    if payload_type != PAYLOAD_TYPE_IMAGE {
268        return Err(GvspError::Unsupported("payload type"));
269    }
270    let timestamp = cursor.get_u64();
271    let pixel_format = cursor.get_u32();
272    let width = cursor.get_u32();
273    let height = cursor.get_u32();
274    Ok(GvspPacket::Leader {
275        block_id,
276        packet_id,
277        payload_type,
278        timestamp,
279        width,
280        height,
281        pixel_format,
282    })
283}
284
285fn parse_payload(packet_id: u32, block_id: u64, payload: &[u8]) -> Result<GvspPacket, GvspError> {
286    Ok(GvspPacket::Payload {
287        block_id,
288        packet_id,
289        data: Bytes::copy_from_slice(payload),
290    })
291}
292
293fn parse_trailer(packet_id: u32, block_id: u64, payload: &[u8]) -> Result<GvspPacket, GvspError> {
294    if payload.len() < 2 {
295        return Err(GvspError::Invalid("trailer truncated"));
296    }
297    let mut cursor = payload;
298    let status = cursor.get_u16();
299    let chunk_data = if payload.len() > 2 {
300        Bytes::copy_from_slice(&payload[2..])
301    } else {
302        Bytes::new()
303    };
304    Ok(GvspPacket::Trailer {
305        block_id,
306        packet_id,
307        status,
308        chunk_data,
309    })
310}
311
/// Fixed-size bit set recording which packets of a block have arrived.
///
/// Packet indices run from `0` to `total - 1`; each index occupies one bit in
/// a vector of 64-bit words.
#[derive(Debug, Clone)]
pub struct PacketBitmap {
    /// Bit storage, 64 packet indices per word.
    words: Vec<u64>,
    /// Number of distinct indices marked so far.
    received: usize,
    /// Number of packets the block consists of.
    total: usize,
}

impl PacketBitmap {
    /// Build an empty bitmap able to track `total` packets.
    pub fn new(total: usize) -> Self {
        Self {
            words: vec![0; total.div_ceil(64)],
            received: 0,
            total,
        }
    }

    /// Translate a packet index into its word index and single-bit mask.
    fn mask_for(&self, packet_id: usize) -> (usize, u64) {
        (packet_id / 64, 1u64 << (packet_id % 64))
    }

    /// Mark `packet_id` as received.
    ///
    /// Returns `true` only when the index is in range and was not marked
    /// before; out-of-range indices and duplicates return `false`.
    pub fn set(&mut self, packet_id: usize) -> bool {
        if packet_id >= self.total {
            return false;
        }
        let (slot, bit) = self.mask_for(packet_id);
        let newly_seen = self.words[slot] & bit == 0;
        if newly_seen {
            self.words[slot] |= bit;
            self.received += 1;
        }
        newly_seen
    }

    /// Whether every packet index has been marked.
    pub fn is_complete(&self) -> bool {
        self.total == self.received
    }

    /// Collect the maximal runs of unmarked indices as inclusive ranges, in
    /// ascending order.
    pub fn missing_ranges(&self) -> Vec<RangeInclusive<u32>> {
        let mut gaps = Vec::new();
        // Start of the gap currently being scanned, if any.
        let mut gap_start: Option<usize> = None;

        for idx in 0..self.total {
            let (slot, bit) = self.mask_for(idx);
            let missing = self.words[slot] & bit == 0;
            if missing {
                gap_start.get_or_insert(idx);
            } else if let Some(start) = gap_start.take() {
                // The gap ended on the index just before this received one.
                gaps.push(start as u32..=(idx - 1) as u32);
            }
        }

        // A gap still open at the end runs through the last index.
        if let Some(start) = gap_start {
            gaps.push(start as u32..=(self.total - 1) as u32);
        }
        gaps
    }
}
381
382/// Representation of a partially received frame.
383#[derive(Debug)]
384pub struct FrameAssembly {
385    block_id: u64,
386    expected_packets: usize,
387    packet_payload: usize,
388    bitmap: PacketBitmap,
389    buffer: BytesMut,
390    lengths: Vec<usize>,
391    deadline: Instant,
392}
393
394impl FrameAssembly {
395    /// Create a new frame assembly using the supplied buffer.
396    pub fn new(
397        block_id: u64,
398        expected_packets: usize,
399        packet_payload: usize,
400        buffer: BytesMut,
401        deadline: Instant,
402    ) -> Self {
403        Self {
404            block_id,
405            expected_packets,
406            packet_payload,
407            bitmap: PacketBitmap::new(expected_packets),
408            buffer,
409            lengths: vec![0; expected_packets],
410            deadline,
411        }
412    }
413
414    /// Returns the block identifier associated with this frame.
415    pub fn block_id(&self) -> u64 {
416        self.block_id
417    }
418
419    /// Whether the reassembly deadline has elapsed.
420    pub fn is_expired(&self, now: Instant) -> bool {
421        now >= self.deadline
422    }
423
424    /// Insert a packet payload into the buffer.
425    pub fn ingest(&mut self, packet_id: usize, payload: &[u8]) -> bool {
426        if packet_id >= self.expected_packets || payload.len() > self.packet_payload {
427            return false;
428        }
429        if !self.bitmap.set(packet_id) {
430            return true;
431        }
432        // Track actual payload length for compaction at finish time.
433        self.lengths[packet_id] = payload.len();
434        let offset = packet_id * self.packet_payload;
435        if self.buffer.len() < offset + payload.len() {
436            self.buffer.resize(offset + payload.len(), 0);
437        }
438        self.buffer[offset..offset + payload.len()].copy_from_slice(payload);
439        true
440    }
441
442    /// Finalise the frame if all packets have been received.
443    pub fn finish(self) -> Option<Bytes> {
444        if !self.bitmap.is_complete() {
445            return None;
446        }
447
448        // If all packets except possibly the last are full-sized, we can
449        // return a slice of the existing buffer without extra copying.
450        let full_sized_prefix = if self.expected_packets > 0 {
451            self.lengths
452                .iter()
453                .take(self.expected_packets.saturating_sub(1))
454                .all(|&len| len == self.packet_payload)
455        } else {
456            true
457        };
458
459        if full_sized_prefix {
460            let last_len = *self.lengths.last().unwrap_or(&0);
461            let used = self
462                .packet_payload
463                .saturating_mul(self.expected_packets.saturating_sub(1))
464                + last_len;
465            let mut buf = self.buffer;
466            if buf.len() > used {
467                buf.truncate(used);
468            }
469            return Some(buf.freeze());
470        }
471
472        // Otherwise, compact the data to remove any gaps introduced by
473        // shorter packets occurring before the last packet.
474        let total: usize = self.lengths.iter().sum();
475        let mut out = BytesMut::with_capacity(total);
476        for (i, &len) in self.lengths.iter().enumerate() {
477            if len == 0 {
478                continue;
479            }
480            let start = i * self.packet_payload;
481            let end = start + len;
482            out.extend_from_slice(&self.buffer[start..end]);
483        }
484        Some(out.freeze())
485    }
486}
487
/// Bookkeeping for the resend attempts made on behalf of one block.
///
/// The wait before the next attempt grows linearly: after the n-th attempt it
/// is `n * base_delay` plus the caller-supplied jitter.
#[derive(Debug, Clone)]
pub struct ResendPlanner {
    /// Attempts recorded so far.
    retries: u32,
    /// Upper bound on the number of attempts.
    max_retries: u32,
    /// Delay unit multiplied by the attempt count.
    base_delay: Duration,
    /// Earliest instant at which the next attempt is allowed.
    next_deadline: Instant,
}

impl ResendPlanner {
    /// Create a planner with no attempts recorded; the first resend is
    /// allowed from the moment of construction.
    pub fn new(max_retries: u32, base_delay: Duration) -> Self {
        let next_deadline = Instant::now();
        Self {
            retries: 0,
            max_retries,
            base_delay,
            next_deadline,
        }
    }

    /// Whether a resend may be issued at `now`: retries must remain and the
    /// current deadline must have been reached.
    pub fn should_resend(&self, now: Instant) -> bool {
        !self.is_exhausted() && self.next_deadline <= now
    }

    /// Count one resend attempt made at `now` and schedule the next deadline.
    ///
    /// If multiplying the base delay by the attempt count overflows, the plain
    /// base delay is used instead.
    pub fn record_attempt(&mut self, now: Instant, jitter: Duration) {
        self.retries += 1;
        let backoff = match self.base_delay.checked_mul(self.retries) {
            Some(scaled) => scaled,
            None => self.base_delay,
        };
        self.next_deadline = now + backoff + jitter;
    }

    /// Whether the number of recorded attempts has reached the maximum.
    pub fn is_exhausted(&self) -> bool {
        self.max_retries <= self.retries
    }
}
527
/// Representation of a fully reassembled frame ready for consumption.
#[derive(Debug, Clone)]
pub struct CompletedFrame {
    /// Identifier of the block the frame was assembled from.
    pub block_id: u64,
    /// Host-side instant associated with the frame.
    ///
    /// NOTE(review): whether this marks arrival of the first packet or
    /// completion of the frame is decided by the producer, which is not
    /// visible in this file — confirm.
    pub timestamp: Instant,
    /// Frame bytes.
    pub payload: Bytes,
}
535
536/// Frame queue used for communicating between the receiver task and the
537/// application.
538#[derive(Debug)]
539pub struct FrameQueue {
540    inner: VecDeque<CompletedFrame>,
541    capacity: usize,
542}
543
544impl FrameQueue {
545    pub fn new(capacity: usize) -> Self {
546        Self {
547            inner: VecDeque::with_capacity(capacity),
548            capacity,
549        }
550    }
551
552    pub fn push(&mut self, frame: CompletedFrame, stats: &StreamStatsAccumulator) {
553        if self.inner.len() == self.capacity {
554            self.inner.pop_front();
555            stats.record_backpressure_drop();
556        }
557        self.inner.push_back(frame);
558    }
559
560    pub fn pop(&mut self) -> Option<CompletedFrame> {
561        self.inner.pop_front()
562    }
563}
564
565/// Coalesce missing packet ranges into resend requests.
566pub fn coalesce_missing(bitmap: &PacketBitmap, max_range: usize) -> Vec<RangeInclusive<u32>> {
567    bitmap
568        .missing_ranges()
569        .into_iter()
570        .flat_map(|range| split_range(range, max_range))
571        .collect()
572}
573
/// Split an inclusive range into consecutive pieces of at most `max_len`
/// elements each.
///
/// * `max_len == 0` means "no limit": the range is returned unchanged as the
///   only element.
/// * A range whose start lies beyond its end yields an empty vector.
///
/// The arithmetic saturates, so very large limits such as `usize::MAX` simply
/// produce a single piece instead of overflowing.
fn split_range(range: RangeInclusive<u32>, max_len: usize) -> Vec<RangeInclusive<u32>> {
    if max_len == 0 {
        return vec![range];
    }

    let start = *range.start() as usize;
    let end = *range.end() as usize;
    // Distance from the first to the last index of a full-sized piece.
    let span = max_len - 1;

    let mut result = Vec::new();
    let mut current = start;
    while current <= end {
        // `upper` never exceeds `end`, which came from a `u32`, so the casts
        // back to `u32` below are lossless.
        let upper = current.saturating_add(span).min(end);
        result.push(current as u32..=upper as u32);
        if upper == end {
            // Stop here rather than computing `upper + 1`, which could
            // overflow when `end` is the largest representable index.
            break;
        }
        current = upper + 1;
    }
    result
}
589
590/// Zero-copy block assembly state machine.
591#[derive(Debug)]
592pub struct Reassembler {
593    active: Option<FrameAssembly>,
594    packet_payload: usize,
595    stats: StreamStatsAccumulator,
596}
597
598impl Reassembler {
599    pub fn new(packet_payload: usize, stats: StreamStatsAccumulator) -> Self {
600        Self {
601            active: None,
602            packet_payload,
603            stats,
604        }
605    }
606
607    /// Start a new block, evicting the previous one when necessary.
608    pub fn start_block(&mut self, block_id: u64, expected_packets: usize, buffer: BytesMut) {
609        let deadline = Instant::now() + Duration::from_millis(50);
610        self.active = Some(FrameAssembly::new(
611            block_id,
612            expected_packets,
613            self.packet_payload,
614            buffer,
615            deadline,
616        ));
617    }
618
619    /// Insert a packet belonging to the active block.
620    pub fn push_packet(&mut self, packet_id: usize, payload: &[u8]) {
621        if let Some(assembly) = self.active.as_mut()
622            && assembly.ingest(packet_id, payload)
623        {
624            self.stats.record_packet();
625        }
626    }
627
628    /// Attempt to finish the current block.
629    pub fn finish_block(&mut self) -> Option<Bytes> {
630        self.active.take().and_then(FrameAssembly::finish)
631    }
632}
633
// Unit tests for chunk parsing, the packet bitmap, range splitting and the
// reassembler.
#[cfg(test)]
mod tests {
    use super::*;

    // Two well-formed chunks back to back are both returned, in order.
    #[test]
    fn parse_multiple_chunks() {
        let mut payload = Vec::new();
        // Chunk 1: id 0x0001, reserved 0, length 4, data [1, 2, 3, 4].
        payload.extend_from_slice(&0x0001u16.to_be_bytes());
        payload.extend_from_slice(&0u16.to_be_bytes());
        payload.extend_from_slice(&4u32.to_be_bytes());
        payload.extend_from_slice(&[1, 2, 3, 4]);
        // Chunk 2: id 0x0002, reserved 0, length 2, data [5, 6].
        payload.extend_from_slice(&0x0002u16.to_be_bytes());
        payload.extend_from_slice(&0u16.to_be_bytes());
        payload.extend_from_slice(&2u32.to_be_bytes());
        payload.extend_from_slice(&[5, 6]);
        let chunks = parse_chunks(&payload);
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].id, 0x0001);
        assert_eq!(chunks[0].data.as_ref(), &[1, 2, 3, 4]);
        assert_eq!(chunks[1].id, 0x0002);
        assert_eq!(chunks[1].data.as_ref(), &[5, 6]);
    }

    // Six bytes cannot hold the 8-byte chunk header, so nothing is parsed.
    #[test]
    fn truncated_chunk_is_ignored() {
        let payload = vec![0u8; 6];
        let chunks = parse_chunks(&payload);
        assert!(chunks.is_empty());
    }

    // Randomised check: up to five valid chunks followed by fewer than eight
    // random trailing bytes. Every chunk that is returned must match the
    // generated entry at the same position.
    #[test]
    fn parse_chunks_tolerates_padding() {
        for _ in 0..128 {
            let count = fastrand::usize(..6);
            let mut payload = Vec::new();
            let mut entries = Vec::new();
            for _ in 0..count {
                let id = fastrand::u16(..);
                let len = fastrand::usize(..16);
                let mut data = vec![0u8; len];
                for byte in &mut data {
                    *byte = fastrand::u8(..);
                }
                payload.extend_from_slice(&id.to_be_bytes());
                payload.extend_from_slice(&0u16.to_be_bytes());
                payload.extend_from_slice(&(data.len() as u32).to_be_bytes());
                payload.extend_from_slice(&data);
                entries.push((id, data));
            }
            // Trailing bytes too short to form another chunk header.
            let padding_len = fastrand::usize(..8);
            for _ in 0..padding_len {
                payload.push(fastrand::u8(..));
            }
            let parsed = parse_chunks(&payload);
            assert!(parsed.len() <= entries.len());
            for (idx, chunk) in parsed.iter().enumerate() {
                assert_eq!(chunk.id, entries[idx].0);
                assert_eq!(chunk.data.as_ref(), entries[idx].1.as_slice());
            }
        }
    }

    // Adjacent missing indices are merged into inclusive ranges.
    #[test]
    fn bitmap_missing_ranges_coalesce() {
        let mut bitmap = PacketBitmap::new(10);
        for &idx in &[0usize, 1, 5, 6, 9] {
            bitmap.set(idx);
        }
        let ranges = bitmap.missing_ranges();
        assert_eq!(ranges.len(), 2);
        assert_eq!(ranges[0], 2..=4);
        assert_eq!(ranges[1], 7..=8);
    }

    // A 15-packet gap (3..=17) is cut into pieces of at most four packets.
    #[test]
    fn coalesce_splits_large_ranges() {
        let mut bitmap = PacketBitmap::new(20);
        for idx in [0usize, 1, 2, 18, 19] {
            bitmap.set(idx);
        }
        let ranges = coalesce_missing(&bitmap, 4);
        assert_eq!(ranges, vec![3..=6, 7..=10, 11..=14, 15..=17]);
    }

    // Three 3-byte packets in 4-byte slots: the finished frame must have the
    // one-byte gap after each packet removed.
    #[test]
    fn reassembler_finishes_frame() {
        let stats = StreamStatsAccumulator::new();
        let mut reassembler = Reassembler::new(4, stats);
        reassembler.start_block(1, 3, BytesMut::with_capacity(12));
        reassembler.push_packet(0, &[1, 2, 3]);
        reassembler.push_packet(1, &[4, 5, 6]);
        reassembler.push_packet(2, &[7, 8, 9]);
        let frame = reassembler.finish_block().expect("frame");
        assert_eq!(frame.as_ref(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }
}