genicam/
stream.rs

1//! Streaming builder and configuration helpers bridging `tl-gige` with
2//! higher-level GenICam consumers.
3//!
4//! The builder performs control-plane negotiation (packet size, delay) and
5//! prepares a UDP socket configured for reception. Applications can retrieve the
6//! socket handle to drive their own async pipelines while relying on the shared
7//! [`StreamStats`] accumulator for monitoring.
8
9use std::net::{IpAddr, Ipv4Addr};
10
11use tokio::net::UdpSocket;
12use tracing::info;
13
14use crate::GenicamError;
15use tl_gige::gvcp::{GigeDevice, StreamParams};
16use tl_gige::gvsp::StreamConfig;
17use tl_gige::nic::{self, Iface, McOptions, DEFAULT_RCVBUF_BYTES};
18use tl_gige::stats::{StreamStats, StreamStatsAccumulator};
19
20pub use tl_gige::gvsp::StreamDest;
21
22/// Builder for configuring a GVSP stream.
23pub struct StreamBuilder<'a> {
24    device: &'a mut GigeDevice,
25    iface: Option<Iface>,
26    dest: Option<StreamDest>,
27    rcvbuf_bytes: Option<usize>,
28    auto_packet_size: bool,
29    target_mtu: Option<u32>,
30    packet_size: Option<u32>,
31    packet_delay: Option<u32>,
32    channel: u32,
33    dst_port: u16,
34}
35
36impl<'a> StreamBuilder<'a> {
37    /// Create a new builder bound to an opened [`GigeDevice`].
38    pub fn new(device: &'a mut GigeDevice) -> Self {
39        Self {
40            device,
41            iface: None,
42            dest: None,
43            rcvbuf_bytes: None,
44            auto_packet_size: true,
45            target_mtu: None,
46            packet_size: None,
47            packet_delay: None,
48            channel: 0,
49            dst_port: 0,
50        }
51    }
52
53    /// Select the interface used for receiving GVSP packets.
54    pub fn iface(mut self, iface: Iface) -> Self {
55        self.iface = Some(iface);
56        self
57    }
58
59    /// Configure the stream destination.
60    pub fn dest(mut self, dest: StreamDest) -> Self {
61        self.dest = Some(dest);
62        self
63    }
64
65    /// Enable or disable automatic packet-size negotiation.
66    pub fn auto_packet_size(mut self, enable: bool) -> Self {
67        self.auto_packet_size = enable;
68        self
69    }
70
71    /// Target MTU used when computing the optimal GVSP packet size.
72    pub fn target_mtu(mut self, mtu: u32) -> Self {
73        self.target_mtu = Some(mtu);
74        self
75    }
76
77    /// Override the GVSP packet size when automatic negotiation is disabled.
78    pub fn packet_size(mut self, size: u32) -> Self {
79        self.packet_size = Some(size);
80        self
81    }
82
83    /// Override the GVSP packet delay when automatic negotiation is disabled.
84    pub fn packet_delay(mut self, delay: u32) -> Self {
85        self.packet_delay = Some(delay);
86        self
87    }
88
89    /// Configure the UDP port used for streaming (defaults to 0 => device chosen).
90    pub fn destination_port(mut self, port: u16) -> Self {
91        self.dst_port = port;
92        if let Some(dest) = &mut self.dest {
93            *dest = match *dest {
94                StreamDest::Unicast { dst_ip, .. } => StreamDest::Unicast {
95                    dst_ip,
96                    dst_port: port,
97                },
98                StreamDest::Multicast {
99                    group,
100                    loopback,
101                    ttl,
102                    ..
103                } => StreamDest::Multicast {
104                    group,
105                    port,
106                    loopback,
107                    ttl,
108                },
109            };
110        }
111        self
112    }
113
114    /// Configure multicast reception when the device is set to multicast mode.
115    pub fn multicast(mut self, group: Option<Ipv4Addr>) -> Self {
116        if let Some(group) = group {
117            self.dest = Some(StreamDest::Multicast {
118                group,
119                port: self.dst_port,
120                loopback: false,
121                ttl: 1,
122            });
123        } else {
124            self.dest = None;
125        }
126        self
127    }
128
129    /// Custom receive buffer size for the UDP socket.
130    pub fn rcvbuf_bytes(mut self, size: usize) -> Self {
131        self.rcvbuf_bytes = Some(size);
132        self
133    }
134
135    /// Select the GigE Vision stream channel to configure (defaults to 0).
136    pub fn channel(mut self, channel: u32) -> Self {
137        self.channel = channel;
138        self
139    }
140
141    /// Finalise the builder and return a configured [`Stream`].
142    pub async fn build(self) -> Result<Stream, GenicamError> {
143        let iface = self
144            .iface
145            .ok_or_else(|| GenicamError::transport("stream requires a network interface"))?;
146        let host_ip = iface
147            .ipv4()
148            .ok_or_else(|| GenicamError::transport("interface lacks IPv4 address"))?;
149        let default_port = if self.dst_port == 0 {
150            0x5FFF
151        } else {
152            self.dst_port
153        };
154        let mut dest = self.dest.unwrap_or(StreamDest::Unicast {
155            dst_ip: host_ip,
156            dst_port: default_port,
157        });
158        match &mut dest {
159            StreamDest::Unicast { dst_port, .. } => {
160                if *dst_port == 0 {
161                    *dst_port = default_port;
162                }
163            }
164            StreamDest::Multicast { port, .. } => {
165                if *port == 0 {
166                    *port = default_port;
167                }
168            }
169        }
170
171        let iface_mtu = nic::mtu(&iface).map_err(|err| GenicamError::transport(err.to_string()))?;
172        let mtu = self
173            .target_mtu
174            .map_or(iface_mtu, |limit| limit.min(iface_mtu));
175        let packet_size = if self.auto_packet_size {
176            nic::best_packet_size(mtu)
177        } else {
178            self.packet_size
179                .unwrap_or_else(|| nic::best_packet_size(1500))
180        };
181        let packet_delay = if self.auto_packet_size {
182            if mtu <= 1500 {
183                const DELAY_NS: u32 = 2_000;
184                DELAY_NS / 80
185            } else {
186                0
187            }
188        } else {
189            self.packet_delay.unwrap_or(0)
190        };
191
192        match &dest {
193            StreamDest::Unicast { dst_ip, dst_port } => {
194                info!(%dst_ip, dst_port, channel = self.channel, "configuring unicast stream");
195                self.device
196                    .set_stream_destination(self.channel, *dst_ip, *dst_port)
197                    .await
198                    .map_err(|err| GenicamError::transport(err.to_string()))?;
199            }
200            StreamDest::Multicast { .. } => {
201                info!(
202                    channel = self.channel,
203                    port = dest.port(),
204                    addr = %dest.addr(),
205                    "configuring multicast stream parameters"
206                );
207            }
208        }
209
210        self.device
211            .set_stream_packet_size(self.channel, packet_size)
212            .await
213            .map_err(|err| GenicamError::transport(err.to_string()))?;
214        self.device
215            .set_stream_packet_delay(self.channel, packet_delay)
216            .await
217            .map_err(|err| GenicamError::transport(err.to_string()))?;
218
219        let socket = match &dest {
220            StreamDest::Unicast { dst_port, .. } => {
221                let bind_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
222                nic::bind_udp(bind_ip, *dst_port, Some(iface.clone()), self.rcvbuf_bytes)
223                    .await
224                    .map_err(|err| GenicamError::transport(err.to_string()))?
225            }
226            StreamDest::Multicast {
227                group,
228                port,
229                loopback,
230                ttl,
231            } => {
232                let opts = McOptions {
233                    loopback: *loopback,
234                    ttl: *ttl,
235                    rcvbuf_bytes: self.rcvbuf_bytes.unwrap_or(DEFAULT_RCVBUF_BYTES),
236                    ..McOptions::default()
237                };
238                nic::bind_multicast(&iface, *group, *port, &opts)
239                    .await
240                    .map_err(|err| GenicamError::transport(err.to_string()))?
241            }
242        };
243
244        let source_filter = if dest.is_multicast() {
245            None
246        } else {
247            Some(dest.addr())
248        };
249        let resend_enabled = !dest.is_multicast();
250
251        let params = StreamParams {
252            packet_size,
253            packet_delay,
254            mtu,
255            host: dest.addr(),
256            port: dest.port(),
257        };
258
259        let config = StreamConfig {
260            dest,
261            iface: iface.clone(),
262            packet_size: Some(packet_size),
263            packet_delay: Some(packet_delay),
264            source_filter,
265            resend_enabled,
266        };
267
268        let stats = StreamStatsAccumulator::new();
269        Ok(Stream {
270            socket,
271            stats,
272            params,
273            config,
274        })
275    }
276}
277
278/// Handle returned by [`StreamBuilder`] providing access to the configured socket
279/// and statistics.
280pub struct Stream {
281    socket: UdpSocket,
282    stats: StreamStatsAccumulator,
283    params: StreamParams,
284    config: StreamConfig,
285}
286
287impl Stream {
288    /// Borrow the underlying UDP socket.
289    pub fn socket(&self) -> &UdpSocket {
290        &self.socket
291    }
292
293    /// Consume the stream and return the UDP socket together with the shared statistics handle.
294    pub fn into_parts(
295        self,
296    ) -> (
297        UdpSocket,
298        StreamStatsAccumulator,
299        StreamParams,
300        StreamConfig,
301    ) {
302        (self.socket, self.stats, self.params, self.config)
303    }
304
305    /// Access the negotiated stream parameters.
306    pub fn params(&self) -> StreamParams {
307        self.params
308    }
309
310    /// Obtain a clone of the statistics accumulator handle for updates.
311    pub fn stats_handle(&self) -> StreamStatsAccumulator {
312        self.stats.clone()
313    }
314
315    /// Snapshot the collected statistics.
316    pub fn stats(&self) -> StreamStats {
317        self.stats.snapshot()
318    }
319
320    /// Immutable view of the stream configuration.
321    pub fn config(&self) -> &StreamConfig {
322        &self.config
323    }
324}
325
326impl<'a> From<&'a mut GigeDevice> for StreamBuilder<'a> {
327    fn from(device: &'a mut GigeDevice) -> Self {
328        StreamBuilder::new(device)
329    }
330}