8. Reusing memory

To demonstrate more features of spead2, we’ll need to experiment with different heap sizes. Instead of editing the hard-coded value, let’s introduce another command-line option. Don’t forget to delete the original definition of heap_size.

async def main():
    ...
    parser.add_argument("-H", "--heap-size", type=int, default=1024 * 1024)
    ...
    heap_size = args.heap_size
static void usage(const char * name)
{
    std::cerr << "Usage: " << name << " [-n heaps] [-p packet-size] [-H heap-size] host port\n";
}

int main(int argc, char * const argv[])
{
    ...
    std::int64_t heap_size = 1024 * 1024;
    while ((opt = getopt(argc, argv, "n:p:H:")) != -1)
    {
        switch (opt)
        {
        ...
        case 'H':
            heap_size = std::stoll(optarg);
            break;
        ...
        }
    }

As with previous versions of the sender, the command-line parsing in C++ is not very robust to user mistakes.

For this rest of this section we’ll pass the options -n 100 -H 67108864 -p 9000 to use 64 MiB heaps (and reduce the number of heaps to speed things up). First let us see what impact it has on the sender in isolation (this assumes you have set up the dummy network interface as in Measuring sender performance).

tut_8_send_reuse_memory -n 100 -H 67108864 -p 9000 192.168.31.2 8888

The performance is worse: significantly so for C++ (I get around 1300 MB/s) and slightly for Python (5900 MB/s). This is somewhat surprising, because bigger heaps should mean that per-heap overheads are reduced, just like increasing the packet size reduced the per-packet overheads. There are (at least) two things going on here:

  1. Caching. My CPU has a 1.5 MiB L2 cache (per core) and a 12 MiB L3 cache. The heap no longer fits into either of them, and so cache misses are substantially increased. In Python, the original command (1 MiB heaps) missed on 0.42% of L3 cache loads, while this new command misses on 6.4% of L3 cache loads.

  2. Memory allocation. When the application allocates memory to hold the data for a heap, the underlying library can do it in one of two ways: it can either hand out some memory that it has previously requested from the operating system but which isn’t in use, or it can request new memory from the operating system. In the latter case, Linux will provide a virtual memory address, but it won’t actually allocate the physical memory. Instead, the first time each page is accessed, a page fault will occur, and the kernel will allocate a page of physical memory and zero it out. Page faults are expensive, so if this happens for every heap it becomes expensive.

    In Glibc (the standard C library on most Linux distributions) the memory allocator uses heuristics to try to avoid this. However, for allocations bigger than 32 MiB (at the time of writing) it will always request memory directly from the operating system, and return it directly to the operating system when it is freed. That is why we see such poor performance with our 64 MiB heaps.

    In numpy the situation is slightly different: it is also obtaining the memory from the operating system, but it uses a hint to request that the memory is backed by “huge pages” (2 MiB pages on x86_64, compared to the default of 4 kiB pages). Since it takes far fewer pages to provide the physical memory, there are fewer page faults, and performance suffers less as a result.

We can’t do anything about the caching problem [1], but we can rewrite our code to avoid doing memory allocation on every iteration. We’ll do that by re-using our state class, but instead of creating a new one each iteration, we’ll keep a pool of two and alternate between them (so-called “double-buffering”).

In general when we start to fill in the data for a heap we need to make sure that previous asynchronous use of that heap has completed (by waiting for a corresponding future), but the first time each heap gets used is special. To avoid having to deal with special cases, we can set things up with a future that is already complete.

@dataclass
class State:
    adc_samples: np.ndarray
    future: asyncio.Future[int] = field(default_factory=asyncio.Future)

    def __post_init__(self):
        # Make it safe to wait on the future immediately
        self.future.set_result(0)
#include <future>
...
struct state
{
    ...
    state()
    {
        // Make it safe to wait on the future immediately
        std::promise<spead2::item_pointer_t> promise;
        promise.set_value(0);
        future = promise.get_future();
    }
};

Now we can get rid of old_state and new_state, and instead use an array of states.

    states = [State(adc_samples=np.ones(heap_size, np.int8)) for _ in range(2)]
    start = time.perf_counter()
    for i in range(n_heaps):
        state = states[i % len(states)]
        await state.future  # Wait for any previous use of this state to complete
        state.adc_samples.fill(i)
        item_group["timestamp"].value = i * heap_size
        item_group["adc_samples"].value = state.adc_samples
        heap = item_group.get_heap()
        state.future = stream.async_send_heap(heap)
    for state in states:
        await state.future
#include <array>
...
    std::array<state, 2> states;
    for (auto &state : states)
        state.adc_samples.resize(heap_size);
    auto start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < n_heaps; i++)
    {
        auto &state = states[i % states.size()];
        // Wait for any previous use of this state to complete
        state.future.wait();
        auto &heap = state.heap;
        auto &adc_samples = state.adc_samples;

        heap = spead2::send::heap();  // reset to default state
        ...
        state.future = stream.async_send_heap(heap, boost::asio::use_future);
    }
    for (const auto &state : states)
        state.future.wait();

