gRPC 練習 - 簡單寫個聊天室
2 |
隨時代演進,.NET 的 API 介面標準從 Remoting、Web Service、WCF 轉向 Web API,近幾年,主打 HTTP/2 (傳輸效率大勝 HTTP 1.1)、Protobuf 格式(輕量級二進位序列化,遠比 JSON 精簡有效率)、支援雙向傳輸、跨語言相容的 gRPC 異軍突起,成為效能優先 API 的開發首選,而從 .NET Core 3 開始,gRPC 還成為官方指定的 WCF 接班人。參考:為何我們為 WCF 開發人員建議 gRPC
gRPC 與 WebAPI 比較表 來源
抱著沒吃過豬肉也要看看豬走路的心情,我打算挑個題材練習,之前寫過 ASP.NET Core WebSocket 聊天室,這回就利用 gRPC 的雙向傳輸特性,寫個 gRPC 聊天室吧。
不得不說,微軟官方文件的 gRPC 教學寫得真好,跟著做完範例差不多算入門了。這裡跳過細節,整理重點:
- gRPC 分為伺服器端跟客戶端,伺服器有專屬的 grpc 專案型別(用
dotnet new grpc
建立專案),客戶端則用一般專案加入 NuGet Package: Grpc.Net.Client、Google.Protobuf、Grpc.Tools - 在 Protos\svcName.proto 檔案定義 Protobuf 介面,.proto IDL 語法有點小複雜,但能支援集合、列舉、多重型別等應用,要熟悉得花點時間,微軟有份不錯的參考文件,讀完應可掌握十之八九
- 我定義的 chat.proto 長這樣,只有一個 Join 方法,參數跟回應都使用 Streaming 方式傳送,善用 gRPC 的雙向傳輸功能:
syntax = "proto3"; import "google/protobuf/timestamp.proto"; option csharp_namespace = "grpc_chat"; package chat; service Chatroom { rpc Join(stream SpeakRequest) returns (stream BroadcastMessage); } message SpeakRequest { string uid = 1; string name = 2; string message = 3; } message BroadcastMessage { string speaker = 1; google.protobuf.Timestamp time = 2; string message = 3; }
- 在 .csproj 加入以下設定 (GrpcServices 在伺服器端為 Server、客戶端為 Client)
讓 Grpc.Tools 依據 .proto 產生對映的類別:<ItemGroup> <Protobuf Include="Protos\filename.proto" GrpcServices="Client" /> </ItemGroup>
- 伺服端實作 Services/SvcNameService.cs,繼承 SvcName.SvcNameBase (由步驟 3 自動產生) 並一一 override 方法介面實作邏輯
- 客戶端也要加入相同的 Protos\svcName.proto,先
GrpcChannel.ForAddress(url)
建立GrpcChannel,再new Chatroom.ChatroomClient(channel)
生成客戶端物件,呼叫 Join() 方法取得一個雙向 Streaming 傳輸 AsyncDuplexStreamingCall 物件,透過 RequestStream 傳送 SpeakRequest、透過 ResponseStream 接收 BroadcastMessage,這部分稍稍複雜,官方文件有篇 Migrate WCF duplex services to gRPC 以訂閱股票報價為例,參考價值頗高。
這個簡單的聊天室分為伺服器端 grpc-chat-svc (Microsoft.NET.Sdk.Web/ASP.NET Core) 跟客戶端 grpc-chat-client (.NET 6 Console),專案結構很簡單:
兩支核心程式為 grpc-chat-svc/Services/ChatroomService.cs:
using Grpc.Core;
using grpc_chat_svc;
using grpc_chat;
using System.Collections.Concurrent;
namespace grpc_chat_svc.Services;
public class ChatroomService : Chatroom.ChatroomBase
{
private readonly ILogger<ChatroomService> _logger;
public ChatroomService(ILogger<ChatroomService> logger)
{
_logger = logger;
}
static ConcurrentDictionary<string, MsgChannel> channels =
new ConcurrentDictionary<string, MsgChannel>();
class MsgChannel
{
public string Name;
public IServerStreamWriter<BroadcastMessage> Stream;
}
async Task Speak(string speaker, string message)
{
var msg = new BroadcastMessage
{
Speaker = speaker,
Time = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(DateTime.UtcNow),
Message = message
};
var lostUsers = new List<string>();
foreach (var kv in channels.ToArray())
{
try
{
await kv.Value.Stream.WriteAsync(msg);
}
catch
{
lostUsers.Add(kv.Value.Name);
channels.TryRemove(kv.Key, out _);
}
}
if (lostUsers.Any())
{
await Speak("system", String.Join(", ", lostUsers.ToArray()) + " disconnected");
}
}
public override async Task Join(IAsyncStreamReader<SpeakRequest> requestStream,
IServerStreamWriter<BroadcastMessage> responseStream, ServerCallContext context)
{
var clientIp = context.GetHttpContext().Connection.RemoteIpAddress!.ToString();
//_logger.LogInformation($"{clientIp}/{context.Peer} connnected");
string uid = string.Empty;
string name = string.Empty;
try
{
await foreach (var speakReq in requestStream.ReadAllAsync(context.CancellationToken))
{
uid = speakReq.Uid;
name = speakReq.Name;
var speaker = $"{name}@{clientIp}";
var newMember = false;
if (!channels.TryGetValue(uid, out var channel))
{
_logger.LogInformation($"{uid}/{speaker}");
channel = new MsgChannel { Name = name, Stream = responseStream };
if (!channels.TryAdd(uid, channel))
throw new ApplicationException("Failed to join the chatroom");
newMember = true;
}
channel.Name = name;
if (speakReq.Message == "/exit")
break;
else if (newMember)
await Speak("system", $"{name} joined");
else
await Speak(speaker, speakReq.Message);
}
}
catch (Exception ex)
{
await Speak("system", $"{name} {ex.Message}");
}
//_logger.LogInformation($"{context.Peer} disconnected");
await Speak($"system", $"{name} left");
if (!string.IsNullOrEmpty(uid)) channels.TryRemove(uid, out _);
}
}
與 grpc-chat-client/Program.cs 力求簡單,程式碼都壓在 100 行內。gRPC 程式庫走 async/await 風格,這部分我的經驗尚淺,處理得有點生硬,再請大家指點一二。
using System.Net.Sockets;
using Grpc.Core;
using Grpc.Net.Client;
using grpc_chat;
if (args.Length < 2)
{
Console.WriteLine("Syntax: grpc-chat-client https://localhost:7042 userName");
return;
}
var url = args[0];
var name = args[1];
Console.Clear();
using var channel = GrpcChannel.ForAddress(url);
var client = new Chatroom.ChatroomClient(channel);
var duplex = client.Join();
var uid = Guid.NewGuid().ToString();
var spkMsg = new SpeakRequest
{
Uid = uid,
Name = name,
Message = "/join"
};
await duplex.RequestStream.WriteAsync(spkMsg);
int x = 0, y = 2;
void Print(string msg, ConsoleColor color = ConsoleColor.White) {
Console.ForegroundColor = color;
int origX = Console.CursorLeft, origY = Console.CursorTop;
Console.SetCursorPosition(x, y);
Console.WriteLine(msg);
x = Console.CursorLeft;
y = Console.CursorTop;
Console.SetCursorPosition(origX, origY);
Console.ResetColor();
};
var rcvTask = Task.Run(async () =>
{
try
{
await foreach (var resp in duplex.ResponseStream.ReadAllAsync(CancellationToken.None))
{
Print($"{resp.Time.ToDateTime().ToLocalTime():HH:mm:ss} [{resp.Speaker}] {resp.Message}",
resp.Speaker == "system" ? ConsoleColor.Yellow : ConsoleColor.Cyan);
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Unavailable)
{
Print($"Connection broken", ConsoleColor.Magenta);
Console.Clear();
Environment.Exit(254);
}
catch (RpcException ex) {
Print($"Error {ex.InnerException?.Message}", ConsoleColor.Magenta);
Console.Clear();
Environment.Exit(255);
}
});
while (true)
{
Console.SetCursorPosition(0, 0);
Console.WriteLine(new String(' ', Console.WindowWidth) + new string('-', Console.WindowWidth));
Console.SetCursorPosition(0, 0);
var msg = Console.ReadLine();
try
{
spkMsg.Message = msg;
await duplex.RequestStream.WriteAsync(spkMsg);
if (msg == "/exit") break;
}
catch (RpcException)
{
break;
}
}
try { await duplex.RequestStream.CompleteAsync(); } catch { }
rcvTask.Wait();
Console.Clear();
最後來看執行效果:
還蠻有模有樣的吧!
老規矩,範例專案已上傳到 Github,有興趣的同學可 Clone 回去玩看看。
Using .NET + gRPC to create a simple chatroom.
Comments
# by Ryan
請教黑大關於web socket 的處理邏輯: 在Broadcast Message時,大部分網路範例都是用迴圈將訊息發送給所有channel,若是在超量使用者(ex百萬)同時連線時是否會造成效能下降?是否有更好的處理方式? (假設網路與硬體設備皆完美)
# by Jeffrey
to Ryan,要實現同時百萬等級連線數,可採金字塔架構,最底層末端主機只負責數百到上千客戶端,每台中間主機負責通知數百台末端主機,廣播時由核心主機通知中間主機,再層層佈達下去,機器跟層級夠多,就不會因為跑迴圈導致部分使用者很慢才收到訊息。