1use std::collections::VecDeque;
10#[cfg(any(target_os = "linux", target_os = "android"))]
11use std::fs;
12use std::io;
13use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
14use std::sync::Mutex;
15
16use bytes::BytesMut;
17use if_addrs::IfAddr;
18use socket2::{Domain, Protocol, SockRef, Socket, Type};
19use tokio::net::UdpSocket;
20use tracing::info;
21
22#[cfg(any(target_os = "linux", target_os = "android"))]
23use tracing::warn;
24
/// Receive-buffer size in bytes (4 MiB) used when a caller does not pick one.
pub const DEFAULT_RCVBUF_BYTES: usize = 4 * 1024 * 1024;

/// Longest interface name, in bytes, accepted on non-Windows targets.
#[cfg(not(target_os = "windows"))]
const IFACE_NAME_MAX: usize = 15;
/// Longest interface name, in bytes, accepted on Windows targets.
#[cfg(target_os = "windows")]
const IFACE_NAME_MAX: usize = 64;
38
/// Resolves an interface name to its index by reading
/// `/sys/class/net/<name>/ifindex` (Linux and Android builds).
///
/// # Errors
///
/// * A read failure keeps the original error kind and prefixes the message
///   with the sysfs path that was tried.
/// * Contents that do not parse as a `u32` yield `InvalidData`.
#[cfg(any(target_os = "linux", target_os = "android"))]
fn iface_name_to_index(name: &str) -> io::Result<u32> {
    let sysfs_entry = format!("/sys/class/net/{name}/ifindex");

    let raw = match fs::read_to_string(&sysfs_entry) {
        Ok(text) => text,
        Err(cause) => {
            return Err(io::Error::new(
                cause.kind(),
                format!("{sysfs_entry}: {cause}"),
            ));
        }
    };

    // The file content carries surrounding whitespace, hence the trim.
    match raw.trim().parse::<u32>() {
        Ok(index) => Ok(index),
        Err(cause) => Err(io::Error::new(io::ErrorKind::InvalidData, cause)),
    }
}
49
/// Resolves an interface name to its index through `if_nametoindex`
/// (Unix builds other than Linux and Android).
///
/// # Errors
///
/// * `InvalidInput` when `name` contains an interior NUL byte and therefore
///   cannot be turned into a C string.
/// * `NotFound` when `if_nametoindex` returns `0`.
#[cfg(all(unix, not(any(target_os = "linux", target_os = "android"))))]
fn iface_name_to_index(name: &str) -> io::Result<u32> {
    use std::ffi::CString;

    let Ok(c_name) = CString::new(name) else {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("interface name contains null byte: '{name}'"),
        ));
    };

    // SAFETY: `c_name` is a NUL-terminated C string that stays alive for the
    // whole call, and the pointer is only read by `if_nametoindex`.
    match unsafe { libc::if_nametoindex(c_name.as_ptr()) } {
        0 => Err(io::Error::new(
            io::ErrorKind::NotFound,
            format!("interface '{name}' not found"),
        )),
        index => Ok(index),
    }
}
72
/// Resolves an interface name to its OS interface index (Windows builds).
///
/// `if_addrs::get_if_addrs` yields one entry per *address*, so an interface
/// with several addresses is listed several times and the position of an
/// entry in that list is unrelated to the interface index. The index reported
/// by `if_addrs` itself is used instead; the first entry for `name` that
/// carries one wins.
///
/// # Errors
///
/// * Any error from `if_addrs::get_if_addrs` is propagated.
/// * `NotFound` when no listed entry is named `name`, or none of the matching
///   entries carries an index.
#[cfg(target_os = "windows")]
fn iface_name_to_index(name: &str) -> io::Result<u32> {
    if_addrs::get_if_addrs()?
        .into_iter()
        .filter(|iface| iface.name == name)
        .find_map(|iface| iface.index)
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::NotFound,
                format!("interface '{name}' not found"),
            )
        })
}
88
89#[derive(Debug, Clone, PartialEq, Eq)]
91pub struct Iface {
92 name: String,
93 index: u32,
94 ipv4: Option<Ipv4Addr>,
95 ipv6: Option<Ipv6Addr>,
96}
97
98impl Iface {
99 pub fn from_system(name: &str) -> io::Result<Self> {
101 if name.is_empty() || name.len() > IFACE_NAME_MAX {
102 return Err(io::Error::new(
103 io::ErrorKind::InvalidInput,
104 format!("invalid interface name '{name}'"),
105 ));
106 }
107
108 let index = iface_name_to_index(name)?;
109
110 let mut ipv4 = None;
111 let mut ipv6 = None;
112 for iface in if_addrs::get_if_addrs()? {
113 if iface.name != name {
114 continue;
115 }
116 match iface.addr {
117 IfAddr::V4(v4) => ipv4 = Some(v4.ip),
118 IfAddr::V6(v6) => ipv6 = Some(v6.ip),
119 }
120 }
121
122 Ok(Self {
123 name: name.to_string(),
124 index,
125 ipv4,
126 ipv6,
127 })
128 }
129
130 pub fn from_ipv4(addr: Ipv4Addr) -> io::Result<Self> {
132 for iface in if_addrs::get_if_addrs()? {
133 if let IfAddr::V4(v4) = iface.addr
134 && v4.ip == addr
135 {
136 return Self::from_system(&iface.name);
137 }
138 }
139 Err(io::Error::new(
140 io::ErrorKind::NotFound,
141 format!("no interface with IPv4 {addr}"),
142 ))
143 }
144
145 pub fn name(&self) -> &str {
147 &self.name
148 }
149
150 pub fn index(&self) -> u32 {
153 self.index
154 }
155
156 pub fn ipv4(&self) -> Option<Ipv4Addr> {
158 self.ipv4
159 }
160
161 #[allow(dead_code)]
163 pub fn ipv6(&self) -> Option<Ipv6Addr> {
164 self.ipv6
165 }
166}
167
168pub fn mtu(_iface: &Iface) -> io::Result<u32> {
174 #[cfg(target_os = "linux")]
175 {
176 let path = format!("/sys/class/net/{}/mtu", _iface.name());
177 match fs::read_to_string(path) {
178 Ok(contents) => {
179 let mtu = contents
180 .trim()
181 .parse::<u32>()
182 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
183 tracing::debug!(name = _iface.name(), mtu, "resolved interface MTU");
184 return Ok(mtu);
185 }
186 Err(err) => {
187 warn!(name = _iface.name(), error = %err, "failed to read MTU, using default");
188 }
189 }
190 }
191
192 Ok(1500)
193}
194
/// Derives a packet size from `mtu` by subtracting a fixed header allowance
/// of 42 bytes: 14 (Ethernet L2) + 20 (IPv4) + 8 (UDP).
///
/// The subtraction saturates, so an `mtu` of 42 or less yields `0`.
// NOTE(review): the Ethernet allowance makes the result conservative if the
// caller's MTU already excludes the L2 header; confirm this is intended.
pub fn best_packet_size(mtu: u32) -> u32 {
    const HEADER_ALLOWANCE: u32 = {
        let ethernet_l2 = 14;
        let ipv4_header = 20;
        let udp_header = 8;
        ethernet_l2 + ipv4_header + udp_header
    };

    match mtu.checked_sub(HEADER_ALLOWANCE) {
        Some(payload) => payload,
        None => 0,
    }
}
206
207#[derive(Debug, Clone)]
209pub struct McOptions {
210 pub loopback: bool,
212 pub ttl: u32,
214 pub rcvbuf_bytes: usize,
216 pub reuse_addr: bool,
218}
219
220impl Default for McOptions {
221 fn default() -> Self {
222 Self {
223 loopback: false,
224 ttl: 1,
225 rcvbuf_bytes: DEFAULT_RCVBUF_BYTES,
226 reuse_addr: true,
227 }
228 }
229}
230
231pub async fn bind_udp(
233 bind: IpAddr,
234 port: u16,
235 iface: Option<Iface>,
236 recv_buffer: Option<usize>,
237) -> io::Result<UdpSocket> {
238 let recv_buffer = recv_buffer.unwrap_or(DEFAULT_RCVBUF_BYTES);
239 if let Some(ipv4) = iface.as_ref().and_then(|iface| iface.ipv4()) {
240 info!(name = iface.as_ref().map(Iface::name), %ipv4, port, "binding GVSP socket");
241 } else {
242 info!(%bind, port, "binding GVSP socket");
243 }
244
245 let domain = match bind {
246 IpAddr::V4(_) => Domain::IPV4,
247 IpAddr::V6(_) => Domain::IPV6,
248 };
249 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
250
251 socket.set_reuse_address(true)?;
252 #[cfg(all(unix, not(target_os = "solaris")))]
253 socket.set_reuse_port(true)?;
254
255 socket.set_recv_buffer_size(recv_buffer)?;
256
257 #[cfg(any(target_os = "linux", target_os = "android"))]
258 if let Some(iface) = iface.as_ref()
259 && let Err(err) = socket.bind_device(Some(iface.name().as_bytes()))
260 {
261 warn!(name = iface.name(), error = %err, "SO_BINDTODEVICE failed");
262 }
263
264 let addr = SocketAddr::new(bind, port);
265 socket.bind(&addr.into())?;
266
267 let std_socket: std::net::UdpSocket = socket.into();
268 std_socket.set_nonblocking(true)?;
269 UdpSocket::from_std(std_socket)
270}
271
/// Checks the multicast TTL and group before any socket is created.
///
/// The TTL is checked first, so when both inputs are bad the TTL error is
/// the one reported.
///
/// # Errors
///
/// `InvalidInput` when `ttl` exceeds 255 or `group` lies outside
/// `224.0.0.0/4`.
fn validate_multicast_inputs(group: Ipv4Addr, ttl: u32) -> io::Result<()> {
    let problem = if ttl > u32::from(u8::MAX) {
        Some("multicast TTL must be <= 255")
    } else if !group.is_multicast() {
        Some("multicast group must be within 224.0.0.0/4")
    } else {
        None
    };

    match problem {
        Some(reason) => Err(io::Error::new(io::ErrorKind::InvalidInput, reason)),
        None => Ok(()),
    }
}
287
288pub async fn bind_multicast(
290 iface: &Iface,
291 group: Ipv4Addr,
292 port: u16,
293 opts: &McOptions,
294) -> io::Result<UdpSocket> {
295 validate_multicast_inputs(group, opts.ttl)?;
296 let iface_addr = iface
297 .ipv4()
298 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "interface lacks IPv4"))?;
299
300 info!(name = iface.name(), %group, port, "binding multicast GVSP socket");
301
302 let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
303
304 if opts.reuse_addr {
305 socket.set_reuse_address(true)?;
306 #[cfg(all(unix, not(target_os = "solaris")))]
307 socket.set_reuse_port(true)?;
308 }
309
310 socket.set_recv_buffer_size(opts.rcvbuf_bytes)?;
311 socket.set_multicast_loop_v4(opts.loopback)?;
312 socket.set_multicast_ttl_v4(opts.ttl)?;
313 socket.set_multicast_if_v4(&iface_addr)?;
314
315 #[cfg(any(target_os = "linux", target_os = "android"))]
316 if let Err(err) = socket.bind_device(Some(iface.name().as_bytes())) {
317 warn!(name = iface.name(), error = %err, "SO_BINDTODEVICE failed");
318 }
319
320 let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
321 socket.bind(&bind_addr.into())?;
322 socket.join_multicast_v4(&group, &iface_addr)?;
323
324 let std_socket: std::net::UdpSocket = socket.into();
325 std_socket.set_nonblocking(true)?;
326 UdpSocket::from_std(std_socket)
327}
328
329pub fn join_multicast(sock: &UdpSocket, group: Ipv4Addr, iface: &Iface) -> io::Result<()> {
331 let socket = SockRef::from(sock);
332 let iface_addr = iface.ipv4().unwrap_or(Ipv4Addr::UNSPECIFIED);
333 socket.join_multicast_v4(&group, &iface_addr)?;
334 Ok(())
335}
336
337#[derive(Debug)]
339pub struct BufferPool {
340 buffers: Mutex<VecDeque<BytesMut>>,
341 size: usize,
342}
343
344impl BufferPool {
345 pub fn new(capacity: usize, size: usize) -> Self {
347 let mut buffers = VecDeque::with_capacity(capacity);
348 for _ in 0..capacity {
349 buffers.push_back(BytesMut::with_capacity(size));
350 }
351 Self {
352 buffers: Mutex::new(buffers),
353 size,
354 }
355 }
356
357 pub fn acquire(&self) -> Option<BytesMut> {
359 self.buffers
360 .lock()
361 .ok()
362 .and_then(|mut guard| guard.pop_front())
363 }
364
365 pub fn release(&self, mut buffer: BytesMut) {
367 buffer.truncate(0);
368 buffer.reserve(self.size);
369 if let Ok(mut guard) = self.buffers.lock() {
370 guard.push_back(buffer);
371 }
372 }
373}
374
/// Returns the IPv4 unspecified address (`0.0.0.0`) as the default bind
/// address.
pub fn default_bind_addr() -> IpAddr {
    IpAddr::from(Ipv4Addr::UNSPECIFIED)
}
379
#[cfg(test)]
mod tests {
    use super::*;

