Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Streaming

Camera.stream() returns a context manager that starts acquisition on entry, stops it on exit, and yields Frame objects while it’s open:

with cam.stream() as frames:
    for frame in frames:
        arr = frame.to_numpy()
        ...

No asyncio required — the underlying tokio runtime is managed inside the extension, and iteration blocks the calling thread while other Python threads stay runnable (the GIL is released during the blocking read).

The Frame object

frame.width              # int
frame.height             # int
frame.pixel_format       # "Mono8", "Mono16", "BayerRG8", "RGB8Packed", ...
frame.pixel_format_code  # raw PFNC integer
frame.ts_dev             # device tick count, Optional[int]
frame.ts_host            # POSIX seconds, Optional[float] (only if time-synced)
frame.payload()          # raw bytes, one copy

to_numpy() — natural shape

arr = frame.to_numpy()
Pixel formatArray shapedtype
Mono8(H, W)uint8
Mono16(H, W)uint16
RGB8Packed(H, W, 3)uint8
BGR8Packed, BayerRG8, BayerGB8, BayerBG8, BayerGR8(H, W, 3)uint8 (auto-demosaiced / reordered)
anything else(N,) rawuint8

Demosaicing is a simple nearest-neighbour kernel inside the Rust to_rgb8() path — fine for preview, not a substitute for an ISP.

to_rgb8() — always RGB

rgb = frame.to_rgb8()     # always (H, W, 3) uint8

Useful when you want one code path regardless of the camera’s pixel format.

Raw bytes

frame.payload()           # bytes, copy of the whole GVSP payload

Use this when you need to feed bytes to another decoder or serialize to disk as-is.

Streaming options

The stream() call accepts GigE-specific knobs:

cam.stream(
    iface="en0",               # NIC override
    auto_packet_size=True,     # negotiate the largest packet that fits MTU
    multicast="239.255.42.99", # subscribe to a multicast group instead of unicast
    destination_port=34567,    # fix the streaming UDP port
)

None of these are required. iface= is auto-resolved by subnet match if you omit it; the rest fall back to the camera’s defaults.

For U3V cameras all options are silently ignored.

Timeouts and ending a stream

Iteration blocks until a frame arrives. To time out a single read:

with cam.stream() as frames:
    frame = frames.next_frame(timeout_ms=1000)
    if frame is None:
        print("stream ended cleanly")
    else:
        ...

next_frame() returns None when the stream closes cleanly, or raises TransportError on timeout / network failure. The for frame in frames path uses a 5-second default timeout that raises on expiry.

Exit the with block to stop acquisition and release the socket / USB endpoint. You can also call frames.close() explicitly if you stored the iterator outside a with statement.

Complete example: save 5 frames as PNGs

import viva_genicam as vg
from PIL import Image

cam = vg.connect_gige(vg.discover(timeout_ms=500)[0])

with cam.stream() as frames:
    for i, frame in enumerate(frames, 1):
        Image.fromarray(frame.to_numpy()).save(f"frame_{i:03d}.png")
        if i >= 5:
            break

Identical in spirit to the Rust grab_gige example, 12 lines of Python.

Next

API reference — every public symbol, in one page.