tl_gige/
gvsp.rs

1//! GVSP packet parsing, reassembly and resend orchestration.
2//!
3//! The GigE Vision Streaming Protocol delivers image data over UDP. Packets can
4//! arrive out of order or be dropped entirely; this module reconstructs complete
5//! frames while coordinating resend requests over GVCP. The implementation keeps
6//! copies to a minimum by writing directly into pooled [`BytesMut`] buffers that
7//! are subsequently frozen into [`Bytes`] once a frame is ready.
8
9use std::collections::VecDeque;
10use std::net::Ipv4Addr;
11use std::ops::RangeInclusive;
12use std::time::{Duration, Instant};
13
14use crate::nic::Iface;
15use crate::stats::StreamStatsAccumulator;
16use bytes::{Buf, Bytes, BytesMut};
17use thiserror::Error;
18use tracing::{debug, warn};
19
20/// GVSP payload type for image data as defined by the specification (Table
21/// 36). Other payload types are currently not supported by the reassembler but
22/// will still be parsed.
23const PAYLOAD_TYPE_IMAGE: u8 = 0x01;
24
25/// Size of the GVSP header preceding payload packets. The reassembler uses the
26/// value when allocating buffers so the application payload fits within the
27/// negotiated packet size.
28const GVSP_HEADER_SIZE: usize = 8;
29
30/// Destination for GVSP packets received by the stream.
31#[derive(Debug, Clone, Copy, PartialEq, Eq)]
32pub enum StreamDest {
33    /// Standard unicast delivery towards a single host.
34    Unicast {
35        /// Destination IPv4 address configured on the camera.
36        dst_ip: Ipv4Addr,
37        /// UDP port used for streaming.
38        dst_port: u16,
39    },
40    /// Multicast delivery towards one or more hosts joined to the group.
41    Multicast {
42        /// Multicast group IPv4 address.
43        group: Ipv4Addr,
44        /// UDP port used for streaming.
45        port: u16,
46        /// Whether loopback is enabled on the local socket.
47        loopback: bool,
48        /// Outbound multicast time-to-live.
49        ttl: u32,
50    },
51}
52
53impl StreamDest {
54    /// Retrieve the configured UDP port.
55    pub fn port(&self) -> u16 {
56        match self {
57            StreamDest::Unicast { dst_port, .. } => *dst_port,
58            StreamDest::Multicast { port, .. } => *port,
59        }
60    }
61
62    /// Retrieve the configured IPv4 destination address.
63    pub fn addr(&self) -> Ipv4Addr {
64        match self {
65            StreamDest::Unicast { dst_ip, .. } => *dst_ip,
66            StreamDest::Multicast { group, .. } => *group,
67        }
68    }
69
70    /// Whether the destination represents multicast delivery.
71    pub fn is_multicast(&self) -> bool {
72        matches!(self, StreamDest::Multicast { .. })
73    }
74}
75
76/// Stream configuration shared between the control plane and GVSP receiver.
77#[derive(Debug, Clone)]
78pub struct StreamConfig {
79    /// Destination configuration for the GVSP stream.
80    pub dest: StreamDest,
81    /// Interface used for receiving packets and multicast subscription.
82    pub iface: Iface,
83    /// Override for GVSP packet size determined via control plane.
84    pub packet_size: Option<u32>,
85    /// Override for GVSP packet delay determined via control plane.
86    pub packet_delay: Option<u32>,
87    /// Optional source filter restricting packets to the configured IPv4 address.
88    pub source_filter: Option<Ipv4Addr>,
89    /// Whether GVCP resend requests should be issued when drops are detected.
90    pub resend_enabled: bool,
91}
92
93/// Errors raised while handling GVSP packets.
94#[derive(Debug, Error)]
95pub enum GvspError {
96    #[error("unsupported packet type: {0}")]
97    Unsupported(&'static str),
98    #[error("invalid packet: {0}")]
99    Invalid(&'static str),
100    #[error("resend timeout")]
101    ResendTimeout,
102}
103
104/// Raw GVSP chunk extracted from a payload or trailer block.
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct ChunkRaw {
107    pub id: u16,
108    pub data: Bytes,
109}
110
111/// Parse a chunk payload following the `[id][reserved][length][data...]` layout.
112pub fn parse_chunks(mut payload: &[u8]) -> Vec<ChunkRaw> {
113    let mut chunks = Vec::new();
114    while !payload.is_empty() {
115        if payload.len() < 8 {
116            warn!(remaining = payload.len(), "chunk header truncated");
117            break;
118        }
119        let mut cursor = payload;
120        let id = cursor.get_u16();
121        let _reserved = cursor.get_u16();
122        let length = cursor.get_u32() as usize;
123        let total = 8 + length;
124        if payload.len() < total {
125            warn!(
126                chunk_id = format_args!("{:#06x}", id),
127                len = payload.len(),
128                expected = total,
129                "chunk data truncated"
130            );
131            break;
132        }
133        let data = Bytes::copy_from_slice(&payload[8..total]);
134        debug!(
135            chunk_id = format_args!("{:#06x}", id),
136            len = length,
137            "parsed chunk"
138        );
139        chunks.push(ChunkRaw { id, data });
140        payload = &payload[total..];
141    }
142    chunks
143}
144
145/// Representation of a GVSP packet.
146#[derive(Debug, Clone)]
147pub enum GvspPacket {
148    /// Start-of-frame leader packet with metadata.
149    Leader {
150        block_id: u16,
151        packet_id: u16,
152        payload_type: u8,
153        timestamp: u64,
154        width: u32,
155        height: u32,
156        pixel_format: u32,
157    },
158    /// Payload data packet carrying pixel bytes.
159    Payload {
160        block_id: u16,
161        packet_id: u16,
162        data: Bytes,
163    },
164    /// End-of-frame trailer packet.
165    Trailer {
166        block_id: u16,
167        packet_id: u16,
168        status: u16,
169        chunk_data: Bytes,
170    },
171}
172
173/// Parse a raw UDP payload into a GVSP packet.
174pub fn parse_packet(payload: &[u8]) -> Result<GvspPacket, GvspError> {
175    if payload.len() < GVSP_HEADER_SIZE {
176        return Err(GvspError::Invalid("GVSP header truncated"));
177    }
178    let mut cursor = payload;
179    let status = cursor.get_u16();
180    let block_id = cursor.get_u16();
181    let packet_id = cursor.get_u16();
182    let payload_type = (status >> 8) as u8;
183    let packet_format = status & 0x00FF;
184
185    match packet_format {
186        0x01 => parse_leader(
187            packet_id,
188            block_id,
189            payload_type,
190            &payload[GVSP_HEADER_SIZE..],
191        ),
192        0x02 => parse_payload(packet_id, block_id, &payload[GVSP_HEADER_SIZE..]),
193        0x03 => parse_trailer(packet_id, block_id, &payload[GVSP_HEADER_SIZE..]),
194        _ => Err(GvspError::Unsupported("packet format")),
195    }
196}
197
198fn parse_leader(
199    packet_id: u16,
200    block_id: u16,
201    payload_type: u8,
202    payload: &[u8],
203) -> Result<GvspPacket, GvspError> {
204    if payload_type != PAYLOAD_TYPE_IMAGE {
205        return Err(GvspError::Unsupported("payload type"));
206    }
207    if payload.len() < 24 {
208        return Err(GvspError::Invalid("leader payload truncated"));
209    }
210    let mut cursor = payload;
211    let timestamp = cursor.get_u64();
212    let width = cursor.get_u32();
213    let height = cursor.get_u32();
214    let pixel_format = cursor.get_u32();
215    Ok(GvspPacket::Leader {
216        block_id,
217        packet_id,
218        payload_type,
219        timestamp,
220        width,
221        height,
222        pixel_format,
223    })
224}
225
226fn parse_payload(packet_id: u16, block_id: u16, payload: &[u8]) -> Result<GvspPacket, GvspError> {
227    Ok(GvspPacket::Payload {
228        block_id,
229        packet_id,
230        data: Bytes::copy_from_slice(payload),
231    })
232}
233
234fn parse_trailer(packet_id: u16, block_id: u16, payload: &[u8]) -> Result<GvspPacket, GvspError> {
235    if payload.len() < 2 {
236        return Err(GvspError::Invalid("trailer truncated"));
237    }
238    let mut cursor = payload;
239    let status = cursor.get_u16();
240    let chunk_data = if payload.len() > 2 {
241        Bytes::copy_from_slice(&payload[2..])
242    } else {
243        Bytes::new()
244    };
245    Ok(GvspPacket::Trailer {
246        block_id,
247        packet_id,
248        status,
249        chunk_data,
250    })
251}
252
253/// Bitmap tracking received packets within a block.
254#[derive(Debug, Clone)]
255pub struct PacketBitmap {
256    words: Vec<u64>,
257    received: usize,
258    total: usize,
259}
260
261impl PacketBitmap {
262    /// Create a bitmap with the given packet capacity.
263    pub fn new(total: usize) -> Self {
264        let words = total.div_ceil(64);
265        Self {
266            words: vec![0; words],
267            received: 0,
268            total,
269        }
270    }
271
272    fn mask_for(&self, packet_id: usize) -> (usize, u64) {
273        let word = packet_id / 64;
274        let bit = packet_id % 64;
275        (word, 1u64 << bit)
276    }
277
278    /// Mark a packet index as received.
279    pub fn set(&mut self, packet_id: usize) -> bool {
280        if packet_id >= self.total {
281            return false;
282        }
283        let (word, mask) = self.mask_for(packet_id);
284        let entry = &mut self.words[word];
285        if *entry & mask == 0 {
286            *entry |= mask;
287            self.received += 1;
288            true
289        } else {
290            false
291        }
292    }
293
294    /// Check whether the bitmap reports all packets received.
295    pub fn is_complete(&self) -> bool {
296        self.received == self.total
297    }
298
299    /// Return missing packet ranges as inclusive `[start, end]` indices.
300    pub fn missing_ranges(&self) -> Vec<RangeInclusive<u16>> {
301        let mut ranges = Vec::new();
302        let mut current: Option<(u16, u16)> = None;
303        for idx in 0..self.total {
304            let (word, mask) = self.mask_for(idx);
305            let present = (self.words[word] & mask) != 0;
306            match (present, current) {
307                (false, None) => current = Some((idx as u16, idx as u16)),
308                (false, Some((start, _))) => current = Some((start, idx as u16)),
309                (true, Some((start, end))) => {
310                    ranges.push(start..=end);
311                    current = None;
312                }
313                _ => {}
314            }
315        }
316        if let Some((start, end)) = current {
317            ranges.push(start..=end);
318        }
319        ranges
320    }
321}
322
323/// Representation of a partially received frame.
324#[derive(Debug)]
325pub struct FrameAssembly {
326    block_id: u16,
327    expected_packets: usize,
328    packet_payload: usize,
329    bitmap: PacketBitmap,
330    buffer: BytesMut,
331    lengths: Vec<usize>,
332    deadline: Instant,
333}
334
335impl FrameAssembly {
336    /// Create a new frame assembly using the supplied buffer.
337    pub fn new(
338        block_id: u16,
339        expected_packets: usize,
340        packet_payload: usize,
341        buffer: BytesMut,
342        deadline: Instant,
343    ) -> Self {
344        Self {
345            block_id,
346            expected_packets,
347            packet_payload,
348            bitmap: PacketBitmap::new(expected_packets),
349            buffer,
350            lengths: vec![0; expected_packets],
351            deadline,
352        }
353    }
354
355    /// Returns the block identifier associated with this frame.
356    pub fn block_id(&self) -> u16 {
357        self.block_id
358    }
359
360    /// Whether the reassembly deadline has elapsed.
361    pub fn is_expired(&self, now: Instant) -> bool {
362        now >= self.deadline
363    }
364
365    /// Insert a packet payload into the buffer.
366    pub fn ingest(&mut self, packet_id: usize, payload: &[u8]) -> bool {
367        if packet_id >= self.expected_packets || payload.len() > self.packet_payload {
368            return false;
369        }
370        if !self.bitmap.set(packet_id) {
371            return true;
372        }
373        // Track actual payload length for compaction at finish time.
374        self.lengths[packet_id] = payload.len();
375        let offset = packet_id * self.packet_payload;
376        if self.buffer.len() < offset + payload.len() {
377            self.buffer.resize(offset + payload.len(), 0);
378        }
379        self.buffer[offset..offset + payload.len()].copy_from_slice(payload);
380        true
381    }
382
383    /// Finalise the frame if all packets have been received.
384    pub fn finish(self) -> Option<Bytes> {
385        if !self.bitmap.is_complete() {
386            return None;
387        }
388
389        // If all packets except possibly the last are full-sized, we can
390        // return a slice of the existing buffer without extra copying.
391        let full_sized_prefix = if self.expected_packets > 0 {
392            self.lengths
393                .iter()
394                .take(self.expected_packets.saturating_sub(1))
395                .all(|&len| len == self.packet_payload)
396        } else {
397            true
398        };
399
400        if full_sized_prefix {
401            let last_len = *self.lengths.last().unwrap_or(&0);
402            let used = self
403                .packet_payload
404                .saturating_mul(self.expected_packets.saturating_sub(1))
405                + last_len;
406            let mut buf = self.buffer;
407            if buf.len() > used {
408                buf.truncate(used);
409            }
410            return Some(buf.freeze());
411        }
412
413        // Otherwise, compact the data to remove any gaps introduced by
414        // shorter packets occurring before the last packet.
415        let total: usize = self.lengths.iter().sum();
416        let mut out = BytesMut::with_capacity(total);
417        for (i, &len) in self.lengths.iter().enumerate() {
418            if len == 0 {
419                continue;
420            }
421            let start = i * self.packet_payload;
422            let end = start + len;
423            out.extend_from_slice(&self.buffer[start..end]);
424        }
425        Some(out.freeze())
426    }
427}
428
429/// Helper struct tracking resend attempts for a given block.
430#[derive(Debug, Clone)]
431pub struct ResendPlanner {
432    retries: u32,
433    max_retries: u32,
434    base_delay: Duration,
435    next_deadline: Instant,
436}
437
438impl ResendPlanner {
439    pub fn new(max_retries: u32, base_delay: Duration) -> Self {
440        Self {
441            retries: 0,
442            max_retries,
443            base_delay,
444            next_deadline: Instant::now(),
445        }
446    }
447
448    /// Determine whether a resend can be attempted at the provided instant.
449    pub fn should_resend(&self, now: Instant) -> bool {
450        self.retries < self.max_retries && now >= self.next_deadline
451    }
452
453    /// Record a resend attempt and compute the next deadline.
454    pub fn record_attempt(&mut self, now: Instant, jitter: Duration) {
455        self.retries += 1;
456        let base = self
457            .base_delay
458            .checked_mul(self.retries)
459            .unwrap_or(self.base_delay);
460        self.next_deadline = now + base + jitter;
461    }
462
463    /// Whether the resend planner exhausted all retries.
464    pub fn is_exhausted(&self) -> bool {
465        self.retries >= self.max_retries
466    }
467}
468
469/// Representation of a fully reassembled frame ready for consumption.
470#[derive(Debug, Clone)]
471pub struct CompletedFrame {
472    pub block_id: u16,
473    pub timestamp: Instant,
474    pub payload: Bytes,
475}
476
477/// Frame queue used for communicating between the receiver task and the
478/// application.
479#[derive(Debug)]
480pub struct FrameQueue {
481    inner: VecDeque<CompletedFrame>,
482    capacity: usize,
483}
484
485impl FrameQueue {
486    pub fn new(capacity: usize) -> Self {
487        Self {
488            inner: VecDeque::with_capacity(capacity),
489            capacity,
490        }
491    }
492
493    pub fn push(&mut self, frame: CompletedFrame, stats: &StreamStatsAccumulator) {
494        if self.inner.len() == self.capacity {
495            self.inner.pop_front();
496            stats.record_backpressure_drop();
497        }
498        self.inner.push_back(frame);
499    }
500
501    pub fn pop(&mut self) -> Option<CompletedFrame> {
502        self.inner.pop_front()
503    }
504}
505
506/// Coalesce missing packet ranges into resend requests.
507pub fn coalesce_missing(bitmap: &PacketBitmap, max_range: usize) -> Vec<RangeInclusive<u16>> {
508    bitmap
509        .missing_ranges()
510        .into_iter()
511        .flat_map(|range| split_range(range, max_range))
512        .collect()
513}
514
515fn split_range(range: RangeInclusive<u16>, max_len: usize) -> Vec<RangeInclusive<u16>> {
516    let start = *range.start() as usize;
517    let end = *range.end() as usize;
518    if max_len == 0 {
519        return vec![range];
520    }
521    let mut result = Vec::new();
522    let mut current = start;
523    while current <= end {
524        let upper = (current + max_len - 1).min(end);
525        result.push(current as u16..=upper as u16);
526        current = upper + 1;
527    }
528    result
529}
530
531/// Zero-copy block assembly state machine.
532#[derive(Debug)]
533pub struct Reassembler {
534    active: Option<FrameAssembly>,
535    packet_payload: usize,
536    stats: StreamStatsAccumulator,
537}
538
539impl Reassembler {
540    pub fn new(packet_payload: usize, stats: StreamStatsAccumulator) -> Self {
541        Self {
542            active: None,
543            packet_payload,
544            stats,
545        }
546    }
547
548    /// Start a new block, evicting the previous one when necessary.
549    pub fn start_block(&mut self, block_id: u16, expected_packets: usize, buffer: BytesMut) {
550        let deadline = Instant::now() + Duration::from_millis(50);
551        self.active = Some(FrameAssembly::new(
552            block_id,
553            expected_packets,
554            self.packet_payload,
555            buffer,
556            deadline,
557        ));
558    }
559
560    /// Insert a packet belonging to the active block.
561    pub fn push_packet(&mut self, packet_id: usize, payload: &[u8]) {
562        if let Some(assembly) = self.active.as_mut() {
563            if assembly.ingest(packet_id, payload) {
564                self.stats.record_packet();
565            }
566        }
567    }
568
569    /// Attempt to finish the current block.
570    pub fn finish_block(&mut self) -> Option<Bytes> {
571        self.active.take().and_then(FrameAssembly::finish)
572    }
573}
574
575#[cfg(test)]
576mod tests {
577    use super::*;
578
579    #[test]
580    fn parse_multiple_chunks() {
581        let mut payload = Vec::new();
582        payload.extend_from_slice(&0x0001u16.to_be_bytes());
583        payload.extend_from_slice(&0u16.to_be_bytes());
584        payload.extend_from_slice(&4u32.to_be_bytes());
585        payload.extend_from_slice(&[1, 2, 3, 4]);
586        payload.extend_from_slice(&0x0002u16.to_be_bytes());
587        payload.extend_from_slice(&0u16.to_be_bytes());
588        payload.extend_from_slice(&2u32.to_be_bytes());
589        payload.extend_from_slice(&[5, 6]);
590        let chunks = parse_chunks(&payload);
591        assert_eq!(chunks.len(), 2);
592        assert_eq!(chunks[0].id, 0x0001);
593        assert_eq!(chunks[0].data.as_ref(), &[1, 2, 3, 4]);
594        assert_eq!(chunks[1].id, 0x0002);
595        assert_eq!(chunks[1].data.as_ref(), &[5, 6]);
596    }
597
598    #[test]
599    fn truncated_chunk_is_ignored() {
600        let payload = vec![0u8; 6];
601        let chunks = parse_chunks(&payload);
602        assert!(chunks.is_empty());
603    }
604
605    #[test]
606    fn parse_chunks_tolerates_padding() {
607        for _ in 0..128 {
608            let count = fastrand::usize(..6);
609            let mut payload = Vec::new();
610            let mut entries = Vec::new();
611            for _ in 0..count {
612                let id = fastrand::u16(..);
613                let len = fastrand::usize(..16);
614                let mut data = vec![0u8; len];
615                for byte in &mut data {
616                    *byte = fastrand::u8(..);
617                }
618                payload.extend_from_slice(&id.to_be_bytes());
619                payload.extend_from_slice(&0u16.to_be_bytes());
620                payload.extend_from_slice(&(data.len() as u32).to_be_bytes());
621                payload.extend_from_slice(&data);
622                entries.push((id, data));
623            }
624            let padding_len = fastrand::usize(..8);
625            for _ in 0..padding_len {
626                payload.push(fastrand::u8(..));
627            }
628            let parsed = parse_chunks(&payload);
629            assert!(parsed.len() <= entries.len());
630            for (idx, chunk) in parsed.iter().enumerate() {
631                assert_eq!(chunk.id, entries[idx].0);
632                assert_eq!(chunk.data.as_ref(), entries[idx].1.as_slice());
633            }
634        }
635    }
636
637    #[test]
638    fn bitmap_missing_ranges_coalesce() {
639        let mut bitmap = PacketBitmap::new(10);
640        for &idx in &[0usize, 1, 5, 6, 9] {
641            bitmap.set(idx);
642        }
643        let ranges = bitmap.missing_ranges();
644        assert_eq!(ranges.len(), 2);
645        assert_eq!(ranges[0], 2..=4);
646        assert_eq!(ranges[1], 7..=8);
647    }
648
649    #[test]
650    fn coalesce_splits_large_ranges() {
651        let mut bitmap = PacketBitmap::new(20);
652        for idx in [0usize, 1, 2, 18, 19] {
653            bitmap.set(idx);
654        }
655        let ranges = coalesce_missing(&bitmap, 4);
656        assert_eq!(ranges, vec![3..=6, 7..=10, 11..=14, 15..=17]);
657    }
658
659    #[test]
660    fn reassembler_finishes_frame() {
661        let stats = StreamStatsAccumulator::new();
662        let mut reassembler = Reassembler::new(4, stats);
663        reassembler.start_block(1, 3, BytesMut::with_capacity(12));
664        reassembler.push_packet(0, &[1, 2, 3]);
665        reassembler.push_packet(1, &[4, 5, 6]);
666        reassembler.push_packet(2, &[7, 8, 9]);
667        let frame = reassembler.finish_block().expect("frame");
668        assert_eq!(frame.as_ref(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
669    }
670}