Receiving

Heaps

Unlike the Python bindings, the C++ bindings expose three heap types: live heaps (spead2::recv::live_heap) are used for heaps being constructed, and may be missing data; frozen heaps (spead2::recv::heap) always have all their data; and incomplete heaps (spead2::recv::incomplete_heap) are frozen heaps that are missing data. Frozen heaps can be move-constructed from live heaps, which will typically be done in the callback.

class spead2::recv::live_heap

A SPEAD heap that is in the process of being received.

Once it is fully received, it is converted to a heap for further processing.

Any SPEAD-64-* flavour can be used, but all packets in the heap must use the same flavour. It may be possible to relax this, but it hasn’t been examined, and may cause issues for decoding descriptors (whose format depends on the flavour).

A heap can be:

  • complete: a heap length item was found in a packet, and we have received all the payload corresponding to it. No more packets are expected.

  • contiguous: the payload we have received is a contiguous range from 0 up to some amount, and cover all items described in the item pointers.

  • incomplete: not contiguous A complete heap is also contiguous, but not necessarily the other way around. Only contiguous heaps can be frozen to heap, and only incomplete heaps can be frozen to incomplete_heap.

Public Functions

bool is_complete() const

True if the heap is complete.

bool is_contiguous() const

True if the heap is contiguous.

bool is_end_of_stream() const

True if an end-of-stream heap control item was found.

s_item_pointer_t get_cnt() const

Retrieve the heap ID.

bug_compat_mask get_bug_compat() const

Get protocol bug compatibility flags.

class spead2::recv::heap : public spead2::recv::heap_base

Received heap that has been finalised.

Public Functions

heap(live_heap &&h)

Freeze a heap, which must satisfy live_heap::is_contiguous.

The original heap is destroyed.

descriptor to_descriptor() const

Extract descriptor fields from the heap.

Any missing fields are default-initialized. This should be used on a heap constructed from the content of a descriptor item.

The original PySPEAD package (version 0.5.2) does not follow the specification here. The macros in common_defines.h can be used to control whether to interpret the specification or be bug-compatible.

The protocol allows descriptors to use immediate-mode items, but the decoding of these into variable-length strings is undefined. This implementation will discard such descriptor fields.

std::vector<descriptor> get_descriptors() const

Extract and decode descriptors from this heap.

s_item_pointer_t get_cnt() const

Get heap ID.

const flavour &get_flavour() const

Get protocol flavour used.

const std::vector<item> &get_items() const

Get the items from the heap.

This includes descriptors, but excludes any items with ID <= 4.

bool is_ctrl_item(ctrl_mode value) const

Convenience function to check whether any of the items is a STREAM_CTRL_ID item with value value.

bool is_start_of_stream() const

Convenience function to check whether any of the items is a CTRL_STREAM_START.

bool is_end_of_stream() const

Convenience function to check whether any of the items is a CTRL_STREAM_STOP.

class spead2::recv::incomplete_heap : public spead2::recv::heap_base

Received heap that has been finalised, but which is missing data.

The payload and any items that refer to the payload are discarded.

Public Functions

incomplete_heap(live_heap &&h, bool keep_payload, bool keep_payload_ranges)

Freeze a heap.

The original heap is destroyed.

Parameters
  • h: The heap to freeze.

  • keep_payload: If true, transfer the payload memory allocation from the live heap to this object. If false, discard it.

  • keep_payload_ranges: If true, store information that allows get_payload_ranges to work.

s_item_pointer_t get_heap_length() const

Heap payload length encoded in packets (-1 for unknown)

s_item_pointer_t get_received_length() const

Number of bytes of payload received.

const memory_allocator::pointer &get_payload() const

Get the payload pointer.

This will return an empty pointer unless keep_payload was set in the constructor.

std::vector<std::pair<s_item_pointer_t, s_item_pointer_t>> get_payload_ranges() const

Return a list of contiguous ranges of payload that were received.

This is intended for special cases where a custom memory allocator was used to channel the payload into a caller-managed area, so that the caller knows which parts of that area have been filled in.

If keep_payload_ranges was false in the constructor, returns an empty list.

