Chunking receiver

For an overview, refer to Chunking receiver. This page is a reference for the C++ API.

struct chunk_place_data

Data passed to chunk_place_function.

This structure is designed to be a plain C structure that can easily be handled by language bindings. As far as possible, new fields will be added to the end but existing fields will be retained, to preserve ABI compatibility.

Do not modify any of the pointers in the structure.

Public Members

const std::uint8_t *packet

Pointer to the original packet data.

std::size_t packet_size

Number of bytes referenced by packet.

s_item_pointer_t *items

Values of requested item pointers.

std::int64_t chunk_id

Chunk ID (output). Set to -1 (or leave unmodified) to discard the heap.

std::size_t heap_index

Number of this heap within the chunk (output)

std::size_t heap_offset

Byte offset of this heap within the chunk payload (output)

std::uint64_t *batch_stats

Pointer to staging area for statistics.

std::uint8_t *extra

Pointer to data to be copied to chunk::extra.

std::size_t extra_offset

Offset within chunk::extra to write.

std::size_t extra_size

Number of bytes to copy to chunk::extra.

typedef std::function<void(chunk_place_data *data, std::size_t data_size)> spead2::recv::chunk_place_function

Callback to determine where a heap is placed in the chunk stream.

See also

chunk_place_data

Param data:

Pointer to the input and output arguments.

Param data_size:

sizeof(chunk_place_data) at the time spead2 was compiled

typedef std::function<std::unique_ptr<chunk>(std::int64_t chunk_id, std::uint64_t *batch_stats)> chunk_allocate_function

Callback to obtain storage for a new chunk.

typedef std::function<void(std::unique_ptr<chunk>&&, std::uint64_t *batch_stats)> spead2::recv::chunk_ready_function

Callback to receive a completed chunk.

It takes ownership of the chunk.

class chunk

Storage for a chunk with metadata.

Subclassed by spead2::recv::chunk_wrapper

Public Members

std::int64_t chunk_id = -1

Chunk ID.

std::uintptr_t stream_id = 0

Stream ID of the stream from which the chunk originated.

memory_allocator::pointer present

Flag array indicating which heaps have been received (one byte per heap)

std::size_t present_size = 0

Number of elements in present.

memory_allocator::pointer data

Chunk payload.

memory_allocator::pointer extra

Optional storage area for per-heap metadata.

class chunk_stream_config

Parameters for a chunk_stream.

Public Functions

chunk_stream_config &set_items(const std::vector<item_pointer_t> &item_ids)

Specify the items whose immediate values should be passed to the place function (see chunk_place_function).

inline const std::vector<item_pointer_t> &get_items() const

Get the items set with set_items.

chunk_stream_config &set_max_chunks(std::size_t max_chunks)

Set the maximum number of chunks that can be live at the same time.

A value of 1 means that heaps must be received in order: once a chunk is started, no heaps from a previous chunk will be accepted.

Throws:

std::invalid_argument – if max_chunks is 0.

inline std::size_t get_max_chunks() const

Return the maximum number of chunks that can be live at the same time.

chunk_stream_config &set_place(chunk_place_function place)

Set the function used to determine the chunk of each heap and its placement within the chunk.

inline const chunk_place_function &get_place() const

Get the function used to determine the chunk of each heap and its placement within the chunk.

chunk_stream_config &set_allocate(chunk_allocate_function allocate)

Set the function used to allocate a chunk.

inline const chunk_allocate_function &get_allocate() const

Get the function used to allocate a chunk.

chunk_stream_config &set_ready(chunk_ready_function ready)

Set the function that is provided with completed chunks.

inline const chunk_ready_function &get_ready() const

Get the function that is provided with completed chunks.

chunk_stream_config &enable_packet_presence(std::size_t payload_size)

Enable the packet presence feature.

The payload offset of each packet is divided by payload_size and added to the heap index before indexing spead2::recv::chunk::present.

Throws:

std::invalid_argument – if payload_size is zero.

chunk_stream_config &disable_packet_presence()

Disable the packet presence feature enabled by enable_packet_presence.

inline std::size_t get_packet_presence_payload_size() const

Retrieve the payload_size if packet presence is enabled, or 0 if not.

chunk_stream_config &set_max_heap_extra(std::size_t max_heap_extra)

Set maximum amount of data a placement function may write to chunk_place_data::extra.

inline std::size_t get_max_heap_extra() const

Get maximum amount of data a placement function may write to chunk_place_data::extra.

Public Static Attributes

static constexpr std::size_t default_max_chunks = 2

Default value for set_max_chunks.

class chunk_stream : private spead2::recv::detail::chunk_stream_state<detail::chunk_manager_simple>, public spead2::recv::stream

Stream that writes incoming heaps into chunks.

Subclassed by spead2::recv::chunk_ring_stream< DataRingbuffer, FreeRingbuffer >

Public Functions

chunk_stream(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config)

Constructor.

This class passes a modified config to the base class constructor. This is reflected in the return of get_config. In particular:

  • The allow unsized heaps setting is forced to false.

  • The memcpy function may be overridden, although the provided function is still used when a copy happens. Use get_heap_metadata to get a pointer to heap_metadata, from which the chunk can be retrieved.

  • The memory allocator is overridden, and the provided value is ignored.

  • Additional statistics are registered:

    • too_old_heaps: number of heaps for which the placement function returned a non-negative chunk ID that was behind the window.

    • rejected_heaps: number of heaps for which the placement function returned a negative chunk ID.

