隨時代演進,.NET 的 API 介面標準從 Remoting、Web Service、WCF 轉向 Web API,近幾年,主打 HTTP/2 (傳輸效率大勝 HTTP 1.1)、Protobuf 格式(輕量級二進位序列化,遠比 JSON 精簡有效率)、支援雙向傳輸、跨語言相容的 gRPC 異軍突起,成為效能優先 API 的開發首選,而從 .NET Core 3 開始,gRPC 還成為官方指定的 WCF 接班人。參考:為何我們為 WCF 開發人員建議 gRPC

gRPC 與 WebAPI 比較表 來源

抱著沒吃過豬肉也要看看豬走路的心情,我打算挑個題材練習,之前寫過 ASP.NET Core WebSocket 聊天室,這回就利用 gRPC 的雙向傳輸特性,寫個 gRPC 聊天室吧。

不得不說,微軟官方文件的 gRPC 教學寫得真好,跟著做完範例差不多算入門了。這裡跳過細節,整理重點:

  1. gRPC 分為伺服器端跟客戶端,伺服器有專屬的 grpc 專案型別(用dotnet new grpc建立專案),客戶端則用一般專案加入 NuGet Package: Grpc.Net.Client、Google.Protobuf、Grpc.Tools
  2. 在 Protos\svcName.proto 檔案定義 Protobuf 介面,.proto IDL 語法有點小複雜,但能支援集合、列舉、多重型別等應用,要熟悉得花點時間,微軟有份不錯的參考文件,讀完應可掌握十之八九
  3. 我定義的 chat.proto 長這樣,只有一個 Join 方法,參數跟回應都使用 Streaming 方式傳送,善用 gRPC 的雙向傳輸功能:
     syntax = "proto3";
    
     import "google/protobuf/timestamp.proto";
     option csharp_namespace = "grpc_chat";
    
     package chat;
    
     service Chatroom {
       rpc Join(stream SpeakRequest) returns (stream BroadcastMessage);
     }
    
     message SpeakRequest {
       string uid = 1;
       string name = 2;
       string message = 3;
     }
    
     message BroadcastMessage {
       string speaker = 1;
       google.protobuf.Timestamp time = 2;
       string message = 3;
     }
    
  4. 在 .csproj 加入以下設定 (GrpcServices 在伺服器端為 Server、客戶端為 Client)
    <ItemGroup>
      <Protobuf Include="Protos\filename.proto" GrpcServices="Client" />
    </ItemGroup>
    
    讓 Grpc.Tools 依據 .proto 產生對映的類別:
  5. 伺服端實作 Services/SvcNameService.cs,繼承 SvcName.SvcNameBase (由步驟 3 自動產生) 並一一 override 方法介面實作邏輯
  6. 客戶端也要加入相同的 Protos\svcName.proto,先GrpcChannel.ForAddress(url)建立GrpcChannel,再new Chatroom.ChatroomClient(channel)生成客戶端物件,呼叫 Join() 方法取得一個雙向 Streaming 傳輸 AsyncDuplexStreamingCall 物件,透過 RequestStream 傳送 SpeakRequest、透過 ResponseStream 接收 BroadcastMessage,這部分稍稍複雜,官方文件有篇 Migrate WCF duplex services to gRPC 以訂閱股票報價為例,參考價值頗高。

這個簡單的聊天室分為伺服器端 grpc-chat-svc (Microsoft.NET.Sdk.Web/ASP.NET Core) 跟客戶端 grpc-chat-client (.NET 6 Console),專案結構很簡單:

兩支核心程式為 grpc-chat-svc/Services/ChatroomService.cs:

using Grpc.Core;
using grpc_chat_svc;
using grpc_chat;
using System.Collections.Concurrent;

namespace grpc_chat_svc.Services;

public class ChatroomService : Chatroom.ChatroomBase
{
    private readonly ILogger<ChatroomService> _logger;
    public ChatroomService(ILogger<ChatroomService> logger)
    {
        _logger = logger;
    }

    static ConcurrentDictionary<string, MsgChannel> channels =
        new ConcurrentDictionary<string, MsgChannel>();
    class MsgChannel
    {
        public string Name;
        public IServerStreamWriter<BroadcastMessage> Stream;
    }

    async Task Speak(string speaker, string message)
    {
        var msg = new BroadcastMessage
        {
            Speaker = speaker,
            Time = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(DateTime.UtcNow),
            Message = message
        };
        var lostUsers = new List<string>();
        foreach (var kv in channels.ToArray())
        {
            try
            {
                await kv.Value.Stream.WriteAsync(msg);
            }
            catch
            {
                lostUsers.Add(kv.Value.Name);
                channels.TryRemove(kv.Key, out _);
            }
        }
        if (lostUsers.Any())
        {
            await Speak("system", String.Join(", ", lostUsers.ToArray()) + " disconnected");
        }
    }

