Receiving

Heaps

Unlike the Python bindings, the C++ bindings expose three heap types: live heaps (spead2::recv::live_heap) are used for heaps being constructed, and may be missing data; frozen heaps (spead2::recv::heap) always have all their data; and incomplete heaps (spead2::recv::incomplete_heap) are frozen heaps that are missing data. Frozen heaps can be move-constructed from live heaps, which will typically be done in the callback.

class spead2::recv::live_heap

A SPEAD heap that is in the process of being received.

Once it is fully received, it is converted to a heap for further processing.

Any SPEAD-64-* flavour can be used, but all packets in the heap must use the same flavour. It may be possible to relax this, but it hasn’t been examined, and may cause issues for decoding descriptors (whose format depends on the flavour).

A heap can be:

  • complete: a heap length item was found in a packet, and we have received all the payload corresponding to it. No more packets are expected.
  • contiguous: the payload we have received is a contiguous range from 0 up to some amount, and covers all items described in the item pointers.
  • incomplete: not contiguous.

A complete heap is also contiguous, but not necessarily the other way around. Only contiguous heaps can be frozen to heap, and only incomplete heaps can be frozen to incomplete_heap.
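
The three states can be illustrated with a small standalone model. This is only a sketch and not spead2's implementation: heap_model, its fields and add_payload are hypothetical names, and the bookkeeping (a map of received byte ranges plus the minimum length needed to cover the item pointers) is an assumption about how such tracking might be done.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <map>

// Hypothetical model of a heap under construction (not spead2::recv::live_heap).
struct heap_model
{
    std::int64_t heap_length = -1;  // -1 until a heap length item has been seen
    std::int64_t min_length = 0;    // bytes needed to cover all item pointers seen so far
    std::map<std::int64_t, std::int64_t> ranges;  // received payload: start -> end (exclusive)

    // Record one packet's payload, merging it with touching or overlapping ranges.
    void add_payload(std::int64_t offset, std::int64_t length)
    {
        assert(offset >= 0 && length > 0);
        std::int64_t first = offset, last = offset + length;
        auto it = ranges.lower_bound(first);
        if (it != ranges.begin() && std::prev(it)->second >= first)
            --it;  // the previous range touches or overlaps the new one
        while (it != ranges.end() && it->first <= last)
        {
            first = std::min(first, it->first);
            last = std::max(last, it->second);
            it = ranges.erase(it);
        }
        ranges[first] = last;
    }

    // Length of the received payload if it is a single range starting at 0, else -1.
    std::int64_t prefix_length() const
    {
        if (ranges.empty())
            return 0;
        if (ranges.size() > 1 || ranges.begin()->first != 0)
            return -1;
        return ranges.begin()->second;
    }

    // Complete: the heap length is known and all of that payload has arrived.
    bool is_complete() const
    {
        return heap_length >= 0 && prefix_length() == heap_length;
    }

    // Contiguous: one range from 0 that covers every item described so far.
    bool is_contiguous() const
    {
        std::int64_t prefix = prefix_length();
        return prefix >= 0 && prefix >= min_length;
    }
};
```

In this model a heap whose length item never arrived can still be contiguous, which is why contiguous does not imply complete.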

Public Functions

bool is_complete() const

True if the heap is complete.

bool is_contiguous() const

True if the heap is contiguous.

bool is_end_of_stream() const

True if an end-of-stream heap control item was found.

s_item_pointer_t get_cnt() const

Retrieve the heap ID.

bug_compat_mask get_bug_compat() const

Get protocol bug compatibility flags.

class spead2::recv::heap

Received heap that has been finalised.

Inherits from spead2::recv::heap_base

Public Functions

heap(live_heap &&h)

Freeze a heap, which must satisfy live_heap::is_contiguous.

The original heap is destroyed.

descriptor to_descriptor() const

Extract descriptor fields from the heap.

Any missing fields are default-initialized. This should be used on a heap constructed from the content of a descriptor item.

The original PySPEAD package (version 0.5.2) does not follow the specification here. The macros in common_defines.h can be used to control whether to follow the specification or be bug-compatible.

The protocol allows descriptors to use immediate-mode items, but the decoding of these into variable-length strings is undefined. This implementation will discard such descriptor fields.

std::vector<descriptor> get_descriptors() const

Extract and decode descriptors from this heap.

s_item_pointer_t get_cnt() const

Get heap ID.

const flavour &get_flavour() const

Get protocol flavour used.

const std::vector<item> &get_items() const

Get the items from the heap.

This includes descriptors, but excludes any items with ID <= 4.

bool is_ctrl_item(ctrl_mode value) const

Convenience function to check whether any of the items is a STREAM_CTRL_ID item with value value.

bool is_start_of_stream() const

Convenience function to check whether any of the items is a CTRL_STREAM_START.

bool is_end_of_stream() const

Convenience function to check whether any of the items is a CTRL_STREAM_STOP.
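
The three convenience functions above amount to a scan over the items. The following is a standalone sketch of that idea, not the library's code: item_model is a hypothetical stand-in for spead2::recv::item, the numeric values of STREAM_CTRL_ID, CTRL_STREAM_START and CTRL_STREAM_STOP are assumptions that should be checked against common_defines.h, and the sketch assumes the control item is an immediate.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Assumed values; verify against common_defines.h before relying on them.
constexpr std::int64_t STREAM_CTRL_ID = 0x06;
enum ctrl_mode : std::uint64_t
{
    CTRL_STREAM_START = 0,
    CTRL_STREAM_STOP = 2
};

// Hypothetical, reduced stand-in for spead2::recv::item.
struct item_model
{
    std::int64_t id;
    bool is_immediate;
    std::uint64_t immediate_value;
};

// True if any item is a stream control item carrying the given value.
bool is_ctrl_item(const std::vector<item_model> &items, ctrl_mode value)
{
    for (const item_model &it : items)
    {
        if (it.id == STREAM_CTRL_ID && it.is_immediate && it.immediate_value == value)
            return true;
    }
    return false;
}

bool is_start_of_stream(const std::vector<item_model> &items)
{
    return is_ctrl_item(items, CTRL_STREAM_START);
}

bool is_end_of_stream(const std::vector<item_model> &items)
{
    return is_ctrl_item(items, CTRL_STREAM_STOP);
}
```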

class spead2::recv::incomplete_heap

Received heap that has been finalised, but which is missing data.

The payload and any items that refer to the payload are discarded.

Inherits from spead2::recv::heap_base

Public Functions

incomplete_heap(live_heap &&h, bool keep_payload, bool keep_payload_ranges)

Freeze a heap.

The original heap is destroyed.

Parameters
  • h: The heap to freeze.
  • keep_payload: If true, transfer the payload memory allocation from the live heap to this object. If false, discard it.
  • keep_payload_ranges: If true, store information that allows get_payload_ranges to work.

s_item_pointer_t get_heap_length() const

Heap payload length encoded in packets (-1 for unknown)

s_item_pointer_t get_received_length() const

Number of bytes of payload received.

const memory_allocator::pointer &get_payload() const

Get the payload pointer.

This will return an empty pointer unless keep_payload was set in the constructor.

std::vector<std::pair<s_item_pointer_t, s_item_pointer_t>> get_payload_ranges() const

Return a list of contiguous ranges of payload that were received.

This is intended for special cases where a custom memory allocator was used to channel the payload into a caller-managed area, so that the caller knows which parts of that area have been filled in.

If keep_payload_ranges was false in the constructor, returns an empty list.
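
As an illustration of the use case described above, a caller that has channelled payload into its own buffer might use the ranges to decide which fixed-size chunks of that buffer are usable. This is a sketch under assumptions: filled_chunks is a hypothetical helper, each pair is taken to be a half-open [start, end) byte range, and the ranges are assumed to be disjoint and already merged.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

using range_list = std::vector<std::pair<std::int64_t, std::int64_t>>;

// For a buffer of `total` bytes split into chunks of `chunk` bytes, report
// which chunks are entirely covered by a received range.
std::vector<bool> filled_chunks(const range_list &ranges, std::int64_t total, std::int64_t chunk)
{
    assert(chunk > 0 && total >= 0);
    std::vector<bool> filled(static_cast<std::size_t>((total + chunk - 1) / chunk), false);
    const std::int64_t n_chunks = static_cast<std::int64_t>(filled.size());
    for (const auto &r : ranges)
    {
        // First chunk that starts at or after the beginning of the range.
        for (std::int64_t i = (r.first + chunk - 1) / chunk; i < n_chunks; i++)
        {
            std::int64_t chunk_end = std::min((i + 1) * chunk, total);
            if (chunk_end > r.second)
                break;  // this chunk extends past the received range
            filled[static_cast<std::size_t>(i)] = true;
        }
    }
    return filled;
}
```

Because the ranges are assumed to be merged, a chunk is complete exactly when it lies inside a single range, so no cross-range accounting is needed.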

s_item_pointer_t get_cnt() const

Get heap ID.

const flavour &get_flavour() const

Get protocol flavour used.

const std::vector<item> &get_items() const

Get the items from the heap.

This includes descriptors, but excludes any items with ID <= 4.

bool is_ctrl_item(ctrl_mode value) const

Convenience function to check whether any of the items is a STREAM_CTRL_ID item with value value.

bool is_start_of_stream() const

Convenience function to check whether any of the items is a CTRL_STREAM_START.

bool is_end_of_stream() const

Convenience function to check whether any of the items is a CTRL_STREAM_STOP.

struct spead2::recv::item

An item extracted from a heap.

Subclassed by spead2::recv::item_wrapper

Public Members

s_item_pointer_t id

Item ID.

std::uint8_t *ptr

Start of memory containing value.

std::size_t length

Length of memory.

item_pointer_t immediate_value

The immediate interpreted as an integer (undefined if not immediate)

bool is_immediate

Whether the item is immediate.
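
A short sketch of how these members might be consumed when an item holds a small unsigned integer. The struct below is a local copy of the field layout for illustration only, load_be and item_as_uint are hypothetical helpers, and the big-endian interpretation of addressed values is an assumption based on SPEAD using network byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Local stand-in mirroring the members listed above (not the library type).
struct item_model
{
    std::int64_t id;
    std::uint8_t *ptr;
    std::size_t length;
    std::uint64_t immediate_value;
    bool is_immediate;
};

// Interpret up to 8 bytes as a big-endian unsigned integer.
std::uint64_t load_be(const std::uint8_t *ptr, std::size_t length)
{
    assert(length <= 8);
    std::uint64_t value = 0;
    for (std::size_t i = 0; i < length; i++)
        value = (value << 8) | ptr[i];
    return value;
}

// Use the pre-decoded immediate when available, otherwise decode the bytes.
std::uint64_t item_as_uint(const item_model &it)
{
    return it.is_immediate ? it.immediate_value : load_be(it.ptr, it.length);
}
```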

struct spead2::descriptor

An unpacked descriptor.

If numpy_header is non-empty, it overrides format and shape.

Public Members

s_item_pointer_t id = 0

SPEAD ID.

std::string name

Short name.

std::string description

Long description.

std::vector<std::pair<char, s_item_pointer_t>> format

Legacy format.

Each element is a specifier character (e.g. ‘u’ for unsigned) and a bit width.

std::vector<s_item_pointer_t> shape

Shape.

Elements are either non-negative, or -1 is used to indicate a variable-length size. At most one dimension may be variable-length.
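
To make the variable-length rule concrete, the sketch below resolves a -1 dimension from the size of the item's value. resolve_shape is a hypothetical helper, not part of spead2, and it assumes the legacy format is in use, that elements are tightly packed, and that any trailing partial element is ignored (floor division).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <utility>
#include <vector>

// Replace a single -1 dimension with the size implied by the value's length.
std::vector<std::int64_t> resolve_shape(
    std::vector<std::int64_t> shape,
    const std::vector<std::pair<char, std::int64_t>> &format,
    std::int64_t length_bytes)
{
    assert(length_bytes >= 0);
    // Bits per element: the sum of the bit widths of all format fields.
    std::int64_t element_bits = 0;
    for (const auto &field : format)
        element_bits += field.second;

    std::int64_t known = 1;  // product of the fixed dimensions
    int variable = -1;       // index of the variable-length dimension, if any
    for (std::size_t i = 0; i < shape.size(); i++)
    {
        if (shape[i] >= 0)
            known *= shape[i];
        else if (variable >= 0)
            throw std::invalid_argument("at most one dimension may be variable-length");
        else
            variable = static_cast<int>(i);
    }
    if (variable >= 0)
    {
        std::int64_t row_bits = element_bits * known;
        shape[static_cast<std::size_t>(variable)] = row_bits > 0 ? length_bytes * 8 / row_bits : 0;
    }
    return shape;
}
```

For example, a shape of (-1, 4) with a single 16-bit unsigned field and a 64-byte value resolves to (8, 4) under these assumptions.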

std::string numpy_header

Description in the format used in .npy files.

Streams

At the lowest level, heaps are given to the application via a callback to a virtual function. While this callback is running, no new packets can be received from the network socket, so this function needs to complete quickly to avoid data loss when using UDP. To use this interface, subclass spead2::recv::stream and implement heap_ready() and optionally override stop_received().
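
The subclassing pattern can be sketched without the library. Everything below is a hypothetical model (stream_model, live_heap_model, collecting_stream and add_heap are invented names); it only illustrates the two points made above: keep heap_ready short, and chain to the base class when overriding stop_received.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical stand-in for a live heap.
struct live_heap_model
{
    long cnt;
    bool contiguous;
};

// Hypothetical stand-in for the stream base class.
class stream_model
{
public:
    virtual ~stream_model() = default;

    // Models the reader side handing over a finished heap.
    void add_heap(live_heap_model h)
    {
        if (!stopped)
            heap_ready(std::move(h));
    }

    void stop()
    {
        if (!stopped)
            stop_received();
    }

    bool is_stopped() const { return stopped; }

protected:
    virtual void heap_ready(live_heap_model &&h) = 0;
    virtual void stop_received() { stopped = true; }

private:
    bool stopped = false;
};

class collecting_stream : public stream_model
{
public:
    std::vector<live_heap_model> contiguous_heaps;
    bool saw_stop = false;

protected:
    void heap_ready(live_heap_model &&h) override
    {
        // Do as little as possible here: keep the heap and return.
        if (h.contiguous)
            contiguous_heaps.push_back(std::move(h));
    }

    void stop_received() override
    {
        stream_model::stop_received();  // chain to the base implementation
        saw_stop = true;
    }
};
```

Heavier processing would normally happen on another thread that consumes the stored heaps, so that the callback itself stays short.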

class spead2::recv::stream

Stream that is fed by subclasses of reader.

Unless otherwise specified, methods in stream_base may only be called while holding the strand contained in this class. The public interface functions must be called from outside the strand (and outside the threads associated with the io_service), but are not thread-safe relative to each other.

This class is thread-safe. This is achieved mostly by having operations run as completion handlers on a strand. The exception is stop, which uses a once to ensure that only the first call actually runs.

Inherits from spead2::recv::stream_base

Subclassed by callback_stream, recv_stream, spead2::recv::ring_stream_base

Public Functions

template <typename T, typename... Args>
void emplace_reader(Args&&... args)

Add a new reader by passing its constructor arguments, excluding the initial stream argument.

stream_stats get_stats() const

Return statistics about the stream.

See the Python documentation.

void stop()

Stop the stream and block until all the readers have wound up.

After calling this there should be no more outstanding completion handlers in the thread pool.

In most cases subclasses should override stop_received rather than this function.

Protected Functions

void stop_received()

Shut down the stream.

This calls flush. Subclasses may override this to achieve additional effects, but must chain to the base implementation.

It is undefined what happens if add_packet is called after a stream is stopped.

void flush()

Flush the collection of live heaps, passing them to heap_ready.

struct spead2::recv::stream_stats

Statistics about a stream.

Not all fields are relevant for all stream types.

Public Members

std::uint64_t heaps = 0

Total number of heaps passed to stream_base::heap_ready.

std::uint64_t incomplete_heaps_evicted = 0

Number of incomplete heaps that were evicted from the buffer to make room for new data.

std::uint64_t incomplete_heaps_flushed = 0

Number of incomplete heaps that were emitted by stream::flush.

These are typically heaps that were in-flight when the stream stopped.

std::uint64_t packets = 0

Number of packets received.

std::uint64_t worker_blocked = 0

Number of times a worker thread was blocked because the ringbuffer was full.

Only applicable to ring_stream.

std::size_t max_batch = 0

Maximum number of packets received as a unit.

This is only applicable to readers that support fetching a batch of packets from the source.

A potentially more convenient interface is spead2::recv::ring_stream<Ringbuffer>, which places received heaps into a fixed-size thread-safe ring buffer. Another thread can then pull from this ring buffer in a loop. The template parameter selects the ringbuffer implementation. The default is a good light-weight choice, but if you need to use select()-like functions to wait for data, you can use spead2::ringbuffer<spead2::recv::live_heap, spead2::semaphore_fd, spead2::semaphore>.

template <typename Ringbuffer = ringbuffer<live_heap>>
class spead2::recv::ring_stream

Specialisation of stream that pushes its results into a ringbuffer.

The ringbuffer class may be replaced, but must provide the same interface as ringbuffer. If the ring buffer fills up, add_packet will block the reader.

On the consumer side, heaps are automatically frozen as they are extracted.

This class is thread-safe.

Inherits from spead2::recv::ring_stream_base

Public Functions

ring_stream(io_service_ref io_service, bug_compat_mask bug_compat = 0, std::size_t max_heaps = default_max_heaps, std::size_t ring_heaps = default_ring_heaps, bool contiguous_only = true)

Constructor.

Parameters
  • io_service: I/O service (also used by the readers).
  • bug_compat: Bug compatibility flags for interpreting heaps
  • max_heaps: Number of partial heaps to keep around
  • ring_heaps: Capacity of the ringbuffer
  • contiguous_only: If true, only contiguous heaps are pushed to the ring buffer

heap pop()

Wait until a contiguous heap is available, freeze it, and return it; or until the stream is stopped.

Exceptions
  • ringbuffer_stopped: if stop has been called and there are no more contiguous heaps.

live_heap pop_live()

Wait until a heap is available and return it; or until the stream is stopped.

Exceptions
  • ringbuffer_stopped: if stop has been called and there are no more heaps.

heap try_pop()

Like pop, but if no contiguous heap is available, throws spead2::ringbuffer_empty.

Exceptions
  • ringbuffer_empty: if there is no contiguous heap available, but the stream has not been stopped
  • ringbuffer_stopped: if stop has been called and there are no more contiguous heaps.

live_heap try_pop_live()

Like pop_live, but if no heap is available, throws spead2::ringbuffer_empty.

Exceptions
  • ringbuffer_empty: if there is no heap available, but the stream has not been stopped
  • ringbuffer_stopped: if stop has been called and there are no more heaps.
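
The pop and try_pop semantics above can be modelled with a small bounded queue. This is a sketch only: ring_model is a hypothetical class written with standard-library primitives, and the two exception types are local stand-ins that merely share names with spead2::ringbuffer_empty and spead2::ringbuffer_stopped.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <stdexcept>
#include <utility>

struct ringbuffer_empty : std::runtime_error
{
    ringbuffer_empty() : std::runtime_error("ring buffer is empty") {}
};

struct ringbuffer_stopped : std::runtime_error
{
    ringbuffer_stopped() : std::runtime_error("ring buffer has been stopped") {}
};

template <typename T>
class ring_model
{
public:
    explicit ring_model(std::size_t capacity) : capacity(capacity) { assert(capacity > 0); }

    // Producer side: blocks while the buffer is full.
    void push(T value)
    {
        std::unique_lock<std::mutex> lock(mutex);
        space_cv.wait(lock, [this] { return stopped || items.size() < capacity; });
        if (stopped)
            throw ringbuffer_stopped();
        items.push_back(std::move(value));
        data_cv.notify_one();
    }

    // Consumer side: blocks until data is available or the buffer is stopped.
    T pop()
    {
        std::unique_lock<std::mutex> lock(mutex);
        data_cv.wait(lock, [this] { return stopped || !items.empty(); });
        return take();
    }

    // Non-blocking variant: throws ringbuffer_empty instead of waiting.
    T try_pop()
    {
        std::lock_guard<std::mutex> lock(mutex);
        return take();
    }

    void stop()
    {
        std::lock_guard<std::mutex> lock(mutex);
        stopped = true;
        data_cv.notify_all();
        space_cv.notify_all();
    }

private:
    // Caller must hold the mutex. Remaining items are still drained after stop.
    T take()
    {
        if (items.empty())
        {
            if (stopped)
                throw ringbuffer_stopped();
            throw ringbuffer_empty();
        }
        T value = std::move(items.front());
        items.pop_front();
        space_cv.notify_one();
        return value;
    }

    std::mutex mutex;
    std::condition_variable data_cv, space_cv;
    std::deque<T> items;
    std::size_t capacity;
    bool stopped = false;
};
```

A consumer thread would typically loop on pop() and leave the loop when it catches the stopped exception, which mirrors the "stop has been called and there are no more heaps" condition.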

Readers

Reader classes are constructed inside a stream by calling spead2::recv::stream::emplace_reader().

class spead2::recv::udp_reader

Asynchronous stream reader that receives packets over UDP.

Inherits from spead2::recv::udp_reader_base

Public Functions

udp_reader(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size = default_max_size, std::size_t buffer_size = default_buffer_size)

Constructor.

If endpoint is a multicast address, then this constructor will subscribe to the multicast group, and also set SO_REUSEADDR so that multiple sockets can be subscribed to the multicast group.

Parameters
  • owner: Owning stream
  • endpoint: Address on which to listen
  • max_size: Maximum packet size that will be accepted.
  • buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.

udp_reader(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size, std::size_t buffer_size, const boost::asio::ip::address &interface_address)

Constructor with explicit multicast interface address (IPv4 only).

The socket will have SO_REUSEADDR set, so that multiple sockets can all listen to the same multicast stream. If you want to let the system pick the interface for the multicast subscription, use boost::asio::ip::address_v4::any(), or use the default constructor.

Parameters
  • owner: Owning stream
  • endpoint: Multicast group and port
  • max_size: Maximum packet size that will be accepted.
  • buffer_size: Requested socket buffer size.
  • interface_address: Address of the interface which should join the group
Exceptions
  • std::invalid_argument: If endpoint is not an IPv4 multicast address
  • std::invalid_argument: If interface_address is not an IPv4 address

udp_reader(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size, std::size_t buffer_size, unsigned int interface_index)

Constructor with explicit multicast interface index (IPv6 only).

The socket will have SO_REUSEADDR set, so that multiple sockets can all listen to the same multicast stream. If you want to let the system pick the interface for the multicast subscription, set interface_index to 0, or use the standard constructor.

See
if_nametoindex(3)
Parameters
  • owner: Owning stream
  • endpoint: Multicast group and port
  • max_size: Maximum packet size that will be accepted.
  • buffer_size: Requested socket buffer size.
  • interface_index: Index of the interface which should join the group

udp_reader(stream &owner, boost::asio::ip::udp::socket &&socket, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size = default_max_size, std::size_t buffer_size = default_buffer_size)

Constructor using an existing socket.

This allows socket options (e.g., multicast subscriptions) to be fine-tuned by the caller. The socket should not be bound. Note that there is no special handling for multicast addresses here.

Parameters
  • owner: Owning stream
  • socket: Existing socket which will be taken over. It must use the same I/O service as owner.
  • endpoint: Address on which to listen
  • max_size: Maximum packet size that will be accepted.
  • buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.

class spead2::recv::mem_reader

Reader class that feeds data from a memory buffer to a stream.

The caller must ensure that the underlying memory buffer is not destroyed before this class.

Note
For simple cases, use mem_to_stream instead. This class is only necessary if one wants to plug in to a stream.

Inherits from spead2::recv::reader

Subclassed by spead2::recv::buffer_reader

Memory allocators

In addition to the memory allocators described in Memory allocators, new allocators can be created by subclassing spead2::memory_allocator. For an allocator set on a stream, a pointer to a spead2::recv::packet_header is passed as a hint to the allocator, allowing memory to be placed according to information in the packet. Note that this can be any packet from the heap, so you must not rely on it being the initial packet.

class spead2::memory_allocator

Polymorphic class for managing memory allocations in a memory pool.

This can be subclassed to provide custom memory allocations.

Inherits from std::enable_shared_from_this< memory_allocator >

Subclassed by spead2::memory_pool, spead2::mmap_allocator, spead2::unittest::mock_allocator

Public Functions

memory_allocator::pointer allocate(std::size_t size, void *hint)

Allocate size bytes of memory.

The default implementation uses new and pre-faults the memory.

Return
Pointer to newly allocated memory
Parameters
  • size: Number of bytes to allocate
  • hint: Usage-dependent extra information
Exceptions
  • std::bad_alloc: if allocation failed

Private Functions

void free(std::uint8_t *ptr, void *user)

Free memory previously returned from allocate.

Parameters
  • ptr: Value returned by allocate
  • user: User-defined handle returned by allocate
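
The allocate/free pairing can be sketched as a smart pointer whose deleter routes the release back to the allocator that produced the memory. The code below is a standalone model, not the library class: allocator_model and counting_allocator are hypothetical, the deleter layout is an assumption, and value-initialising the array stands in for the pre-faulting mentioned above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>

// Hypothetical model of a polymorphic allocator (not spead2::memory_allocator).
class allocator_model : public std::enable_shared_from_this<allocator_model>
{
public:
    // Deleter that keeps the allocator alive and hands the memory back to it.
    class deleter
    {
    public:
        deleter() = default;
        deleter(std::shared_ptr<allocator_model> owner, void *user)
            : owner(std::move(owner)), user(user) {}

        void operator()(std::uint8_t *ptr) const
        {
            if (owner)
                owner->free(ptr, user);
            else
                delete[] ptr;
        }

    private:
        std::shared_ptr<allocator_model> owner;
        void *user = nullptr;
    };

    using pointer = std::unique_ptr<std::uint8_t[], deleter>;

    virtual ~allocator_model() = default;

    // The hint is unused here; a subclass could use it to choose a location.
    virtual pointer allocate(std::size_t size, void *hint)
    {
        (void) hint;
        std::uint8_t *ptr = new std::uint8_t[size]();  // zero-fill touches the memory
        return pointer(ptr, deleter(shared_from_this(), nullptr));
    }

private:
    virtual void free(std::uint8_t *ptr, void *user)
    {
        (void) user;
        delete[] ptr;
    }
};

// Example subclass: counts how many allocations are currently outstanding.
class counting_allocator : public allocator_model
{
public:
    int live = 0;

    pointer allocate(std::size_t size, void *hint) override
    {
        ++live;
        return allocator_model::allocate(size, hint);
    }

private:
    void free(std::uint8_t *ptr, void *user) override
    {
        (void) user;
        --live;
        delete[] ptr;
    }
};
```

Because allocate relies on shared_from_this, an allocator in this model must itself be owned by a std::shared_ptr, for example one created with std::make_shared.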