使用非同步處理提升資料庫更新速度

來自同事的資料庫程式效能調校案例一則。

情境為一支同步來源及目的資料表的排程,先一次取回來源及目的資料表,逐一檢查資料是否已存在目的資料表,若不存在即執行Insert,若存在則執行 Update 更新欄位。因 Insert/Update 之前需進行特定轉換,故難以改寫為 Stored Procedure。排程有執行過慢問題,處理四萬筆資料耗時近 27 分鐘。

程式示意如下:

foreach (var src in srcList)
{
    try
    {
        var target = findExistingData(src);
        if (target == null)
        {
            AddTargetToDB(src);
        }
        else
        {
            UpdateTargetToDB(target, src);
        }
    }
    catch (Exception e)
    {
        LogError(e);
    }
}

同事加入多執行緒平行處理,改寫為 Parallel.ForEach 版本如下,很神奇地把時間縮短到 5 分鐘內完成!

var count = 0;
Parallel.ForEach(srcList, () => 0, (src, state, subtotal) =>
{
    try
    {
        var target = FindExistingData(src);
        if (target == null)
        {
            return AddTargetToDB(src);
        }
        else
        {
            return UpdateTargetToDB(target, src);
        }
    }
    catch (Exception e)
    {
        LogError(e);
        return 0;
    }
},
rowsEffected =>
{
    Interlocked.Add(ref count, rowsEffected);
});

加入平行處理可加速在預期之內,高達五倍的效能提升卻讓我大吃一驚!我原本預期,四萬次 Insert 或 Update 操作大批進入應該在資料庫端也會形成瓶頸,例如:若 Insert 或 Update 涉及 Unique Index,資料庫端需依賴鎖定機制防止資料重複,即使同時送入多個執行指令,進了資料庫還是得排隊執行。

仔細分析,此案例靠多核平行運算能產生的效益有限,效能提升主要來自節省網路傳輪的等待時間。為此,我設計了一個實驗:建主一個包含 12 個欄位的資料表,4 個 VARCHAR(16)、4 個 INT、4 個 DATETIME,使用以下程式測試用 foreach 及 Parallel.ForEach 分別執行 1024, 2048, 4096, 8192 筆資料的新增與更新並記錄時間,Parallel.ForEach 部分則加入同時執行的最大執行緒數目統計:

using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Dapper;
using System.Diagnostics;
using System.Threading;
 
namespace BatchInsert
{
    class Program
    {
        static string cs = "連線字串";
        static string truncCommand = @"TRUNCATE TABLE Sample";
        static string insertCommand = @"
INSERT INTO Sample (T1,T2,T3,T4,N1,N2,N3,N4,D1,D2,D3,D4) 
VALUES (@T1,@T2,@T3,@T4,@N1,@N2,@N3,@N4,@D1,@D2,@D3,@D4)";
        static string updateCommand = @"
UPDATE [dbo].[Sample]
   SET [T2] = @T2, [T3] = @T3, [T4] = @T4
      ,[N1] = @N1, [N2] = @N2, [N3] = @N3, [N4] = @N4
      ,[D1] = @D1, [D2] = @D2, [D3] = @D3, [D4] = @D4
 WHERE T1 = @T1";
        static void Main(string[] args)
        {
            Test(1024);
            Test(2048);
            Test(4096);
            Test(8192);
            Console.Read();
        }
 
        static void Test(int count)
        {
            List<DynamicParameters> data = new List<DynamicParameters>();
            for (var i = 0; i < count; i++)
            {
                var d = new DynamicParameters();
                d.Add("T1", $"A{i:0000}", System.Data.DbType.String);
                d.Add("T2", $"B{i:0000}", System.Data.DbType.String);
                d.Add("T3", $"C{i:0000}", System.Data.DbType.String);
                d.Add("T4", $"D{i:0000}", System.Data.DbType.String);
                d.Add("N1", i, System.Data.DbType.Int32);
                d.Add("N2", i, System.Data.DbType.Int32);
                d.Add("N3", i, System.Data.DbType.Int32);
                d.Add("N4", i, System.Data.DbType.Int32);
                d.Add("D1", DateTime.Today.AddDays(i));
                d.Add("D2", DateTime.Today.AddDays(i));
                d.Add("D3", DateTime.Today.AddDays(i));
                d.Add("D4", DateTime.Today.AddDays(i));
                data.Add(d);
            }
            TestDbExecute(data, true, false);
            TestDbExecute(data, true, true);
            TestDbExecute(data, false, false);
            TestDbExecute(data, false, true);
        }
 
 
        static object sync = new object();
 
