Receiving
Heaps
Unlike the Python bindings, the C++ bindings expose three heap types: live heaps (spead2::recv::live_heap) are used for heaps being constructed, and may be missing data; frozen heaps (spead2::recv::heap) always have all their data; and incomplete heaps (spead2::recv::incomplete_heap) are frozen heaps that are missing data. Frozen heaps can be move-constructed from live heaps, which will typically be done in the callback.
-
class
live_heap
¶ A SPEAD heap that is in the process of being received.
Once it is fully received, it is converted to a heap for further processing.
Any SPEAD-64-* flavour can be used, but all packets in the heap must use the same flavour. It may be possible to relax this, but it hasn’t been examined, and may cause issues for decoding descriptors (whose format depends on the flavour).
A heap can be:
- complete: a heap length item was found in a packet, and we have received all the payload corresponding to it. No more packets are expected.
- contiguous: the payload we have received is a contiguous range from 0 up to some amount, and covers all items described in the item pointers.
- incomplete: not contiguous.
A complete heap is also contiguous, but not necessarily the other way around. Only contiguous heaps can be frozen to heap, and only incomplete heaps can be frozen to incomplete_heap.
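The relationship between the three states can be modelled with a few integers. The following is a minimal sketch, not spead2 code: heap_progress, its fields, and classify are hypothetical names introduced for illustration, and the model assumes that duplicate payload bytes are never counted.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical summary of one heap's progress; not a spead2 type.
struct heap_progress
{
    std::int64_t heap_length;      // value of the heap length item, or -1 if not seen yet
    std::int64_t received_length;  // bytes of payload received so far (no duplicates)
    std::int64_t contiguous_end;   // payload covers [0, contiguous_end) without holes
    std::int64_t min_length;       // payload bytes needed to cover every item pointer
};

enum class heap_state { complete, contiguous, incomplete };

bool is_complete(const heap_progress &p)
{
    // The length is known and every byte of it has arrived.
    return p.heap_length >= 0 && p.received_length == p.heap_length;
}

bool is_contiguous(const heap_progress &p)
{
    // Nothing was received beyond the hole-free prefix, and that prefix covers all items.
    return p.received_length == p.contiguous_end && p.contiguous_end >= p.min_length;
}

heap_state classify(const heap_progress &p)
{
    assert(p.contiguous_end <= p.received_length);
    if (is_complete(p))
        return heap_state::complete;
    if (is_contiguous(p))
        return heap_state::contiguous;
    return heap_state::incomplete;
}
```

In this model a heap with a declared length of 100 bytes, of which the first 80 have arrived and cover every item, is contiguous but not complete, which is the asymmetry described above.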
Public Functions
-
bool
is_complete
() const¶ True if the heap is complete.
-
bool
is_contiguous
() const¶ True if the heap is contiguous.
-
bool
is_end_of_stream
() const¶ True if an end-of-stream heap control item was found.
-
s_item_pointer_t
get_cnt
() const¶ Retrieve the heap ID.
-
bug_compat_mask
get_bug_compat
() const¶ Get protocol bug compatibility flags.
-
class
heap
: public spead2::recv::heap_base¶ Received heap that has been finalised.
Public Functions
-
heap
(live_heap &&h)¶ Freeze a heap, which must satisfy live_heap::is_contiguous.
The original heap is destroyed.
-
descriptor
to_descriptor
() const¶ Extract descriptor fields from the heap.
Any missing fields are default-initialized. This should be used on a heap constructed from the content of a descriptor item.
The original PySPEAD package (version 0.5.2) does not follow the specification here. The macros in common_defines.h can be used to control whether to interpret the specification or be bug-compatible.
The protocol allows descriptors to use immediate-mode items, but the decoding of these into variable-length strings is undefined. This implementation will discard such descriptor fields.
-
std::vector<descriptor>
get_descriptors
() const¶ Extract and decode descriptors from this heap.
-
s_item_pointer_t
get_cnt
() const¶ Get heap ID.
-
const flavour &
get_flavour
() const¶ Get protocol flavour used.
-
const std::vector<item> &
get_items
() const¶ Get the items from the heap.
This includes descriptors, but excludes any items with ID <= 4.
-
bool
is_ctrl_item
(ctrl_mode value) const¶ Convenience function to check whether any of the items is a STREAM_CTRL_ID item with value value.
-
bool
is_start_of_stream
() const¶ Convenience function to check whether any of the items is a CTRL_STREAM_START.
-
bool
is_end_of_stream
() const¶ Convenience function to check whether any of the items is a CTRL_STREAM_STOP.
-
-
class
incomplete_heap
: public spead2::recv::heap_base¶ Received heap that has been finalised, but which is missing data.
The payload and any items that refer to the payload are discarded.
Public Functions
-
incomplete_heap
(live_heap &&h, bool keep_payload, bool keep_payload_ranges)¶ Freeze a heap.
The original heap is destroyed.
- Parameters
h: The heap to freeze.
keep_payload: If true, transfer the payload memory allocation from the live heap to this object. If false, discard it.
keep_payload_ranges: If true, store information that allows get_payload_ranges to work.
-
s_item_pointer_t
get_heap_length
() const¶ Heap payload length encoded in packets (-1 for unknown)
-
s_item_pointer_t
get_received_length
() const¶ Number of bytes of payload received.
-
const memory_allocator::pointer &
get_payload
() const¶ Get the payload pointer.
This will return an empty pointer unless keep_payload was set in the constructor.
-
std::vector<std::pair<s_item_pointer_t, s_item_pointer_t>>
get_payload_ranges
() const¶ Return a list of contiguous ranges of payload that were received.
This is intended for special cases where a custom memory allocator was used to channel the payload into a caller-managed area, so that the caller knows which parts of that area have been filled in.
If keep_payload_ranges was false in the constructor, returns an empty list.
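To illustrate what a list of contiguous ranges conveys, the sketch below merges per-packet payload spans into sorted, non-overlapping ranges. It is a standalone illustration rather than the library's implementation: merge_payload_ranges is a hypothetical helper, and treating each output pair as a half-open [start, end) interval is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using range = std::pair<std::int64_t, std::int64_t>;

// Input: one (payload offset, payload length) pair per received packet.
// Output: sorted [start, end) ranges, with touching or overlapping spans merged.
std::vector<range> merge_payload_ranges(const std::vector<range> &packets)
{
    std::vector<range> spans;
    for (const range &p : packets)
    {
        assert(p.first >= 0 && p.second >= 0);
        spans.emplace_back(p.first, p.first + p.second);
    }
    std::sort(spans.begin(), spans.end());

    std::vector<range> merged;
    for (const range &s : spans)
    {
        if (!merged.empty() && s.first <= merged.back().second)
            merged.back().second = std::max(merged.back().second, s.second);
        else
            merged.push_back(s);
    }
    return merged;
}
```

With packets at offsets 0, 200 and 100 carrying 100, 100 and 50 bytes, the result is two ranges, [0, 150) and [200, 300), so a caller that owns the destination memory knows bytes 150 to 199 were never written.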
-
s_item_pointer_t
get_cnt
() const Get heap ID.
-
const flavour &
get_flavour
() const Get protocol flavour used.
-
const std::vector<item> &
get_items
() const Get the items from the heap.
This includes descriptors, but excludes any items with ID <= 4.
-
bool
is_ctrl_item
(ctrl_mode value) const Convenience function to check whether any of the items is a STREAM_CTRL_ID item with value value.
-
bool
is_start_of_stream
() const Convenience function to check whether any of the items is a CTRL_STREAM_START.
-
bool
is_end_of_stream
() const Convenience function to check whether any of the items is a CTRL_STREAM_STOP.
-
-
struct
item
¶ An item extracted from a heap.
Subclassed by spead2::recv::item_wrapper
-
struct
descriptor
¶ An unpacked descriptor.
If numpy_header is non-empty, it overrides format and shape.
Public Members
-
s_item_pointer_t
id
= 0¶ SPEAD ID.
-
std::string
name
¶ Short name.
-
std::string
description
¶ Long description.
-
std::vector<std::pair<char, s_item_pointer_t>>
format
¶ Legacy format.
Each element is a specifier character (e.g. ‘u’ for unsigned) and a bit width.
-
std::vector<s_item_pointer_t>
shape
¶ Shape.
Elements are either non-negative, or -1 is used to indicate a variable-length size. At most one dimension may be variable-length.
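One plausible way to resolve a variable-length dimension is to divide the item's size by the size implied by the fixed dimensions and the legacy format. The sketch below works under that assumption; resolve_shape and element_bits are hypothetical helpers, not spead2 functions, and they ignore numpy_header entirely.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <utility>
#include <vector>

using format_t = std::vector<std::pair<char, std::int64_t>>;
using shape_t = std::vector<std::int64_t>;

// Bits occupied by one element: the sum of the bit widths in the legacy format.
std::int64_t element_bits(const format_t &format)
{
    std::int64_t bits = 0;
    for (const auto &field : format)
        bits += field.second;
    return bits;
}

// Replace the single -1 in `shape` (if any) with the size implied by `item_bytes`.
shape_t resolve_shape(const shape_t &shape, const format_t &format, std::int64_t item_bytes)
{
    assert(item_bytes >= 0);
    std::int64_t fixed = 1;   // product of the known dimensions
    int variable = -1;        // index of the variable-length dimension, if any
    for (std::size_t i = 0; i < shape.size(); i++)
    {
        if (shape[i] >= 0)
            fixed *= shape[i];
        else if (variable >= 0)
            throw std::invalid_argument("at most one dimension may be variable-length");
        else
            variable = static_cast<int>(i);
    }
    shape_t out = shape;
    if (variable >= 0)
    {
        // Bits consumed by each step along the variable dimension.
        std::int64_t bits = element_bits(format) * fixed;
        out[variable] = bits > 0 ? item_bytes * 8 / bits : 0;
    }
    return out;
}
```

For example, a 20-byte item with format u8 and shape (-1, 4) resolves to (5, 4) under this rule, because each row of four 8-bit elements occupies 32 bits.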
-
std::string
numpy_header
¶ Description in the format used in .npy files.
Streams
At the lowest level, heaps are given to the application via a callback to a virtual function. While this callback is running, no new packets can be received from the network socket, so this function needs to complete quickly to avoid data loss when using UDP. To use this interface, subclass spead2::recv::stream and implement heap_ready() and optionally override stop_received().
Note that some public functions are incorrectly listed as protected below due to limitations of the documentation tools.
-
class
stream
: protected spead2::recv::stream_base¶ Stream that is fed by subclasses of reader.
The public interface to this class is thread-safe.
Subclassed by spead2::recv::ring_stream_base
Public Functions
-
template<typename T, typename ...Args>
void
emplace_reader
(Args&&... args)¶ Add a new reader by passing its constructor arguments, excluding the initial stream argument.
-
void
stop
()¶ Stop the stream and block until all the readers have wound up.
After calling this there should be no more outstanding completion handlers in the thread pool.
In most cases subclasses should override stop_received rather than this function. However, if heap_ready can block indefinitely, this function should be overridden to unblock it before calling the base implementation.
Protected Functions
Set a pool to use for allocating heap memory.
Set an allocator to use for allocating heap memory.
-
void
set_memcpy
(packet_memcpy_function memcpy)¶ Set an alternative memcpy function for copying heap payload.
-
void
set_memcpy
(memcpy_function memcpy)¶ Set an alternative memcpy function for copying heap payload.
-
void
set_memcpy
(memcpy_function_id id)¶ Set builtin memcpy function to use for copying payload.
-
void
set_stop_on_stop_item
(bool stop)¶ Set whether to stop the stream when a stop item is received.
-
bool
get_stop_on_stop_item
() const¶ Get whether to stop the stream when a stop item is received.
-
void
set_allow_unsized_heaps
(bool allow)¶ Set whether to allow heaps without HEAP_LENGTH.
-
bool
get_allow_unsized_heaps
() const¶ Get whether to allow heaps without HEAP_LENGTH.
-
void
flush
()¶ Flush the collection of live heaps, passing them to heap_ready.
-
stream_stats
get_stats
() const¶ Return statistics about the stream.
See the Python documentation.
-
struct
stream_stats
¶ Statistics about a stream.
Not all fields are relevant for all stream types.
Public Members
-
std::uint64_t
heaps
= 0¶ Total number of heaps passed to stream_base::heap_ready.
-
std::uint64_t
incomplete_heaps_evicted
= 0¶ Number of incomplete heaps that were evicted from the buffer to make room for new data.
-
std::uint64_t
incomplete_heaps_flushed
= 0¶ Number of incomplete heaps that were emitted by stream::flush.
These are typically heaps that were in-flight when the stream stopped.
-
std::uint64_t
packets
= 0¶ Number of packets received.
-
std::uint64_t
batches
= 0¶ Number of batches of packets.
-
std::uint64_t
worker_blocked
= 0¶ Number of times a worker thread was blocked because the ringbuffer was full.
Only applicable to ring_stream.
-
std::size_t
max_batch
= 0¶ Maximum number of packets received as a unit.
This is only applicable to readers that support fetching a batch of packets from the source.
-
std::uint64_t
single_packet_heaps
= 0¶ Number of heaps that were entirely contained in one packet.
-
std::uint64_t
search_dist
= 0¶ Total number of hash table probes.
A potentially more convenient interface is spead2::recv::ring_stream<Ringbuffer>, which places received heaps into a fixed-size thread-safe ring buffer. Another thread can then pull from this ring buffer in a loop. The template parameter selects the ringbuffer implementation. The default is a good light-weight choice, but if you need to use select()-like functions to wait for data, you can use spead2::ringbuffer<spead2::recv::live_heap, spead2::semaphore_fd, spead2::semaphore>.
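The consumer side of this pattern is a loop that pops until the buffer reports that it has been stopped. The sketch below shows that loop against a hypothetical stand-in: bounded_queue and stopped_error are invented for illustration and are not the spead2 ringbuffer or its exception type, and the demonstration is single-threaded so that neither call ever blocks.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical exception thrown once the queue is stopped and drained.
struct stopped_error : std::runtime_error
{
    stopped_error() : std::runtime_error("queue has been stopped") {}
};

// Hypothetical fixed-capacity thread-safe queue standing in for the ring buffer.
template<typename T>
class bounded_queue
{
public:
    explicit bounded_queue(std::size_t capacity) : capacity(capacity)
    {
        assert(capacity > 0);
    }

    // Producer side: blocks while the queue is full.
    void push(T value)
    {
        std::unique_lock<std::mutex> lock(m);
        not_full.wait(lock, [this] { return stopped || items.size() < capacity; });
        if (stopped)
            throw stopped_error();
        items.push_back(std::move(value));
        not_empty.notify_one();
    }

    // Consumer side: blocks while empty; throws once stopped and drained.
    T pop()
    {
        std::unique_lock<std::mutex> lock(m);
        not_empty.wait(lock, [this] { return stopped || !items.empty(); });
        if (items.empty())
            throw stopped_error();
        T value = std::move(items.front());
        items.pop_front();
        not_full.notify_one();
        return value;
    }

    void stop()
    {
        std::lock_guard<std::mutex> lock(m);
        stopped = true;
        not_empty.notify_all();
        not_full.notify_all();
    }

private:
    std::size_t capacity;
    std::deque<T> items;
    bool stopped = false;
    std::mutex m;
    std::condition_variable not_empty, not_full;
};

// The consumer loop: keep popping until the queue signals that it has stopped.
template<typename T>
std::vector<T> drain(bounded_queue<T> &queue)
{
    std::vector<T> out;
    try
    {
        while (true)
            out.push_back(queue.pop());
    }
    catch (const stopped_error &)
    {
        // Normal termination: the producer stopped and nothing is left.
    }
    return out;
}

// Usage: three "heaps" are queued, the producer stops, the consumer drains them.
std::vector<int> demo()
{
    bounded_queue<int> queue(4);
    for (int heap_id = 1; heap_id <= 3; heap_id++)
        queue.push(heap_id);
    queue.stop();
    return drain(queue);
}
```

Items queued before the stop are still delivered, and only then does the loop end. With a real stream the producer would run on another thread, and the element type would be a heap rather than an int.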
-
template<typename Ringbuffer = ringbuffer<live_heap>>
class
ring_stream
: public spead2::recv::ring_stream_base¶ Specialisation of stream that pushes its results into a ringbuffer.
The ringbuffer class may be replaced, but must provide the same interface as ringbuffer. If the ring buffer fills up, add_packet will block the reader.
On the consumer side, heaps are automatically frozen as they are extracted.
This class is thread-safe.
Public Functions
-
ring_stream
(io_service_ref io_service, bug_compat_mask bug_compat = 0, std::size_t max_heaps = default_max_heaps, std::size_t ring_heaps = default_ring_heaps, bool contiguous_only = true)¶ Constructor.
- Parameters
io_service: I/O service (also used by the readers).
bug_compat: Bug compatibility flags for interpreting heaps
max_heaps: Number of partial heaps to keep around
ring_heaps: Capacity of the ringbuffer
contiguous_only: If true, only contiguous heaps are pushed to the ring buffer
-
heap
pop
()¶ Wait until a contiguous heap is available, freeze it, and return it; or until the stream is stopped.
- Exceptions
ringbuffer_stopped: if stop has been called and there are no more contiguous heaps.
-
live_heap
pop_live
()¶ Wait until a heap is available and return it; or until the stream is stopped.
- Exceptions
ringbuffer_stopped: if stop has been called and there are no more heaps.
-
Readers
Reader classes are constructed inside a stream by calling spead2::recv::stream::emplace_reader().
-
class
udp_reader
: public spead2::recv::udp_reader_base¶ Asynchronous stream reader that receives packets over UDP.
Public Functions
-
udp_reader
(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size = default_max_size, std::size_t buffer_size = default_buffer_size)¶ Constructor.
If endpoint is a multicast address, then this constructor will subscribe to the multicast group, and also set SO_REUSEADDR so that multiple sockets can be subscribed to the multicast group.
- Parameters
owner: Owning stream
endpoint: Address on which to listen
max_size: Maximum packet size that will be accepted.
buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.
-
udp_reader
(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size, std::size_t buffer_size, const boost::asio::ip::address &interface_address)¶ Constructor with explicit multicast interface address (IPv4 only).
The socket will have SO_REUSEADDR set, so that multiple sockets can all listen to the same multicast stream. If you want to let the system pick the interface for the multicast subscription, use boost::asio::ip::address_v4::any(), or use the default constructor.
- Parameters
owner: Owning stream
endpoint: Multicast group and port
max_size: Maximum packet size that will be accepted.
buffer_size: Requested socket buffer size.
interface_address: Address of the interface which should join the group
- Exceptions
std::invalid_argument: If endpoint is not an IPv4 multicast address
std::invalid_argument: If interface_address is not an IPv4 address
-
udp_reader
(stream &owner, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size, std::size_t buffer_size, unsigned int interface_index)¶ Constructor with explicit multicast interface index (IPv6 only).
The socket will have SO_REUSEADDR set, so that multiple sockets can all listen to the same multicast stream. If you want to let the system pick the interface for the multicast subscription, set interface_index to 0, or use the standard constructor.
- See
- if_nametoindex(3)
- Parameters
owner: Owning stream
endpoint: Multicast group and port
max_size: Maximum packet size that will be accepted.
buffer_size: Requested socket buffer size.
interface_index: Index of the interface which should join the group
-
udp_reader
(stream &owner, boost::asio::ip::udp::socket &&socket, const boost::asio::ip::udp::endpoint &endpoint, std::size_t max_size = default_max_size, std::size_t buffer_size = default_buffer_size)¶ Constructor using an existing socket.
This allows socket options (e.g., multicast subscriptions) to be fine-tuned by the caller. The socket should not be bound. Note that there is no special handling for multicast addresses here.
- Parameters
owner: Owning stream
socket: Existing socket which will be taken over. It must use the same I/O service as owner.
endpoint: Address on which to listen
max_size: Maximum packet size that will be accepted.
buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.
-
udp_reader
(stream &owner, boost::asio::ip::udp::socket &&socket, std::size_t max_size = default_max_size)¶ Constructor using an existing socket.
This allows socket options (e.g., multicast subscriptions) to be fine-tuned by the caller. The socket must already be bound to the desired endpoint. There is no special handling of multicast subscriptions or socket buffer sizes here.
- Parameters
owner: Owning stream
socket: Existing socket which will be taken over. It must use the same I/O service as owner.
max_size: Maximum packet size that will be accepted.
-
-
class
tcp_reader
: public spead2::recv::reader¶ Asynchronous stream reader that receives packets over TCP.
Public Functions
-
tcp_reader
(stream &owner, const boost::asio::ip::tcp::endpoint &endpoint, std::size_t max_size = default_max_size, std::size_t buffer_size = default_buffer_size)¶ Constructor.
- Parameters
owner: Owning stream
endpoint: Address on which to listen
max_size: Maximum packet size that will be accepted.
buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.
-
tcp_reader
(stream &owner, boost::asio::ip::tcp::acceptor &&acceptor, std::size_t max_size = default_max_size)¶ Constructor using an existing acceptor object.
This allows acceptor objects to be created and fine-tuned by users before handing them over. The acceptor object must be already bound.
- Parameters
owner: Owning stream
acceptor: Acceptor object, must be bound
max_size: Maximum packet size that will be accepted.
Private Functions
-
tcp_reader
(stream &owner, boost::asio::ip::tcp::acceptor &&acceptor, std::size_t max_size, std::size_t buffer_size)¶ Base constructor, used by the other constructors.
- Parameters
owner: Owning stream
acceptor: Acceptor object, must be bound
max_size: Maximum packet size that will be accepted.
buffer_size: Requested socket buffer size. Note that the operating system might not allow a buffer size as big as the default.
-
-
class
inproc_reader
: public spead2::recv::reader Stream reader that receives packets from an inproc_queue.
Public Functions
Constructor.
-
class
mem_reader
: public spead2::recv::reader¶ Reader class that feeds data from a memory buffer to a stream.
The caller must ensure that the underlying memory buffer is not destroyed before this class.
- Note
- For simple cases, use mem_to_stream instead. This class is only necessary if one wants to plug in to a stream.
Subclassed by spead2::recv::buffer_reader
-
class
udp_pcap_file_reader
: public spead2::recv::udp_reader_base¶ Reader class that feeds data from a pcap file to a stream.
Memory allocators
In addition to the memory allocators described in Memory allocators, new allocators can be created by subclassing spead2::memory_allocator.
For an allocator set on a stream, a pointer to a spead2::recv::packet_header is passed as a hint to the allocator, allowing memory to be placed according to information in the packet. Note that for unreliable transport this could be any packet from the heap, and you should not rely on it being the initial packet.
-
class
memory_allocator
: public std::enable_shared_from_this<memory_allocator>¶ Polymorphic class for managing memory allocations in a memory pool.
This can be subclassed to provide custom memory allocations.
Subclassed by spead2::memory_pool, spead2::mmap_allocator, spead2::unittest::mock_allocator
Public Functions
-
memory_allocator::pointer
allocate
(std::size_t size, void *hint)¶ Allocate size bytes of memory.
The default implementation uses new and pre-faults the memory.
The pointer type includes a custom deleter that takes a void * argument, which will be passed to free. This can be used to pass extra information that is needed to free the memory. It is guaranteed that the base class will set this to nullptr.
- Return
- Pointer to newly allocated memory
- Parameters
size: Number of bytes to allocate
hint: Usage-dependent extra information
- Exceptions
std::bad_alloc: if allocation failed
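The idea of a deleter that carries an opaque pointer back to the code that frees the memory can be sketched with std::unique_ptr alone. Everything below is a hypothetical stand-in (slab_pool, its deleter and free_slabs are invented for this example) and does not reproduce the spead2 class hierarchy; it only shows the extra void * being used to locate the slab to release.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <new>
#include <vector>

// Hypothetical fixed-slab pool. Pointers handed out must not outlive the pool.
class slab_pool
{
private:
    struct slab
    {
        std::vector<std::uint8_t> bytes;
        bool in_use = false;
    };
    std::vector<slab> slabs;   // never resized, so slab addresses stay valid

public:
    // Deleter carrying an opaque pointer, used here to find the slab to release.
    struct deleter
    {
        void *user = nullptr;
        deleter() = default;
        explicit deleter(void *user) : user(user) {}
        void operator()(std::uint8_t *ptr) const
        {
            slab *s = static_cast<slab *>(user);
            assert(s != nullptr && s->bytes.data() == ptr);
            s->in_use = false;   // return the slab to the pool instead of freeing it
        }
    };
    using pointer = std::unique_ptr<std::uint8_t[], deleter>;

    slab_pool(std::size_t slab_size, std::size_t count) : slabs(count)
    {
        for (slab &s : slabs)
            s.bytes.resize(slab_size);
    }

    // `hint` is accepted to mirror the interface described above but is unused here.
    pointer allocate(std::size_t size, void * /*hint*/)
    {
        for (slab &s : slabs)
        {
            if (!s.in_use && s.bytes.size() >= size)
            {
                s.in_use = true;
                return pointer(s.bytes.data(), deleter(&s));
            }
        }
        throw std::bad_alloc();
    }

    std::size_t free_slabs() const
    {
        std::size_t n = 0;
        for (const slab &s : slabs)
            n += s.in_use ? 0 : 1;
        return n;
    }
};

// Usage: the free count drops on allocation and recovers when the pointer is reset.
std::vector<std::size_t> demo()
{
    slab_pool pool(64, 2);
    std::vector<std::size_t> free_counts{pool.free_slabs()};
    slab_pool::pointer p = pool.allocate(16, nullptr);
    free_counts.push_back(pool.free_slabs());
    p.reset();   // runs the deleter, which marks the slab as free again
    free_counts.push_back(pool.free_slabs());
    return free_counts;
}
```

Carrying the slab address in the deleter means the release path needs no lookup by payload address, which is the kind of extra information the void * argument is meant to hold.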
Custom memory scatter
In specialised high-bandwidth cases, the overhead of assembling heaps in temporary storage before scattering the data into other arrangements can be very high. It is possible (since 1.11) to take complete control over the transfer of the payload of the SPEAD packets. Before embarking on such an approach, be sure you have a good understanding of the SPEAD protocol, particularly packets, heaps, item pointers and payload.
In the simplest case, each heap needs to be written to some special or pre-allocated storage, but in a contiguous fashion. In this case it is sufficient to provide a custom allocator (see above), which will return a pointer to the target storage.
In more complex cases, the contents of each heap, or even each packet, need to be scattered to discontiguous storage areas. In this case, one can additionally override the memory copy function with set_memcpy() and provide a packet_memcpy_function.
-
typedef std::function<void(const spead2::memory_allocator::pointer &allocation, const packet_header &packet)> spead2::recv::packet_memcpy_function
It takes a pointer to the start of the heap’s allocation (as returned by the allocator) and the packet metadata. The default implementation is equivalent to the following:
    void copy(const spead2::memory_allocator::pointer &allocation, const packet_header &packet)
    {
        memcpy(allocation.get() + packet.payload_offset, packet.payload, packet.payload_length);
    }
Note that when providing your own memory copy and allocator, you don’t necessarily need the allocator to actually return a pointer to payload memory. It could, for example, populate a structure that guides the copy, and return a pointer to that; or it could return a null pointer. There are some caveats though:
- If the sender doesn’t provide the heap length item, then spead2 may need to make multiple allocations of increasing size as the heap grows, and each time it will copy (with standard memcpy, rather than your custom one) the old content to the new. Assuming you aren’t expecting such packets, you can reject them using set_allow_unsized_heaps().
- spead2::recv::heap_base::get_items() constructs pointers to the items on the assumption of the default memcpy function, so if your replacement doesn’t copy things to the same place, you obviously won’t be able to use those pointers. Note that get_descriptors() will also not be usable.
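As an illustration of an allocation that guides the copy rather than holding payload, the sketch below scatters a heap's payload into fixed-size chunks that live in separate buffers. It is a standalone model under stated assumptions: mock_packet reuses only the three field names from the default copy shown earlier, while scatter_plan and scatter_copy are hypothetical and use a raw pointer in place of the library's pointer type.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for the packet fields used by the default copy.
struct mock_packet
{
    std::int64_t payload_offset;   // offset of this packet's payload within the heap
    const std::uint8_t *payload;
    std::size_t payload_length;
};

// Structure a custom allocator could return in place of payload memory.
struct scatter_plan
{
    std::size_t chunk_size;                // heap payload is split into chunks of this size
    std::vector<std::uint8_t *> chunks;    // destination of each chunk
};

// Custom copy: `allocation` really points at a scatter_plan, not at payload bytes.
void scatter_copy(std::uint8_t *allocation, const mock_packet &packet)
{
    const scatter_plan *plan = reinterpret_cast<const scatter_plan *>(allocation);
    assert(plan->chunk_size > 0 && packet.payload_offset >= 0);
    std::size_t offset = static_cast<std::size_t>(packet.payload_offset);
    std::size_t done = 0;
    while (done < packet.payload_length)
    {
        std::size_t chunk = offset / plan->chunk_size;
        std::size_t within = offset % plan->chunk_size;
        if (chunk >= plan->chunks.size())
            break;   // payload beyond the planned chunks is dropped in this sketch
        // Copy up to the end of the current chunk; a packet may straddle chunks.
        std::size_t n = std::min(plan->chunk_size - within, packet.payload_length - done);
        std::memcpy(plan->chunks[chunk] + within, packet.payload + done, n);
        offset += n;
        done += n;
    }
}

// Usage: six bytes at heap offset 1 land in two separate 4-byte buffers.
std::vector<std::string> demo()
{
    std::vector<std::uint8_t> a(4, '.'), b(4, '.');
    scatter_plan plan{4, {a.data(), b.data()}};
    const std::uint8_t data[] = {'a', 'b', 'c', 'd', 'e', 'f'};
    mock_packet packet{1, data, sizeof(data)};
    scatter_copy(reinterpret_cast<std::uint8_t *>(&plan), packet);
    return {std::string(a.begin(), a.end()), std::string(b.begin(), b.end())};
}
```

Because the bytes no longer sit at allocation + payload_offset, item pointers computed under the default layout would not refer to the scattered data, which matches the second caveat above.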