s_item_pointer_t get_cnt() const

Get heap ID.

const flavour &get_flavour() const

Get protocol flavour used.

const std::vector<item> &get_items() const

Get the items from the heap.

This includes descriptors, but excludes any items with ID <= 4.

bool is_ctrl_item(ctrl_mode value) const

Convenience function to check whether any of the items is a STREAM_CTRL_ID item with value value.

bool is_start_of_stream() const

Convenience function to check whether any of the items is a CTRL_STREAM_START.

bool is_end_of_stream() const

Convenience function to check whether any of the items is a CTRL_STREAM_STOP.

struct spead2::recv::item

An item extracted from a heap.

Subclassed by spead2::recv::item_wrapper

Public Members

s_item_pointer_t id

Item ID.

std::uint8_t *ptr

Start of memory containing value.

std::size_t length

Length of memory.

item_pointer_t immediate_value

The immediate interpreted as an integer (undefined if not immediate)

bool is_immediate

Whether the item is immediate.

struct spead2::descriptor

An unpacked descriptor.

If numpy_header is non-empty, it overrides format and shape.

Public Members

s_item_pointer_t id = 0

SPEAD ID.

std::string name

Short name.

std::string description

Long description.

std::vector<std::pair<char, s_item_pointer_t>> format

Legacy format.

Each element is a specifier character (e.g. ‘u’ for unsigned) and a bit width.

std::vector<s_item_pointer_t> shape

Shape.

Elements are either non-negative, or -1 is used to indicate a variable-length size. At most one dimension may be variable-length.

std::string numpy_header

Description in the format used in .npy files.

Streams

At the lowest level, heaps are given to the application via a callback to a virtual function. While this callback is running, no new packets can be received from the network socket, so this function needs to complete quickly to avoid data loss when using UDP. To use this interface, subclass spead2::recv::stream and implement heap_ready() and optionally override stop_received().

Note that some public functions are incorrectly listed as protected below due to limitations of the documentation tools.

class spead2::recv::stream_config

Parameters for a receive stream.

Public Functions

stream_config &set_max_heaps(std::size_t max_heaps)

Set maximum number of partial heaps that can be live at one time.

This affects how intermingled heaps can be (due to out-of-order packet delivery) before heaps get dropped.

std::size_t get_max_heaps() const

Get maximum number of partial heaps that can be live at one time.

stream_config &set_memory_allocator(std::shared_ptr<memory_allocator> allocator)

Set an allocator to use for allocating heap memory.

const std::shared_ptr<memory_allocator> &get_memory_allocator() const

Get allocator for allocating heap memory.

stream_config &set_memcpy(packet_memcpy_function memcpy)

Set an alternative memcpy function for copying heap payload.

stream_config &set_memcpy(memcpy_function memcpy)

Set an alternative memcpy function for copying heap payload.

stream_config &set_memcpy(memcpy_function_id id)

Set builtin memcpy function to use for copying heap payload.

packet_memcpy_function get_memcpy() const

Get memcpy function for copying heap payload.

stream_config &set_stop_on_stop_item(bool stop)

Set whether to stop the stream when a stop item is received.

bool get_stop_on_stop_item() const

Get whether to stop the stream when a stop item is received.

stream_config &set_allow_unsized_heaps(bool allow)

Set whether to allow heaps without HEAP_LENGTH.

bool get_allow_unsized_heaps() const

Get whether to allow heaps without HEAP_LENGTH.

stream_config &set_allow_out_of_order(bool allow)

Set whether to allow out-of-order packets within a heap.

bool get_allow_out_of_order() const

Get whether to allow out-of-order packets within a heap.

stream_config &set_bug_compat(bug_compat_mask bug_compat)

Set bug compatibility flags.

bug_compat_mask get_bug_compat() const

Get bug compatibility flags.

class spead2::recv::stream : protected spead2::recv::stream_base

Stream that is fed by subclasses of reader.

The public interface to this class is thread-safe.

Subclassed by spead2::recv::ring_stream_base

Public Functions

template<typename T, typename ...Args>
void emplace_reader(Args&&... args)

