Preface


這裡探討的只是簡單,但可達到不錯效果的方法 (但還是很慢 )。以下之探討對於較大型之矩陣才有意義,小型之矩陣執行起來差沒多少。另本文並不會針對 cache hit 、open mp 做分析介紹,講的方法都屬較簡單方式,沒涉及演算法部份。若對效率沒太大要求,Basic 看完便行。若對效率有較高之要求,還是去找 library 吧。

 

Basic

 

先放上一段初學者完整的 code。

 

Code Snippet
  1. /**
  2. * @file mat_mul_sample.c
  3. * @brief sample of matrix multiple
  4. * @author EdisonX < Edison.Shih >
  5. * @date 2012.03.07
  6. */
  7.  
  8. #include <stdio.h>
  9. #include <string.h>
  10. #include <stdlib.h>
  11.  
  12. /*
  13. * @brief allocate matrix space
  14. * @param[in] row #row of matrix
  15. * @param[in] col #col of matrix
  16. * @return double** allocate heap
  17. */
  18. double** mat_new(const size_t row, const size_t col)
  19. {
  20.     size_t i;
  21.     double *ptr;
  22.     double **mat = (double**)malloc(
  23.         sizeof(double) * row * col + sizeof(double*) * row);
  24.     if(mat==NULL) return NULL;
  25.     ptr = (double*)( (char*)mat + sizeof(double*)*row );
  26.     for(i=0; i<row; ++i)
  27.         mat[i] = ptr, ptr+=col;
  28.     return mat;
  29. }
  30.  
  31. /*
  32. * @brief display matrix, has no return value
  33. * @param[in] mat matrix which want to display
  34. * @param[in] row #row of matrix
  35. * @param[in] col #col of matrix
  36. */
  37. void mat_print(double** mat,
  38.     const size_t row, const size_t col)
  39. {
  40.     size_t i, j;
  41.     for(i=0; i<row; ++i){
  42.         for(j=0; j<col; ++j)
  43.             printf("%5.0lf ", mat[i][j]);
  44.         puts("");
  45.     }
  46. }
  47.  
  48. /*
  49. * @brief matrix multiple.
  50. * @param[out] rst multiple result of matrix a and matrix b, rst[m][p]
  51. * @param[in]  a   first matrix, a[m][n]
  52. * @param[in]  b   second matrix, b[n][p]
  53. * @param[in]  m   row count of matrix a
  54. * @param[in]  n   col count of matrix a ( equation to row count of matrix b)
  55. * @param[in]  p   col count of matrix b
  56. * @return     int success or fail
  57. * @retval 0 fail
  58. * @retval 1 success
  59. */
  60. int mat_mul(
  61.     double** rst, double** a, double** b,
  62.     const size_t m, const size_t n, const size_t p)
  63. {
  64.     size_t i, j, k;
  65.     double sum=0.0;
  66.     if(rst==NULL || a==NULL || b==NULL) return 0;
  67.     if(m==0 || n==0 || p==0) return 0;
  68.  
  69.     for(i=0; i<m; ++i)
  70.         for(j=0; j<p; ++j){
  71.             sum=0.0;
  72.             for(k=0; k<n; ++k)
  73.                 sum+=a[i][k]*b[k][j];
  74.             rst[i][j] = sum;
  75.         }
  76.     return 1;
  77. }
  78.  
  79. int main()
  80. {
  81.     unsigned seed = 0;
  82.     size_t i, j;
  83.     const size_t M = 3, N=4, P=3;
  84.  
  85.     double **m1 = mat_new(M,N);
  86.     double **m2 = mat_new(N,P);
  87.     double **r  = mat_new(M,P);
  88.     
  89.     srand(seed);
  90.     // generate matrix m1 and m2
  91.     for(i=0; i<M; ++i) for(j=0; j<N; ++j)
  92.         m1[i][j] = 1.0 + rand() % 10;
  93.     for(i=0; i<N; ++i) for(j=0; j<P; ++j)
  94.         m2[i][j] = 1.0 + rand() % 10;
  95.     // calculate matrix m3
  96.     mat_mul(r, m1, m2, M, N, P);
  97.     
  98.     // display result
  99.     puts("\n[ M1  ]"), mat_print(m1, M, N);
  100.     puts("\n[ M2  ]"), mat_print(m2, N, P);
  101.     puts("\n[ Rst ]"), mat_print(r, M, P);
  102.     free(*m1), free(m1);
  103.     free(*m2), free(m2);
  104.     free(*r),  free(r);
  105.     return 0;
  106. }

 

執行結果參考


[ M1 ]
9 10 9 8
6 8 6 6
1 3 4 1

[ M2 ]
3 2 8
2 6 6
8 1 7
2 6 7

[ Rst ]
135 135 251
94 102 180
43 30 61

 

Cache Hit - 修改 Index 順序

 

上述寫的 mat_mul 是一般教學上常見之撰寫方式,速度算較慢,有一種探討是 針對 loop index , i, j, k 順序做調整,cache hit 會有所不同,所以乘法瞬間變三種寫法。在不考慮例外處理,核心碼大致如下 。

 

( cache hit 次高 )

Code Snippet
  1. void mat_mul_1(
  2.     double** rst,  double** a,  double** b,
  3.     size_t m,  size_t n,  size_t p)
  4. {
  5.     size_t i, j, k;
  6.     double sum=0.0;
  7.  
  8.     for(i=0; i<m; ++i)
  9.         for(j=0; j<p; ++j){
  10.             sum=0.0;
  11.             for(k=0; k<n; ++k)
  12.                 sum+=a[i][k]*b[k][j];
  13.             rst[i][j] = sum;
  14.         }
  15. }

 

