7. Optimising the power calculation

Now that we’ve made significant optimisations to the sender, let’s turn our attention back to the receiver. As with the sender, the first step is to measure the performance. That is surprisingly tricky to do when receiving a lossy protocol such as UDP, because the transmission rate is determined by the sender, and all you get to measure is whether the receiver was able to keep up (and if not, how much data was lost). What one really wants to answer is “at what rate can the receiver reliably keep up”, and that’s affected not just by the average throughput, but also random variation.

Nevertheless, let’s see if we can get our receiver to keep up with our sender. Firstly, let’s see what happens if we run it as is: run the receiver in one terminal, and the sender (with arguments -p 9000 127.0.0.1 8888) in another. You will probably see a number of messages like the following:

dropped incomplete heap 20 (367344/1048576 bytes of payload)
worker thread blocked by full ringbuffer on heap 81

The first one tells you that data has been lost. Recall that the sender is dividing each heap into packets, each containing about 9000 bytes out of the 1 MiB of sample data. For this particular heap, we received only 367344 bytes of payload, and so it was dropped. It is worth mentioning that the heap ID (20) is one assigned by spead2 in the sender and encoded into the packet; it does not necessarily match the loop index in the for loop in the sender code.

Figure made with TikZ

Data flow in the receive path.

The “full ringbuffer” message is more of a warning. The worker thread (in the thread pool) operates by receiving data from the network socket and then pushing complete heaps into a ringbuffer. If our code doesn’t take those heaps off the ringbuffer fast enough, then the ringbuffer fills up, and the worker thread has to wait for space before it can push the next heap. While it’s waiting, it’s not consuming packets from the socket, which eventually fills up and causes subsequent packets to be dropped. On its own, the “full ringbuffer” message isn’t necessarily an issue, but when we see it together with lost data, it tells us that it’s the processing in our own code that’s too slow, rather than spead2’s packet handling.

The most likely candidate is the calculation of the power. Let’s write a more optimal version of that. In the Python case, we use numba to compile code on the fly, and in both cases, we’re doing the accumulation in integers to avoid the cost of integer-to-float conversions for every element. We’ll also print out the number of heaps received, just to ensure we didn’t miss any. The warnings about incomplete heaps should tell us, but if we lose 100% of the packets in a heap we won’t get any warning about it from spead2.

import numba
...
@numba.njit
def mean_power(adc_samples):
    total = np.int64(0)
    for i in range(len(adc_samples)):
        sample = np.int64(adc_samples[i])
        total += sample * sample
    return np.float64(total) / len(adc_samples)

def main():
    ...
    n_heaps = 0
    # Run it once to trigger compilation for int8
    mean_power(np.ones(1, np.int8))  # Trigger JIT
    for heap in stream:
        ...
        power = mean_power(item_group["adc_samples"].value)
        n_heaps += 1
        print(f"Timestamp: {timestamp:<10} Power: {power:.2f}")
    print(f"Received {n_heaps} heaps")
#if defined(__GNUC__) && defined(__x86_64__)
// Compile this function with AVX2 for better performance. Remove this if your
// CPU does not support AVX2 (e.g., if you get an Illegal Instruction error).
[[gnu::target("avx2")]]
#endif
static double mean_power(const std::int8_t *adc_samples, std::size_t length)
{
    std::int64_t sum = 0;
    for (std::size_t i = 0; i < length; i++)
        sum += adc_samples[i] * adc_samples[i];
    return double(sum) / length;
}

int main()
{
    ...
    std::int64_t n_heaps = 0;
    for (const spead2::recv::heap &heap : stream)
    {
        ...
        if (timestamp >= 0 && adc_samples != nullptr)
        {
            double power = mean_power(adc_samples, length);
            n_heaps++;
            ...
        }
    }
    std::cout << "Received " << n_heaps << " heaps\n";
    return 0;
}

On my machine, the receiver now keeps up with the sender and receives all 1000 heaps, although it is somewhat tight so you might get different results.

Full code

#!/usr/bin/env python3

# Copyright 2023 National Research Foundation (SARAO)
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import numba
import numpy as np

import spead2.recv


@numba.njit
def mean_power(adc_samples):
    total = np.int64(0)
    for i in range(len(adc_samples)):
        sample = np.int64(adc_samples[i])
        total += sample * sample
    return np.float64(total) / len(adc_samples)


def main():
    thread_pool = spead2.ThreadPool()
    stream = spead2.recv.Stream(thread_pool)
    stream.add_udp_reader(8888)
    item_group = spead2.ItemGroup()
    n_heaps = 0
    # Run it once to trigger compilation for int8
    mean_power(np.ones(1, np.int8))
    for heap in stream:
        item_group.update(heap)
        timestamp = item_group["timestamp"].value
        power = mean_power(item_group["adc_samples"].value)
        n_heaps += 1
        print(f"Timestamp: {timestamp:<10} Power: {power:.2f}")
    print(f"Received {n_heaps} heaps")


if __name__ == "__main__":
    main()
/* Copyright 2023-2024 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <cassert>
#include <cstdint>
#include <cstddef>
#include <iostream>
#include <iomanip>
#include <boost/asio.hpp>
#include <spead2/common_ringbuffer.h>
#include <spead2/common_thread_pool.h>
#include <spead2/recv_ring_stream.h>
#include <spead2/recv_udp.h>
#include <spead2/recv_heap.h>

#if defined(__GNUC__) && defined(__x86_64__)
// Compile this function with AVX2 for better performance. Remove this if your
// CPU does not support AVX2 (e.g., if you get an Illegal Instruction error).
[[gnu::target("avx2")]]
#endif
static double mean_power(const std::int8_t *adc_samples, std::size_t length)
{
    std::int64_t sum = 0;
    for (std::size_t i = 0; i < length; i++)
        sum += adc_samples[i] * adc_samples[i];
    return double(sum) / length;
}

int main()
{
    spead2::thread_pool thread_pool;
    spead2::recv::ring_stream stream(thread_pool);
    boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address_v4::any(), 8888);
    stream.emplace_reader<spead2::recv::udp_reader>(endpoint);
    std::int64_t n_heaps = 0;
    for (const spead2::recv::heap &heap : stream)
    {
        std::int64_t timestamp = -1;
        const std::int8_t *adc_samples = nullptr;
        std::size_t length = 0;
        for (const auto &item : heap.get_items())
        {
            if (item.id == 0x1600)
            {
                assert(item.is_immediate);
                timestamp = item.immediate_value;
            }
            else if (item.id == 0x3300)
            {
                adc_samples = reinterpret_cast<const std::int8_t *>(item.ptr);
                length = item.length;
            }
        }
        if (timestamp >= 0 && adc_samples != nullptr)
        {
            double power = mean_power(adc_samples, length);
            n_heaps++;
            std::cout
                << "Timestamp: " << std::setw(10) << std::left << timestamp
                << " Power: " << power << '\n';
        }
    }
    std::cout << "Received " << n_heaps << " heaps\n";
    return 0;
}