Add a new reader by passing its constructor arguments, excluding the initial stream argument.

void stop()

Stop the stream and block until all the readers have wound up.

After calling this there should be no more outstanding completion handlers in the thread pool.

In most cases subclasses should override stop_received rather than this function. However, if heap_ready can block indefinitely, this function should be overridden to unblock it before calling the base implementation.

Protected Functions

const stream_config &get_config() const

Get the stream’s configuration.

void flush()

Flush the collection of live heaps, passing them to heap_ready.

stream_stats get_stats() const

Return statistics about the stream.

See the Python documentation.

struct spead2::recv::stream_stats

Statistics about a stream.

Not all fields are relevant for all stream types.

Public Members

std::uint64_t heaps = 0

Total number of heaps passed to stream_base::heap_ready.

std::uint64_t incomplete_heaps_evicted = 0

Number of incomplete heaps that were evicted from the buffer to make room for new data.

std::uint64_t incomplete_heaps_flushed = 0

Number of incomplete heaps that were emitted by stream::flush.

These are typically heaps that were in-flight when the stream stopped.

std::uint64_t packets = 0

Number of packets received.

std::uint64_t batches = 0

Number of batches of packets.

std::uint64_t worker_blocked = 0

Number of times a worker thread was blocked because the ringbuffer was full.

Only applicable to ring_stream.

std::size_t max_batch = 0

Maximum number of packets received as a unit.

This is only applicable to readers that support fetching a batch of packets from the source.

std::uint64_t single_packet_heaps = 0

Number of heaps that were entirely contained in one packet.

std::uint64_t search_dist = 0

Total number of hash table probes.

A potentially more convenient interface is spead2::recv::ring_stream<Ringbuffer>, which places received heaps into a fixed-size thread-safe ring buffer. Another thread can then pull from this ring buffer in a loop. The template parameter selects the ringbuffer implementation. The default is a good light-weight choice, but if you need to use select()-like functions to wait for data, you can use spead2::ringbuffer<spead2::recv::live_heap, spead2::semaphore_fd, spead2::semaphore>.

class spead2::recv::ring_stream_config

Parameters for configuring ring_stream.

Subclassed by spead2::recv::ring_stream_config_wrapper

Public Functions

ring_stream_config &set_heaps(std::size_t heaps)

Set capacity of the ring buffer.

std::size_t get_heaps() const

Get capacity of the ring buffer.

ring_stream_config &set_contiguous_only(bool contiguous_only)

Set whether only contiguous heaps are pushed to the ring buffer.

bool get_contiguous_only() const

Get whether only contiguous heaps are pushed to the ring buffer.

template<typename Ringbuffer = ringbuffer<live_heap>>
class spead2::recv::ring_stream : public spead2::recv::ring_stream_base

Specialisation of stream that pushes its results into a ringbuffer.

The ringbuffer class may be replaced, but must provide the same interface as ringbuffer. If the ring buffer fills up, add_packet will block the reader.

On the consumer side, heaps are automatically frozen as they are extracted.

This class is thread-safe.

Public Functions

ring_stream(io_service_ref io_service, const stream_config &config = stream_config(), const ring_stream_config &ring_config = ring_stream_config())

Constructor.

Parameters
  • io_service: I/O service (also used by the readers).

  • config: Stream configuration

  • ring_config: Ringbuffer configuration

heap pop()

Wait until a contiguous heap is available, freeze it, and return it; or until the stream is stopped.

Exceptions
  • ringbuffer_stopped: if stop has been called and there are no more contiguous heaps.

live_heap pop_live()

Wait until a heap is available and return it; or until the stream is stopped.

Exceptions
  • ringbuffer_stopped: if stop has been called and there are no more heaps.

heap try_pop()

Like pop, but if no contiguous heap is available, throws spead2::ringbuffer_empty.

Exceptions
  • ringbuffer_empty: if there is no contiguous heap available, but the stream has not been stopped

  • ringbuffer_stopped: if stop has been called and there are no more contiguous heaps.

live_heap try_pop_live()

Like pop_live, but if no heap is available, throws spead2::ringbuffer_empty.