With this redesign, we now get around 5600 MB/s from C++ and 6000 MB/s from Python (the difference is most likely due to Python using huge pages).

Full code

#!/usr/bin/env python3

# Copyright 2023-2024 National Research Foundation (SARAO)
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import argparse
import asyncio
import time
from dataclasses import dataclass, field

import numpy as np

import spead2.send.asyncio


@dataclass
class State:
    adc_samples: np.ndarray
    future: asyncio.Future[int] = field(default_factory=asyncio.Future)

    def __post_init__(self):
        # Make it safe to wait on the future immediately
        self.future.set_result(0)


async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--heaps", type=int, default=1000)
    parser.add_argument("-p", "--packet-size", type=int)
    parser.add_argument("-H", "--heap-size", type=int, default=1024 * 1024)
    parser.add_argument("host", type=str)
    parser.add_argument("port", type=int)
    args = parser.parse_args()

    thread_pool = spead2.ThreadPool(1, [0])
    spead2.ThreadPool.set_affinity(1)
    config = spead2.send.StreamConfig(rate=0.0, max_heaps=2)
    if args.packet_size is not None:
        config.max_packet_size = args.packet_size
    stream = spead2.send.asyncio.UdpStream(thread_pool, [(args.host, args.port)], config)
    heap_size = args.heap_size
    item_group = spead2.send.ItemGroup()
    item_group.add_item(
        0x1600,
        "timestamp",
        "Index of the first sample",
        shape=(),
        format=[("u", spead2.Flavour().heap_address_bits)],
    )
    item_group.add_item(
        0x3300,
        "adc_samples",
        "ADC converter output",
        shape=(heap_size,),
        dtype=np.int8,
    )

    n_heaps = args.heaps
    states = [State(adc_samples=np.ones(heap_size, np.int8)) for _ in range(2)]
    start = time.perf_counter()
    for i in range(n_heaps):
        state = states[i % len(states)]
        await state.future  # Wait for any previous use of this state to complete
        state.adc_samples.fill(np.int_(i))
        item_group["timestamp"].value = i * heap_size
        item_group["adc_samples"].value = state.adc_samples
        heap = item_group.get_heap()
        state.future = stream.async_send_heap(heap)
    for state in states:
        await state.future
    elapsed = time.perf_counter() - start
    print(f"{heap_size * n_heaps / elapsed / 1e6:.2f} MB/s")

    await stream.async_send_heap(item_group.get_end())


if __name__ == "__main__":
    asyncio.run(main())
