3. Receiver, version 1

Now that we have a sender, let’s write a receiver. For the sake of an example, let’s have it report the average power (mean squared value) of the samples in each heap. As before, the full code can be found at the bottom of the page, and in the examples/tutorial directory of the spead2 repository.

The initial boilerplate looks similar to the sender, and once again we’ll need a thread pool.

#!/usr/bin/env python3

import numpy as np
import spead2.recv


def main():
    thread_pool = spead2.ThreadPool()
#include <cassert>
#include <cstdint>
#include <cstddef>
#include <iostream>
#include <iomanip>
#include <boost/asio.hpp>
#include <spead2/common_ringbuffer.h>
#include <spead2/common_thread_pool.h>
#include <spead2/recv_ring_stream.h>
#include <spead2/recv_udp.h>
#include <spead2/recv_heap.h>

int main()
{
    spead2::thread_pool thread_pool;

Next we’ll declare the stream. In C++ we declare it as a ring_stream, but in Python just as a Stream. They are the same concept, but the C++ API provides other sorts of streams (hence the more verbose name) which the Python API does not. The “ring” in the name indicates that incoming heaps are placed on a ringbuffer. Readers can also take a configuration object (similarly to senders), but we won’t need one for now.

    stream = spead2.recv.Stream(thread_pool)
    spead2::recv::ring_stream stream(thread_pool);

You’ll also notice that the class name does not specify the underlying transport (UDP). Receive streams are quite flexible and in theory can even accept packets from multiple different protocols simultaneously. To feed data into a stream one must attach one or more readers. In C++ each reader is represented by a class (in this case, udp_reader) although the user does not directly instantiate the class. In Python there is an add method for each reader type. We’ll have our reader listen on UDP port 8888, the same port our sender is transmitting on.

    stream.add_udp_reader(8888)
    boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address_v4::any(), 8888);
    stream.emplace_reader<spead2::recv::udp_reader>(endpoint);

Now we’ll write a loop to iterate over the heaps. The processing of the heap is left until later. For convenience, the stream object can be iterated to obtain the heaps as they arrive.

    item_group = spead2.ItemGroup()
    for heap in stream:
        ...


if __name__ == "__main__":
    main()
    for (const spead2::recv::heap &heap : stream)
    {
        ...
    }
    return 0;
}

Now we’ll fill in the body of the loop to process the heap, by computing the mean of the squares of the samples. In Python we can just update the item group with the heap, which will create items from the descriptors in the first heap and also update the values. The C++ API doesn’t have item groups, and it leaves interpretation of descriptors up to the user. Ideally we would parse the descriptor to determine the item IDs for timestamp and adc_samples and also learn about their types, but to keep things simple we’ll just hard-code our knowledge about them from the receiver. We’re also hard-coding the assumption that the timestamp has in fact been encoded as an immediate value, for which spead2 provides a convenient way to retrieve it. If it wasn’t encoded as an immediate, we would have to use item.ptr and item.length to retrieve the raw 40-bit big-endian value and decode it.

        item_group.update(heap)
        timestamp = item_group["timestamp"].value
        power = np.mean(np.square(item_group["adc_samples"].value, dtype=int))
        print(f"Timestamp: {timestamp:<10} Power: {power:.2f}")
        std::int64_t timestamp = -1;
        const std::int8_t *adc_samples = nullptr;
        std::size_t length = 0;
        for (const auto &item : heap.get_items())
        {
            if (item.id == 0x1600)
            {
                assert(item.is_immediate);
                timestamp = item.immediate_value;
            }
            else if (item.id == 0x3300)
            {
                adc_samples = reinterpret_cast<const std::int8_t *>(item.ptr);
                length = item.length;
            }
        }
        if (timestamp >= 0 && adc_samples != nullptr)
        {
            double power = 0.0;
            for (std::size_t i = 0; i < length; i++)
                power += adc_samples[i] * adc_samples[i];
            power /= length;
            std::cout
                << "Timestamp: " << std::setw(10) << std::left << timestamp
                << " Power: " << power << '\n';
        }

Note that the Python code doesn’t do any error checking: if we missed the first heap, we won’t receive the descriptors, and so item_group["timestamp"] will raise a KeyError. You can test this by starting the receiver slightly after the sender. Additionally, ItemGroup.update() can fail for a number of reasons, such as a transmitted item having the wrong number of bytes relative to its descriptor.

If you’re following in C++, you’ll again need to compile this example code (see the previous section for instructions). Now run the receiver in one terminal, then run the sender from the previous section in another. You should see output something like the following:

Timestamp: 0          Power: 3328.61
Timestamp: 1048576    Power: 3335.04
Timestamp: 2097152    Power: 3330.53
Timestamp: 3145728    Power: 3336.71
Timestamp: 4194304    Power: 3333.94
Timestamp: 5242880    Power: 3334.75
Timestamp: 6291456    Power: 3336.29
Timestamp: 7340032    Power: 3333.02
Timestamp: 8388608    Power: 3334.64
Timestamp: 9437184    Power: 3334.27

Full code

#!/usr/bin/env python3

# Copyright 2023 National Research Foundation (SARAO)
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import numpy as np

import spead2.recv


def main():
    thread_pool = spead2.ThreadPool()
    stream = spead2.recv.Stream(thread_pool)
    stream.add_udp_reader(8888)
    item_group = spead2.ItemGroup()
    for heap in stream:
        item_group.update(heap)
        timestamp = item_group["timestamp"].value
        power = np.mean(np.square(item_group["adc_samples"].value, dtype=int))
        print(f"Timestamp: {timestamp:<10} Power: {power:.2f}")


if __name__ == "__main__":
    main()
/* Copyright 2023 National Research Foundation (SARAO)
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <cassert>
#include <cstdint>
#include <cstddef>
#include <iostream>
#include <iomanip>
#include <boost/asio.hpp>
#include <spead2/common_ringbuffer.h>
#include <spead2/common_thread_pool.h>
#include <spead2/recv_ring_stream.h>
#include <spead2/recv_udp.h>
#include <spead2/recv_heap.h>

int main()
{
    spead2::thread_pool thread_pool;
    spead2::recv::ring_stream stream(thread_pool);
    boost::asio::ip::udp::endpoint endpoint(boost::asio::ip::address_v4::any(), 8888);
    stream.emplace_reader<spead2::recv::udp_reader>(endpoint);
    for (const spead2::recv::heap &heap : stream)
    {
        std::int64_t timestamp = -1;
        const std::int8_t *adc_samples = nullptr;
        std::size_t length = 0;
        for (const auto &item : heap.get_items())
        {
            if (item.id == 0x1600)
            {
                assert(item.is_immediate);
                timestamp = item.immediate_value;
            }
            else if (item.id == 0x3300)
            {
                adc_samples = reinterpret_cast<const std::int8_t *>(item.ptr);
                length = item.length;
            }
        }
        if (timestamp >= 0 && adc_samples != nullptr)
        {
            double power = 0.0;
            for (std::size_t i = 0; i < length; i++)
                power += adc_samples[i] * adc_samples[i];
            power /= length;
            std::cout
                << "Timestamp: " << std::setw(10) << std::left << timestamp
                << " Power: " << power << '\n';
        }
    }
    return 0;
}