        static void TestDbExecute(List<DynamicParameters> data, 
            bool insert, bool parallel)
        {
            string cmdText = insert ? insertCommand : updateCommand;
            using (SqlConnection cn = new SqlConnection(cs))
            {
                Stopwatch sw = new Stopwatch();
                cn.Execute(truncCommand);
                sw.Start();
                if (!parallel)
                {
                    foreach (var d in data)
                    {
                        cn.Execute(cmdText, d);
                    }
                }
                else
                {
                    int threadCount = 0;
                    int maxThreadCount = 0;
                    Parallel.ForEach(data, (d) =>
                    {
                        lock (sync)
                        {
                            threadCount++;
                            if (threadCount > maxThreadCount)
                                maxThreadCount = threadCount;
                        }
                        using (var cnx = new SqlConnection(cs))
                        {
                            cnx.ExecuteReader(cmdText, d);
                        }
 
                        Interlocked.Decrement(ref threadCount);
                    });
                    Console.WriteLine("[MaxThreads={0}]", maxThreadCount);
                }
                sw.Stop();
                Console.Write("{0} {1}  {2}: {3:n0}ms\n",
                    data.Count, parallel ? "Parallel" : "Loop", 
                    insert ? "Insert": "Update", sw.ElapsedMilliseconds);
            }
        }
    }
}

找了一台內網的遠端 SQL 資料庫進行測試,從 1024 到 8192 四種筆數,使用 Parallel.ForEach 都節省近一半時間,成效卓著:

1024 Loop Insert: 8,372ms
[MaxThreads=10]
1024 Parallel Insert: 4,668ms
1024 Loop Update: 8,737ms
[MaxThreads=11]
1024 Parallel Update: 4,620ms

2048 Loop Insert: 16,665ms
[MaxThreads=14]
2048 Parallel Insert: 8,358ms
2048 Loop Update: 16,545ms
[MaxThreads=12]
2048 Parallel Update: 8,538ms

4096 Loop Insert: 36,444ms
[MaxThreads=22]
4096 Parallel Insert: 17,925ms
4096 Loop Update: 33,724ms
[MaxThreads=22]
4096 Parallel Update: 17,427ms

8192 Loop Insert: 67,885ms
[MaxThreads=31]
8192 Parallel Insert: 35,011ms
8192 Loop Update: 65,761ms
[MaxThreads=27]
8192 Parallel Update: 34,819ms

接著我改連本機資料庫執行相同測試,這一回加速效果很不明顯,甚至出現 Parallel.ForEach 比 foreach 迴圈還慢的狀況:

1024 Loop  Insert: 5,073ms
[MaxThreads=10]
1024 Parallel Insert: 4,772ms
1024 Loop Update: 4,342ms
[MaxThreads=10]
1024 Parallel Update: 4,457ms

2048 Loop Insert: 8,144ms
[MaxThreads=11]
2048 Parallel Insert: 8,672ms
2048 Loop Update: 8,540ms
[MaxThreads=12]
2048 Parallel Update: 8,659ms

4096 Loop Insert: 17,477ms
[MaxThreads=22]
4096 Parallel Insert: 17,860ms
4096 Loop  Update: 18,089ms
[MaxThreads=22]
4096 Parallel Update: 17,629ms

8192 Loop Insert: 33,393ms
[MaxThreads=30]
8192 Parallel Insert: 35,364ms
8192 Loop Update: 35,869ms
[MaxThreads=39]
8192 Parallel  Update: 36,817ms

