再論具總量控管之.NET非同步工作排程模型

在之前的文章中(利用SemaphoreSlim類別來進行async/await非同步工作排程),就有針對這個模型進行探討,不過當時的專案在身,在超忙的狀況下寫的不是很齊備,例如我就乾脆把SemaphoreSlim的Quota設定到與TaskQueue等大,如此一來SemaphoreSlim的意義就消失了。這兩天剛好又有碰到這方面的議題,因此把之前的程式碼重新拿出來改善一下,以免造成讀文章的網友的誤解。(其實我的部落格大部分的程式碼都是我自己用於記憶、提示用的,所以要抄之前做好先確定你懂得原理會比較佳,因為我Copy下來改的程式碼我自己會知道哪裡有缺陷,要再多添加程式碼,但是如果你不了解的話,就會踩到雷嘍。)

我要做的事情如上圖所示,在紅色框內,基本上有一堆我們待工作的查詢字串(QueryString)。然後看到橘色框內,我們需要一個工作管理器,來幫助我們正確的管理要產生多少的執行緒來工作,而不是像之前一樣,有多少個要求就生出多少的Task來對應,這樣的做法對於伺服器端是一種很傷資源的作法,在這邊上我們基本上就是請出SemaphoreSlim類別來助攻。

黑色框的角色,是因為所有的執行緒都是寫成非同步的調用方式(async、await),所以必須要有一個管理員來幫我們管理所有的工作是否均已經運行妥當,這時候我們必須請出System.Threading.Tasks.Task.WaitAll、System.Threading.Tasks.Task.WhenAll來協助我們進行管理,如此一來程式碼會比傳統的執行緒式之程式碼複雜度至少少一半以上。

最後,在舊有的版本我們巧妙的使用一個名為oResult的List<string>來管理結果的輸出,在這個版本中,我們改用比較正式的寫法List<System.Threading.Tasks.Task>來分派並收集結果資料,如此一來記憶體的資源也可以再精簡一些。

設計過程的踩到的雷寫在下方,供給日後備忘:

