In-process transport

While SPEAD is generally deployed over UDP, UDP is less than ideal for writing tests:

  • One has to deal with allocating port numbers and avoiding conflicts.

  • The sender and receiver need to be running at the same time.

  • If the receiver doesn’t keep up, it can drop packets.

To simplify unit testing, spead2 also offers an “in-process” transport. One creates a queue, then connects a sender and a receiver to it. The queue has unbounded capacity, so one can safely send all the data first, then create the receiver later. This unbounded capacity also means that it should not be used in production for high-volume streams, because it can exhaust all your memory if the sender works faster than the receiver.

A queue can also be connected to multiple senders. It should generally not be connected to multiple receivers, because each receiver will get some undefined subset of the packets, which won’t reassemble into the proper heaps. However, if the packet size is large enough that every heap fits in a single packet, each receiver gets only whole heaps and this will work.

Warning

Even though the transport is reliable, a stream has a maximum number of outstanding heaps. Attempting to send more heaps in parallel than the stream is configured to handle can lead to heaps being dropped. This is not a problem when using a single thread with spead2.send.InprocStream.send_heap() (because it blocks until the heap has been fully added to the queue), but needs to be considered when sending heaps in parallel with spead2.send.asyncio.InprocStream or when using multiple threads.

Sending

class spead2.InprocQueue
add_packet(packet)

Add a packet directly to the queue.

stop()

Indicate end-of-stream to receivers. It is an error to add any more packets after this.

class spead2.send.InprocStream(thread_pool, queues, config)
queues

Get the queues passed to the constructor.

class spead2.send.asyncio.InprocStream(thread_pool, queues, config)

SPEAD over reliable in-process transport.

Note

Data may still be lost if the maximum number of in-flight heaps (set in the stream config) is exceeded. Either set this value to more heaps than will ever be sent (which will use unbounded memory) or be sure to block on the futures returned before exceeding the capacity.

An asynchronous version of spead2.send.InprocStream. Refer to Asynchronous send for general details about asynchronous transport.

Receiving

To connect a receiver to the queue, use spead2.recv.Stream.add_inproc_reader().