1use std::collections::HashMap;
4use std::io::Cursor;
5use std::net::{IpAddr, Ipv4Addr, SocketAddr};
6use std::time::Duration;
7
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9use fastrand::Rng;
10use if_addrs::{IfAddr, get_if_addrs};
11use thiserror::Error;
12use tokio::net::UdpSocket;
13use tokio::task::JoinSet;
14use tokio::time;
15use tracing::{debug, info, trace, warn};
16use viva_gencp::{AckHeader, CommandFlags, GenCpAck, OpCode, StatusCode, decode_ack};
17
18use crate::nic::{self, Iface};
19
/// Numeric constants used by the GVCP helpers in this file: command codes,
/// register addresses, transfer limits and retry timing.
pub mod consts {
    use std::time::Duration;

    // ---- Transport -------------------------------------------------------

    /// UDP port that discovery and FORCEIP packets are sent to.
    pub const PORT: u16 = 3956;

    // ---- Command / acknowledgement codes -----------------------------------

    /// Command code placed in discovery requests.
    pub const DISCOVERY_COMMAND: u16 = 0x0002;
    /// Command code expected in discovery acknowledgements.
    pub const DISCOVERY_ACK: u16 = 0x0003;
    /// Command code placed in FORCEIP requests.
    pub const FORCEIP_COMMAND: u16 = 0x0004;
    /// Command code expected in FORCEIP acknowledgements.
    pub const FORCEIP_ACK: u16 = 0x0005;
    /// Command code placed in packet-resend requests.
    pub const PACKET_RESEND_COMMAND: u16 = 0x0040;
    /// Command code expected in packet-resend acknowledgements.
    pub const PACKET_RESEND_ACK: u16 = 0x0041;

    // ---- IP configuration registers ----------------------------------------

    /// Register holding the current IP configuration flags.
    pub const CURRENT_IP_CONFIG: u64 = 0x0014;

    /// Register holding the persistent IPv4 address.
    pub const PERSISTENT_IP_ADDRESS: u64 = 0x064C;
    /// Register holding the persistent subnet mask.
    pub const PERSISTENT_SUBNET_MASK: u64 = 0x065C;
    /// Register holding the persistent default gateway.
    pub const PERSISTENT_DEFAULT_GATEWAY: u64 = 0x066C;

    // ---- Control channel privilege -----------------------------------------

    /// Register written to claim (or, with zero, release) control of the device.
    pub const CONTROL_CHANNEL_PRIVILEGE: u64 = 0x0A00;
    /// Privilege register bit 1: control access.
    pub const CCP_CONTROL: u32 = 1 << 1;
    /// Privilege register bit 0: exclusive access.
    pub const CCP_EXCLUSIVE: u32 = 1 << 0;

    // ---- Message channel / events ------------------------------------------

    /// Address the message-channel destination IP is written to.
    pub const MESSAGE_DESTINATION_ADDRESS: u64 = 0x0900_0200;
    /// Address the message-channel destination port is written to.
    pub const MESSAGE_DESTINATION_PORT: u64 = 0x0900_0204;
    /// Address of the first event-notification mask word.
    pub const EVENT_NOTIFICATION_BASE: u64 = 0x0900_0300;
    /// Byte distance between consecutive event-notification mask words.
    pub const EVENT_NOTIFICATION_STRIDE: u64 = 4;

    // ---- Transfer limits ---------------------------------------------------

    /// Largest block, in bytes, moved by a single memory read/write command.
    pub const GENCP_MAX_BLOCK: usize = 512;
    /// Extra bytes reserved in the acknowledgement receive buffer.
    pub const GENCP_WRITE_OVERHEAD: usize = 8;

    // ---- Timing and retries ------------------------------------------------

    /// How long to wait for an acknowledgement before giving up on an attempt.
    pub const CONTROL_TIMEOUT: Duration = Duration::from_millis(500);
    /// Maximum number of attempts per control transaction.
    pub const MAX_RETRIES: usize = 4;
    /// Base delay used when backing off between attempts.
    pub const RETRY_BASE_DELAY: Duration = Duration::from_millis(20);
    /// Upper bound of the random jitter added to each back-off delay.
    pub const RETRY_JITTER: Duration = Duration::from_millis(10);

    /// Size, in bytes, of the buffer used to receive discovery/FORCEIP replies.
    pub const DISCOVERY_BUFFER: usize = 2048;

    // ---- Stream channel registers ------------------------------------------

