Receiving

Heaps

Unlike the Python bindings, the C++ bindings expose two heap types: live heaps (spead2::recv::live_heap) are used for heaps being constructed, and may be missing data; frozen heaps (spead2::recv::heap) always have all their data. Frozen heaps can be move-constructed from live heaps, which will typically be done in the callback.

class spead2::recv::live_heap

A SPEAD heap that is in the process of being received.

Once it is fully received, it is converted to a heap for further processing.

Any SPEAD-64-* flavour can be used, but all packets in the heap must use the same flavour. It may be possible to relax this, but it hasn’t been examined, and may cause issues for decoding descriptors (whose format depends on the flavour).

A heap can be:

  • complete: a heap length item was found in a packet, and we have received all the payload corresponding to it. No more packets are expected.
  • contiguous: the payload we have received is a contiguous range from 0 up to some amount, and cover all items described in the item pointers. A complete heap is also contiguous, but not necessarily the other way around. Only contiguous heaps can be frozen.

Public Functions

bool is_complete() const

True if the heap is complete.

bool is_contiguous() const

True if the heap is contiguous.

bool is_end_of_stream() const

True if an end-of-stream heap control item was found.

s_item_pointer_t get_cnt() const

Retrieve the heap ID.

bug_compat_mask get_bug_compat() const

Get protocol bug compatibility flags.

class spead2::recv::heap

Received heap that has been finalised.

Subclassed by spead2::recv::heap_wrapper

Public Functions

heap(live_heap &&h)

Freeze a heap, which must satisfy live_heap::is_contiguous.

The original heap is destroyed.

s_item_pointer_t get_cnt() const

Get heap ID.

const flavour &get_flavour() const

Get protocol flavour used.

const std::vector<item> &get_items() const

Get the items from the heap.

This includes descriptors, but excludes any items with ID <= 4.

descriptor to_descriptor() const

Extract descriptor fields from the heap.

Any missing fields are default-initialized. This should be used on a heap constructed from the content of a descriptor item.

The original PySPEAD package (version 0.5.2) does not follow the specification here. The macros in common_defines.h can be used to control whether to interpret the specification or be bug-compatible.

The protocol allows descriptors to use immediate-mode items, but the decoding of these into variable-length strings is undefined. This implementation will discard such descriptor fields.

std::vector<descriptor> get_descriptors() const

Extract and decode descriptors from this heap.

bool is_start_of_stream() const

Convenience function to check whether any of the items is a CTRL_STREAM_START.

struct spead2::recv::item

An item extracted from a heap.

Subclassed by spead2::recv::item_wrapper

Public Members

s_item_pointer_t id

Item ID.

std::uint8_t *ptr

Start of memory containing value.

std::size_t length

Length of memory.

item_pointer_t immediate_value

The immediate interpreted as an integer (undefined if not immediate)

bool is_immediate

Whether the item is immediate.

struct spead2::descriptor

An unpacked descriptor.

If numpy_header is non-empty, it overrides format and shape.

Public Members

s_item_pointer_t id = 0

SPEAD ID.

std::string name

Short name.

std::string description

Long description.

std::vector<std::pair<char, s_item_pointer_t>> format

Legacy format.

Each element is a specifier character (e.g. ‘u’ for unsigned) and a bit width.

std::vector<s_item_pointer_t> shape

Shape.

Elements are either non-negative, or -1 is used to indicate a variable-length size. At most one dimension may be variable-length.

std::string numpy_header

Description in the format used in .npy files.

Streams

At the lowest level, heaps are given to the application via a callback to a virtual function. While this callback is running, no new packets can be received from the network socket, so this function needs to complete quickly to avoid data loss when using UDP. To use this interface, subclass spead2::recv::stream and implement heap_ready() and optionally override stop_received().

class spead2::recv::stream

Stream that is fed by subclasses of reader.

Unless otherwise specified, methods in stream_base may only be called while holding the strand contained in this class. The public interface functions must be called from outside the strand (and outside the threads associated with the io_service), but are not thread-safe relative to each other.

This class is thread-safe. This is achieved mostly by having operations run as completion handlers on a strand. The exception is stop, which uses a once to ensure that only the first call actually runs.

Inherits from spead2::recv::stream_base

Subclassed by callback_stream, recv_stream, spead2::recv::ring_stream_base

Public Functions

template <typename T, typename... Args>
void emplace_reader(Args&&... args)

Add a new reader by passing its constructor arguments, excluding the initial stream argument.

void stop()

Stop the stream and block until all the readers have wound up.

After calling this there should be no more outstanding completion handlers in the thread pool.

In most cases subclasses should override stop_received rather than this function.

Protected Functions

void stop_received()

Shut down the stream.

This calls flush. Subclasses may override this to achieve additional effects, but must chain to the base implementation.

It is undefined what happens if add_packet is called after a stream is stopped.

void flush()

Flush the collection of live heaps, passing them to heap_ready.

