Chunking stream groups

For an overview, refer to the Chunking stream groups overview page. This page is a reference for the C++ API.

class chunk_stream_group_config

Configuration for chunk_stream_group.

Public Types

enum class eviction_mode

Eviction mode when it is necessary to advance the group window.

See the overview for more details.

Values:

enumerator LOSSY

Force streams to release incomplete chunks.

enumerator LOSSLESS

A chunk is only marked ready once all streams have marked it ready.
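The difference between the two modes can be sketched with a toy model. This is not spead2 code: toy_eviction_mode, toy_shared_chunk and toy_try_evict are hypothetical names invented for illustration, and the sketch assumes that each stream simply records whether it has released a chunk. It only shows the decision described above: in lossless mode the chunk stays put until every stream has released it, while in lossy mode lagging streams are forced to release it.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for chunk_stream_group_config::eviction_mode.
enum class toy_eviction_mode { LOSSY, LOSSLESS };

// Toy record for one chunk shared by several member streams.
struct toy_shared_chunk
{
    std::vector<bool> released;  // released[i]: stream i has finished with the chunk
    bool forced = false;         // true if any stream was made to give it up early
};

// Try to mark the oldest chunk ready so that the group window can advance.
// Returns true if the chunk is ready, false if the caller has to wait.
inline bool toy_try_evict(toy_shared_chunk &chunk, toy_eviction_mode mode)
{
    assert(!chunk.released.empty());
    bool all_released = true;
    for (bool r : chunk.released)
        all_released = all_released && r;
    if (all_released)
        return true;               // every stream is done: nothing to force
    if (mode == toy_eviction_mode::LOSSLESS)
        return false;              // wait for the lagging streams
    // LOSSY: force the lagging streams to release the incomplete chunk.
    for (std::size_t i = 0; i < chunk.released.size(); i++)
        chunk.released[i] = true;
    chunk.forced = true;
    return true;
}
```

In this model a chunk with one lagging stream is left alone under LOSSLESS and is marked ready (with forced set) under LOSSY. The overview remains the authority on how the library actually behaves.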

Public Functions

chunk_stream_group_config &set_max_chunks(std::size_t max_chunks)

Set the maximum number of chunks that can be live at the same time.

A value of 1 means that heaps must be received in order: once a chunk is started, no heaps from a previous chunk will be accepted.

Throws:

std::invalid_argument – if max_chunks is 0.
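To illustrate why a value of 1 implies in-order reception, here is a minimal sketch of a sliding window of live chunks. The class toy_chunk_window is hypothetical and is not part of spead2; it assumes that chunk IDs are non-negative integers and that the window only ever moves forward.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Hypothetical model of a window of live chunks; not spead2 code.
class toy_chunk_window
{
public:
    explicit toy_chunk_window(std::size_t max_chunks)
        : width(static_cast<std::int64_t>(max_chunks))
    {
        if (max_chunks == 0)
            throw std::invalid_argument("max_chunks cannot be 0");
    }

    // Returns true if a heap belonging to chunk_id would be accepted.
    bool accept(std::int64_t chunk_id)
    {
        assert(chunk_id >= 0);
        if (chunk_id < tail)
            return false;                  // chunk already left the window
        if (chunk_id >= tail + width)
            tail = chunk_id - width + 1;   // slide the window forward
        return true;
    }

private:
    std::int64_t width;     // number of chunks that may be live at once
    std::int64_t tail = 0;  // oldest chunk ID still inside the window
};
```

With a width of 1, accepting a heap for chunk 1 immediately pushes chunk 0 out of the window, so a late heap for chunk 0 is rejected. With a width of 2, chunks 0 and 1 can be filled in any interleaving until a heap for chunk 2 arrives.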

inline std::size_t get_max_chunks() const

Return the maximum number of chunks that can be live at the same time.

chunk_stream_group_config &set_eviction_mode(eviction_mode eviction_mode_)

Set chunk eviction mode. See eviction_mode.

inline eviction_mode get_eviction_mode() const

Return the current eviction mode.

chunk_stream_group_config &set_allocate(chunk_allocate_function allocate)

Set the function used to allocate a chunk.

inline const chunk_allocate_function &get_allocate() const

Get the function used to allocate a chunk.

chunk_stream_group_config &set_ready(chunk_ready_function ready)

Set the function that is provided with completed chunks.

inline const chunk_ready_function &get_ready() const

Get the function that is provided with completed chunks.
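Because each setter returns a reference to the config, calls can be chained. The sketch below mimics that pattern with a stand-in class so that it compiles without spead2. Everything prefixed with toy_ is hypothetical, and the callback signatures are deliberately simplified; they are assumptions and are not the real chunk_allocate_function and chunk_ready_function types.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

struct toy_chunk { std::int64_t chunk_id = -1; };

// Simplified callback types (assumptions, not the spead2 signatures).
using toy_allocate_function = std::function<std::unique_ptr<toy_chunk>(std::int64_t)>;
using toy_ready_function = std::function<void(std::unique_ptr<toy_chunk> &&)>;

class toy_group_config
{
public:
    toy_group_config &set_max_chunks(std::size_t value)
    {
        if (value == 0)
            throw std::invalid_argument("max_chunks cannot be 0");
        max_chunks = value;
        return *this;  // returning *this is what enables chaining
    }
    std::size_t get_max_chunks() const { return max_chunks; }

    toy_group_config &set_allocate(toy_allocate_function value)
    {
        allocate = std::move(value);
        return *this;
    }
    const toy_allocate_function &get_allocate() const { return allocate; }

    toy_group_config &set_ready(toy_ready_function value)
    {
        ready = std::move(value);
        return *this;
    }
    const toy_ready_function &get_ready() const { return ready; }

private:
    std::size_t max_chunks = 2;  // arbitrary default for this toy
    toy_allocate_function allocate;
    toy_ready_function ready;
};

// Build a config with chained setters, pass one chunk from the allocate
// callback to the ready callback, and return the chunk ID that was delivered.
inline std::int64_t toy_round_trip(std::int64_t chunk_id)
{
    std::vector<std::int64_t> delivered;
    toy_group_config config;
    config.set_max_chunks(4)
        .set_allocate([](std::int64_t id) -> std::unique_ptr<toy_chunk> {
            std::unique_ptr<toy_chunk> c(new toy_chunk);
            c->chunk_id = id;
            return c;
        })
        .set_ready([&delivered](std::unique_ptr<toy_chunk> &&c) {
            delivered.push_back(c->chunk_id);
        });
    assert(config.get_max_chunks() == 4);
    config.get_ready()(config.get_allocate()(chunk_id));
    return delivered.at(0);
}
```

In real code the group, not the caller, invokes the two callbacks; the direct call in toy_round_trip only demonstrates that the stored functions are retrievable through the getters.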

Public Static Attributes

static constexpr std::size_t default_max_chunks = chunk_stream_config::default_max_chunks

Default value for set_max_chunks.

class chunk_stream_group

A holder for a collection of streams that share chunks.

The group owns the component streams, and takes care of stopping and destroying them when the group is stopped or destroyed.

It presents an interface similar to std::vector for observing the set of attached streams.

The public interface must only be called from one thread at a time, and all streams must be added before any readers are attached to them.

Subclassed by spead2::recv::chunk_stream_ring_group< DataRingbuffer, FreeRingbuffer >

Vector-like access to the streams.

Iterator invalidation rules are the same as for std::vector, i.e., modifying the set of streams invalidates iterators.
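The practical consequence is the usual std::vector discipline: finish adding elements first, then obtain iterators. The sketch below uses a plain std::vector<int> as a stand-in for the set of streams (an assumption made so that the example compiles without spead2); the function name toy_sum_after_growth is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Demonstrates that indices stay meaningful across growth, while iterators
// must be obtained again after the container has been modified.
inline int toy_sum_after_growth()
{
    std::vector<int> streams = {10, 20};  // stand-ins for member streams
    const std::size_t first = 0;          // an index survives later growth

    for (int i = 0; i < 100; i++)
        streams.push_back(1);             // may reallocate: old iterators are invalid

    int total = 0;
    for (auto it = streams.begin(); it != streams.end(); ++it)  // fresh iterators
        total += *it;

    assert(streams[first] == 10);
    return total;                         // 10 + 20 + 100 * 1
}
```

Applied to a group, this suggests adding every stream before iterating over the group, which is in line with the requirement that all streams be added before any readers are attached.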

inline std::size_t size() const

Number of streams.

inline bool empty() const

Whether there are any streams.

inline chunk_stream_group_member &operator[](std::size_t index)

Get the stream at a given index.

inline const chunk_stream_group_member &operator[](std::size_t index) const

Get the stream at a given index.

iterator begin() noexcept

Get an iterator to the first stream.

iterator end() noexcept

Get an iterator past the last stream.

const_iterator begin() const noexcept

Get a const iterator to the first stream.

const_iterator end() const noexcept

Get a const iterator past the last stream.

const_iterator cbegin() const noexcept

Get a const iterator to the first stream.

const_iterator cend() const noexcept

Get a const iterator past the last stream.

Public Functions

chunk_stream_group_member &emplace_back(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)

Add a new stream.

template<typename T, typename ...Args>
T &emplace_back(Args&&... args)

Add a new stream, possibly of a subclass.

virtual void stop()

Stop all streams and release all chunks.

class chunk_stream_group_member : private spead2::recv::detail::chunk_stream_state<detail::chunk_manager_group>, public spead2::recv::stream

Single stream within a group managed by chunk_stream_group.

Public Functions

virtual void stop_received() override

Shut down the stream.

This calls flush_unlocked. Subclasses may override this to achieve additional effects, but must chain to the base implementation. It is guaranteed that it will only be called once.

It is undefined what happens if add_packet is called after a stream is stopped.

This is called with spead2::recv::stream_base::shared_state::queue_mutex locked. Users must not call this function themselves; instead, call stop.

virtual void stop() override

Stop the stream.

After this returns, the io_service may still have outstanding completion handlers, but they should be no-ops when they’re called.

In most cases subclasses should override stop_received rather than this function. However, if heap_ready can block indefinitely, this function should be overridden to unblock it before calling the base implementation.

template<typename T, typename ...Args>
inline void emplace_reader(Args&&... args)

Add a new reader by passing its constructor arguments, excluding the initial io_service and owner arguments.
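The pattern of constructing a reader in place while the stream supplies the leading arguments can be sketched with perfect forwarding. All names here (toy_stream, toy_reader, toy_udp_reader) are hypothetical, and the toy reader takes only an owner argument, whereas the real readers also receive an io_service first.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

class toy_stream;

// Hypothetical reader base class: every reader is told which stream owns it.
class toy_reader
{
public:
    explicit toy_reader(toy_stream &owner) : owner(owner) {}
    virtual ~toy_reader() = default;
    virtual std::string describe() const = 0;
    toy_stream &get_owner() const { return owner; }

private:
    toy_stream &owner;
};

// Hypothetical concrete reader with two user-supplied constructor arguments.
class toy_udp_reader : public toy_reader
{
public:
    toy_udp_reader(toy_stream &owner, std::string host, int port)
        : toy_reader(owner), host(std::move(host)), port(port) {}
    std::string describe() const override { return host + ":" + std::to_string(port); }

private:
    std::string host;
    int port;
};

class toy_stream
{
public:
    // The caller passes only the reader-specific arguments; the stream
    // prepends itself as the owner and forwards the rest unchanged.
    template<typename T, typename... Args>
    void emplace_reader(Args&&... args)
    {
        std::unique_ptr<toy_reader> reader(new T(*this, std::forward<Args>(args)...));
        readers.push_back(std::move(reader));
        assert(&readers.back()->get_owner() == this);
    }

    std::size_t num_readers() const { return readers.size(); }
    const toy_reader &reader(std::size_t index) const { return *readers.at(index); }

private:
    std::vector<std::unique_ptr<toy_reader>> readers;
};
```

With this toy, a call such as s.emplace_reader<toy_udp_reader>("127.0.0.1", 8888) constructs the reader inside the stream without the caller ever naming the owner.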

void start()

Start the stream.

This is only needed if the config specifies explicit start (see stream_config::set_explicit_start). In that case, no new readers can be added after starting the stream.

inline const stream_config &get_config() const

Get the stream’s configuration.

stream_stats get_stats() const

Return statistics about the stream.

See the Python documentation.

Protected Functions

inline const stream_config &get_config() const

Get the stream’s configuration.

stream_stats get_stats() const

Return statistics about the stream.

See the Python documentation.

void flush()

Flush the collection of live heaps, passing them to heap_ready.

Private Functions

stream_config adjust_config(const stream_config &config)

Compute the config to pass down to spead2::recv::stream.

std::pair<std::uint8_t*, heap_metadata> allocate(std::size_t size, const packet_header &packet)

Allocate storage for a heap within a chunk, given a first packet for a heap.

Returns:

A raw pointer for heap storage and context used for actual copies.

void flush_chunks()

Send all in-flight chunks to the ready callback. This function is not thread-safe.

inline const chunk_stream_config &get_chunk_config() const

Get the stream’s chunk configuration.

Private Static Functions

static const heap_metadata *get_heap_metadata(const memory_allocator::pointer &ptr)

Get the metadata associated with a heap payload pointer.

If the pointer was not allocated by a chunk stream, returns nullptr.

Ringbuffer convenience API

template<typename DataRingbuffer = ringbuffer<std::unique_ptr<chunk>>, typename FreeRingbuffer = ringbuffer<std::unique_ptr<chunk>>>
class chunk_stream_ring_group : public detail::chunk_ring_pair<ringbuffer<std::unique_ptr<chunk>>, ringbuffer<std::unique_ptr<chunk>>>, public spead2::recv::chunk_stream_group

Wrapper around chunk_stream_group that uses ringbuffers to manage chunks.

When a fresh chunk is needed, it is retrieved from a ringbuffer of free chunks (the “free ring”). When a chunk is flushed, it is pushed to a “data ring”. These may be shared between groups, but both will be stopped as soon as any of the member streams are stopped. The intended use case is parallel groups that are started and stopped together.

When the group is stopped, the ringbuffers are both stopped, and readied chunks are diverted into a graveyard. The graveyard is then emptied from the thread calling stop. This makes it safe to use chunks that can only safely be freed from the caller’s thread (e.g. a Python thread holding the GIL).
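The life cycle of a chunk described above (free ring, data ring, graveyard) can be sketched with a single-threaded toy. This is not the spead2 implementation: toy_ring_group and its member names are hypothetical, the rings are plain std::deque objects instead of bounded thread-safe ringbuffers, and allocate returns a null pointer where a real free ring would presumably block or report that it has been stopped.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <utility>
#include <vector>

struct toy_ring_chunk { std::int64_t chunk_id = -1; };
using toy_chunk_ptr = std::unique_ptr<toy_ring_chunk>;

// Single-threaded toy model of the free ring / data ring / graveyard flow.
class toy_ring_group
{
public:
    explicit toy_ring_group(std::size_t n_chunks)
    {
        for (std::size_t i = 0; i < n_chunks; i++)
            free_ring.push_back(toy_chunk_ptr(new toy_ring_chunk));
    }

    // A fresh chunk is taken from the free ring (null if none is available).
    toy_chunk_ptr allocate(std::int64_t chunk_id)
    {
        if (stopped || free_ring.empty())
            return nullptr;
        toy_chunk_ptr chunk = std::move(free_ring.front());
        free_ring.pop_front();
        chunk->chunk_id = chunk_id;
        return chunk;
    }

    // A flushed chunk goes to the data ring, or to the graveyard after a stop.
    void ready(toy_chunk_ptr chunk)
    {
        assert(chunk);
        if (stopped)
            graveyard.push_back(std::move(chunk));
        else
            data_ring.push_back(std::move(chunk));
    }

    // Consumer side: take the next completed chunk (null if there is none).
    toy_chunk_ptr pop_data()
    {
        if (data_ring.empty())
            return nullptr;
        toy_chunk_ptr chunk = std::move(data_ring.front());
        data_ring.pop_front();
        return chunk;
    }

    // Consumer side: hand a processed chunk back for reuse.
    void recycle(toy_chunk_ptr chunk) { free_ring.push_back(std::move(chunk)); }

    void stop_rings() { stopped = true; }

    // Run by whoever called stop: destroys diverted chunks in that context.
    std::size_t empty_graveyard()
    {
        const std::size_t n = graveyard.size();
        graveyard.clear();
        return n;
    }

private:
    bool stopped = false;
    std::deque<toy_chunk_ptr> free_ring;
    std::deque<toy_chunk_ptr> data_ring;
    std::vector<toy_chunk_ptr> graveyard;
};
```

The point of the graveyard in this model is that chunks readied after the stop are destroyed inside empty_graveyard, that is, in the context of the caller of stop, not wherever ready happened to run.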

Vector-like access to the streams.

Iterator invalidation rules are the same as for std::vector, i.e., modifying the set of streams invalidates iterators.

inline std::size_t size() const

Number of streams.

inline bool empty() const

Whether there are any streams.

inline chunk_stream_group_member &operator[](std::size_t index)

Get the stream at a given index.

inline const chunk_stream_group_member &operator[](std::size_t index) const

Get the stream at a given index.

iterator begin() noexcept

Get an iterator to the first stream.

const_iterator begin() const noexcept

Get a const iterator to the first stream.

iterator end() noexcept

Get an iterator past the last stream.

const_iterator end() const noexcept

Get a const iterator past the last stream.

const_iterator cbegin() const noexcept

Get a const iterator to the first stream.

const_iterator cend() const noexcept

Get a const iterator past the last stream.

Public Functions

virtual void stop() override

Stop all streams and release all chunks.

chunk_stream_group_member &emplace_back(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)

Add a new stream.

template<typename T, typename ...Args>
T &emplace_back(Args&&... args)

Add a new stream, possibly of a subclass.