比較上述兩組結果,Parallel.ForEach 更新遠端資料庫的時間與更新本端資料庫的時間相近,逼近資料庫的極限,可解釋為藉由平行處理排除網站傳輸因素後,遠端資料庫的效能表現趨近本機資料庫。平行處理的加速效應只出現在連線遠端資料庫,用在本機資料庫反而有負面影響,也能研判效能提升主要來自節省網路傳輸等待時間。

【結論】

在對遠端執行大量批次更新時,使用 Parallel.ForEach 確實能藉著忽略網路傳輸等待縮短總執行時間,在網路傳輸愈慢的環境效益愈明顯。既然效能提升來自避免等待,改用 ExecuteNonQueryAsync 應該也能產生類似效果,但程式寫法比 Parallel.ForEach 曲折些。這類做法本質偏向暴力破解,形同對資料庫的壓力測試,若條件許可,可考慮改用 BULK INSERTTVP 等更有效率的策略。

2017-02-10 補充:有不少網友提到 MERGE。依理而論,大量資料處理直接在 Procedure 做掉才是王道,即便不用 MERGE,在 Procedure 裡跑迴圈用 IF 實現也比拉回 C# 處理寫回效率高很多(網路傳輸及連線成本全免)。這個案例在 AddTargetToDb() 與UpdateTargeteToDb() 中有段欄位運算挺複雜,不易用 T-SQL 實現,才決定寫成排程程式(也算是逃避難題啦)。遇到資料量更大的情境(例如:百萬筆)就沒得選擇了,絞盡腦汁也得把同步運算邏輯轉成 T-SQL(或SQLCLR),寫成 Procedure 是唯一解。另一方面,將資料拉回 C# 端比對處理後再寫回 DB,缺乏 Transaction 保護就必須承受讀取到寫入這段期間資料被其他來源修改的風險,但耗時數分鐘的遠端作業加上 Transaction 的成本不容小覤。在本案中無此風險才得以使用此法,否則將戰場移回 DB 端才是較可接受的解決方案。在此感謝網友 ChingYuan, Shih、otaku119、LienFa Huang 的回饋與補充。

【小插曲】

程式改寫 Parallel 版時,由於非同步執行進度不易掌握,使透過統計 ExecuteNoQuery() 傳回受影響筆數方式確認 Insert/Update 筆數無誤。原本預期不管是新增或修改,每次變更筆數都應該為 1,萬萬沒想到統計總數卻超過總資料筆數,貌似改為平行處理後執行結果不同,引發驚慌。

深入調查才發現:目的資料表掛有 Trigger, 在特定情況會連動其他資料表的資料,造成更新一筆但受影響筆數大於 1(要加上 Trigger 所異動的資料筆數)。 最後修改程式,改由受影響筆數 >0 判定是否執行成功,計數則一律+1,化解一場虛驚。

歡迎推文分享:
Published 09 February 2017 08:49 PM by Jeffrey
Filed under: ,
Views: 11,779



Comments

# oaww said on 09 February, 2017 07:58 AM

其實我一直想問平行+transaction是有機會的嗎?@@

# ChingYuan, Shih said on 09 February, 2017 11:02 AM

請教暗黑大

有試過 T-SQL 的 MERGE 指令嗎?

msdn.microsoft.com/.../bb510625.aspx

比起 c# 程序型的逐筆比對檢查

或許一口氣 MERGE 做掉 insert/update/delete 會更猛

對 DB 的瞬間壓力 cpu/connection 也不會太大

感謝您的分享~

# Jeffrey said on 09 February, 2017 04:25 PM

to oaww,不太確定平行加Transaction的情境,可否再提供詳細一點的闡述?

# Jeffrey said on 09 February, 2017 04:36 PM

to ChingYuan, Shih, 大量資料處理,直接在 Procedure 裡做掉的確是首選,即便不用 MERGE,在 Procedure 裡跑迴圈用 IF 實現也比拉回 C# 處理寫回效率高很多(網路傳輸及連線成本全免)。這個案例在 AddTargetToDb() 與UpdateTargeteToDb() 中有段欄位運算挺複雜,不易用 T-SQL 實現,才決定寫成排程程式(也算是逃避難題啦)。遇到資料量更大的情境(例如:百萬筆)就沒得選擇了,絞盡腦汁也得把同步運算邏輯轉成 T-SQL(或SQLCLR),寫成 Procedure 是唯一解。感謝你的回饋~