Parameters:
  • io_service – I/O service (also used by the readers).

  • config – Basic stream configuration

  • chunk_config – Configuration for chunking

Throws:

invalid_argument – if any of the function pointers in chunk_config have not been set.

Private Functions

inline const chunk_stream_config &get_chunk_config() const

Get the stream’s chunk configuration.

Private Static Functions

static const heap_metadata *get_heap_metadata(const memory_allocator::pointer &ptr)

Get the metadata associated with a heap payload pointer.

If the pointer was not allocated by a chunk stream, returns nullptr.

Ringbuffer convenience API

template<typename DataRingbuffer = ringbuffer<std::unique_ptr<chunk>>, typename FreeRingbuffer = ringbuffer<std::unique_ptr<chunk>>>
class chunk_ring_stream : public detail::chunk_ring_pair<ringbuffer<std::unique_ptr<chunk>>, ringbuffer<std::unique_ptr<chunk>>>, public spead2::recv::chunk_stream

Wrapper around chunk_stream that uses ringbuffers to manage chunks.

When a fresh chunk is needed, it is retrieved from a ringbuffer of free chunks (the “free ring”). When a chunk is flushed, it is pushed to a “data ring”. These may be shared between streams, but both will be stopped as soon as any of the streams using them are stopped. The intended use case is parallel streams that are started and stopped together.

When stop is called, any in-flight chunks (that are not in either of the ringbuffers) will be freed from the thread that called stop.

Public Functions

chunk_ring_stream(io_service_ref io_service, const stream_config &config, const chunk_stream_config &chunk_config, std::shared_ptr<DataRingbuffer> data_ring, std::shared_ptr<FreeRingbuffer> free_ring)

Constructor.

Refer to chunk_stream::chunk_stream for details.

The allocate callback is ignored and should be unset. If a ready callback is defined, it will be called before the chunk is pushed onto the ringbuffer. It must not move from the provided unique_ptr, but it can be used to perform further processing on the chunk before it is pushed.

Calling get_chunk_config will reflect the internally-defined callbacks.

template<typename T, typename DataSemaphore = semaphore, typename SpaceSemaphore = semaphore>
class ringbuffer : public spead2::ringbuffer_base<T>

Ring buffer with blocking and non-blocking push and pop.

It supports non-copyable objects using move semantics. The producer may signal that it has finished producing data by calling stop, which will gracefully shut down consumers as well as other producers. This class is fully thread-safe for multiple producers and consumers.

With multiple producers it is sometimes desirable to only stop the ringbuffer once all the producers are finished. To support this, a producer may register itself with add_producer, and indicate completion with remove_producer. If this causes the number of producers to fall to zero, the stream is stopped.

Public Functions

void try_push(T &&value)

Append an item to the queue, if there is space.

It uses move semantics, so on success, the original value is undefined.

Parameters:

value – Value to move

Throws:
template<typename ...Args>
void try_emplace(Args&&... args)

Construct a new item in the queue, if there is space.

Parameters:

args – Arguments to the constructor

Throws:
template<typename ...SemArgs>
void push(T &&value, SemArgs&&... sem_args)

Append an item to the queue, blocking if necessary.

It uses move semantics, so on success, the original value is undefined.

Parameters:
  • value – Value to move

  • sem_args – Arbitrary arguments to pass to the space semaphore

Throws:

ringbuffer_stopped – if stop is called first

template<typename ...Args>
void emplace(Args&&... args)

Construct a new item in the queue, blocking if necessary.

Parameters:

args – Arguments to the constructor

Throws:

ringbuffer_stopped – if stop is called first

T try_pop()

Retrieve an item from the queue, if there is one.

Throws:
template<typename ...SemArgs>
T pop(SemArgs&&... sem_args)

Retrieve an item from the queue, blocking until there is one or until the queue is stopped.

Parameters:

sem_args – Arbitrary arguments to pass to the data semaphore

Throws:

ringbuffer_stopped – if the queue is empty and stop was called

bool stop()

Indicate that no more items will be produced.

This does not immediately stop consumers if there are still items in the queue; instead, consumers will continue to retrieve remaining items, and will only be signalled once the queue has drained.

Returns:

whether the ringbuffer was stopped

bool remove_producer()

Indicate that a producer registered with add_producer is finished with the ringbuffer.

If this was the last producer, the ringbuffer is stopped.

Returns:

whether the ringbuffer was stopped

inline const DataSemaphore &get_data_sem() const

Get access to the data semaphore.

inline const SpaceSemaphore &get_space_sem() const

Get access to the free-space semaphore.

detail::ringbuffer_iterator<ringbuffer> begin()

Begin iteration over the items in the ringbuffer.

This does not return a full-blown iterator; it is only intended to be used for a range-based for loop. For example: for (auto &&item : ringbuffer) { ... }

detail::ringbuffer_sentinel end()

End iterator (see begin).

std::size_t capacity() const

Maximum number of items that can be held at once.

std::size_t size() const

Return the number of items currently in the ringbuffer.

This should only be used for metrics, not for control flow, as the result could be out of date by the time it is returned.

void add_producer()

Register a new producer.

Producers only need to call this if they want to call ringbuffer::remove_producer.

class ringbuffer_empty : public std::runtime_error

Thrown when attempting to do a non-blocking pop from an empty ringbuffer.

class ringbuffer_full : public std::runtime_error

Thrown when attempting to do a non-blocking push to a full ringbuffer.

class ringbuffer_stopped : public std::runtime_error

Thrown when attempting to do a pop from an empty ringbuffer that has been stopped, or a push to a ringbuffer that has been stopped.