Receiving
The classes associated with receiving are in the spead2.recv
package. A stream represents a logical stream, in that packets with
the same heap ID are assumed to belong to the same heap. A stream can have
multiple physical transports.
Streams yield heaps, which are the basic units of data transfer and contain
both item descriptors and item values. While it is possible to directly
inspect heaps, this is not recommended or supported. Instead, heaps are
normally passed to spead2.ItemGroup.update().

class spead2.recv.Heap
- cnt – Heap identifier (read-only)
- flavour – SPEAD flavour used to encode the heap (see SPEAD flavours)
- is_start_of_stream() – Returns true if the heap contains a stream start control item.

Note
Malformed packets (such as an unsupported SPEAD version, or
inconsistent heap lengths) are dropped, with a log message. However,
errors in interpreting a fully assembled heap (such as invalid/unsupported
formats, data of the wrong size and so on) are reported as ValueError
exceptions. Robust code should thus be prepared to catch exceptions from
heap processing.
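The advice in the note can be sketched as a small loop that keeps going when a single heap fails to decode. This is a minimal sketch, not the library's own code: `update_from_heaps` is a hypothetical helper name, `heaps` is assumed to be any iterable of heaps (such as a stream), and `item_group` is assumed to behave like spead2.ItemGroup.

```python
import logging

logger = logging.getLogger(__name__)


def update_from_heaps(heaps, item_group):
    """Feed each heap to item_group.update(), skipping heaps that fail to decode.

    `heaps` is any iterable of heaps and `item_group` is expected to behave
    like spead2.ItemGroup (both are assumptions of this sketch). Returns a
    (processed, rejected) pair of counts.
    """
    processed = 0
    rejected = 0
    for heap in heaps:
        try:
            item_group.update(heap)
        except ValueError as exc:
            # Reported for fully assembled heaps that cannot be interpreted.
            rejected += 1
            logger.warning("Skipping heap: %s", exc)
        else:
            processed += 1
    return processed, rejected
```

Counting rejected heaps rather than silently discarding them makes it easier to notice a sender that is consistently producing data in an unexpected format.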
Blocking receive
To do blocking receive, create a spead2.recv.Stream, and add
transports to it with add_buffer_reader() and add_udp_reader(). Then either
iterate over it, or repeatedly call get().

class spead2.recv.Stream(thread_pool, bug_compat=0, max_heaps=4, ring_heaps=4)
Parameters:
- thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
- bug_compat (int) – Bug compatibility flags (see SPEAD flavours)
- max_heaps (int) – The number of partial heaps that can be live at one time. This affects how intermingled heaps can be (due to out-of-order packet delivery) before heaps get dropped.
- ring_heaps (int) – The capacity of the ring buffer between the network threads and the consumer. Increasing this may reduce lock contention at the cost of more memory usage.

set_memory_allocator(allocator)
Set or change the memory allocator for a stream. See Memory allocators for details.
Parameters:
- allocator (spead2.MemoryAllocator) – New memory allocator

set_memcpy(id)
Set the method used to copy data from the network to the heap. The default is MEMCPY_STD. This can be changed to MEMCPY_NONTEMPORAL, which writes to the destination with a non-temporal cache hint (if SSE2 is enabled at compile time). This can improve performance with large heaps if the data is not going to be used immediately, by reducing cache pollution. Be careful when benchmarking: receiving heaps will generally appear faster, but it can slow down subsequent processing of the heap because it will not be cached.
Parameters:
- id ({MEMCPY_STD, MEMCPY_NONTEMPORAL}) – Identifier for the copy function

add_buffer_reader(buffer)
Feed data from an object implementing the buffer protocol.

add_udp_reader(port, max_size=DEFAULT_UDP_MAX_SIZE, buffer_size=DEFAULT_UDP_BUFFER_SIZE, bind_hostname='', socket=None)
Feed data from a UDP port.
Parameters:
- port (int) – UDP port number
- max_size (int) – Largest packet size that will be accepted.
- buffer_size (int) – Kernel socket buffer size. If this is 0, the OS default is used. If a buffer this large cannot be allocated, a warning will be logged, but there will not be an error.
- bind_hostname (str) – If specified, the socket will be bound to the first IP address found by resolving the given hostname. If this is a multicast group, then it will also subscribe to this multicast group.
- socket (socket.socket) – If specified, this socket is used rather than a new one. The socket must be open but unbound. The caller must not use this socket any further, although it is not necessary to keep it alive. This is mainly useful for fine-tuning socket options such as multicast subscriptions.
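As an illustration of the socket parameter, the sketch below prepares an open, unbound UDP socket with one option set and hands it to the stream. The helper names `make_unbound_udp_socket` and `add_reader_with_socket` are hypothetical, the choice of SO_REUSEADDR is only an example of an option one might tune, and `stream` is assumed to behave like spead2.recv.Stream.

```python
import socket


def make_unbound_udp_socket(reuse_address=True):
    """Create an open, unbound IPv4 UDP socket, optionally with SO_REUSEADDR."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    if reuse_address:
        # Example of an option that needs to be set before the socket is bound.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    return sock


def add_reader_with_socket(stream, port, sock):
    """Pass a pre-configured socket to stream.add_udp_reader().

    `stream` is expected to behave like spead2.recv.Stream (an assumption of
    this sketch). After this call the caller should not use `sock` again.
    """
    stream.add_udp_reader(port, socket=sock)
```

The socket is deliberately left unbound, since the documentation above requires the socket to be open but unbound when it is handed over.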

add_udp_reader(multicast_group, port, max_size=DEFAULT_UDP_MAX_SIZE, buffer_size=DEFAULT_UDP_BUFFER_SIZE, interface_address)
Feed data from a UDP port with multicast (IPv4 only).
Parameters:
- multicast_group (str) – Hostname/IP address of the multicast group to subscribe to
- port (int) – UDP port number
- max_size (int) – Largest packet size that will be accepted.
- buffer_size (int) – Kernel socket buffer size. If this is 0, the OS default is used. If a buffer this large cannot be allocated, a warning will be logged, but there will not be an error.
- interface_address (str) – Hostname/IP address of the interface which will be subscribed, or the empty string to let the OS decide.

add_udp_reader(multicast_group, port, max_size=DEFAULT_UDP_MAX_SIZE, buffer_size=DEFAULT_UDP_BUFFER_SIZE, interface_index)
Feed data from a UDP port with multicast (IPv6 only).
Parameters:
- multicast_group (str) – Hostname/IP address of the multicast group to subscribe to
- port (int) – UDP port number
- max_size (int) – Largest packet size that will be accepted.
- buffer_size (int) – Kernel socket buffer size. If this is 0, the OS default is used. If a buffer this large cannot be allocated, a warning will be logged, but there will not be an error.
- interface_index (int) – Index of the interface which will be subscribed, or 0 to let the OS decide.

get()
Returns the next heap, blocking if necessary. If the stream has been stopped, either by calling stop() or by receiving a stream control packet, it raises spead2.Stopped. However, heaps that were already queued when the stream was stopped are returned first.
A stream can also be iterated over to yield all heaps.

stop()
Shut down the stream and close all associated sockets. It is not possible to restart a stream once it has been stopped; instead, create a new stream.
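The interaction between get() and spead2.Stopped can be sketched as a loop that collects heaps until the stream reports that it has stopped. `drain` is a hypothetical helper; the `stopped_exc` parameter exists only so the sketch can be exercised without the library, and defaults to spead2.Stopped as named above.

```python
def drain(stream, stopped_exc=None):
    """Call stream.get() repeatedly and return every heap received.

    `stream` is expected to behave like spead2.recv.Stream (an assumption of
    this sketch). `stopped_exc` is the exception type that signals the end of
    the stream; when omitted, spead2.Stopped is looked up lazily.
    """
    if stopped_exc is None:
        import spead2  # imported lazily so the helper can be tested with fakes
        stopped_exc = spead2.Stopped
    heaps = []
    while True:
        try:
            # Blocks until a heap is available or the stream has stopped.
            heaps.append(stream.get())
        except stopped_exc:
            return heaps
```

Collecting every heap in a list is only sensible for short captures; a long-running receiver would normally process each heap as it arrives, for which iterating over the stream is the simpler form.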
Asynchronous receive
Asynchronous I/O is supported through trollius, which is a Python 2 backport
of the Python 3 asyncio module. It can be combined with other
asynchronous I/O frameworks like twisted.

class spead2.recv.trollius.Stream(*args, **kwargs, loop=None)
See spead2.recv.Stream (the base class) for other constructor arguments.
Parameters:
- loop – Default Trollius event loop for async operations. If not specified, uses the default Trollius event loop. Do not call get_nowait from the base class.

get(loop=None)
Coroutine that yields the next heap, or raises spead2.Stopped once the stream has been stopped and there is no more data. It is safe to have multiple in-flight calls, which will be satisfied in the order they were made.
Parameters:
- loop – Trollius event loop to use, overriding the one given to the constructor.

Memory allocators
To allow for performance tuning, it is possible to use an alternative memory
allocator for heap payloads. A few allocator classes are provided; new classes
must currently be written in C++. The default (which is also the base class
for all allocators) is spead2.MemoryAllocator, which has no
constructor arguments or methods. An alternative is spead2.MmapAllocator.

class spead2.MmapAllocator(flags=0)
An allocator using mmap(2). This may be slightly faster for large allocations, and allows setting custom mmap flags. This is mainly intended for use with the C++ API, but is exposed to Python as well.
Parameters:
- flags (int) – Extra flags to pass to mmap(2). Finding the numeric values for OS-specific flags is left as a problem for the user.

The most important custom allocator is spead2.MemoryPool. It allocates
from a pool, rather than directly from the system. This can lead to
significant performance improvements when the allocations are large enough
that the C library allocator does not recycle the memory itself, but instead
requests memory from the kernel.
A memory pool has a range of sizes that it will handle from its pool, by allocating the upper bound size. Thus, setting too wide a range will waste memory, while setting too narrow a range will prevent the memory pool from being used at all. A memory pool is best suited for cases where the heaps are all roughly the same size.
A memory pool can optionally use a background task (scheduled onto a thread pool) to replenish the pool when it gets low. This is useful when heaps are being captured and stored indefinitely rather than processed and released.
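The trade-off in choosing the size range can be made concrete with a small model. This is an illustrative sketch rather than the library's accounting: `pool_usage` is a hypothetical helper, and it assumes that a request is served from the pool when its size lies between the lower and upper bounds inclusive, with each pooled request consuming a buffer of the upper bound size.

```python
def pool_usage(sizes, lower, upper):
    """Model how a list of allocation sizes maps onto a pool's size range.

    Returns (pooled, bypassed, wasted_bytes). The inclusive bounds are an
    assumption of this sketch, not a statement about the library.
    """
    if lower > upper:
        raise ValueError("lower must not exceed upper")
    pooled = 0
    bypassed = 0
    wasted_bytes = 0
    for size in sizes:
        if lower <= size <= upper:
            pooled += 1
            # Each pooled request occupies a full buffer of `upper` bytes.
            wasted_bytes += upper - size
        else:
            bypassed += 1
    return pooled, bypassed, wasted_bytes
```

For requests of 100, 600, 1000 and 2000 bytes, a range of 512 to 1024 pools two of them with 448 bytes of slack, while a range of 0 to 2048 pools all four but leaves 4492 bytes unused.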

class spead2.MemoryPool(thread_pool, lower, upper, max_free, initial, low_water, allocator=None)
Constructor. One can omit thread_pool and low_water to skip the background refilling.
Parameters:
- thread_pool (ThreadPool) – thread pool used for refilling the memory pool
- lower (int) – Minimum allocation size to handle with the pool
- upper (int) – Size of allocations to make
- max_free (int) – Maximum number of allocations held in the pool
- initial (int) – Number of allocations to put in the free pool initially.
- low_water (int) – When fewer than this many buffers remain, the background task will be started and allocate new memory until initial buffers are available.
- allocator (MemoryAllocator) – Underlying memory allocator
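Putting the constructor together with set_memory_allocator() might look like the sketch below. `attach_memory_pool` is a hypothetical helper; it passes the arguments positionally in the order documented above, and the `pool_cls` parameter exists only so the sketch can be exercised without the library (it defaults to spead2.MemoryPool).

```python
def attach_memory_pool(stream, thread_pool, lower, upper, max_free, initial,
                       low_water, pool_cls=None):
    """Create a memory pool with background refilling and install it on a stream.

    `stream` is expected to behave like spead2.recv.Stream and `thread_pool`
    like spead2.ThreadPool (assumptions of this sketch). Returns the pool.
    """
    if pool_cls is None:
        import spead2  # imported lazily so the helper can be tested with fakes
        pool_cls = spead2.MemoryPool
    # Argument order follows the documented constructor signature.
    pool = pool_cls(thread_pool, lower, upper, max_free, initial, low_water)
    stream.set_memory_allocator(pool)
    return pool
```

Installing the pool before any readers are added to the stream is probably the safest ordering, so that no heap is allocated from the default allocator first.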