    public override async Task Join(IAsyncStreamReader<SpeakRequest> requestStream, 
            IServerStreamWriter<BroadcastMessage> responseStream, ServerCallContext context)
    {
        var clientIp = context.GetHttpContext().Connection.RemoteIpAddress!.ToString();
        //_logger.LogInformation($"{clientIp}/{context.Peer} connnected");
        string uid = string.Empty;
        string name = string.Empty;
        try
        {
            await foreach (var speakReq in requestStream.ReadAllAsync(context.CancellationToken))
            {
                uid = speakReq.Uid;
                name = speakReq.Name;
                var speaker = $"{name}@{clientIp}";
                var newMember = false;
                if (!channels.TryGetValue(uid, out var channel))
                {
                    _logger.LogInformation($"{uid}/{speaker}");
                    channel = new MsgChannel { Name = name, Stream = responseStream };
                    if (!channels.TryAdd(uid, channel))
                        throw new ApplicationException("Failed to join the chatroom");
                    newMember = true;
                }
                channel.Name = name;
                if (speakReq.Message == "/exit")
                    break;
                else if (newMember)
                    await Speak("system", $"{name} joined");
                else
                    await Speak(speaker, speakReq.Message);
            }
        }
        catch (Exception ex)
        {
            await Speak("system", $"{name} {ex.Message}");
        }
        //_logger.LogInformation($"{context.Peer} disconnected");
        await Speak($"system", $"{name} left");
        if (!string.IsNullOrEmpty(uid)) channels.TryRemove(uid, out _);
    }
}

與 grpc-chat-client/Program.cs 力求簡單,程式碼都壓在 100 行內。gRPC 程式庫走 async/await 風格,這部分我的經驗尚淺,處理得有點生硬,再請大家指點一二。

using System.Net.Sockets;
using Grpc.Core;
using Grpc.Net.Client;
using grpc_chat;

if (args.Length < 2)
{
    Console.WriteLine("Syntax: grpc-chat-client https://localhost:7042 userName");
    return;
}
var url = args[0];
var name = args[1];
Console.Clear();
using var channel = GrpcChannel.ForAddress(url);
var client = new Chatroom.ChatroomClient(channel);
var duplex = client.Join();
var uid = Guid.NewGuid().ToString();
var spkMsg = new SpeakRequest
{
    Uid = uid,
    Name = name,
    Message = "/join"
};
await duplex.RequestStream.WriteAsync(spkMsg);
int x = 0, y = 2;
void Print(string msg, ConsoleColor color = ConsoleColor.White) {
    Console.ForegroundColor = color;
    int origX = Console.CursorLeft, origY = Console.CursorTop;
    Console.SetCursorPosition(x, y);
    Console.WriteLine(msg);
    x = Console.CursorLeft;
    y = Console.CursorTop;
    Console.SetCursorPosition(origX, origY);
    Console.ResetColor();
};
var rcvTask = Task.Run(async () =>
{
    try
    {
        await foreach (var resp in duplex.ResponseStream.ReadAllAsync(CancellationToken.None))
        {
            Print($"{resp.Time.ToDateTime().ToLocalTime():HH:mm:ss} [{resp.Speaker}] {resp.Message}", 
                resp.Speaker == "system" ? ConsoleColor.Yellow : ConsoleColor.Cyan);
        }
    }
    catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable)
    {
        Print($"Connection broken", ConsoleColor.Magenta);
        Console.Clear();
        Environment.Exit(254);
    }
    catch (RpcException ex) {
        Print($"Error {ex.InnerException?.Message}", ConsoleColor.Magenta);
        Console.Clear();
        Environment.Exit(255);
    }
});


while (true)
{
    Console.SetCursorPosition(0, 0);
    Console.WriteLine(new String(' ', Console.WindowWidth) + new string('-', Console.WindowWidth));
    Console.SetCursorPosition(0, 0);
    var msg = Console.ReadLine();
    try
    {
        spkMsg.Message = msg;
        await duplex.RequestStream.WriteAsync(spkMsg);
        if (msg == "/exit") break;
    }
    catch (RpcException)
    {
        break;
    }
}
try { await duplex.RequestStream.CompleteAsync(); } catch { }
rcvTask.Wait();
Console.Clear();

最後來看執行效果:

還蠻有模有樣的吧!

老規矩,範例專案已上傳到 Github,有興趣的同學可 Clone 回去玩看看。

Using .NET + gRPC to create a simple chatroom.


Comments

# by Ryan

請教黑大關於web socket 的處理邏輯: 在Broadcast Message時,大部分網路範例都是用迴圈將訊息發送給所有channel,若是在超量使用者(ex百萬)同時連線時是否會造成效能下降?是否有更好的處理方式? (假設網路與硬體設備皆完美)

# by Jeffrey

to Ryan,要實現同時百萬等級連線數,可採金字塔架構,最底層末端主機只負責數百到上千客戶端,每台中間主機負責通知數百台末端主機,廣播時由核心主機通知中間主機,再層層佈達下去,機器跟層級夠多,就不會因為跑迴圈導致部分使用者很慢才收到訊息。

Post a comment