    /// A TTL above 255 is refused before any socket work happens.
    #[test]
    fn reject_invalid_ttl() {
        let err = validate_multicast_inputs(Ipv4Addr::new(239, 0, 0, 1), 512).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
    }

    /// A unicast address is refused as a multicast group.
    #[test]
    fn reject_non_multicast_group() {
        let err = validate_multicast_inputs(Ipv4Addr::new(192, 168, 1, 1), 1).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
    }

    /// An address inside 224.0.0.0/4 with a small TTL passes validation.
    #[test]
    fn accept_valid_group() {
        assert!(validate_multicast_inputs(Ipv4Addr::new(239, 192, 1, 10), 1).is_ok());
    }

    /// The loopback interface resolves and reports a loopback IPv4 address.
    #[test]
    #[cfg(not(target_os = "windows"))]
    fn from_system_loopback() {
        let lo_name = if cfg!(target_os = "macos") {
            "lo0"
        } else {
            "lo"
        };
        let iface = Iface::from_system(lo_name).expect("loopback interface should exist");
        assert!(iface.ipv4().unwrap().is_loopback());
    }

    /// The packet size is the MTU minus the fixed 14 + 20 + 8 allowance.
    #[test]
    fn packet_size_respects_headers() {
        let mtu = 1500;
        let size = best_packet_size(mtu);
        assert!(size < mtu);
        assert_eq!(size, 1500 - (14 + 20 + 8));
    }

    /// A released buffer comes back empty and with its capacity restored.
    ///
    /// The pool holds a single buffer so the second `acquire` can only
    /// return the one that was released. With two buffers the FIFO queue
    /// would hand out the untouched second buffer and the recycled one
    /// would never be inspected.
    #[test]
    fn buffer_pool_recycles() {
        let pool = BufferPool::new(1, 1024);
        let mut buf = pool.acquire().expect("buffer");
        assert!(pool.acquire().is_none(), "pool should be drained");
        buf.extend_from_slice(&[1, 2, 3]);
        pool.release(buf);
        let buf2 = pool.acquire().expect("buffer");
        assert!(buf2.is_empty());
        assert!(buf2.capacity() >= 1024);
    }
}