Chunking receiver
For an overview, refer to Chunking receiver. This page is a reference for the C++ API.
-
struct chunk_place_data
Data passed to chunk_place_function.
This structure is designed to be a plain C structure that can easily be handled by language bindings. As far as possible, new fields will be added to the end but existing fields will be retained, to preserve ABI compatibility.
Do not modify any of the pointers in the structure.
Public Members
-
const std::uint8_t *packet
Pointer to the original packet data.
-
s_item_pointer_t *items
Values of requested item pointers.
-
std::int64_t chunk_id
Chunk ID (output). Set to -1 (or leave unmodified) to discard the heap.
-
std::size_t heap_index
Number of this heap within the chunk (output)
-
std::size_t heap_offset
Byte offset of this heap within the chunk payload (output)
-
std::uint64_t *batch_stats
Pointer to staging area for statistics.
-
std::uint8_t *extra
Pointer to data to be copied to chunk::extra.
-
std::size_t extra_offset
Offset within chunk::extra to write.
-
std::size_t extra_size
Number of bytes to copy to chunk::extra.
-
typedef std::function<void(chunk_place_data *data, std::size_t data_size)> spead2::recv::chunk_place_function
Callback to determine where a heap is placed in the chunk stream.
- Param data:
Pointer to the input and output arguments.
- Param data_size:
sizeof(chunk_place_data) at the time spead2 was compiled
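As a rough illustration of what a placement callback computes, the sketch below fills in the three output fields from a heap counter. It deliberately does not include spead2 headers: place_data_sketch is a hypothetical stand-in that mirrors only some of the documented fields, and the layout constants (heaps_per_chunk, heap_payload_bytes) and the choice of items[0] as a heap counter are assumptions made for the example.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in mirroring a few documented fields of
// chunk_place_data, so that the sketch compiles without spead2.
struct place_data_sketch
{
    const std::int64_t *items;  // stands in for s_item_pointer_t *items
    std::int64_t chunk_id;      // output; -1 discards the heap
    std::size_t heap_index;     // output
    std::size_t heap_offset;    // output
};

// Assumed layout: fixed-size heaps, a fixed number of them per chunk.
constexpr std::int64_t heaps_per_chunk = 4;
constexpr std::size_t heap_payload_bytes = 1024;

void place_sketch(place_data_sketch *data, std::size_t data_size)
{
    // Do nothing if the caller's structure is smaller than the one
    // this code was compiled against.
    if (data_size < sizeof(place_data_sketch))
        return;
    const std::int64_t counter = data->items[0];  // assumed heap counter
    if (counter < 0)
        return;  // chunk_id is left unmodified, so the heap is discarded
    data->chunk_id = counter / heaps_per_chunk;
    data->heap_index = static_cast<std::size_t>(counter % heaps_per_chunk);
    data->heap_offset = data->heap_index * heap_payload_bytes;
}
```

With these assumed constants, a heap counter of 9 lands in chunk 2 at heap index 1, which is byte offset 1024 in the chunk payload.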
-
typedef std::function<std::unique_ptr<chunk>(std::int64_t chunk_id, std::uint64_t *batch_stats)> chunk_allocate_function
Callback to obtain storage for a new chunk.
-
typedef std::function<void(std::unique_ptr<chunk>&&, std::uint64_t *batch_stats)> spead2::recv::chunk_ready_function
Callback to receive a completed chunk.
It takes ownership of the chunk.
-
class chunk
Storage for a chunk with metadata.
Subclassed by spead2::recv::chunk_wrapper
Public Members
-
std::int64_t chunk_id = -1
Chunk ID.
-
std::uintptr_t stream_id = 0
Stream ID of the stream from which the chunk originated.
-
memory_allocator::pointer present
Flag array indicating which heaps have been received (one byte per heap)
-
memory_allocator::pointer data
Chunk payload.
-
memory_allocator::pointer extra
Optional storage area for per-heap metadata.
-
class chunk_stream_config
Parameters for a chunk_stream.
Public Functions
-
chunk_stream_config &set_items(const std::vector<item_pointer_t> &item_ids)
Specify the items whose immediate values should be passed to the place function (see chunk_place_function).
-
chunk_stream_config &set_max_chunks(std::size_t max_chunks)
Set the maximum number of chunks that can be live at the same time.
A value of 1 means that heaps must be received in order: once a chunk is started, no heaps from a previous chunk will be accepted.
- Throws:
std::invalid_argument – if max_chunks is 0.
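The effect of max_chunks can be pictured as a sliding window of live chunk IDs. The class below is a simplified model, not spead2's implementation: chunk_window_sketch and its members are hypothetical names, and the rule that the window advances just far enough to include the newest chunk is an assumption.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Simplified model of a window holding at most max_chunks live chunks.
class chunk_window_sketch
{
public:
    explicit chunk_window_sketch(std::size_t max_chunks)
        : max_chunks(static_cast<std::int64_t>(max_chunks))
    {
        if (max_chunks == 0)
            throw std::invalid_argument("max_chunks must be positive");
    }

    // Returns true if a heap destined for chunk_id would be accepted.
    bool accept(std::int64_t chunk_id)
    {
        if (chunk_id < 0 || chunk_id < head)
            return false;  // discarded, or behind the window
        if (chunk_id >= head + max_chunks)
            head = chunk_id - max_chunks + 1;  // slide the window forward
        return true;
    }

private:
    std::int64_t max_chunks;
    std::int64_t head = 0;  // oldest chunk ID that is still live
};
```

In this model, a window of size 1 that has accepted a heap for chunk 1 refuses any later heap for chunk 0, which matches the in-order behaviour described above.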
-
inline std::size_t get_max_chunks() const
Return the maximum number of chunks that can be live at the same time.
-
chunk_stream_config &set_place(chunk_place_function place)
Set the function used to determine the chunk of each heap and its placement within the chunk.
-
inline const chunk_place_function &get_place() const
Get the function used to determine the chunk of each heap and its placement within the chunk.
-
chunk_stream_config &set_allocate(chunk_allocate_function allocate)
Set the function used to allocate a chunk.
-
inline const chunk_allocate_function &get_allocate() const
Get the function used to allocate a chunk.
-
chunk_stream_config &set_ready(chunk_ready_function ready)
Set the function that is provided with completed chunks.
-
inline const chunk_ready_function &get_ready() const
Get the function that is provided with completed chunks.
-
chunk_stream_config &enable_packet_presence(std::size_t payload_size)
Enable the packet presence feature.
The payload offset of each packet is divided by payload_size and added to the heap index before indexing spead2::recv::chunk::present.
- Throws:
std::invalid_argument – if payload_size is zero.
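The indexing rule above is plain integer arithmetic. The helper below is a hypothetical function written for illustration, not part of spead2, and it assumes the division truncates towards zero.

```cpp
#include <cstddef>
#include <stdexcept>

// Index into the present array for one packet, following the rule
// stated above: heap_index + payload_offset / payload_size.
std::size_t presence_index(std::size_t heap_index,
                           std::size_t payload_offset,
                           std::size_t payload_size)
{
    if (payload_size == 0)
        throw std::invalid_argument("payload_size must be non-zero");
    return heap_index + payload_offset / payload_size;
}
```

For example, with a payload_size of 1024, a packet at payload offset 3072 in a heap placed at heap index 8 would mark entry 11 of the present array.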
-
chunk_stream_config &disable_packet_presence()
Disable the packet presence feature enabled by enable_packet_presence.
-
inline std::size_t get_packet_presence_payload_size() const
Retrieve the payload_size if packet presence is enabled, or 0 if not.
-
chunk_stream_config &set_max_heap_extra(std::size_t max_heap_extra)
Set maximum amount of data a placement function may write to chunk_place_data::extra.
-
inline std::size_t get_max_heap_extra() const
Get maximum amount of data a placement function may write to chunk_place_data::extra.
Public Static Attributes
-
static constexpr std::size_t default_max_chunks = 2
Default value for set_max_chunks.
-
class chunk_stream : private spead2::recv::detail::chunk_stream_state<detail::chunk_manager_simple>, public spead2::recv::stream
Stream that writes incoming heaps into chunks.
Subclassed by spead2::recv::chunk_ring_stream< DataRingbuffer, FreeRingbuffer >
Public Functions
-
chunk_stream(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)
Constructor.
This class passes a modified config to the base class constructor. This is reflected in the return of get_config. In particular:
- The allow unsized heaps setting is forced to false.
- The memcpy function may be overridden, although the provided function is still used when a copy happens. Use get_heap_metadata to get a pointer to heap_metadata, from which the chunk can be retrieved.
- The memory allocator is overridden, and the provided value is ignored.
Additional statistics are registered:
- too_old_heaps: number of heaps for which the placement function returned a non-negative chunk ID that was behind the window.
- rejected_heaps: number of heaps for which the placement function returned a negative chunk ID.
- Parameters:
io_service – I/O service (also used by the readers).
config – Basic stream configuration
chunk_config – Configuration for chunking
- Throws:
invalid_argument – if any of the function pointers in chunk_config have not been set.
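The difference between the two additional statistics can be sketched as a small classification step. This is a model for illustration only: chunk_stats_sketch and count_heap are hypothetical names, and representing the window by its oldest live chunk ID is an assumption.

```cpp
#include <cstdint>

// Hypothetical counters named after the two documented statistics.
struct chunk_stats_sketch
{
    std::uint64_t too_old_heaps = 0;
    std::uint64_t rejected_heaps = 0;
};

// Returns true if the heap would be kept, updating the counters otherwise.
bool count_heap(std::int64_t chunk_id,
                std::int64_t oldest_live_chunk,
                chunk_stats_sketch &stats)
{
    if (chunk_id < 0)
    {
        ++stats.rejected_heaps;  // placement function returned a negative ID
        return false;
    }
    if (chunk_id < oldest_live_chunk)
    {
        ++stats.too_old_heaps;   // valid ID, but behind the window
        return false;
    }
    return true;
}
```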
Private Functions
-
inline const chunk_stream_config &get_chunk_config() const
Get the stream’s chunk configuration.
Private Static Functions
-
static const heap_metadata *get_heap_metadata(const memory_allocator::pointer &ptr)
Get the metadata associated with a heap payload pointer.
If the pointer was not allocated by a chunk stream, returns nullptr.
Ringbuffer convenience API
-
template<typename DataRingbuffer = ringbuffer<std::unique_ptr<chunk>>, typename FreeRingbuffer = ringbuffer<std::unique_ptr<chunk>>>
class chunk_ring_stream : public detail::chunk_ring_pair<ringbuffer<std::unique_ptr<chunk>>, ringbuffer<std::unique_ptr<chunk>>>, public spead2::recv::chunk_stream
Wrapper around chunk_stream that uses ringbuffers to manage chunks.
When a fresh chunk is needed, it is retrieved from a ringbuffer of free chunks (the “free ring”). When a chunk is flushed, it is pushed to a “data ring”. These may be shared between streams, but both will be stopped as soon as any of the streams using them are stopped. The intended use case is parallel streams that are started and stopped together.
When stop is called, any in-flight chunks (that are not in either of the ringbuffers) will be freed from the thread that called stop.
Public Functions
Constructor.
Refer to chunk_stream::chunk_stream for details.
The allocate callback is ignored and should be unset. If a ready callback is defined, it will be called before the chunk is pushed onto the ringbuffer. It must not move from the provided unique_ptr, but it can be used to perform further processing on the chunk before it is pushed.
Calling get_chunk_config will reflect the internally-defined callbacks.
-
template<typename T, typename DataSemaphore = semaphore, typename SpaceSemaphore = semaphore>
class ringbuffer : public spead2::ringbuffer_base<T>
Ring buffer with blocking and non-blocking push and pop.
It supports non-copyable objects using move semantics. The producer may signal that it has finished producing data by calling stop, which will gracefully shut down consumers as well as other producers. This class is fully thread-safe for multiple producers and consumers.
With multiple producers it is sometimes desirable to stop the ringbuffer only once all the producers are finished. To support this, a producer may register itself with add_producer, and indicate completion with remove_producer. If this causes the number of producers to fall to zero, the ringbuffer is stopped.
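The producer-counting behaviour can be modelled with a counter and a flag. The class below is a sketch written for this page rather than the real ringbuffer: producer_count_sketch and is_stopped are hypothetical names, and it models only the bookkeeping, not the queue itself.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>

// Models only the add_producer / remove_producer bookkeeping.
class producer_count_sketch
{
public:
    void add_producer()
    {
        std::lock_guard<std::mutex> lock(mutex);
        ++producers;
    }

    // Returns true if this call caused the buffer to stop.
    bool remove_producer()
    {
        std::lock_guard<std::mutex> lock(mutex);
        assert(producers > 0);  // must be paired with add_producer
        --producers;
        if (producers == 0 && !stopped)
        {
            stopped = true;
            return true;
        }
        return false;
    }

    bool is_stopped() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return stopped;
    }

private:
    mutable std::mutex mutex;
    std::size_t producers = 0;
    bool stopped = false;
};
```

With two registered producers, the first remove_producer call leaves the buffer running and the second one stops it.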
Public Functions
-
void try_push(T &&value)
Append an item to the queue, if there is space.
It uses move semantics, so on success, the original value is undefined.
- Parameters:
value – Value to move
- Throws:
ringbuffer_full – if there is no space
ringbuffer_stopped – if stop has already been called
-
template<typename ...Args>
void try_emplace(Args&&... args)
Construct a new item in the queue, if there is space.
- Parameters:
args – Arguments to the constructor
- Throws:
ringbuffer_full – if there is no space
ringbuffer_stopped – if stop has already been called
-
template<typename ...SemArgs>
void push(T &&value, SemArgs&&... sem_args)
Append an item to the queue, blocking if necessary.
It uses move semantics, so on success, the original value is undefined.
- Parameters:
value – Value to move
sem_args – Arbitrary arguments to pass to the space semaphore
- Throws:
ringbuffer_stopped – if stop is called first
-
template<typename ...Args>
void emplace(Args&&... args)
Construct a new item in the queue, blocking if necessary.
- Parameters:
args – Arguments to the constructor
- Throws:
ringbuffer_stopped – if stop is called first
-
T try_pop()
Retrieve an item from the queue, if there is one.
- Throws:
ringbuffer_stopped – if the queue is empty and stop was called
ringbuffer_empty – if the queue is empty but still active
-
template<typename ...SemArgs>
T pop(SemArgs&&... sem_args)
Retrieve an item from the queue, blocking until there is one or until the queue is stopped.
- Parameters:
sem_args – Arbitrary arguments to pass to the data semaphore
- Throws:
ringbuffer_stopped – if the queue is empty and stop was called
-
bool stop()
Indicate that no more items will be produced.
This does not immediately stop consumers if there are still items in the queue; instead, consumers will continue to retrieve remaining items, and will only be signalled once the queue has drained.
- Returns:
whether the ringbuffer was stopped
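The drain-then-signal behaviour of stop, together with the two exceptions thrown by try_pop, can be illustrated with a single-threaded model. Everything in it is hypothetical (queue_sketch, empty_sketch, stopped_sketch): it has no capacity limit or locking, and it assumes the return value of stop means "this call performed the stop".

```cpp
#include <deque>
#include <stdexcept>
#include <utility>

// Hypothetical stand-ins for ringbuffer_empty and ringbuffer_stopped.
struct empty_sketch : std::runtime_error
{
    empty_sketch() : std::runtime_error("queue is empty") {}
};

struct stopped_sketch : std::runtime_error
{
    stopped_sketch() : std::runtime_error("queue is stopped") {}
};

// Single-threaded, unbounded model of the stop/try_pop interaction.
template<typename T>
class queue_sketch
{
public:
    void push(T value)
    {
        if (stopped)
            throw stopped_sketch();
        items.push_back(std::move(value));
    }

    T try_pop()
    {
        if (items.empty())
        {
            if (stopped)
                throw stopped_sketch();  // drained and stopped
            throw empty_sketch();        // empty but still active
        }
        T value = std::move(items.front());
        items.pop_front();
        return value;
    }

    // Assumed meaning of the result: true only for the call that stops.
    bool stop()
    {
        const bool first = !stopped;
        stopped = true;
        return first;
    }

private:
    std::deque<T> items;
    bool stopped = false;
};
```

In this model, items pushed before stop can still be popped afterwards; only once the queue is empty does try_pop report that it has been stopped.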
-
bool remove_producer()
Indicate that a producer registered with add_producer is finished with the ringbuffer.
If this was the last producer, the ringbuffer is stopped.
- Returns:
whether the ringbuffer was stopped
-
inline const DataSemaphore &get_data_sem() const
Get access to the data semaphore.
-
inline const SpaceSemaphore &get_space_sem() const
Get access to the free-space semaphore.
-
detail::ringbuffer_iterator<ringbuffer> begin()
Begin iteration over the items in the ringbuffer.
This does not return a full-blown iterator; it is only intended to be used for a range-based for loop. For example:
for (auto &&item : ringbuffer) { ... }
-
std::size_t capacity() const
Maximum number of items that can be held at once.
-
std::size_t size() const
Return the number of items currently in the ringbuffer.
This should only be used for metrics, not for control flow, as the result could be out of date by the time it is returned.
-
void add_producer()
Register a new producer.
Producers only need to call this if they want to call ringbuffer::remove_producer.
-
class ringbuffer_empty : public std::runtime_error
Thrown when attempting to do a non-blocking pop from an empty ringbuffer.
-
class ringbuffer_full : public std::runtime_error
Thrown when attempting to do a non-blocking push to a full ringbuffer.
-
class ringbuffer_stopped : public std::runtime_error
Thrown when attempting to do a pop from an empty ringbuffer that has been stopped, or a push to a ringbuffer that has been stopped.