Chunking stream groups
For an overview, refer to Chunking stream groups. This page is a reference for the C++ API.
-
class spead2::recv::chunk_stream_group_config¶
Configuration for chunk_stream_group.
Public Types
Public Functions
-
chunk_stream_group_config &set_max_chunks(std::size_t max_chunks)¶
Set the maximum number of chunks that can be live at the same time.
A value of 1 means that heaps must be received in order: once a chunk is started, no heaps from a previous chunk will be accepted.
- Throws
std::invalid_argument – if max_chunks is 0.
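The effect of max_chunks on heap ordering can be illustrated with a small stand-alone model. This is only a sketch and not spead2 code: `chunk_window_model`, its `accept` method and the rule for sliding the window are assumptions made for illustration, and the real behaviour also depends on the eviction mode and on the other streams in the group.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Hypothetical toy model (not part of spead2) of a window of live chunks.
class chunk_window_model
{
public:
    explicit chunk_window_model(std::size_t max_chunks) : max_chunks_(max_chunks)
    {
        // Mirrors the documented precondition of set_max_chunks.
        if (max_chunks == 0)
            throw std::invalid_argument("max_chunks must be positive");
    }

    // Returns true if a heap belonging to chunk_id would still be accepted.
    bool accept(std::int64_t chunk_id)
    {
        assert(chunk_id >= 0);  // the model only handles non-negative chunk IDs
        if (chunk_id < head_)
            return false;       // that chunk has already left the window
        const auto width = static_cast<std::int64_t>(max_chunks_);
        if (chunk_id >= head_ + width)
            head_ = chunk_id - width + 1;  // slide so chunk_id is the newest live chunk
        return true;
    }

private:
    std::size_t max_chunks_;
    std::int64_t head_ = 0;     // oldest chunk ID that is still live
};
```

With `chunk_window_model(1)`, accepting chunk 1 makes any later heap for chunk 0 fail, which matches the in-order requirement described above. With a window of 2, heaps for chunks 0 and 1 can be interleaved until chunk 2 starts.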
-
inline std::size_t get_max_chunks() const¶
Return the maximum number of chunks that can be live at the same time.
-
chunk_stream_group_config &set_eviction_mode(eviction_mode eviction_mode_)¶
Set chunk eviction mode. See eviction_mode.
-
inline eviction_mode get_eviction_mode() const¶
Return the current eviction mode.
-
chunk_stream_group_config &set_allocate(chunk_allocate_function allocate)¶
Set the function used to allocate a chunk.
-
inline const chunk_allocate_function &get_allocate() const¶
Get the function used to allocate a chunk.
-
chunk_stream_group_config &set_ready(chunk_ready_function ready)¶
Set the function that is provided with completed chunks.
-
inline const chunk_ready_function &get_ready() const¶
Get the function that is provided with completed chunks.
Public Static Attributes
-
static constexpr std::size_t default_max_chunks = chunk_stream_config::default_max_chunks¶
Default value for set_max_chunks.
-
class spead2::recv::chunk_stream_group¶
A holder for a collection of streams that share chunks.
The group owns the component streams, and takes care of stopping and destroying them when the group is stopped or destroyed.
It presents an interface similar to std::vector for observing the set of attached streams.
The public interface must only be called from one thread at a time, and all streams must be added before any readers are attached to them.
Subclassed by spead2::recv::chunk_stream_ring_group< DataRingbuffer, FreeRingbuffer >
Vector-like access to the streams.
Iterator invalidation rules are the same as for std::vector, i.e., modifying the set of streams invalidates iterators.
-
inline std::size_t size() const¶
Number of streams.
-
inline bool empty() const¶
Whether there are any streams.
-
inline chunk_stream_group_member &operator[](std::size_t index)¶
Get the stream at a given index.
-
inline const chunk_stream_group_member &operator[](std::size_t index) const¶
Get the stream at a given index.
-
iterator begin() noexcept¶
Get an iterator to the first stream.
-
iterator end() noexcept¶
Get an iterator past the last stream.
-
const_iterator begin() const noexcept¶
Get an iterator to the first stream.
-
const_iterator end() const noexcept¶
Get a const iterator past the last stream.
-
const_iterator cbegin() const noexcept¶
Get an iterator to the first stream.
-
const_iterator cend() const noexcept¶
Get a const iterator past the last stream.
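The access pattern described here (add every stream first, then index or iterate) can be sketched with a stand-alone model. Everything below is hypothetical: `toy_member`, `toy_group` and `collect_names` are illustrative names, and storing members behind `std::unique_ptr` is an assumption of the model rather than a statement about how spead2 stores its streams.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a stream in a group.
struct toy_member
{
    std::string name;
};

// Hypothetical group with a vector-like interface over its members.
class toy_group
{
public:
    using container = std::vector<std::unique_ptr<toy_member>>;

    toy_member &emplace_back(std::string name)
    {
        // Growing the vector invalidates iterators obtained earlier.
        members_.push_back(std::make_unique<toy_member>(toy_member{std::move(name)}));
        return *members_.back();
    }

    std::size_t size() const { return members_.size(); }
    bool empty() const { return members_.empty(); }

    toy_member &operator[](std::size_t index)
    {
        assert(index < members_.size());
        return *members_[index];
    }

    container::const_iterator begin() const noexcept { return members_.begin(); }
    container::const_iterator end() const noexcept { return members_.end(); }

private:
    container members_;
};

// Iterate only after all members have been added, so no iterator is
// held across a call to emplace_back.
std::vector<std::string> collect_names(const toy_group &group)
{
    std::vector<std::string> out;
    for (auto it = group.begin(); it != group.end(); ++it)
        out.push_back((*it)->name);
    return out;
}
```

In this model an index remains meaningful after the group grows, whereas an iterator taken before `emplace_back` must be obtained again.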
Public Functions
-
chunk_stream_group_member &emplace_back(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)¶
Add a new stream.
-
template<typename T, typename ...Args>
T &emplace_back(Args&&... args)¶
Add a new stream, possibly of a subclass.
-
virtual void stop()¶
Stop all streams and release all chunks.
-
class spead2::recv::chunk_stream_group_member : private spead2::recv::detail::chunk_stream_state<detail::chunk_manager_group>, public spead2::recv::stream¶
Single stream within a group managed by chunk_stream_group.
Public Functions
-
virtual void stop_received() override¶
Shut down the stream.
This calls flush_unlocked. Subclasses may override this to achieve additional effects, but must chain to the base implementation. It is guaranteed that it will only be called once.
It is undefined what happens if add_packet is called after a stream is stopped.
This is called with spead2::recv::stream_base::shared_state::queue_mutex locked. Users must not call this function themselves; instead, call stop.
-
virtual void stop() override¶
Stop the stream.
After this returns, the io_service may still have outstanding completion handlers, but they should be no-ops when they’re called.
In most cases subclasses should override stop_received rather than this function. However, if heap_ready can block indefinitely, this function should be overridden to unblock it before calling the base implementation.
Protected Functions
-
inline const stream_config &get_config() const¶
Get the stream’s configuration.
-
void flush()¶
Flush the collection of live heaps, passing them to heap_ready.
-
stream_stats get_stats() const¶
Return statistics about the stream.
See the Python documentation.
Private Functions
-
stream_config adjust_config(const stream_config &config)¶
Compute the config to pass down to spead2::recv::stream.
-
std::pair<std::uint8_t*, heap_metadata> allocate(std::size_t size, const packet_header &packet)¶
Allocate storage for a heap within a chunk, given a first packet for a heap.
- Returns
A raw pointer for heap storage and context used for actual copies.
-
void flush_chunks()¶
Send all in-flight chunks to the ready callback (not thread-safe).
-
inline const chunk_stream_config &get_chunk_config() const¶
Get the stream’s chunk configuration.
Private Static Functions
-
static const heap_metadata *get_heap_metadata(const memory_allocator::pointer &ptr)¶
Get the metadata associated with a heap payload pointer.
If the pointer was not allocated by a chunk stream, returns nullptr.
Ringbuffer convenience API
-
template<typename DataRingbuffer = ringbuffer<std::unique_ptr<chunk>>, typename FreeRingbuffer = ringbuffer<std::unique_ptr<chunk>>>
class spead2::recv::chunk_stream_ring_group : public spead2::recv::detail::chunk_ring_pair<DataRingbuffer, FreeRingbuffer>, public spead2::recv::chunk_stream_group¶
Wrapper around chunk_stream_group that uses ringbuffers to manage chunks.
When a fresh chunk is needed, it is retrieved from a ringbuffer of free chunks (the “free ring”). When a chunk is flushed, it is pushed to a “data ring”. These may be shared between groups, but both will be stopped as soon as any of the member streams is stopped. The intended use case is parallel groups that are started and stopped together.
When the group is stopped, the ringbuffers are both stopped, and readied chunks are diverted into a graveyard. The graveyard is then emptied from the thread calling stop. This makes it safe to use chunks that can only safely be freed from the caller’s thread (e.g. a Python thread holding the GIL).
Vector-like access to the streams.
Iterator invalidation rules are the same as for std::vector, i.e., modifying the set of streams invalidates iterators.
-
inline std::size_t size() const¶
Number of streams.
-
inline bool empty() const¶
Whether there are any streams.
-
inline chunk_stream_group_member &operator[](std::size_t index)¶
Get the stream at a given index.
-
inline const chunk_stream_group_member &operator[](std::size_t index) const¶
Get the stream at a given index.
-
iterator begin() noexcept¶
Get an iterator to the first stream.
-
const_iterator begin() const noexcept¶
Get an iterator to the first stream.
-
iterator end() noexcept¶
Get an iterator past the last stream.
-
const_iterator end() const noexcept¶
Get a const iterator past the last stream.
-
const_iterator cbegin() const noexcept¶
Get an iterator to the first stream.
-
const_iterator cend() const noexcept¶
Get a const iterator past the last stream.
Public Functions
-
virtual void stop() override¶
Stop all streams and release all chunks.
-
chunk_allocate_function make_allocate()¶
Create an allocate function that obtains chunks from the free ring.
-
chunk_ready_function make_ready(const chunk_ready_function &orig_ready)¶
Create a ready function that pushes chunks to the data ring.
The orig_ready function is called first.
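The ordering described here (the original callback runs first, then the chunk goes to the data ring) can be sketched as a plain function wrapper. The names `toy_chunk`, `toy_ready_function` and `make_toy_ready` are hypothetical, the callback signature is simplified compared with the real chunk_ready_function, and a `std::deque` stands in for the data ring. The model also assumes the original callback only inspects the chunk and leaves ownership with the caller.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-ins; these are not spead2 types.
struct toy_chunk
{
    long chunk_id = -1;
};

using toy_ready_function = std::function<void(std::unique_ptr<toy_chunk> &&)>;

// Build a ready callback that calls orig_ready first and then appends the
// chunk to data_ring. data_ring must outlive the returned callback.
toy_ready_function make_toy_ready(std::deque<std::unique_ptr<toy_chunk>> &data_ring,
                                  const toy_ready_function &orig_ready)
{
    // orig_ready is copied into the closure so the caller's object may go away.
    return [&data_ring, orig_ready](std::unique_ptr<toy_chunk> &&c) {
        assert(c != nullptr);
        if (orig_ready)
            orig_ready(std::move(c));       // passed by rvalue reference; may only inspect
        if (c)
            data_ring.push_back(std::move(c));  // ownership moves to the ring
    };
}
```

A caller would typically use the wrapped callback wherever the unwrapped one would have been installed, so that user code observes each chunk before a consumer can pop it from the ring.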
-
void add_free_chunk(std::unique_ptr<chunk> &&c)¶
Add a chunk to the free ringbuffer.
This takes care of zeroing out the spead2::recv::chunk::present array, and it will suppress the spead2::ringbuffer_stopped error if the free ringbuffer has been stopped (in which case the argument will not have been moved from).
If the free ring is full, it will throw spead2::ringbuffer_full rather than blocking. The free ringbuffer should be constructed with enough slots that this does not happen.
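The three behaviours described for add_free_chunk (clearing the present array, swallowing the stopped condition without consuming the argument, and throwing instead of blocking when full) can be modelled in isolation. This is a sketch under assumptions: `model_chunk`, `model_ring_full` and `model_free_ring` are hypothetical names, the ring is a single-threaded `std::deque`, and checking for "stopped" before "full" is a choice made for the model.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-ins; none of these are spead2 types.
struct model_chunk
{
    std::vector<std::uint8_t> present;  // one flag per heap slot
};

struct model_ring_full : std::runtime_error
{
    model_ring_full() : std::runtime_error("free ring is full") {}
};

class model_free_ring
{
public:
    explicit model_free_ring(std::size_t capacity) : capacity_(capacity) {}

    void stop() { stopped_ = true; }
    std::size_t size() const { return slots_.size(); }

    // Take the oldest free chunk, or nullptr if none is available.
    std::unique_ptr<model_chunk> pop()
    {
        if (slots_.empty())
            return nullptr;
        std::unique_ptr<model_chunk> c = std::move(slots_.front());
        slots_.pop_front();
        return c;
    }

    void add_free_chunk(std::unique_ptr<model_chunk> &&c)
    {
        assert(c != nullptr);
        // Clear the presence flags so the chunk starts out empty when reused.
        std::fill(c->present.begin(), c->present.end(), std::uint8_t(0));
        if (stopped_)
            return;                   // swallowed: c is not moved from
        if (slots_.size() >= capacity_)
            throw model_ring_full();  // never blocks; c is not moved from
        slots_.push_back(std::move(c));
    }

private:
    std::size_t capacity_;
    bool stopped_ = false;
    std::deque<std::unique_ptr<model_chunk>> slots_;
};
```

Because a stopped ring leaves the pointer intact, the caller still owns the chunk afterwards and remains responsible for freeing it.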
-
inline std::shared_ptr<DataRingbuffer> get_data_ringbuffer() const¶
Retrieve the data ringbuffer passed to the constructor.
-
inline std::shared_ptr<FreeRingbuffer> get_free_ringbuffer() const¶
Retrieve the free ringbuffer passed to the constructor.
-
chunk_stream_group_member &emplace_back(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)¶
Add a new stream.