Chunking receiver
For an overview, refer to Chunking receiver. This page is a reference for the Python API.
Writing a place callback
A callback is needed to determine which chunk each heap belongs to and where it fits into that chunk. The callback is made from a C++ thread, so it cannot be written in pure Python, or even use the Python C API. It needs to be compiled code. The callback code should also not attempt to acquire the Global Interpreter Lock (GIL), as it may lead to a deadlock.
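The placement logic itself is usually simple arithmetic. The following is a minimal pure-Python model of that arithmetic, intended only to illustrate what a callback decides; it cannot be used as an actual callback, since that must be compiled code. It assumes a stream where every heap has the same payload size and carries a counter that increments by one per heap. The function name and all parameter names are hypothetical.

```python
def place_heap(heap_cnt, payload_size, heaps_per_chunk, heap_payload_size):
    """Model of a placement decision (illustration only, not a real callback).

    Returns (chunk_id, heap_index, heap_offset), or None if the heap should
    be rejected because its payload size is not the expected one.
    """
    if payload_size != heap_payload_size:
        return None
    # Which chunk the heap belongs to.
    chunk_id = heap_cnt // heaps_per_chunk
    # Position of the heap within that chunk.
    heap_index = heap_cnt % heaps_per_chunk
    # Byte offset of the heap's payload within the chunk's data.
    heap_offset = heap_index * heap_payload_size
    return chunk_id, heap_index, heap_offset


# Heap number 10, with 4 heaps of 1024 bytes per chunk.
print(place_heap(10, 1024, heaps_per_chunk=4, heap_payload_size=1024))
```

A compiled callback would perform the same calculation and write the results into the structure it is given, rather than returning them.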
Once you’ve written the function (see below), it needs to be passed to spead2.
The easiest way to do this is with scipy.LowLevelCallable.
However, it’s not strictly necessary to use scipy. The callback must be
represented with a tuple whose first element is a PyCapsule.
The other elements are not used, but a reference to the tuple is held, so they
can be used to keep things alive. The capsule’s pointer must be a function
pointer to the code, the name must be the function signature, and a
user-defined pointer may be set as the capsule’s context.
For the place callback, the signature must be one of the following (exactly, with no whitespace changes):
"void (void *, size_t)"
"void (void *, size_t, void *)"
In the latter case, the capsule’s context is provided as the final argument.
The first two arguments are a pointer to spead2::recv::chunk_place_data and the size of that structure.
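For readers who want to avoid scipy, the capsule tuple described above can be built with the standard library alone. The following is a sketch under stated assumptions, not spead2's own code: make_place_tuple and its parameters are hypothetical names, and func_address is assumed to be the address of compiled code with the matching signature. Only the CPython functions PyCapsule_New and PyCapsule_SetContext are used.

```python
import ctypes

# Signature strings quoted from the text above; they must match exactly.
SIGNATURE = b"void (void *, size_t)"
SIGNATURE_WITH_CONTEXT = b"void (void *, size_t, void *)"

# Bind the CPython capsule functions through ctypes.
_PyCapsule_New = ctypes.pythonapi.PyCapsule_New
_PyCapsule_New.restype = ctypes.py_object
_PyCapsule_New.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]

_PyCapsule_SetContext = ctypes.pythonapi.PyCapsule_SetContext
_PyCapsule_SetContext.restype = ctypes.c_int
_PyCapsule_SetContext.argtypes = [ctypes.py_object, ctypes.c_void_p]


def make_place_tuple(func_address, context_address=None, keep_alive=()):
    """Hypothetical helper: wrap a compiled function's address in a capsule tuple.

    func_address must be the address of compiled code; this helper does not
    (and cannot) check that the code really has the named signature.
    """
    name = SIGNATURE if context_address is None else SIGNATURE_WITH_CONTEXT
    capsule = _PyCapsule_New(func_address, name, None)
    if context_address is not None:
        if _PyCapsule_SetContext(capsule, context_address) != 0:
            raise RuntimeError("PyCapsule_SetContext failed")
    # The capsule stores only a pointer to the name, so the bytes object
    # travels in the tuple, alongside anything else that must stay alive.
    return (capsule, name) + tuple(keep_alive)
```

The extra tuple elements are where objects such as the capsule name, a loaded shared library, or the buffer behind the context pointer can be parked so that they outlive the stream configuration. The function address would typically come from whichever tool compiled the callback.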
There are lots of ways to write compiled code and access the functions from Python: ctypes, cffi, cython and pybind11 are some of the options. One for which spead2 provides some extra support is numba:
- spead2.recv.numba.chunk_place_data
Numba record type representing the C structure used in the chunk placement callback.
Numba doesn’t (as of 0.54) support pointers in records, so the pointer fields are represented as integers. Use spead2.numba.intp_to_voidptr() to convert them to void pointers, then numba.carray() to convert the void pointer to an array of the appropriate size and dtype.
- spead2.numba.intp_to_voidptr(typingctx, src)
Convert an integer (of type intp or uintp) to a void pointer.
This is useful because numba doesn’t (as of 0.54) support putting pointers into Records. They have to be smuggled in as intp, then converted to pointers with this function.
Reference
- class spead2.recv.Chunk(**kwargs)
The attributes can also be used as keyword arguments to the constructor. This class is designed to allow subclassing, and subclass properties will round-trip through the stream.
- data
Data storage for a chunk. This can be set to any object that supports the Python buffer protocol, as long as it is contiguous and writable. Examples include (contiguous) numpy arrays, bytearray and memoryview. It can also be set to None to clear it.
- present
Data storage for flags indicating presence of heaps within the chunk. This can be set to any object that supports the Python buffer protocol, as long as it is contiguous and writable. It can also be set to None to clear it.
- extra
Data storage for extra data to be written by the place callback. This can be set to any object that supports the Python buffer protocol, as long as it is contiguous and writable. It can also be set to None to clear it.
- chunk_id
The chunk ID determined by the placement function.
- stream_id
Stream ID of the stream from which the chunk originated.
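The requirement that data, present and extra be contiguous, writable buffers can be checked ahead of time with the standard library. The helper below is a hypothetical sketch (is_usable_chunk_storage is not part of spead2), and spead2's own validation may be stricter or differ in detail; it only mirrors the wording above using memoryview attributes.

```python
def is_usable_chunk_storage(obj):
    """Return True if obj exposes a contiguous, writable buffer.

    Hypothetical pre-flight check; it does not call into spead2.
    """
    try:
        view = memoryview(obj)
    except TypeError:
        # The object does not support the buffer protocol at all.
        return False
    with view:
        return view.contiguous and not view.readonly


print(is_usable_chunk_storage(bytearray(16)))                   # writable, contiguous
print(is_usable_chunk_storage(bytes(16)))                       # read-only
print(is_usable_chunk_storage(memoryview(bytearray(16))[::2]))  # strided
```

Of the three calls, only the bytearray passes: bytes is read-only and the strided memoryview is not contiguous.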
- class spead2.recv.ChunkStreamConfig(**kwargs)
Parameters for a ChunkStream. The configuration options can either be passed to the constructor (as keyword arguments) or set as properties after construction.
- Parameters:
items (list[int]) – The items whose immediate values should be passed to the place function. Accessing this property returns a copy, so it cannot be updated with append or other mutating operations. Assign a complete list.
max_chunks (int) – The maximum number of chunks that can be live at the same time. A value of 1 means that heaps must be received in order: once a chunk is started, no heaps from a previous chunk will be accepted.
place (tuple) – See Writing a place callback.
max_heap_extra (int) – The maximum amount of data a placement function may write to spead2::recv::chunk_place_data::extra.
- Raises:
ValueError – if max_chunks is zero.
- enable_packet_presence(payload_size)
Enable the packet presence feature. The payload offset of each packet is divided by payload_size and added to the heap index before indexing spead2.recv.Chunk.present.
- disable_packet_presence()
Disable the packet presence feature enabled by enable_packet_presence().
- packet_presence_payload_size
The payload_size if packet presence is enabled, or 0 if not.
- DEFAULT_MAX_CHUNKS
Default value for max_chunks.
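The indexing rule given for enable_packet_presence() can be illustrated with a small pure-Python model. This is a sketch of the arithmetic only, not spead2's implementation: mark_packets and its parameters are hypothetical names, and it assumes that the heap index chosen by the place callback refers to the slot of the heap's first packet.

```python
def mark_packets(present, heap_index, payload_offsets, payload_size):
    """Set one presence flag per received packet (model of the indexing rule)."""
    for payload_offset in payload_offsets:
        # Payload offset divided by payload_size, added to the heap index.
        present[heap_index + payload_offset // payload_size] = 1


# A chunk with room for 16 packets; one flag per packet.
present = bytearray(16)
# A heap whose first packet occupies slot 8, sent as four 1024-byte packets,
# of which the packet at payload offset 1024 was lost.
mark_packets(present, heap_index=8, payload_offsets=[0, 2048, 3072], payload_size=1024)
print(list(present[8:12]))
```

With packet presence enabled, the present storage therefore needs one element per packet rather than one per heap.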
- class spead2.recv.ChunkRingbuffer(maxsize)
Ringbuffer holding Chunks. The interface is modelled on Queue, although the exceptions are different. It also implements the iterator protocol.
Once a chunk has been added to a ringbuffer it should not be accessed again until it is retrieved from a ringbuffer (either the same one, or more typically, a different one after it has been filled in by ChunkRingStream).
- maxsize
Maximum capacity of the ringbuffer.
- data_fd
A file descriptor that is readable when there is data available. This will not normally be used directly, but is used in the implementation of spead2.recv.asyncio.ChunkRingbuffer.
- space_fd
A file descriptor that is readable when there is free space available. This will not normally be used directly, but is used in the implementation of spead2.recv.asyncio.ChunkRingbuffer.
- qsize()
The current number of items in the ringbuffer.
- empty()
True if the ringbuffer is empty, otherwise false.
- full()
True if the ringbuffer is full, otherwise false.
- get()
Retrieve an item from the ringbuffer, blocking if necessary.
- Raises:
spead2.Stopped – if the ringbuffer was stopped before an item became available.
- get_nowait()
Retrieve an item from the ringbuffer, raising an exception if none is available.
- Raises:
spead2.Stopped – if the ringbuffer is stopped and empty.
spead2.Empty – if the ringbuffer is empty.
- put(chunk)
Put an item into the ringbuffer, blocking until there is space if necessary.
- Raises:
spead2.Stopped – if the ringbuffer was stopped before space became available.
- put_nowait(chunk)
Put an item into the ringbuffer, raising an exception if there is no space available.
- Raises:
spead2.Stopped – if the ringbuffer is stopped.
spead2.Full – if the ringbuffer is full.
- stop()
Shut down the ringbuffer. Producers will no longer be able to add new items. Consumers will be able to retrieve existing items, after which they will receive spead2.Stopped, and iterators will terminate.
Returns true if this call stopped the ringbuffer, otherwise false.
- add_producer()
Register a new producer. Producers only need to call this if they want to call remove_producer().
- remove_producer()
Indicate that a producer registered with add_producer() is finished with the ringbuffer. If this was the last producer, the ringbuffer is stopped. Returns true if this call stopped the ringbuffer, otherwise false.
- class spead2.recv.asyncio.ChunkRingbuffer(maxsize)
Asynchronous chunk ringbuffer.
It provides asynchronous versions of the blocking functions, and the asynchronous iterator protocol.
- async get()
Get a chunk from the ringbuffer asynchronously.
- Raises:
spead2.Stopped – if the ringbuffer is stopped before a chunk becomes available.
- async put(chunk)
Put an item into the ringbuffer asynchronously.
- Raises:
spead2.Stopped – if the ringbuffer is stopped.
- class spead2.recv.ChunkRingStream(thread_pool, config, chunk_config, data_ringbuffer, free_ringbuffer)
Stream that works on chunks. While it is not a direct subclass, it implements most of the same functions as spead2.recv.Stream, in particular for adding transports.
- Parameters:
thread_pool (spead2.ThreadPool) – Thread pool handling the I/O
config (spead2.recv.StreamConfig) – Stream configuration
chunk_config (spead2.recv.ChunkStreamConfig) – Chunking configuration
data_ringbuffer (spead2.recv.ChunkRingbuffer) – Ringbuffer onto which the stream will place completed chunks.
free_ringbuffer (spead2.recv.ChunkRingbuffer) – Ringbuffer from which the stream will obtain new chunks.
- data_ringbuffer
The data ringbuffer given to the constructor.
- free_ringbuffer
The free ringbuffer given to the constructor.
- add_free_chunk(chunk)
Add a chunk to the free ringbuffer. This takes care of zeroing out the Chunk.present array, and it will suppress the spead2.Stopped exception if the free ringbuffer has been stopped.
If the free ring is full, it will raise spead2.Full rather than blocking. The free ringbuffer should be constructed with enough slots that this does not happen.