Sending
Heaps
-
class heap
Heap that is constructed for transmission.
Subclassed by spead2::send::heap_wrapper
Public Types
Public Functions
-
explicit heap(const flavour &flavour_ = flavour())
Constructor.
- Parameters:
flavour_ – SPEAD flavour that will be used to encode the heap
-
inline const flavour &get_flavour() const
Return flavour.
-
template<typename ...Args>
inline item_handle add_item(s_item_pointer_t id, Args&&... args) Construct a new item.
- Returns:
A handle that can be passed to get_item to update the item
-
inline item &get_item(item_handle handle)
Get a reference to a previously added item.
The retrieved item reference may be modified to update the heap in place. Behaviour is undefined if handle is not a handle previously returned by add_item.
- Parameters:
handle – Item handle previously returned from add_item
-
inline const item &get_item(item_handle handle) const
Get a reference to a previously added item.
Behaviour is undefined if handle is not a handle previously returned by add_item.
- Parameters:
handle – Item handle previously returned from add_item
-
inline void add_pointer(std::unique_ptr<std::uint8_t[]> &&pointer)
Take over ownership of pointer and arrange for it to be freed when the heap is freed.
-
void add_descriptor(const descriptor &descriptor)
Encode a descriptor to an item and add it to the heap.
-
inline void add_start()
Add a start-of-stream control item.
-
inline void add_end()
Add an end-of-stream control item.
-
inline void set_repeat_pointers(bool repeat)
Enable/disable repetition of item pointers in all packets.
Usually this is not needed, but it can enable some specialised use cases where immediates can be recovered from incomplete heaps or where the receiver examines the item pointers in each packet to decide how to handle it. The packet size must be large enough to fit all the item pointers for the heap (the implementation also reserves a little space, so do not rely on a tight fit working).
The default is disabled.
-
inline bool get_repeat_pointers() const
Return the flag set by set_repeat_pointers.
-
explicit heap(const flavour &flavour_ = flavour())
-
struct heap_reference
Associate a heap with metadata needed to transmit it.
It holds a reference to the original heap.
Public Functions
-
struct item
An item to be inserted into a heap.
An item does not own its memory.
Public Functions
-
item() = default
Default constructor.
This item has undefined values and is not usable.
-
inline item(s_item_pointer_t id, const void *ptr, std::size_t length, bool allow_immediate)
Create an item referencing existing memory.
-
inline item(s_item_pointer_t id, s_item_pointer_t immediate)
Create an item with a value to be encoded as an immediate.
-
inline item(s_item_pointer_t id, const std::string &value, bool allow_immediate)
Construct an item referencing the data in a string.
-
inline item(s_item_pointer_t id, const std::vector<std::uint8_t> &value, bool allow_immediate)
Construct an item referencing the data in a vector.
Public Members
-
s_item_pointer_t id
Item ID.
-
bool is_inline
If true, the item’s value is stored in-place and must be encoded as an immediate.
Non-inline values can still be encoded as immediates if they have the right length.
-
bool allow_immediate
If true, the item’s value may be encoded as an immediate.
This must be false if the item is variable-sized, because in that case the actual size can only be determined from address differences.
If is_inline is true, then this must be true as well.
-
const std::uint8_t *ptr
Pointer to the value.
-
std::size_t length
Length of the value.
-
item() = default
Configuration
See spead2.send.StreamConfig
for an explanation of the
configuration options. In the C++ API, one must first construct a default
configuration and then use setters to set individual properties. The setters
all return the configuration itself so that one can construct a configuration
with a single expression such as
spead2::send::stream_config().set_max_packet_size(9172).set_rate(1e9)
-
class stream_config
Configuration for send streams.
Public Functions
-
stream_config &set_max_packet_size(std::size_t max_packet_size)
Set maximum packet size to use (only counts the UDP payload, not L1-4 headers).
-
inline std::size_t get_max_packet_size() const
Get maximum packet size to use.
-
stream_config &set_rate(double rate)
Set maximum transmit rate to use, in bytes per second.
-
inline double get_rate() const
Get maximum transmit rate to use, in bytes per second.
-
stream_config &set_burst_size(std::size_t burst_size)
Set maximum size of a burst, in bytes.
-
inline std::size_t get_burst_size() const
Get maximum size of a burst, in bytes.
-
stream_config &set_max_heaps(std::size_t max_heaps)
Set maximum number of in-flight heaps.
-
inline std::size_t get_max_heaps() const
Get maximum number of in-flight heaps.
-
stream_config &set_burst_rate_ratio(double burst_rate_ratio)
Set maximum increase in transmit rate for catching up.
-
inline double get_burst_rate_ratio() const
Get maximum increase in transmit rate for catching up.
-
stream_config &set_rate_method(rate_method method)
Set rate-limiting method.
-
inline rate_method get_rate_method() const
Get rate-limiting method.
-
double get_burst_rate() const
Get product of rate and burst_rate_ratio.
-
stream_config &set_max_packet_size(std::size_t max_packet_size)
Streams
All stream types are derived from spead2::send::stream
.
-
enum class spead2::send::group_mode
Determines how to order packets when using spead2::send::stream::async_send_heaps.
Values:
-
enumerator ROUND_ROBIN
Interleave the packets of the heaps.
One packet is sent from each heap in turn (skipping those that have run out of packets).
-
enumerator SERIAL
Send the heaps serially, as if they were added one at a time.
-
enumerator ROUND_ROBIN
-
typedef std::function<void(const boost::system::error_code &ec, item_pointer_t bytes_transferred)> spead2::send::stream::completion_handler
Callback type for asynchronous notification of heap completion.
-
class stream
Stream for sending heaps, potentially to multiple destinations.
Subclassed by spead2::send::inproc_stream, spead2::send::streambuf_stream, spead2::send::tcp_stream, spead2::send::udp_ibv_stream, spead2::send::udp_stream
Public Functions
-
boost::asio::io_service &get_io_service() const
Retrieve the io_service used for processing the stream.
-
void set_cnt_sequence(item_pointer_t next, item_pointer_t step)
Modify the linear sequence used to generate heap cnts.
The next heap will have cnt next, and each following cnt will be incremented by step. When using this, it is the user’s responsibility to ensure that the generated values remain unique. The initial state is next = 1, cnt = 1.
This is useful when multiple senders will send heaps to the same receiver, and need to keep their heap cnts separate.
-
bool async_send_heap(const heap &h, completion_handler handler, s_item_pointer_t cnt = -1, std::size_t substream_index = 0, double rate = -1.0)
Send h asynchronously, with handler called on completion.
The caller must ensure that h remains valid (as well as any memory it points to) until handler is called.
If this function returns
true
, then the heap has been added to the queue. The completion handlers for such heaps are guaranteed to be called in order.If this function returns
false
, the heap was rejected without being added to the queue. The handler is called as soon as possible (from a thread running the io_service). If the heap was rejected due to lack of space, the error code isboost::asio::error::would_block
.By default the heap cnt is chosen automatically (see set_cnt_sequence). An explicit value can instead be chosen by passing a non-negative value for cnt. When doing this, it is entirely the responsibility of the user to avoid collisions, both with other explicit values and with the automatic counter. This feature is useful when multiple senders contribute to a single stream and must keep their heap cnts disjoint, which the automatic assignment would not do.
The transmission rate may be overridden using the optional rate parameter. If it is negative, the stream’s rate applies, if it is zero there is no rate limiting, and if it is positive it specifies the rate in bytes per second.
Some streams may contain multiple substreams, each with a different destination. In this case, substream_index selects the substream to use.
- Return values:
false – If the heap was immediately discarded
true – If the heap was enqueued
-
template<typename CompletionToken>
inline auto async_send_heap(const heap &h, CompletionToken &&token, s_item_pointer_t cnt = -1, std::enable_if_t<!std::is_convertible_v<CompletionToken, completion_handler>, std::size_t> substream_index = 0, double rate = -1.0) Send h asynchronously, with an arbitrary completion token.
This overload is not used if the completion token is convertible to completion_handler.
Refer to the other overload for details. The boolean return of the other overload is absent. You will need to retrieve the asynchronous result and check for
boost::asio::error::would_block
to determine if the heaps were rejected due to lack of buffer space.
-
template<typename Iterator>
inline bool async_send_heaps(Iterator first, Iterator last, completion_handler handler, group_mode mode) Send a group of heaps asynchronously, with handler called on completion.
The caller must ensure that the heap objects (as well as any memory they point to) remain valid until handler is called.
If this function returns
true
, then the heaps have been added to the queue. The completion handlers for such heaps are guaranteed to be called in order. Note that there is no individual per-heap feedback; the callback is called once to give the result of the entire group.If this function returns
false
, the heaps were rejected without being added to the queue. The handler is called as soon as possible (from a thread running the io_service). If the heaps were rejected due to lack of space, the error code isboost::asio::error::would_block
. It is an error to send an empty list of heaps.Note that either all the heaps will be queued, or none will; in particular, there needs to be enough space in the queue for them all.
The heaps are specified by a range of input iterators. Typically they will be of type heap_reference, but other types can be used by overloading
get_heap
,get_heap_cnt
andget_heap_substream_index
for the value type of the iterator. Refer to async_send_heap for an explanation of the cnt and substream_index parameters.The heap_reference objects can be safely deleted once this function returns; it is sufficient for the heap objects (and the data they reference) to persist.
- Return values:
false – If the heaps were immediately discarded
true – If the heaps were enqueued
-
template<typename Iterator, typename CompletionToken>
inline auto async_send_heaps(Iterator first, Iterator last, CompletionToken &&token, std::enable_if_t<!std::is_convertible_v<CompletionToken, completion_handler>, group_mode> mode) Send a group of heaps asynchronously, with an arbitrary completion token (e.g.,
boost::asio::use_future
).This overload is not used if the completion token is convertible to completion_handler.
Refer to the other overload for details. There are a few differences:
The boolean return of the other overload is absent. You will need to retrieve the asynchronous result and check for
boost::asio::error::would_block
to determine if the heaps were rejected due to lack of buffer space.Depending on the completion token, the iterators might not be used immediately. Using
boost::asio::use_future
causes them to be used immediately, butboost::asio::deferred
orboost::asio::use_awaitable
does not (they are only used when awaiting the result). If they are not used immediately, the caller must keep them valid (as well as the data they reference) until they are used.
-
std::size_t get_num_substreams() const
Get the number of substreams in this stream.
-
void flush()
Block until all enqueued heaps have been sent.
This function is thread-safe; only the heaps that were enqueued prior to calling the function are waited for. The handlers will have been called prior to this function returning.
-
boost::asio::io_service &get_io_service() const
-
class udp_stream : public spead2::send::stream
Public Functions
-
udp_stream(io_service_ref io_service, const std::vector<boost::asio::ip::udp::endpoint> &endpoints, const stream_config &config = stream_config(), std::size_t buffer_size = default_buffer_size, const boost::asio::ip::address &interface_address = boost::asio::ip::address())
Constructor.
This constructor can handle unicast or multicast destinations, but is primarily intended for unicast as it does not provide all the options that the multicast-specific constructors do.
- Parameters:
io_service – I/O service for sending data
endpoints – Destination address and port for each substream
config – Stream configuration
buffer_size – Socket buffer size (0 for OS default)
interface_address – Source address
(see tips on Routing)
-
udp_stream(io_service_ref io_service, boost::asio::ip::udp::socket &&socket, const std::vector<boost::asio::ip::udp::endpoint> &endpoints, const stream_config &config = stream_config())
Constructor using an existing socket and an explicit io_service or thread pool.
The socket must be open but not connected, and the io_service must match the socket’s.
-
udp_stream(io_service_ref io_service, const std::vector<boost::asio::ip::udp::endpoint> &endpoints, const stream_config &config, std::size_t buffer_size, int ttl)
Constructor with multicast hop count.
- Parameters:
io_service – I/O service for sending data
endpoints – Multicast group and port for each substream
config – Stream configuration
buffer_size – Socket buffer size (0 for OS default)
ttl – Maximum number of hops
- Throws:
std::invalid_argument – if any element of endpoints is not a multicast address
std::invalid_argument – if the elements of endpoints do not all have the same protocol
std::invalid_argument – if endpoints is empty
-
udp_stream(io_service_ref io_service, const std::vector<boost::asio::ip::udp::endpoint> &endpoints, const stream_config &config, std::size_t buffer_size, int ttl, const boost::asio::ip::address &interface_address)
Constructor with multicast hop count and outgoing interface address (IPv4 only).
- Parameters:
io_service – I/O service for sending data
endpoints – Multicast group and port for each substream
config – Stream configuration
buffer_size – Socket buffer size (0 for OS default)
ttl – Maximum number of hops
interface_address – Address of the outgoing interface
- Throws:
std::invalid_argument – if any element of endpoint is not an IPv4 multicast address
std::invalid_argument – if endpoints is empty
std::invalid_argument – if interface_address is not an IPv4 address
-
udp_stream(io_service_ref io_service, const std::vector<boost::asio::ip::udp::endpoint> &endpoints, const stream_config &config, std::size_t buffer_size, int ttl, unsigned int interface_index)
Constructor with multicast hop count and outgoing interface index (IPv6 only).
See also
if_nametoindex(3)
- Parameters:
io_service – I/O service for sending data
endpoints – Multicast group and port for each substream
config – Stream configuration
buffer_size – Socket buffer size (0 for OS default)
ttl – Maximum number of hops
interface_index – Index of the outgoing interface
- Throws:
std::invalid_argument – if any element of endpoints is not an IPv6 multicast address
std::invalid_argument – if endpoints is empty
Private Functions
-
udp_stream(io_service_ref io_service, boost::asio::ip::udp::socket &&socket, const std::vector<boost::asio::ip::udp::endpoint> &endpoints, const stream_config &config, std::size_t buffer_size)
Constructor used to implement most other constructors.
-
udp_stream(io_service_ref io_service, const std::vector<boost::asio::ip::udp::endpoint> &endpoints, const stream_config &config = stream_config(), std::size_t buffer_size = default_buffer_size, const boost::asio::ip::address &interface_address = boost::asio::ip::address())
-
class tcp_stream : public spead2::send::stream
Public Functions
-
tcp_stream(io_service_ref io_service, std::function<void(const boost::system::error_code&)> &&connect_handler, const std::vector<boost::asio::ip::tcp::endpoint> &endpoints, const stream_config &config = stream_config(), std::size_t buffer_size = default_buffer_size, const boost::asio::ip::address &interface_address = boost::asio::ip::address())
Constructor.
A callback is provided to indicate when the connection is established.
Warning
The callback may be called before the constructor returns. The implementation of the callback needs to be prepared to handle this case.
- Parameters:
io_service – I/O service for sending data
connect_handler – Callback when connection is established. It is called with a
boost::system::error_code
to indicate whether connection was successful.endpoints – Destination host and port (must contain exactly one element)
config – Stream configuration
buffer_size – Socket buffer size (0 for OS default)
interface_address – Source address
(see tips on Routing)
-
tcp_stream(io_service_ref io_service, boost::asio::ip::tcp::socket &&socket, const stream_config &config = stream_config())
Constructor using an existing socket.
The socket must be connected.
-
tcp_stream(io_service_ref io_service, std::function<void(const boost::system::error_code&)> &&connect_handler, const std::vector<boost::asio::ip::tcp::endpoint> &endpoints, const stream_config &config = stream_config(), std::size_t buffer_size = default_buffer_size, const boost::asio::ip::address &interface_address = boost::asio::ip::address())
-
class streambuf_stream : public spead2::send::stream
Puts packets into a streambuf (which could come from an
ostream
).This should not be used for a blocking stream such as a wrapper around TCP, because doing so will block the asio handler thread.
Subclassed by spead2::send::stream_wrapper< streambuf_stream >
Public Functions
-
streambuf_stream(io_service_ref io_service, std::streambuf &streambuf, const stream_config &config = stream_config())
Constructor.
-
streambuf_stream(io_service_ref io_service, std::streambuf &streambuf, const stream_config &config = stream_config())