11. Batching heaps
In the previous section we improved the performance for sending small heaps, but it still isn't as efficient as sending larger heaps. In this section we'll improve things further.
The Python version is particularly slow because there is Python code running for every heap, and Python is slow. Interactions between Python code and extensions are known to be particularly expensive. Additionally, both the C++ and Python versions perform inter-thread communication for every heap, which is similarly expensive. The solution is batching: instead of submitting and waiting for one heap at a time, we'll process heaps in batches.
This will require some refactoring of the code. The State class will now represent all the state for a batch, rather than a single heap. It will still only hold one future, but now holds a list of heaps, multiple timestamps and multiple arrays of ADC samples. In Python we'll handle the latter by adding an extra dimension to the arrays we're already storing; for C++ we'll create a new structure to hold per-heap data other than the heaps themselves (we'll see later why we do not store the heaps themselves in this structure).
@dataclass
class State:
    adc_samples: np.ndarray
    timestamps: np.ndarray
    heaps: list[spead2.send.HeapReference]
    future: asyncio.Future[int] = field(default_factory=asyncio.Future)
    ...
struct heap_data
{
    std::vector<std::int8_t> adc_samples;
    std::uint64_t timestamp;  // big endian
};

struct state
{
    std::future<spead2::item_pointer_t> future;
    std::vector<heap_data> data;
    std::vector<spead2::send::heap> heaps;
    ...
};
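In both versions, the constructor arranges for the future to start out already completed (__post_init__ in Python, the state constructor in C++; see the full code below), so that the first wait on each state returns immediately.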
Next, we need to decide how big to make the batches, and how many batches to keep in flight. We’ll stick with our original goal of keeping roughly 1 MiB in flight, and split it into two batches. My experiments indicate that there is no benefit to using more than two (but we need at least two, so that we can transmit one batch while preparing the other, just as in section 5).
batches = 2
batch_heaps = max(1, 512 * 1024 // args.heap_size)
config = spead2.send.StreamConfig(rate=0.0, max_heaps=batches * batch_heaps)
const std::size_t batches = 2;
const std::size_t batch_heaps = std::max(std::int64_t(2), 512 * 1024 / heap_size);
config.set_max_heaps(batches * batch_heaps);
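To put numbers to this: with 8192-byte heaps, each batch holds 512 KiB / 8 KiB = 64 heaps, so the stream is configured to allow 2 × 64 = 128 heaps in flight at once.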
Preparing the states array is now slightly more involved, as it needs two levels of indexing. We use a helper function to prepare the heap list for either an element of states or the special heap list for the first batch (which includes the descriptors).
def make_heaps(adc_samples, timestamps, first):
    heaps = []
    for j in range(batch_heaps):
        item_group["timestamp"].value = timestamps[j, ...]
        item_group["adc_samples"].value = adc_samples[j, ...]
        heap = item_group.get_heap(descriptors="none" if j or not first else "all", data="all")
        heaps.append(spead2.send.HeapReference(heap))
    return heaps
states = []
for _ in range(batches):
    adc_samples = np.ones((batch_heaps, heap_size), np.int8)
    timestamps = np.ones(batch_heaps, ">u8")
    states.append(
        State(
            adc_samples=adc_samples,
            timestamps=timestamps,
            heaps=make_heaps(adc_samples, timestamps, False),
        )
    )
first_heaps = make_heaps(states[0].adc_samples, states[0].timestamps, True)
auto make_heaps = [&](const std::vector<heap_data> &data, bool first)
{
    std::vector<spead2::send::heap> heaps(batch_heaps);
    for (std::size_t j = 0; j < batch_heaps; j++)
    {
        auto &heap = heaps[j];
        auto &adc_samples = data[j].adc_samples;
        auto &timestamp = data[j].timestamp;
        if (first && j == 0)
        {
            heap.add_descriptor(timestamp_desc);
            heap.add_descriptor(adc_samples_desc);
        }
        // The timestamp is stored big-endian, so its low-order 40 bits
        // (the immediate value) are the last 5 of its 8 bytes.
        heap.add_item(timestamp_desc.id, (char *) &timestamp + 3, 5, true);
        heap.add_item(
            adc_samples_desc.id,
            adc_samples.data(),
            adc_samples.size() * sizeof(adc_samples[0]),
            true
        );
    }
    return heaps;
};
std::vector<state> states(batches);
for (std::size_t i = 0; i < states.size(); i++)
{
    auto &state = states[i];
    state.data.resize(batch_heaps);
    for (std::size_t j = 0; j < batch_heaps; j++)
    {
        state.data[j].adc_samples.resize(heap_size);
    }
    state.heaps = make_heaps(state.data, false);
}
auto first_heaps = make_heaps(states[0].data, true);
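Note that each adc_samples vector is fully sized before make_heaps is called: the heaps store raw pointers into this memory, so it must not be reallocated after the heaps are built.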
The Python code uses an obscure piece of numpy syntax: timestamps[j, ...] is a zero-dimensional array which references the memory in timestamps; in contrast, timestamps[j] is a scalar copy of an element.
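A minimal standalone demonstration of the difference (the array here is illustrative, not taken from the sender):

import numpy as np

timestamps = np.zeros(4, ">u8")
view = timestamps[1, ...]  # 0-d array: a view of the memory in timestamps
copy = timestamps[1]       # numpy scalar: an independent copy
timestamps[1] = 42
print(view)  # 42: the view reflects the update
print(copy)  # 0: the copy does not

Because the item values are set to such views, the loop can update state.timestamps in place and have the prebuilt heaps pick up the new values.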
Our main transmission loop is also more complex: it now needs to run once per batch, while still updating the data for all the heaps in the batch. In Python, we want to make sure that we don't involve the Python interpreter on a per-heap basis, so we use numpy functions to compute each array in one step. We also need to handle the case where n_heaps is not a multiple of batch_heaps, in which case we need to truncate everything to the remainder. We use n to denote the number of heaps to actually use from this batch.
for i in range(0, n_heaps, batch_heaps):
    state = states[(i // batch_heaps) % len(states)]
    end = min(i + batch_heaps, n_heaps)
    n = end - i
    await state.future  # Wait for any previous use of this state to complete
    state.adc_samples[:n] = np.arange(i, end).astype(np.int8)[:, np.newaxis]
    state.timestamps[:n] = np.arange(i * heap_size, end * heap_size, heap_size)
    heaps = state.heaps if i else first_heaps
    if n < batch_heaps:
        heaps = heaps[:n]
    state.future = stream.async_send_heaps(heaps, spead2.send.GroupMode.SERIAL)
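As before, the loop can finish while both batches are still in flight, so we wait for the outstanding futures before taking the elapsed time (the C++ version does the same with future.wait(); both appear in the full code below):

for state in states:
    await state.future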
for (int i = 0; i < n_heaps; i += batch_heaps)
{
    auto &state = states[(i / batch_heaps) % states.size()];
    // Wait for any previous use of this state to complete
    state.future.wait();
    auto &heaps = (i == 0) ? first_heaps : state.heaps;
    std::int64_t end = std::min(n_heaps, i + int(batch_heaps));
    std::size_t n = end - i;
    for (std::size_t j = 0; j < n; j++)
    {
        std::int64_t heap_index = i + j;
        auto &data = state.data[j];
        auto &adc_samples = data.adc_samples;
        data.timestamp = spead2::htobe(std::uint64_t(heap_index * heap_size));
        std::fill(adc_samples.begin(), adc_samples.end(), heap_index);
    }
    state.future = stream.async_send_heaps(
        heaps.begin(), heaps.begin() + n, boost::asio::use_future,
        spead2::send::group_mode::SERIAL
    );
}
Everything up to the last statement has just been refactoring, but the call to async_send_heaps is new. Instead of taking a single heap (as async_send_heap does), it takes a batch of heaps, and signals completion only when the whole batch has been transmitted. One minor limitation this imposes is that it can only signal failure of the batch as a whole, without being able to indicate which (if any) of the heaps were successfully transmitted.
It also takes a parameter indicating in which order to transmit the heaps. We’re using the serial mode, in which each heap is transmitted completely before starting on the next heap, and which matches the previous behaviour. There is an alternative mode in which the packets comprising the heaps are interleaved, but that is aimed at a different use case which is not discussed here.
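For comparison, switching to the interleaved mode would be a one-line change at the call site; the line below is a sketch rather than part of this tutorial's code:

# Interleave the packets of the batch's heaps instead of sending them serially:
state.future = stream.async_send_heaps(heaps, spead2.send.GroupMode.ROUND_ROBIN)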
The results are looking much better, but the Python version still lags behind
on the smallest heap size.
There is one more trick up our sleeve: while it looks like there is only a constant amount of interaction with the Python interpreter per batch, async_send_heaps actually needs to increment the reference count of every heap in the list to ensure it stays alive. We can avoid this by wrapping the list in a spead2-specific structure that holds those references for us.
@dataclass
class State:
    ...
    heaps: spead2.send.HeapReferenceList
    ...

async def main():
    ...
    def make_heaps(adc_samples, timestamps, first):
        ...
        return spead2.send.HeapReferenceList(heaps)
This brings the Python version to parity with the C++ version on 8192-byte heaps.
You might be wondering why 8192-byte heaps perform so much better than
16384-byte heaps. It is due to the -p 9000
argument: the Linux kernel has a
mechanism (Generic Segmentation Offload) for efficiently handling sequences of packets
that all have the same metadata, and in particular are all the same size.
With 8192-byte heaps, each heap fits within a single packet, and the packets
are all the same size (except for the first). With 16384-byte
heaps, two packets are needed: they happen to be 9000 bytes and 7480 bytes
long (this adds up to more than 16384 because they include all the SPEAD
headers and item pointers). The lack of a long sequence of identically-sized
packets makes transmission less efficient. With much larger heap sizes this is
less of an issue because most of the packets for each heap will be the full
9000 bytes, with only the last packet containing a remainder.
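As a sanity check on those numbers: 9000 + 7480 = 16480, i.e. 96 bytes of overhead on top of the 16384-byte payload. One plausible breakdown (this per-packet accounting is an assumption about spead2's default packet layout, not something stated above):

# Hypothetical overhead accounting for one 16384-byte heap sent as two packets.
# Assumes an 8-byte SPEAD header plus 8-byte item pointers in each packet.
first_overhead = 8 + 6 * 8  # header + 4 standard pointers + timestamp + adc_samples
other_overhead = 8 + 4 * 8  # header + 4 standard pointers
payload = (9000 - first_overhead) + (7480 - other_overhead)
assert payload == 16384  # 8944 + 7440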
With some careful calculations, it is sometimes possible to adjust the packet size so that all the packets in the heap are the same size, and thus avoid this problem. It is worth noting that the highest performance is obtained using the ibverbs support, which is not affected by this.
Full code
#!/usr/bin/env python3

# Copyright 2023-2024 National Research Foundation (SARAO)
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import argparse
import asyncio
import time
from dataclasses import dataclass, field

import numpy as np

import spead2.send.asyncio


@dataclass
class State:
    adc_samples: np.ndarray
    timestamps: np.ndarray
    heaps: spead2.send.HeapReferenceList
    future: asyncio.Future[int] = field(default_factory=asyncio.Future)

    def __post_init__(self):
        # Make it safe to wait on the future immediately
        self.future.set_result(0)


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--heaps", type=int, default=1000)
    parser.add_argument("-p", "--packet-size", type=int)
    parser.add_argument("-H", "--heap-size", type=int, default=1024 * 1024)
    parser.add_argument("host", type=str)
    parser.add_argument("port", type=int)
    args = parser.parse_args()

    thread_pool = spead2.ThreadPool(1, [0])
    spead2.ThreadPool.set_affinity(1)
    batches = 2
    batch_heaps = max(1, 512 * 1024 // args.heap_size)
    config = spead2.send.StreamConfig(rate=0.0, max_heaps=batches * batch_heaps)
    if args.packet_size is not None:
        config.max_packet_size = args.packet_size
    stream = spead2.send.asyncio.UdpStream(thread_pool, [(args.host, args.port)], config)

    heap_size = args.heap_size
    item_group = spead2.send.ItemGroup()
    item_group.add_item(
        0x1600,
        "timestamp",
        "Index of the first sample",
        shape=(),
        format=[("u", spead2.Flavour().heap_address_bits)],
    )
    item_group.add_item(
        0x3300,
        "adc_samples",
        "ADC converter output",
        shape=(heap_size,),
        dtype=np.int8,
    )

    n_heaps = args.heaps

    def make_heaps(adc_samples, timestamps, first):
        heaps = []
        for j in range(batch_heaps):
            item_group["timestamp"].value = timestamps[j, ...]
            item_group["adc_samples"].value = adc_samples[j, ...]
            heap = item_group.get_heap(descriptors="none" if j or not first else "all", data="all")
            heaps.append(spead2.send.HeapReference(heap))
        return spead2.send.HeapReferenceList(heaps)

    states = []
    for _ in range(batches):
        adc_samples = np.ones((batch_heaps, heap_size), np.int8)
        timestamps = np.ones(batch_heaps, ">u8")
        states.append(
            State(
                adc_samples=adc_samples,
                timestamps=timestamps,
                heaps=make_heaps(adc_samples, timestamps, False),
            )
        )
    first_heaps = make_heaps(states[0].adc_samples, states[0].timestamps, True)

    start = time.perf_counter()
    for i in range(0, n_heaps, batch_heaps):
        state = states[(i // batch_heaps) % len(states)]
        end = min(i + batch_heaps, n_heaps)
        n = end - i
        await state.future  # Wait for any previous use of this state to complete
        state.adc_samples[:n] = np.arange(i, end).astype(np.int8)[:, np.newaxis]
        state.timestamps[:n] = np.arange(i * heap_size, end * heap_size, heap_size)
        heaps = state.heaps if i else first_heaps
        if n < batch_heaps:
            heaps = heaps[:n]
        state.future = stream.async_send_heaps(heaps, spead2.send.GroupMode.SERIAL)
    for state in states:
        await state.future
    elapsed = time.perf_counter() - start
    print(f"{heap_size * n_heaps / elapsed / 1e6:.2f} MB/s")
    await stream.async_send_heap(item_group.get_end())


if __name__ == "__main__":
    asyncio.run(main())
/* Copyright 2023-2025 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <cstdint>
#include <cstddef>
#include <string>
#include <vector>
#include <utility>
#include <chrono>
#include <iostream>
#include <algorithm>
#include <future>
#include <optional>
#include <unistd.h>
#include <boost/asio.hpp>
#include <spead2/common_defines.h>
#include <spead2/common_flavour.h>
#include <spead2/common_endian.h>
#include <spead2/common_thread_pool.h>
#include <spead2/send_heap.h>
#include <spead2/send_stream_config.h>
#include <spead2/send_udp.h>

struct heap_data
{
    std::vector<std::int8_t> adc_samples;
    std::uint64_t timestamp;  // big endian
};

struct state
{
    std::future<spead2::item_pointer_t> future;
    std::vector<heap_data> data;
    std::vector<spead2::send::heap> heaps;

    state()
    {
        // Make it safe to wait on the future immediately
        std::promise<spead2::item_pointer_t> promise;
        promise.set_value(0);
        future = promise.get_future();
    }
};

static void usage(const char * name)
{
    std::cerr << "Usage: " << name << " [-n heaps] [-p packet-size] [-H heap-size] host port\n";
}

int main(int argc, char * const argv[])
{
    int opt;
    int n_heaps = 1000;
    std::optional<int> packet_size;
    std::int64_t heap_size = 1024 * 1024;
    while ((opt = getopt(argc, argv, "n:p:H:")) != -1)
    {
        switch (opt)
        {
        case 'n':
            n_heaps = std::stoi(optarg);
            break;
        case 'p':
            packet_size = std::stoi(optarg);
            break;
        case 'H':
            heap_size = std::stoll(optarg);
            break;
        default:
            usage(argv[0]);
            return 2;
        }
    }
    if (argc - optind != 2)
    {
        usage(argv[0]);
        return 2;
    }

    spead2::thread_pool thread_pool(1, {0});
    spead2::thread_pool::set_affinity(1);
    spead2::send::stream_config config;
    config.set_rate(0.0);
    const std::size_t batches = 2;
    const std::size_t batch_heaps = std::max(std::int64_t(2), 512 * 1024 / heap_size);
    config.set_max_heaps(batches * batch_heaps);
    if (packet_size)
        config.set_max_packet_size(packet_size.value());
    boost::asio::ip::udp::endpoint endpoint(
        boost::asio::ip::make_address(argv[optind]),
        std::stoi(argv[optind + 1])
    );
    spead2::send::udp_stream stream(thread_pool, {endpoint}, config);

    spead2::descriptor timestamp_desc;
    timestamp_desc.id = 0x1600;
    timestamp_desc.name = "timestamp";
    timestamp_desc.description = "Index of the first sample";
    timestamp_desc.format.emplace_back('u', spead2::flavour().get_heap_address_bits());
    spead2::descriptor adc_samples_desc;
    adc_samples_desc.id = 0x3300;
    adc_samples_desc.name = "adc_samples";
    adc_samples_desc.description = "ADC converter output";
    adc_samples_desc.numpy_header =
        "{'shape': (" + std::to_string(heap_size) + ",), 'fortran_order': False, 'descr': 'i1'}";

    auto make_heaps = [&](const std::vector<heap_data> &data, bool first)
    {
        std::vector<spead2::send::heap> heaps(batch_heaps);
        for (std::size_t j = 0; j < batch_heaps; j++)
        {
            auto &heap = heaps[j];
            auto &adc_samples = data[j].adc_samples;
            auto &timestamp = data[j].timestamp;
            if (first && j == 0)
            {
                heap.add_descriptor(timestamp_desc);
                heap.add_descriptor(adc_samples_desc);
            }
            heap.add_item(timestamp_desc.id, (char *) &timestamp + 3, 5, true);
            heap.add_item(
                adc_samples_desc.id,
                adc_samples.data(),
                adc_samples.size() * sizeof(adc_samples[0]),
                true
            );
        }
        return heaps;
    };

    std::vector<state> states(batches);
    for (std::size_t i = 0; i < states.size(); i++)
    {
        auto &state = states[i];
        state.data.resize(batch_heaps);
        for (std::size_t j = 0; j < batch_heaps; j++)
        {
            state.data[j].adc_samples.resize(heap_size);
        }
        state.heaps = make_heaps(state.data, false);
    }
    auto first_heaps = make_heaps(states[0].data, true);

    auto start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < n_heaps; i += batch_heaps)
    {
        auto &state = states[(i / batch_heaps) % states.size()];
        // Wait for any previous use of this state to complete
        state.future.wait();
        auto &heaps = (i == 0) ? first_heaps : state.heaps;
        std::int64_t end = std::min(n_heaps, i + int(batch_heaps));
        std::size_t n = end - i;
        for (std::size_t j = 0; j < n; j++)
        {
            std::int64_t heap_index = i + j;
            auto &data = state.data[j];
            auto &adc_samples = data.adc_samples;
            data.timestamp = spead2::htobe(std::uint64_t(heap_index * heap_size));
            std::fill(adc_samples.begin(), adc_samples.end(), heap_index);
        }
        state.future = stream.async_send_heaps(
            heaps.begin(), heaps.begin() + n, boost::asio::use_future,
            spead2::send::group_mode::SERIAL
        );
    }
    for (const auto &state : states)
        state.future.wait();
    std::chrono::duration<double> elapsed = std::chrono::high_resolution_clock::now() - start;
    std::cout << heap_size * n_heaps / elapsed.count() / 1e6 << " MB/s\n";

    // Send an end-of-stream control item
    spead2::send::heap heap;
    heap.add_end();
    stream.async_send_heap(heap, boost::asio::use_future).get();
    return 0;
}