SIMD для цифровой обработки сигналов

Несмотря на то, что мой блог о разработке на Java, мне иногда приходится расчехлять старые знания и писать на Си. Вообще, программирование на Си - это целый мир со своими заморочками, инструментами и подходами. И он совсем не пересекается с тем, что творится в Java. Это, с одной стороны, немного досадно, так как все накопленные знания не имеют никакого смысла, а с другой стороны немного волнительно, как открытие Америки.

Совсем недавно я придумал одну идею - сделать небольшой TCP сервер, который бы раздавал некоторый диапазон радио частот клиентам. Я наверное по-подробнее напишу про этот проект потом, когда закончу, а в этой статье опишу небольшую часть функциональности. А именно - использование SIMD инструкций процессора для цифровой обработки сигналов.

Frequency Xlating FIR filter

Большинство блоков в обработке сигналов основаны на КИХ-фильтрах. Схема их работы достаточно простая:

На вход подаётся оцифрованный сигнал. Каждый сэмпл сигнала нужно перемножить с импульсной характеристикой фильтра, сложить и получить один выходной сэмпл. Импульсная характеристика фильтра - это некоторые коэффициенты типа float. Если представить в виде уравнения, то выглядит это так:

float_input_1 * float_filter_1 + float_input_2 * float_filter_2 + ... = filtered sample

После того, как получен первый сэмпл, нужно сдвинуть фильтр на 1 элемент влево и ещё раз умножить все коэффициенты:

float_input_2 * float_filter_1 + float_input_3 * float_filter_2 + ... = filtered sample

И так далее.

Наивная реализация

Итак, для того, чтобы запрограммировать такой фильтр, нужно написать совсем простой цикл:

float sum0[2] = {0,0};
float sum1[2] = {0,0};
unsigned int i = 0;

for(i = 0; i < number_of_samples; ++i) {
    sum0[0] += in[0] * tp[0] - in[1] * tp[1];
    sum0[1] += in[0] * tp[1] + in[1] * tp[0];
    sum1[0] += in[2] * tp[2] - in[3] * tp[3];
    sum1[1] += in[2] * tp[3] + in[3] * tp[2];

    in += 4;
    tp += 4;
}

res[0] = sum0[0] + sum1[0];
res[1] = sum0[1] + sum1[1];

В примере выше выполняется перемножение двух массивов комплексных чисел. Вы же не думали, что фильтр - это просто? Но суть процесса от этого не меняется. Нужно очень много раз перемножить float и сложить результаты. Именно так и работает FIRFilter в моём проекте jradio.

Использование SIMD

Как раз для таких вычислений и придумали SIMD - single instruction multiple data. Эти инструкции позволяют загрузить много данных в особые регистры процессора и выполнить операцию одной командой. Это, теоретически, позволить ускорить выполнение вычислений. Конечно, тут есть очень много тонкостей. Например, это может замедлить программу, если данные неправильно лежат в памяти. Или SIMD инструкции на одном процессоре будут совершенно по-другому работать на другом. Или размер этих регистров в RaspberryPI сильно отличается от размеров Intel. Но и самая неприятная штука - надо будет писать код на ассемблере.

Но не стоит унывать! Разработчики со всего мира собрались и придумали способы упростить жизнь. В частности, придумали такой проект, как volk. Эта Си библиотека содержит множество частоупотребимых функций для подобных вычислений. Достаточно лишь подключить библиотеку и вызвать нужную функцию:

volk_32fc_x2_dot_prod_32fc_a(lv_32fc_t* output, const lv_32fc_t* input, const lv_32fc_t* filter, unsigned int size);

А в зависимости от архитектуры процессора и типов SIMD инструкций, библиотека подставит наиболее быструю реализацию этой функции. При этом все реализации таких функций пишутся вручную как на Си, так и на ассемблере. Вот пример реализации для ARM NEON (используется в RaspberryPI):

