SIMD для цифровой обработки сигналов
Несмотря на то, что мой блог о разработке на Java, мне иногда приходится расчехлять старые знания и писать на Си. Вообще, программирование на Си - это целый мир со своими заморочками, инструментами и подходами. И он совсем не пересекается с тем, что творится в Java. Это, с одной стороны, немного досадно, так как все накопленные знания не имеют никакого смысла, а с другой стороны немного волнительно, как открытие Америки.
Совсем недавно я придумал одну идею - сделать небольшой TCP сервер, который бы раздавал некоторый диапазон радио частот клиентам. Я наверное по-подробнее напишу про этот проект потом, когда закончу, а в этой статье опишу небольшую часть функциональности. А именно - использование SIMD инструкций процессора для цифровой обработки сигналов.
Frequency Xlating FIR filter
Большинство блоков в обработке сигналов основаны на КИХ-фильтрах. Схема их работы достаточно простая:
На вход подаётся оцифрованный сигнал. Каждый сэмпл сигнала нужно перемножить с импульсной характеристикой фильтра, сложить и получить один выходной сэмпл. Импульсная характеристика фильтра - это некоторые коэффициенты типа float. Если представить в виде уравнения, то выглядит это так:
float_input_1 * float_filter_1 + float_input_2 * float_filter_2 + ... = filtered sample
После того как получен первый сэмпл, нужно сдвинуть фильтр на 1 элемент влево и ещё раз умножить все коэффициенты:
float_input_2 * float_filter_1 + float_input_3 * float_filter_2 + ... = filtered sample
И так далее.
Наивная реализация
Итак, для того, чтобы запрограммировать такой фильтр, нужно написать совсем простой цикл:
float sum0[2] = {0,0};
float sum1[2] = {0,0};
unsigned int i = 0;
for(i = 0; i < number_of_samples; ++i) {
sum0[0] += in[0] * tp[0] - in[1] * tp[1];
sum0[1] += in[0] * tp[1] + in[1] * tp[0];
sum1[0] += in[2] * tp[2] - in[3] * tp[3];
sum1[1] += in[2] * tp[3] + in[3] * tp[2];
in += 4;
tp += 4;
}
res[0] = sum0[0] + sum1[0];
res[1] = sum0[1] + sum1[1];
В примере выше выполняется перемножение двух массивов комплексных чисел. Вы же не думали, что фильтр - это просто? Но суть процесса от этого не меняется. Нужно очень много раз перемножить float и сложить результаты. Именно так и работает FIRFilter в моём проекте jradio.
Использование SIMD
Как раз для таких вычислений и придумали SIMD - single instruction multiple data. Эти инструкции позволяют загрузить много данных в особые регистры процессора и выполнить операцию одной командой. Это, теоретически, позволить ускорить выполнение вычислений. Конечно, тут есть очень много тонкостей. Например, это может замедлить программу, если данные неправильно лежат в памяти. Или SIMD инструкции на одном процессоре будут совершенно по-другому работать на другом. Или размер этих регистров в RaspberryPI сильно отличается от размеров Intel. Но и самая неприятная штука - надо будет писать код на ассемблере.
Но не стоит унывать! Разработчики со всего мира собрались и придумали способы упростить жизнь. В частности, придумали такой проект, как volk. Эта Си библиотека содержит множество часто употребимых функций для подобных вычислений. Достаточно лишь подключить библиотеку и вызвать нужную функцию:
volk_32fc_x2_dot_prod_32fc_a(lv_32fc_t* output, const lv_32fc_t* input, const lv_32fc_t* filter, unsigned int size);
А в зависимости от архитектуры процессора и типов SIMD инструкций, библиотека подставит наиболее быструю реализацию этой функции. При этом все реализации таких функций пишутся вручную как на Си, так и на ассемблере. Вот пример реализации для ARM NEON (используется в RaspberryPI):
@ static inline void volk_32fc_x2_dot_prod_32fc_neonasm(float* cVector, const float* aVector, const float* bVector, unsigned int num_points);
.global volk_32fc_x2_dot_prod_32fc_neonasm
volk_32fc_x2_dot_prod_32fc_neonasm:
push {r4, r5, r6, r7, r8, lr}
vpush {q0-q7}
vpush {q8-q15}
mov r8, r3 @ hold on to num_points (r8)
@ zero out accumulators -- leave 1 reg in alu
veor q8, q15, q15
mov r7, r0 @ (r7) is cVec
...
Если же процессор не поддерживает нужную SIMD инструкцию или вообще не поддерживает SIMD, то volk подставит наивную реализацию с использованием циклов.
Выравнивание
Единственное, что требует volk - это выровненные массивы. Понятие “выравнивание” не существует в Java, поэтому пришлось вспоминать университетский курс по программированию. Тогда нам рассказывали, что выравнивание как-то связано с шиной данных. И выровненные данные каким-то образом ускоряют их обработку. Но тогда это было упомянуто чисто теоретически и вскользь. Немного поискав в Интернете, я нашёл отличную статью (и перевод) о том, что такое выравнивание данных, зачем оно нужно и как использовать.
Если вкратце, то выравнивают не сами данные, а расположение данных в памяти. Указатель на начало данных должен быть кратен размеру шины данных. В таком случае считывание данных из памяти в процессор будет происходить быстрее. А значит и операции будут происходить быстрее.
volk предоставляет несколько удобных методов для задания выравнивания:
size_t volk_get_alignment(void);
Это метод получает размер шины данных или необходимую кратность начала данных. На моём Macbook Air это значение равно 32.
void* volk_malloc(size_t size, size_t alignment);
Второй метод как раз и выделяет выровненную память нужного размера.
Выравнивание памяти для КИХ фильтра
Но выделить выровненную память недостаточно. Если вернуться в описание фильтра, то можно увидеть, что операция с выровненной памятью будет только для первого элемента массива. После смещения на один элемент, указатель будет показывать на адрес не кратный нужному выравниванию.
Чтобы этого избежать, можно применить следующую хитрость:
Для этого нужно создать несколько различных вариантов фильтра. При этом массив параметров фильтра всегда будет выровнен, а вот коэффициенты в начале могут быть 0. Несмотря на то, что появится несколько лишних операций умножения на 0, оно того стоит.
На практике такой трюк можно записать следующим образом:
size_t alignment = volk_get_alignment();
float *buffer = volk_malloc(sizeof(float) * 16, alignment);
const lv_32fc_t *buf = (const lv_32fc_t*) (buffer + 2); // 2 - some random offset
const lv_32fc_t *aligned_buffer = (const lv_32fc_t *)((size_t)buf & ~(alignment - 1));
unsigned align_index = buf - aligned_buffer; // index in the array of aligned filter configurations
printf("alignment: %zu\n", alignment);
printf("initial %p\n", buffer);
printf("offset %p\n", buf);
printf("aligned %p\n", aligned_buffer);
printf("index: %d\n", align_index);
На моём Macbook Air вывод будет следующим:
alignment: 32
initial 0x7fc6f0c07020
offset 0x7fc6f0c07028
aligned 0x7fc6f0c07020
index: 1
К слову сказать, отсутствие выравнивания иногда крэшило программу в segmentation fault.
Результаты
Для того чтобы измерить улучшение производительности, я написал небольшую программу. Её суть достаточно проста: в цикле вызывать фильтр. Полученное время запуска поделить на общее количество циклов. Результаты выглядят следующим образом:
Macbook Air | Raspberrypi 3 | |
---|---|---|
Naive | 0.005615 s | 0.082677 s |
SIMD | 0.002649 s | 0.032759 s |
Как видно, использование SIMD в 2+ раза ускоряет работу программы!