Preface
這裡探討的只是簡單,但可達到不錯效果的方法 (但還是很慢 )。以下之探討對於較大型之矩陣才有意義,小型之矩陣執行起來差沒多少。另本文並不會針對 cache hit 、open mp 做分析介紹,講的方法都屬較簡單方式,沒涉及演算法部份。若對效率沒太大要求,Basic 看完便行。若對效率有較高之要求,還是去找 library 吧。
Basic
先放上一段初學者完整的 code。
- /**
- * @file mat_mul_sample.c
- * @brief sample of matrix multiple
- * @author EdisonX < Edison.Shih >
- * @date 2012.03.07
- */
- #include <stdio.h>
- #include <string.h>
- #include <stdlib.h>
- /*
- * @brief allocate matrix space
- * @param[in] row #row of matrix
- * @param[in] col #col of matrix
- * @return double** allocate heap
- */
- double** mat_new(const size_t row, const size_t col)
- {
- size_t i;
- double *ptr;
- double **mat = (double**)malloc(
- sizeof(double) * row * col + sizeof(double*) * row);
- if(mat==NULL) return NULL;
- ptr = (double*)( (char*)mat + sizeof(double*)*row );
- for(i=0; i<row; ++i)
- mat[i] = ptr, ptr+=col;
- return mat;
- }
- /*
- * @brief display matrix, has no return value
- * @param[in] mat matrix which want to display
- * @param[in] row #row of matrix
- * @param[in] col #col of matrix
- */
- void mat_print(double** mat,
- const size_t row, const size_t col)
- {
- size_t i, j;
- for(i=0; i<row; ++i){
- for(j=0; j<col; ++j)
- printf("%5.0lf ", mat[i][j]);
- puts("");
- }
- }
- /*
- * @brief matrix multiple.
- * @param[out] rst multiple result of matrix a and matrix b, rst[m][p]
- * @param[in] a first matrix, a[m][n]
- * @param[in] b second matrix, b[n][p]
- * @param[in] m row count of matrix a
- * @param[in] n col count of matrix a ( equation to row count of matrix b)
- * @param[in] p col count of matrix b
- * @return int success or fail
- * @retval 0 fail
- * @retval 1 success
- */
- int mat_mul(
- double** rst, double** a, double** b,
- const size_t m, const size_t n, const size_t p)
- {
- size_t i, j, k;
- double sum=0.0;
- if(rst==NULL || a==NULL || b==NULL) return 0;
- if(m==0 || n==0 || p==0) return 0;
- for(i=0; i<m; ++i)
- for(j=0; j<p; ++j){
- sum=0.0;
- for(k=0; k<n; ++k)
- sum+=a[i][k]*b[k][j];
- rst[i][j] = sum;
- }
- return 1;
- }
- int main()
- {
- unsigned seed = 0;
- size_t i, j;
- const size_t M = 3, N=4, P=3;
- double **m1 = mat_new(M,N);
- double **m2 = mat_new(N,P);
- double **r = mat_new(M,P);
- srand(seed);
- // generate matrix m1 and m2
- for(i=0; i<M; ++i) for(j=0; j<N; ++j)
- m1[i][j] = 1.0 + rand() % 10;
- for(i=0; i<N; ++i) for(j=0; j<P; ++j)
- m2[i][j] = 1.0 + rand() % 10;
- // calculate matrix m3
- mat_mul(r, m1, m2, M, N, P);
- // display result
- puts("\n[ M1 ]"), mat_print(m1, M, N);
- puts("\n[ M2 ]"), mat_print(m2, N, P);
- puts("\n[ Rst ]"), mat_print(r, M, P);
- free(*m1), free(m1);
- free(*m2), free(m2);
- free(*r), free(r);
- return 0;
- }
執行結果參考
[ M1 ]
9 10 9 8
6 8 6 6
1 3 4 1
[ M2 ]
3 2 8
2 6 6
8 1 7
2 6 7
[ Rst ]
135 135 251
94 102 180
43 30 61
Cache Hit - 修改 Index 順序
上述寫的 mat_mul 是一般教學上常見之撰寫方式,速度算較慢,有一種探討是 針對 loop index , i, j, k 順序做調整,cache hit 會有所不同,所以乘法瞬間變三種寫法。在不考慮例外處理,核心碼大致如下 。
( cache hit 次高 )
- void mat_mul_1(
- double** rst, double** a, double** b,
- size_t m, size_t n, size_t p)
- {
- size_t i, j, k;
- double sum=0.0;
- for(i=0; i<m; ++i)
- for(j=0; j<p; ++j){
- sum=0.0;
- for(k=0; k<n; ++k)
- sum+=a[i][k]*b[k][j];
- rst[i][j] = sum;
- }
- }
( cache hit 最高 )
- void mat_mul_2(
- double** rst, double** a, double** b,
- size_t m, size_t n, size_t p)
- {
- size_t i, j, k;
- double r;
- for(i=0; i<m; ++i) for(j=0; j<p; ++j) rst[i][j]=0.0;
- for(k=0; k<n; ++k){
- for(i=0; i<m; ++i){
- r = a[i][k];
- for(j=0; j<p; ++j)
- rst[i][j] += r * b[k][j];
- }
- }
- }
( cache hit 最低 )
- void mat_mul_3(
- double** rst, double** a, double** b,
- size_t m, size_t n, size_t p)
- {
- size_t i, j, k;
- double r;
- for(i=0; i<m; ++i) for(j=0; j<p; ++j) rst[i][j]=0.0;
- for(j=0; j<p; ++j){
- for(k=0; k<n; ++k){
- r = b[k][j];
- for(i=0; i<m; ++i)
- rst[i][j] += r * a[i][k];
- }
- }
- }
Cache Hit - Block
既然 CPU 之 cache 有限,那一次就只做一個 Block 就好,於是限定每次 matrix 讀取之數量為固定。我們以上述 cache hit 最高的程式碼作為例子。
- void mat_mul_block(
- double** rst, double** a, double** b,
- int m, int n, int p, int BLOCK)
- {
- int i, j, k, ii, jj, kk;
- double r;
- memset((void*)(*rst), 0, sizeof(double)*m*p);
- for(kk=0; kk<n; kk+=BLOCK)
- for(ii=0; ii<m; ii+=BLOCK)
- for(jj=0; jj<p; jj+=BLOCK)
- for(k=kk; k<kk+BLOCK && k<n; ++k)
- for(i=ii; i<ii+BLOCK && i<m; ++i) {
- r = a[i][k];
- for(j=jj; j<jj+BLOCK && j<p; ++j)
- rst[i][j]+=r*b[k][j];
- }
- }
上面這段 code 有兩點可以討論。一是內層的三個回圈,另一種 coding style 是將拆成另一個 function 呼叫,一般人會覺得執行會較浪費時間 ( 因呼叫的次數約是 (N / BLOCK)3 ),但實際上要寫多行緒時反而拆開來寫較方便。另一個是筆者在內層三個 for loop 裡實際的 condition 不是用 k < KK + BLOCK && k < n ,而是寫 ek = MIN(kk+BLOCK, n),condition 變成 k < ek,這些細微的差別有興趣自己試試 (設立 ek 會比其他兩種寫法稍快無誤 )。
BLOCK 是控制一次抓取之資料量,但問題是 BLOCK 要設多大?
一種 理論上( 強調理論是因為實作後比這算法還大些,約 4~8 倍 ) 的算法是 cache line size / sizeof( double )。但要怎麼抓 cache line size ? Windows 下可查這個 API: GetLogicalProcessorInformation,裡面得到的資訊有 CACHE_DESCRIPTOR,MSDN 裡面也有範例。
故以上述算法而言,筆者手邊硬體測出 cache line size = 64,除上 sizeof(double) = 8,最佳之 BLOCK SIZE 應該要等於 8 。強調的是,算法不只這種,也還有其他文獻提出不同算法。
Cache Hit - Transpose
事實上,上述講的方法,都沒 Transpose 來得直接。 直接將第二個 matrix 做轉置動作,再相乘,轉置之動作速度與前面的差很多,有興趣可實測。
- void mat_mul_trans_block(
- double** rst, double** a, double** b,
- int m, int n, int p, int BLOCK)
- {
- int ii, jj, kk, i, j, k;
- double sum, **transb = mat_new(p,n);
- if(transb==NULL) {
- puts("mat_mul_t allocate fail");
- return ;
- }
- // get transpose of matrix b
- for(i=0; i<n; ++i) for(j=0; j<p; ++j)
- transb[j][i] = b[i][j];
- memset((void*)(*rst), 0, sizeof(double)*m*p);
- for(ii=0; ii<m; ii+=BLOCK)
- for(jj=0; jj<p; jj+=BLOCK)
- for(kk=0; kk<n; kk+=BLOCK)
- for(i=ii; i<ii+BLOCK && i<m; ++i)
- for(j=jj; j<jj+BLOCK && j<p; ++j) {
- sum=0.0;
- for(k=kk; k<kk+BLOCK && k<n; ++k)
- sum+=a[i][k]*transb[j][k];
- rst[i][j]+=sum;
- }
- free(transb);
- }
除了和前面兩個地方 ( 再包一個副函式、終止條件問題 ) 外,使用轉置方式必須要再配置 p * m 大小之 matrix,故若矩陣大維度時將使得沒辦法使用此法進行。
留言列表