Chunking receiver

Warning

This feature is experimental. Future releases of spead2 may change it in backwards-incompatible ways, and it could even be removed entirely.

For an overview, refer to Chunking receiver. This page is a reference for the C++ API.

struct spead2::recv::chunk_place_data

Data passed to chunk_place_function.

This structure is designed to be a plain C structure that can easily be handled by language bindings. As far as possible, new fields will be added to the end but existing fields will be retained, to preserve ABI compatibility.

Public Members

const std::uint8_t *packet

Pointer to the original packet data.

std::size_t packet_size

Number of bytes referenced by packet.

const s_item_pointer_t *items

Values of requested item pointers.

std::int64_t chunk_id

Chunk ID (output). Set to -1 (or leave unmodified) to discard the heap.

std::size_t heap_index

Number of this heap within the chunk (output).

std::size_t heap_offset

Byte offset of this heap within the chunk payload (output).

typedef std::function<void(chunk_place_data *data, std::size_t data_size)> spead2::recv::chunk_place_function

Callback to determine where a heap is placed in the chunk stream.

See chunk_place_data.

Parameters
  • data – Pointer to the input and output arguments.

  • data_size – sizeof(chunk_place_data) at the time spead2 was compiled.
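As a rough illustration of what a place callback might do, the sketch below maps a heap counter to a chunk ID, heap index and byte offset. It uses a hypothetical stand-in struct (place_data) that mirrors the fields documented above so that it compiles without spead2. The layout constants, the assumption that items[0] carries a heap counter, and the particular use of data_size as a sanity check are all illustrative choices, not something this reference prescribes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in mirroring the documented fields of
// spead2::recv::chunk_place_data (not the real type).
struct place_data
{
    const std::uint8_t *packet;
    std::size_t packet_size;
    const std::int64_t *items;  // stand-in for const s_item_pointer_t *
    std::int64_t chunk_id;      // output
    std::size_t heap_index;     // output
    std::size_t heap_offset;    // output
};

// Assumed layout for this sketch only: 4 heaps of 1024 bytes per chunk.
constexpr std::int64_t heaps_per_chunk = 4;
constexpr std::size_t heap_bytes = 1024;

// Sketch of a place callback. items[0] is assumed to hold a heap counter
// that was requested through set_items.
void place(place_data *data, std::size_t data_size)
{
    // If the caller's structure is smaller than the one this code was
    // compiled against, do nothing: chunk_id stays unmodified, so the
    // heap is discarded.
    if (data_size < sizeof(place_data))
        return;
    std::int64_t counter = data->items[0];
    if (counter < 0)
        return;  // treated as invalid in this sketch; heap is discarded
    data->chunk_id = counter / heaps_per_chunk;
    data->heap_index = static_cast<std::size_t>(counter % heaps_per_chunk);
    data->heap_offset = data->heap_index * heap_bytes;
}
```

With these assumed constants, a heap counter of 9 lands in chunk 2 as heap 1, at byte offset 1024.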

typedef std::function<std::unique_ptr<chunk>(std::int64_t chunk_id)> spead2::recv::chunk_allocate_function

Callback to obtain storage for a new chunk.

typedef std::function<void(std::unique_ptr<chunk>&&)> spead2::recv::chunk_ready_function

Callback to receive a completed chunk.

It takes ownership of the chunk.
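The following sketch shows the ownership flow implied by the two callback signatures: the allocate callback returns a std::unique_ptr, and the ready callback receives one by rvalue reference and keeps it. The toy_chunk type, the make_allocate helper and the completed vector are hypothetical stand-ins so that the example builds without spead2. This reference does not say whether the callback or the stream fills in chunk_id, so the sketch leaves it at its default.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for spead2::recv::chunk (not the real class).
struct toy_chunk
{
    std::int64_t chunk_id = -1;
    std::unique_ptr<std::uint8_t[]> present;
    std::size_t present_size = 0;
    std::unique_ptr<std::uint8_t[]> data;
};

// Same shapes as the documented callback typedefs, using the stand-in type.
using allocate_fn = std::function<std::unique_ptr<toy_chunk>(std::int64_t)>;
using ready_fn = std::function<void(std::unique_ptr<toy_chunk> &&)>;

// Completed chunks end up here in this sketch.
std::vector<std::unique_ptr<toy_chunk>> completed;

// Build an allocate callback that creates zero-initialised storage.
allocate_fn make_allocate(std::size_t heaps, std::size_t heap_bytes)
{
    return [=](std::int64_t /* chunk_id */) {
        auto c = std::make_unique<toy_chunk>();
        c->present = std::make_unique<std::uint8_t[]>(heaps);  // zeroed
        c->present_size = heaps;
        c->data = std::make_unique<std::uint8_t[]>(heaps * heap_bytes);
        return c;
    };
}

// Ready callback: takes ownership by moving the chunk into the vector.
void take_ownership(std::unique_ptr<toy_chunk> &&c)
{
    completed.push_back(std::move(c));
}
```

After the ready callback returns, the caller's pointer is empty, which is what "takes ownership" means in practice.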

class spead2::recv::chunk

Storage for a chunk with metadata.

Subclassed by spead2::recv::chunk_wrapper

Public Members

std::int64_t chunk_id = -1

Chunk ID.

std::uintptr_t stream_id = 0

Stream ID of the stream from which the chunk originated.

memory_allocator::pointer present

Flag array indicating which heaps have been received (one byte per heap).

std::size_t present_size = 0

Number of elements in present.

memory_allocator::pointer data

Chunk payload.
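A consumer will often want to know how many heaps of a completed chunk actually arrived. Since present holds one byte per heap and present_size gives its length, this reduces to counting non-zero bytes. The sketch below uses a hypothetical stand-in type (chunk_like) with a plain std::unique_ptr in place of memory_allocator::pointer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>

// Hypothetical stand-in for the relevant members of spead2::recv::chunk.
struct chunk_like
{
    std::unique_ptr<std::uint8_t[]> present;
    std::size_t present_size = 0;
};

// Count the heaps whose presence flag is set.
std::size_t count_present(const chunk_like &c)
{
    std::size_t n = 0;
    for (std::size_t i = 0; i < c.present_size; i++)
        if (c.present[i])
            n++;
    return n;
}
```

The number of missing heaps is then present_size minus the returned count.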

class spead2::recv::chunk_stream_config

Parameters for a chunk_stream.

Public Functions

chunk_stream_config &set_items(const std::vector<item_pointer_t> &item_ids)

Specify the items whose immediate values should be passed to the place function (see chunk_place_function).

inline const std::vector<item_pointer_t> &get_items() const

Get the items set with set_items.

chunk_stream_config &set_max_chunks(std::size_t max_chunks)

Set the maximum number of chunks that can be live at the same time.

A value of 1 means that heaps must be received in order: once a chunk is started, no heaps from a previous chunk will be accepted.

Throws

std::invalid_argument – if max_chunks is 0.
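To make the effect of max_chunks more concrete, here is a toy model of a sliding window of live chunks. It is not spead2's implementation; the toy_window class and its accept method are hypothetical, and the model only tracks which chunk IDs would still be accepted.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Toy model (not spead2 code): chunk IDs in [head, tail) are "live".
class toy_window
{
public:
    explicit toy_window(std::size_t n) : max_chunks(static_cast<std::int64_t>(n))
    {
        if (n == 0)
            throw std::invalid_argument("max_chunks cannot be 0");
    }

    // Returns true if a heap for chunk_id would be accepted in this model.
    bool accept(std::int64_t chunk_id)
    {
        if (chunk_id >= tail)
        {
            // A newer chunk starts: slide the window forward, retiring
            // the oldest chunks so that at most max_chunks stay live.
            tail = chunk_id + 1;
            head = std::max(head, tail - max_chunks);
        }
        return chunk_id >= head;
    }

private:
    std::int64_t max_chunks;
    std::int64_t head = 0;
    std::int64_t tail = 0;
};
```

In this model, a window of size 1 rejects a heap for chunk 0 once chunk 1 has started, while a window of size 2 still accepts it until chunk 2 starts.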

inline std::size_t get_max_chunks() const

Return the maximum number of chunks that can be live at the same time.

chunk_stream_config &set_place(chunk_place_function place)

Set the function used to determine the chunk of each heap and its placement within the chunk.

inline const chunk_place_function &get_place() const

Get the function used to determine the chunk of each heap and its placement within the chunk.

chunk_stream_config &set_allocate(chunk_allocate_function allocate)

Set the function used to allocate a chunk.

inline const chunk_allocate_function &get_allocate() const

Get the function used to allocate a chunk.

chunk_stream_config &set_ready(chunk_ready_function ready)

Set the function that is provided with completed chunks.

inline const chunk_ready_function &get_ready() const

Get the function that is provided with completed chunks.

chunk_stream_config &enable_packet_presence(std::size_t payload_size)

Enable the packet presence feature.

The payload offset of each packet is divided by payload_size and added to the heap index before indexing spead2::recv::chunk::present.

Throws

std::invalid_argument – if payload_size is zero.
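The index arithmetic described above can be written out as a small helper. The function name presence_index is hypothetical. One consequence, inferred from the formula rather than stated in this reference, is that heap_index then has to count packet slots rather than heaps, so the place callback would set it to the heap number multiplied by the number of packets per heap.

```cpp
#include <cassert>
#include <cstddef>

// Index into the present array when packet presence is enabled:
// the packet's payload offset divided by payload_size, added to heap_index.
std::size_t presence_index(std::size_t heap_index,
                           std::size_t payload_offset,
                           std::size_t payload_size)
{
    assert(payload_size != 0);  // enable_packet_presence rejects zero
    return heap_index + payload_offset / payload_size;
}
```

For example, with 4096-byte heaps and a payload_size of 1024 there are 4 packet slots per heap. If the third heap of a chunk is given heap_index 8, its packet at payload offset 2048 maps to element 10 of present.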

chunk_stream_config &disable_packet_presence()

Disable the packet presence feature enabled by enable_packet_presence.

inline std::size_t get_packet_presence_payload_size() const

Retrieve the payload_size if packet presence is enabled, or 0 if not.

Public Static Attributes

static constexpr std::size_t default_max_chunks = 2

Default value for set_max_chunks.

class spead2::recv::chunk_stream : private spead2::recv::detail::chunk_stream_state, public spead2::recv::stream

Stream that writes incoming heaps into chunks.

Subclassed by spead2::recv::chunk_ring_stream< chunk_ringbuffer, chunk_ringbuffer >, spead2::recv::chunk_ring_stream< DataRingbuffer, FreeRingbuffer >

Public Functions

chunk_stream(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)

Constructor.

This class passes a modified config to the base class constructor. This is reflected in the return of get_config. In particular:

  • The allow unsized heaps setting is forced to false.

  • The memcpy function may be overridden, although the provided function is still used when a copy happens. Use ptr.get_deleter().get_user() to get a pointer to heap_metadata, from which the chunk can be retrieved.

  • The memory allocator is overridden, and the provided value is ignored.

Parameters
  • io_service – I/O service (also used by the readers).

  • config – Basic stream configuration.

  • chunk_config – Configuration for chunking.

Throws

std::invalid_argument – if any of the function pointers in chunk_config have not been set.

inline const chunk_stream_config &get_chunk_config() const

Get the stream’s chunk configuration.

Ringbuffer convenience API

template<typename DataRingbuffer = ringbuffer<std::unique_ptr<chunk>>, typename FreeRingbuffer = ringbuffer<std::unique_ptr<chunk>>>
class spead2::recv::chunk_ring_stream : public spead2::recv::chunk_stream

Wrapper around chunk_stream that uses ringbuffers to manage chunks.

When a fresh chunk is needed, it is retrieved from a ringbuffer of free chunks (the “free ring”). When a chunk is flushed, it is pushed to a “data ring”. These may be shared between streams, but both will be stopped as soon as any of the streams using them are stopped. The intended use case is parallel streams that are started and stopped together.

When stop is called, any in-flight chunks (that are not in either of the ringbuffers) will be freed from the thread that called stop.

It’s important to note that the free ring is also stopped if the stream is stopped by a stream control item. The user must thus be prepared to deal gracefully with a ringbuffer_stopped exception when pushing to the free ring.
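The consumer side of this pattern can be sketched as a loop that pops completed chunks from the data ring and hands them back to the free ring, tolerating a stop on either ring. The toy_ring, toy_stopped and toy_chunk types below are hypothetical, single-threaded stand-ins for the spead2 ringbuffer, spead2::ringbuffer_stopped and chunk. The toy ring throws on an empty pop instead of blocking.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <stdexcept>
#include <utility>

// Hypothetical stand-ins, not spead2 types.
struct toy_stopped : std::runtime_error
{
    toy_stopped() : std::runtime_error("ring stopped") {}
};

struct toy_chunk
{
    long long chunk_id = -1;
};

class toy_ring
{
public:
    void push(std::unique_ptr<toy_chunk> &&c)
    {
        if (stopped)
            throw toy_stopped();  // c has not been moved from
        q.push_back(std::move(c));
    }

    std::unique_ptr<toy_chunk> pop()
    {
        if (q.empty())
        {
            if (stopped)
                throw toy_stopped();
            throw std::logic_error("a real ring would block here");
        }
        std::unique_ptr<toy_chunk> c = std::move(q.front());
        q.pop_front();
        return c;
    }

    void stop() { stopped = true; }
    std::size_t size() const { return q.size(); }

private:
    std::deque<std::unique_ptr<toy_chunk>> q;
    bool stopped = false;
};

// Drain the data ring until it is stopped, recycling each chunk.
// Returns the number of chunks processed.
std::size_t consume(toy_ring &data_ring, toy_ring &free_ring)
{
    std::size_t processed = 0;
    try
    {
        while (true)
        {
            std::unique_ptr<toy_chunk> c = data_ring.pop();
            processed++;  // real code would read the chunk contents here
            try
            {
                free_ring.push(std::move(c));
            }
            catch (const toy_stopped &)
            {
                // Free ring already stopped: c still owns the chunk and
                // releases it when it goes out of scope.
            }
        }
    }
    catch (const toy_stopped &)
    {
        // Data ring stopped and drained: normal end of the loop.
    }
    return processed;
}
```

The inner try block is the important part: a stop on the free ring is treated as a normal shutdown condition rather than as an error.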

Public Functions

chunk_ring_stream(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config, std::shared_ptr<DataRingbuffer> data_ring, std::shared_ptr<FreeRingbuffer> free_ring)

Constructor.

Refer to chunk_stream::chunk_stream for details.

The allocate and ready callbacks are ignored and should be unset. Calling get_chunk_config will reflect the internally-defined callbacks.

void add_free_chunk(std::unique_ptr<chunk> &&c)

Add a chunk to the free ringbuffer.

This takes care of zeroing out the spead2::recv::chunk::present array, and it will suppress the spead2::ringbuffer_stopped error if the free ringbuffer has been stopped (in which case the argument will not have been moved from).

If the free ring is full, it will throw spead2::ringbuffer_full rather than blocking. The free ringbuffer should be constructed with enough slots that this does not happen.
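The behaviour described for add_free_chunk can be modelled in a few lines: zero the presence flags, attempt a non-blocking push, swallow a "stopped" error and let a "full" error propagate. Everything below (toy_free_ring, toy_stopped, toy_full, toy_chunk, recycle) is a hypothetical stand-in written for illustration and is not spead2 code. The order in which the toy ring checks for stopped versus full is an arbitrary choice.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <stdexcept>
#include <utility>

// Hypothetical stand-ins, not spead2 types.
struct toy_stopped : std::runtime_error
{
    toy_stopped() : std::runtime_error("ring stopped") {}
};

struct toy_full : std::runtime_error
{
    toy_full() : std::runtime_error("ring full") {}
};

struct toy_chunk
{
    std::unique_ptr<std::uint8_t[]> present;
    std::size_t present_size = 0;
};

struct toy_free_ring
{
    std::deque<std::unique_ptr<toy_chunk>> q;
    std::size_t capacity;
    bool stopped = false;

    explicit toy_free_ring(std::size_t capacity) : capacity(capacity) {}

    // Non-blocking push: throws instead of waiting for space.
    void try_push(std::unique_ptr<toy_chunk> &&c)
    {
        if (stopped)
            throw toy_stopped();  // c has not been moved from
        if (q.size() >= capacity)
            throw toy_full();
        q.push_back(std::move(c));
    }
};

// Model of the documented add_free_chunk behaviour.
void recycle(toy_free_ring &ring, std::unique_ptr<toy_chunk> &&c)
{
    // Clear the presence flags so the chunk starts out empty.
    std::fill_n(c->present.get(), c->present_size, std::uint8_t(0));
    try
    {
        ring.try_push(std::move(c));
    }
    catch (const toy_stopped &)
    {
        // Suppressed: the caller keeps ownership of the chunk.
    }
    // toy_full is deliberately not caught and reaches the caller.
}
```

Sizing the free ring to hold every chunk that exists means the "full" case never arises, which matches the advice above.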

inline std::shared_ptr<DataRingbuffer> get_data_ringbuffer() const

Retrieve the data ringbuffer passed to the constructor.

inline std::shared_ptr<FreeRingbuffer> get_free_ringbuffer() const

Retrieve the free ringbuffer passed to the constructor.