Chunking receiver¶
Warning
This feature is experimental. Future releases of spead2 may change it in backwards-incompatible ways, and it could even be removed entirely.
For an overview, refer to Chunking receiver. This page is a reference for the C++ API.
-
struct spead2::recv::chunk_place_data¶
Data passed to chunk_place_function.
This structure is designed to be a plain C structure that can easily be handled by language bindings. As far as possible, new fields will be added to the end but existing fields will be retained, to preserve ABI compatibility.
Public Members
-
const std::uint8_t *packet¶
Pointer to the original packet data.
-
const s_item_pointer_t *items¶
Values of requested item pointers.
-
std::int64_t chunk_id¶
Chunk ID (output). Set to -1 (or leave unmodified) to discard the heap.
-
std::size_t heap_index¶
Number of this heap within the chunk (output)
-
std::size_t heap_offset¶
Byte offset of this heap within the chunk payload (output)
-
std::uint64_t *batch_stats¶
Pointer to staging area for statistics.
-
const std::uint8_t *packet¶
-
typedef std::function<void(chunk_place_data *data, std::size_t data_size)> spead2::recv::chunk_place_function¶
Callback to determine where a heap is placed in the chunk stream.
- Parameters
data – Pointer to the input and output arguments.
data_size –
sizeof(chunk_place_data)
at the time spead2 was compiled
-
typedef std::function<std::unique_ptr<chunk>(std::int64_t chunk_id, std::uint64_t *batch_stats)> chunk_allocate_function¶
Callback to obtain storage for a new chunk.
-
typedef std::function<void(std::unique_ptr<chunk>&&, std::uint64_t *batch_stats)> spead2::recv::chunk_ready_function¶
Callback to receive a completed chunk.
It takes ownership of the chunk.
-
class spead2::recv::chunk¶
Storage for a chunk with metadata.
Subclassed by spead2::recv::chunk_wrapper
Public Members
-
std::int64_t chunk_id = -1¶
Chunk ID.
-
std::uintptr_t stream_id = 0¶
Stream ID of the stream from which the chunk originated.
-
memory_allocator::pointer present¶
Flag array indicating which heaps have been received (one byte per heap)
-
memory_allocator::pointer data¶
Chunk payload.
-
std::int64_t chunk_id = -1¶
-
class spead2::recv::chunk_stream_config¶
Parameters for a chunk_stream.
Public Functions
-
chunk_stream_config &set_items(const std::vector<item_pointer_t> &item_ids)¶
Specify the items whose immediate values should be passed to the place function (see chunk_place_function).
-
chunk_stream_config &set_max_chunks(std::size_t max_chunks)¶
Set the maximum number of chunks that can be live at the same time.
A value of 1 means that heaps must be received in order: once a chunk is started, no heaps from a previous chunk will be accepted.
- Throws
std::invalid_argument – if max_chunks is 0.
-
inline std::size_t get_max_chunks() const¶
Return the maximum number of chunks that can be live at the same time.
-
chunk_stream_config &set_place(chunk_place_function place)¶
Set the function used to determine the chunk of each heap and its placement within the chunk.
-
inline const chunk_place_function &get_place() const¶
Get the function used to determine the chunk of each heap and its placement within the chunk.
-
chunk_stream_config &set_allocate(chunk_allocate_function allocate)¶
Set the function used to allocate a chunk.
-
inline const chunk_allocate_function &get_allocate() const¶
Get the function used to allocate a chunk.
-
chunk_stream_config &set_ready(chunk_ready_function ready)¶
Set the function that is provided with completed chunks.
-
inline const chunk_ready_function &get_ready() const¶
Get the function that is provided with completed chunks.
-
chunk_stream_config &enable_packet_presence(std::size_t payload_size)¶
Enable the packet presence feature.
The payload offset of each packet is divided by payload_size and added to the heap index before indexing spead2::recv::chunk::present.
- Throws
std::invalid_argument – if payload_size is zero.
-
chunk_stream_config &disable_packet_presence()¶
Disable the packet presence feature enabled by enable_packet_presence.
-
inline std::size_t get_packet_presence_payload_size() const¶
Retrieve the
payload_size
if packet presence is enabled, or 0 if not.
Public Static Attributes
-
static constexpr std::size_t default_max_chunks = 2¶
Default value for set_max_chunks.
-
chunk_stream_config &set_items(const std::vector<item_pointer_t> &item_ids)¶
-
class spead2::recv::chunk_stream : private spead2::recv::detail::chunk_stream_state, public spead2::recv::stream¶
Stream that writes incoming heaps into chunks.
Subclassed by spead2::recv::chunk_ring_stream< chunk_ringbuffer, chunk_ringbuffer >, spead2::recv::chunk_ring_stream< DataRingbuffer, FreeRingbuffer >
Public Functions
-
chunk_stream(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)¶
Constructor.
This class passes a modified config to the base class constructor. This is reflected in the return of get_config. In particular:
The allow unsized heaps setting is forced to false.
The memcpy function may be overridden, although the provided function is still used when a copy happens. Use get_heap_metadata to get a pointer to heap_metadata, from which the chunk can be retrieved.
The memory allocator is overridden, and the provided value is ignored.
Additional statistics are registered:
too_old_heaps
: number of heaps for which the placement function returned a non-negative chunk ID that was behind the window.rejected_heaps
: number of heaps for which the placement function returned a negative chunk ID.
- Parameters
io_service – I/O service (also used by the readers).
config – Basic stream configuration
chunk_config – Configuration for chunking
- Throws
invalid_value – if any of the function pointers in chunk_config have not been set.
Private Functions
-
inline const chunk_stream_config &get_chunk_config() const¶
Get the stream’s chunk configuration.
Private Static Functions
-
static const heap_metadata *get_heap_metadata(const memory_allocator::pointer &ptr)¶
Get the heap_metadata associated with a heap payload pointer.
If the pointer was not allocated by a chunk stream, returns
nullptr
.
-
chunk_stream(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)¶
Ringbuffer convenience API¶
-
template<typename DataRingbuffer = ringbuffer<std::unique_ptr<chunk>>, typename FreeRingbuffer = ringbuffer<std::unique_ptr<chunk>>>
class spead2::recv::chunk_ring_stream : public spead2::recv::chunk_stream¶ Wrapper around chunk_stream that uses ringbuffers to manage chunks.
When a fresh chunk is needed, it is retrieved from a ringbuffer of free chunks (the “free ring”). When a chunk is flushed, it is pushed to a “data ring”. These may be shared between streams, but both will be stopped as soon as any of the streams using them are stopped. The intended use case is parallel streams that are started and stopped together.
When stop is called, any in-flight chunks (that are not in either of the ringbuffers) will be freed from the thread that called stop.
It’s important to note that the free ring is also stopped if the stream is stopped by a stream control item. The user must thus be prepared to deal gracefully with a ringbuffer_stopped exception when pushing to the free ring.
Public Functions
Constructor.
Refer to chunk_stream::chunk_stream for details.
The allocate callback is ignored and should be unset. If a ready callback is defined, it will be called before the chunk is pushed onto the ringbuffer. It must not move from the provided unique_ptr, but it can be used to perform further processing on the chunk before it is pushed.
Calling get_chunk_config will reflect the internally-defined callbacks.
-
void add_free_chunk(std::unique_ptr<chunk> &&c)¶
Add a chunk to the free ringbuffer.
This takes care of zeroing out the spead2::recv::chunk::present array, and it will suppress the spead2::ringbuffer_stopped error if the free ringbuffer has been stopped (in which case the argument will not have been moved from).
If the free ring is full, it will throw spead2::ringbuffer_full rather than blocking. The free ringbuffer should be constructed with enough slots that this does not happen.
-
inline std::shared_ptr<DataRingbuffer> get_data_ringbuffer() const¶
Retrieve the data ringbuffer passed to the constructor.
-
inline std::shared_ptr<FreeRingbuffer> get_free_ringbuffer() const¶
Retrieve the free ringbuffer passed to the constructor.