tl_gige/
message.rs

1//! GVCP message/event channel handling.
2
3use std::io;
4use std::io::ErrorKind;
5use std::net::{IpAddr, SocketAddr};
6
7use bytes::{Buf, Bytes};
8#[cfg(test)]
9use bytes::{BufMut, BytesMut};
10use socket2::{Domain, Protocol, Socket, Type};
11use tokio::net::UdpSocket;
12use tokio::sync::Mutex;
13use tracing::{debug, info, trace, warn};
14
15/// Constants related to GVCP message packets.
16mod consts {
17    /// Size of the GVCP message header in bytes.
18    pub const GVCP_HEADER: usize = 8;
19    /// Opcode identifying a GVCP event data acknowledgement.
20    pub const OPCODE_EVENT_DATA_ACK: u16 = 0x000D;
21    /// Default receive buffer size requested for the UDP socket (bytes).
22    pub const DEFAULT_RCVBUF: usize = 1 << 20; // 1 MiB.
23    /// Maximum datagram size accepted on the event channel (bytes).
24    pub const MAX_EVENT_SIZE: usize = 2048;
25}
26
27/// Parsed representation of a GVCP event packet.
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct EventPacket {
30    /// Source address of the datagram.
31    pub src: SocketAddr,
32    /// Event identifier reported by the device.
33    pub event_id: u16,
34    /// Device timestamp carried by the event (ticks).
35    pub timestamp_dev: u64,
36    /// Stream channel associated with the event when present.
37    pub stream_channel: u16,
38    /// GVSP block identifier associated with the event when present.
39    pub block_id: u16,
40    /// Remaining payload bytes following the event header.
41    pub payload: Bytes,
42}
43
44impl EventPacket {
45    fn parse(src: SocketAddr, data: &[u8]) -> io::Result<Self> {
46        if data.len() < consts::GVCP_HEADER + 2 {
47            return Err(io::Error::new(ErrorKind::InvalidData, "packet too short"));
48        }
49        if data.len() > consts::MAX_EVENT_SIZE {
50            return Err(io::Error::new(ErrorKind::InvalidData, "packet too large"));
51        }
52
53        let mut cursor = data;
54        let status = cursor.get_u16();
55        let opcode = cursor.get_u16();
56        let length = cursor.get_u16() as usize;
57        let _request_id = cursor.get_u16();
58
59        if status != 0 {
60            return Err(io::Error::new(
61                ErrorKind::InvalidData,
62                "device reported error status",
63            ));
64        }
65        if opcode != consts::OPCODE_EVENT_DATA_ACK {
66            return Err(io::Error::new(
67                ErrorKind::InvalidData,
68                "unexpected opcode for event packet",
69            ));
70        }
71        if length + consts::GVCP_HEADER != data.len() {
72            return Err(io::Error::new(ErrorKind::InvalidData, "length mismatch"));
73        }
74
75        if cursor.remaining() < 2 {
76            return Err(io::Error::new(
77                ErrorKind::InvalidData,
78                "event payload missing identifier",
79            ));
80        }
81        let event_id = cursor.get_u16();
82
83        if cursor.remaining() < 2 {
84            return Err(io::Error::new(
85                ErrorKind::InvalidData,
86                "event payload missing notification",
87            ));
88        }
89        let _notification = cursor.get_u16();
90
91        let timestamp_dev = if cursor.remaining() >= 8 {
92            let high = cursor.get_u32() as u64;
93            let low = cursor.get_u32() as u64;
94            (high << 32) | low
95        } else {
96            return Err(io::Error::new(
97                ErrorKind::InvalidData,
98                "event payload missing timestamp",
99            ));
100        };
101
102        let mut stream_channel = 0u16;
103        let mut block_id = 0u16;
104        let mut payload_length = 0usize;
105        if cursor.remaining() >= 6 {
106            stream_channel = cursor.get_u16();
107            block_id = cursor.get_u16();
108            payload_length = cursor.get_u16() as usize;
109        }
110
111        if cursor.remaining() < 2 {
112            return Err(io::Error::new(
113                ErrorKind::InvalidData,
114                "event payload missing reserved field",
115            ));
116        }
117        // Consume the reserved field when present.
118        let _reserved = cursor.get_u16();
119
120        let remaining = cursor.remaining();
121        if payload_length != 0 && payload_length != remaining {
122            return Err(io::Error::new(
123                ErrorKind::InvalidData,
124                "event payload length mismatch",
125            ));
126        }
127
128        let payload = if remaining > 0 {
129            Bytes::copy_from_slice(cursor)
130        } else {
131            Bytes::new()
132        };
133
134        Ok(Self {
135            src,
136            event_id,
137            timestamp_dev,
138            stream_channel,
139            block_id,
140            payload,
141        })
142    }
143}
144
145/// Async GVCP message channel socket.
146pub struct EventSocket {
147    sock: UdpSocket,
148    buffer: Mutex<Vec<u8>>,
149}
150
151impl EventSocket {
152    /// Bind a GVCP message socket on the provided local address.
153    pub async fn bind(local_ip: IpAddr, port: u16) -> io::Result<Self> {
154        let domain = match local_ip {
155            IpAddr::V4(_) => Domain::IPV4,
156            IpAddr::V6(_) => Domain::IPV6,
157        };
158        let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
159        socket.set_reuse_address(true)?;
160        socket.set_nonblocking(true)?;
161        if let Err(err) = socket.set_recv_buffer_size(consts::DEFAULT_RCVBUF) {
162            warn!(?err, "failed to grow GVCP message buffer");
163        }
164        let addr = SocketAddr::new(local_ip, port);
165        socket.bind(&addr.into())?;
166        let sock = UdpSocket::from_std(socket.into())?;
167        info!(local = %addr, "bound GVCP message socket");
168        Ok(Self {
169            sock,
170            buffer: Mutex::new(vec![0u8; consts::MAX_EVENT_SIZE]),
171        })
172    }
173
174    /// Receive and parse the next GVCP event packet.
175    pub async fn recv(&self) -> io::Result<EventPacket> {
176        loop {
177            let mut buffer = self.buffer.lock().await;
178            let (len, src) = self.sock.recv_from(&mut buffer[..]).await?;
179            trace!(bytes = len, %src, "received GVCP message");
180            match EventPacket::parse(src, &buffer[..len]) {
181                Ok(packet) => {
182                    debug!(event_id = packet.event_id, %src, "parsed GVCP event");
183                    return Ok(packet);
184                }
185                Err(err) => {
186                    warn!(%src, error = %err, "discarding malformed event packet");
187                    continue;
188                }
189            }
190        }
191    }
192
193    /// Return the local address bound to the socket.
194    pub fn local_addr(&self) -> io::Result<SocketAddr> {
195        self.sock.local_addr()
196    }
197
198    /// Access the underlying UDP socket (tests only).
199    #[cfg(test)]
200    pub fn socket(&self) -> &UdpSocket {
201        &self.sock
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208    use std::net::Ipv4Addr;
209
210    fn build_packet() -> Bytes {
211        const EVENT_HEADER_LEN: usize = 20;
212        let mut buf = BytesMut::with_capacity(consts::GVCP_HEADER + EVENT_HEADER_LEN + 4);
213        buf.put_u16(0); // status
214        buf.put_u16(consts::OPCODE_EVENT_DATA_ACK);
215        buf.put_u16((EVENT_HEADER_LEN + 4) as u16);
216        buf.put_u16(0xCAFE); // request id
217        buf.put_u16(0x1234); // event id
218        buf.put_u16(0x0001); // notification (unused)
219        buf.put_u32(0x0002_0003); // ts high
220        buf.put_u32(0x0004_0005); // ts low
221        buf.put_u16(7); // stream channel
222        buf.put_u16(8); // block id
223        buf.put_u16(4); // payload length
224        buf.put_u16(0); // reserved
225        buf.extend_from_slice(&[1u8, 2, 3, 4]);
226        buf.freeze()
227    }
228
229    #[tokio::test]
230    async fn parse_valid_packet() {
231        let packet = build_packet();
232        let src = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3956);
233        let parsed = EventPacket::parse(src, &packet).expect("packet");
234        assert_eq!(parsed.event_id, 0x1234);
235        assert_eq!(parsed.timestamp_dev, 0x0002_0003_0004_0005);
236        assert_eq!(parsed.stream_channel, 7);
237        assert_eq!(parsed.block_id, 8);
238        assert_eq!(&parsed.payload[..], &[1, 2, 3, 4]);
239    }
240
241    #[tokio::test]
242    async fn reject_short_packet() {
243        let src = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3956);
244        let data = Bytes::from_static(&[0x00, 0x01, 0x02]);
245        let err = EventPacket::parse(src, &data).unwrap_err();
246        assert_eq!(err.kind(), ErrorKind::InvalidData);
247    }
248}