Exceptions
  • ringbuffer_empty: if there is no heap available, but the stream has not been stopped

  • ringbuffer_stopped: if stop has been called and there are no more heaps.

const ring_stream_config &get_ring_config() const

Get the ringbuffer configuration passed to the constructor.

Readers

Reader classes are constructed inside a stream by calling spead2::recv::stream::emplace_reader().

class spead2::recv::udp_reader : public spead2::recv::udp_reader_base

Asynchronous stream reader that receives packets over UDP.

Public Functions

udp_reader(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size = default_max_size, std::size_t buffer_size = default_buffer_size)

Constructor.

If endpoint is a multicast address, then this constructor will subscribe to the multicast group, and also set SO_REUSEADDR so that multiple sockets can be subscribed to the multicast group.

Parameters
  • owner: Owning stream

  • endpoint: Address on which to listen

  • max_size: Maximum packet size that will be accepted.

  • buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.

udp_reader(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size, std::size_t buffer_size, const boost::asio::ip::address &interface_address)

Constructor with explicit interface address (IPv4 only).

This overload is designed for use with multicast, but can also be used with a unicast endpoint as long as the address matches the interface address.

When a multicast group is used, the socket will have SO_REUSEADDR set, so that multiple sockets can all listen to the same multicast stream. If you want to let the system pick the interface for the multicast subscription, use boost::asio::ip::address_v4::any(), or use the default constructor.

Parameters
  • owner: Owning stream

  • endpoint: Address and port

  • max_size: Maximum packet size that will be accepted.

  • buffer_size: Requested socket buffer size.

  • interface_address: Address of the interface which should join the group

Exceptions
  • std::invalid_argument: If endpoint is not an IPv4 multicast address and does not match interface_address.

  • std::invalid_argument: If interface_address is not an IPv4 address

udp_reader(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size, std::size_t buffer_size, unsigned int interface_index)

Constructor with explicit multicast interface index (IPv6 only).

The socket will have SO_REUSEADDR set, so that multiple sockets can all listen to the same multicast stream. If you want to let the system pick the interface for the multicast subscription, set interface_index to 0, or use the standard constructor.

See

if_nametoindex(3)

Parameters
  • owner: Owning stream

  • endpoint: Multicast group and port

  • max_size: Maximum packet size that will be accepted.

  • buffer_size: Requested socket buffer size.

  • interface_index: Address of the interface which should join the group

udp_reader(stream &owner, boost::asio::ip::udp::socket &&socket, std::size_t max_size = default_max_size)

Constructor using an existing socket.

This allows socket options (e.g., multicast subscriptions) to be fine-tuned by the caller. The socket must already be bound to the desired endpoint. There is no special handling of multicast subscriptions or socket buffer sizes here.

Parameters
  • owner: Owning stream

  • socket: Existing socket which will be taken over. It must use the same I/O service as owner.

  • max_size: Maximum packet size that will be accepted.

class spead2::recv::tcp_reader : public spead2::recv::reader

Asynchronous stream reader that receives packets over TCP.

Public Functions

tcp_reader(stream &owner, const boost::asio::ip::tcp::endpoint &endpoint, std::size_t max_size = default_max_size, std::size_t buffer_size = default_buffer_size)

Constructor.

Parameters
  • owner: Owning stream

  • endpoint: Address on which to listen

  • max_size: Maximum packet size that will be accepted.

  • buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.

tcp_reader(stream &owner, boost::asio::ip::tcp::acceptor &&acceptor, std::size_t max_size = default_max_size)

Constructor using an existing acceptor object.

This allows acceptor objects to be created and fine-tuned by users before handing them over. The acceptor object must be already bound.

Parameters
  • owner: Owning stream

  • acceptor: Acceptor object, must be bound

  • max_size: Maximum packet size that will be accepted.

Private Functions

tcp_reader(stream &owner, boost::asio::ip::tcp::acceptor &&acceptor, std::size_t max_size, std::size_t buffer_size)

Base constructor, used by the other constructors.

Parameters
  • owner: Owning stream

  • acceptor: Acceptor object, must be bound

  • max_size: Maximum packet size that will be accepted.

  • buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.

