viva_gige/
gvcp.rs

1//! GVCP control plane utilities.
2
3use std::collections::HashMap;
4use std::io::Cursor;
5use std::net::{IpAddr, Ipv4Addr, SocketAddr};
6use std::time::Duration;
7
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9use fastrand::Rng;
10use if_addrs::{IfAddr, get_if_addrs};
11use thiserror::Error;
12use tokio::net::UdpSocket;
13use tokio::task::JoinSet;
14use tokio::time;
15use tracing::{debug, info, trace, warn};
16use viva_gencp::{AckHeader, CommandFlags, GenCpAck, OpCode, StatusCode, decode_ack};
17
18use crate::nic::{self, Iface};
19
/// GVCP protocol constants grouped by semantic area.
///
/// Register addresses are declared as `u64`; the register accessors on
/// `GigeDevice` take `u32` addresses, so callers narrow them with `as u32`.
pub mod consts {
    use std::time::Duration;

    /// GVCP control port as defined by the GigE Vision specification (section 7.3).
    pub const PORT: u16 = 3956;
    /// Opcode of the discovery command.
    pub const DISCOVERY_COMMAND: u16 = 0x0002;
    /// Opcode of the discovery acknowledgement.
    pub const DISCOVERY_ACK: u16 = 0x0003;
    /// Opcode of the FORCEIP command.
    pub const FORCEIP_COMMAND: u16 = 0x0004;
    /// Opcode of the FORCEIP acknowledgement.
    pub const FORCEIP_ACK: u16 = 0x0005;
    /// Opcode for requesting packet resends.
    pub const PACKET_RESEND_COMMAND: u16 = 0x0040;
    /// Opcode of the packet resend acknowledgement.
    pub const PACKET_RESEND_ACK: u16 = 0x0041;

    /// Current IP configuration flags register.
    ///
    /// Bit 2 = DHCP, bit 1 = persistent IP, bit 0 = LLA.
    ///
    /// NOTE(review): confirm this bit assignment against the GigE Vision
    /// specification; other implementations appear to use bit 0 = persistent
    /// IP, bit 1 = DHCP, bit 2 = LLA.
    pub const CURRENT_IP_CONFIG: u64 = 0x0014;

    /// Persistent IP address register (4 bytes at the end of a 16-byte block).
    pub const PERSISTENT_IP_ADDRESS: u64 = 0x064C;
    /// Persistent subnet mask register.
    pub const PERSISTENT_SUBNET_MASK: u64 = 0x065C;
    /// Persistent default gateway register.
    pub const PERSISTENT_DEFAULT_GATEWAY: u64 = 0x066C;

    /// Address of the Control Channel Privilege (CCP) register.
    ///
    /// A controller must write [`CCP_CONTROL`] to this register before the
    /// device accepts stream configuration or acquisition commands.
    pub const CONTROL_CHANNEL_PRIVILEGE: u64 = 0x0a00;
    /// CCP control-access flag (bit 1); the value written to
    /// [`CONTROL_CHANNEL_PRIVILEGE`] when claiming control of the device.
    pub const CCP_CONTROL: u32 = 1 << 1;
    /// CCP exclusive-access flag (bit 0), indicating an exclusive owner.
    pub const CCP_EXCLUSIVE: u32 = 1 << 0;

    /// Address of the SFNC `GevMessageChannel0DestinationAddress` register.
    pub const MESSAGE_DESTINATION_ADDRESS: u64 = 0x0900_0200;
    /// Address of the SFNC `GevMessageChannel0DestinationPort` register.
    pub const MESSAGE_DESTINATION_PORT: u64 = 0x0900_0204;
    /// Base address of the event notification mask (`GevEventNotificationAll`).
    pub const EVENT_NOTIFICATION_BASE: u64 = 0x0900_0300;
    /// Stride between successive event notification mask registers (bytes).
    pub const EVENT_NOTIFICATION_STRIDE: u64 = 4;

    /// Maximum number of bytes we read per GenCP `ReadMem` operation.
    pub const GENCP_MAX_BLOCK: usize = 512;
    /// Additional bytes that accompany a GenCP `WriteMem` block.
    pub const GENCP_WRITE_OVERHEAD: usize = 8;

    /// Default timeout for control transactions.
    pub const CONTROL_TIMEOUT: Duration = Duration::from_millis(500);
    /// Maximum number of automatic retries for a control transaction.
    pub const MAX_RETRIES: usize = 4;
    /// Base delay used for retry backoff.
    pub const RETRY_BASE_DELAY: Duration = Duration::from_millis(20);
    /// Upper bound for the random jitter added to the retry delay (inclusive).
    pub const RETRY_JITTER: Duration = Duration::from_millis(10);

    /// Maximum number of bytes captured while listening for discovery responses.
    pub const DISCOVERY_BUFFER: usize = 2048;

