watermelon 发表于 2020-11-14 18:57:40

【C Fortran】使用SIMD加速4x4矩阵Jacobi迭代计算

上次自然辩证法上看知乎,忽然又想起来了SIMD,当初大约两年前,小弟我还是大三上半学期,初次见到站长写的SIMD加速计算,是使用泰勒展开法计算的cos函数(精度为16阶),当初仔细研读了代码并写了注释进行学习。
具体的代码以及SIMD的相关介绍可以见该贴:传送门->https://www.0xaa55.com/forum.php?mod=viewthread&tid=16950&highlight=0xAA55

这次是自己进行写一个简单的SIMD加速计算,是使用的4x4的Jacobi迭代进行练手,之前想过一些矩阵的直接解法使用SIMD计算的,但是碍于小弟水平有限,感觉不是那么好用SIMD进行编写,所以我就找了一个软柿子捏。
并且注意这次的这个SIMD计算的程序只能进行4x4的矩阵运算,因为__m128可以存储4个float(32bit),若矩阵的维度多于4,感觉需要分块进行SIMD的计算;若矩阵维度少于4,则直接使用ANSI C进行计算即可。

直接上代码:
C语言代码
/*
* Use the SIMD to accelerate the calculation speed of the 4X4 matrix in Jacobi Iteration
* Author       : Zhou Xingguang
* Organization : Xi'an Jiaotong University - NuTHeL
* Date         : 2020/11/13
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <xmmintrin.h>
#include <smmintrin.h>
#include <Windows.h>

#define STOP_STEP 50
#define EPSILON 1.0E-3


// 4x4 matrix
void JacobiIter(float (*matrix), float *b, float *x)
{
        int i;
        float *x_front = NULL;
        __m128 _matrix_0 = _mm_loadu_ps(matrix);
        __m128 _matrix_1 = _mm_loadu_ps(matrix);
        __m128 _matrix_2 = _mm_loadu_ps(matrix);
        __m128 _matrix_3 = _mm_loadu_ps(matrix);
        __m128 _x = _mm_set1_ps(0);
        __m128 _b = _mm_loadu_ps(b);
        __m128 _x_front = _mm_set1_ps(0);
        __m128 _diagonal = _mm_set_ps(matrix,
                matrix,
                matrix,
                matrix);

        // DEBUG USE
        __m128 temp;
        __m128 _sum0, _sum1, _sum2, _sum3;
        __m128 _sum_final;

        x_front = (float *)malloc(sizeof(float)*4);
        memset(x_front, 0, sizeof(float)* 4);

        _x = _mm_loadu_ps(x);
        _x_front = _mm_loadu_ps(x_front);

        for (i = 0; i < STOP_STEP; i++)
        {
                // only the SSE4.1 can use the dot multiply between two _m128 vectors
                temp = _mm_mul_ps(_matrix_0, _x_front);
                _sum0 = _mm_dp_ps(temp, _mm_set1_ps(1.0), 0xFF);

                temp = _mm_mul_ps(_matrix_1, _x_front);
                _sum1 = _mm_dp_ps(temp, _mm_set1_ps(1.0), 0xFF);

                temp = _mm_mul_ps(_matrix_2, _x_front);
                _sum2 = _mm_dp_ps(temp, _mm_set1_ps(1.0), 0xFF);

                temp = _mm_mul_ps(_matrix_3, _x_front);
                _sum3 = _mm_dp_ps(temp, _mm_set1_ps(1.0), 0xFF);

                // make the multiple result.
                __m128 m000 = _mm_move_ss(_sum1, _sum0);
                __m128 m111 = _mm_movelh_ps(m000, _sum2);
                __m128 m222 = _mm_shuffle_ps(m111, m111, _MM_SHUFFLE(0, 1, 2, 3));
                _sum_final = _mm_move_ss(m222, _sum3);
                _sum_final = _mm_shuffle_ps(_sum_final, _sum_final, _MM_SHUFFLE(0, 1, 2, 3));

                temp = _mm_add_ps(_mm_sub_ps(_b, _sum_final), _mm_mul_ps(_diagonal, _x_front));
                _x = _mm_div_ps(temp, _diagonal);

                // begin to judge the 2-Norm of the subtract vector between the solution vector and its front vector
                __m128 square = _mm_mul_ps(_mm_sub_ps(_x, _x_front), _mm_sub_ps(_x, _x_front));
                temp = _mm_dp_ps(square, _mm_set1_ps(1.0), 0xFF);
                __m128 sqrt = _mm_sqrt_ps(temp);
                // if the 2-Norm is less than 1E-3, then we get the solution, return the solution.
                if (sqrt.m128_f32 < EPSILON)
                {
                        printf("[*] 2-Norm is less than 1E-3...\n");
                        goto end;
                }

                // record the front value
                _x_front = _x;

                // print the result of each iteration
                printf("Iteration : %d\n", i + 1);
                printf("x = %f, %f, %f, %f\n", _x.m128_f32,
                        _x.m128_f32,
                        _x.m128_f32,
                        _x.m128_f32);
        }
end:
        _mm_storeu_ps(x, _x);
        _mm_sfence();
        _ReadWriteBarrier();
        free(x_front);
        x_front = NULL;
        return;
}


int main(void)
{
        int i, j;
        float matrix = { { 2.52, 0.95, 1.25, -0.85 },
        { 0.39, 1.69, -0.45, 0.49 },
        { 0.55, -1.25, 1.96, -0.98 },
        { 0.23, -1.15, -0.45, 2.31 } };

        float b = { 1.38, -0.34, 0.67, 1.52 };
        float x = { 0 };   // make the initial value to zero

        // time consumption test
        SetThreadAffinityMask(GetCurrentThread(), 1);
        LARGE_INTEGER begin_JacobiIter;
        LARGE_INTEGER end_JacobiIter;
        LARGE_INTEGER freq;

        QueryPerformanceFrequency(&freq);
        QueryPerformanceCounter(&begin_JacobiIter);
       
        for (i = 0; i < 100; i++)
        {
                for (j = 0; j < 1048576; j++)
                {
                        JacobiIter(matrix, b, x);
                }
        }
       
        QueryPerformanceCounter(&end_JacobiIter);
       
        // print the result
        printf("The final result: x = %f, %f, %f, %f\n", x, x, x, x);

        printf("Time consumption:%f\n",
                        (float)(end_JacobiIter.QuadPart - begin_JacobiIter.QuadPart) / (float)freq.QuadPart);

        return 0;
}
运行结果:
The final result: x = 0.883346, -0.513747, -0.084159, 0.298170
Time consumption:22.806086
请按任意键继续. . .

Fortran代码
! Jacobi Iteration
    module JACOBI
    implicit none
      real(8), parameter :: epsilon = 1E-3
      integer, parameter :: stop_step = 1e3
    contains
      ! Iteration solver
      subroutine IterSolver(matrix, b, x)
      implicit none
            real(8), intent(inout) :: matrix(:, :)
            real(8), intent(inout) :: b(:)
            real(8), intent(inout) :: x(:)
            real(8), allocatable   :: x_front(:)    ! try to record the value of the front calculation.
            integer                :: N
            integer                :: i, j
            N = size(matrix, 1)
            allocate(x_front(N))
            ! We should to try to calculate the initial value of x
            ! and, we try to initial the x = 0 here.
            x = 0
            x_front = x
            ! begin to iteration
            do i=1, stop_step
                !write(*, *) "step", i
                !write(*, *) x
                do j=1, N
                  x(j) = (b(j)-sum(matrix(j, :)*x_front)+matrix(j, j)*x_front(j)) / matrix(j, j)
                end do
                ! Stop condition
                ! Try to judge the 2-Norm of the subtract vector
                ! between the solution vector and its front vector.
                if(sqrt(sum((x - x_front) * (x - x_front))) .LT. epsilon) then
                  !write(*, *) "iteration stop:", x
                  exit
                end if
                ! give the value to x_front
                x_front = x
            end do
            deallocate(x_front)
            return
      end subroutine
    end module
    !
    ! MAIN
    !
    program main
    use JACOBI
    implicit none
      real(8) :: matrix(4, 4)
      real(8) :: b(4) = (/ 1.38, -0.34, 0.67, 1.52 /)
      real(8) :: x(4)
      real(8) :: time_start, time_end
      integer :: i, j
      matrix(1, :) = (/2.52, 0.95, 1.25, -0.85 /)
      matrix(2, :) = (/ 0.39, 1.69, -0.45, 0.49 /)
      matrix(3, :) = (/ 0.55, -1.25, 1.96, -0.98 /)
      matrix(4, :) = (/ 0.23, -1.15, -0.45, 2.31 /)
      
      call cpu_time(time_start)
      do i=1, 100
            do j=1, 1048576
                call IterSolver(matrix, b, x)
            end do
      end do
      call cpu_time(time_end)
      write(*, *) "The final result:", x
      write(*, *) "Time consumption:", time_end - time_start
    end program

The final result:0.883345862741093      -0.513746862467949
-8.415921190582545E-0020.298170337224724
Time consumption:   36.2500000000000
请按任意键继续. . .

这两个程序中逻辑结构是一模一样的,并且两个程序均为单线程程序,可以看到该C代码与Fortran代码计算的结果相同,不同的只有运行时间上的差别,其中C代码中使用了高精度计时函数QPC,Fortran代码中也使用了高精度计时函数cpu_time,
最终我们可以看到在循环进行100x1048576次Jacobi迭代后,我们可以看到SIMD的代码要比Fortran代码快了将近13秒多。

【注】
1.本程序C代码是使用msvc2013的速度最优优化,同时Fortran代码使用了优化能力较强的Intel Visual Fortran2013进行了速度最优优化,具体的项目配置可以见下面打包的项目,直接用vs2013打开项目文件即可。
2.程序进行测速时,将程序中的打印语句均注释掉了,防止因为打印语句而影响了最终函数计算的测速;若要观察每一步的迭代结果,只需要将循环次数改为1,同时,将C代码中第79,87-91行的注释取消,Fortran代码中第24,25,33行注释取消即可观察到每一步运行的结果。
3.本SIMD程序使用了_mm_dp_ps的向量内积函数,需要支持SSE4.1的指令集,不过目前稍微“现代”一些的CPU都支持SSE4.1了吧(起码教研室的电脑是都支持的)!
希望各位老师同学多多批评指正!

0xAA55 发表于 2020-11-17 18:08:01

再次阅读你的代码的时候我严重怀疑你的Fortran的real实数类型其实是C的double双精度浮点数类型。你这不科学啊,要不要考虑写一个对双精度浮点数做加速运算的方法来测试测试?

说起来,AVX指令集就是做这个的,4个double同时算。你可以试试AVX。

0xAA55 发表于 2020-11-17 05:45:43

watermelon 发表于 2020-11-16 18:43
哦哦好的,小弟我是这样想的,最后我有一个_mm_storeu_ps(x, _x),然后我感觉就需要有一个_mm_sfence()来 ...

我当时那份代码加这两条语句是为了保证ABI层面的兼容性,也就是把浮点数从SSE寄存器里抠出来,然后用FPU栈顶来返回。

之所以这么做,是因为我当时考虑在运行时通过判断CPUID来决定是否使用SIMD,以及使用哪个版本的SSE。

后来一方面不爽函数指针对优化效果的副作用,另一方面每次返回浮点数都要走RAM来把SSE存储的浮点数倒进FPU。这些低效率的操作抵消了我的数学库的优化作用。于是我更新了我的mathutil库,默认做法就是只进行编译时的指令集选择。

而且事实上作为数学加速库我并不考虑支持2008年以前的CPU的加速效果了。所以要么FPU,要么SSE4.2。

你不需要考虑用FPU传递浮点数运行结果,自然不需要_mm_sfence();和_ReadWriteBarrier();这两条语句了。

0xAA55 发表于 2020-11-16 16:03:08

_mm_sfence();和_ReadWriteBarrier();是不是不需要了?我感觉是不需要的

watermelon 发表于 2020-11-16 18:43:39

0xAA55 发表于 2020-11-16 16:03
_mm_sfence();和_ReadWriteBarrier();是不是不需要了?我感觉是不需要的

哦哦好的,小弟我是这样想的,最后我有一个_mm_storeu_ps(x, _x),然后我感觉就需要有一个_mm_sfence()来保证内容写入以后再返回,然后我实际上对_ReadWriteBarrier()不是特别了解,然后我看原先学习您的那个帖子,是_mm_sfence()和_ReadWriteBarrier()写在一起的,所以我就写在一起了。
对于_ReadWriteBarrier()这个函数,只找到了一个繁体字版本的MSDN:https://docs.microsoft.com/zh-tw/cpp/intrinsics/readwritebarrier?view=msvc-160

watermelon 发表于 2020-11-17 08:30:20

0xAA55 发表于 2020-11-17 05:45
我当时那份代码加这两条语句是为了保证ABI层面的兼容性,也就是把浮点数从SSE寄存器里抠出来,然后用FPU ...

可以可以!
原来如此!
一开始我也考虑要不要使用cpuid来判断用户的指令集是否支持SSE4.2,后来因为时间问题就没有深究;
学习到了!

watermelon 发表于 2020-11-17 19:17:51

0xAA55 发表于 2020-11-17 18:08
再次阅读你的代码的时候我严重怀疑你的Fortran的real实数类型其实是C的double双精度浮点数类型。你这不科学 ...

草,我忘了,的确,real(8)是一个double类型的,real对应float,好的好的,我了解一下AVX,感谢A5大佬指导!

0xAA55 发表于 2020-11-18 13:14:53

watermelon 发表于 2020-11-17 19:17
草,我忘了,的确,real(8)是一个double类型的,real对应float,好的好的,我了解一下AVX,感谢A5大佬指 ...

以及SSE可以做到2个double同时算。虽然听起来不是很“并行”。
页: [1]
查看完整版本: 【C Fortran】使用SIMD加速4x4矩阵Jacobi迭代计算