A potentially more convenient interface is spead2::recv::ring_stream<Ringbuffer>, which places received heaps into a fixed-size thread-safe ring buffer. Another thread can then pull from this ring buffer in a loop. The template parameter selects the ringbuffer implementation. The default is a good light-weight choice, but if you need to use select()-like functions to wait for data, you can use spead2::ringbuffer<spead2::recv::live_heap, spead2::semaphore_fd, spead2::semaphore>.

template <typename Ringbuffer = ringbuffer<live_heap>>
class spead2::recv::ring_stream

Specialisation of stream that pushes its results into a ringbuffer.

The ringbuffer class may be replaced, but must provide the same interface as ringbuffer. If the ring buffer fills up, add_packet will block the reader.

On the consumer side, heaps are automatically frozen as they are extracted.

This class is thread-safe.

Inherits from spead2::recv::ring_stream_base

Readers

Reader classes are constructed inside a stream by calling spead2::recv::stream::emplace_reader().

class spead2::recv::udp_reader

Asynchronous stream reader that receives packets over UDP.

Inherits from spead2::recv::udp_reader_base

Public Functions

udp_reader(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size = default_max_size, std::size_t buffer_size = default_buffer_size)

Constructor.

If endpoint is a multicast address, then this constructor will subscribe to the multicast group, and also set SO_REUSEADDR so that multiple sockets can be subscribed to the multicast group.

Parameters
  • owner: Owning stream
  • endpoint: Address on which to listen
  • max_size: Maximum packet size that will be accepted.
  • buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.

udp_reader(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size, std::size_t buffer_size, const boost::asio::ip::address &interface_address)

Constructor with explicit multicast interface address (IPv4 only).

The socket will have SO_REUSEADDR set, so that multiple sockets can all listen to the same multicast stream. If you want to let the system pick the interface for the multicast subscription, use boost::asio::ip::address_v4::any(), or use the default constructor.

Parameters
  • owner: Owning stream
  • endpoint: Multicast group and port
  • max_size: Maximum packet size that will be accepted.
  • buffer_size: Requested socket buffer size.
  • interface_address: Address of the interface which should join the group
Exceptions
  • std::invalid_argument: If endpoint is not an IPv4 multicast address
  • std::invalid_argument: If interface_address is not an IPv4 address

udp_reader(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size, std::size_t buffer_size, unsigned int interface_index)

Constructor with explicit multicast interface index (IPv6 only).

The socket will have SO_REUSEADDR set, so that multiple sockets can all listen to the same multicast stream. If you want to let the system pick the interface for the multicast subscription, set interface_index to 0, or use the standard constructor.

See
if_nametoindex(3)
Parameters
  • owner: Owning stream
  • endpoint: Multicast group and port
  • max_size: Maximum packet size that will be accepted.
  • buffer_size: Requested socket buffer size.
  • interface_index: Address of the interface which should join the group

udp_reader(stream &owner, boost::asio::ip::udp::socket &&socket, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size = default_max_size, std::size_t buffer_size = default_buffer_size)

Constructor using an existing socket.

This allows socket options (e.g., multicast subscriptions) to be fine-tuned by the caller. The socket should not be bound. Note that there is no special handling for multicast addresses here.

Parameters
  • owner: Owning stream
  • socket: Existing socket which will be taken over. It must use the same I/O service as owner.
  • endpoint: Address on which to listen
  • max_size: Maximum packet size that will be accepted.
  • buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.

class spead2::recv::mem_reader

Reader class that feeds data from a memory buffer to a stream.

The caller must ensure that the underlying memory buffer is not destroyed before this class.

Note
For simple cases, use mem_to_stream instead. This class is only necessary if one wants to plug in to a stream.

Inherits from spead2::recv::reader

Subclassed by spead2::recv::buffer_reader

Memory allocators

In addition to the memory allocators described in Memory allocators, new allocators can be created by subclassing spead2::memory_allocator. For an allocator set on a stream, a pointer to a spead2::recv::packet_header is passed as a hint to the allocator, allowing memory to be placed according to information in the packet. Note that this can be any packet from the heap, so you must not rely on it being the initial packet.

class spead2::memory_allocator

Polymorphic class for managing memory allocations in a memory pool.

This can be overloaded to provide custom memory allocations.

Inherits from std::enable_shared_from_this< memory_allocator >

Subclassed by spead2::memory_pool, spead2::mmap_allocator, spead2::unittest::mock_allocator

Public Functions

memory_allocator::pointer allocate(std::size_t size, void *hint)

Allocate size bytes of memory.

The default implementation uses new and pre-faults the memory.

Return
Pointer to newly allocated memory
Parameters
  • size: Number of bytes to allocate
  • hint: Usage-dependent extra information
Exceptions
  • std::bad_alloc: if allocation failed

Private Functions

void free(std::uint8_t *ptr, void *user)

Free memory previously returned from allocate.

Parameters
  • ptr: Value returned by allocate
  • user: User-defined handle returned by allocate