其實要說是雷也太嚴重,大部分是我這個人追求程式碼的精簡化(偷懶)所致,因此需要動用太複雜的Task寫法,都被我丟棄不採用了。

  1. 使用黏巴達Lambda寫closures function時,一定要小心自認為區域變數其實是參考到公域變數的問題,這個問題似乎在C# 5.0已經解決。(reference: Closing over the loop variable considered harmful
  2. 承上,另外執行緒去new出來的closures function,也要小心執行優先權的問題,例如你在閉包內動態引用某個變數(i = 1 to 10),那麼有可能最後「這些」背景執行緒,在最後一刻起執行時,全部一起拿到10這個數字。可是你本來是想要執行緒一拿到1、執行緒二拿到2...之類的。(reference: How can I capture the value of an outer variable inside a lambda expression?
  3. 承上,所以你馬上會想到,那我把閉包寫成可以引入外部參數(Parameter),這時候又踩到雷了,因為System.Threading.Tasks.Task.Run只接受Action跟Func<TResult>,也就是說,沒有參數這種東西喔!
  4. 承上,你可能會想說,那我寫成System.Threading.Tasks.TaskFactory.StartNew的寫法總可以了吧,因為他至少有支援Action<object>外部參數的引入啊!此時又中雷了,你會遇到逆變型(Contravariant)的問題(共變「covariance」與逆變「contravariance」),也就是說你必須先將輸入參數要宣告成總父類別(Object)的型態,進入函式內後在自己轉型。(reference: C# 學習筆記:多執行緒 (6) - TPL
  5. 承上,你終於解決了System.Threading.Tasks.TaskFactory.StartNew的參數引入寫法,再套回來原來的閉包一看,你會發現他X的StartNew在支援async、await的寫法很不好寫,要回到最原始的事件委派(delegate)才不會讓你的系統發生可能的崩潰,也就是你要把你的程式碼變得更複雜了。這部分可以自己查一下TaskFactory.StartNew vs Task.Run,相信你會有所明白。(reference: StartNew is Dangerous
  6. 所以峰迴路轉後,我回到了System.Threading.Tasks.Task.Run,然後把參數的問題利用共用物件來取代,也就是大家看到程式碼中的System.Collections.Generic.List<string> oUrls,這種解法算是不完美中的妥協,沒辦法,我只是不喜歡寫出一個連日後我自己都很難看的懂的程式碼,這不會增加工作效率,只是會浪費自己的生命時間而已。

非同步工作排程模型程式碼

上面屁了那麼多心路歷程,如果有幫到某些一樣跟我身陷苦海的人,那就有一點福報了,哈哈!不囉嗦,程式碼如下:

static void Main(string[] args)
{
  /*** 設定區塊開始 ***/
  //要調用的目標JSON(網路免費資源)
  System.Collections.Generic.List<string> oUrls = new System.Collections.Generic.List<string>()
  {
    "http://jsonplaceholder.typicode.com/users/1",
    "http://jsonplaceholder.typicode.com/users/2",
    "http://jsonplaceholder.typicode.com/users/3",
    "http://jsonplaceholder.typicode.com/users/4",
    "http://jsonplaceholder.typicode.com/users/5-ErrorTest",
    "http://jsonplaceholder.typicode.com/users/6",
    "http://jsonplaceholder.typicode.com/users/7",
    "http://jsonplaceholder.typicode.com/users/8",
    "http://jsonplaceholder.typicode.com/users/9",
    "http://jsonplaceholder.typicode.com/users/10-ErrorTest"
  };
  //定義每次運行的執行緒數量
  int iTaskPoolQuota = 2;
  /*** 設定區塊結束 ***/

  //宣告執行緒數量監控器
  System.Threading.SemaphoreSlim oTaskControl = new System.Threading.SemaphoreSlim(iTaskPoolQuota);
  //宣告執行緒集合
  System.Collections.Generic.List<System.Threading.Tasks.Task<string>> oTasks = new System.Collections.Generic.List<System.Threading.Tasks.Task<string>>();
  //因為執行緒在運行的過程中,會不斷的影響oUrls的內容,因此將迴圈的中止值先抄寫出來
  int iTotalCount = oUrls.Count;
  //將工作加到執行緒集合內
  for (int i = 0; i < iTotalCount; i++)
  {
    oTasks.Add(
      System.Threading.Tasks.Task.Run<string>(async () =>
      {
        /* 執行緒控制器鎖定 */
        await oTaskControl.WaitAsync();

        /*** 執行緒詳細運作開始 ***/
        //設定一個區域變數
        string cLocalUrl;
        //鎖定取用資源,以防止執行緒交互汙染變數
        lock (oUrls)
        {
          try
          {
            cLocalUrl = oUrls[0];
            oUrls.RemoveAt(0);
          }
          catch (System.Exception ex)
          {
            cLocalUrl = $"{{\"ErrorMessage\": \"{ex.Message}\"}}";
          }
        }
        //如果在取用的過程中沒有發生錯誤,且URL是空值,那就記錄特別資訊
        if (cLocalUrl.Length == 0) { cLocalUrl = @"{""ErrorMessage"": ""Null URL string""}"; }
        //開始利用URL到伺服器端抓取JSON資料
        if (cLocalUrl.IndexOf("ErrorMessage") == -1)
        {
          using (System.Net.WebClient oWC = new System.Net.WebClient())
          {
            try
            { cLocalUrl = await oWC.DownloadStringTaskAsync(cLocalUrl); }
            catch (System.Exception ex)
            { cLocalUrl = $"{{\"ErrorMessage\": \"查詢「{cLocalUrl}」時,發生{ex.Message}\"}}"; }
          }
        }
        /*** 執行緒詳細運作結束 ***/

        /* 執行緒控制器鎖定 */
        oTaskControl.Release();
        //回傳資料
        return cLocalUrl;
      })
    );
  }
  //後續列舉結果作業
  WriteLine("1. 工作準備妥當,開始分批派送運算。");
  System.Threading.Tasks.Task.WaitAll(oTasks.ToArray());
  //如果需要進行工作型的回傳(用一個工作來管理所有工作的結果回傳集),可以考慮改用這種寫法
  //System.Threading.Tasks.Task<string[]> oTotalTaskResult = System.Threading.Tasks.Task.WhenAll<string>(oTasks);
  WriteLine("2. 工作運算完畢,開始列舉資料。");
  foreach (var item in oTasks)
  { WriteLine(item.Result); }
  WriteLine("3. 列舉資料完畢。");
  ReadKey();
}

※ 最後在點一下,如果你要上正式用途,最好要考慮到多執行緒交雜的問題,例如你把這個模型寫成一個類別,那麼最好考慮一下lock的設計是否貼切,衍生的執行緒存取oUrls的權限範圍問題,oUrls的生命週期問題(例如是否拉出去外面static化)等,都是需要被討論的點。

相關參考:

System.Threading.SemaphoreSlim System.Threading.Tasks.Task.Run System.Threading.Tasks.TaskFactory.StartNew System.Threading.Tasks.Task.WaitAll System.Threading.Tasks.Task.WhenAll