10. Reusing heaps
In the previous sections, we saw how to improve performance for very large heaps. Now we will turn our attention to transmitting small heaps. This is a different challenge, where we need to reduce the per-heap overheads as much as possible.
To start with, let’s measure the transmit performance for various heap sizes. The total amount of data sent is kept constant at 4 GiB by varying the number of heaps to send.
It’s pretty clear that Python suffers much worse with small heap sizes (not surprising, given that the per-heap setup work runs in comparatively slow Python code), but the C++ code is also suboptimal below about 256 kiB per heap. While there is always going to be some per-heap overhead, we can certainly do better. We’ll focus on testing the smallest heap size, using this command:
tut_10_send_reuse_heaps -n 524288 -H 8192 -p 9000 192.168.31.2 8888
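For reference, the heap count in this command is just the fixed 4 GiB total divided by the heap size; a minimal sketch (the helper name is ours, not part of the tutorial code) shows the relationship:
# Hypothetical helper: derive the -n argument for a given -H so that every
# benchmark run transmits the same 4 GiB of payload.
TOTAL_BYTES = 4 * 1024**3  # 4 GiB
def heaps_for(heap_size: int) -> int:
    return TOTAL_BYTES // heap_size
assert heaps_for(8192) == 524288        # matches the command above
assert heaps_for(1024 * 1024) == 4096   # the default 1 MiB heaps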
We’ll start with some low-hanging fruit. The pipeline that we set up in tutorial 5 is only two heaps deep. When the heaps are so small, this makes the pipeline very shallow when measured in bytes. Let’s increase it to be about 1 MiB deep, while ensuring we always keep at least two heaps.
config = spead2.send.StreamConfig(rate=0.0, max_heaps=max(2, 1024 * 1024 // args.heap_size))
...
states = [State(adc_samples=np.ones(heap_size, np.int8)) for _ in range(config.max_heaps)]
config.set_max_heaps(std::max(std::int64_t(2), 1024 * 1024 / heap_size));
...
std::vector<state> states(config.get_max_heaps());
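To see what this formula does in practice, here is a minimal check (not part of the tutorial code): with the 8 KiB heaps from the benchmark command the pipeline becomes 128 heaps deep, while for the default 1 MiB heaps the max() clause keeps it at the original depth of two.
assert max(2, 1024 * 1024 // 8192) == 128
assert max(2, 1024 * 1024 // (1024 * 1024)) == 2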
This yields a substantial improvement: 3300 MB/s for C++ and 210 MB/s for Python.
Each loop iteration repeats all the work of setting up a heap object, even though they all have essentially the same structure. What if this could be done once up front? This is indeed possible, but there are some complications:
Each heap needs to be untouched while it is being transmitted, so we can’t just have a single heap object and mutate it as we wish. We can, however, create one per state (the C++ code already does this).
The first heap is special, because it has the descriptors. We can just create an extra object for this first heap.
We are already updating the ADC samples in place, but we also need a way to update the timestamp in an existing heap. Instead of passing the timestamp by value, we’ll need to store it in memory that we can update. This is somewhat complicated because the timestamp is a 40-bit big-endian value.
Let’s start by extending the State class to hold the heap (for Python) and the timestamp.
@dataclass
class State:
adc_samples: np.ndarray
timestamp: np.ndarray
heap: spead2.send.Heap
future: asyncio.Future[int] = field(default_factory=asyncio.Future)
...
struct state
{
std::future<spead2::item_pointer_t> future;
std::vector<std::int8_t> adc_samples;
std::uint64_t timestamp; // big endian
spead2::send::heap heap;
...
};
Next, we move the code for creating each heap out of the main loop and into the loop for initialising states.
states = []
for i in range(config.max_heaps):
adc_samples = np.ones(heap_size, np.int8)
timestamp = np.array(0, ">u8")
item_group["timestamp"].value = timestamp
item_group["adc_samples"].value = adc_samples
heap = item_group.get_heap(descriptors="none", data="all")
states.append(State(adc_samples=adc_samples, timestamp=timestamp, heap=heap))
#include <cstddef>
...
std::vector<state> states(config.get_max_heaps());
for (std::size_t i = 0; i < states.size(); i++)
{
auto &state = states[i];
auto &heap = state.heap;
auto &adc_samples = state.adc_samples;
auto &timestamp = state.timestamp;
adc_samples.resize(heap_size);
heap.add_item(timestamp_desc.id, (char *) &timestamp + 3, 5, true);
heap.add_item(
adc_samples_desc.id,
adc_samples.data(),
adc_samples.size() * sizeof(adc_samples[0]),
true
);
}
In the Python version, we’re using a new feature of ItemGroup.get_heap(). Normally it does its own tracking regarding which items and descriptors to include in the heap, but that assumes the simple case where each time you transmit a heap you obtain it from get_heap(). Since we’re recycling heaps, we need more direct control. We’re requesting that each heap has no descriptors but all of the actual items.
There is also some magic for adding the timestamp. In Python, we create a zero-dimensional array [1] with type >u8, which means 64-bit big-endian unsigned integer. The Python bindings handle that as a special case for immediate items [2], ensuring that the heap references the original value rather than a copy. The C++ interface doesn’t have this magic, so we need to compute a pointer and length for the 40-bit portion we want out of the 64-bit value.
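To make the pointer arithmetic concrete, here is a small illustrative sketch (Python, not part of the tutorial code) of why an offset of 3 bytes and a length of 5 bytes selects the 40-bit value from the 64-bit big-endian integer:
import numpy as np
# A zero-dimensional big-endian uint64, as used for the timestamp; the example
# value 0x1234567890 fits in 40 bits.
timestamp = np.array(0x1234567890, ">u8")
raw = timestamp.tobytes()  # 8 bytes, most significant first
# The high 3 bytes are zero padding; the low 40 bits occupy the last 5 bytes,
# which is what the C++ code references with (char *) &timestamp + 3 and length 5.
assert raw[:3] == b"\x00\x00\x00"
assert raw[3:] == bytes.fromhex("1234567890")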
We need some similar code to set up the initial heap (immediately after the code above):
item_group["timestamp"].value = states[0].timestamp
item_group["adc_samples"].value = states[0].adc_samples
first_heap = item_group.get_heap(descriptors="all", data="all")
spead2::send::heap first_heap;
first_heap.add_descriptor(timestamp_desc);
first_heap.add_descriptor(adc_samples_desc);
first_heap.add_item(timestamp_desc.id, (char *) &states[0].timestamp + 3, 5, true);
first_heap.add_item(
adc_samples_desc.id,
states[0].adc_samples.data(),
states[0].adc_samples.size() * sizeof(states[0].adc_samples[0]),
true
);
Finally, we need to update the main loop to use these heaps, and to update the timestamp in place.
for i in range(n_heaps):
state = states[i % len(states)]
await state.future # Wait for any previous use of this state to complete
state.adc_samples.fill(np.int_(i))
state.timestamp[()] = i * heap_size
heap = first_heap if i == 0 else state.heap
state.future = stream.async_send_heap(heap)
#include <spead2/common_endian.h>
...
for (int i = 0; i < n_heaps; i++)
{
auto &state = states[i % states.size()];
// Wait for any previous use of this state to complete
state.future.wait();
auto &heap = (i == 0) ? first_heap : state.heap;
auto &adc_samples = state.adc_samples;
state.timestamp = spead2::htobe(std::uint64_t(i * heap_size));
std::fill(adc_samples.begin(), adc_samples.end(), i);
state.future = stream.async_send_heap(heap, boost::asio::use_future);
}
And now let’s see the results again:
This is a major improvement for small heaps, but there is still room for more optimisation, which we will address in the next section.
Full code
#!/usr/bin/env python3
# Copyright 2023-2024 National Research Foundation (SARAO)
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import asyncio
import time
from dataclasses import dataclass, field
import numpy as np
import spead2.send.asyncio
@dataclass
class State:
adc_samples: np.ndarray
timestamp: np.ndarray
heap: spead2.send.Heap
future: asyncio.Future[int] = field(default_factory=asyncio.Future)
def __post_init__(self):
# Make it safe to wait on the future immediately
self.future.set_result(0)
async def main():
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--heaps", type=int, default=1000)
parser.add_argument("-p", "--packet-size", type=int)
parser.add_argument("-H", "--heap-size", type=int, default=1024 * 1024)
parser.add_argument("host", type=str)
parser.add_argument("port", type=int)
args = parser.parse_args()
thread_pool = spead2.ThreadPool(1, [0])
spead2.ThreadPool.set_affinity(1)
config = spead2.send.StreamConfig(rate=0.0, max_heaps=max(2, 1024 * 1024 // args.heap_size))
if args.packet_size is not None:
config.max_packet_size = args.packet_size
stream = spead2.send.asyncio.UdpStream(thread_pool, [(args.host, args.port)], config)
heap_size = args.heap_size
item_group = spead2.send.ItemGroup()
item_group.add_item(
0x1600,
"timestamp",
"Index of the first sample",
shape=(),
format=[("u", spead2.Flavour().heap_address_bits)],
)
item_group.add_item(
0x3300,
"adc_samples",
"ADC converter output",
shape=(heap_size,),
dtype=np.int8,
)
n_heaps = args.heaps
states = []
for i in range(config.max_heaps):
adc_samples = np.ones(heap_size, np.int8)
timestamp = np.array(0, ">u8")
item_group["timestamp"].value = timestamp
item_group["adc_samples"].value = adc_samples
heap = item_group.get_heap(descriptors="none", data="all")
states.append(State(adc_samples=adc_samples, timestamp=timestamp, heap=heap))
item_group["timestamp"].value = states[0].timestamp
item_group["adc_samples"].value = states[0].adc_samples
first_heap = item_group.get_heap(descriptors="all", data="all")
start = time.perf_counter()
for i in range(n_heaps):
state = states[i % len(states)]
await state.future # Wait for any previous use of this state to complete
state.adc_samples.fill(np.int_(i))
state.timestamp[()] = i * heap_size
heap = first_heap if i == 0 else state.heap
state.future = stream.async_send_heap(heap)
for state in states:
await state.future
elapsed = time.perf_counter() - start
print(f"{heap_size * n_heaps / elapsed / 1e6:.2f} MB/s")
await stream.async_send_heap(item_group.get_end())
if __name__ == "__main__":
asyncio.run(main())
/* Copyright 2023-2025 National Research Foundation (SARAO)
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Lesser General Public License as published by the Free
* Software Foundation, either version 3 of the License, or (at your option) any
* later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
* details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <cstdint>
#include <cstddef>
#include <string>
#include <vector>
#include <utility>
#include <chrono>
#include <iostream>
#include <algorithm>
#include <future>
#include <optional>
#include <unistd.h>
#include <boost/asio.hpp>
#include <spead2/common_defines.h>
#include <spead2/common_flavour.h>
#include <spead2/common_endian.h>
#include <spead2/common_thread_pool.h>
#include <spead2/send_heap.h>
#include <spead2/send_stream_config.h>
#include <spead2/send_udp.h>
struct state
{
std::future<spead2::item_pointer_t> future;
std::vector<std::int8_t> adc_samples;
std::uint64_t timestamp; // big endian
spead2::send::heap heap;
state()
{
// Make it safe to wait on the future immediately
std::promise<spead2::item_pointer_t> promise;
promise.set_value(0);
future = promise.get_future();
}
};
static void usage(const char * name)
{
std::cerr << "Usage: " << name << " [-n heaps] [-p packet-size] [-H heap-size] host port\n";
}
int main(int argc, char * const argv[])
{
int opt;
int n_heaps = 1000;
std::optional<int> packet_size;
std::int64_t heap_size = 1024 * 1024;
while ((opt = getopt(argc, argv, "n:p:H:")) != -1)
{
switch (opt)
{
case 'n':
n_heaps = std::stoi(optarg);
break;
case 'p':
packet_size = std::stoi(optarg);
break;
case 'H':
heap_size = std::stoll(optarg);
break;
default:
usage(argv[0]);
return 2;
}
}
if (argc - optind != 2)
{
usage(argv[0]);
return 2;
}
spead2::thread_pool thread_pool(1, {0});
spead2::thread_pool::set_affinity(1);
spead2::send::stream_config config;
config.set_rate(0.0);
config.set_max_heaps(std::max(std::int64_t(2), 1024 * 1024 / heap_size));
if (packet_size)
config.set_max_packet_size(packet_size.value());
boost::asio::ip::udp::endpoint endpoint(
boost::asio::ip::make_address(argv[optind]),
std::stoi(argv[optind + 1])
);
spead2::send::udp_stream stream(thread_pool, {endpoint}, config);
spead2::descriptor timestamp_desc;
timestamp_desc.id = 0x1600;
timestamp_desc.name = "timestamp";
timestamp_desc.description = "Index of the first sample";
timestamp_desc.format.emplace_back('u', spead2::flavour().get_heap_address_bits());
spead2::descriptor adc_samples_desc;
adc_samples_desc.id = 0x3300;
adc_samples_desc.name = "adc_samples";
adc_samples_desc.description = "ADC converter output";
adc_samples_desc.numpy_header =
"{'shape': (" + std::to_string(heap_size) + ",), 'fortran_order': False, 'descr': 'i1'}";
std::vector<state> states(config.get_max_heaps());
for (std::size_t i = 0; i < states.size(); i++)
{
auto &state = states[i];
auto &heap = state.heap;
auto &adc_samples = state.adc_samples;
auto &timestamp = state.timestamp;
adc_samples.resize(heap_size);
heap.add_item(timestamp_desc.id, (char *) &timestamp + 3, 5, true);
heap.add_item(
adc_samples_desc.id,
adc_samples.data(),
adc_samples.size() * sizeof(adc_samples[0]),
true
);
}
spead2::send::heap first_heap;
first_heap.add_descriptor(timestamp_desc);
first_heap.add_descriptor(adc_samples_desc);
first_heap.add_item(timestamp_desc.id, (char *) &states[0].timestamp + 3, 5, true);
first_heap.add_item(
adc_samples_desc.id,
states[0].adc_samples.data(),
states[0].adc_samples.size() * sizeof(states[0].adc_samples[0]),
true
);
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < n_heaps; i++)
{
auto &state = states[i % states.size()];
// Wait for any previous use of this state to complete
state.future.wait();
auto &heap = (i == 0) ? first_heap : state.heap;
auto &adc_samples = state.adc_samples;
state.timestamp = spead2::htobe(std::uint64_t(i * heap_size));
std::fill(adc_samples.begin(), adc_samples.end(), i);
state.future = stream.async_send_heap(heap, boost::asio::use_future);
}
for (const auto &state : states)
state.future.wait();
std::chrono::duration<double> elapsed = std::chrono::high_resolution_clock::now() - start;
std::cout << heap_size * n_heaps / elapsed.count() / 1e6 << " MB/s\n";
// Send an end-of-stream control item
spead2::send::heap heap;
heap.add_end();
stream.async_send_heap(heap, boost::asio::use_future).get();
return 0;
}