情境如下:原本 API 由資料庫查詢資料,現今要整合由第三方 API 取得的資料,將二者合併成一個資料陣列傳回呼叫端。最直白的做法是查資料庫得到 List<T> list,呼叫 WebAPI 也取得 List<T> extraData,list.AddRange(extraData)。

不過,系統希望降低第三方服務品質造成的衝擊,例如:避免第三方 API 執行過慢讓原本很快的查詢等到地老天荒,或是因為第三方 API 故障導致連資料庫查詢結果都看不到。簡單來說,在第三方 API 查詢過慢或異常時,就放棄顯示來自第三方資料,盡快顯示資料庫查詢,至少維持改版前的服務水準。

我想到的策略是呼叫第三方 API 時設定一個較短時限,一旦逾時就放棄查詢只回傳資料庫查詢結果;若第三方 API 出錯,也一樣採取棄卒保帥的策略。而這兩種情況發生時,我也不希望使用者渾然無知,打算在結果插入一筆假資料提供警示(好處是前端完全不用修改),告知資料可能短少。(註:資料類似待辦工作性質,清單項目各自獨立,缺資料不會導致資訊錯亂)

原本要徒手寫 Code,但很快想起去年學到處理 Deadlock、網路瞬斷、伺服器忙線等暫時性故障的利器,去吧! Polly,就決定是你了。

這是之前沒試過的花式應用,以下是我學到的新技巧:

  1. 用 Timeout 限定 5 秒逾時。TimeoutStrategy 有分 Optimistic(樂觀) 跟 Pessimistic(悲觀) 兩種,前者會補捉 OperationCanceledException 跟 CancellationTokenSource.IsCancellationRequested 判定超時,後者則是時間到就直接報錯。範例程式用 TimeoutStrategy.Pessimistic 讓程式碼簡單一點,樂觀逾時的實作細節可參考開發人員不可缺少的重試處理利器 Polly by 余小章
  2. 我希望 Timeout 或出錯時也要傳回一筆假資料提供警示訊息,這可以透過 Polly<ReturnType>.Handle<ExceptionType>().Fallback(() => Some_ResultType) 實現。
  3. 規劃的處理流程是,設定 Timeout Policy (timeoutPolicy),用 Fallback Policy 攔截 TimeoutRejectedException (逾時會拋出的例外物件) 回傳 Timeout 警示 (timeoutFallbackPolicy),若有其他錯誤,再由最外層的 Policy 攔截所有 Exception 回傳 API 失敗警示 (fallbackPolicy)。要組合這些 Policy 的寫法是 fallbackPolicy.Wrap(timeoutFallbackPolicy).Wrap(timeoutPolicy) (由外包內) 得到 PolicyWrap 物件,再用它 .Execute() 執行作業。
  4. 由於第三方 API 可能有多個,我用 Parallel.ForEach() 多執行緒同步執行,提高效能。

完成的第一個版本如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Polly;
using System.Text.Json;

namespace PollyDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            var data = QueryData();
            var options = new JsonSerializerOptions {
                WriteIndented = true
            };
            Console.WriteLine(JsonSerializer.Serialize(data, options));
        }
        public class Entry
        {
            public Guid Id { get; set; }
            public string Subject { get; set; }
        }
        static IEnumerable<Entry> Call3rdApi(string srcName, int delayTime)
        {
            Task.Delay(delayTime * 1000).Wait();
            return Enumerable.Range(1, 2).Select(o =>
                 new Entry
                 {
                     Id = Guid.NewGuid(),
                     Subject = $"Data from ExtraData[{srcName}] {DateTime.Now.Ticks % 100000:00000}"
                 });
        }
        static Dictionary<string, Func<IEnumerable<Entry>>> extDataJobs = 
            new Dictionary<string, Func<IEnumerable<Entry>>>
            {
                ["SrcA"] = () => Call3rdApi("A", 3),
                ["SrcB"] = () => Call3rdApi("B", 8),
                ["SrcC"] = () => { throw new ApplicationException("Error"); }
            };
        static IEnumerable<Entry> QueryData()
        {
            var list = new List<Entry>();
            //database query simulation
            list.Add(new Entry
            {
                Id = Guid.NewGuid(),
                Subject = "Data from local service"
            });

            Parallel.ForEach(extDataJobs.Keys, (src) =>
            {
                var timeoutPolicy = 
                    Policy.Timeout(TimeSpan.FromSeconds(5), Polly.Timeout.TimeoutStrategy.Pessimistic);
                var timeoutFallbackPolicy = Policy<IEnumerable<Entry>>
                    .Handle<Polly.Timeout.TimeoutRejectedException>()
                    .Fallback(() =>
                        new List<Entry>() {
                            new Entry
                            {
                                Id = Guid.NewGuid(),
                                Subject = $"Warning: [{src}] API timeout"
                            }
                    });
                var fallbackPolicy = Policy<IEnumerable<Entry>>.Handle<Exception>().Fallback(() =>
                {
                    return new List<Entry>() {
                        new Entry
                        {
                            Id = Guid.NewGuid(),
                            Subject = $"Warning: [{src}] API failed"
                        }
                    };
                });
                
                var policyWrap = fallbackPolicy.Wrap(timeoutFallbackPolicy).Wrap(timeoutPolicy);

                var extData = policyWrap.Execute(() =>
                {
                    return extDataJobs[src]();
                });
                lock (list)
                {
                    list.AddRange(extData);
                }
            });

            return list;
        }

    }
}

