1use std::io;
4use std::io::ErrorKind;
5use std::net::{IpAddr, SocketAddr};
6
7use bytes::{Buf, Bytes};
8#[cfg(test)]
9use bytes::{BufMut, BytesMut};
10use socket2::{Domain, Protocol, Socket, Type};
11use tokio::net::UdpSocket;
12use tokio::sync::Mutex;
13use tracing::{debug, info, trace, warn};
14
15mod consts {
17 pub const GVCP_HEADER: usize = 8;
19 pub const OPCODE_EVENT_DATA_ACK: u16 = 0x000D;
21 pub const DEFAULT_RCVBUF: usize = 1 << 20; pub const MAX_EVENT_SIZE: usize = 2048;
25}
26
27#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct EventPacket {
30 pub src: SocketAddr,
32 pub event_id: u16,
34 pub timestamp_dev: u64,
36 pub stream_channel: u16,
38 pub block_id: u16,
40 pub payload: Bytes,
42}
43
44impl EventPacket {
45 fn parse(src: SocketAddr, data: &[u8]) -> io::Result<Self> {
46 if data.len() < consts::GVCP_HEADER + 2 {
47 return Err(io::Error::new(ErrorKind::InvalidData, "packet too short"));
48 }
49 if data.len() > consts::MAX_EVENT_SIZE {
50 return Err(io::Error::new(ErrorKind::InvalidData, "packet too large"));
51 }
52
53 let mut cursor = data;
54 let status = cursor.get_u16();
55 let opcode = cursor.get_u16();
56 let length = cursor.get_u16() as usize;
57 let _request_id = cursor.get_u16();
58
59 if status != 0 {
60 return Err(io::Error::new(
61 ErrorKind::InvalidData,
62 "device reported error status",
63 ));
64 }
65 if opcode != consts::OPCODE_EVENT_DATA_ACK {
66 return Err(io::Error::new(
67 ErrorKind::InvalidData,
68 "unexpected opcode for event packet",
69 ));
70 }
71 if length + consts::GVCP_HEADER != data.len() {
72 return Err(io::Error::new(ErrorKind::InvalidData, "length mismatch"));
73 }
74
75 if cursor.remaining() < 2 {
76 return Err(io::Error::new(
77 ErrorKind::InvalidData,
78 "event payload missing identifier",
79 ));
80 }
81 let event_id = cursor.get_u16();
82
83 if cursor.remaining() < 2 {
84 return Err(io::Error::new(
85 ErrorKind::InvalidData,
86 "event payload missing notification",
87 ));
88 }
89 let _notification = cursor.get_u16();
90
91 let timestamp_dev = if cursor.remaining() >= 8 {
92 let high = cursor.get_u32() as u64;
93 let low = cursor.get_u32() as u64;
94 (high << 32) | low
95 } else {
96 return Err(io::Error::new(
97 ErrorKind::InvalidData,
98 "event payload missing timestamp",
99 ));
100 };
101
102 let mut stream_channel = 0u16;
103 let mut block_id = 0u16;
104 let mut payload_length = 0usize;
105 if cursor.remaining() >= 6 {
106 stream_channel = cursor.get_u16();
107 block_id = cursor.get_u16();
108 payload_length = cursor.get_u16() as usize;
109 }
110
111 if cursor.remaining() < 2 {
112 return Err(io::Error::new(
113 ErrorKind::InvalidData,
114 "event payload missing reserved field",
115 ));
116 }
117 let _reserved = cursor.get_u16();
119
120 let remaining = cursor.remaining();
121 if payload_length != 0 && payload_length != remaining {
122 return Err(io::Error::new(
123 ErrorKind::InvalidData,
124 "event payload length mismatch",
125 ));
126 }
127
128 let payload = if remaining > 0 {
129 Bytes::copy_from_slice(cursor)
130 } else {
131 Bytes::new()
132 };
133
134 Ok(Self {
135 src,
136 event_id,
137 timestamp_dev,
138 stream_channel,
139 block_id,
140 payload,
141 })
142 }
143}
144
145pub struct EventSocket {
147 sock: UdpSocket,
148 buffer: Mutex<Vec<u8>>,
149}
150
151impl EventSocket {
152 pub async fn bind(local_ip: IpAddr, port: u16) -> io::Result<Self> {
154 let domain = match local_ip {
155 IpAddr::V4(_) => Domain::IPV4,
156 IpAddr::V6(_) => Domain::IPV6,
157 };
158 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
159 socket.set_reuse_address(true)?;
160 socket.set_nonblocking(true)?;
161 if let Err(err) = socket.set_recv_buffer_size(consts::DEFAULT_RCVBUF) {
162 warn!(?err, "failed to grow GVCP message buffer");
163 }
164 let addr = SocketAddr::new(local_ip, port);
165 socket.bind(&addr.into())?;
166 let sock = UdpSocket::from_std(socket.into())?;
167 info!(local = %addr, "bound GVCP message socket");
168 Ok(Self {
169 sock,
170 buffer: Mutex::new(vec![0u8; consts::MAX_EVENT_SIZE]),
171 })
172 }
173
174 pub async fn recv(&self) -> io::Result<EventPacket> {
176 loop {
177 let mut buffer = self.buffer.lock().await;
178 let (len, src) = self.sock.recv_from(&mut buffer[..]).await?;
179 trace!(bytes = len, %src, "received GVCP message");
180 match EventPacket::parse(src, &buffer[..len]) {
181 Ok(packet) => {
182 debug!(event_id = packet.event_id, %src, "parsed GVCP event");
183 return Ok(packet);
184 }
185 Err(err) => {
186 warn!(%src, error = %err, "discarding malformed event packet");
187 continue;
188 }
189 }
190 }
191 }
192
193 pub fn local_addr(&self) -> io::Result<SocketAddr> {
195 self.sock.local_addr()
196 }
197
198 #[cfg(test)]
200 pub fn socket(&self) -> &UdpSocket {
201 &self.sock
202 }
203}
204
205#[cfg(test)]
206mod tests {
207 use super::*;
208 use std::net::Ipv4Addr;
209
210 fn build_packet() -> Bytes {
211 const EVENT_HEADER_LEN: usize = 20;
212 let mut buf = BytesMut::with_capacity(consts::GVCP_HEADER + EVENT_HEADER_LEN + 4);
213 buf.put_u16(0); buf.put_u16(consts::OPCODE_EVENT_DATA_ACK);
215 buf.put_u16((EVENT_HEADER_LEN + 4) as u16);
216 buf.put_u16(0xCAFE); buf.put_u16(0x1234); buf.put_u16(0x0001); buf.put_u32(0x0002_0003); buf.put_u32(0x0004_0005); buf.put_u16(7); buf.put_u16(8); buf.put_u16(4); buf.put_u16(0); buf.extend_from_slice(&[1u8, 2, 3, 4]);
226 buf.freeze()
227 }
228
229 #[tokio::test]
230 async fn parse_valid_packet() {
231 let packet = build_packet();
232 let src = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3956);
233 let parsed = EventPacket::parse(src, &packet).expect("packet");
234 assert_eq!(parsed.event_id, 0x1234);
235 assert_eq!(parsed.timestamp_dev, 0x0002_0003_0004_0005);
236 assert_eq!(parsed.stream_channel, 7);
237 assert_eq!(parsed.block_id, 8);
238 assert_eq!(&parsed.payload[..], &[1, 2, 3, 4]);
239 }
240
241 #[tokio::test]
242 async fn reject_short_packet() {
243 let src = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3956);
244 let data = Bytes::from_static(&[0x00, 0x01, 0x02]);
245 let err = EventPacket::parse(src, &data).unwrap_err();
246 assert_eq!(err.kind(), ErrorKind::InvalidData);
247 }
248}