    /// Address of the register block of stream channel 0.
    pub const STREAM_CHANNEL_BASE: u64 = 0x0D00;
    /// Byte distance between the register blocks of consecutive stream channels.
    pub const STREAM_CHANNEL_STRIDE: u64 = 0x40;
    /// Offset of the destination-port register inside a stream channel block.
    pub const STREAM_DESTINATION_PORT: u64 = 0x00;
    /// Offset of the packet-size register inside a stream channel block.
    pub const STREAM_PACKET_SIZE: u64 = 0x04;
    /// Offset of the packet-delay register inside a stream channel block.
    pub const STREAM_PACKET_DELAY: u64 = 0x08;
    /// Offset of the destination-address register inside a stream channel block.
    pub const STREAM_DESTINATION_ADDRESS: u64 = 0x18;
}
105
106pub use consts::PORT as GVCP_PORT;
108
109#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111pub struct GvcpRequestHeader {
112 pub flags: CommandFlags,
114 pub command: u16,
116 pub length: u16,
118 pub request_id: u16,
120}
121
122const GVCP_CMD_KEY: u8 = 0x42;
124
125impl GvcpRequestHeader {
126 pub fn encode(self, payload: &[u8]) -> Bytes {
131 let mut buf = BytesMut::with_capacity(viva_gencp::HEADER_SIZE + payload.len());
132 buf.put_u8(GVCP_CMD_KEY);
134 buf.put_u8(self.gvcp_flags_byte());
135 buf.put_u16(self.command);
136 buf.put_u16(self.length);
137 buf.put_u16(self.request_id);
138 buf.extend_from_slice(payload);
139 buf.freeze()
140 }
141
142 fn gvcp_flags_byte(&self) -> u8 {
144 let mut byte = 0u8;
145 if self.flags.contains(CommandFlags::ACK_REQUIRED) {
146 byte |= 0x01;
147 }
148 if self.flags.contains(CommandFlags::BROADCAST) {
149 byte |= 0x10;
150 }
151 byte
152 }
153}
154
155#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub struct GvcpAckHeader {
158 pub status: StatusCode,
160 pub command: u16,
162 pub length: u16,
164 pub request_id: u16,
166}
167
168impl From<AckHeader> for GvcpAckHeader {
169 fn from(value: AckHeader) -> Self {
170 Self {
171 status: value.status,
172 command: value.opcode.ack_code(),
173 length: value.length,
174 request_id: value.request_id,
175 }
176 }
177}
178
179#[derive(Debug, Error)]
181pub enum GigeError {
182 #[error("io: {0}")]
183 Io(#[from] std::io::Error),
184 #[error("protocol: {0}")]
185 Protocol(String),
186 #[error("timeout waiting for acknowledgement")]
187 Timeout,
188 #[error("GenCP: {0}")]
189 GenCp(#[from] viva_gencp::GenCpError),
190 #[error("device reported status {0:?}")]
191 Status(StatusCode),
192}
193
/// Identity of a device as extracted from a discovery acknowledgement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeviceInfo {
    /// IPv4 address reported by the device.
    pub ip: Ipv4Addr,
    /// MAC address reported by the device.
    pub mac: [u8; 6],
    /// Model name, or `None` when the field was empty.
    pub model: Option<String>,
    /// Manufacturer name, or `None` when the field was empty.
    pub manufacturer: Option<String>,
}

impl DeviceInfo {
    /// Render the MAC address as six upper-case hex octets joined by `:`.
    fn mac_string(&self) -> String {
        let [a, b, c, d, e, f] = self.mac;
        format!("{a:02X}:{b:02X}:{c:02X}:{d:02X}:{e:02X}:{f:02X}")
    }
}
212
213pub async fn discover(timeout: Duration) -> Result<Vec<DeviceInfo>, GigeError> {
215 discover_impl(timeout, None, false).await
216}
217
218pub async fn discover_on_interface(
225 timeout: Duration,
226 interface: &str,
227) -> Result<Vec<DeviceInfo>, GigeError> {
228 discover_impl(timeout, Some(interface), true).await
229}
230
231pub async fn discover_all(timeout: Duration) -> Result<Vec<DeviceInfo>, GigeError> {
236 discover_impl(timeout, None, true).await
237}
238
239pub async fn force_ip(
258 mac: [u8; 6],
259 ip: Ipv4Addr,
260 subnet: Ipv4Addr,
261 gateway: Ipv4Addr,
262 iface: Option<&Iface>,
263) -> Result<(), GigeError> {
264 let payload = encode_forceip_payload(mac, ip, subnet, gateway);
266
267 let local_ip = match iface {
268 Some(iface) => iface
269 .ipv4()
270 .ok_or_else(|| GigeError::Protocol("interface lacks IPv4 address".into()))?,
271 None => Ipv4Addr::UNSPECIFIED,
272 };
273
274 let socket = UdpSocket::bind(SocketAddr::new(IpAddr::V4(local_ip), 0)).await?;
275 socket.set_broadcast(true)?;
276 let dest = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), consts::PORT);
277
278 let header = GvcpRequestHeader {
279 flags: CommandFlags::ACK_REQUIRED | CommandFlags::BROADCAST,
280 command: consts::FORCEIP_COMMAND,
281 length: payload.len() as u16,
282 request_id: 1,
283 };
284 let packet = header.encode(&payload);
285 info!(mac = ?mac, %ip, %subnet, %gateway, "sending FORCEIP command");
286 socket.send_to(&packet, dest).await?;
287
288 let mut buf = vec![0u8; consts::DISCOVERY_BUFFER];
290 match time::timeout(consts::CONTROL_TIMEOUT, socket.recv_from(&mut buf)).await {
291 Ok(Ok((len, _src))) => {
292 if len < viva_gencp::HEADER_SIZE {
293 return Err(GigeError::Protocol("FORCEIP ack too short".into()));
294 }
295 let mut cursor = &buf[..];
296 let status = cursor.get_u16();
297 let command = cursor.get_u16();
298 if command != consts::FORCEIP_ACK {
299 return Err(GigeError::Protocol(format!(
300 "unexpected FORCEIP ack opcode {command:#06x}"
301 )));
302 }
303 if status != 0 {
304 return Err(GigeError::Protocol(format!(
305 "FORCEIP returned status {status:#06x}"
306 )));
307 }
308 info!(%ip, "FORCEIP accepted");
309 Ok(())
310 }
311 Ok(Err(e)) => Err(e.into()),
312 Err(_) => Err(GigeError::Timeout),
313 }
314}
315
/// Build the 56-byte FORCEIP payload.
///
/// Layout (all gaps are zero): MAC at bytes 2..8, IP at 20..24, subnet mask
/// at 36..40 and gateway at 52..56.
fn encode_forceip_payload(
    mac: [u8; 6],
    ip: Ipv4Addr,
    subnet: Ipv4Addr,
    gateway: Ipv4Addr,
) -> Vec<u8> {
    const PAYLOAD_LEN: usize = 56;
    const ADDRESS_PADDING: [u8; 12] = [0u8; 12];

    let mut payload = Vec::with_capacity(PAYLOAD_LEN);
    payload.extend_from_slice(&[0u8; 2]);
    payload.extend_from_slice(&mac);
    // Each address is preceded by twelve zero bytes.
    for address in [ip, subnet, gateway] {
        payload.extend_from_slice(&ADDRESS_PADDING);
        payload.extend_from_slice(&address.octets());
    }
    debug_assert_eq!(payload.len(), PAYLOAD_LEN);
    payload
}
338
339async fn discover_impl(
340 timeout: Duration,
341 iface_filter: Option<&str>,
342 include_loopback: bool,
343) -> Result<Vec<DeviceInfo>, GigeError> {
344 let mut interfaces = Vec::new();
345 for iface in get_if_addrs()? {
346 let IfAddr::V4(v4) = iface.addr else {
347 continue;
348 };
349 if !include_loopback && v4.ip.is_loopback() {
350 continue;
351 }
352 if let Some(filter) = iface_filter
353 && iface.name != filter
354 {
355 continue;
356 }
357 interfaces.push((iface.name, v4));
358 }
359
360 if interfaces.is_empty() {
361 return Ok(Vec::new());
362 }
363
364 let mut join_set = JoinSet::new();
365 for (idx, (name, v4)) in interfaces.into_iter().enumerate() {
366 let request_id = 0x0100u16.wrapping_add(idx as u16);
367 let interface_name = name.clone();
368 join_set.spawn(async move {
369 let local_addr = SocketAddr::new(IpAddr::V4(v4.ip), 0);
370 let socket = UdpSocket::bind(local_addr).await?;
371 let destination = if v4.ip.is_loopback() {
374 SocketAddr::new(IpAddr::V4(v4.ip), consts::PORT)
375 } else {
376 socket.set_broadcast(true)?;
377 let broadcast = v4.broadcast.unwrap_or(Ipv4Addr::BROADCAST);
378 SocketAddr::new(IpAddr::V4(broadcast), consts::PORT)
379 };
380
381 let header = GvcpRequestHeader {
382 flags: CommandFlags::ACK_REQUIRED | CommandFlags::BROADCAST,
383 command: consts::DISCOVERY_COMMAND,
384 length: 0,
385 request_id,
386 };
387 let packet = header.encode(&[]);
388 info!(%interface_name, local = %v4.ip, dest = %destination, "sending GVCP discovery");
389 trace!(%interface_name, bytes = packet.len(), "GVCP discovery payload size");
390 socket.send_to(&packet, destination).await?;
391
392 let mut responses = Vec::new();
393 let mut buffer = vec![0u8; consts::DISCOVERY_BUFFER];
394 let timer = time::sleep(timeout);
395 tokio::pin!(timer);
396 loop {
397 tokio::select! {
398 _ = &mut timer => break,
399 recv = socket.recv_from(&mut buffer) => {
400 let (len, src) = recv?;
401 info!(%interface_name, %src, "received GVCP response");
402 trace!(%interface_name, bytes = len, "GVCP response length");
403 if let Some(info) = parse_discovery_ack(&buffer[..len], request_id)? {
404 trace!(ip = %info.ip, mac = %info.mac_string(), "parsed discovery ack");
405 responses.push(info);
406 }
407 }
408 }
409 }
410 Ok::<_, GigeError>(responses)
411 });
412 }
413
414 let mut seen = HashMap::new();
415 while let Some(res) = join_set.join_next().await {
416 let devices =
417 res.map_err(|e| GigeError::Protocol(format!("discovery task failed: {e}")))??;
418 for dev in devices {
419 seen.entry((dev.ip, dev.mac)).or_insert(dev);
420 }
421 }
422
423 let mut devices: Vec<_> = seen.into_values().collect();
424 devices.sort_by_key(|d| d.ip);
425 Ok(devices)
426}
427
428fn parse_discovery_ack(buf: &[u8], expected_request: u16) -> Result<Option<DeviceInfo>, GigeError> {
429 if buf.len() < viva_gencp::HEADER_SIZE {
430 return Err(GigeError::Protocol("GVCP ack too short".into()));
431 }
432 let mut header = buf;
433 let status = header.get_u16();
434 let command = header.get_u16();
435 let length = header.get_u16() as usize;
436 let request_id = header.get_u16();
437 if request_id != expected_request {
438 return Ok(None);
439 }
440 if command != consts::DISCOVERY_ACK {
441 return Err(GigeError::Protocol(format!(
442 "unexpected discovery opcode {command:#06x}"
443 )));
444 }
445 if status != 0 {
446 return Err(GigeError::Protocol(format!(
447 "discovery returned status {status:#06x}"
448 )));
449 }
450 if buf.len() < viva_gencp::HEADER_SIZE + length {
451 return Err(GigeError::Protocol("discovery payload truncated".into()));
452 }
453 let payload = &buf[viva_gencp::HEADER_SIZE..viva_gencp::HEADER_SIZE + length];
454 let info = parse_discovery_payload(payload)?;
455 Ok(Some(info))
456}
457
458fn parse_discovery_payload(payload: &[u8]) -> Result<DeviceInfo, GigeError> {
485 if payload.len() < 40 {
487 return Err(GigeError::Protocol("discovery payload too small".into()));
488 }
489 let mut cursor = Cursor::new(payload);
490 let _spec_major = cursor.get_u16(); let _spec_minor = cursor.get_u16(); let _device_mode = cursor.get_u32(); let _reserved = cursor.get_u32(); let mut mac = [0u8; 6];
497 cursor.copy_to_slice(&mut mac); let _supported_ip_config = cursor.get_u32(); let _current_ip_config = cursor.get_u32(); cursor.advance(10); let ip = Ipv4Addr::from(cursor.get_u32()); cursor.advance(12); let _subnet = cursor.get_u32(); cursor.advance(12); let _gateway = cursor.get_u32(); let manufacturer = read_fixed_string(&mut cursor, 32)?; let model = read_fixed_string(&mut cursor, 32)?; Ok(DeviceInfo {
520 ip,
521 mac,
522 manufacturer,
523 model,
524 })
525}
526
527fn read_fixed_string(cursor: &mut Cursor<&[u8]>, len: usize) -> Result<Option<String>, GigeError> {
528 if cursor.remaining() < len {
529 return Err(GigeError::Protocol("discovery string truncated".into()));
530 }
531 let mut buf = vec![0u8; len];
532 cursor.copy_to_slice(&mut buf);
533 Ok(parse_string(&buf))
534}
535
/// Decode a NUL-terminated (or unterminated) byte field into text.
///
/// Everything from the first zero byte onward is ignored, invalid UTF-8 is
/// replaced lossily, and surrounding whitespace is trimmed. Returns `None`
/// when nothing is left after trimming.
fn parse_string(bytes: &[u8]) -> Option<String> {
    // `split` always yields at least one (possibly empty) segment.
    let terminated = bytes.split(|&b| b == 0).next().unwrap_or(bytes);
    let decoded = String::from_utf8_lossy(terminated);
    let trimmed = decoded.trim();
    (!trimmed.is_empty()).then(|| trimmed.to_string())
}
542
543pub struct GigeDevice {
545 socket: UdpSocket,
546 remote: SocketAddr,
547 request_id: u16,
548 rng: Rng,
549}
550
/// Stream settings chosen by `GigeDevice::negotiate_stream`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamParams {
    /// Packet size written to the stream channel.
    pub packet_size: u32,
    /// Packet delay written to the stream channel.
    pub packet_delay: u32,
    /// MTU the packet size was derived from.
    pub mtu: u32,
    /// Host IPv4 address the stream is sent to.
    pub host: Ipv4Addr,
    /// Host UDP port the stream is sent to.
    pub port: u16,
}
565
566impl GigeDevice {
567 pub async fn open(addr: SocketAddr) -> Result<Self, GigeError> {
573 let local_ip = match addr.ip() {
574 IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
575 IpAddr::V6(_) => {
576 return Err(GigeError::Protocol("IPv6 GVCP is not supported".into()));
577 }
578 };
579 let socket = UdpSocket::bind(SocketAddr::new(local_ip, 0)).await?;
580 socket.connect(addr).await?;
581 Ok(Self {
582 socket,
583 remote: addr,
584 request_id: 1,
585 rng: Rng::new(),
586 })
587 }
588
589 pub async fn claim_control(&mut self) -> Result<(), GigeError> {
594 self.write_register(
595 consts::CONTROL_CHANNEL_PRIVILEGE as u32,
596 consts::CCP_CONTROL,
597 )
598 .await?;
599 debug!(addr = %self.remote, "claimed control channel privilege");
600 Ok(())
601 }
602
603 pub async fn release_control(&mut self) -> Result<(), GigeError> {
605 self.write_register(consts::CONTROL_CHANNEL_PRIVILEGE as u32, 0)
606 .await
607 }
608
609 pub fn remote_addr(&self) -> SocketAddr {
611 self.remote
612 }
613
614 fn next_request_id(&mut self) -> u16 {
615 let id = self.request_id;
616 self.request_id = self.request_id.wrapping_add(1);
617 if self.request_id == 0 {
618 self.request_id = 1;
619 }
620 id
621 }
622
623 async fn transact_with_retry(
624 &mut self,
625 opcode: OpCode,
626 payload: BytesMut,
627 ) -> Result<GenCpAck, GigeError> {
628 let mut attempt = 0usize;
629 let mut payload = payload;
630 loop {
631 attempt += 1;
632 let request_id = self.next_request_id();
633 let payload_bytes = payload.clone().freeze();
634 let header = GvcpRequestHeader {
635 flags: CommandFlags::ACK_REQUIRED,
636 command: opcode.command_code(),
637 length: payload_bytes.len() as u16,
638 request_id,
639 };
640 let encoded = header.encode(&payload_bytes);
641 trace!(request_id, opcode = ?opcode, bytes = encoded.len(), attempt, "sending GVCP command");
642 if let Err(err) = self.socket.send(&encoded).await {
643 if attempt >= consts::MAX_RETRIES {
644 return Err(err.into());
645 }
646 warn!(request_id, ?opcode, attempt, "send failed, retrying");
647 self.backoff(attempt).await;
648 payload = BytesMut::from(&payload_bytes[..]);
649 continue;
650 }
651
652 let mut buf = vec![
653 0u8;
654 viva_gencp::HEADER_SIZE
655 + consts::GENCP_MAX_BLOCK
656 + consts::GENCP_WRITE_OVERHEAD
657 ];
658 match time::timeout(consts::CONTROL_TIMEOUT, self.socket.recv(&mut buf)).await {
659 Ok(Ok(len)) => {
660 trace!(request_id, bytes = len, attempt, "received GenCP ack");
661 let ack = decode_ack(&buf[..len])?;
662 if ack.header.request_id != request_id {
663 debug!(
664 request_id,
665 got = ack.header.request_id,
666 attempt,
667 "acknowledgement id mismatch"
668 );
669 if attempt >= consts::MAX_RETRIES {
670 return Err(GigeError::Protocol("acknowledgement id mismatch".into()));
671 }
672 self.backoff(attempt).await;
673 payload = BytesMut::from(&payload_bytes[..]);
674 continue;
675 }
676 if ack.header.opcode != opcode {
677 return Err(GigeError::Protocol(
678 "unexpected opcode in acknowledgement".into(),
679 ));
680 }
681 match ack.header.status {
682 StatusCode::Success => return Ok(ack),
683 StatusCode::DeviceBusy if attempt < consts::MAX_RETRIES => {
684 warn!(request_id, attempt, "device busy, retrying");
685 self.backoff(attempt).await;
686 payload = BytesMut::from(&payload_bytes[..]);
687 continue;
688 }
689 other => return Err(GigeError::Status(other)),
690 }
691 }
692 Ok(Err(err)) => {
693 if attempt >= consts::MAX_RETRIES {
694 return Err(err.into());
695 }
696 warn!(request_id, ?opcode, attempt, "receive error, retrying");
697 self.backoff(attempt).await;
698 payload = BytesMut::from(&payload_bytes[..]);
699 }
700 Err(_) => {
701 if attempt >= consts::MAX_RETRIES {
702 return Err(GigeError::Timeout);
703 }
704 warn!(request_id, ?opcode, attempt, "command timeout, retrying");
705 self.backoff(attempt).await;
706 payload = BytesMut::from(&payload_bytes[..]);
707 }
708 }
709 }
710 }
711
712 async fn backoff(&mut self, attempt: usize) {
713 let multiplier = 1u32 << (attempt.saturating_sub(1)).min(3);
714 let base_ms = consts::RETRY_BASE_DELAY.as_millis() as u64;
715 let base = Duration::from_millis(base_ms.saturating_mul(multiplier as u64).max(base_ms));
716 let jitter_ms = self.rng.u64(..=consts::RETRY_JITTER.as_millis() as u64);
717 let jitter = Duration::from_millis(jitter_ms);
718 let delay = base + jitter;
719 debug!(attempt, delay = ?delay, "gvcp retry backoff");
720 time::sleep(delay).await;
721 }
722
723 pub async fn read_register(&mut self, addr: u32) -> Result<u32, GigeError> {
728 let mut payload = BytesMut::with_capacity(4);
729 payload.put_u32(addr);
730 let ack = self
731 .transact_with_retry(OpCode::ReadRegister, payload)
732 .await?;
733 if ack.payload.len() != 4 {
734 return Err(GigeError::Protocol(format!(
735 "expected 4-byte register ack but device returned {} bytes",
736 ack.payload.len()
737 )));
738 }
739 let mut cursor = &ack.payload[..];
740 Ok(cursor.get_u32())
741 }
742
743 pub async fn write_register(&mut self, addr: u32, value: u32) -> Result<(), GigeError> {
748 let mut payload = BytesMut::with_capacity(8);
749 payload.put_u32(addr);
750 payload.put_u32(value);
751 let ack = self
752 .transact_with_retry(OpCode::WriteRegister, payload)
753 .await?;
754 if ack.payload.len() != 4 {
755 return Err(GigeError::Protocol(format!(
756 "expected 4-byte register write ack but device returned {} bytes",
757 ack.payload.len()
758 )));
759 }
760 Ok(())
761 }
762
763 pub async fn read_mem(&mut self, addr: u64, len: usize) -> Result<Vec<u8>, GigeError> {
768 let mut remaining = len;
769 let mut offset = 0usize;
770 let mut data = Vec::with_capacity(len);
771 while remaining > 0 {
772 let chunk = remaining.min(consts::GENCP_MAX_BLOCK);
773 let mut payload = BytesMut::with_capacity(8);
774 payload.put_u32((addr + offset as u64) as u32);
775 payload.put_u16(0); payload.put_u16(chunk as u16);
777 let ack = self.transact_with_retry(OpCode::ReadMem, payload).await?;
778 let ack_data = if ack.payload.len() >= 4 + chunk {
780 &ack.payload[4..4 + chunk]
781 } else if ack.payload.len() == chunk {
782 &ack.payload[..chunk]
784 } else {
785 return Err(GigeError::Protocol(format!(
786 "expected {} bytes but device returned {}",
787 chunk,
788 ack.payload.len()
789 )));
790 };
791 data.extend_from_slice(ack_data);
792 remaining -= chunk;
793 offset += chunk;
794 }
795 Ok(data)
796 }
797
798 pub async fn write_mem(&mut self, addr: u64, data: &[u8]) -> Result<(), GigeError> {
803 const GVCP_WRITE_OVERHEAD: usize = 4;
805
806 let mut offset = 0usize;
807 while offset < data.len() {
808 let chunk = (data.len() - offset).min(consts::GENCP_MAX_BLOCK - GVCP_WRITE_OVERHEAD);
809 if chunk == 0 {
810 return Err(GigeError::Protocol("write chunk size is zero".into()));
811 }
812 let mut payload = BytesMut::with_capacity(GVCP_WRITE_OVERHEAD + chunk);
813 payload.put_u32((addr + offset as u64) as u32);
814 payload.extend_from_slice(&data[offset..offset + chunk]);
815 let ack = self.transact_with_retry(OpCode::WriteMem, payload).await?;
816 if ack.payload.len() > 4 {
818 return Err(GigeError::Protocol(
819 "write acknowledgement carried unexpected payload".into(),
820 ));
821 }
822 offset += chunk;
823 }
824 Ok(())
825 }
826
827 pub async fn set_message_destination(
829 &mut self,
830 ip: Ipv4Addr,
831 port: u16,
832 ) -> Result<(), GigeError> {
833 info!(%ip, port, "configuring message channel destination");
834 self.write_mem(consts::MESSAGE_DESTINATION_ADDRESS, &ip.octets())
835 .await?;
836 self.write_mem(consts::MESSAGE_DESTINATION_PORT, &port.to_be_bytes())
837 .await?;
838 Ok(())
839 }
840
841 fn stream_reg(channel: u32, offset: u64) -> u64 {
842 consts::STREAM_CHANNEL_BASE + channel as u64 * consts::STREAM_CHANNEL_STRIDE + offset
843 }
844
845 pub async fn set_stream_destination(
847 &mut self,
848 channel: u32,
849 ip: Ipv4Addr,
850 port: u16,
851 ) -> Result<(), GigeError> {
852 info!(channel, %ip, port, "configuring stream destination");
853 let addr = Self::stream_reg(channel, consts::STREAM_DESTINATION_ADDRESS);
854 self.write_mem(addr, &ip.octets()).await?;
855 let addr = Self::stream_reg(channel, consts::STREAM_DESTINATION_PORT);
856 self.write_mem(addr, &(port as u32).to_be_bytes()).await?;
857 Ok(())
858 }
859
860 pub async fn set_stream_packet_size(
862 &mut self,
863 channel: u32,
864 packet_size: u32,
865 ) -> Result<(), GigeError> {
866 info!(channel, packet_size, "configuring stream packet size");
867 let addr = Self::stream_reg(channel, consts::STREAM_PACKET_SIZE);
868 self.write_mem(addr, &packet_size.to_be_bytes()).await
869 }
870
871 pub async fn set_stream_packet_delay(
873 &mut self,
874 channel: u32,
875 packet_delay: u32,
876 ) -> Result<(), GigeError> {
877 debug!(channel, packet_delay, "configuring stream packet delay");
878 let addr = Self::stream_reg(channel, consts::STREAM_PACKET_DELAY);
879 self.write_mem(addr, &packet_delay.to_be_bytes()).await
880 }
881
882 pub async fn negotiate_stream(
884 &mut self,
885 channel: u32,
886 iface: &Iface,
887 port: u16,
888 target_mtu: Option<u32>,
889 ) -> Result<StreamParams, GigeError> {
890 let host_ip = iface
891 .ipv4()
892 .ok_or_else(|| GigeError::Protocol("interface lacks IPv4 address".into()))?;
893 let iface_mtu = nic::mtu(iface)?;
894 let mtu = target_mtu.map_or(iface_mtu, |limit| limit.min(iface_mtu));
895 let packet_size = nic::best_packet_size(mtu);
896 let packet_delay = if mtu <= 1500 {
897 const DELAY_NS: u32 = 2_000; DELAY_NS / 80
902 } else {
903 0
904 };
905
906 self.set_stream_destination(channel, host_ip, port).await?;
907 self.set_stream_packet_size(channel, packet_size).await?;
908 self.set_stream_packet_delay(channel, packet_delay).await?;
909
910 Ok(StreamParams {
911 packet_size,
912 packet_delay,
913 mtu,
914 host: host_ip,
915 port,
916 })
917 }
918
919 pub async fn enable_event_raw(&mut self, id: u16, on: bool) -> Result<(), GigeError> {
921 let index = (id / 32) as u64;
922 let bit = 1u32 << (id % 32);
923 let addr = consts::EVENT_NOTIFICATION_BASE + index * consts::EVENT_NOTIFICATION_STRIDE;
924 let current = self.read_mem(addr, 4).await?;
925 if current.len() != 4 {
926 return Err(GigeError::Protocol(
927 "event notification register length mismatch".into(),
928 ));
929 }
930 let mut bytes = [0u8; 4];
931 bytes.copy_from_slice(¤t);
932 let mut value = u32::from_be_bytes(bytes);
933 if on {
934 value |= bit;
935 } else {
936 value &= !bit;
937 }
938 let new_bytes = value.to_be_bytes();
939 self.write_mem(addr, &new_bytes).await?;
940 debug!(event_id = id, enabled = on, "updated event mask");
941 Ok(())
942 }
943
944 pub async fn read_persistent_ip(
948 &mut self,
949 ) -> Result<(Ipv4Addr, Ipv4Addr, Ipv4Addr), GigeError> {
950 let ip = Ipv4Addr::from(
951 self.read_register(consts::PERSISTENT_IP_ADDRESS as u32)
952 .await?,
953 );
954 let subnet = Ipv4Addr::from(
955 self.read_register(consts::PERSISTENT_SUBNET_MASK as u32)
956 .await?,
957 );
958 let gateway = Ipv4Addr::from(
959 self.read_register(consts::PERSISTENT_DEFAULT_GATEWAY as u32)
960 .await?,
961 );
962 Ok((ip, subnet, gateway))
963 }
964
965 pub async fn write_persistent_ip(
967 &mut self,
968 ip: Ipv4Addr,
969 subnet: Ipv4Addr,
970 gateway: Ipv4Addr,
971 ) -> Result<(), GigeError> {
972 self.write_register(consts::PERSISTENT_IP_ADDRESS as u32, u32::from(ip))
973 .await?;
974 self.write_register(consts::PERSISTENT_SUBNET_MASK as u32, u32::from(subnet))
975 .await?;
976 self.write_register(
977 consts::PERSISTENT_DEFAULT_GATEWAY as u32,
978 u32::from(gateway),
979 )
980 .await?;
981 info!(%ip, %subnet, %gateway, "wrote persistent IP configuration");
982 Ok(())
983 }
984
985 pub async fn enable_persistent_ip(&mut self) -> Result<(), GigeError> {
989 let current = self.read_register(consts::CURRENT_IP_CONFIG as u32).await?;
990 let updated = current | 0x02; self.write_register(consts::CURRENT_IP_CONFIG as u32, updated)
992 .await?;
993 info!(config = format!("0x{updated:08x}"), "enabled persistent IP");
994 Ok(())
995 }
996
997 pub async fn request_resend(
999 &mut self,
1000 block_id: u16,
1001 first_packet: u16,
1002 last_packet: u16,
1003 ) -> Result<(), GigeError> {
1004 let mut payload = BytesMut::with_capacity(8);
1005 payload.put_u16(block_id);
1006 payload.put_u16(0); payload.put_u16(first_packet);
1008 payload.put_u16(last_packet);
1009
1010 let request_id = self.next_request_id();
1011 let header = GvcpRequestHeader {
1012 flags: CommandFlags::ACK_REQUIRED,
1013 command: consts::PACKET_RESEND_COMMAND,
1014 length: payload.len() as u16,
1015 request_id,
1016 };
1017 let packet = header.encode(&payload);
1018 trace!(
1019 block_id,
1020 first_packet, last_packet, request_id, "sending packet resend request"
1021 );
1022 self.socket.send(&packet).await?;
1023 let mut buf = [0u8; viva_gencp::HEADER_SIZE];
1024 match time::timeout(consts::CONTROL_TIMEOUT, self.socket.recv(&mut buf)).await {
1025 Ok(Ok(len)) => {
1026 if len != viva_gencp::HEADER_SIZE {
1027 return Err(GigeError::Protocol("resend ack length mismatch".into()));
1028 }
1029 let mut cursor = &buf[..];
1030 let status = StatusCode::from_raw(cursor.get_u16());
1031 let command = cursor.get_u16();
1032 let length = cursor.get_u16();
1033 let ack_request_id = cursor.get_u16();
1034 if command != consts::PACKET_RESEND_ACK {
1035 return Err(GigeError::Protocol("unexpected resend ack opcode".into()));
1036 }
1037 if length != 0 {
1038 return Err(GigeError::Protocol("resend ack carried payload".into()));
1039 }
1040 if ack_request_id != request_id {
1041 return Err(GigeError::Protocol("resend ack request id mismatch".into()));
1042 }
1043 if status != StatusCode::Success {
1044 return Err(GigeError::Status(status));
1045 }
1046 Ok(())
1047 }
1048 Ok(Err(err)) => Err(err.into()),
1049 Err(_) => Err(GigeError::Timeout),
1050 }
1051 }
1052}
1053
#[cfg(test)]
mod tests {
    use super::*;