class mem_reader : public spead2::recv::reader

Reader class that feeds data from a memory buffer to a stream.

The caller must ensure that the underlying memory buffer is not destroyed before this class.

Note

For simple cases, use mem_to_stream instead. This class is only necessary if one wants to plug in to a stream.

Subclassed by spead2::recv::buffer_reader

class spead2::recv::udp_pcap_file_reader : public spead2::recv::udp_reader_base

Reader class that feeds data from a pcap file to a stream.

Public Functions

udp_pcap_file_reader(stream &owner, const std::string &filename)

Constructor.

Parameters
  • owner: Owning stream

  • filename: Filename of the capture file

Exceptions
  • std::runtime_error: if filename could not read

Memory allocators

In addition to the memory allocators described in Memory allocators, new allocators can be created by subclassing spead2::memory_allocator. For an allocator set on a stream, a pointer to a spead2::recv::packet_header is passed as a hint to the allocator, allowing memory to be placed according to information in the packet. Note that for unreliable transports this could be any packet from the heap, and you should not rely on it being the initial packet.

class spead2::memory_allocator : public std::enable_shared_from_this<memory_allocator>

Polymorphic class for managing memory allocations in a memory pool.

This can be overloaded to provide custom memory allocations.

Subclassed by spead2::memory_pool, spead2::mmap_allocator, spead2::unittest::mock_allocator

Public Functions

pointer allocate(std::size_t size, void *hint)

Allocate size bytes of memory.

The default implementation uses new and pre-faults the memory.

The pointer type includes a custom deleter that takes a void * argument, which will be passed to free. This can be used to pass extra information that is needed to free the memory. It is guaranteed that the base class will set this to nullptr.

Return

Pointer to newly allocated memory

Parameters
  • size: Number of bytes to allocate

  • hint: Usage-dependent extra information

Exceptions
  • std::bad_alloc: if allocation failed

Private Functions

void free(std::uint8_t *ptr, void *user)

Free memory previously returned from allocate.

Parameters
  • ptr: Value returned by allocate

  • user: User-defined handle stored in the deleter by allocate

Custom memory scatter

In specialised high-bandwidth cases, the overhead of assembling heaps in temporary storage before scattering the data into other arrangements can be very high. It is possible (since 1.11) to take complete control over the transfer of the payload of the SPEAD packets. Before embarking on such an approach, be sure you have a good understanding of the SPEAD protocol, particularly packets, heaps, item pointers and payload.

In the simplest case, each heap needs to be written to some special or pre-allocated storage, but in a contiguous fashion. In this case it is sufficient to provide a custom allocator (see above), which will return a pointer to the target storage.

In more complex cases, the contents of each heap, or even each packet, needs to be scattered to discontiguous storage areas. In this case, one can additionally override the memory copy function with set_memcpy() and providing a packet_memcpy_function.

typedef std::function<void(const spead2::memory_allocator::pointer &allocation, const packet_header &packet)> recv::spead2::packet_memcpy_function

It takes a pointer to the start of the heap’s allocation (as returned by the allocator) and the packet metadata. The default implementation is equivalent to the following:

void copy(const spead2::memory_allocator::pointer &allocation, const packet_header &packet)
{
    memcpy(allocation.get() + packet.payload_offset, packet.payload, packet.payload_length);
}

Note that when providing your own memory copy and allocator, you don’t necessarily need the allocator to actually return a pointer to payload memory. It could, for example, populate a structure that guides the copy, and return a pointer to that; or it could return a null pointer. There are some caveats though:

  1. If the sender doesn’t provide the heap length item, then spead2 may need to make multiple allocations of increasing size as the heap grows, and each time it will copy (with standard memcpy, rather than your custom one) the old content to the new. Assuming you aren’t expecting such packets, you can reject them using set_allow_unsized_heaps().

  2. spead2::recv::heap_base::get_items() constructs pointers to the items on the assumption of the default memcpy function, so if your replacement doesn’t copy things to the same place, you obviously won’t be able to use those pointers. Note that get_descriptors() will also not be usable.