Chunking stream groups
For an overview, refer to Chunking stream groups. This page is a reference for the C++ API.
-
class chunk_stream_group_config
Configuration for chunk_stream_group.
Public Types
Public Functions
-
chunk_stream_group_config &set_max_chunks(std::size_t max_chunks)
Set the maximum number of chunks that can be live at the same time.
A value of 1 means that heaps must be received in order: once a chunk is started, no heaps from a previous chunk will be accepted.
- Throws:
std::invalid_argument – if max_chunks is 0.
-
inline std::size_t get_max_chunks() const
Return the maximum number of chunks that can be live at the same time.
-
chunk_stream_group_config &set_eviction_mode(eviction_mode eviction_mode_)
Set chunk eviction mode. See eviction_mode.
-
inline eviction_mode get_eviction_mode() const
Return the current eviction mode.
-
chunk_stream_group_config &set_allocate(chunk_allocate_function allocate)
Set the function used to allocate a chunk.
-
inline const chunk_allocate_function &get_allocate() const
Get the function used to allocate a chunk.
-
chunk_stream_group_config &set_ready(chunk_ready_function ready)
Set the function that is provided with completed chunks.
-
inline const chunk_ready_function &get_ready() const
Get the function that is provided with completed chunks.
Public Static Attributes
-
static constexpr std::size_t default_max_chunks = chunk_stream_config::default_max_chunks
Default value for set_max_chunks.
-
chunk_stream_group_config &set_max_chunks(std::size_t max_chunks)
-
class chunk_stream_group
A holder for a collection of streams that share chunks.
The group owns the component streams, and takes care of stopping and destroying them when the group is stopped or destroyed.
It presents an interface similar to
std::vector
for observing the set of attached streams.The public interface must only be called from one thread at a time, and all streams must be added before any readers are attached to them.
Subclassed by spead2::recv::chunk_stream_ring_group< DataRingbuffer, FreeRingbuffer >
Vector-like access to the streams.
Iterator invalidation rules are the same as for
std::vector
i.e., modifying the set of streams invalidates iterators.-
inline std::size_t size() const
Number of streams.
-
inline bool empty() const
Whether there are any streams.
-
inline chunk_stream_group_member &operator[](std::size_t index)
Get the stream at a given index.
-
inline const chunk_stream_group_member &operator[](std::size_t index) const
Get the stream at a given index.
-
const_iterator begin() const noexcept
Get an iterator to the first stream.
-
const_iterator end() const noexcept
Get a const iterator past the last stream.
-
const_iterator cbegin() const noexcept
Get an iterator to the first stream.
-
const_iterator cend() const noexcept
Get a const iterator past the last stream.
Public Functions
-
chunk_stream_group_member &emplace_back(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)
Add a new stream.
-
template<typename T, typename ...Args>
T &emplace_back(Args&&... args) Add a new stream, possibly of a subclass.
-
virtual void stop()
Stop all streams and release all chunks.
-
inline std::size_t size() const
-
class chunk_stream_group_member : private spead2::recv::detail::chunk_stream_state<detail::chunk_manager_group>, public spead2::recv::stream
Single single within a group managed by chunk_stream_group.
Public Functions
-
virtual void stop_received() override
Shut down the stream.
This calls flush_unlocked. Subclasses may override this to achieve additional effects, but must chain to the base implementation. It is guaranteed that it will only be called once.
It is undefined what happens if add_packet is called after a stream is stopped.
This is called with spead2::recv::stream_base::shared_state::queue_mutex locked. Users must not call this function themselves; instead, call stop.
-
virtual void stop() override
Stop the stream.
After this returns, the io_service may still have outstanding completion handlers, but they should be no-ops when they’re called.
In most cases subclasses should override stop_received rather than this function. However, if heap_ready can block indefinitely, this function should be overridden to unblock it before calling the base implementation.
-
template<typename T, typename ...Args>
inline void emplace_reader(Args&&... args) Add a new reader by passing its constructor arguments, excluding the initial io_service and owner arguments.
-
void start()
Start the stream.
This is only needed if the config specifies explicit start (see stream_config::set_explicit_start). In that case, no new readers can be added after starting the stream.
-
inline const stream_config &get_config() const
Get the stream’s configuration.
-
stream_stats get_stats() const
Return statistics about the stream.
See the Python documentation.
Protected Functions
-
inline const stream_config &get_config() const
Get the stream’s configuration.
-
stream_stats get_stats() const
Return statistics about the stream.
See the Python documentation.
-
void flush()
Flush the collection of live heaps, passing them to heap_ready.
Private Functions
-
stream_config adjust_config(const stream_config &config)
Compute the config to pass down to spead2::recv::stream.
-
std::pair<std::uint8_t*, heap_metadata> allocate(std::size_t size, const packet_header &packet)
Allocate storage for a heap within a chunk, given a first packet for a heap.
- Returns:
A raw pointer for heap storage and context used for actual copies.
-
void flush_chunks()
Send all in-flight chunks to the ready callback (not thread-safe)
-
inline const chunk_stream_config &get_chunk_config() const
Get the stream’s chunk configuration.
Private Static Functions
-
static const heap_metadata *get_heap_metadata(const memory_allocator::pointer &ptr)
Get the metadata associated with a heap payload pointer.
If the pointer was not allocated by a chunk stream, returns
nullptr
.
-
virtual void stop_received() override
Ringbuffer convenience API
-
template<typename DataRingbuffer = ringbuffer<std::unique_ptr<chunk>>, typename FreeRingbuffer = ringbuffer<std::unique_ptr<chunk>>>
class chunk_stream_ring_group : public detail::chunk_ring_pair<ringbuffer<std::unique_ptr<chunk>>, ringbuffer<std::unique_ptr<chunk>>>, public spead2::recv::chunk_stream_group Wrapper around chunk_stream_group that uses ringbuffers to manage chunks.
When a fresh chunk is needed, it is retrieved from a ringbuffer of free chunks (the “free ring”). When a chunk is flushed, it is pushed to a “data ring”. These may be shared between groups, but both will be stopped as soon as any of the members streams are stopped. The intended use case is parallel groups that are started and stopped together.
When the group is stopped, the ringbuffers are both stopped, and readied chunks are diverted into a graveyard. The graveyard is then emptied from the thread calling stop. This makes it safe to use chunks that can only safely be freed from the caller’s thread (e.g. a Python thread holding the GIL).
Vector-like access to the streams.
Iterator invalidation rules are the same as for
std::vector
i.e., modifying the set of streams invalidates iterators.-
inline std::size_t size() const
Number of streams.
-
inline bool empty() const
Whether there are any streams.
-
inline chunk_stream_group_member &operator[](std::size_t index)
Get the stream at a given index.
-
inline const chunk_stream_group_member &operator[](std::size_t index) const
Get the stream at a given index.
-
const_iterator begin() const noexcept
Get an iterator to the first stream.
-
const_iterator end() const noexcept
Get a const iterator past the last stream.
-
const_iterator cbegin() const noexcept
Get an iterator to the first stream.
-
const_iterator cend() const noexcept
Get a const iterator past the last stream.
Public Functions
-
virtual void stop() override
Stop all streams and release all chunks.
-
chunk_stream_group_member &emplace_back(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)
Add a new stream.
-
inline std::size_t size() const