    /// Base register for stream channel 0 (GigE Vision bootstrap register map).
    ///
    /// The GigE Vision specification defines stream channel bootstrap registers
    /// starting at 0x0d00. Note: some cameras may use different offsets declared
    /// in their GenICam XML (e.g. SFNC `GevSCDA` nodes). The bootstrap offsets
    /// here match the aravis implementation and the GigE Vision 2.x standard.
    pub const STREAM_CHANNEL_BASE: u64 = 0x0d00;
    /// Stride in bytes between successive stream channel blocks.
    pub const STREAM_CHANNEL_STRIDE: u64 = 0x40;
    /// Offset for `GevSCPHostPort` within a stream channel block.
    pub const STREAM_DESTINATION_PORT: u64 = 0x00;
    /// Offset for `GevSCPSPacketSize` within a stream channel block.
    pub const STREAM_PACKET_SIZE: u64 = 0x04;
    /// Offset for `GevSCPD` (packet delay) within a stream channel block.
    pub const STREAM_PACKET_DELAY: u64 = 0x08;
    /// Offset for `GevSCDA` (stream destination IP address) within a stream channel block.
    pub const STREAM_DESTINATION_ADDRESS: u64 = 0x18;
}
105
106/// Public alias for the GVCP well-known port.
107pub use consts::PORT as GVCP_PORT;
108
/// GVCP request header.
///
/// Holds the fields that `encode` serialises, big-endian, in front of the
/// command payload.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GvcpRequestHeader {
    /// Request flags (acknowledgement, broadcast).
    pub flags: CommandFlags,
    /// Raw command/opcode value.
    pub command: u16,
    /// Payload length in bytes.
    ///
    /// Written to the wire verbatim; `encode` does not derive it from the
    /// payload slice, so the caller must keep the two in sync.
    pub length: u16,
    /// Request identifier.
    pub request_id: u16,
}
121
122/// GVCP command message key value (first byte of every GVCP command packet).
123const GVCP_CMD_KEY: u8 = 0x42;
124
125impl GvcpRequestHeader {
126    /// Encode the header into a `Bytes` buffer ready to be transmitted.
127    ///
128    /// Uses proper GVCP wire format: byte 0 = `0x42` (command key),
129    /// byte 1 = flags byte (bit 0 = ACK_REQUIRED, bit 4 = BROADCAST).
130    pub fn encode(self, payload: &[u8]) -> Bytes {
131        let mut buf = BytesMut::with_capacity(viva_gencp::HEADER_SIZE + payload.len());
132        // GVCP command header: key byte + flags byte (not a u16 flags field).
133        buf.put_u8(GVCP_CMD_KEY);
134        buf.put_u8(self.gvcp_flags_byte());
135        buf.put_u16(self.command);
136        buf.put_u16(self.length);
137        buf.put_u16(self.request_id);
138        buf.extend_from_slice(payload);
139        buf.freeze()
140    }
141
142    /// Convert `CommandFlags` to the single-byte GVCP flag field.
143    fn gvcp_flags_byte(&self) -> u8 {
144        let mut byte = 0u8;
145        if self.flags.contains(CommandFlags::ACK_REQUIRED) {
146            byte |= 0x01;
147        }
148        if self.flags.contains(CommandFlags::BROADCAST) {
149            byte |= 0x10;
150        }
151        byte
152    }
153}
154
/// GVCP acknowledgement header wrapper.
///
/// Built from a decoded `viva_gencp::AckHeader` through the `From`
/// implementation below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GvcpAckHeader {
    /// Status reported by the device.
    pub status: StatusCode,
    /// Raw command/opcode value.
    ///
    /// When converted from an `AckHeader` this is the opcode's `ack_code()`.
    pub command: u16,
    /// Payload length in bytes.
    pub length: u16,
    /// Identifier of the answered request.
    pub request_id: u16,
}
167
168impl From<AckHeader> for GvcpAckHeader {
169    fn from(value: AckHeader) -> Self {
170        Self {
171            status: value.status,
172            command: value.opcode.ack_code(),
173            length: value.length,
174            request_id: value.request_id,
175        }
176    }
177}
178
/// Errors that can occur when operating the GVCP control path.
#[derive(Debug, Error)]
pub enum GigeError {
    /// An underlying I/O operation failed (converted from `std::io::Error`).
    #[error("io: {0}")]
    Io(#[from] std::io::Error),
    /// The peer sent something malformed or unexpected, or a request could
    /// not be expressed on the wire; the string describes the problem.
    #[error("protocol: {0}")]
    Protocol(String),
    /// No acknowledgement arrived within the allowed time.
    #[error("timeout waiting for acknowledgement")]
    Timeout,
    /// Error reported by the `viva_gencp` codec (converted from `GenCpError`).
    #[error("GenCP: {0}")]
    GenCp(#[from] viva_gencp::GenCpError),
    /// The device acknowledged the request with a non-success status code.
    #[error("device reported status {0:?}")]
    Status(StatusCode),
}
193
/// Identity of a device as reported in a GVCP discovery acknowledgement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeviceInfo {
    /// Current IPv4 address of the device.
    pub ip: Ipv4Addr,
    /// Hardware (MAC) address of the device.
    pub mac: [u8; 6],
    /// Model name, or `None` when the device reported an empty string.
    pub model: Option<String>,
    /// Manufacturer name, or `None` when the device reported an empty string.
    pub manufacturer: Option<String>,
}

impl DeviceInfo {
    /// Render the MAC address as colon-separated upper-case hex octets,
    /// e.g. `00:1A:2B:FF:09:A0`.
    fn mac_string(&self) -> String {
        let octets = self.mac.map(|octet| format!("{octet:02X}"));
        octets.join(":")
    }
}
212
/// Discover GigE Vision devices on the local network by broadcasting a GVCP discovery command.
///
/// Probes every IPv4 interface except loopback and listens on each for
/// `timeout`. Use [`discover_all`] to include loopback or
/// [`discover_on_interface`] to probe a single named interface.
///
/// # Errors
///
/// Returns whatever error the shared discovery implementation reports.
pub async fn discover(timeout: Duration) -> Result<Vec<DeviceInfo>, GigeError> {
    // No interface filter, loopback excluded.
    discover_impl(timeout, None, false).await
}
217
/// Discover devices only on the specified interface name.
///
/// When a user explicitly names an interface (including loopback like `lo0`),
/// it is always included — the loopback filter only applies to the unfiltered
/// [`discover`] call.
///
/// An interface name that matches no IPv4 interface yields an empty list
/// rather than an error.
pub async fn discover_on_interface(
    timeout: Duration,
    interface: &str,
) -> Result<Vec<DeviceInfo>, GigeError> {
    // Loopback is allowed here because the interface was requested by name.
    discover_impl(timeout, Some(interface), true).await
}
230
/// Discover devices on all interfaces including loopback.
///
/// This is useful for testing with simulated cameras (e.g. `arv-fake-gv-camera`)
/// bound to `127.0.0.1`.
///
/// Behaves like [`discover`] except that loopback interfaces are probed too.
pub async fn discover_all(timeout: Duration) -> Result<Vec<DeviceInfo>, GigeError> {
    // No interface filter, loopback included.
    discover_impl(timeout, None, true).await
}
238
239/// Send a FORCEIP command to temporarily assign an IP address to a device.
240///
241/// FORCEIP is a broadcast command that targets a device by its MAC address.
242/// The assigned IP is temporary — it does not survive a power cycle. Use
243/// [`GigeDevice::write_persistent_ip`] + [`GigeDevice::enable_persistent_ip`]
244/// for permanent configuration.
245///
246/// FORCEIP payload layout (56 bytes, big-endian):
247/// ```text
248/// [0..2]   reserved
249/// [2..8]   target MAC address (6 bytes)
250/// [8..20]  reserved
251/// [20..24] static IP address
252/// [24..36] reserved
253/// [36..40] subnet mask
254/// [40..52] reserved
255/// [52..56] gateway
256/// ```
257pub async fn force_ip(
258    mac: [u8; 6],
259    ip: Ipv4Addr,
260    subnet: Ipv4Addr,
261    gateway: Ipv4Addr,
262    iface: Option<&Iface>,
263) -> Result<(), GigeError> {
264    // Build the 56-byte FORCEIP payload.
265    let payload = encode_forceip_payload(mac, ip, subnet, gateway);
266
267    let local_ip = match iface {
268        Some(iface) => iface
269            .ipv4()
270            .ok_or_else(|| GigeError::Protocol("interface lacks IPv4 address".into()))?,
271        None => Ipv4Addr::UNSPECIFIED,
272    };
273
274    let socket = UdpSocket::bind(SocketAddr::new(IpAddr::V4(local_ip), 0)).await?;
275    socket.set_broadcast(true)?;
276    let dest = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), consts::PORT);
277
278    let header = GvcpRequestHeader {
279        flags: CommandFlags::ACK_REQUIRED | CommandFlags::BROADCAST,
280        command: consts::FORCEIP_COMMAND,
281        length: payload.len() as u16,
282        request_id: 1,
283    };
284    let packet = header.encode(&payload);
285    info!(mac = ?mac, %ip, %subnet, %gateway, "sending FORCEIP command");
286    socket.send_to(&packet, dest).await?;
287
288    // Wait for FORCEIP_ACK.
289    let mut buf = vec![0u8; consts::DISCOVERY_BUFFER];
290    match time::timeout(consts::CONTROL_TIMEOUT, socket.recv_from(&mut buf)).await {
291        Ok(Ok((len, _src))) => {
292            if len < viva_gencp::HEADER_SIZE {
293                return Err(GigeError::Protocol("FORCEIP ack too short".into()));
294            }
295            let mut cursor = &buf[..];
296            let status = cursor.get_u16();
297            let command = cursor.get_u16();
298            if command != consts::FORCEIP_ACK {
299                return Err(GigeError::Protocol(format!(
300                    "unexpected FORCEIP ack opcode {command:#06x}"
301                )));
302            }
303            if status != 0 {
304                return Err(GigeError::Protocol(format!(
305                    "FORCEIP returned status {status:#06x}"
306                )));
307            }
308            info!(%ip, "FORCEIP accepted");
309            Ok(())
310        }
311        Ok(Err(e)) => Err(e.into()),
312        Err(_) => Err(GigeError::Timeout),
313    }
314}
315
/// Build the 56-byte FORCEIP payload.
///
/// The payload is assembled front to back: each address field is preceded by
/// its reserved, zero-filled padding so that the MAC lands at `[2..8]`, the
/// IP at `[20..24]`, the subnet mask at `[36..40]` and the gateway at
/// `[52..56]`.
fn encode_forceip_payload(
    mac: [u8; 6],
    ip: Ipv4Addr,
    subnet: Ipv4Addr,
    gateway: Ipv4Addr,
) -> Vec<u8> {
    const PAYLOAD_LEN: usize = 56;
    // Each IPv4 field occupies the last 4 bytes of a 16-byte slot.
    const SLOT_PADDING: [u8; 12] = [0; 12];

    let mut payload = Vec::with_capacity(PAYLOAD_LEN);
    payload.extend_from_slice(&[0u8; 2]); // [0..2] reserved
    payload.extend_from_slice(&mac); // [2..8] MAC address
    for address in [ip, subnet, gateway] {
        payload.extend_from_slice(&SLOT_PADDING); // reserved
        payload.extend_from_slice(&address.octets()); // address field
    }
    payload
}
338
339async fn discover_impl(
340    timeout: Duration,
341    iface_filter: Option<&str>,
342    include_loopback: bool,
343) -> Result<Vec<DeviceInfo>, GigeError> {
344    let mut interfaces = Vec::new();
345    for iface in get_if_addrs()? {
346        let IfAddr::V4(v4) = iface.addr else {
347            continue;
348        };
349        if !include_loopback && v4.ip.is_loopback() {
350            continue;
351        }
352        if let Some(filter) = iface_filter
353            && iface.name != filter
354        {
355            continue;
356        }
357        interfaces.push((iface.name, v4));
358    }
359
360    if interfaces.is_empty() {
361        return Ok(Vec::new());
362    }
363
364    let mut join_set = JoinSet::new();
365    for (idx, (name, v4)) in interfaces.into_iter().enumerate() {
366        let request_id = 0x0100u16.wrapping_add(idx as u16);
367        let interface_name = name.clone();
368        join_set.spawn(async move {
369            let local_addr = SocketAddr::new(IpAddr::V4(v4.ip), 0);
370            let socket = UdpSocket::bind(local_addr).await?;
371            // On loopback, broadcast is not supported on some platforms (macOS).
372            // Send unicast discovery directly to the local address instead.
373            let destination = if v4.ip.is_loopback() {
374                SocketAddr::new(IpAddr::V4(v4.ip), consts::PORT)
375            } else {
376                socket.set_broadcast(true)?;
377                let broadcast = v4.broadcast.unwrap_or(Ipv4Addr::BROADCAST);
378                SocketAddr::new(IpAddr::V4(broadcast), consts::PORT)
379            };
380
381            let header = GvcpRequestHeader {
382                flags: CommandFlags::ACK_REQUIRED | CommandFlags::BROADCAST,
383                command: consts::DISCOVERY_COMMAND,
384                length: 0,
385                request_id,
386            };
387            let packet = header.encode(&[]);
388            info!(%interface_name, local = %v4.ip, dest = %destination, "sending GVCP discovery");
389            trace!(%interface_name, bytes = packet.len(), "GVCP discovery payload size");
390            socket.send_to(&packet, destination).await?;
391
392            let mut responses = Vec::new();
393            let mut buffer = vec![0u8; consts::DISCOVERY_BUFFER];
394            let timer = time::sleep(timeout);
395            tokio::pin!(timer);
396            loop {
397                tokio::select! {
398                    _ = &mut timer => break,
399                    recv = socket.recv_from(&mut buffer) => {
400                        let (len, src) = recv?;
401                        info!(%interface_name, %src, "received GVCP response");
402                        trace!(%interface_name, bytes = len, "GVCP response length");
403                        if let Some(info) = parse_discovery_ack(&buffer[..len], request_id)? {
404                            trace!(ip = %info.ip, mac = %info.mac_string(), "parsed discovery ack");
405                            responses.push(info);
406                        }
407                    }
408                }
409            }
410            Ok::<_, GigeError>(responses)
411        });
412    }
413
414    let mut seen = HashMap::new();
415    while let Some(res) = join_set.join_next().await {
416        let devices =
417            res.map_err(|e| GigeError::Protocol(format!("discovery task failed: {e}")))??;
418        for dev in devices {
419            seen.entry((dev.ip, dev.mac)).or_insert(dev);
420        }
421    }
422
423    let mut devices: Vec<_> = seen.into_values().collect();
424    devices.sort_by_key(|d| d.ip);
425    Ok(devices)
426}
427
428fn parse_discovery_ack(buf: &[u8], expected_request: u16) -> Result<Option<DeviceInfo>, GigeError> {
429    if buf.len() < viva_gencp::HEADER_SIZE {
430        return Err(GigeError::Protocol("GVCP ack too short".into()));
431    }
432    let mut header = buf;
433    let status = header.get_u16();
434    let command = header.get_u16();
435    let length = header.get_u16() as usize;
436    let request_id = header.get_u16();
437    if request_id != expected_request {
438        return Ok(None);
439    }
440    if command != consts::DISCOVERY_ACK {
441        return Err(GigeError::Protocol(format!(
442            "unexpected discovery opcode {command:#06x}"
443        )));
444    }
445    if status != 0 {
446        return Err(GigeError::Protocol(format!(
447            "discovery returned status {status:#06x}"
448        )));
449    }
450    if buf.len() < viva_gencp::HEADER_SIZE + length {
451        return Err(GigeError::Protocol("discovery payload truncated".into()));
452    }
453    let payload = &buf[viva_gencp::HEADER_SIZE..viva_gencp::HEADER_SIZE + length];
454    let info = parse_discovery_payload(payload)?;
455    Ok(Some(info))
456}
457
458/// Parse a GigE Vision Discovery ACK payload (248 bytes).
459///
460/// Field layout per GigE Vision specification (table 7-4):
461///
462/// | Offset | Size | Field                        |
463/// |--------|------|------------------------------|
464/// |      0 |    2 | Spec version major           |
465/// |      2 |    2 | Spec version minor           |
466/// |      4 |    4 | Device mode                  |
467/// |      8 |    4 | Reserved                     |
468/// |     12 |    2 | MAC address high             |
469/// |     14 |    4 | MAC address low              |
470/// |     18 |    4 | Supported IP config          |
471/// |     22 |    4 | Current IP config            |
472/// |     26 |   10 | Reserved                     |
473/// |     36 |    4 | Current IP address           |
474/// |     40 |   12 | Reserved                     |
475/// |     52 |    4 | Current subnet mask          |
476/// |     56 |   12 | Reserved                     |
477/// |     68 |    4 | Default gateway              |
478/// |     72 |   32 | Manufacturer name            |
479/// |    106 |   32 | Model name                   |
480/// |    138 |   32 | Device version               |
481/// |    170 |   48 | Manufacturer specific info   |
482/// |    218 |   16 | Serial number                |
483/// |    234 |   16 | User defined name            |
484fn parse_discovery_payload(payload: &[u8]) -> Result<DeviceInfo, GigeError> {
485    // Minimum size to reach past the current IP field.
486    if payload.len() < 40 {
487        return Err(GigeError::Protocol("discovery payload too small".into()));
488    }
489    let mut cursor = Cursor::new(payload);
490    let _spec_major = cursor.get_u16(); // 0
491    let _spec_minor = cursor.get_u16(); // 2
492    let _device_mode = cursor.get_u32(); // 4
493    let _reserved = cursor.get_u32(); // 8
494
495    // MAC: 2 bytes high + 4 bytes low = 6 bytes at offset 12.
496    let mut mac = [0u8; 6];
497    cursor.copy_to_slice(&mut mac); // 12..18
498
499    let _supported_ip_config = cursor.get_u32(); // 18
500    let _current_ip_config = cursor.get_u32(); // 22
501
502    // 10 bytes reserved before current IP.
503    cursor.advance(10); // 26..36
504    let ip = Ipv4Addr::from(cursor.get_u32()); // 36
505
506    // 12 bytes reserved before subnet.
507    cursor.advance(12); // 40..52
508    let _subnet = cursor.get_u32(); // 52
509
510    // 12 bytes reserved before gateway.
511    cursor.advance(12); // 56..68
512    let _gateway = cursor.get_u32(); // 68
513
514    // String fields.
515    let manufacturer = read_fixed_string(&mut cursor, 32)?; // 72
516    let model = read_fixed_string(&mut cursor, 32)?; // 104
517    // Remaining fields (version, info, serial, user name) are optional.
518
519    Ok(DeviceInfo {
520        ip,
521        mac,
522        manufacturer,
523        model,
524    })
525}
526
527fn read_fixed_string(cursor: &mut Cursor<&[u8]>, len: usize) -> Result<Option<String>, GigeError> {
528    if cursor.remaining() < len {
529        return Err(GigeError::Protocol("discovery string truncated".into()));
530    }
531    let mut buf = vec![0u8; len];
532    cursor.copy_to_slice(&mut buf);
533    Ok(parse_string(&buf))
534}
535
/// Decode a NUL-padded text field.
///
/// Everything from the first NUL byte onwards is ignored, invalid UTF-8 is
/// replaced lossily and surrounding whitespace is trimmed. An empty result
/// is reported as `None`.
fn parse_string(bytes: &[u8]) -> Option<String> {
    let before_nul = bytes.split(|&byte| byte == 0).next().unwrap_or(bytes);
    let decoded = String::from_utf8_lossy(before_nul);
    let trimmed = decoded.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_string())
}
542
/// GVCP device handle.
///
/// Owns the UDP socket used for control transactions with a single device.
pub struct GigeDevice {
    /// UDP socket connected to `remote`.
    socket: UdpSocket,
    /// GVCP endpoint of the device this handle talks to.
    remote: SocketAddr,
    /// Identifier assigned to the next outgoing request.
    request_id: u16,
    /// Random source for the jitter added to retry delays.
    rng: Rng,
}
550
/// Stream negotiation outcome describing the values written to the device.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamParams {
    /// Selected GVSP packet size (bytes).
    pub packet_size: u32,
    /// Packet delay expressed in GVSP clock ticks (80 ns units).
    ///
    /// NOTE(review): confirm the 80 ns tick; the tick duration may depend on
    /// the device's timestamp tick frequency.
    pub packet_delay: u32,
    /// Link MTU used to derive the packet size.
    pub mtu: u32,
    /// Host IPv4 address configured on the device.
    pub host: Ipv4Addr,
    /// Host port configured on the device.
    pub port: u16,
}
565
566impl GigeDevice {
567    /// Connect to a device GVCP endpoint.
568    ///
569    /// The connection is ready for register read/write but does not claim
570    /// control privilege. Call [`Self::claim_control`] before configuring streaming
571    /// or starting acquisition.
572    pub async fn open(addr: SocketAddr) -> Result<Self, GigeError> {
573        let local_ip = match addr.ip() {
574            IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
575            IpAddr::V6(_) => {
576                return Err(GigeError::Protocol("IPv6 GVCP is not supported".into()));
577            }
578        };
579        let socket = UdpSocket::bind(SocketAddr::new(local_ip, 0)).await?;
580        socket.connect(addr).await?;
581        Ok(Self {
582            socket,
583            remote: addr,
584            request_id: 1,
585            rng: Rng::new(),
586        })
587    }
588
589    /// Claim control channel privilege (CCP).
590    ///
591    /// Required by the GigE Vision specification before the device accepts
592    /// stream configuration or acquisition commands.
593    pub async fn claim_control(&mut self) -> Result<(), GigeError> {
594        self.write_register(
595            consts::CONTROL_CHANNEL_PRIVILEGE as u32,
596            consts::CCP_CONTROL,
597        )
598        .await?;
599        debug!(addr = %self.remote, "claimed control channel privilege");
600        Ok(())
601    }
602
    /// Release control channel privilege.
    ///
    /// Writes zero to the CCP register, clearing the value set by
    /// [`Self::claim_control`].
    pub async fn release_control(&mut self) -> Result<(), GigeError> {
        self.write_register(consts::CONTROL_CHANNEL_PRIVILEGE as u32, 0)
            .await
    }
608
    /// Return the remote GVCP socket address associated with this device.
    ///
    /// This is the address that was passed to [`Self::open`].
    pub fn remote_addr(&self) -> SocketAddr {
        self.remote
    }
613
614    fn next_request_id(&mut self) -> u16 {
615        let id = self.request_id;
616        self.request_id = self.request_id.wrapping_add(1);
617        if self.request_id == 0 {
618            self.request_id = 1;
619        }
620        id
621    }
622
623    async fn transact_with_retry(
624        &mut self,
625        opcode: OpCode,
626        payload: BytesMut,
627    ) -> Result<GenCpAck, GigeError> {
628        let mut attempt = 0usize;
629        let mut payload = payload;
630        loop {
631            attempt += 1;
632            let request_id = self.next_request_id();
633            let payload_bytes = payload.clone().freeze();
634            let header = GvcpRequestHeader {
635                flags: CommandFlags::ACK_REQUIRED,
636                command: opcode.command_code(),
637                length: payload_bytes.len() as u16,
638                request_id,
639            };
640            let encoded = header.encode(&payload_bytes);
641            trace!(request_id, opcode = ?opcode, bytes = encoded.len(), attempt, "sending GVCP command");
642            if let Err(err) = self.socket.send(&encoded).await {
643                if attempt >= consts::MAX_RETRIES {
644                    return Err(err.into());
645                }
646                warn!(request_id, ?opcode, attempt, "send failed, retrying");
647                self.backoff(attempt).await;
648                payload = BytesMut::from(&payload_bytes[..]);
649                continue;
650            }
651
652            let mut buf = vec![
653                0u8;
654                viva_gencp::HEADER_SIZE
655                    + consts::GENCP_MAX_BLOCK
656                    + consts::GENCP_WRITE_OVERHEAD
657            ];
658            match time::timeout(consts::CONTROL_TIMEOUT, self.socket.recv(&mut buf)).await {
659                Ok(Ok(len)) => {
660                    trace!(request_id, bytes = len, attempt, "received GenCP ack");
661                    let ack = decode_ack(&buf[..len])?;
662                    if ack.header.request_id != request_id {
663                        debug!(
664                            request_id,
665                            got = ack.header.request_id,
666                            attempt,
667                            "acknowledgement id mismatch"
668                        );
669                        if attempt >= consts::MAX_RETRIES {
670                            return Err(GigeError::Protocol("acknowledgement id mismatch".into()));
671                        }
672                        self.backoff(attempt).await;
673                        payload = BytesMut::from(&payload_bytes[..]);
674                        continue;
675                    }
676                    if ack.header.opcode != opcode {
677                        return Err(GigeError::Protocol(
678                            "unexpected opcode in acknowledgement".into(),
679                        ));
680                    }
681                    match ack.header.status {
682                        StatusCode::Success => return Ok(ack),
683                        StatusCode::DeviceBusy if attempt < consts::MAX_RETRIES => {
684                            warn!(request_id, attempt, "device busy, retrying");
685                            self.backoff(attempt).await;
686                            payload = BytesMut::from(&payload_bytes[..]);
687                            continue;
688                        }
689                        other => return Err(GigeError::Status(other)),
690                    }
691                }
692                Ok(Err(err)) => {
693                    if attempt >= consts::MAX_RETRIES {
694                        return Err(err.into());
695                    }
696                    warn!(request_id, ?opcode, attempt, "receive error, retrying");
697                    self.backoff(attempt).await;
698                    payload = BytesMut::from(&payload_bytes[..]);
699                }
700                Err(_) => {
701                    if attempt >= consts::MAX_RETRIES {
702                        return Err(GigeError::Timeout);
703                    }
704                    warn!(request_id, ?opcode, attempt, "command timeout, retrying");
705                    self.backoff(attempt).await;
706                    payload = BytesMut::from(&payload_bytes[..]);
707                }
708            }
709        }
710    }
711
712    async fn backoff(&mut self, attempt: usize) {
713        let multiplier = 1u32 << (attempt.saturating_sub(1)).min(3);
714        let base_ms = consts::RETRY_BASE_DELAY.as_millis() as u64;
715        let base = Duration::from_millis(base_ms.saturating_mul(multiplier as u64).max(base_ms));
716        let jitter_ms = self.rng.u64(..=consts::RETRY_JITTER.as_millis() as u64);
717        let jitter = Duration::from_millis(jitter_ms);
718        let delay = base + jitter;
719        debug!(attempt, delay = ?delay, "gvcp retry backoff");
720        time::sleep(delay).await;
721    }
722
723    /// Read a single 32-bit bootstrap or device register.
724    ///
725    /// Uses GVCP READREG format: 4-byte register address.
726    /// The acknowledgement carries the 4-byte register value.
727    pub async fn read_register(&mut self, addr: u32) -> Result<u32, GigeError> {
728        let mut payload = BytesMut::with_capacity(4);
729        payload.put_u32(addr);
730        let ack = self
731            .transact_with_retry(OpCode::ReadRegister, payload)
732            .await?;
733        if ack.payload.len() != 4 {
734            return Err(GigeError::Protocol(format!(
735                "expected 4-byte register ack but device returned {} bytes",
736                ack.payload.len()
737            )));
738        }
739        let mut cursor = &ack.payload[..];
740        Ok(cursor.get_u32())
741    }
742
743    /// Write a single 32-bit bootstrap or device register.
744    ///
745    /// Uses GVCP WRITEREG format: 4-byte register address + 4-byte value.
746    /// The acknowledgement carries a 4-byte data index placeholder.
747    pub async fn write_register(&mut self, addr: u32, value: u32) -> Result<(), GigeError> {
748        let mut payload = BytesMut::with_capacity(8);
749        payload.put_u32(addr);
750        payload.put_u32(value);
751        let ack = self
752            .transact_with_retry(OpCode::WriteRegister, payload)
753            .await?;
754        if ack.payload.len() != 4 {
755            return Err(GigeError::Protocol(format!(
756                "expected 4-byte register write ack but device returned {} bytes",
757                ack.payload.len()
758            )));
759        }
760        Ok(())
761    }
762
763    /// Read a block of memory from the remote device with chunking and retries.
764    ///
765    /// Uses GVCP READMEM format: 4-byte address + 2-byte reserved + 2-byte count.
766    /// The acknowledgement carries: 4-byte address echo + data bytes.
767    pub async fn read_mem(&mut self, addr: u64, len: usize) -> Result<Vec<u8>, GigeError> {
768        let mut remaining = len;
769        let mut offset = 0usize;
770        let mut data = Vec::with_capacity(len);
771        while remaining > 0 {
772            let chunk = remaining.min(consts::GENCP_MAX_BLOCK);
773            let mut payload = BytesMut::with_capacity(8);
774            payload.put_u32((addr + offset as u64) as u32);
775            payload.put_u16(0); // reserved
776            payload.put_u16(chunk as u16);
777            let ack = self.transact_with_retry(OpCode::ReadMem, payload).await?;
778            // GVCP READMEM_ACK: 4-byte address prefix + data.
779            let ack_data = if ack.payload.len() >= 4 + chunk {
780                &ack.payload[4..4 + chunk]
781            } else if ack.payload.len() == chunk {
782                // Some devices omit the address echo.
783                &ack.payload[..chunk]
784            } else {
785                return Err(GigeError::Protocol(format!(
786                    "expected {} bytes but device returned {}",
787                    chunk,
788                    ack.payload.len()
789                )));
790            };
791            data.extend_from_slice(ack_data);
792            remaining -= chunk;
793            offset += chunk;
794        }
795        Ok(data)
796    }
797
798    /// Write a block of memory to the remote device with chunking and retries.
799    ///
800    /// Uses GVCP WRITEMEM format: 4-byte address + data bytes.
801    /// The acknowledgement carries: 4-byte reserved (index).
802    pub async fn write_mem(&mut self, addr: u64, data: &[u8]) -> Result<(), GigeError> {
803        /// GVCP WRITEMEM overhead: 4-byte address prefix.
804        const GVCP_WRITE_OVERHEAD: usize = 4;
805
806        let mut offset = 0usize;
807        while offset < data.len() {
808            let chunk = (data.len() - offset).min(consts::GENCP_MAX_BLOCK - GVCP_WRITE_OVERHEAD);
809            if chunk == 0 {
810                return Err(GigeError::Protocol("write chunk size is zero".into()));
811            }
812            let mut payload = BytesMut::with_capacity(GVCP_WRITE_OVERHEAD + chunk);
813            payload.put_u32((addr + offset as u64) as u32);
814            payload.extend_from_slice(&data[offset..offset + chunk]);
815            let ack = self.transact_with_retry(OpCode::WriteMem, payload).await?;
816            // GVCP WRITEMEM_ACK: 4-byte reserved payload.
817            if ack.payload.len() > 4 {
818                return Err(GigeError::Protocol(
819                    "write acknowledgement carried unexpected payload".into(),
820                ));
821            }
822            offset += chunk;
823        }
824        Ok(())
825    }
826
827    /// Configure the message channel destination address/port.
828    pub async fn set_message_destination(
829        &mut self,
830        ip: Ipv4Addr,
831        port: u16,
832    ) -> Result<(), GigeError> {
833        info!(%ip, port, "configuring message channel destination");
834        self.write_mem(consts::MESSAGE_DESTINATION_ADDRESS, &ip.octets())
835            .await?;
836        self.write_mem(consts::MESSAGE_DESTINATION_PORT, &port.to_be_bytes())
837            .await?;
838        Ok(())
839    }
840
841    fn stream_reg(channel: u32, offset: u64) -> u64 {
842        consts::STREAM_CHANNEL_BASE + channel as u64 * consts::STREAM_CHANNEL_STRIDE + offset
843    }
844
845    /// Configure the GVSP host destination for the provided channel.
846    pub async fn set_stream_destination(
847        &mut self,
848        channel: u32,
849        ip: Ipv4Addr,
850        port: u16,
851    ) -> Result<(), GigeError> {
852        info!(channel, %ip, port, "configuring stream destination");
853        let addr = Self::stream_reg(channel, consts::STREAM_DESTINATION_ADDRESS);
854        self.write_mem(addr, &ip.octets()).await?;
855        let addr = Self::stream_reg(channel, consts::STREAM_DESTINATION_PORT);
856        self.write_mem(addr, &(port as u32).to_be_bytes()).await?;
857        Ok(())
858    }
859
860    /// Configure the packet size for the stream channel.
861    pub async fn set_stream_packet_size(
862        &mut self,
863        channel: u32,
864        packet_size: u32,
865    ) -> Result<(), GigeError> {
866        info!(channel, packet_size, "configuring stream packet size");
867        let addr = Self::stream_reg(channel, consts::STREAM_PACKET_SIZE);
868        self.write_mem(addr, &packet_size.to_be_bytes()).await
869    }
870
871    /// Configure the packet delay (`GevSCPD`).
872    pub async fn set_stream_packet_delay(
873        &mut self,
874        channel: u32,
875        packet_delay: u32,
876    ) -> Result<(), GigeError> {
877        debug!(channel, packet_delay, "configuring stream packet delay");
878        let addr = Self::stream_reg(channel, consts::STREAM_PACKET_DELAY);
879        self.write_mem(addr, &packet_delay.to_be_bytes()).await
880    }
881
882    /// Negotiate GVSP parameters with the device given the host interface.
883    pub async fn negotiate_stream(
884        &mut self,
885        channel: u32,
886        iface: &Iface,
887        port: u16,
888        target_mtu: Option<u32>,
889    ) -> Result<StreamParams, GigeError> {
890        let host_ip = iface
891            .ipv4()
892            .ok_or_else(|| GigeError::Protocol("interface lacks IPv4 address".into()))?;
893        let iface_mtu = nic::mtu(iface)?;
894        let mtu = target_mtu.map_or(iface_mtu, |limit| limit.min(iface_mtu));
895        let packet_size = nic::best_packet_size(mtu);
896        let packet_delay = if mtu <= 1500 {
897            // When jumbo frames are unavailable we space out packets by 2 µs to
898            // prevent excessive buffering pressure on receivers. GVSP expresses
899            // `GevSCPD` in units of 80 ns.
900            const DELAY_NS: u32 = 2_000; // 2 µs.
901            DELAY_NS / 80
902        } else {
903            0
904        };
905
906        self.set_stream_destination(channel, host_ip, port).await?;
907        self.set_stream_packet_size(channel, packet_size).await?;
908        self.set_stream_packet_delay(channel, packet_delay).await?;
909
910        Ok(StreamParams {
911            packet_size,
912            packet_delay,
913            mtu,
914            host: host_ip,
915            port,
916        })
917    }
918
919    /// Enable or disable delivery of the provided event identifier.
920    pub async fn enable_event_raw(&mut self, id: u16, on: bool) -> Result<(), GigeError> {
921        let index = (id / 32) as u64;
922        let bit = 1u32 << (id % 32);
923        let addr = consts::EVENT_NOTIFICATION_BASE + index * consts::EVENT_NOTIFICATION_STRIDE;
924        let current = self.read_mem(addr, 4).await?;
925        if current.len() != 4 {
926            return Err(GigeError::Protocol(
927                "event notification register length mismatch".into(),
928            ));
929        }
930        let mut bytes = [0u8; 4];
931        bytes.copy_from_slice(&current);
932        let mut value = u32::from_be_bytes(bytes);
933        if on {
934            value |= bit;
935        } else {
936            value &= !bit;
937        }
938        let new_bytes = value.to_be_bytes();
939        self.write_mem(addr, &new_bytes).await?;
940        debug!(event_id = id, enabled = on, "updated event mask");
941        Ok(())
942    }
943
944    /// Read the persistent IP configuration from the device.
945    ///
946    /// Returns `(ip, subnet, gateway)`.
947    pub async fn read_persistent_ip(
948        &mut self,
949    ) -> Result<(Ipv4Addr, Ipv4Addr, Ipv4Addr), GigeError> {
950        let ip = Ipv4Addr::from(
951            self.read_register(consts::PERSISTENT_IP_ADDRESS as u32)
952                .await?,
953        );
954        let subnet = Ipv4Addr::from(
955            self.read_register(consts::PERSISTENT_SUBNET_MASK as u32)
956                .await?,
957        );
958        let gateway = Ipv4Addr::from(
959            self.read_register(consts::PERSISTENT_DEFAULT_GATEWAY as u32)
960                .await?,
961        );
962        Ok((ip, subnet, gateway))
963    }
964
965    /// Write the persistent IP configuration to the device.
966    pub async fn write_persistent_ip(
967        &mut self,
968        ip: Ipv4Addr,
969        subnet: Ipv4Addr,
970        gateway: Ipv4Addr,
971    ) -> Result<(), GigeError> {
972        self.write_register(consts::PERSISTENT_IP_ADDRESS as u32, u32::from(ip))
973            .await?;
974        self.write_register(consts::PERSISTENT_SUBNET_MASK as u32, u32::from(subnet))
975            .await?;
976        self.write_register(
977            consts::PERSISTENT_DEFAULT_GATEWAY as u32,
978            u32::from(gateway),
979        )
980        .await?;
981        info!(%ip, %subnet, %gateway, "wrote persistent IP configuration");
982        Ok(())
983    }
984
985    /// Enable persistent IP mode in the device configuration flags.
986    ///
987    /// Sets bit 1 (persistent IP) in the `CurrentIPConfiguration` register.
988    pub async fn enable_persistent_ip(&mut self) -> Result<(), GigeError> {
989        let current = self.read_register(consts::CURRENT_IP_CONFIG as u32).await?;
990        let updated = current | 0x02; // bit 1 = persistent IP
991        self.write_register(consts::CURRENT_IP_CONFIG as u32, updated)
992            .await?;
993        info!(config = format!("0x{updated:08x}"), "enabled persistent IP");
994        Ok(())
995    }
996
997    /// Request resend of a packet range for the provided block identifier.
998    pub async fn request_resend(
999        &mut self,
1000        block_id: u16,
1001        first_packet: u16,
1002        last_packet: u16,
1003    ) -> Result<(), GigeError> {
1004        let mut payload = BytesMut::with_capacity(8);
1005        payload.put_u16(block_id);
1006        payload.put_u16(0); // Reserved as per spec.
1007        payload.put_u16(first_packet);
1008        payload.put_u16(last_packet);
1009
1010        let request_id = self.next_request_id();
1011        let header = GvcpRequestHeader {
1012            flags: CommandFlags::ACK_REQUIRED,
1013            command: consts::PACKET_RESEND_COMMAND,
1014            length: payload.len() as u16,
1015            request_id,
1016        };
1017        let packet = header.encode(&payload);
1018        trace!(
1019            block_id,
1020            first_packet, last_packet, request_id, "sending packet resend request"
1021        );
1022        self.socket.send(&packet).await?;
1023        let mut buf = [0u8; viva_gencp::HEADER_SIZE];
1024        match time::timeout(consts::CONTROL_TIMEOUT, self.socket.recv(&mut buf)).await {
1025            Ok(Ok(len)) => {
1026                if len != viva_gencp::HEADER_SIZE {
1027                    return Err(GigeError::Protocol("resend ack length mismatch".into()));
1028                }
1029                let mut cursor = &buf[..];
1030                let status = StatusCode::from_raw(cursor.get_u16());
1031                let command = cursor.get_u16();
1032                let length = cursor.get_u16();
1033                let ack_request_id = cursor.get_u16();
1034                if command != consts::PACKET_RESEND_ACK {
1035                    return Err(GigeError::Protocol("unexpected resend ack opcode".into()));
1036                }
1037                if length != 0 {
1038                    return Err(GigeError::Protocol("resend ack carried payload".into()));
1039                }
1040                if ack_request_id != request_id {
1041                    return Err(GigeError::Protocol("resend ack request id mismatch".into()));
1042                }
1043                if status != StatusCode::Success {
1044                    return Err(GigeError::Status(status));
1045                }
1046                Ok(())
1047            }
1048            Ok(Err(err)) => Err(err.into()),
1049            Err(_) => Err(GigeError::Timeout),
1050        }
1051    }
1052}
1053
#[cfg(test)]
mod tests {
    use super::*;

