tl_gige/
action.rs

1//! GVCP action command helpers.
2
3use std::collections::HashSet;
4use std::io;
5use std::io::ErrorKind;
6use std::net::{IpAddr, Ipv4Addr, SocketAddr};
7use std::time::{Duration, Instant};
8
9use bytes::{BufMut, BytesMut};
10use tokio::net::UdpSocket;
11use tokio::time;
12use tracing::{debug, info, trace, warn};
13
14use crate::gvcp::{GvcpAckHeader, GvcpRequestHeader, GVCP_PORT};
15
16/// Constants describing the layout of action command packets.
17mod consts {
18    /// GVCP opcode for an action command request.
19    pub const ACTION_COMMAND: u16 = 0x0080;
20    /// GVCP opcode for an action acknowledgement.
21    pub const ACTION_ACK: u16 = 0x0081;
22    /// Size of the action command payload in bytes.
23    pub const ACTION_PAYLOAD: usize = 24;
24}
25
26/// Parameters used to construct an action command.
27#[derive(Debug, Clone, PartialEq, Eq)]
28pub struct ActionParams {
29    /// Vendor-specific device key used to authorise the action.
30    pub device_key: u32,
31    /// Group key identifying which devices should react to the action.
32    pub group_key: u32,
33    /// Group mask applied to the device key by receivers.
34    pub group_mask: u32,
35    /// Optional scheduled time expressed in device clock ticks.
36    pub scheduled_time: Option<u64>,
37    /// Stream channel identifier associated with the action.
38    pub channel: u16,
39}
40
41/// Summary of the broadcast performed by [`send_action`].
42#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
43pub struct AckSummary {
44    /// Number of GVCP datagrams transmitted.
45    pub sent: usize,
46    /// Number of distinct acknowledgement sources observed.
47    pub acks: usize,
48}
49
50fn encode_payload(params: &ActionParams) -> BytesMut {
51    let mut buf = BytesMut::with_capacity(consts::ACTION_PAYLOAD);
52    buf.put_u32(params.device_key);
53    buf.put_u32(params.group_key);
54    buf.put_u32(params.group_mask);
55    let ticks = params.scheduled_time.unwrap_or(0);
56    buf.put_u32((ticks >> 32) as u32);
57    buf.put_u32(ticks as u32);
58    buf.put_u16(params.channel);
59    buf.put_u16(0); // reserved
60    buf
61}
62
63fn parse_ack(buf: &[u8]) -> io::Result<GvcpAckHeader> {
64    if buf.len() < 8 {
65        return Err(io::Error::new(
66            ErrorKind::InvalidData,
67            "acknowledgement shorter than GVCP header",
68        ));
69    }
70    let status = u16::from_be_bytes([buf[0], buf[1]]);
71    let opcode = u16::from_be_bytes([buf[2], buf[3]]);
72    let length = u16::from_be_bytes([buf[4], buf[5]]);
73    let request_id = u16::from_be_bytes([buf[6], buf[7]]);
74    Ok(GvcpAckHeader {
75        status: genicp::StatusCode::from_raw(status),
76        command: opcode,
77        length,
78        request_id,
79    })
80}
81
82fn is_broadcast(addr: &SocketAddr) -> bool {
83    matches!(addr.ip(), IpAddr::V4(ip) if ip == Ipv4Addr::BROADCAST)
84}
85
86/// Send a GVCP action command and collect acknowledgements.
87pub async fn send_action(
88    broadcast: SocketAddr,
89    params: &ActionParams,
90    timeout_ms: u64,
91) -> io::Result<AckSummary> {
92    let destination = SocketAddr::new(broadcast.ip(), GVCP_PORT);
93    let local_ip = match destination.ip() {
94        IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
95        IpAddr::V6(_) => {
96            return Err(io::Error::new(
97                ErrorKind::InvalidInput,
98                "IPv6 destinations are not supported for actions",
99            ));
100        }
101    };
102    let socket = UdpSocket::bind(SocketAddr::new(local_ip, 0)).await?;
103    if is_broadcast(&destination) {
104        socket.set_broadcast(true)?;
105    }
106
107    let mut summary = AckSummary::default();
108    let payload = encode_payload(params);
109    let request_id = fastrand::u16(0x8000..=0xFFFE);
110    let mut flags = genicp::CommandFlags::ACK_REQUIRED;
111    if is_broadcast(&destination) {
112        flags |= genicp::CommandFlags::BROADCAST;
113    }
114    let header = GvcpRequestHeader {
115        flags,
116        command: consts::ACTION_COMMAND,
117        length: payload.len() as u16,
118        request_id,
119    };
120    let packet = header.encode(&payload);
121    trace!(bytes = packet.len(), %destination, request_id, "sending action command");
122    socket.send_to(&packet, destination).await?;
123    summary.sent = 1;
124
125    let timeout = Duration::from_millis(timeout_ms);
126    if timeout.is_zero() {
127        info!(acks = 0, "action command sent (no wait)");
128        return Ok(summary);
129    }
130
131    let start = Instant::now();
132    let mut buf = vec![0u8; 512];
133    let mut seen = HashSet::new();
134    while let Some(remaining) = timeout.checked_sub(start.elapsed()) {
135        if remaining.is_zero() {
136            break;
137        }
138        match time::timeout(remaining, socket.recv_from(&mut buf)).await {
139            Ok(Ok((len, src))) => {
140                trace!(bytes = len, %src, "received acknowledgement");
141                let header = parse_ack(&buf[..len])?;
142                if header.command != consts::ACTION_ACK {
143                    debug!(
144                        opcode = header.command,
145                        "ignoring unrelated acknowledgement"
146                    );
147                    continue;
148                }
149                if header.request_id != request_id {
150                    debug!(
151                        expected = request_id,
152                        got = header.request_id,
153                        "acknowledgement id mismatch"
154                    );
155                    continue;
156                }
157                if header.status != genicp::StatusCode::Success {
158                    warn!(status = ?header.status, %src, "device reported action failure");
159                    continue;
160                }
161                if seen.insert(src.ip()) {
162                    summary.acks += 1;
163                }
164            }
165            Ok(Err(err)) => {
166                warn!(?err, "error receiving acknowledgement");
167                break;
168            }
169            Err(_) => break,
170        }
171    }
172
173    info!(acks = summary.acks, "action command completed");
174    Ok(summary)
175}
176
177#[cfg(test)]
178mod tests {
179    use super::*;
180
181    #[test]
182    fn payload_layout() {
183        let params = ActionParams {
184            device_key: 0x1122_3344,
185            group_key: 0x5566_7788,
186            group_mask: 0xFFFF_0000,
187            scheduled_time: Some(0x0102_0304_0506_0708),
188            channel: 0x090A,
189        };
190        let payload = encode_payload(&params);
191        assert_eq!(payload.len(), consts::ACTION_PAYLOAD);
192        assert_eq!(&payload[..4], &0x1122_3344u32.to_be_bytes());
193        assert_eq!(&payload[4..8], &0x5566_7788u32.to_be_bytes());
194        assert_eq!(&payload[8..12], &0xFFFF_0000u32.to_be_bytes());
195        assert_eq!(&payload[12..16], &0x0102_0304u32.to_be_bytes());
196        assert_eq!(&payload[16..20], &0x0506_0708u32.to_be_bytes());
197        assert_eq!(&payload[20..22], &0x090A_u16.to_be_bytes());
198    }
199
200    #[test]
201    fn ack_parser() {
202        let mut buf = BytesMut::with_capacity(8);
203        buf.put_u16(genicp::StatusCode::Success.to_raw());
204        buf.put_u16(consts::ACTION_ACK);
205        buf.put_u16(0);
206        buf.put_u16(0xBEEF);
207        let ack = parse_ack(&buf).expect("ack");
208        assert_eq!(ack.command, consts::ACTION_ACK);
209        assert_eq!(ack.request_id, 0xBEEF);
210    }
211}