( cache hit 最高 )

Code Snippet
  1. void mat_mul_2(
  2.     double** rst,  double** a,  double** b,
  3.     size_t m,  size_t n,  size_t p)
  4. {
  5.     size_t i, j, k;
  6.     double r;
  7.     for(i=0; i<m; ++i) for(j=0; j<p; ++j) rst[i][j]=0.0;
  8.  
  9.     for(k=0; k<n; ++k){
  10.         for(i=0; i<m; ++i){
  11.             r = a[i][k];
  12.             for(j=0; j<p; ++j)
  13.                 rst[i][j] += r * b[k][j];
  14.         }
  15.     }
  16. }

 

( cache hit 最低 )

Code Snippet
  1. void mat_mul_3(
  2.     double** rst,  double** a,  double** b,
  3.     size_t m,  size_t n,  size_t p)
  4. {
  5.     size_t i, j, k;
  6.     double r;
  7.     for(i=0; i<m; ++i) for(j=0; j<p; ++j) rst[i][j]=0.0;
  8.  
  9.     for(j=0; j<p; ++j){
  10.         for(k=0; k<n; ++k){
  11.             r = b[k][j];
  12.             for(i=0; i<m; ++i)
  13.                 rst[i][j] += r * a[i][k];
  14.         }
  15.     }
  16. }

 

 

Cache Hit - Block 


 既然 CPU 之 cache 有限,那一次就只做一個 Block 就好,於是限定每次 matrix 讀取之數量為固定。我們以上述 cache hit 最高的程式碼作為例子。

 

Code Snippet
  1. void mat_mul_block(
  2.     double** rst,  double** a,  double** b,
  3.     int m,  int n,  int p, int BLOCK)
  4. {
  5.     int i, j, k, ii, jj, kk;
  6.     double r;
  7.     memset((void*)(*rst), 0, sizeof(double)*m*p);
  8.     for(kk=0; kk<n; kk+=BLOCK)
  9.         for(ii=0; ii<m; ii+=BLOCK)
  10.             for(jj=0; jj<p; jj+=BLOCK)
  11.                 for(k=kk; k<kk+BLOCK && k<n; ++k)
  12.                     for(i=ii; i<ii+BLOCK && i<m; ++i) {
  13.                         r = a[i][k];
  14.                         for(j=jj; j<jj+BLOCK && j<p; ++j)
  15.                             rst[i][j]+=r*b[k][j];
  16.                     }
  17. }

 

上面這段 code 有兩點可以討論。一是內層的三個回圈,另一種 coding style 是將拆成另一個 function 呼叫,一般人會覺得執行會較浪費時間 ( 因呼叫的次數約是 (N / BLOCK)3 ),但實際上要寫多行緒時反而拆開來寫較方便。另一個是筆者在內層三個 for loop 裡實際的 condition 不是用 k < KK + BLOCK && k < n ,而是寫 ek = MIN(kk+BLOCK, n),condition 變成 k < ek,這些細微的差別有興趣自己試試 (設立 ek 會比其他兩種寫法稍快無誤 )。

 

BLOCK 是控制一次抓取之資料量,但問題是 BLOCK 要設多大?

一種 理論上( 強調理論是因為實作後比這算法還大些,約 4~8 倍 ) 的算法是 cache line size  /  sizeof( double )。但要怎麼抓 cache line size ? Windows 下可查這個 API: GetLogicalProcessorInformation,裡面得到的資訊有 CACHE_DESCRIPTOR,MSDN 裡面也有範例。

故以上述算法而言,筆者手邊硬體測出 cache line size = 64,除上 sizeof(double) = 8,最佳之 BLOCK SIZE 應該要等於 8 。強調的是,算法不只這種,也還有其他文獻提出不同算法。

 

Cache Hit - Transpose

 

事實上,上述講的方法,都沒 Transpose 來得直接。 直接將第二個 matrix 做轉置動作,再相乘,轉置之動作速度與前面的差很多,有興趣可實測。

 

Code Snippet
  1. void mat_mul_trans_block(
  2.     double** rst,  double** a,  double** b,
  3.     int m,  int n,  int p, int BLOCK)
  4. {
  5.     int ii, jj, kk, i, j, k;
  6.     double sum, **transb = mat_new(p,n);
  7.     if(transb==NULL) {
  8.         puts("mat_mul_t allocate fail");
  9.         return ;
  10.     }
  11.     // get transpose of matrix b 
  12.     for(i=0; i<n; ++i) for(j=0; j<p; ++j)
  13.         transb[j][i] = b[i][j];
  14.     memset((void*)(*rst), 0, sizeof(double)*m*p);
  15.  
  16.     for(ii=0; ii<m; ii+=BLOCK)
  17.         for(jj=0; jj<p; jj+=BLOCK)
  18.             for(kk=0; kk<n; kk+=BLOCK)
  19.                 for(i=ii; i<ii+BLOCK && i<m; ++i)
  20.                     for(j=jj; j<jj+BLOCK && j<p; ++j) {
  21.                         sum=0.0;
  22.                         for(k=kk; k<kk+BLOCK && k<n; ++k)
  23.                             sum+=a[i][k]*transb[j][k];
  24.                         rst[i][j]+=sum;
  25.                     }
  26.     free(transb);
  27. }

 

除了和前面兩個地方 ( 再包一個副函式、終止條件問題 ) 外,使用轉置方式必須要再配置 p * m 大小之 matrix,故若矩陣大維度時將使得沒辦法使用此法進行。

edisonx 發表在 痞客邦 PIXNET 留言(0) 人氣()