    /// The encoded request is key byte, flag byte, three big-endian 16-bit
    /// fields and then the payload.
    #[test]
    fn request_header_roundtrip() {
        let payload = [1u8, 2, 3, 4];
        let header = GvcpRequestHeader {
            flags: CommandFlags::ACK_REQUIRED,
            command: 0x1234,
            length: 4,
            request_id: 0xBEEF,
        };

        let encoded = header.encode(&payload);

        assert_eq!(encoded.len(), viva_gencp::HEADER_SIZE + payload.len());
        assert_eq!(encoded[0], GVCP_CMD_KEY);
        assert_eq!(encoded[1], 0x01);
        let fields = [header.command, header.length, header.request_id];
        for (slot, field) in fields.into_iter().enumerate() {
            let start = 2 + 2 * slot;
            assert_eq!(&encoded[start..start + 2], &field.to_be_bytes());
        }
        assert_eq!(&encoded[8..], &payload);
    }

    /// Every field of the FORCEIP payload sits at its fixed offset and all
    /// gaps are zero.
    #[test]
    fn forceip_payload_encoding() {
        let mac = [0xDE, 0xAD, 0xBE, 0xEF, 0xCA, 0xFE];
        let ip = Ipv4Addr::new(192, 168, 1, 100);
        let subnet = Ipv4Addr::new(255, 255, 255, 0);
        let gateway = Ipv4Addr::new(192, 168, 1, 1);

        let payload = encode_forceip_payload(mac, ip, subnet, gateway);

        assert_eq!(payload.len(), 56);
        assert_eq!(&payload[2..8], &mac);
        assert_eq!(&payload[20..24], &ip.octets());
        assert_eq!(&payload[36..40], &subnet.octets());
        assert_eq!(&payload[52..56], &gateway.octets());
        assert_eq!(&payload[0..2], &[0, 0]);
        for gap in [8..20, 24..36, 40..52] {
            assert_eq!(&payload[gap], &[0u8; 12]);
        }
    }

    /// Converting an `AckHeader` keeps status, length and request id and
    /// replaces the opcode with its acknowledgement code.
    #[test]
    fn ack_header_conversion() {
        let converted = GvcpAckHeader::from(AckHeader {
            status: StatusCode::DeviceBusy,
            opcode: OpCode::ReadMem,
            length: 12,
            request_id: 0x44,
        });

        let expected = GvcpAckHeader {
            status: StatusCode::DeviceBusy,
            command: OpCode::ReadMem.ack_code(),
            length: 12,
            request_id: 0x44,
        };
        assert_eq!(converted, expected);
    }
}