In-process transport
While SPEAD is generally deployed over UDP, it is less than ideal for writing tests:
One has to deal with allocating port numbers and avoiding conflicts.
The sender and receiver need to be running at the same time.
If the receiver doesn’t keep up, it can drop packets.
To simplify unit testing, spead2 also offers an “in-process” transport. One creates a queue, then connects a sender and a receiver to it. The queue has unbounded capacity, so one can safely send all the data first, then create the receiver later. This unbounded capacity also means that it should not be used in production for high-volume streams, because it can exhaust all your memory if the sender works faster than the receiver.
A queue can also be connected to multiple senders. It should generally not be connected to multiple receivers, because they will each get some undefined subset of the packets, which won’t reassemble into the proper heaps. However, if you set the packet size large enough that every heap is contained in one packet then it will work.
Warning
Even though the transport is reliable, a stream has a maximum number of
outstanding heaps. Attempting to send more heaps in parallel than the
stream is configured to handle can lead to heaps being dropped. This is
not a problem when using a single thread with
spead2.send.InprocStream.send_heap()
(because it blocks until the
heap has been fully added to the queue), but needs to be considered when
sending heaps in parallel with spead2.send.asyncio.InprocStream
or when using multiple threads.
Sending
- class spead2.InprocQueue
- add_packet(packet)
Add a packet directly to the queue.
- stop()
Indicate end-of-stream to receivers. It is an error to add any more packets after this.
- class spead2.send.InprocStream(thread_pool, queues, config)
- Parameters:
thread_pool (
spead2.ThreadPool
) – Thread pool handling the I/Oqueues (List[
spead2.InprocQueue
]) – Queues holding the generated packets (one per substream).config (
spead2.send.StreamConfig
) – Stream configuration
- queues
Get the queues passed to the constructor.
- class spead2.send.asyncio.InprocStream(thread_pool, queues, config)
SPEAD over reliable in-process transport.
Note
Data may still be lost if the maximum number of in-flight heaps (set in the stream config) is exceeded. Either set this value to more heaps than will ever be sent (which will use unbounded memory) or be sure to block on the futures returned before exceeding the capacity.
- Parameters:
thread_pool (
spead2.ThreadPool
) – Thread pool handling the I/Oqueues (List[
spead2.InprocQueue
]) – Queue holding the data in flightconfig (
spead2.send.StreamConfig
) – Stream configuration
An asynchronous version of
spead2.send.InprocStream
. Refer to Asynchronous send for general details about asynchronous transport.
Receiving
To connect a receiver to the the queue, use
spead2.recv.Stream.add_inproc_reader()
.