    /// An encoded request is the header (key, flags, command, length, id)
    /// followed by the payload bytes unchanged.
    #[test]
    fn request_header_roundtrip() {
        let body = [1u8, 2, 3, 4];
        let header = GvcpRequestHeader {
            flags: CommandFlags::ACK_REQUIRED,
            command: 0x1234,
            length: 4,
            request_id: 0xBEEF,
        };

        let wire = header.encode(&body);

        assert_eq!(wire.len(), viva_gencp::HEADER_SIZE + body.len());
        // GVCP wire format: byte 0 = 0x42 key, byte 1 = flags byte.
        assert_eq!(wire[0], GVCP_CMD_KEY);
        assert_eq!(wire[1], 0x01); // ACK_REQUIRED
        // The three 16-bit header fields are big-endian.
        assert_eq!(&wire[2..4], &header.command.to_be_bytes());
        assert_eq!(&wire[4..6], &header.length.to_be_bytes());
        assert_eq!(&wire[6..8], &header.request_id.to_be_bytes());
        // Everything after the header is the payload.
        assert_eq!(&wire[8..], &body);
    }

    /// The FORCEIP payload is 56 bytes with MAC, IP, subnet and gateway at
    /// fixed offsets and zeroed reserved bytes everywhere else.
    #[test]
    fn forceip_payload_encoding() {
        let mac = [0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE];
        let ip = Ipv4Addr::new(192, 168, 1, 100);
        let subnet = Ipv4Addr::new(255, 255, 255, 0);
        let gateway = Ipv4Addr::new(192, 168, 1, 1);

        let payload = encode_forceip_payload(mac, ip, subnet, gateway);
        assert_eq!(payload.len(), 56);

        // Populated fields.
        assert_eq!(&payload[2..8], &mac);
        assert_eq!(&payload[20..24], &ip.octets());
        assert_eq!(&payload[36..40], &subnet.octets());
        assert_eq!(&payload[52..56], &gateway.octets());

        // Reserved bytes should be zero: the 2-byte lead-in and the 12-byte
        // gaps in front of each address.
        assert_eq!(&payload[0..2], &[0, 0]);
        for gap in [8usize..20, 24usize..36, 40usize..52] {
            assert_eq!(&payload[gap], &[0u8; 12]);
        }
    }

    /// Converting a GenCP ack header keeps status, length and request id and
    /// maps the opcode to its acknowledgement code.
    #[test]
    fn ack_header_conversion() {
        let source = AckHeader {
            status: StatusCode::DeviceBusy,
            opcode: OpCode::ReadMem,
            length: 12,
            request_id: 0x44,
        };

        let gvcp: GvcpAckHeader = source.into();

        assert_eq!(gvcp.status, StatusCode::DeviceBusy);
        assert_eq!(gvcp.command, OpCode::ReadMem.ack_code());
        assert_eq!(gvcp.length, 12);
        assert_eq!(gvcp.request_id, 0x44);
    }
}