我設定執行超過 5 秒即為逾,模擬呼叫 API 三次分別耗時 3 秒、8 秒及故意拋出錯誤,8 秒及出錯的呼叫變成一筆訊息資料夾在結果,測試成功。

Parllel.ForEach() 迴圈每次重新建立 Policy 有點浪費資源,另一方面,若將來要使用 CircuitBreaker (累計出錯幾次先熔斷一段時間),必須共用或重複使用 Policy 物件,上面的寫法借用了 Closure 特性,在 Fallback 中直接取用 src 變數,如果要共用,src 就必須以參數方式傳入。這裡又學到一個技巧,透過 contextData 在 Policy 間傳遞參數。做法是 Execute() 時加入 context 參數並將 src 存入 Dictionary<string, object> 建立 contextData 物件,policy.Execute((context) => , , contextData: new Dictionary<string, object> { ["Src"] = src });兩個 Fallback 則接收 context,由 context["Src"] 取出 src,另外宣告一個空的 onFallback: (ex, ctx) => ) 參數是為了吻合 Fallback 多載 (Overloading) 要求的參數數量與型別。

修改版如下:

static void Main(string[] args)
{
    PreparePolicy();
    var data = QueryData();
    var options = new JsonSerializerOptions
    {
        WriteIndented = true
    };
    Console.WriteLine(JsonSerializer.Serialize(data, options));
}

static Polly.Wrap.PolicyWrap<IEnumerable<Entry>> policy = null;
static void PreparePolicy()
{
    var timeoutPolicy =
        Policy.Timeout(TimeSpan.FromSeconds(5), Polly.Timeout.TimeoutStrategy.Pessimistic);
    var timeoutFallbackPolicy = Policy<IEnumerable<Entry>>
        .Handle<Polly.Timeout.TimeoutRejectedException>()
        .Fallback((context) =>
            new List<Entry>() {
                        new Entry
                        {
                            Id = Guid.NewGuid(),
                            Subject = $"Warning: [{context["Src"]}] API timeout"
                        }
            }, onFallback: (ex, ctx) => { });
    var fallbackPolicy = Policy<IEnumerable<Entry>>.Handle<Exception>()
        .Fallback((context) =>
            new List<Entry>() {
                        new Entry
                        {
                            Id = Guid.NewGuid(),
                            Subject = $"Warning: [{context["Src"]}] API failed"
                        }
            }, onFallback: (ex, ctx) => { });

    policy = fallbackPolicy.Wrap(timeoutFallbackPolicy).Wrap(timeoutPolicy);
}

static IEnumerable<Entry> QueryData()
{
    var list = new List<Entry>();
    //database query simulation
    list.Add(new Entry
    {
        Id = Guid.NewGuid(),
        Subject = "Data from local service"
    });

    Parallel.ForEach(extDataJobs.Keys, (src) =>
    {
        var extData = policy.Execute((context) =>
        {
            return extDataJobs[src]();
        }, contextData: new Dictionary<string, object>
        {
            ["Src"] = src
        });
        lock (list)
        {
            list.AddRange(extData);
        }
    });

    return list;
}

範例程式已放在 Github,有需要的同學請自取參考。

Using Polly to combine multi-source data query results with timeout and fault-tolerance.


Comments

Be the first to post a comment

Post a comment