tl_gige/
gvcp.rs

1//! GVCP control plane utilities.
2
3use std::collections::HashMap;
4use std::io::Cursor;
5use std::net::{IpAddr, Ipv4Addr, SocketAddr};
6use std::time::Duration;
7
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9use fastrand::Rng;
10use genicp::{
11    decode_ack, encode_cmd, AckHeader, CommandFlags, GenCpAck, GenCpCmd, OpCode, StatusCode,
12};
13use if_addrs::{get_if_addrs, IfAddr};
14use thiserror::Error;
15use tokio::net::UdpSocket;
16use tokio::task::JoinSet;
17use tokio::time;
18use tracing::{debug, info, trace, warn};
19
20use crate::nic::{self, Iface};
21
22/// GVCP protocol constants grouped by semantic area.
23pub mod consts {
24    use std::time::Duration;
25
26    /// GVCP control port as defined by the GigE Vision specification (section 7.3).
27    pub const PORT: u16 = 3956;
28    /// Opcode of the discovery command.
29    pub const DISCOVERY_COMMAND: u16 = 0x0002;
30    /// Opcode of the discovery acknowledgement.
31    pub const DISCOVERY_ACK: u16 = 0x0003;
32    /// Opcode for requesting packet resends.
33    pub const PACKET_RESEND_COMMAND: u16 = 0x0040;
34    /// Opcode of the packet resend acknowledgement.
35    pub const PACKET_RESEND_ACK: u16 = 0x0041;
36
37    /// Address of the SFNC `GevMessageChannel0DestinationAddress` register.
38    pub const MESSAGE_DESTINATION_ADDRESS: u64 = 0x0900_0200;
39    /// Address of the SFNC `GevMessageChannel0DestinationPort` register.
40    pub const MESSAGE_DESTINATION_PORT: u64 = 0x0900_0204;
41    /// Base address of the event notification mask (`GevEventNotificationAll`).
42    pub const EVENT_NOTIFICATION_BASE: u64 = 0x0900_0300;
43    /// Stride between successive event notification mask registers (bytes).
44    pub const EVENT_NOTIFICATION_STRIDE: u64 = 4;
45
46    /// Maximum number of bytes we read per GenCP `ReadMem` operation.
47    pub const GENCP_MAX_BLOCK: usize = 512;
48    /// Additional bytes that accompany a GenCP `WriteMem` block.
49    pub const GENCP_WRITE_OVERHEAD: usize = 8;
50
51    /// Default timeout for control transactions.
52    pub const CONTROL_TIMEOUT: Duration = Duration::from_millis(500);
53    /// Maximum number of automatic retries for a control transaction.
54    pub const MAX_RETRIES: usize = 4;
55    /// Base delay used for retry backoff.
56    pub const RETRY_BASE_DELAY: Duration = Duration::from_millis(20);
57    /// Upper bound for the random jitter added to the retry delay (inclusive).
58    pub const RETRY_JITTER: Duration = Duration::from_millis(10);
59
60    /// Maximum number of bytes captured while listening for discovery responses.
61    pub const DISCOVERY_BUFFER: usize = 2048;
62
63    /// Base register for stream channel configuration (GigE Vision 2.1, table 63).
64    pub const STREAM_CHANNEL_BASE: u64 = 0x0900_0400;
65    /// Stride in bytes between successive stream channel blocks.
66    pub const STREAM_CHANNEL_STRIDE: u64 = 0x40;
67    /// Offset for `GevSCPHostIPAddress` within a stream channel block.
68    pub const STREAM_DESTINATION_ADDRESS: u64 = 0x00;
69    /// Offset for `GevSCPHostPort` within a stream channel block.
70    pub const STREAM_DESTINATION_PORT: u64 = 0x04;
71    /// Offset for `GevSCPSPacketSize` within a stream channel block.
72    pub const STREAM_PACKET_SIZE: u64 = 0x24;
73    /// Offset for `GevSCPD` (packet delay) within a stream channel block.
74    pub const STREAM_PACKET_DELAY: u64 = 0x28;
75}
76
77/// Public alias for the GVCP well-known port.
78pub use consts::PORT as GVCP_PORT;
79
80/// GVCP request header.
81#[derive(Debug, Clone, Copy, PartialEq, Eq)]
82pub struct GvcpRequestHeader {
83    /// Request flags (acknowledgement, broadcast).
84    pub flags: CommandFlags,
85    /// Raw command/opcode value.
86    pub command: u16,
87    /// Payload length in bytes.
88    pub length: u16,
89    /// Request identifier.
90    pub request_id: u16,
91}
92
93impl GvcpRequestHeader {
94    /// Encode the header into a `Bytes` buffer ready to be transmitted.
95    pub fn encode(self, payload: &[u8]) -> Bytes {
96        let mut buf = BytesMut::with_capacity(genicp::HEADER_SIZE + payload.len());
97        buf.put_u16(self.flags.bits());
98        buf.put_u16(self.command);
99        buf.put_u16(self.length);
100        buf.put_u16(self.request_id);
101        buf.extend_from_slice(payload);
102        buf.freeze()
103    }
104}
105
106/// GVCP acknowledgement header wrapper.
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
108pub struct GvcpAckHeader {
109    /// Status reported by the device.
110    pub status: StatusCode,
111    /// Raw command/opcode value.
112    pub command: u16,
113    /// Payload length in bytes.
114    pub length: u16,
115    /// Identifier of the answered request.
116    pub request_id: u16,
117}
118
119impl From<AckHeader> for GvcpAckHeader {
120    fn from(value: AckHeader) -> Self {
121        Self {
122            status: value.status,
123            command: value.opcode.ack_code(),
124            length: value.length,
125            request_id: value.request_id,
126        }
127    }
128}
129
130/// Errors that can occur when operating the GVCP control path.
131#[derive(Debug, Error)]
132pub enum GigeError {
133    #[error("io: {0}")]
134    Io(#[from] std::io::Error),
135    #[error("protocol: {0}")]
136    Protocol(String),
137    #[error("timeout waiting for acknowledgement")]
138    Timeout,
139    #[error("GenCP: {0}")]
140    GenCp(#[from] genicp::GenCpError),
141    #[error("device reported status {0:?}")]
142    Status(StatusCode),
143}
144
145/// Information returned by GVCP discovery packets.
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct DeviceInfo {
148    pub ip: Ipv4Addr,
149    pub mac: [u8; 6],
150    pub model: Option<String>,
151    pub manufacturer: Option<String>,
152}
153
154impl DeviceInfo {
155    fn mac_string(&self) -> String {
156        self.mac
157            .iter()
158            .map(|byte| format!("{byte:02X}"))
159            .collect::<Vec<_>>()
160            .join(":")
161    }
162}
163
164/// Discover GigE Vision devices on the local network by broadcasting a GVCP discovery command.
165pub async fn discover(timeout: Duration) -> Result<Vec<DeviceInfo>, GigeError> {
166    discover_filtered(timeout, None).await
167}
168
169/// Discover devices only on the specified interface name.
170pub async fn discover_on_interface(
171    timeout: Duration,
172    interface: &str,
173) -> Result<Vec<DeviceInfo>, GigeError> {
174    discover_filtered(timeout, Some(interface)).await
175}
176
177async fn discover_filtered(
178    timeout: Duration,
179    iface_filter: Option<&str>,
180) -> Result<Vec<DeviceInfo>, GigeError> {
181    let mut interfaces = Vec::new();
182    for iface in get_if_addrs()? {
183        let IfAddr::V4(v4) = iface.addr else {
184            continue;
185        };
186        if v4.ip.is_loopback() {
187            continue;
188        }
189        if let Some(filter) = iface_filter {
190            if iface.name != filter {
191                continue;
192            }
193        }
194        interfaces.push((iface.name, v4));
195    }
196
197    if interfaces.is_empty() {
198        return Ok(Vec::new());
199    }
200
201    let mut join_set = JoinSet::new();
202    for (idx, (name, v4)) in interfaces.into_iter().enumerate() {
203        let request_id = 0x0100u16.wrapping_add(idx as u16);
204        let interface_name = name.clone();
205        join_set.spawn(async move {
206            let local_addr = SocketAddr::new(IpAddr::V4(v4.ip), 0);
207            let socket = UdpSocket::bind(local_addr).await?;
208            socket.set_broadcast(true)?;
209            let broadcast = v4.broadcast.unwrap_or(Ipv4Addr::BROADCAST);
210            let destination = SocketAddr::new(IpAddr::V4(broadcast), consts::PORT);
211
212            let header = GvcpRequestHeader {
213                flags: CommandFlags::ACK_REQUIRED | CommandFlags::BROADCAST,
214                command: consts::DISCOVERY_COMMAND,
215                length: 0,
216                request_id,
217            };
218            let packet = header.encode(&[]);
219            info!(%interface_name, local = %v4.ip, dest = %destination, "sending GVCP discovery");
220            trace!(%interface_name, bytes = packet.len(), "GVCP discovery payload size");
221            socket.send_to(&packet, destination).await?;
222
223            let mut responses = Vec::new();
224            let mut buffer = vec![0u8; consts::DISCOVERY_BUFFER];
225            let timer = time::sleep(timeout);
226            tokio::pin!(timer);
227            loop {
228                tokio::select! {
229                    _ = &mut timer => break,
230                    recv = socket.recv_from(&mut buffer) => {
231                        let (len, src) = recv?;
232                        info!(%interface_name, %src, "received GVCP response");
233                        trace!(%interface_name, bytes = len, "GVCP response length");
234                        if let Some(info) = parse_discovery_ack(&buffer[..len], request_id)? {
235                            trace!(ip = %info.ip, mac = %info.mac_string(), "parsed discovery ack");
236                            responses.push(info);
237                        }
238                    }
239                }
240            }
241            Ok::<_, GigeError>(responses)
242        });
243    }
244
245    let mut seen = HashMap::new();
246    while let Some(res) = join_set.join_next().await {
247        let devices =
248            res.map_err(|e| GigeError::Protocol(format!("discovery task failed: {e}")))??;
249        for dev in devices {
250            seen.entry((dev.ip, dev.mac)).or_insert(dev);
251        }
252    }
253
254    let mut devices: Vec<_> = seen.into_values().collect();
255    devices.sort_by_key(|d| d.ip);
256    Ok(devices)
257}
258
259fn parse_discovery_ack(buf: &[u8], expected_request: u16) -> Result<Option<DeviceInfo>, GigeError> {
260    if buf.len() < genicp::HEADER_SIZE {
261        return Err(GigeError::Protocol("GVCP ack too short".into()));
262    }
263    let mut header = buf;
264    let status = header.get_u16();
265    let command = header.get_u16();
266    let length = header.get_u16() as usize;
267    let request_id = header.get_u16();
268    if request_id != expected_request {
269        return Ok(None);
270    }
271    if command != consts::DISCOVERY_ACK {
272        return Err(GigeError::Protocol(format!(
273            "unexpected discovery opcode {command:#06x}"
274        )));
275    }
276    if status != 0 {
277        return Err(GigeError::Protocol(format!(
278            "discovery returned status {status:#06x}"
279        )));
280    }
281    if buf.len() < genicp::HEADER_SIZE + length {
282        return Err(GigeError::Protocol("discovery payload truncated".into()));
283    }
284    let payload = &buf[genicp::HEADER_SIZE..genicp::HEADER_SIZE + length];
285    let info = parse_discovery_payload(payload)?;
286    Ok(Some(info))
287}
288
289fn parse_discovery_payload(payload: &[u8]) -> Result<DeviceInfo, GigeError> {
290    let mut cursor = Cursor::new(payload);
291    if cursor.remaining() < 32 {
292        return Err(GigeError::Protocol("discovery payload too small".into()));
293    }
294    let _spec_major = cursor.get_u16();
295    let _spec_minor = cursor.get_u16();
296    let _device_mode = cursor.get_u32();
297    let _device_class = cursor.get_u16();
298    let _device_capability = cursor.get_u16();
299    let mut mac = [0u8; 6];
300    cursor.copy_to_slice(&mut mac);
301    let _ip_config_options = cursor.get_u16();
302    let _ip_config_current = cursor.get_u16();
303    let ip = Ipv4Addr::from(cursor.get_u32());
304    let _subnet = cursor.get_u32();
305    let _gateway = cursor.get_u32();
306    let manufacturer = read_fixed_string(&mut cursor, 32)?;
307    let model = read_fixed_string(&mut cursor, 32)?;
308    let _ = skip_string(&mut cursor, 32);
309    let _ = skip_string(&mut cursor, 16);
310    let _ = skip_string(&mut cursor, 16);
311
312    Ok(DeviceInfo {
313        ip,
314        mac,
315        manufacturer,
316        model,
317    })
318}
319
320fn read_fixed_string(cursor: &mut Cursor<&[u8]>, len: usize) -> Result<Option<String>, GigeError> {
321    if cursor.remaining() < len {
322        return Err(GigeError::Protocol("discovery string truncated".into()));
323    }
324    let mut buf = vec![0u8; len];
325    cursor.copy_to_slice(&mut buf);
326    Ok(parse_string(&buf))
327}
328
329fn skip_string(cursor: &mut Cursor<&[u8]>, len: usize) -> Option<()> {
330    if cursor.remaining() < len {
331        return None;
332    }
333    cursor.advance(len);
334    Some(())
335}
336
337fn parse_string(bytes: &[u8]) -> Option<String> {
338    let end = bytes.iter().position(|&b| b == 0).unwrap_or(bytes.len());
339    let slice = &bytes[..end];
340    let s = String::from_utf8_lossy(slice).trim().to_string();
341    if s.is_empty() {
342        None
343    } else {
344        Some(s)
345    }
346}
347
348/// GVCP device handle.
349pub struct GigeDevice {
350    socket: UdpSocket,
351    remote: SocketAddr,
352    request_id: u16,
353    rng: Rng,
354}
355
356/// Stream negotiation outcome describing the values written to the device.
357#[derive(Debug, Clone, Copy, PartialEq, Eq)]
358pub struct StreamParams {
359    /// Selected GVSP packet size (bytes).
360    pub packet_size: u32,
361    /// Packet delay expressed in GVSP clock ticks (80 ns units).
362    pub packet_delay: u32,
363    /// Link MTU used to derive the packet size.
364    pub mtu: u32,
365    /// Host IPv4 address configured on the device.
366    pub host: Ipv4Addr,
367    /// Host port configured on the device.
368    pub port: u16,
369}
370
371impl GigeDevice {
372    /// Connect to a device GVCP endpoint.
373    pub async fn open(addr: SocketAddr) -> Result<Self, GigeError> {
374        let local_ip = match addr.ip() {
375            IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
376            IpAddr::V6(_) => {
377                return Err(GigeError::Protocol("IPv6 GVCP is not supported".into()));
378            }
379        };
380        let socket = UdpSocket::bind(SocketAddr::new(local_ip, 0)).await?;
381        socket.connect(addr).await?;
382        Ok(Self {
383            socket,
384            remote: addr,
385            request_id: 1,
386            rng: Rng::new(),
387        })
388    }
389
390    /// Return the remote GVCP socket address associated with this device.
391    pub fn remote_addr(&self) -> SocketAddr {
392        self.remote
393    }
394
395    fn next_request_id(&mut self) -> u16 {
396        let id = self.request_id;
397        self.request_id = self.request_id.wrapping_add(1);
398        if self.request_id == 0 {
399            self.request_id = 1;
400        }
401        id
402    }
403
404    async fn transact_with_retry(
405        &mut self,
406        opcode: OpCode,
407        payload: BytesMut,
408    ) -> Result<GenCpAck, GigeError> {
409        let mut attempt = 0usize;
410        let mut payload = payload;
411        loop {
412            attempt += 1;
413            let request_id = self.next_request_id();
414            let payload_bytes = payload.clone().freeze();
415            let cmd = GenCpCmd {
416                header: genicp::CommandHeader {
417                    flags: CommandFlags::ACK_REQUIRED,
418                    opcode,
419                    length: payload_bytes.len() as u16,
420                    request_id,
421                },
422                payload: payload_bytes.clone(),
423            };
424            let encoded = encode_cmd(&cmd);
425            trace!(request_id, opcode = ?opcode, bytes = encoded.len(), attempt, "sending GenCP command");
426            if let Err(err) = self.socket.send(&encoded).await {
427                if attempt >= consts::MAX_RETRIES {
428                    return Err(err.into());
429                }
430                warn!(request_id, ?opcode, attempt, "send failed, retrying");
431                self.backoff(attempt).await;
432                payload = BytesMut::from(&payload_bytes[..]);
433                continue;
434            }
435
436            let mut buf =
437                vec![
438                    0u8;
439                    genicp::HEADER_SIZE + consts::GENCP_MAX_BLOCK + consts::GENCP_WRITE_OVERHEAD
440                ];
441            match time::timeout(consts::CONTROL_TIMEOUT, self.socket.recv(&mut buf)).await {
442                Ok(Ok(len)) => {
443                    trace!(request_id, bytes = len, attempt, "received GenCP ack");
444                    let ack = decode_ack(&buf[..len])?;
445                    if ack.header.request_id != request_id {
446                        debug!(
447                            request_id,
448                            got = ack.header.request_id,
449                            attempt,
450                            "acknowledgement id mismatch"
451                        );
452                        if attempt >= consts::MAX_RETRIES {
453                            return Err(GigeError::Protocol("acknowledgement id mismatch".into()));
454                        }
455                        self.backoff(attempt).await;
456                        payload = BytesMut::from(&payload_bytes[..]);
457                        continue;
458                    }
459                    if ack.header.opcode != opcode {
460                        return Err(GigeError::Protocol(
461                            "unexpected opcode in acknowledgement".into(),
462                        ));
463                    }
464                    match ack.header.status {
465                        StatusCode::Success => return Ok(ack),
466                        StatusCode::DeviceBusy if attempt < consts::MAX_RETRIES => {
467                            warn!(request_id, attempt, "device busy, retrying");
468                            self.backoff(attempt).await;
469                            payload = BytesMut::from(&payload_bytes[..]);
470                            continue;
471                        }
472                        other => return Err(GigeError::Status(other)),
473                    }
474                }
475                Ok(Err(err)) => {
476                    if attempt >= consts::MAX_RETRIES {
477                        return Err(err.into());
478                    }
479                    warn!(request_id, ?opcode, attempt, "receive error, retrying");
480                    self.backoff(attempt).await;
481                    payload = BytesMut::from(&payload_bytes[..]);
482                }
483                Err(_) => {
484                    if attempt >= consts::MAX_RETRIES {
485                        return Err(GigeError::Timeout);
486                    }
487                    warn!(request_id, ?opcode, attempt, "command timeout, retrying");
488                    self.backoff(attempt).await;
489                    payload = BytesMut::from(&payload_bytes[..]);
490                }
491            }
492        }
493    }
494
495    async fn backoff(&mut self, attempt: usize) {
496        let multiplier = 1u32 << (attempt.saturating_sub(1)).min(3);
497        let base_ms = consts::RETRY_BASE_DELAY.as_millis() as u64;
498        let base = Duration::from_millis(base_ms.saturating_mul(multiplier as u64).max(base_ms));
499        let jitter_ms = self.rng.u64(..=consts::RETRY_JITTER.as_millis() as u64);
500        let jitter = Duration::from_millis(jitter_ms);
501        let delay = base + jitter;
502        debug!(attempt, delay = ?delay, "gvcp retry backoff");
503        time::sleep(delay).await;
504    }
505
506    /// Read a block of memory from the remote device with chunking and retries.
507    pub async fn read_mem(&mut self, addr: u64, len: usize) -> Result<Vec<u8>, GigeError> {
508        let mut remaining = len;
509        let mut offset = 0usize;
510        let mut data = Vec::with_capacity(len);
511        while remaining > 0 {
512            let chunk = remaining.min(consts::GENCP_MAX_BLOCK);
513            let mut payload = BytesMut::with_capacity(12);
514            payload.put_u64(addr + offset as u64);
515            payload.put_u32(chunk as u32);
516            let ack = self.transact_with_retry(OpCode::ReadMem, payload).await?;
517            if ack.payload.len() != chunk {
518                return Err(GigeError::Protocol(format!(
519                    "expected {chunk} bytes but device returned {}",
520                    ack.payload.len()
521                )));
522            }
523            data.extend_from_slice(&ack.payload);
524            remaining -= chunk;
525            offset += chunk;
526        }
527        Ok(data)
528    }
529
530    /// Write a block of memory to the remote device with chunking and retries.
531    pub async fn write_mem(&mut self, addr: u64, data: &[u8]) -> Result<(), GigeError> {
532        let mut offset = 0usize;
533        while offset < data.len() {
534            let chunk =
535                (data.len() - offset).min(consts::GENCP_MAX_BLOCK - consts::GENCP_WRITE_OVERHEAD);
536            if chunk == 0 {
537                return Err(GigeError::Protocol("write chunk size is zero".into()));
538            }
539            let mut payload = BytesMut::with_capacity(consts::GENCP_WRITE_OVERHEAD + chunk);
540            payload.put_u64(addr + offset as u64);
541            payload.extend_from_slice(&data[offset..offset + chunk]);
542            let ack = self.transact_with_retry(OpCode::WriteMem, payload).await?;
543            if !ack.payload.is_empty() {
544                return Err(GigeError::Protocol(
545                    "write acknowledgement carried unexpected payload".into(),
546                ));
547            }
548            offset += chunk;
549        }
550        Ok(())
551    }
552
553    /// Configure the message channel destination address/port.
554    pub async fn set_message_destination(
555        &mut self,
556        ip: Ipv4Addr,
557        port: u16,
558    ) -> Result<(), GigeError> {
559        info!(%ip, port, "configuring message channel destination");
560        self.write_mem(consts::MESSAGE_DESTINATION_ADDRESS, &ip.octets())
561            .await?;
562        self.write_mem(consts::MESSAGE_DESTINATION_PORT, &port.to_be_bytes())
563            .await?;
564        Ok(())
565    }
566
567    fn stream_reg(channel: u32, offset: u64) -> u64 {
568        consts::STREAM_CHANNEL_BASE + channel as u64 * consts::STREAM_CHANNEL_STRIDE + offset
569    }
570
571    /// Configure the GVSP host destination for the provided channel.
572    pub async fn set_stream_destination(
573        &mut self,
574        channel: u32,
575        ip: Ipv4Addr,
576        port: u16,
577    ) -> Result<(), GigeError> {
578        info!(channel, %ip, port, "configuring stream destination");
579        let addr = Self::stream_reg(channel, consts::STREAM_DESTINATION_ADDRESS);
580        self.write_mem(addr, &ip.octets()).await?;
581        let addr = Self::stream_reg(channel, consts::STREAM_DESTINATION_PORT);
582        self.write_mem(addr, &port.to_be_bytes()).await?;
583        Ok(())
584    }
585
586    /// Configure the packet size for the stream channel.
587    pub async fn set_stream_packet_size(
588        &mut self,
589        channel: u32,
590        packet_size: u32,
591    ) -> Result<(), GigeError> {
592        info!(channel, packet_size, "configuring stream packet size");
593        let addr = Self::stream_reg(channel, consts::STREAM_PACKET_SIZE);
594        self.write_mem(addr, &packet_size.to_be_bytes()).await
595    }
596
597    /// Configure the packet delay (`GevSCPD`).
598    pub async fn set_stream_packet_delay(
599        &mut self,
600        channel: u32,
601        packet_delay: u32,
602    ) -> Result<(), GigeError> {
603        debug!(channel, packet_delay, "configuring stream packet delay");
604        let addr = Self::stream_reg(channel, consts::STREAM_PACKET_DELAY);
605        self.write_mem(addr, &packet_delay.to_be_bytes()).await
606    }
607
608    /// Negotiate GVSP parameters with the device given the host interface.
609    pub async fn negotiate_stream(
610        &mut self,
611        channel: u32,
612        iface: &Iface,
613        port: u16,
614        target_mtu: Option<u32>,
615    ) -> Result<StreamParams, GigeError> {
616        let host_ip = iface
617            .ipv4()
618            .ok_or_else(|| GigeError::Protocol("interface lacks IPv4 address".into()))?;
619        let iface_mtu = nic::mtu(iface)?;
620        let mtu = target_mtu.map_or(iface_mtu, |limit| limit.min(iface_mtu));
621        let packet_size = nic::best_packet_size(mtu);
622        let packet_delay = if mtu <= 1500 {
623            // When jumbo frames are unavailable we space out packets by 2 µs to
624            // prevent excessive buffering pressure on receivers. GVSP expresses
625            // `GevSCPD` in units of 80 ns.
626            const DELAY_NS: u32 = 2_000; // 2 µs.
627            DELAY_NS / 80
628        } else {
629            0
630        };
631
632        self.set_stream_destination(channel, host_ip, port).await?;
633        self.set_stream_packet_size(channel, packet_size).await?;
634        self.set_stream_packet_delay(channel, packet_delay).await?;
635
636        Ok(StreamParams {
637            packet_size,
638            packet_delay,
639            mtu,
640            host: host_ip,
641            port,
642        })
643    }
644
645    /// Enable or disable delivery of the provided event identifier.
646    pub async fn enable_event_raw(&mut self, id: u16, on: bool) -> Result<(), GigeError> {
647        let index = (id / 32) as u64;
648        let bit = 1u32 << (id % 32);
649        let addr = consts::EVENT_NOTIFICATION_BASE + index * consts::EVENT_NOTIFICATION_STRIDE;
650        let current = self.read_mem(addr, 4).await?;
651        if current.len() != 4 {
652            return Err(GigeError::Protocol(
653                "event notification register length mismatch".into(),
654            ));
655        }
656        let mut bytes = [0u8; 4];
657        bytes.copy_from_slice(&current);
658        let mut value = u32::from_be_bytes(bytes);
659        if on {
660            value |= bit;
661        } else {
662            value &= !bit;
663        }
664        let new_bytes = value.to_be_bytes();
665        self.write_mem(addr, &new_bytes).await?;
666        debug!(event_id = id, enabled = on, "updated event mask");
667        Ok(())
668    }
669
670    /// Request resend of a packet range for the provided block identifier.
671    pub async fn request_resend(
672        &mut self,
673        block_id: u16,
674        first_packet: u16,
675        last_packet: u16,
676    ) -> Result<(), GigeError> {
677        let mut payload = BytesMut::with_capacity(8);
678        payload.put_u16(block_id);
679        payload.put_u16(0); // Reserved as per spec.
680        payload.put_u16(first_packet);
681        payload.put_u16(last_packet);
682
683        let request_id = self.next_request_id();
684        let header = GvcpRequestHeader {
685            flags: CommandFlags::ACK_REQUIRED,
686            command: consts::PACKET_RESEND_COMMAND,
687            length: payload.len() as u16,
688            request_id,
689        };
690        let packet = header.encode(&payload);
691        trace!(
692            block_id,
693            first_packet,
694            last_packet,
695            request_id,
696            "sending packet resend request"
697        );
698        self.socket.send(&packet).await?;
699        let mut buf = [0u8; genicp::HEADER_SIZE];
700        match time::timeout(consts::CONTROL_TIMEOUT, self.socket.recv(&mut buf)).await {
701            Ok(Ok(len)) => {
702                if len != genicp::HEADER_SIZE {
703                    return Err(GigeError::Protocol("resend ack length mismatch".into()));
704                }
705                let mut cursor = &buf[..];
706                let status = StatusCode::from_raw(cursor.get_u16());
707                let command = cursor.get_u16();
708                let length = cursor.get_u16();
709                let ack_request_id = cursor.get_u16();
710                if command != consts::PACKET_RESEND_ACK {
711                    return Err(GigeError::Protocol("unexpected resend ack opcode".into()));
712                }
713                if length != 0 {
714                    return Err(GigeError::Protocol("resend ack carried payload".into()));
715                }
716                if ack_request_id != request_id {
717                    return Err(GigeError::Protocol("resend ack request id mismatch".into()));
718                }
719                if status != StatusCode::Success {
720                    return Err(GigeError::Status(status));
721                }
722                Ok(())
723            }
724            Ok(Err(err)) => Err(err.into()),
725            Err(_) => Err(GigeError::Timeout),
726        }
727    }
728}
729
730#[cfg(test)]
731mod tests {
732    use super::*;
733
734    #[test]
735    fn request_header_roundtrip() {
736        let header = GvcpRequestHeader {
737            flags: CommandFlags::ACK_REQUIRED,
738            command: 0x1234,
739            length: 4,
740            request_id: 0xBEEF,
741        };
742        let payload = [1u8, 2, 3, 4];
743        let encoded = header.encode(&payload);
744        assert_eq!(encoded.len(), genicp::HEADER_SIZE + payload.len());
745        assert_eq!(&encoded[0..2], &header.flags.bits().to_be_bytes());
746        assert_eq!(&encoded[2..4], &header.command.to_be_bytes());
747        assert_eq!(&encoded[4..6], &header.length.to_be_bytes());
748        assert_eq!(&encoded[6..8], &header.request_id.to_be_bytes());
749        assert_eq!(&encoded[8..], &payload);
750    }
751
752    #[test]
753    fn ack_header_conversion() {
754        let ack = AckHeader {
755            status: StatusCode::DeviceBusy,
756            opcode: OpCode::ReadMem,
757            length: 12,
758            request_id: 0x44,
759        };
760        let converted = GvcpAckHeader::from(ack);
761        assert_eq!(converted.status, StatusCode::DeviceBusy);
762        assert_eq!(converted.command, OpCode::ReadMem.ack_code());
763        assert_eq!(converted.length, 12);
764        assert_eq!(converted.request_id, 0x44);
765    }
766}