Chunking receiver

For an overview, refer to Chunking receiver. This page is a reference for the Python API.

Writing a place callback

A callback is needed to determine which chunk each heap belongs to and where it fits into that chunk. The callback is made from a C++ thread, so it cannot be written in pure Python, or even use the Python C API. It needs to be compiled code. The callback code should also not attempt to acquire the Global Interpreter Lock (GIL), as it may lead to a deadlock.
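The arithmetic a placement function typically performs can be modelled in Python, even though the real callback must be compiled code as described above. The sketch below assumes that every heap carries a sequential heap counter and that each chunk holds a fixed number of equally sized heaps. The names HEAPS_PER_CHUNK, HEAP_PAYLOAD_SIZE, Placement and place are hypothetical, and the three output fields are assumed to correspond to members of spead2::recv::chunk_place_data (check the C++ reference for the actual layout).

```python
from dataclasses import dataclass

HEAPS_PER_CHUNK = 4       # hypothetical: number of heaps in one chunk
HEAP_PAYLOAD_SIZE = 1024  # hypothetical: payload bytes per heap


@dataclass
class Placement:
    chunk_id: int     # which chunk the heap belongs to
    heap_index: int   # position of the heap within the chunk
    heap_offset: int  # byte offset of the heap's payload in the chunk data


def place(heap_cnt: int) -> Placement:
    """Model the placement decision for the heap with the given counter."""
    chunk_id, heap_index = divmod(heap_cnt, HEAPS_PER_CHUNK)
    return Placement(chunk_id, heap_index, heap_index * HEAP_PAYLOAD_SIZE)
```

This is only a model of the decision. A production callback would perform the same computation in compiled code and write the results into the structure it is given.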

Once you’ve written the function (see below), it needs to be passed to spead2. The easiest way to do this is with scipy.LowLevelCallable, although scipy is not strictly necessary. The callback must be represented by a tuple whose first element is a PyCapsule. The other elements are not used, but a reference to the tuple is held, so they can be used to keep things alive. The capsule’s pointer must be a function pointer to the code, the name must be the function signature, and a user-defined pointer may be set as the capsule’s context.

For the place callback, the signature must be one of the following (exactly, with no whitespace changes):

  • "void (void *, size_t)"

  • "void (void *, size_t, void *)"

In the latter case, the capsule’s context is provided as the final argument. The first two arguments are a pointer to spead2::recv::chunk_place_data and the size of that structure.
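If scipy is not available, the tuple can be assembled by hand. The following is a minimal sketch that uses ctypes to call the CPython capsule functions PyCapsule_New and PyCapsule_SetContext. The helper name make_place_tuple and its parameters are hypothetical, and func_addr is assumed to be the integer address of a compiled function with the matching signature.

```python
import ctypes

# Bind the CPython capsule functions through ctypes.
_capsule_new = ctypes.pythonapi.PyCapsule_New
_capsule_new.restype = ctypes.py_object
_capsule_new.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]
_capsule_set_context = ctypes.pythonapi.PyCapsule_SetContext
_capsule_set_context.restype = ctypes.c_int
_capsule_set_context.argtypes = [ctypes.py_object, ctypes.c_void_p]


def make_place_tuple(func_addr, user_data_addr=None):
    """Wrap a compiled function's address in the (capsule, ...) tuple form."""
    if user_data_addr is None:
        signature = b"void (void *, size_t)"
    else:
        signature = b"void (void *, size_t, void *)"
    # No destructor is registered: the capsule does not own the function.
    capsule = _capsule_new(func_addr, signature, None)
    if user_data_addr is not None:
        _capsule_set_context(capsule, user_data_addr)
    # The capsule stores only a pointer to its name, so the bytes object is
    # carried in the tuple to keep it alive for as long as the tuple is.
    return (capsule, signature)
```

The resulting tuple would then be supplied as the place callback. Any object that owns the compiled code or the user data (for example a loaded library handle) can be appended to the tuple so that it stays alive with it.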

There are lots of ways to write compiled code and access the functions from Python: ctypes, cffi, cython and pybind11 are some of the options. One for which spead2 provides some extra support is numba:

spead2.recv.numba.chunk_place_data

Numba record type representing the C structure used in the chunk placement callback.

Numba doesn’t (as of 0.54) support pointers in records, so the pointer fields are represented as integers. Use spead2.numba.intp_to_voidptr() to convert them to void pointers then numba.carray() to convert the void pointer to an array of the appropriate size and dtype.

spead2.numba.intp_to_voidptr(typingctx, src)

Convert an integer (of type intp or uintp) to a void pointer.

This is useful because numba doesn’t (as of 0.54) support putting pointers into Records. They have to be smuggled in as intp, then converted to pointers with this function.

Reference

class spead2.recv.Chunk(**kwargs)

The attributes can also be used as keyword arguments to the constructor. This class is designed to allow subclassing, and subclass properties will round-trip through the stream.

data

Data storage for a chunk. This can be set to any object that supports the Python buffer protocol, as long as it is contiguous and writable. Examples include (contiguous) numpy arrays, bytearray and memoryview. It can also be set to None to clear it.
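The contiguity and writability requirements can be checked ahead of time with the standard memoryview type. This is a sketch of such a pre-check, not the validation spead2 itself performs, and is_usable_chunk_buffer is a hypothetical helper name.

```python
def is_usable_chunk_buffer(obj) -> bool:
    """Return True if obj exposes a contiguous, writable buffer."""
    try:
        view = memoryview(obj)
    except TypeError:
        return False  # obj does not support the buffer protocol
    return view.contiguous and not view.readonly
```

For instance, a bytearray passes this check, while a bytes object (read-only) or a strided slice of a memoryview (non-contiguous) does not.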

present

Data storage for flags indicating presence of heaps within the chunk. This can be set to any object that supports the Python buffer protocol, as long as it is contiguous and writable. It can also be set to None to clear it.

extra

Data storage for extra data to be written by the place callback. This can be set to any object that supports the Python buffer protocol, as long as it is contiguous and writable. It can also be set to None to clear it.

chunk_id

The chunk ID determined by the placement function.

stream_id

Stream ID of the stream from which the chunk originated.

class spead2.recv.ChunkStreamConfig(**kwargs)

Parameters for a ChunkStream. The configuration options can either be passed to the constructor (as keyword arguments) or set as properties after construction.

Parameters:
  • items (List[int]) – The items whose immediate values should be passed to the place function. Accessing this property returns a copy, so it cannot be updated with append or other mutating operations. Assign a complete list.

  • max_chunks (int) – The maximum number of chunks that can be live at the same time. A value of 1 means that heaps must be received in order: once a chunk is started, no heaps from a previous chunk will be accepted.

  • place (tuple) – See Writing a place callback.

  • max_heap_extra (int) – The maximum amount of data a placement function may write to spead2::recv::chunk_place_data::extra.

Raises:

ValueError – if max_chunks is zero.

enable_packet_presence(payload_size: int)

Enable the packet presence feature. The payload offset of each packet is divided by payload_size and added to the heap index before indexing spead2.recv.Chunk.present.
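The index computation described above can be written out as follows. This sketch assumes the division is integer (floor) division, and packet_present_index is a hypothetical name used only for illustration.

```python
def packet_present_index(heap_index: int, payload_offset: int,
                         payload_size: int) -> int:
    """Index into Chunk.present for a packet when packet presence is enabled."""
    return heap_index + payload_offset // payload_size
```

As an illustration, if the placement callback sets the heap index to 8 and payload_size is 1024, then the packet whose payload starts at offset 2048 within the heap is recorded at index 10.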

disable_packet_presence()

Disable the packet presence feature enabled by enable_packet_presence().

packet_presence_payload_size

The payload_size if packet presence is enabled, or 0 if not.

DEFAULT_MAX_CHUNKS

Default value for max_chunks.

class spead2.recv.ChunkRingbuffer(maxsize)

Ringbuffer holding Chunks. The interface is modelled on Queue, although the exceptions are different. It also implements the iterator protocol.

Once a chunk has been added to a ringbuffer it should not be accessed again until it is retrieved from a ringbuffer (either the same one, or more typically, a different one after it has been filled in by ChunkRingStream).

maxsize

Maximum capacity of the ringbuffer.

data_fd

A file descriptor that is readable when there is data available. This will not normally be used directly, but is used in the implementation of spead2.recv.asyncio.ChunkRingbuffer.

space_fd

A file descriptor that is readable when there is free space available. This will not normally be used directly, but is used in the implementation of spead2.recv.asyncio.ChunkRingbuffer.

qsize()

The current number of items in the ringbuffer.

empty()

True if the ringbuffer is empty, otherwise false.

full()

True if the ringbuffer is full, otherwise false.

get()

Retrieve an item from the ringbuffer, blocking if necessary.

Raises:

spead2.Stopped – if the ringbuffer was stopped before an item became available.

get_nowait()

Retrieve an item from the ringbuffer, raising an exception if none is available.

Raises:
  • spead2.Stopped – if the ringbuffer is stopped and empty.

  • spead2.Empty – if the ringbuffer is empty.

put(chunk)

Put an item into the ringbuffer, blocking until there is space if necessary.

Raises:

spead2.Stopped – if the ringbuffer was stopped before space became available.

put_nowait(chunk)

Put an item into the ringbuffer, raising an exception if there is no space available.

Raises:
  • spead2.Stopped – if the ringbuffer is stopped.

  • spead2.Full – if the ringbuffer is full.

stop()

Shut down the ringbuffer. Producers will no longer be able to add new items. Consumers will be able to retrieve existing items, after which they will receive spead2.Stopped, and iterators will terminate.

Returns true if this call stopped the ringbuffer, otherwise false.

add_producer()

Register a new producer. Producers only need to call this if they want to call remove_producer().

remove_producer()

Indicate that a producer registered with add_producer() is finished with the ringbuffer. If this was the last producer, the ringbuffer is stopped. Returns true if this call stopped the ringbuffer, otherwise false.

class spead2.recv.asyncio.ChunkRingbuffer(maxsize)

Asynchronous chunk ringbuffer.

It provides asynchronous versions of the blocking functions, and the asynchronous iterator protocol.

async get()

Get a chunk from the ringbuffer asynchronously.

Raises:

spead2.Stopped – if the ringbuffer is stopped before a chunk becomes available.

async put(chunk)

Put an item into the ringbuffer asynchronously.

Raises:

spead2.Stopped – if the ringbuffer is stopped.
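Because the class implements the asynchronous iterator protocol, chunks can be consumed with async for. The sketch below assumes a stream whose data ringbuffer is an asynchronous ChunkRingbuffer and which provides add_free_chunk(); the names consume_chunks and process are hypothetical.

```python
async def consume_chunks(stream, process):
    """Process chunks until the data ringbuffer is stopped; return the count."""
    count = 0
    async for chunk in stream.data_ringbuffer:
        process(chunk)
        # Hand the chunk back for reuse; it must not be touched afterwards.
        stream.add_free_chunk(chunk)
        count += 1
    return count
```

The loop ends when the ringbuffer has been stopped and drained, since iterators terminate in that case.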

class spead2.recv.ChunkRingStream(thread_pool, config, chunk_config, data_ringbuffer, free_ringbuffer)

Stream that works on chunks. While it is not a direct subclass, it implements most of the same functions as spead2.recv.Stream, in particular for adding transports.

Parameters:
data_ringbuffer

The data ringbuffer given to the constructor.

free_ringbuffer

The free ringbuffer given to the constructor.

add_free_chunk(chunk)

Add a chunk to the free ringbuffer. This takes care of zeroing out the Chunk.present array, and it will suppress the spead2.Stopped exception if the free ringbuffer has been stopped.

If the free ring is full, it will raise spead2.Full rather than blocking. The free ringbuffer should be constructed with enough slots that this does not happen.
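A typical synchronous receive loop takes each chunk from the data ringbuffer, processes it, and returns it with add_free_chunk(). The following is a sketch under the assumption that stream is an already constructed ChunkRingStream with transports added; run_receive_loop and process are hypothetical names.

```python
def run_receive_loop(stream, process):
    """Process every completed chunk and recycle it for reuse."""
    for chunk in stream.data_ringbuffer:
        try:
            process(chunk)
        finally:
            # Return the chunk even if processing failed, so that the
            # stream does not run out of free chunks.
            stream.add_free_chunk(chunk)
```

Returning the chunk in a finally clause is a design choice for this sketch: it keeps the pool of free chunks at its full size even when process raises.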