# otaku119 said on 09 February, 2017 08:03 PM

請示黑暗版主,想請版主指導一二:

如果依照版主作法,似乎都是透過SQL來做IO

特別一開始的程式碼,在我斷章取義的情況下,會認為有sync的資料污染問題

因為無法丟到store procedure處理,為何不考慮用EF(或者NHibernate)?還是版主經過測試發現EF效能太差?

煩請開示....

特別是

# Alex said on 09 February, 2017 08:18 PM

原來有這招

我以前都是先 Delete 在 bulk insert

# Jeffrey said on 09 February, 2017 09:16 PM

to otaku119,我不是很理解「sync 資料污染問題」,可否再提供多一些解釋?為什麼不用 EF?用 EF 的好處在於強型別及簡便省力,缺點在於產生的 SQL 指令無法完全掌控。

這個應用案例頗單純,強型別的好處不大,就沒想到動用 EF。

我自己的習慣,EF 主要用於制式化新增修改刪除(CRUD)程式的量產,複雜查詢則用 SQL Dapper,自己寫 SQL 指令較有踏實感。當你期許自己是專業 F1 賽車手,當然要開手排車才能超越巔峰;但如果你無心鑽研開車技術,自排車則更有效率、表現穩定且安全。

# otaku119 said on 09 February, 2017 10:49 PM

TO:暗黑版主

sync 資料污染問題,個人解釋為:

當Prod環境,可能有多人在同時存取同一個table

如果程式剛好判斷資料不存在,準備insert時,剛好也有其他資料已經先insert資料,就會造成資料新增異常(或者說資料更新污染)

此解釋是我個人看程式碼斷章取義的感想。煩請版主閱後焚燬

(網站能有個delete功能不?)

至於SQL與EF的用法比較,用兩邊極端的例子來證明自己選用的技術比較好,已經流於派系之爭,不應讓此文章原本焦點模糊

但,個人還是想要離題及模糊一下焦點:

畢竟現在都是喊MVC架構下,如果用SQL指令,或者用Porcedure來處理商業邏輯,考量資料處理速度的確是上選,但是反過來說,以後接手的人是否也要考慮手排車的進檔與退檔(有沒有可能來個高速行駛手排切R檔?!)

# Jeffrey said on 10 February, 2017 03:09 AM

to otaku119, 感謝補充解說。你講得沒錯,取出資料處理再寫回,的確要承擔此期間資料庫內容有異動的風險,如要避免得靠Transaction。此案例中,排程程式是唯一有權限更動目的資料表的,才允許以此方式進行,否則會有大問題。感謝提醒,已補充於本文。

EF or SQL 的確是取捨,端賴專案重視什麼,若降低找人接手的門檻比較重要,用 EF 顯然有利,還可避免生手亂寫 SQL 釀禍。若系統常因查詢效能不彰被電到飛高高,則重金培養賽車手甚至挖來舒馬克就變成不得不的選擇。即然沒有一種策略可以適用所有情境,知道各家優劣即可,的確不值得搞成派系之爭。:P

Leave a Comment

(required) 
(required) 
(optional)
(required) 
(提醒: 因快取機制,您的留言幾分鐘後才會顯示在網站,請耐心稍候)

5 + 3 =

Search

Go

<February 2017>
SunMonTueWedThuFriSat
2930311234
567891011
12131415161718
19202122232425
2627281234
567891011
 
RSS
創用 CC 授權條款
【廣告】
twMVC
最新回應

Tags 分類檢視
關於作者

一個醉心技術又酷愛分享的Coding魔人,十年的IT職場生涯,寫過系統、管過專案, 也帶過團隊,最後還是無怨無悔地選擇了技術鑽研這條路,近年來則以做一個"有為的中年人"自許。

文章典藏
其他功能

This Blog


Syndication