1use std::collections::VecDeque;
10use std::fs;
11use std::io;
12use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
13use std::sync::Mutex;
14
15use bytes::BytesMut;
16use if_addrs::IfAddr;
17use socket2::{Domain, Protocol, SockRef, Socket, Type};
18use tokio::net::UdpSocket;
19use tracing::info;
20
21#[cfg(any(target_os = "linux", target_os = "android"))]
22use tracing::warn;
23
24pub const DEFAULT_RCVBUF_BYTES: usize = 4 << 20; const IFACE_NAME_MAX: usize = 15; #[derive(Debug, Clone, PartialEq, Eq)]
36pub struct Iface {
37 name: String,
38 index: u32,
39 ipv4: Option<Ipv4Addr>,
40 ipv6: Option<Ipv6Addr>,
41}
42
43impl Iface {
44 pub fn from_system(name: &str) -> io::Result<Self> {
46 if name.is_empty() || name.len() > IFACE_NAME_MAX {
47 return Err(io::Error::new(
48 io::ErrorKind::InvalidInput,
49 format!("invalid interface name '{name}'"),
50 ));
51 }
52
53 let index_path = format!("/sys/class/net/{name}/ifindex");
54 let index = fs::read_to_string(&index_path)
55 .map_err(|err| io::Error::new(err.kind(), format!("{index_path}: {err}")))?
56 .trim()
57 .parse::<u32>()
58 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
59
60 let mut ipv4 = None;
61 let mut ipv6 = None;
62 for iface in if_addrs::get_if_addrs()? {
63 if iface.name != name {
64 continue;
65 }
66 match iface.addr {
67 IfAddr::V4(v4) => ipv4 = Some(v4.ip),
68 IfAddr::V6(v6) => ipv6 = Some(v6.ip),
69 }
70 }
71
72 Ok(Self {
73 name: name.to_string(),
74 index,
75 ipv4,
76 ipv6,
77 })
78 }
79
80 pub fn from_ipv4(addr: Ipv4Addr) -> io::Result<Self> {
82 for iface in if_addrs::get_if_addrs()? {
83 if let IfAddr::V4(v4) = iface.addr {
84 if v4.ip == addr {
85 return Self::from_system(&iface.name);
86 }
87 }
88 }
89 Err(io::Error::new(
90 io::ErrorKind::NotFound,
91 format!("no interface with IPv4 {addr}"),
92 ))
93 }
94
95 pub fn name(&self) -> &str {
97 &self.name
98 }
99
100 pub fn index(&self) -> u32 {
103 self.index
104 }
105
106 pub fn ipv4(&self) -> Option<Ipv4Addr> {
108 self.ipv4
109 }
110
111 #[allow(dead_code)]
113 pub fn ipv6(&self) -> Option<Ipv6Addr> {
114 self.ipv6
115 }
116}
117
118pub fn mtu(_iface: &Iface) -> io::Result<u32> {
124 #[cfg(target_os = "linux")]
125 {
126 let path = format!("/sys/class/net/{}/mtu", _iface.name());
127 match fs::read_to_string(path) {
128 Ok(contents) => {
129 let mtu = contents
130 .trim()
131 .parse::<u32>()
132 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
133 tracing::debug!(name = _iface.name(), mtu, "resolved interface MTU");
134 return Ok(mtu);
135 }
136 Err(err) => {
137 warn!(name = _iface.name(), error = %err, "failed to read MTU, using default");
138 }
139 }
140 }
141
142 Ok(1500)
143}
144
145pub fn best_packet_size(mtu: u32) -> u32 {
150 const ETHERNET_L2: u32 = 14; const IPV4_HEADER: u32 = 20; const UDP_HEADER: u32 = 8; mtu.saturating_sub(ETHERNET_L2 + IPV4_HEADER + UDP_HEADER)
155}
156
157#[derive(Debug, Clone)]
159pub struct McOptions {
160 pub loopback: bool,
162 pub ttl: u32,
164 pub rcvbuf_bytes: usize,
166 pub reuse_addr: bool,
168}
169
170impl Default for McOptions {
171 fn default() -> Self {
172 Self {
173 loopback: false,
174 ttl: 1,
175 rcvbuf_bytes: DEFAULT_RCVBUF_BYTES,
176 reuse_addr: true,
177 }
178 }
179}
180
181pub async fn bind_udp(
183 bind: IpAddr,
184 port: u16,
185 iface: Option<Iface>,
186 recv_buffer: Option<usize>,
187) -> io::Result<UdpSocket> {
188 let recv_buffer = recv_buffer.unwrap_or(DEFAULT_RCVBUF_BYTES);
189 if let Some(ipv4) = iface.as_ref().and_then(|iface| iface.ipv4()) {
190 info!(name = iface.as_ref().map(Iface::name), %ipv4, port, "binding GVSP socket");
191 } else {
192 info!(%bind, port, "binding GVSP socket");
193 }
194
195 let domain = match bind {
196 IpAddr::V4(_) => Domain::IPV4,
197 IpAddr::V6(_) => Domain::IPV6,
198 };
199 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
200
201 socket.set_reuse_address(true)?;
202 #[cfg(all(unix, not(target_os = "solaris")))]
203 socket.set_reuse_port(true)?;
204
205 socket.set_recv_buffer_size(recv_buffer)?;
206
207 #[cfg(any(target_os = "linux", target_os = "android"))]
208 if let Some(iface) = iface.as_ref() {
209 if let Err(err) = socket.bind_device(Some(iface.name().as_bytes())) {
210 warn!(name = iface.name(), error = %err, "SO_BINDTODEVICE failed");
211 }
212 }
213
214 let addr = SocketAddr::new(bind, port);
215 socket.bind(&addr.into())?;
216
217 let std_socket: std::net::UdpSocket = socket.into();
218 std_socket.set_nonblocking(true)?;
219 UdpSocket::from_std(std_socket)
220}
221
222fn validate_multicast_inputs(group: Ipv4Addr, ttl: u32) -> io::Result<()> {
223 if ttl > 255 {
224 return Err(io::Error::new(
225 io::ErrorKind::InvalidInput,
226 "multicast TTL must be <= 255",
227 ));
228 }
229 if (group.octets()[0] & 0xF0) != 0xE0 {
230 return Err(io::Error::new(
231 io::ErrorKind::InvalidInput,
232 "multicast group must be within 224.0.0.0/4",
233 ));
234 }
235 Ok(())
236}
237
238pub async fn bind_multicast(
240 iface: &Iface,
241 group: Ipv4Addr,
242 port: u16,
243 opts: &McOptions,
244) -> io::Result<UdpSocket> {
245 validate_multicast_inputs(group, opts.ttl)?;
246 let iface_addr = iface
247 .ipv4()
248 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "interface lacks IPv4"))?;
249
250 info!(name = iface.name(), %group, port, "binding multicast GVSP socket");
251
252 let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
253
254 if opts.reuse_addr {
255 socket.set_reuse_address(true)?;
256 #[cfg(all(unix, not(target_os = "solaris")))]
257 socket.set_reuse_port(true)?;
258 }
259
260 socket.set_recv_buffer_size(opts.rcvbuf_bytes)?;
261 socket.set_multicast_loop_v4(opts.loopback)?;
262 socket.set_multicast_ttl_v4(opts.ttl)?;
263 socket.set_multicast_if_v4(&iface_addr)?;
264
265 #[cfg(any(target_os = "linux", target_os = "android"))]
266 if let Err(err) = socket.bind_device(Some(iface.name().as_bytes())) {
267 warn!(name = iface.name(), error = %err, "SO_BINDTODEVICE failed");
268 }
269
270 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
271 socket.bind(&bind_addr.into())?;
272 socket.join_multicast_v4(&group, &iface_addr)?;
273
274 let std_socket: std::net::UdpSocket = socket.into();
275 std_socket.set_nonblocking(true)?;
276 UdpSocket::from_std(std_socket)
277}
278
279pub fn join_multicast(sock: &UdpSocket, group: Ipv4Addr, iface: &Iface) -> io::Result<()> {
281 let socket = SockRef::from(sock);
282 let iface_addr = iface.ipv4().unwrap_or(Ipv4Addr::UNSPECIFIED);
283 socket.join_multicast_v4(&group, &iface_addr)?;
284 Ok(())
285}
286
287#[derive(Debug)]
289pub struct BufferPool {
290 buffers: Mutex<VecDeque<BytesMut>>,
291 size: usize,
292}
293
294impl BufferPool {
295 pub fn new(capacity: usize, size: usize) -> Self {
297 let mut buffers = VecDeque::with_capacity(capacity);
298 for _ in 0..capacity {
299 buffers.push_back(BytesMut::with_capacity(size));
300 }
301 Self {
302 buffers: Mutex::new(buffers),
303 size,
304 }
305 }
306
307 pub fn acquire(&self) -> Option<BytesMut> {
309 self.buffers
310 .lock()
311 .ok()
312 .and_then(|mut guard| guard.pop_front())
313 }
314
315 pub fn release(&self, mut buffer: BytesMut) {
317 buffer.truncate(0);
318 buffer.reserve(self.size);
319 if let Ok(mut guard) = self.buffers.lock() {
320 guard.push_back(buffer);
321 }
322 }
323}
324
325pub fn default_bind_addr() -> IpAddr {
327 IpAddr::V4(Ipv4Addr::UNSPECIFIED)
328}
329
330#[cfg(test)]
331mod tests {
332 use super::*;
333
334 #[test]
335 fn reject_invalid_ttl() {
336 let err = validate_multicast_inputs(Ipv4Addr::new(239, 0, 0, 1), 512).unwrap_err();
337 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
338 }
339
340 #[test]
341 fn reject_non_multicast_group() {
342 let err = validate_multicast_inputs(Ipv4Addr::new(192, 168, 1, 1), 1).unwrap_err();
343 assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
344 }
345
346 #[test]
347 fn accept_valid_group() {
348 assert!(validate_multicast_inputs(Ipv4Addr::new(239, 192, 1, 10), 1).is_ok());
349 }
350
351 #[test]
352 fn packet_size_respects_headers() {
353 let mtu = 1500;
354 let size = best_packet_size(mtu);
355 assert!(size < mtu);
356 assert_eq!(size, 1500 - (14 + 20 + 8));
357 }
358
359 #[test]
360 fn buffer_pool_recycles() {
361 let pool = BufferPool::new(2, 1024);
362 let mut buf = pool.acquire().expect("buffer");
363 buf.extend_from_slice(&[1, 2, 3]);
364 pool.release(buf);
365 let buf2 = pool.acquire().expect("buffer");
366 assert!(buf2.is_empty());
367 assert!(buf2.capacity() >= 1024);
368 }
369}