@ static inline void volk_32fc_x2_dot_prod_32fc_neonasm(float* cVector, const float* aVector, const float* bVector, unsigned int num_points);
	.global	volk_32fc_x2_dot_prod_32fc_neonasm
volk_32fc_x2_dot_prod_32fc_neonasm:
    push    {r4, r5, r6, r7, r8, lr}
    vpush   {q0-q7}
    vpush   {q8-q15}
    mov r8, r3          @ hold on to num_points (r8)
    @ zero out accumulators -- leave 1 reg in alu
    veor    q8, q15, q15
    mov r7, r0          @ (r7) is cVec
    ...

Если же процессор не поддерживает нужную SIMD инструкцию или вообще не поддерживает SIMD, то volk подставит наивную реализацию с использованием циклов.

Выравнивание

Единственное, что требует volk - это выровненные массивы. Понятие “выравнивание” не существует в Java, поэтому пришлось вспоминать университетский курс по программированию. Тогда нам рассказывали, что выравнивание как-то связано с шиной данных. И выровненные данные каким-то образом ускоряют их обработку. Но тогда это было упомянуто чисто теоретически и вскользь. Немного поискав в Интернете, я нашёл отличную статью (и перевод) о том, что такое выравнивание данных, зачем оно нужно и как использовать.

Если вкратце, то выравнивают не сами данные, а расположение данных в памяти. Указатель на начало данных должен быть кратен размеру шины данных. В таком случае считывание данных из памяти в процессор будет происходить быстрее. А значит и операции будут происходить быстрее.

volk предоставляет несколько удобных методов для задания выравнивания:

size_t volk_get_alignment(void);

Это метод получает размер шины данных или необходимую кратность начала данных. На моём Macbook Air это значение равно 32.

void* volk_malloc(size_t size, size_t alignment);

Второй метод как раз и выделяет выровненную память нужного размера.

Выравнивание памяти для КИХ фильтра

Но выделить выровненную память недостаточно. Если вернуться в описание фильтра, то можно увидеть, что операция с выровненной памятью будет только для первого элемента массива. После смещения на один элемент, указатель будет показывать на адрес не кратный нужному выравниванию.

Чтобы этого избежать, можно применить следующую хитрость:

Для этого нужно создать несколько различных вариантов фильтра. При этом массив параметров фильтра всегда будет выровнен, а вот коэффициенты в начале могут быть 0. Несмотря на то, что появится несколько лишних операций умножения на 0, оно того стоит.

На практике такой трюк можно записать следующим образом:

size_t alignment = volk_get_alignment();
float *buffer = volk_malloc(sizeof(float) * 16, alignment);
const lv_32fc_t *buf = (const lv_32fc_t*) (buffer + 2); // 2 - some random offset

const lv_32fc_t *aligned_buffer = (const lv_32fc_t *)((size_t)buf & ~(alignment - 1));
unsigned align_index = buf - aligned_buffer; // index in the array of aligned filter configurations
printf("alignment: %zu\n", alignment);
printf("initial %p\n", buffer);
printf("offset  %p\n", buf);
printf("aligned %p\n", aligned_buffer);
printf("index: %d\n", align_index);

На моём Macbook Air вывод будет следующим:

alignment: 32
initial 0x7fc6f0c07020
offset  0x7fc6f0c07028
aligned 0x7fc6f0c07020
index: 1

К слову сказать, отсутствие выравнивания иногда крэшило программу в segmentation fault.

Результаты

Для того, чтобы измерить улучшение производительности, я написал небольшую программу. Её суть достаточно проста: в цикле вызывать фильтр. Полученное время запуска поделить на общее количество циклов. Результаты выглядят следующим образом:

Macbook Air Raspberrypi 3
Naive 0.005615 s 0.082677 s
SIMD 0.002649 s 0.032759 s

Как видно, использование SIMD в 2+ раза ускоряет работу программы!