1use std::net::{IpAddr, Ipv4Addr};
10
11use tokio::net::UdpSocket;
12use tracing::info;
13
14use crate::GenicamError;
15use tl_gige::gvcp::{GigeDevice, StreamParams};
16use tl_gige::gvsp::StreamConfig;
17use tl_gige::nic::{self, Iface, McOptions, DEFAULT_RCVBUF_BYTES};
18use tl_gige::stats::{StreamStats, StreamStatsAccumulator};
19
20pub use tl_gige::gvsp::StreamDest;
21
22pub struct StreamBuilder<'a> {
24 device: &'a mut GigeDevice,
25 iface: Option<Iface>,
26 dest: Option<StreamDest>,
27 rcvbuf_bytes: Option<usize>,
28 auto_packet_size: bool,
29 target_mtu: Option<u32>,
30 packet_size: Option<u32>,
31 packet_delay: Option<u32>,
32 channel: u32,
33 dst_port: u16,
34}
35
36impl<'a> StreamBuilder<'a> {
37 pub fn new(device: &'a mut GigeDevice) -> Self {
39 Self {
40 device,
41 iface: None,
42 dest: None,
43 rcvbuf_bytes: None,
44 auto_packet_size: true,
45 target_mtu: None,
46 packet_size: None,
47 packet_delay: None,
48 channel: 0,
49 dst_port: 0,
50 }
51 }
52
53 pub fn iface(mut self, iface: Iface) -> Self {
55 self.iface = Some(iface);
56 self
57 }
58
59 pub fn dest(mut self, dest: StreamDest) -> Self {
61 self.dest = Some(dest);
62 self
63 }
64
65 pub fn auto_packet_size(mut self, enable: bool) -> Self {
67 self.auto_packet_size = enable;
68 self
69 }
70
71 pub fn target_mtu(mut self, mtu: u32) -> Self {
73 self.target_mtu = Some(mtu);
74 self
75 }
76
77 pub fn packet_size(mut self, size: u32) -> Self {
79 self.packet_size = Some(size);
80 self
81 }
82
83 pub fn packet_delay(mut self, delay: u32) -> Self {
85 self.packet_delay = Some(delay);
86 self
87 }
88
89 pub fn destination_port(mut self, port: u16) -> Self {
91 self.dst_port = port;
92 if let Some(dest) = &mut self.dest {
93 *dest = match *dest {
94 StreamDest::Unicast { dst_ip, .. } => StreamDest::Unicast {
95 dst_ip,
96 dst_port: port,
97 },
98 StreamDest::Multicast {
99 group,
100 loopback,
101 ttl,
102 ..
103 } => StreamDest::Multicast {
104 group,
105 port,
106 loopback,
107 ttl,
108 },
109 };
110 }
111 self
112 }
113
114 pub fn multicast(mut self, group: Option<Ipv4Addr>) -> Self {
116 if let Some(group) = group {
117 self.dest = Some(StreamDest::Multicast {
118 group,
119 port: self.dst_port,
120 loopback: false,
121 ttl: 1,
122 });
123 } else {
124 self.dest = None;
125 }
126 self
127 }
128
129 pub fn rcvbuf_bytes(mut self, size: usize) -> Self {
131 self.rcvbuf_bytes = Some(size);
132 self
133 }
134
135 pub fn channel(mut self, channel: u32) -> Self {
137 self.channel = channel;
138 self
139 }
140
141 pub async fn build(self) -> Result<Stream, GenicamError> {
143 let iface = self
144 .iface
145 .ok_or_else(|| GenicamError::transport("stream requires a network interface"))?;
146 let host_ip = iface
147 .ipv4()
148 .ok_or_else(|| GenicamError::transport("interface lacks IPv4 address"))?;
149 let default_port = if self.dst_port == 0 {
150 0x5FFF
151 } else {
152 self.dst_port
153 };
154 let mut dest = self.dest.unwrap_or(StreamDest::Unicast {
155 dst_ip: host_ip,
156 dst_port: default_port,
157 });
158 match &mut dest {
159 StreamDest::Unicast { dst_port, .. } => {
160 if *dst_port == 0 {
161 *dst_port = default_port;
162 }
163 }
164 StreamDest::Multicast { port, .. } => {
165 if *port == 0 {
166 *port = default_port;
167 }
168 }
169 }
170
171 let iface_mtu = nic::mtu(&iface).map_err(|err| GenicamError::transport(err.to_string()))?;
172 let mtu = self
173 .target_mtu
174 .map_or(iface_mtu, |limit| limit.min(iface_mtu));
175 let packet_size = if self.auto_packet_size {
176 nic::best_packet_size(mtu)
177 } else {
178 self.packet_size
179 .unwrap_or_else(|| nic::best_packet_size(1500))
180 };
181 let packet_delay = if self.auto_packet_size {
182 if mtu <= 1500 {
183 const DELAY_NS: u32 = 2_000;
184 DELAY_NS / 80
185 } else {
186 0
187 }
188 } else {
189 self.packet_delay.unwrap_or(0)
190 };
191
192 match &dest {
193 StreamDest::Unicast { dst_ip, dst_port } => {
194 info!(%dst_ip, dst_port, channel = self.channel, "configuring unicast stream");
195 self.device
196 .set_stream_destination(self.channel, *dst_ip, *dst_port)
197 .await
198 .map_err(|err| GenicamError::transport(err.to_string()))?;
199 }
200 StreamDest::Multicast { .. } => {
201 info!(
202 channel = self.channel,
203 port = dest.port(),
204 addr = %dest.addr(),
205 "configuring multicast stream parameters"
206 );
207 }
208 }
209
210 self.device
211 .set_stream_packet_size(self.channel, packet_size)
212 .await
213 .map_err(|err| GenicamError::transport(err.to_string()))?;
214 self.device
215 .set_stream_packet_delay(self.channel, packet_delay)
216 .await
217 .map_err(|err| GenicamError::transport(err.to_string()))?;
218
219 let socket = match &dest {
220 StreamDest::Unicast { dst_port, .. } => {
221 let bind_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
222 nic::bind_udp(bind_ip, *dst_port, Some(iface.clone()), self.rcvbuf_bytes)
223 .await
224 .map_err(|err| GenicamError::transport(err.to_string()))?
225 }
226 StreamDest::Multicast {
227 group,
228 port,
229 loopback,
230 ttl,
231 } => {
232 let opts = McOptions {
233 loopback: *loopback,
234 ttl: *ttl,
235 rcvbuf_bytes: self.rcvbuf_bytes.unwrap_or(DEFAULT_RCVBUF_BYTES),
236 ..McOptions::default()
237 };
238 nic::bind_multicast(&iface, *group, *port, &opts)
239 .await
240 .map_err(|err| GenicamError::transport(err.to_string()))?
241 }
242 };
243
244 let source_filter = if dest.is_multicast() {
245 None
246 } else {
247 Some(dest.addr())
248 };
249 let resend_enabled = !dest.is_multicast();
250
251 let params = StreamParams {
252 packet_size,
253 packet_delay,
254 mtu,
255 host: dest.addr(),
256 port: dest.port(),
257 };
258
259 let config = StreamConfig {
260 dest,
261 iface: iface.clone(),
262 packet_size: Some(packet_size),
263 packet_delay: Some(packet_delay),
264 source_filter,
265 resend_enabled,
266 };
267
268 let stats = StreamStatsAccumulator::new();
269 Ok(Stream {
270 socket,
271 stats,
272 params,
273 config,
274 })
275 }
276}
277
278pub struct Stream {
281 socket: UdpSocket,
282 stats: StreamStatsAccumulator,
283 params: StreamParams,
284 config: StreamConfig,
285}
286
287impl Stream {
288 pub fn socket(&self) -> &UdpSocket {
290 &self.socket
291 }
292
293 pub fn into_parts(
295 self,
296 ) -> (
297 UdpSocket,
298 StreamStatsAccumulator,
299 StreamParams,
300 StreamConfig,
301 ) {
302 (self.socket, self.stats, self.params, self.config)
303 }
304
305 pub fn params(&self) -> StreamParams {
307 self.params
308 }
309
310 pub fn stats_handle(&self) -> StreamStatsAccumulator {
312 self.stats.clone()
313 }
314
315 pub fn stats(&self) -> StreamStats {
317 self.stats.snapshot()
318 }
319
320 pub fn config(&self) -> &StreamConfig {
322 &self.config
323 }
324}
325
326impl<'a> From<&'a mut GigeDevice> for StreamBuilder<'a> {
327 fn from(device: &'a mut GigeDevice) -> Self {
328 StreamBuilder::new(device)
329 }
330}