/* Copyright 2023-2025 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <cstdint>
#include <string>
#include <vector>
#include <array>
#include <utility>
#include <chrono>
#include <iostream>
#include <algorithm>
#include <future>
#include <optional>
#include <unistd.h>
#include <boost/asio.hpp>
#include <spead2/common_defines.h>
#include <spead2/common_flavour.h>
#include <spead2/common_thread_pool.h>
#include <spead2/send_heap.h>
#include <spead2/send_stream_config.h>
#include <spead2/send_udp.h>

struct state
{
    std::future<spead2::item_pointer_t> future;
    std::vector<std::int8_t> adc_samples;
    spead2::send::heap heap;

    state()
    {
        // Make it safe to wait on the future immediately
        std::promise<spead2::item_pointer_t> promise;
        promise.set_value(0);
        future = promise.get_future();
    }
};

static void usage(const char * name)
{
    std::cerr << "Usage: " << name << " [-n heaps] [-p packet-size] [-H heap-size] host port\n";
}

int main(int argc, char * const argv[])
{
    int opt;
    int n_heaps = 1000;
    std::optional<int> packet_size;
    std::int64_t heap_size = 1024 * 1024;
    while ((opt = getopt(argc, argv, "n:p:H:")) != -1)
    {
        switch (opt)
        {
        case 'n':
            n_heaps = std::stoi(optarg);
            break;
        case 'p':
            packet_size = std::stoi(optarg);
            break;
        case 'H':
            heap_size = std::stoll(optarg);
            break;
        default:
            usage(argv[0]);
            return 2;
        }
    }
    if (argc - optind != 2)
    {
        usage(argv[0]);
        return 2;
    }

    spead2::thread_pool thread_pool(1, {0});
    spead2::thread_pool::set_affinity(1);
    spead2::send::stream_config config;
    config.set_rate(0.0);
    config.set_max_heaps(2);
    if (packet_size)
        config.set_max_packet_size(packet_size.value());
    boost::asio::ip::udp::endpoint endpoint(
        boost::asio::ip::make_address(argv[optind]),
        std::stoi(argv[optind + 1])
    );
    spead2::send::udp_stream stream(thread_pool, {endpoint}, config);

    spead2::descriptor timestamp_desc;
    timestamp_desc.id = 0x1600;
    timestamp_desc.name = "timestamp";
    timestamp_desc.description = "Index of the first sample";
    timestamp_desc.format.emplace_back('u', spead2::flavour().get_heap_address_bits());
    spead2::descriptor adc_samples_desc;
    adc_samples_desc.id = 0x3300;
    adc_samples_desc.name = "adc_samples";
    adc_samples_desc.description = "ADC converter output";
    adc_samples_desc.numpy_header =
        "{'shape': (" + std::to_string(heap_size) + ",), 'fortran_order': False, 'descr': 'i1'}";

    std::array<state, 2> states;
    for (auto &state : states)
        state.adc_samples.resize(heap_size);
    auto start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < n_heaps; i++)
    {
        auto &state = states[i % states.size()];
        // Wait for any previous use of this state to complete
        state.future.wait();
        auto &heap = state.heap;
        auto &adc_samples = state.adc_samples;

        heap = spead2::send::heap();  // reset to default state
        // Add descriptors to the first heap
        if (i == 0)
        {
            heap.add_descriptor(timestamp_desc);
            heap.add_descriptor(adc_samples_desc);
        }
        std::fill(adc_samples.begin(), adc_samples.end(), i);
        // Add the data and timestamp to the heap
        heap.add_item(timestamp_desc.id, i * heap_size);
        heap.add_item(
            adc_samples_desc.id,
            adc_samples.data(),
            adc_samples.size() * sizeof(adc_samples[0]),
            true
        );
        state.future = stream.async_send_heap(heap, boost::asio::use_future);
    }
    for (const auto &state : states)
        state.future.wait();
    std::chrono::duration<double> elapsed = std::chrono::high_resolution_clock::now() - start;
    std::cout << heap_size * n_heaps / elapsed.count() / 1e6 << " MB/s\n";

    // Send an end-of-stream control item
    spead2::send::heap heap;
    heap.add_end();
    stream.async_send_heap(heap, boost::asio::use_future).get();
    return 0;
}