4. Measuring sender performance

Now that we have some functioning code, we’ll start using more features of spead2 to improve performance. But first, we ought to have some idea of what the performance currently is. We’ll make some changes to record the time before and after we send the heaps, and we’ll also send more heaps to make the timing more reliable. We’ll remove the target rate, so that we’re just sending as fast as we can. Setting the rate to 0 has the special meaning of disabling rate limiting (it is also the default, so we could just not set it at all).

Unlike the previous sections though, we’ll be modifying the code as we go, rather than just writing it from top to bottom. If you’re unclear on how the shown snippets fit into the existing code, you can refer to the bottom of the page for the full listing.

import time
...
    config = spead2.send.StreamConfig(rate=0.0)
    ...
    n_heaps = 100
    start = time.perf_counter()
    for i in range(n_heaps):
        ...
    elapsed = time.perf_counter() - start
    print(f"{heap_size * n_heaps / elapsed / 1e6:.2f} MB/s")
#include <chrono>
#include <iostream>
#include <algorithm>  // Not needed yet, but we'll use it later
...
    config.set_rate(0.0);
    ...
    const int n_heaps = 100;
    auto start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < n_heaps; i++)
    {
        ...
    }
    std::chrono::duration<double> elapsed = std::chrono::high_resolution_clock::now() - start;
    std::cout << heap_size * n_heaps / elapsed.count() / 1e6 << " MB/s\n";

We’ll also make a change that will make performance slightly more predictable: pinning each thread to a specific CPU core. This avoids the costs (particularly the loss of warm L1 caches) incurred when a thread is migrated from one CPU core to another.

thread_pool = spead2.ThreadPool(1, [0])
spead2.ThreadPool.set_affinity(1)
spead2::thread_pool thread_pool(1, {0});
spead2::thread_pool::set_affinity(1);

The first line creates the thread pool with one thread, which is bound to core 0. The second line sets the affinity of the main thread: the function is a static member of the thread pool class, but it affects the calling thread rather than the threads in the pool. In other words, the thread pool’s worker thread is bound to core 0 and the main thread is bound to core 1.
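
If you want to convince yourself that the pinning has taken effect, you can query the calling thread’s CPU mask. The sketch below is only a quick sanity check, assuming you’re on Linux (os.sched_getaffinity is not available on every platform); it isn’t part of the tutorial code.

import os

import spead2

thread_pool = spead2.ThreadPool(1, [0])
spead2.ThreadPool.set_affinity(1)
# On Linux this reports the set of CPUs the calling (main) thread may run on;
# after the set_affinity call above it should print {1}.
print(os.sched_getaffinity(0))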

You can expect performance to be pretty low; I get around 65 MB/s from Python and 140 MB/s from C++ [1]. In fact, spead2 makes very little difference to the performance here: most of the time is spent generating the random numbers. We don’t actually care about the numbers being statistically random, so let’s remove the random number generation and replace it with the following:

        item_group["adc_samples"].value = np.full(heap_size, np.int_(i), np.int8)
        std::fill(adc_samples.begin(), adc_samples.end(), i);

We’re filling each heap with the loop index (truncated to an 8-bit integer) — not particularly meaningful for simulation, but it has the bonus that we can easily see on the receiver side if we’ve accidentally transmitted data for the wrong heap.
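
To illustrate, a receiver-side sanity check could look something like the sketch below. The check_heap helper is hypothetical (it is not part of the tutorial’s receiver), and it assumes the conventions used so far: a 1 MiB heap and a timestamp equal to the loop index times the heap size. You could call it for each heap that updates the item group.

import numpy as np


def check_heap(timestamp, adc_samples, heap_size=1024 * 1024):
    # The sender filled the heap with its loop index, truncated to 8 bits,
    # so the expected fill value can be recovered from the timestamp.
    expected = np.array(timestamp // heap_size, dtype=np.int64).astype(np.int8)
    if not np.all(adc_samples == expected):
        print(f"heap with timestamp {timestamp} contains unexpected data")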

This dramatically improves performance: around 1000 MB/s for Python and 1200 MB/s for C++, assuming you’re running the receiver. Perhaps surprisingly, performance is much higher when the receiver is not running: 1800 MB/s and 2200 MB/s respectively. Because we’re sending over the loopback interface, the sender ends up paying some of the costs of receiving the data. Even with no receiver running, the kernel still has to process the packets arriving on the loopback interface, so there is some overhead that has nothing to do with sending.
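
Incidentally, if you want to confirm that random-number generation really did dominate the earlier measurement, you can time it on its own, with no networking involved. The generator call below is illustrative rather than necessarily the exact call used earlier, but it should give a similar order of magnitude.

import time

import numpy as np

heap_size = 1024 * 1024
n_heaps = 100
rng = np.random.default_rng()
start = time.perf_counter()
for _ in range(n_heaps):
    rng.integers(-100, 100, size=heap_size, dtype=np.int8)
elapsed = time.perf_counter() - start
# Rate at which random samples can be produced, ignoring spead2 entirely
print(f"{heap_size * n_heaps / elapsed / 1e6:.2f} MB/s")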

In a future tutorial we’ll return to the receiver to improve its performance, but for now let’s move away from the loopback interface so that we can measure the sender’s performance in isolation. That means we’ll need to transmit packets somewhere other than to 127.0.0.1. Rather than hard-coding an address (which may have a pre-assigned meaning on your network), let’s make the destination a command-line option. While we’re at it, we’ll make the number of heaps a command-line option too, and increase the default.

import argparse
...
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--heaps", type=int, default=1000)
    parser.add_argument("host", type=str)
    parser.add_argument("port", type=int)
    args = parser.parse_args()
    ...
    stream = spead2.send.UdpStream(thread_pool, [(args.host, args.port)], config)
    ...
    n_heaps = args.heaps
#include <unistd.h>
...
static void usage(const char * name)
{
    std::cerr << "Usage: " << name << " [-n heaps] host port\n";
}

int main(int argc, char * const argv[])
{
    int opt;
    int n_heaps = 1000;  // remove the original definition of n_heaps
    while ((opt = getopt(argc, argv, "n:")) != -1)
    {
        switch (opt)
        {
        case 'n':
            n_heaps = std::stoi(optarg);
            break;
        default:
            usage(argv[0]);
            return 2;
        }
    }
    if (argc - optind != 2)
    {
        usage(argv[0]);
        return 2;
    }
    ...
    boost::asio::ip::udp::endpoint endpoint(
        boost::asio::ip::make_address(argv[optind]),
        std::stoi(argv[optind + 1])
    );
    ...
}

The C++ version uses very quick-and-dirty command-line parsing: in a production application you would need to do more error handling (std::stoi throws on non-numeric input, for example), and you might prefer a dedicated argument-parsing library.

If you have a high-speed network interface, you can try sending to a non-existent address on that network [2]. But there is also a solution on Linux that doesn’t require any special hardware: a dummy interface. This is a network device that simply drops all the data sent to it, instead of transmitting it anywhere. As such, it represents an upper bound on what you’re likely to achieve with kernel drivers for real network interfaces.

You’ll need a subnet to assign to it which isn’t otherwise in use. For the examples I’ll use 192.168.31.0/24. You can configure a dummy interface like this (as root):

ip link add dummy1 type dummy
ip addr add 192.168.31.1/24 dev dummy1
ip link set dummy1 up

If you want to clean up the dummy interface later, use:

ip link del dummy1

Now if you run tut_4_send_perf 192.168.31.2 8888 you should get even better performance (note that the destination address is not the same as the address assigned to the interface). I get 2300 MB/s with Python and 3000 MB/s with C++.
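
If you want to check that the traffic really is being routed to the dummy interface, you can watch its statistics while the sender runs, for example with ip -s link show dummy1; the TX packet and byte counters should climb rapidly.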

Full code

#!/usr/bin/env python3

# Copyright 2023-2024 National Research Foundation (SARAO)
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import argparse
import time

import numpy as np

import spead2.send


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-n", "--heaps", type=int, default=1000)
    parser.add_argument("host", type=str)
    parser.add_argument("port", type=int)
    args = parser.parse_args()

    thread_pool = spead2.ThreadPool(1, [0])
    spead2.ThreadPool.set_affinity(1)
    config = spead2.send.StreamConfig(rate=0.0)
    stream = spead2.send.UdpStream(thread_pool, [(args.host, args.port)], config)
    heap_size = 1024 * 1024
    item_group = spead2.send.ItemGroup()
    item_group.add_item(
        0x1600,
        "timestamp",
        "Index of the first sample",
        shape=(),
        format=[("u", spead2.Flavour().heap_address_bits)],
    )
    item_group.add_item(
        0x3300,
        "adc_samples",
        "ADC converter output",
        shape=(heap_size,),
        dtype=np.int8,
    )

    n_heaps = args.heaps
    start = time.perf_counter()
    for i in range(n_heaps):
        item_group["timestamp"].value = i * heap_size
        item_group["adc_samples"].value = np.full(heap_size, np.int_(i), np.int8)
        heap = item_group.get_heap()
        stream.send_heap(heap)
    elapsed = time.perf_counter() - start
    print(f"{heap_size * n_heaps / elapsed / 1e6:.2f} MB/s")

    stream.send_heap(item_group.get_end())


if __name__ == "__main__":
    main()
/* Copyright 2023-2025 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <cstdint>
#include <string>
#include <vector>
#include <utility>
#include <chrono>
#include <iostream>
#include <algorithm>
#include <unistd.h>
#include <boost/asio.hpp>
#include <spead2/common_defines.h>
#include <spead2/common_flavour.h>
#include <spead2/common_thread_pool.h>
#include <spead2/send_heap.h>
#include <spead2/send_stream_config.h>
#include <spead2/send_udp.h>

static void usage(const char * name)
{
    std::cerr << "Usage: " << name << " [-n heaps] host port\n";
}

int main(int argc, char * const argv[])
{
    int opt;
    int n_heaps = 1000;
    while ((opt = getopt(argc, argv, "n:")) != -1)
    {
        switch (opt)
        {
        case 'n':
            n_heaps = std::stoi(optarg);
            break;
        default:
            usage(argv[0]);
            return 2;
        }
    }
    if (argc - optind != 2)
    {
        usage(argv[0]);
        return 2;
    }

    spead2::thread_pool thread_pool(1, {0});
    spead2::thread_pool::set_affinity(1);
    spead2::send::stream_config config;
    config.set_rate(0.0);
    boost::asio::ip::udp::endpoint endpoint(
        boost::asio::ip::make_address(argv[optind]),
        std::stoi(argv[optind + 1])
    );
    spead2::send::udp_stream stream(thread_pool, {endpoint}, config);

    const std::int64_t heap_size = 1024 * 1024;
    spead2::descriptor timestamp_desc;
    timestamp_desc.id = 0x1600;
    timestamp_desc.name = "timestamp";
    timestamp_desc.description = "Index of the first sample";
    timestamp_desc.format.emplace_back('u', spead2::flavour().get_heap_address_bits());
    spead2::descriptor adc_samples_desc;
    adc_samples_desc.id = 0x3300;
    adc_samples_desc.name = "adc_samples";
    adc_samples_desc.description = "ADC converter output";
    adc_samples_desc.numpy_header =
        "{'shape': (" + std::to_string(heap_size) + ",), 'fortran_order': False, 'descr': 'i1'}";

    std::vector<std::int8_t> adc_samples(heap_size);

    auto start = std::chrono::high_resolution_clock::now();
    for (int i = 0; i < n_heaps; i++)
    {
        spead2::send::heap heap;
        // Add descriptors to the first heap
        if (i == 0)
        {
            heap.add_descriptor(timestamp_desc);
            heap.add_descriptor(adc_samples_desc);
        }
        std::fill(adc_samples.begin(), adc_samples.end(), i);
        // Add the data and timestamp to the heap
        heap.add_item(timestamp_desc.id, i * heap_size);
        heap.add_item(
            adc_samples_desc.id,
            adc_samples.data(),
            adc_samples.size() * sizeof(adc_samples[0]),
            true
        );
        stream.async_send_heap(heap, boost::asio::use_future).get();
    }
    std::chrono::duration<double> elapsed = std::chrono::high_resolution_clock::now() - start;
    std::cout << heap_size * n_heaps / elapsed.count() / 1e6 << " MB/s\n";

    // Send an end-of-stream control item
    spead2::send::heap heap;
    heap.add_end();
    stream.async_send_heap(heap, boost::asio::use_future).get();
    return 0;
}