Sending
Unlike for receiving, each stream object can only use a single transport. There is currently no support for collective operations where multiple producers cooperate to construct a heap between them. It is still possible to do multi-producer, single-consumer operation if the heap IDs are kept separate.
Because each stream has only one transport, there is a separate class for each, rather than a generic Stream class. Because the stream classes share common configuration, that configuration is encapsulated in a spead2.send.StreamConfig.
- class spead2.send.StreamConfig(max_packet_size=1472, rate=0.0, burst_size=65536, max_heaps=4, burst_rate_ratio=1.05)
- Parameters
max_packet_size (int) – Heaps will be split into packets of at most this size.
rate (double) – Target transmission rate, in bytes per second, or 0 to send as fast as possible.
burst_size (int) – Bursts of up to this size will be sent as fast as possible. Setting this too large (larger than available buffer sizes) risks losing packets, while setting it too small may reduce throughput by causing more sleeps than necessary.
max_heaps (int) – For asynchronous transmits, the maximum number of heaps that can be in-flight.
burst_rate_ratio (float) – If packet sending falls below the target transmission rate, the rate will be increased until the average rate has caught up. This value specifies the “catch-up” rate, as a ratio to the target rate.
The constructor arguments are also instance attributes.
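To get a feel for how rate, burst_size and burst_rate_ratio interact, the following sketch computes the time budget for one burst. It is a simplified model based only on the parameter descriptions above, not the library's pacing algorithm, and the helper name burst_interval is hypothetical.

```python
def burst_interval(rate, burst_size=65536, burst_rate_ratio=1.05, catching_up=False):
    """Seconds available per burst under a simple pacing model.

    Hypothetical helper, not part of spead2. It assumes one burst of
    burst_size bytes is emitted per interval, and that the catch-up rate
    is rate * burst_rate_ratio when the sender has fallen behind.
    """
    if rate <= 0:
        # A rate of 0 means "send as fast as possible": no pacing delay.
        return 0.0
    effective_rate = rate * burst_rate_ratio if catching_up else rate
    return burst_size / effective_rate


normal = burst_interval(1e9)                      # target rate of 1 GB/s
catch_up = burst_interval(1e9, catching_up=True)  # 5% faster while behind
unpaced = burst_interval(0.0)
```

Under this model a smaller burst_size gives shorter intervals and therefore more frequent sleeps, which is the throughput cost the parameter description warns about.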
Streams send pre-baked heaps, which can be constructed by hand, but are more commonly created from an ItemGroup by a spead2.send.HeapGenerator. To simplify cases where one item group is paired with one heap generator, a convenience class spead2.send.ItemGroup is provided that inherits from both.
- class spead2.send.HeapGenerator(item_group, descriptor_frequency=None, flavour=spead2.Flavour())
Tracks which items and item values have previously been sent and generates delta heaps.
- Parameters
item_group (spead2.ItemGroup) – Item group to monitor.
descriptor_frequency (int, optional) – If specified, descriptors will be re-sent once every descriptor_frequency heaps generated by this object.
flavour (spead2.Flavour) – The SPEAD protocol flavour used for heaps generated by get_heap() and get_end().
- add_to_heap(heap, descriptors='stale', data='stale')
Update a heap to contain all the new items and item descriptors since the last call.
- Parameters
heap (Heap) – The heap to update.
descriptors ({'stale', 'all', 'none'}) – Which descriptors to send. The default (‘stale’) sends only descriptors that have not been sent, or have not been sent recently enough according to the descriptor_frequency passed to the constructor. The other options are to send all the descriptors or none of them. Sending all descriptors is useful if a new receiver is added which will be out of date.
data ({'stale', 'all', 'none'}) – Which data items to send.
item_group (ItemGroup, optional) – If specified, uses the items from this item group instead of the one passed to the constructor (which could be None).
- Raises
ValueError – if descriptors or data is not one of the legal values
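The 'stale' policy for descriptors can be pictured with a small bookkeeping model. The class below is a toy written for illustration only: ToyDescriptorTracker and its methods are hypothetical names, and the exact point at which the real generator considers a descriptor stale is an assumption based on the parameter descriptions.

```python
class ToyDescriptorTracker:
    """Toy model of the 'stale' descriptor policy (not spead2 code)."""

    def __init__(self, descriptor_frequency=None):
        self.descriptor_frequency = descriptor_frequency
        self.heap_count = 0   # number of heaps generated so far
        self.last_sent = {}   # item name -> heap_count when its descriptor was sent

    def is_stale(self, name):
        last = self.last_sent.get(name)
        if last is None:
            return True       # descriptor has never been sent
        if self.descriptor_frequency is None:
            return False      # sent once, and never scheduled for re-sending
        # Assumed rule: stale once descriptor_frequency heaps have gone by.
        return self.heap_count - last >= self.descriptor_frequency

    def next_heap(self, names, descriptors="stale"):
        """Return the names whose descriptors would go into the next heap."""
        if descriptors not in ("stale", "all", "none"):
            raise ValueError("descriptors must be 'stale', 'all' or 'none'")
        if descriptors == "all":
            chosen = list(names)
        elif descriptors == "none":
            chosen = []
        else:
            chosen = [n for n in names if self.is_stale(n)]
        for n in chosen:
            self.last_sent[n] = self.heap_count
        self.heap_count += 1
        return chosen


tracker = ToyDescriptorTracker(descriptor_frequency=3)
history = [tracker.next_heap(["adc"]) for _ in range(5)]
```

In this model the descriptor for the hypothetical item "adc" goes out with the first heap and again with every third heap after that, while passing descriptors='all' would force it into any heap, for example when a new receiver joins.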
- get_heap(*args, **kwargs)
Return a new heap which contains all the new items and item descriptors since the last call. This is a convenience wrapper around add_to_heap().
- get_start()
Return a heap that contains only a start-of-stream marker.
- get_end()
Return a heap that contains only an end-of-stream marker.
- class spead2.send.Heap(flavour=spead2.Flavour())
- repeat_pointers
Enable/disable repetition of item pointers in all packets.
Usually this is not needed, but it can enable some specialised use cases where immediates can be recovered from incomplete heaps or where the receiver examines the item pointers in each packet to decide how to handle it. The packet size must be large enough to fit all the item pointers for the heap (the implementation also reserves a little space, so do not rely on a tight fit working).
The default is disabled.
- add_item(item)
Add an Item to the heap. This references the memory in the item rather than copying it. It does not cause a descriptor to be sent; use add_descriptor() for that.
- add_descriptor(descriptor)
Add a Descriptor to the heap.
- add_start()
Convenience method to add a start-of-stream item.
- add_end()
Convenience method to add an end-of-stream item.
Blocking send
There are multiple stream classes, corresponding to different transports, and some of the classes have several variants of the constructor. They all implement the following interface, although this base class does not actually exist:
- class spead2.send.AbstractStream
- send_heap(heap, cnt=-1)
Sends a spead2.send.Heap to the peer, and waits for completion. There is currently no indication of whether it successfully arrived, but IOError is raised if it could not be sent.
If not specified, a heap cnt is chosen automatically (the choice can be modified by calling set_cnt_sequence()). If a non-negative value is specified for cnt, it is used instead. It is the user’s responsibility to avoid collisions.
- set_cnt_sequence(next, step)
Modify the linear sequence used to generate heap cnts. The next heap will have cnt next, and each following cnt will be incremented by step. When using this, it is the user’s responsibility to ensure that the generated values remain unique. The initial state is next = 1, step = 1.
This is useful when multiple senders will send heaps to the same receiver, and need to keep their heap cnts separate.
If the computed cnt overflows the number of bits available, the bottom-most bits are taken.
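The linear sequence and its wrap-around can be modelled in a few lines. This is an illustrative sketch rather than library code: the function heap_cnts is hypothetical, and the bits parameter stands in for the number of bits available, which depends on the protocol flavour in use.

```python
def heap_cnts(next_cnt=1, step=1, count=4, bits=40):
    """Model the cnts that would follow set_cnt_sequence(next_cnt, step).

    Hypothetical helper, not part of spead2. `bits` is an assumed width;
    on overflow only the bottom-most `bits` bits are kept.
    """
    mask = (1 << bits) - 1
    return [(next_cnt + i * step) & mask for i in range(count)]


# Two senders feeding one receiver: interleave their cnts so they never collide.
sender_a = heap_cnts(next_cnt=0, step=2)
sender_b = heap_cnts(next_cnt=1, step=2)

# With only 8 bits available, the sequence wraps around after 255.
wrapped = heap_cnts(next_cnt=254, step=1, count=4, bits=8)
```

With real streams the same interleaving would be requested by calling set_cnt_sequence(0, 2) on one stream and set_cnt_sequence(1, 2) on the other. The general pattern for N senders is to give sender i a starting value of i and a step of N.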
UDP
Note that since UDP is an unreliable protocol, there is no guarantee that packets arrive.
- class spead2.send.UdpStream(thread_pool, hostname, port, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, interface_address='')
- Parameters
thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
hostname (str) – Peer hostname
port (int) – Peer port
config (spead2.send.StreamConfig) – Stream configuration
buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.
interface_address (str) – Source hostname/IP address (see tips about Routing).
- class spead2.send.UdpStream(thread_pool, multicast_group, port, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, ttl)
Stream using UDP, with multicast TTL. Note that the regular constructor will also work with multicast, but does not give any control over the TTL.
- Parameters
thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
multicast_group (str) – Multicast group hostname/IP address
port (int) – Destination port
config (spead2.send.StreamConfig) – Stream configuration
buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.
ttl (int) – Multicast TTL
- class spead2.send.UdpStream(thread_pool, multicast_group, port, config=spead2.send.StreamConfig(), buffer_size=524288, ttl, interface_address)
Stream using UDP, with multicast TTL and interface address (IPv4 only).
- Parameters
thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
multicast_group (str) – Multicast group hostname/IP address
port (int) – Destination port
config (spead2.send.StreamConfig) – Stream configuration
buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.
ttl (int) – Multicast TTL
interface_address (str) – Hostname/IP address of the interface on which to send the data
- class spead2.send.UdpStream(thread_pool, multicast_group, port, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, ttl, interface_index)
Stream using UDP, with multicast TTL and interface index (IPv6 only).
- Parameters
thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
multicast_group (str) – Multicast group hostname/IP address
port (int) – Destination port
config (spead2.send.StreamConfig) – Stream configuration
buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.
ttl (int) – Multicast TTL
interface_index (int) – Index of the interface on which to send the data
- class spead2.send.UdpStream(thread_pool, socket, hostname, port, config=spead2.send.StreamConfig())
Stream using UDP, with a pre-existing socket. The socket is duplicated by the stream, so the original can be closed immediately to free up a file descriptor. The caller is responsible for setting any socket options. The socket must not be connected.
- Parameters
thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
socket (socket.socket) – UDP socket
hostname (str) – Peer hostname
port (int) – Peer port
config (spead2.send.StreamConfig) – Stream configuration
- class spead2.send.UdpStream(thread_pool, hostname, port, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, socket)
- Parameters
thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
hostname (str) – Peer hostname
port (int) – Peer port
config (spead2.send.StreamConfig) – Stream configuration
buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.
socket (socket.socket) – This socket is used rather than a new one. The socket must not be connected. The caller must not use this socket any further, although it is not necessary to keep it alive. This is mainly useful for fine-tuning socket options.
Deprecated since version 1.9: Use the overload that does not take buffer_size.
TCP
TCP/IP is a reliable protocol, so heap delivery is guaranteed. However, if multiple threads all call send_heap() at the same time, they can exceed the configured max_heaps and heaps will be dropped.
Because spead2 was originally designed for UDP, the default packet size in StreamConfig is quite small. Performance can be improved by increasing it (but be sure the receiver is configured to handle larger packets).
- class spead2.send.TcpStream(thread_pool, hostname, port, config=spead2.send.StreamConfig(), buffer_size=DEFAULT_BUFFER_SIZE, interface_address='')
- Parameters
thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
hostname (str) – Peer hostname
port (int) – Peer port
config (spead2.send.StreamConfig) – Stream configuration
buffer_size (int) – Socket buffer size. A warning is logged if this size cannot be set due to OS limits.
interface_address (str) – Source hostname/IP address (see tips about Routing).
- class spead2.send.TcpStream(thread_pool, socket, config=spead2.send.StreamConfig())
Stream using an existing socket. The socket must already be connected to the peer, and the user is responsible for setting any desired socket options. The socket is duplicated, so it can be closed to save resources.
- Parameters
thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
socket (socket.socket) – TCP socket
config (spead2.send.StreamConfig) – Stream configuration
Raw bytes
- class spead2.send.BytesStream(thread_pool, config=spead2.send.StreamConfig())
Stream that collects packets in memory and makes the concatenated stream available.
- Parameters
thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
config (spead2.send.StreamConfig) – Stream configuration
- getvalue()
Return a copy of the memory buffer.
- Return type
bytes
In-process transport
Refer to the separate documentation.
Asynchronous send
As for asynchronous receives, asynchronous sends are managed by asyncio. A stream can buffer up multiple heaps for asynchronous send, up to the limit specified by max_heaps in the StreamConfig. If this limit is exceeded, heaps will be dropped, and the returned future has an IOError exception set. An IOError could also indicate a low-level error in sending the heap (for example, if the packet size exceeds the MTU).
The classes exist in the spead2.send.asyncio module, and mostly implement the same constructors as the synchronous classes. They implement the following abstract interface (the class does not actually exist):
- class spead2.send.asyncio.AbstractStream
- async_send_heap(heap, cnt=-1, loop=None)
Send a heap asynchronously. Note that this is not a coroutine: it returns a future. Adding the heap to the queue is done synchronously, to ensure proper ordering.
- Parameters
heap (spead2.send.Heap) – Heap to send
cnt (int) – Heap cnt to send (defaults to auto-incrementing)
loop (asyncio.AbstractEventLoop) – Event loop to use, overriding the constructor
- flush()
Block until all enqueued heaps have been sent (or dropped).
- async_flush()
Asynchronously wait for all enqueued heaps to be sent. Note that this only waits for heaps passed to async_send_heap() prior to this call, not ones added while waiting.
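The interaction between max_heaps and the returned futures can be illustrated with a stand-in written purely in asyncio. ToyAsyncSender below is a hypothetical class that mimics the documented behaviour (synchronous queuing, a future per heap, IOError when the limit is exceeded). It does not use spead2, and the internals of the real streams may differ.

```python
import asyncio


class ToyAsyncSender:
    """Hypothetical stand-in for an asynchronous stream (not spead2 code)."""

    def __init__(self, max_heaps=4):
        self.max_heaps = max_heaps
        self._in_flight = 0

    def async_send_heap(self, heap):
        # Not a coroutine: the heap is queued synchronously and a future returned.
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        if self._in_flight >= self.max_heaps:
            # Over the limit: the heap is dropped and the future carries IOError.
            future.set_exception(IOError("too many heaps in flight"))
            return future
        self._in_flight += 1
        # Pretend the transmission completes on a later loop iteration.
        loop.call_soon(self._complete, future)
        return future

    def _complete(self, future):
        self._in_flight -= 1
        if not future.done():
            future.set_result(None)


async def send_all(sender, heaps):
    # Queue everything without yielding to the loop, then wait for the futures.
    futures = [sender.async_send_heap(heap) for heap in heaps]
    results = await asyncio.gather(*futures, return_exceptions=True)
    sent = sum(1 for r in results if not isinstance(r, Exception))
    dropped = sum(1 for r in results if isinstance(r, IOError))
    return sent, dropped


sent, dropped = asyncio.run(send_all(ToyAsyncSender(max_heaps=4), [b"heap"] * 6))
```

Because all six heaps are queued before the event loop gets a chance to run, only the first four fit within the limit. Awaiting each future before queuing more heaps (or keeping at most max_heaps futures outstanding) would avoid the drops in this model.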
TCP
For TCP, construction is slightly different: except when providing a custom socket, one uses a coroutine to connect: