5. Pipelining the sender
At present, our sender program only does one thing at a time, alternating between generating some data (in our case, by just filling it with a constant) and transmitting that data. Modern processors have multiple cores, so we can speed things up by performing these operations in parallel. But apart from just increasing performance for the sake of it, this is important for real UDP applications because it allows the transmission to be smoothed out with a constant transmission speed, rather than alternating idle periods with rapid bursts. Rapid bursts can cause congestion with other traffic on the network (leading to packet loss) or overwhelm receivers that don’t have the performance to absorb them.
Serial computation (yellow) and transmission (green). Arrows show dependencies.
Parallel computation (yellow) and transmission (green).
To generate and transmit data in parallel, we're going to need to restructure our code a bit. In Python we'll use the asyncio module to manage asynchronous sending, which means we'll need an asynchronous main() function. If you're using Python but you've never used asyncio before, this would be a good time to work through a tutorial on it.
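As a minimal refresher (separate from the sender itself, with a made-up fetch_value() coroutine), the core asyncio pattern we rely on is an async function that awaits other async operations, driven by asyncio.run():

```python
import asyncio

async def fetch_value() -> int:
    # Stand-in for an I/O-bound operation; "await" yields control to the event loop
    await asyncio.sleep(0.01)
    return 42

async def main() -> None:
    # main() is suspended here until fetch_value() completes
    value = await fetch_value()
    print(value)

if __name__ == "__main__":
    asyncio.run(main())  # start the event loop and run main() to completion
```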
Modify the code as follows:
import asyncio
...
async def main():
    ...

if __name__ == "__main__":
    asyncio.run(main())
We also need to use the asynchronous classes and methods of the spead2 API:
import spead2.send.asyncio
...
stream = spead2.send.asyncio.UdpStream(thread_pool, [(args.host, args.port)], config)
...
await stream.async_send_heap(heap)
...
await stream.async_send_heap(item_group.get_end())
That brings us to parity with the current C++ version, which already uses async_send_heap. However, we haven't actually created any concurrency yet, because immediately after starting the transmission, we wait for it to complete (with await in Python or .get() in C++) before doing anything else.
It's important to realise that async_send_heap does not necessarily copy the heap data before transmitting it. Thus, between calling async_send_heap and waiting for it to complete, you must be careful not to modify the data. If we are to prepare the next heap while the current heap is being transmitted, we must do the preparation in different memory, and we also need to ensure that the memory isn't freed while it is being used. We'll use a State class to hold all the data that we need to associate with a particular heap and keep alive until later. In Python this is simpler because the garbage collector keeps things alive for us.
from dataclasses import dataclass, field
...
@dataclass
class State:
    future: asyncio.Future[int] = field(default_factory=asyncio.Future)
struct state
{
    std::future<spead2::item_pointer_t> future;
    std::vector<std::int8_t> adc_samples;
    spead2::send::heap heap;
};
A “future” is an abstraction for a result that will only become available at some point in the future, and on which one may wait; in this case the result of transmitting a heap. If transmission fails, the result is an exception; otherwise, it is the number of bytes actually transmitted (including overheads from the SPEAD protocol, but excluding overheads from lower-level protocols such as IP and UDP).
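The future semantics described above can be demonstrated with a plain asyncio.Future, independent of spead2 (the demo() coroutine and the 1024-byte result are invented for illustration): a producer later sets a result (or an exception), and awaiting the future suspends until that happens.

```python
import asyncio

async def demo() -> int:
    loop = asyncio.get_running_loop()
    future: asyncio.Future[int] = loop.create_future()
    # Simulate a transmission completing a little later, reporting a byte count;
    # future.set_exception(...) would instead make "await future" raise
    loop.call_later(0.01, future.set_result, 1024)
    # Awaiting suspends this coroutine until the result or exception is set
    return await future

print(asyncio.run(demo()))  # → 1024
```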
We're going to submit heap \(n+1\) to async_send_heap while heap \(n\) is potentially still "in-flight". A stream has a bounded capacity for in-flight heaps, which we can configure with the config object. The default is actually more than 2, so this isn't necessary for our example, but we'll be explicit in order to demonstrate the syntax.
config = spead2.send.StreamConfig(rate=0.0, max_heaps=2)
config.set_max_heaps(2);
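The effect of a bounded in-flight capacity can be sketched with an asyncio.Semaphore (an analogy for illustration, not how spead2 implements it; send_heap and the log list are invented names): with four heaps submitted at once but only two slots, the third transmission cannot start until the first finishes.

```python
import asyncio

MAX_IN_FLIGHT = 2  # analogue of max_heaps

async def send_heap(sem: asyncio.Semaphore, i: int, log: list) -> None:
    # Each "transmission" occupies one of the in-flight slots while it runs
    async with sem:
        log.append(f"start {i}")
        await asyncio.sleep(0.01)  # stand-in for time spent on the wire
        log.append(f"done {i}")

async def main() -> list:
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    log: list = []
    # Submit four heaps at once; at most two are ever in flight
    await asyncio.gather(*(send_heap(sem, i, log) for i in range(4)))
    return log

print(asyncio.run(main()))
```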
Now we rework the main loop to use the state class, and to delay retrieving the result of the future for heap \(n\) until we've passed heap \(n+1\) to async_send_heap. Our diagram above isn't quite accurate, because we don't start computing heap \(n+2\) until we've retrieved the result of heap \(n\). The actual situation is this (note the new arrow from heap 1 to heap 3).
Parallel computation (yellow) and transmission (green) with at most two heaps in flight.
old_state = None
start = time.perf_counter()
for i in range(n_heaps):
    new_state = State()
    ...
    new_state.future = stream.async_send_heap(heap)
    if old_state is not None:
        await old_state.future
    old_state = new_state
await old_state.future
#include <memory>
...
std::unique_ptr<state> old_state;
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < n_heaps; i++)
{
    auto new_state = std::make_unique<state>();
    auto &heap = new_state->heap;  // delete previous declaration of 'heap'
    auto &adc_samples = new_state->adc_samples;
    adc_samples.resize(heap_size, i);
    ...
    new_state->future = stream.async_send_heap(heap, boost::asio::use_future);
    if (old_state)
        old_state->future.get();
    old_state = std::move(new_state);
}
old_state->future.get();
Note how at the end of the loop we still need to wait for the final heap.
This improves performance to around 4000 MB/s for both Python and C++.
Apart from overlapping the data generation with the transmission, there is another hidden benefit to this approach: pipelining. Even if the data generation were free, the original code would have sub-optimal performance because we wait until transmission is complete before submitting the next batch of work. This means that the networking thread will go to sleep after finishing heap \(n\) and need to be woken up again when heap \(n+1\) is submitted, and no data is being transmitted while the thread is being woken up. With the new code, provided the processing is fast enough to submit heap \(n+1\) before heap \(n\) is complete, the worker thread can move directly from one to the next without needing to pause. In our example this makes no noticeable difference, but it can be significant if the heaps are small, and it can even be beneficial to have more than two heaps in flight at a time.
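The two-slot pipeline pattern itself is independent of spead2. A stripped-down sketch of the same loop structure, with invented compute() and send() stand-ins for filling adc_samples and async_send_heap, shows how heap \(i\) is prepared while heap \(i-1\) is still in flight:

```python
import asyncio

async def send(data: list) -> int:
    # Stand-in for async_send_heap: pretend to transmit, report bytes sent
    await asyncio.sleep(0.01)
    return len(data)

def compute(i: int, size: int) -> list:
    # Stand-in for generating the next heap's payload in fresh memory
    return [i] * size

async def main() -> int:
    total = 0
    old_future = None
    for i in range(4):
        data = compute(i, 1024)              # prepare heap i
        new_future = asyncio.create_task(send(data))
        if old_future is not None:
            total += await old_future        # heap i-1 finishes while i was prepared
        old_future = new_future
    total += await old_future                # don't forget the final heap
    return total

print(asyncio.run(main()))  # → 4096 (4 heaps × 1024 "bytes")
```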
Full code
#!/usr/bin/env python3
# Copyright 2023-2024 National Research Foundation (SARAO)
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import asyncio
import time
from dataclasses import dataclass, field
import numpy as np
import spead2.send.asyncio
@dataclass
class State:
    future: asyncio.Future[int] = field(default_factory=asyncio.Future)


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--heaps", type=int, default=1000)
    parser.add_argument("host", type=str)
    parser.add_argument("port", type=int)
    args = parser.parse_args()

    thread_pool = spead2.ThreadPool(1, [0])
    spead2.ThreadPool.set_affinity(1)
    config = spead2.send.StreamConfig(rate=0.0, max_heaps=2)
    stream = spead2.send.asyncio.UdpStream(thread_pool, [(args.host, args.port)], config)
    heap_size = 1024 * 1024
    item_group = spead2.send.ItemGroup()
    item_group.add_item(
        0x1600,
        "timestamp",
        "Index of the first sample",
        shape=(),
        format=[("u", spead2.Flavour().heap_address_bits)],
    )
    item_group.add_item(
        0x3300,
        "adc_samples",
        "ADC converter output",
        shape=(heap_size,),
        dtype=np.int8,
    )

    n_heaps = args.heaps
    old_state = None
    start = time.perf_counter()
    for i in range(n_heaps):
        new_state = State()
        item_group["timestamp"].value = i * heap_size
        item_group["adc_samples"].value = np.full(heap_size, np.int_(i), np.int8)
        heap = item_group.get_heap()
        new_state.future = stream.async_send_heap(heap)
        if old_state is not None:
            await old_state.future
        old_state = new_state
    await old_state.future
    elapsed = time.perf_counter() - start
    print(f"{heap_size * n_heaps / elapsed / 1e6:.2f} MB/s")
    await stream.async_send_heap(item_group.get_end())


if __name__ == "__main__":
    asyncio.run(main())
/* Copyright 2023-2025 National Research Foundation (SARAO)
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <cstdint>
#include <string>
#include <vector>
#include <utility>
#include <chrono>
#include <iostream>
#include <algorithm>
#include <memory>
#include <future>
#include <unistd.h>
#include <boost/asio.hpp>
#include <spead2/common_defines.h>
#include <spead2/common_flavour.h>
#include <spead2/common_thread_pool.h>
#include <spead2/send_heap.h>
#include <spead2/send_stream_config.h>
#include <spead2/send_udp.h>
struct state
{
    std::future<spead2::item_pointer_t> future;
    std::vector<std::int8_t> adc_samples;
    spead2::send::heap heap;
};

static void usage(const char * name)
{
    std::cerr << "Usage: " << name << " [-n heaps] host port\n";
}

int main(int argc, char * const argv[])
{
    int opt;
    int n_heaps = 1000;
    while ((opt = getopt(argc, argv, "n:")) != -1)
    {
        switch (opt)
        {
        case 'n':
            n_heaps = std::stoi(optarg);
            break;
        default:
            usage(argv[0]);
            return 2;
        }
    }
    if (argc - optind != 2)
    {
        usage(argv[0]);
        return 2;
    }

    spead2::thread_pool thread_pool(1, {0});
    spead2::thread_pool::set_affinity(1);
    spead2::send::stream_config config;
    config.set_rate(0.0);
    config.set_max_heaps(2);
    boost::asio::ip::udp::endpoint endpoint(
        boost::asio::ip::make_address(argv[optind]),
        std::stoi(argv[optind + 1])
    );
    spead2::send::udp_stream stream(thread_pool, {endpoint}, config);

    const std::int64_t heap_size = 1024 * 1024;
    spead2::descriptor timestamp_desc;
    timestamp_desc.id = 0x1600;
    timestamp_desc.name = "timestamp";
    timestamp_desc.description = "Index of the first sample";
    timestamp_desc.format.emplace_back('u', spead2::flavour().get_heap_address_bits());
    spead2::descriptor adc_samples_desc;
    adc_samples_desc.id = 0x3300;
    adc_samples_desc.name = "adc_samples";
    adc_samples_desc.description = "ADC converter output";
    adc_samples_desc.numpy_header =
        "{'shape': (" + std::to_string(heap_size) + ",), 'fortran_order': False, 'descr': 'i1'}";

    std::unique_ptr<state> old_state;
    auto start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < n_heaps; i++)
    {
        auto new_state = std::make_unique<state>();
        auto &heap = new_state->heap;
        auto &adc_samples = new_state->adc_samples;
        adc_samples.resize(heap_size);
        // Add descriptors to the first heap
        if (i == 0)
        {
            heap.add_descriptor(timestamp_desc);
            heap.add_descriptor(adc_samples_desc);
        }
        std::fill(adc_samples.begin(), adc_samples.end(), i);
        // Add the data and timestamp to the heap
        heap.add_item(timestamp_desc.id, i * heap_size);
        heap.add_item(
            adc_samples_desc.id,
            adc_samples.data(),
            adc_samples.size() * sizeof(adc_samples[0]),
            true
        );
        new_state->future = stream.async_send_heap(heap, boost::asio::use_future);
        if (old_state)
            old_state->future.get();
        old_state = std::move(new_state);
    }
    old_state->future.get();
    std::chrono::duration<double> elapsed = std::chrono::high_resolution_clock::now() - start;
    std::cout << heap_size * n_heaps / elapsed.count() / 1e6 << " MB/s\n";

    // Send an end-of-stream control item
    spead2::send::heap heap;
    heap.add_end();
    stream.async_send_heap(heap, boost::asio::use_future).get();
    return 0;
}