Sending

Unlike for receiving, each stream object can only use a single transport. There is currently no support for collective operations where multiple producers cooperate to construct a heap between them. It is still possible to do multi-producer, single-consumer operation if the heap IDs are kept separate.

Because each stream has only one transport, there is a separate class for each transport rather than a generic Stream class. The configuration common to all the stream classes is encapsulated in a spead2.send.StreamConfig.

class spead2.send.StreamConfig(*, max_packet_size=1472, rate=0.0, burst_size=65536, max_heaps=4, burst_rate_ratio=1.05, rate_method=RateMethod.AUTO)
Parameters:
  • max_packet_size (int) – Heaps will be split into packets of at most this size.

  • rate (float) – Target transmission rate, in bytes per second, or 0 to send as fast as possible.

  • burst_size (int) – Bursts of up to this size will be sent as fast as possible. Setting this too large (larger than available buffer sizes) risks losing packets, while setting it too small may reduce throughput by causing more sleeps than necessary.

  • max_heaps (int) – For asynchronous transmits, the maximum number of heaps that can be in-flight.

  • burst_rate_ratio (float) – If packet sending falls below the target transmission rate, the rate will be increased until the average rate has caught up. This value specifies the “catch-up” rate, as a ratio to the target rate.

  • rate_method (RateMethod) – Method for applying the rate limit (see RateMethod). When hardware-based rate limiting is used, it is implementation-defined whether burst_rate_ratio and burst_size have any effect. Hardware rate limiting will often produce results that are at least as good as the software limiter, but in some cases (particularly at higher data rates) the overall rate becomes less accurate, so it is not used by default.

The constructor arguments are also instance attributes.
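
As a rough illustration of how rate, burst_size and burst_rate_ratio relate to each other, the sketch below does the arithmetic for an idealised limiter. The function name burst_schedule and the model itself are assumptions made for illustration (packet headers and scheduling jitter are ignored); it is not a description of how spead2 implements rate limiting.

```python
import math


def burst_schedule(total_bytes, rate, burst_size=65536, burst_rate_ratio=1.05):
    """Idealised pacing figures for sending total_bytes at rate bytes/second.

    Hypothetical helper: returns None when rate is 0, which the
    configuration treats as "send as fast as possible".
    """
    if rate <= 0:
        return None
    return {
        # Number of bursts needed if every burst is filled completely.
        "bursts": math.ceil(total_bytes / burst_size),
        # Average spacing between burst starts needed to hold the target rate.
        "burst_interval": burst_size / rate,
        # Time to send everything at exactly the target rate.
        "duration": total_bytes / rate,
        # Rate used while catching up after falling behind.
        "catch_up_rate": rate * burst_rate_ratio,
    }


plan = burst_schedule(1_000_000, rate=1e6)
```

With the defaults, a smaller burst_size shortens burst_interval, which in this model means more (and shorter) sleeps for the same amount of data.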

class spead2.send.RateMethod

An enumeration to select a method for rate limiting.

SW

Use a generic software rate limiter.

HW

Use a hardware rate limiter, if available. This is currently only supported when using ibverbs, and only if the hardware supports it. If hardware rate limiting is not available, falls back to software.

AUTO

This is the default, and lets the implementation decide whether to use hardware rate limiting. At present it will never use the hardware rate limiter (because the available hardware limiters don’t do a good job under all conditions), but in future it is likely to use the hardware limiter more often in circumstances where it has been tested to perform well.

Streams send pre-baked heaps, which can be constructed by hand, but are more commonly created from an ItemGroup by a spead2.send.HeapGenerator. To simplify cases where one item group is paired with one heap generator, a convenience class spead2.send.ItemGroup is provided that inherits from both.

class spead2.send.HeapGenerator(item_group, descriptor_frequency=None, flavour=<spead2._spead2.Flavour object>)

Tracks which items and item values have previously been sent and generates delta heaps.

Parameters:
  • item_group (spead2.ItemGroup) – Item group to monitor.

  • descriptor_frequency (int, optional) – If specified, descriptors will be re-sent once every descriptor_frequency heaps generated by this method.

  • flavour (spead2.Flavour) – The SPEAD protocol flavour used for heaps generated by get_heap() and get_end().

add_to_heap(heap, descriptors='stale', data='stale')

Update a heap to contain all the new items and item descriptors since the last call.

Parameters:
  • heap (Heap) – The heap to update.

  • descriptors ({'stale', 'all', 'none'}) – Which descriptors to send. The default (‘stale’) sends only descriptors that have not been sent, or have not been sent recently enough according to the descriptor_frequency passed to the constructor. The other options are to send all the descriptors or none of them. Sending all descriptors is useful if a new receiver is added which will be out of date.

  • data ({'stale', 'all', 'none'}) – Which data items to send.

  • item_group (ItemGroup, optional) – If specified, uses the items from this item group instead of the one passed to the constructor (which could be None).

Raises:

ValueError – if descriptors or data is not one of the legal values

get_heap(*args, **kwargs)

Return a new heap which contains all the new items and item descriptors since the last call. This is a convenience wrapper around add_to_heap().

get_start()

Return a heap that contains only a start-of-stream marker.

get_end()

Return a heap that contains only an end-of-stream marker.
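
The idea of delta heaps and 'stale' descriptors can be sketched with a small pure-Python toy. ToyHeapGenerator and its dictionary-based "heaps" are hypothetical names invented for this illustration; the toy detects changes by comparing values, which is a simplifying assumption and not necessarily how spead2 tracks them.

```python
class ToyHeapGenerator:
    """Toy model of delta heaps (not spead2 code)."""

    def __init__(self, items, descriptor_frequency=None):
        self.items = items                  # name -> current value
        self.descriptor_frequency = descriptor_frequency
        self._heaps_generated = 0
        self._descriptor_sent_at = {}       # name -> heap count at last send
        self._last_value = {}               # name -> value last sent

    def get_heap(self):
        heap = {"descriptors": [], "items": {}}
        for name, value in self.items.items():
            sent_at = self._descriptor_sent_at.get(name)
            # A descriptor is stale if never sent, or sent too long ago.
            stale = sent_at is None or (
                self.descriptor_frequency is not None
                and self._heaps_generated - sent_at >= self.descriptor_frequency
            )
            if stale:
                heap["descriptors"].append(name)
                self._descriptor_sent_at[name] = self._heaps_generated
            # Only values that changed since the last heap are included.
            if name not in self._last_value or self._last_value[name] != value:
                heap["items"][name] = value
                self._last_value[name] = value
        self._heaps_generated += 1
        return heap


items = {"gain": 1, "freq": 100}
generator = ToyHeapGenerator(items, descriptor_frequency=2)
first = generator.get_heap()     # everything is new
items["gain"] = 2
second = generator.get_heap()    # only the changed value
third = generator.get_heap()     # descriptors are due again, no new values
```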

class spead2.send.Heap(flavour=spead2.Flavour())
repeat_pointers

Enable/disable repetition of item pointers in all packets.

Usually this is not needed, but it can enable some specialised use cases where immediates can be recovered from incomplete heaps or where the receiver examines the item pointers in each packet to decide how to handle it. The packet size must be large enough to fit all the item pointers for the heap (the implementation also reserves a little space, so do not rely on a tight fit working).

The default is disabled.

add_item(item)

Add an Item to the heap. This references the memory in the item rather than copying it. It does not cause a descriptor to be sent; use add_descriptor() for that.

add_descriptor(descriptor)

Add a Descriptor to the heap.

add_start()

Convenience method to add a start-of-stream item.

add_end()

Convenience method to add an end-of-stream item.

Substreams

For some transport types it is possible to create a stream with multiple “substreams”. Each substream typically has a separate destination address, but all the heaps within the stream are sent in order, and the stream configuration (including the rate limits) applies to the stream as a whole. Using substreams rather than independent streams gives better control over the overall transmission rate, and uses fewer system resources.

When sending a heap, an optional parameter called substream_index selects the substream that will be used.

Blocking send

There are multiple stream classes, corresponding to different transports, and some of the classes have several variants of the constructor. They all implement the following interface (the class exists as a type annotation, but does not currently exist at runtime).

class spead2.send.SyncStream
send_heap(heap, cnt=-1, substream_index=0, rate=-1.0)

Send a spead2.send.Heap to the peer, and wait for completion. There is currently no indication of whether it successfully arrived, but IOError is raised if it could not be sent.

If not specified, a heap cnt is chosen automatically (the choice can be modified by calling set_cnt_sequence()). If a non-negative value is specified for cnt, it is used instead. It is the user’s responsibility to avoid collisions.

See Substreams for a description of substream_index.

Normally a rate is set for the whole stream, but it can be overridden here by providing a non-negative value. See spead2.send.StreamConfig for the interpretation.

send_heaps(heaps, mode)

Send a group of heaps. See Batching for more information.

This function will either enqueue all of the heaps, or none of them. In particular, there must be space in the queue for all of them.

It is an error for heaps to be empty. Currently this raises OSError, but it may be replaced by ValueError in future.

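Parameters:
  • heaps (List[spead2.send.HeapReference] or spead2.send.HeapReferenceList) – The heaps to send

  • mode (spead2.send.GroupMode) – Packet ordering for the group
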
set_cnt_sequence(next, step)

Modify the linear sequence used to generate heap cnts. The next heap will have cnt next, and each following cnt will be incremented by step. When using this, it is the user’s responsibility to ensure that the generated values remain unique. The initial state is next = 1, step = 1.

This is useful when multiple senders will send heaps to the same receiver, and need to keep their heap cnts separate.

If the computed cnt overflows the number of bits available, only the least-significant bits are kept.
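
The linear sequence can be modelled in a few lines of plain Python. The helper cnt_sequence and the cnt_bits parameter are assumptions for illustration; the real number of bits available for the heap cnt depends on the flavour in use.

```python
def cnt_sequence(next_cnt, step, count, cnt_bits=48):
    """Model the first `count` heap cnts after set_cnt_sequence(next_cnt, step).

    cnt_bits is an assumed width; values that overflow it are masked so
    that only the least-significant bits remain.
    """
    mask = (1 << cnt_bits) - 1
    return [(next_cnt + i * step) & mask for i in range(count)]


# Two senders sharing one receiver: give each a different offset and a
# step equal to the number of senders, so their cnts never collide.
sender_a = cnt_sequence(0, 2, 4)
sender_b = cnt_sequence(1, 2, 4)

# Overflow with a deliberately tiny width to make the wrap visible.
wrapped = cnt_sequence(254, 1, 4, cnt_bits=8)
```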

num_substreams

Number of substreams in this stream (read-only).

UDP

Note that since UDP is an unreliable protocol, there is no guarantee that packets arrive.

For each constructor overload, the endpoints parameter can also be replaced by two parameters that contain the hostname/IP address and port for a single substream (for backwards compatibility).

class spead2.send.UdpStream(thread_pool, endpoints, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, interface_address='')
Parameters:
  • thread_pool (spead2.ThreadPool) – Thread pool handling the I/O

  • endpoints (List[Tuple[str, int]]) – Peer endpoints (one per substream)

  • config (spead2.send.StreamConfig) – Stream configuration

  • buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.

  • interface_address (str) – Source hostname/IP address (see tips about Routing).

class spead2.send.UdpStream(thread_pool, endpoints, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, ttl)

Stream using UDP, with multicast TTL. Note that the regular constructor will also work with multicast, but does not give any control over the TTL.

Parameters:
  • thread_pool (spead2.ThreadPool) – Thread pool handling the I/O

  • endpoints (List[Tuple[str, int]]) – Peer endpoints (one per substream)

  • config (spead2.send.StreamConfig) – Stream configuration

  • buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.

  • ttl (int) – Multicast TTL

class spead2.send.UdpStream(thread_pool, endpoints, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, ttl, interface_address)

Stream using UDP, with multicast TTL and interface address (IPv4 only).

Parameters:
  • thread_pool (spead2.ThreadPool) – Thread pool handling the I/O

  • endpoints (List[Tuple[str, int]]) – Peer endpoints (one per substream)

  • config (spead2.send.StreamConfig) – Stream configuration

  • buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.

  • ttl (int) – Multicast TTL

  • interface_address (str) – Hostname/IP address of the interface on which to send the data

class spead2.send.UdpStream(thread_pool, endpoints, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, ttl, interface_index)

Stream using UDP, with multicast TTL and interface index (IPv6 only).

Parameters:
  • thread_pool (spead2.ThreadPool) – Thread pool handling the I/O

  • endpoints (List[Tuple[str, int]]) – Peer endpoints (one per substream)

  • config (spead2.send.StreamConfig) – Stream configuration

  • buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.

  • ttl (int) – Multicast TTL

  • interface_index (int) – Index of the interface on which to send the data

class spead2.send.UdpStream(thread_pool, socket, endpoints, config=spead2.send.StreamConfig())

Stream using UDP, with a pre-existing socket. The socket is duplicated by the stream, so the original can be closed immediately to free up a file descriptor. The caller is responsible for setting any socket options. The socket must not be connected.

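Parameters:
  • thread_pool (spead2.ThreadPool) – Thread pool handling the I/O

  • socket (socket.socket) – Existing UDP socket (must not be connected)

  • endpoints (List[Tuple[str, int]]) – Peer endpoints (one per substream)

  • config (spead2.send.StreamConfig) – Stream configuration
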
class spead2.send.UdpStream(thread_pool, endpoints, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, socket)
Parameters:
  • thread_pool (spead2.ThreadPool) – Thread pool handling the I/O

  • endpoints (List[Tuple[str, int]]) – Peer endpoints (one per substream)

  • config (spead2.send.StreamConfig) – Stream configuration

  • buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.

TCP

TCP/IP is a reliable protocol, so heap delivery is guaranteed. However, if multiple threads all call send_heap() at the same time, they can exceed the configured max_heaps and heaps will be dropped.

Because spead2 was originally designed for UDP, the default packet size in StreamConfig is quite small. Performance can be improved by increasing it (but be sure the receiver is configured to handle larger packets).

TCP/IP is also a connection-oriented protocol, and does not support substreams. The endpoints must therefore contain exactly one endpoint (it takes a list for consistency with UdpStream).

class spead2.send.TcpStream(thread_pool, endpoints, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, interface_address='')
Parameters:
  • thread_pool (spead2.ThreadPool) – Thread pool handling the I/O

  • endpoints (List[Tuple[str, int]]) – Peer endpoint (must contain exactly one element).

  • config (spead2.send.StreamConfig) – Stream configuration

  • buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.

  • interface_address (str) – Source hostname/IP address (see tips about Routing).

class spead2.send.TcpStream(thread_pool, socket, config=spead2.send.StreamConfig())

Stream using an existing socket. The socket must already be connected to the peer, and the user is responsible for setting any desired socket options. The socket is duplicated, so it can be closed to save resources.

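Parameters:
  • thread_pool (spead2.ThreadPool) – Thread pool handling the I/O

  • socket (socket.socket) – Existing socket, already connected to the peer

  • config (spead2.send.StreamConfig) – Stream configuration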

Raw bytes

class spead2.send.BytesStream(thread_pool, config=spead2.send.StreamConfig())

Stream that collects packets in memory and makes the concatenated stream available.

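Parameters:
  • thread_pool (spead2.ThreadPool) – Thread pool handling the I/O

  • config (spead2.send.StreamConfig) – Stream configuration
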
getvalue()

Return a copy of the memory buffer.

Return type:

bytes
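
A minimal sketch of the blocking interface, using BytesStream so that no network is involved. The item ID 0x1000, the item name "samples" and the function name heaps_to_bytes are arbitrary example choices, and the import is guarded only so that the snippet can be loaded on a machine where spead2 is not installed.

```python
try:
    import numpy as np
    import spead2
    import spead2.send
except ImportError:
    # spead2 (or numpy) is not installed; the function below then returns None.
    spead2 = None


def heaps_to_bytes():
    """Send one data heap and an end-of-stream heap into memory."""
    if spead2 is None:
        return None
    thread_pool = spead2.ThreadPool()
    config = spead2.send.StreamConfig(max_packet_size=1472, max_heaps=4)
    stream = spead2.send.BytesStream(thread_pool, config)

    # spead2.send.ItemGroup combines an item group with a heap generator.
    item_group = spead2.send.ItemGroup()
    item_group.add_item(0x1000, "samples", "example samples",
                        shape=(4,), dtype=np.int32)
    item_group["samples"].value = np.arange(4, dtype=np.int32)

    stream.send_heap(item_group.get_heap())   # descriptors plus data
    stream.send_heap(item_group.get_end())    # end-of-stream marker
    return stream.getvalue()


raw = heaps_to_bytes()
```

The returned bytes are the concatenated packets, which can be handy for inspecting what a stream would have put on the wire.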

In-process transport

Refer to the separate documentation.

Asynchronous send

As for asynchronous receives, asynchronous sends are managed by asyncio. A stream can buffer up multiple heaps for asynchronous send, up to the limit specified by max_heaps in the StreamConfig. If this limit is exceeded, heaps will be dropped, and the returned future has an IOError exception set. An IOError could also indicate a low-level error in sending the heap (for example, if the packet size exceeds the MTU).
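
The effect of the max_heaps limit can be illustrated with a toy stand-in written with asyncio alone. ToyAsyncStream is a hypothetical class invented for this sketch, not part of spead2; it resolves each future with the heap length, which is an arbitrary choice made for the example.

```python
import asyncio


class ToyAsyncStream:
    """Toy model of a bounded asynchronous send queue (not spead2 code)."""

    def __init__(self, max_heaps):
        self.max_heaps = max_heaps
        self._in_flight = 0

    def async_send_heap(self, heap):
        # A plain function that returns a future, not a coroutine.
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        if self._in_flight >= self.max_heaps:
            # Queue is full: the heap is dropped and the future fails.
            future.set_exception(IOError("queue full, heap dropped"))
            return future
        self._in_flight += 1
        # Pretend the heap is transmitted on a later event-loop iteration.
        loop.call_soon(self._complete, future, len(heap))
        return future

    def _complete(self, future, nbytes):
        self._in_flight -= 1
        if not future.cancelled():
            future.set_result(nbytes)


async def send_all(stream, heaps):
    # Enqueue everything first (synchronously), then wait for the outcomes.
    futures = [stream.async_send_heap(heap) for heap in heaps]
    return await asyncio.gather(*futures, return_exceptions=True)


results = asyncio.run(
    send_all(ToyAsyncStream(max_heaps=2), [b"a" * 10, b"b" * 20, b"c" * 30])
)
```

In this toy the third heap is rejected because two heaps are already in flight when it is enqueued; awaiting each future before enqueuing the next would avoid the drop at the cost of less overlap.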

The classes exist in the spead2.send.asyncio module, and mostly implement the same constructors as the synchronous classes. They implement the following interface (the class exists as a type annotation, but does not currently exist at runtime):

class spead2.send.asyncio.AsyncStream
async_send_heap(heap, cnt=-1, substream_index=0, rate=-1.0)

Send a heap asynchronously. Note that this is not a coroutine: it returns a future. Adding the heap to the queue is done synchronously, to ensure proper ordering.

Parameters:
  • heap (spead2.send.Heap) – Heap to send

  • cnt (int) – Heap cnt to send (defaults to auto-incrementing)

  • substream_index (int) – Substream on which to send the heap (see Substreams)

  • rate (float) – Rate at which to transmit (defaults to the stream’s rate)

async_send_heaps(heaps, mode)

Send a group of heaps asynchronously. See Batching for more information. Note that this is not a coroutine: it returns a future. Adding the heaps to the queue is done synchronously, to ensure proper ordering.

The parameters have the same meaning as for send_heaps().

flush()

Block until all enqueued heaps have been sent (or dropped).

async_flush()

Asynchronously wait for all enqueued heaps to be sent. Note that this only waits for heaps passed to async_send_heap() prior to this call, not ones added while waiting.

TCP

For TCP, construction is slightly different: except when providing a custom socket, one uses a coroutine to connect:

async classmethod TcpStream.connect(*args, **kwargs)

Open a connection.

The arguments are the same as for the constructor of spead2.send.TcpStream.

Batching

Instead of sending one heap at a time, it is possible to pass a whole list of heaps to be sent at once. There are a few reasons one might want to do this:

  1. It is generally more efficient, particularly if the heaps are small.

  2. The packets of the heaps can be sent in an interleaved order. This is useful when combined with Substreams, as each substream can have a steady flow of packets rather than sending a full heap to one substream, then a full heap to the next, and so on.

class spead2.send.HeapReference(heap, *, cnt=-1, substream_index=0, rate=-1.0)

A thin wrapper around a Heap, heap cnt, substream index and rate, for passing to send_heaps(). The parameters have the same meaning as the corresponding arguments to send_heap().

class spead2.send.GroupMode

Enumeration selecting the packet ordering for a group of heaps sent with send_heaps().

ROUND_ROBIN

Interleave the packets of the heaps. One packet is sent from each heap in turn (skipping those that have run out of packets).

SERIAL

Send the heaps one after another.
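
The two orderings can be pictured with a small pure-Python model in which each heap is simply a list of packet labels. The function packet_order and the string mode names are assumptions for illustration and do not call into spead2.

```python
from itertools import zip_longest


def packet_order(heaps, mode):
    """Return the order in which packets of `heaps` would be emitted.

    heaps is a list of lists of packet labels; mode is "SERIAL" or
    "ROUND_ROBIN" (strings standing in for the GroupMode members).
    """
    if mode == "SERIAL":
        return [packet for heap in heaps for packet in heap]
    if mode == "ROUND_ROBIN":
        exhausted = object()  # marks heaps that have run out of packets
        return [
            packet
            for turn in zip_longest(*heaps, fillvalue=exhausted)
            for packet in turn
            if packet is not exhausted
        ]
    raise ValueError(f"unknown mode {mode!r}")


heaps = [["A0", "A1", "A2"], ["B0"], ["C0", "C1"]]
serial = packet_order(heaps, "SERIAL")
interleaved = packet_order(heaps, "ROUND_ROBIN")
```

If each heap goes to a different substream, the interleaved order gives every destination a packet in each round instead of one long run per destination.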

Passing a large list has some overhead, as the list has to be converted from Python to C++. If exactly the same list will be used multiple times, this cost can be amortised by converting the list to a HeapReferenceList up front and then reusing it.

class spead2.send.HeapReferenceList(heaps)

An opaque copy of a list of HeapReference. It can be passed to send_heaps() in place of a list. It can also be indexed with a slice to create a new HeapReferenceList with a subset of the original heaps.

Parameters:

heaps (List[spead2.send.HeapReference]) – The heap references to store