Receiving¶
The classes associated with receiving are in the spead2.recv
package. A stream represents a logical stream, in that packets with
the same heap ID are assumed to belong to the same heap. A stream can have
multiple physical transports.
Streams yield heaps, which are the basic units of data transfer and contain
both item descriptors and item values. While it is possible to directly
inspect heaps, this is not recommended or supported. Instead, heaps are
normally passed to spead2.ItemGroup.update()
.
- class spead2.recv.Heap¶
- cnt¶
Heap identifier (read-only)
- flavour¶
SPEAD flavour used to encode the heap (see SPEAD flavours)
- is_start_of_stream()¶
Returns true if the packet contains a stream start control item.
- is_end_of_stream()¶
Returns true if the packet contains a stream stop control item.
Note
Malformed packets (such as an unsupported SPEAD version, or
inconsistent heap lengths) are dropped, with a log message. However,
errors in interpreting a fully assembled heap (such as invalid/unsupported
formats, data of the wrong size and so on) are reported as
ValueError
exceptions. Robust code should thus be prepared to
catch exceptions from heap processing.
Configuration¶
Once a stream is constructed, the configuration cannot be changed. The configuration is
captured in two classes, StreamConfig
and
RingStreamConfig
. The split is a reflection of the C++
API and not particularly relevant in Python. The configuration options can
either be passed to the constructors (as keyword arguments) or set as
properties after construction.
- class spead2.recv.StreamConfig(**kwargs)¶
- Parameters
max_heaps (int) – The number of partial heaps that can be live at one time. This affects how intermingled heaps can be (due to out-of-order packet delivery) before heaps get dropped.
bug_compat (int) – Bug compatibility flags (see SPEAD flavours)
memcpy (int) – Set the method used to copy data from the network to the heap. The default is
MEMCPY_STD
. This can be changed toMEMCPY_NONTEMPORAL
, which writes to the destination with a non-temporal cache hint (if SSE2 is enabled at compile time). This can improve performance with large heaps if the data is not going to be used immediately, by reducing cache pollution. Be careful when benchmarking: receiving heaps will generally appear faster, but it can slow down subsequent processing of the heap because it will not be cached.memory_allocator (
spead2.MemoryAllocator
) – Set the memory allocator for a stream. See Memory allocators for details.stop_on_stop_item (bool) – By default, a heap containing a stream control stop item will terminate the stream (and that heap is discarded). In some cases it is useful to keep the stream object alive and ready to receive a following stream. Setting this attribute to
False
will disable this special treatment. Such heaps can then be detected withis_end_of_stream()
.allow_unsized_heaps (bool) – By default, spead2 caters for heaps without a HEAP_LEN item, and will dynamically extend the memory allocation as data arrives. However, this can be expensive, and ideally senders should include this item. Setting this attribute to
False
will cause packets without this item to be rejected.allow_out_of_order (bool) – Whether to allow packets within a heap to be received out-of-order. See Packet ordering for details.
stream_id (int) – An arbitrary integer to associate with the stream. This is used to identify chunks generated by
spead2.recv.ChunkRingStream
.
- Raises
ValueError – if max_heaps is zero.
- class spead2.recv.RingStreamConfig(**kwargs)¶
- Parameters
heaps (int) – The capacity of the ring buffer between the network threads and the consumer. Increasing this may reduce lock contention at the cost of more memory usage.
contiguous_only (bool) – If set to
False
, incomplete heaps will be included in the stream as instances ofIncompleteHeap
. By default they are discarded. See Incomplete Heaps for details.incomplete_keep_payload_ranges (bool) – If set to
True
, it is possible to retrieve information about which parts of the payload arrived in incomplete heaps, usingIncompleteHeap.payload_ranges()
.
- Raises
ValueError – if ring_heaps is zero.
Blocking receive¶
To do blocking receive, create a spead2.recv.Stream
, and add
transports to it with add_buffer_reader()
,
add_udp_reader()
,
add_tcp_reader()
or
add_udp_pcap_file_reader()
. Then either iterate over
it, or repeatedly call get()
.
- class spead2.recv.Stream(thread_pool, stream_config=StreamConfig(), ring_config=RingStreamConfig())¶
- Parameters
thread_pool (
spead2.ThreadPool
) – Thread pool handling the I/Oconfig (
spead2.recv.StreamConfig
) – Stream configurationring_config (
spead2.recv.RingStreamConfig
) – Ringbuffer configuration
- config¶
Stream configuration passed to the constructor (read-only)
- ring_config¶
Ringbuffer configuration passed to the constructor (read-only)
- add_buffer_reader(buffer)¶
Feed data from an object implementing the buffer protocol.
- add_udp_reader(port, max_size=DEFAULT_UDP_MAX_SIZE, buffer_size=DEFAULT_UDP_BUFFER_SIZE, bind_hostname='', socket=None)¶
Feed data from a UDP port.
- Parameters
port (int) – UDP port number
max_size (int) – Largest packet size that will be accepted.
buffer_size (int) – Kernel socket buffer size. If this is 0, the OS default is used. If a buffer this large cannot be allocated, a warning will be logged, but there will not be an error.
bind_hostname (str) – If specified, the socket will be bound to the first IP address found by resolving the given hostname. If this is a multicast group, then it will also subscribe to this multicast group.
- add_udp_reader(multicast_group, port, max_size=DEFAULT_UDP_MAX_SIZE, buffer_size=DEFAULT_UDP_BUFFER_SIZE, interface_address)
Feed data from a UDP port (IPv4 only). This is intended for use with multicast, but it will also accept a unicast address as long as it is the same as the interface address.
- Parameters
multicast_group (str) – Hostname/IP address of the multicast group to subscribe to
port (int) – UDP port number
max_size (int) – Largest packet size that will be accepted.
buffer_size (int) – Kernel socket buffer size. If this is 0, the OS default is used. If a buffer this large cannot be allocated, a warning will be logged, but there will not be an error.
interface_address (str) – Hostname/IP address of the interface which will be subscribed, or the empty string to let the OS decide.
- add_udp_reader(multicast_group, port, max_size=DEFAULT_UDP_MAX_SIZE, buffer_size=DEFAULT_UDP_BUFFER_SIZE, interface_index)
Feed data from a UDP port with multicast (IPv6 only).
- Parameters
multicast_group (str) – Hostname/IP address of the multicast group to subscribe to
port (int) – UDP port number
max_size (int) – Largest packet size that will be accepted.
buffer_size (int) – Kernel socket buffer size. If this is 0, the OS default is used. If a buffer this large cannot be allocated, a warning will be logged, but there will not be an error.
interface_index (str) – Index of the interface which will be subscribed, or 0 to let the OS decide.
- add_tcp_reader(port, max_size=DEFAULT_TCP_MAX_SIZE, buffer_size=DEFAULT_TCP_BUFFER_SIZE, bind_hostname='')¶
Receive data over TCP/IP. This will listen for a single incoming connection, after which no new connections will be accepted. When the connection is closed, the stream is stopped.
- Parameters
port (int) – TCP port number
max_size (int) – Largest packet size that will be accepted.
buffer_size (int) – Kernel socket buffer size. If this is 0, the OS default is used. If a buffer this large cannot be allocated, a warning will be logged, but there will not be an error.
bind_hostname (str) – If specified, the socket will be bound to the first IP address found by resolving the given hostname.
- add_tcp_reader(acceptor, max_size=DEFAULT_TCP_MAX_SIZE)
Receive data over TCP/IP. This is similar to the previous overload, but takes a user-provided socket, which must already be listening for connections. It duplicates the acceptor socket, so the original can be closed immediately.
- Parameters
acceptor (socket.socket) – Listening socket
max_size (int) – Largest packet size that will be accepted.
- add_udp_pcap_file_reader(filename)¶
Feed data from a pcap file (for example, captured with tcpdump or mcdump). This is only available if libpcap development files were found at compile time.
- add_inproc_reader(queue)¶
Feed data from an in-process queue. Refer to In-process transport for details.
- get()¶
Returns the next heap, blocking if necessary. If the stream has been stopped, either by calling
stop()
or by receiving a stream control packet, it raisesspead2.Stopped
. However, heap that were already queued when the stream was stopped are returned first.A stream can also be iterated over to yield all heaps.
- stop()¶
Shut down the stream and close all associated sockets. It is not possible to restart a stream once it has been stopped; instead, create a new stream.
- fd¶
The read end of a pipe to which a byte is written when a heap is received. Do not read from this pipe. It is used for integration with asynchronous I/O frameworks (see below).
- stats¶
Statistics about the stream.
- ringbuffer¶
The internal ringbuffer of the stream (see Statistics).
Asynchronous receive¶
Asynchronous I/O is supported through Python’s asyncio
module. It can
be combined with other asynchronous I/O frameworks like twisted and Tornado.
- class spead2.recv.asyncio.Stream(*args, **kwargs)¶
See
spead2.recv.Stream
(the base class) for other constructor arguments.- get()¶
Coroutine that yields the next heap, or raises
spead2.Stopped
once the stream has been stopped and there is no more data. It is safe to have multiple in-flight calls, which will be satisfied in the order they were made.
The stream is also asynchronously iterable, i.e., can be used in an async
for
loop to iterate over the heaps.
Packet ordering¶
SPEAD is typically carried over UDP, and by its nature, UDP allows packets to be reordered. Packets may also arrive interleaved if they are produced by multiple senders. We consider two sorts of packet ordering issues:
Re-ordering within a heap. By default, spead2 assumes that all the packets that form a heap will arrive in order, and discards any packet that does not have the expected payload offset. In most networks this is a safe assumption provided that all the packets originate from the same sender (IP address and port number) and have the same destination.
If this assumption is not appropriate, it can be changed with the
allow_out_of_order
attribute ofspead2.recv.StreamConfig
. This has minimal impact when packets do in fact arrive in order, but reassembling arbitrarily ordered packets can be expensive. Allowing for out-of-order arrival also makes handling lost packets more expensive (because one must cater for them arriving later), which can lead to a feedback loop as this more expensive processing can lead to further packet loss.Interleaving of packets from different heaps. This is always supported, but to a bounded degree so that lost packets don’t lead to heaps being kept around indefinitely in the hope that the packet may arrive. The
max_heaps
attribute ofspead2.recv.StreamConfig
determines the amount of overlap allowed: once a packet in heap \(n\) is observed, it is assumed that heap \(n - \text{max_heaps}\) is complete. When there are many producers it will likely to be necessary to increase this value. Larger values increase the memory usage for partial heaps, and have a small performance impact.
Memory allocators¶
To allow for performance tuning, it is possible to use an alternative memory
allocator for heap payloads. A few allocator classes are provided; new classes
must currently be written in C++. The default (which is also the base class
for all allocators) is spead2.MemoryAllocator
, which has no
constructor arguments or methods. An alternative is
spead2.MmapAllocator
.
- class spead2.MmapAllocator(flags=0)¶
An allocator using mmap(2). This may be slightly faster for large allocations, and allows setting custom mmap flags. This is mainly intended for use with the C++ API, but is exposed to Python as well.
- Parameters
flags (int) – Extra flags to pass to mmap(2). Finding the numeric values for OS-specific flags is left as a problem for the user.
The most important custom allocator is spead2.MemoryPool
. It allocates
from a pool, rather than directly from the system. This can lead to
significant performance improvements when the allocations are large enough
that the C library allocator does not recycle the memory itself, but instead
requests memory from the kernel.
A memory pool has a range of sizes that it will handle from its pool, by allocating the upper bound size. Thus, setting too wide a range will waste memory, while setting too narrow a range will prevent the memory pool from being used at all. A memory pool is best suited for cases where the heaps are all roughly the same size.
A memory pool can optionally use a background task (scheduled onto a thread pool) to replenish the pool when it gets low. This is useful when heaps are being captured and stored indefinitely rather than processed and released.
- class spead2.MemoryPool(thread_pool, lower, upper, max_free, initial, low_water, allocator=None)¶
Constructor. One can omit thread_pool and low_water to skip the background refilling.
- Parameters
thread_pool (ThreadPool) – thread pool used for refilling the memory pool
lower (int) – Minimum allocation size to handle with the pool
upper (int) – Size of allocations to make
max_free (int) – Maximum number of allocations held in the pool
initial (int) – Number of allocations to put in the free pool initially.
low_water (int) – When fewer than this many buffers remain, the background task will be started and allocate new memory until initial buffers are available.
allocator (MemoryAllocator) – Underlying memory allocator
- warn_on_empty¶
Whether to issue a warning if the memory pool becomes empty and needs to allocate new memory on request. It defaults to true.
Incomplete Heaps¶
By default, an incomplete heap (one for which some but not all of the packets
were received) is simply dropped and a warning is printed. Advanced users
might need finer control, such as recording metrics about the number of these
heaps. To do so, set contiguous_only to False
in the
RingStreamConfig
. The stream will then yield
instances of IncompleteHeap
.
- class spead2.recv.IncompleteHeap¶
- cnt¶
Heap identifier (read-only)
- flavour¶
SPEAD flavour used to encode the heap (see SPEAD flavours)
- heap_length¶
The expected number of bytes of payload (-1 if unknown)
- received_length¶
The number of bytes of payload that were actually received
- payload_ranges¶
A list of pairs of heap offsets. Each pair is a range of bytes that was received. This is only non-empty if incomplete_keep_payload_ranges was set in the
RingStreamConfig
; otherwise the information is dropped to save memory.When using this, you should also set allow_out_of_order to
True
in theStreamConfig
, as otherwise any data after the first lost packet is discarded.
- is_start_of_stream()¶
Returns true if the packet contains a stream start control item.
- is_end_of_stream()¶
Returns true if the packet contains a stream stop control item.
Statistics¶
The stats
property of a stream contains
statistics about the stream. Note that while the fields below are expected to
be stable except where otherwise noted, their exact interpretation in edge
cases is subject to change as the implementation evolves. It is intended for
instrumentation, rather than for driving application logic.
Each time the property is accessed, an internally consistent view of the
statistics is returned. However, it is not synchronised with other aspects of
the stream. For example, it’s theoretically possible to retrieve 5 heaps from
the stream iterator, then find that StreamStats.heaps
is (briefly)
4.
Some readers process packets in batches, and the statistics are only updated after a whole batch is added. This can be particularly noticeable if the ringbuffer fills up and blocks the reader, as this prevents the batch from completing and so heaps that have already been received by Python code might not be reflected in the statistics.
- class spead2.recv.StreamStats¶
- heaps¶
Total number of heaps put into the stream. This includes incomplete heaps, and complete heaps that were received but did not make it into the ringbuffer before
stop()
was called. It excludes the heap that contained the stop item.- incomplete_heaps_evicted¶
Number of incomplete heaps that were evicted from the buffer to make room for new data.
- incomplete_heaps_flushed¶
Number of incomplete heaps that were still in the buffer when the stream stopped.
- packets¶
Total number of packets received, including the one containing the stop item.
- batches¶
Number of batches of packets. Some readers are able to take multiple packets from the network in one go, and each time this forms a batch.
- worker_blocked¶
Number of times a worker thread was blocked because the ringbuffer was full. If this is non-zero, it indicates that the stream is not being read fast enough, or that the ring_heaps constructor parameter needs to be increased to buffer sudden bursts.
- max_batch¶
Maximum number of packets received as a unit. This is only applicable to readers that support fetching a batch of packets from the source.
- single_packet_heaps¶
Number of heaps that were entirely contained in a single packet. These take a slightly faster path as it is not necessary to reassemble them.
- search_dist¶
Number of hash table entries searched to find the heaps associated with packets. This is intended for debugging/profiling spead2 and may be removed without notice.
Additional statistics are available on the ringbuffer underlying the stream
(ringbuffer
property), with similar caveats about
synchronisation.