1use std::collections::HashSet;
4use std::io;
5use std::io::ErrorKind;
6use std::net::{IpAddr, Ipv4Addr, SocketAddr};
7use std::time::{Duration, Instant};
8
9use bytes::{BufMut, BytesMut};
10use tokio::net::UdpSocket;
11use tokio::time;
12use tracing::{debug, info, trace, warn};
13
14use crate::gvcp::{GvcpAckHeader, GvcpRequestHeader, GVCP_PORT};
15
/// Wire-level constants for the action command exchange.
mod consts {
    /// Opcode written into the request header of an action command.
    pub const ACTION_COMMAND: u16 = 0x80;
    /// Opcode an incoming acknowledgement must carry to be treated as an action ack.
    pub const ACTION_ACK: u16 = 0x81;
    /// Encoded payload size in bytes: three 32-bit words, one 64-bit time value,
    /// a 16-bit channel and 16 bits of zero padding.
    pub const ACTION_PAYLOAD: usize = 3 * 4 + 8 + 2 + 2;
}
25
/// Parameters carried in the payload of an action command.
///
/// `encode_payload` serialises the fields in declaration order as big-endian
/// integers, followed by two zero bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ActionParams {
    /// First 32-bit word of the payload.
    /// NOTE(review): presumably the key that authorises the sender to the device —
    /// confirm against the protocol specification.
    pub device_key: u32,
    /// Second 32-bit word of the payload.
    /// NOTE(review): presumably selects the action group — confirm.
    pub group_key: u32,
    /// Third 32-bit word of the payload.
    /// NOTE(review): presumably a bit mask applied to the group — confirm.
    pub group_mask: u32,
    /// 64-bit time value, written as two 32-bit words with the high word first.
    /// `None` is encoded as zero.
    /// NOTE(review): units look like device timestamp ticks — confirm.
    pub scheduled_time: Option<u64>,
    /// 16-bit value written after the scheduled time.
    pub channel: u16,
}
40
/// Outcome of one `send_action` call.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct AckSummary {
    /// Number of command datagrams sent; `send_action` sets this to 1 once the
    /// send has succeeded.
    pub sent: usize,
    /// Number of distinct source IP addresses that answered with a successful
    /// action acknowledgement matching the request id.
    pub acks: usize,
}
49
50fn encode_payload(params: &ActionParams) -> BytesMut {
51 let mut buf = BytesMut::with_capacity(consts::ACTION_PAYLOAD);
52 buf.put_u32(params.device_key);
53 buf.put_u32(params.group_key);
54 buf.put_u32(params.group_mask);
55 let ticks = params.scheduled_time.unwrap_or(0);
56 buf.put_u32((ticks >> 32) as u32);
57 buf.put_u32(ticks as u32);
58 buf.put_u16(params.channel);
59 buf.put_u16(0); buf
61}
62
63fn parse_ack(buf: &[u8]) -> io::Result<GvcpAckHeader> {
64 if buf.len() < 8 {
65 return Err(io::Error::new(
66 ErrorKind::InvalidData,
67 "acknowledgement shorter than GVCP header",
68 ));
69 }
70 let status = u16::from_be_bytes([buf[0], buf[1]]);
71 let opcode = u16::from_be_bytes([buf[2], buf[3]]);
72 let length = u16::from_be_bytes([buf[4], buf[5]]);
73 let request_id = u16::from_be_bytes([buf[6], buf[7]]);
74 Ok(GvcpAckHeader {
75 status: genicp::StatusCode::from_raw(status),
76 command: opcode,
77 length,
78 request_id,
79 })
80}
81
/// Reports whether `addr` is the IPv4 limited broadcast address
/// (255.255.255.255). Subnet-directed broadcasts and IPv6 addresses yield `false`.
fn is_broadcast(addr: &SocketAddr) -> bool {
    match addr.ip() {
        IpAddr::V4(v4) => v4.is_broadcast(),
        IpAddr::V6(_) => false,
    }
}
85
86pub async fn send_action(
88 broadcast: SocketAddr,
89 params: &ActionParams,
90 timeout_ms: u64,
91) -> io::Result<AckSummary> {
92 let destination = SocketAddr::new(broadcast.ip(), GVCP_PORT);
93 let local_ip = match destination.ip() {
94 IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
95 IpAddr::V6(_) => {
96 return Err(io::Error::new(
97 ErrorKind::InvalidInput,
98 "IPv6 destinations are not supported for actions",
99 ));
100 }
101 };
102 let socket = UdpSocket::bind(SocketAddr::new(local_ip, 0)).await?;
103 if is_broadcast(&destination) {
104 socket.set_broadcast(true)?;
105 }
106
107 let mut summary = AckSummary::default();
108 let payload = encode_payload(params);
109 let request_id = fastrand::u16(0x8000..=0xFFFE);
110 let mut flags = genicp::CommandFlags::ACK_REQUIRED;
111 if is_broadcast(&destination) {
112 flags |= genicp::CommandFlags::BROADCAST;
113 }
114 let header = GvcpRequestHeader {
115 flags,
116 command: consts::ACTION_COMMAND,
117 length: payload.len() as u16,
118 request_id,
119 };
120 let packet = header.encode(&payload);
121 trace!(bytes = packet.len(), %destination, request_id, "sending action command");
122 socket.send_to(&packet, destination).await?;
123 summary.sent = 1;
124
125 let timeout = Duration::from_millis(timeout_ms);
126 if timeout.is_zero() {
127 info!(acks = 0, "action command sent (no wait)");
128 return Ok(summary);
129 }
130
131 let start = Instant::now();
132 let mut buf = vec![0u8; 512];
133 let mut seen = HashSet::new();
134 while let Some(remaining) = timeout.checked_sub(start.elapsed()) {
135 if remaining.is_zero() {
136 break;
137 }
138 match time::timeout(remaining, socket.recv_from(&mut buf)).await {
139 Ok(Ok((len, src))) => {
140 trace!(bytes = len, %src, "received acknowledgement");
141 let header = parse_ack(&buf[..len])?;
142 if header.command != consts::ACTION_ACK {
143 debug!(
144 opcode = header.command,
145 "ignoring unrelated acknowledgement"
146 );
147 continue;
148 }
149 if header.request_id != request_id {
150 debug!(
151 expected = request_id,
152 got = header.request_id,
153 "acknowledgement id mismatch"
154 );
155 continue;
156 }
157 if header.status != genicp::StatusCode::Success {
158 warn!(status = ?header.status, %src, "device reported action failure");
159 continue;
160 }
161 if seen.insert(src.ip()) {
162 summary.acks += 1;
163 }
164 }
165 Ok(Err(err)) => {
166 warn!(?err, "error receiving acknowledgement");
167 break;
168 }
169 Err(_) => break,
170 }
171 }
172
173 info!(acks = summary.acks, "action command completed");
174 Ok(summary)
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the byte-level layout produced by `encode_payload`.
    #[test]
    fn payload_layout() {
        let params = ActionParams {
            device_key: 0x1122_3344,
            group_key: 0x5566_7788,
            group_mask: 0xFFFF_0000,
            scheduled_time: Some(0x0102_0304_0506_0708),
            channel: 0x090A,
        };
        let payload = encode_payload(&params);
        assert_eq!(payload.len(), consts::ACTION_PAYLOAD);
        assert_eq!(&payload[..4], &0x1122_3344u32.to_be_bytes());
        assert_eq!(&payload[4..8], &0x5566_7788u32.to_be_bytes());
        assert_eq!(&payload[8..12], &0xFFFF_0000u32.to_be_bytes());
        assert_eq!(&payload[12..16], &0x0102_0304u32.to_be_bytes());
        assert_eq!(&payload[16..20], &0x0506_0708u32.to_be_bytes());
        assert_eq!(&payload[20..22], &0x090A_u16.to_be_bytes());
        // The payload ends with two zero bytes.
        assert_eq!(&payload[22..24], &[0u8, 0]);
    }

    /// Checks that a well-formed 8-byte header is decoded field by field.
    #[test]
    fn ack_parser() {
        let mut buf = BytesMut::with_capacity(8);
        buf.put_u16(genicp::StatusCode::Success.to_raw());
        buf.put_u16(consts::ACTION_ACK);
        buf.put_u16(0);
        buf.put_u16(0xBEEF);
        let ack = parse_ack(&buf).expect("ack");
        assert_eq!(ack.command, consts::ACTION_ACK);
        assert_eq!(ack.request_id, 0xBEEF);
    }

    /// Checks that a buffer shorter than the 8-byte header is rejected.
    #[test]
    fn ack_parser_rejects_short_buffer() {
        let err = parse_ack(&[0u8; 7])
            .err()
            .expect("a 7-byte acknowledgement must be rejected");
        assert_eq!(err.kind(), ErrorKind::InvalidData);
    }
}