2022. 10. 3. 21:00ㆍ카테고리 없음
이 글을 본 후 또는 gRPC에 대한 기본 개념이 없다면 그전 포스트 글을 보고 다시 보시면 도움이 되실 거예요.
gRPC는 4가지 방식의 통신을 지원합니다.
1. 단항 통신
2. 서버 스트리밍 통신
3. 클라이언트 스트리밍 통신
4. 양방향 통신
스트리밍이란?
서버나 클라이언트가 하나의 패킷만으로 처리하기 힘든 상황일 경우 패킷을 여러 번 보내 해결하는 방식
예) OTT 스트리밍
1. 단항 통신
클라이언트가 하나의 패킷을 통해 서버로 요청하면 서버는 패킷에 대한 결과를 전달 하는 구조.
클라이언트, 서버의 관계는 1:1의 관계이고 제일 많이 씁니다
2. 서버 스트리밍 통신
클라이언트가 하나의 패킷을 통해 서버로 요청하면 서버는 패킷에 대한 결과를 여러 번의 패킷으로 전달하는 구조.
클라이언트, 서버의 관계는 1:N의 관계
3. 클라이언트 스트리밍 통신
클라이언트가 여러번의 패킷을 통해 서버로 요청 하면 서버는 패킷에 대한 결과를 하나의 패킷으로 전달 하는 구조.
클라이언트, 서버의 관계는 N:1의 관계
4. 양방향 스트리밍 통신
클라이언트가 여러번의 패킷을 통해 서버로 요청 하면 서버는 패킷에 대한 결과를 여러번의 패킷으로 전달 하는 구조.
클라이언트, 서버의 관계는 N:N의 관계
단항 통신은 그전 포스트를 통해 Hello, World를 만들면서 자연스럽게 사용했기 때문에 넘어가고 서버 스트리밍 통신, 클라이언트 스트리밍 통신에 대해 적어 보도록 하겠습니다.(양방향 통신은 서버와 클라이언트의 스트리밍 통신 방법을 본다면 응용해서 만들어 볼 수 있기 때문에 제외)
예제를 직접 따라 하는 식으로 적지 않고 해당 기능을 핵심만 집어서 집필했습니다.
로직을 실행해 보고 싶다면 하단 참고 링크에 가서 소스 파일을 다운로드하여 실행하면 됩니다.
서버 스트리밍 통신
예제는 MS에서 제공하는 샘플 예제를 이용해서 설명하겠습니다.
- 시나리오
- 클라이언트가 서버에 저장돼있는 이미지 파일을 gRPC를 통해 다운로드 요청
- 서버는 클라이언트의 이미지 다운로드 요청을 처리하기 위해 이미지 파일을 나눠서 전송(스트리밍)
- 클라이언트는 서버로부터 전달받은 패킷을 저장해 이미지 파일 데이터 완성
1. download.proto 파일 생성
syntax = "proto3";
package download;
service Downloader {
rpc DownloadFile (DownloadFileRequest) returns (stream DownloadFileResponse);
}
message DownloadFileRequest {
string id = 1;
}
message DownloadFileResponse {
FileMetadata metadata = 1;
bytes data = 2;
}
message FileMetadata {
string file_name = 1;
}
눈여겨보실 부분은 stream 키워드입니다.
서버 스트리밍을 하려면 returns 다음 stream 키워드를 붙여줘야 하기 때문입니다.
2. DownloaderService.cs 파일 생성
using Grpc.Core;
using Download;
using Google.Protobuf;
namespace GrpcGreeterServer.Services
{
public class DownloaderService : Downloader.DownloaderBase
{
private readonly ILogger _logger;
private const int ChunkSize = 1024 * 32;
public DownloaderService(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<DownloaderService>();
}
public override async Task DownloadFile(DownloadFileRequest request, IServerStreamWriter<DownloadFileResponse> responseStream, ServerCallContext context)
{
var requestParam = request.Id;
var filename = requestParam switch
{
"4" => "pancakes4.png",
_ => "pancakes.jpg",
};
await responseStream.WriteAsync(new DownloadFileResponse
{
Metadata = new FileMetadata { FileName = filename }
});
var buffer = new byte[ChunkSize];
await using var fileStream = File.OpenRead(filename);
while (true)
{
var numBytesRead = await fileStream.ReadAsync(buffer);
if (numBytesRead == 0)
{
break;
}
_logger.LogInformation("Sending data chunk of {numBytesRead} bytes", numBytesRead);
await responseStream.WriteAsync(new DownloadFileResponse
{
Data = UnsafeByteOperations.UnsafeWrap(buffer.AsMemory(0, numBytesRead))
});
}
}
}
}
public override async Task DownloadFile(DownloadFileRequest request, IServerStreamWriter <DownloadFileResponse> responseStream, ServerCallContext context) 함수는 서버가 클라이언트의 다운로드 요청 시 처리할 함수입니다.
매개변수를 보면 IServerStreamWriter responseStream를 눈여겨보셔야 되는데, 서버에서 여러 번 패킷을 보낼 거기 때문에 responseStream이라는 변수가 추가된 걸 볼 수 있습니다.
responseStream.WriteAsync(..) 함수를 통해 클라이언트에게 이미지 데이터를 저장하고 있습니다.
3. 클라이언트 파일
using Download;
using Grpc.Core;
using Grpc.Net.Client;
using var channel = GrpcChannel.ForAddress("https://localhost:7042");
var client = new Downloader.DownloaderClient(channel);
var downloadsPath = Path.Combine(Environment.CurrentDirectory, "downloads");
var downloadId = Path.GetRandomFileName();
var downloadIdPath = Path.Combine(downloadsPath, downloadId);
Directory.CreateDirectory(downloadIdPath);
Console.WriteLine("Starting call");
using var call = client.DownloadFile(new DownloadFileRequest
{
Id = downloadId
});
await using var writeStream = File.Create(Path.Combine(downloadIdPath, "data.bin"));
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
if (message.Metadata != null)
{
Console.WriteLine("Saving metadata to file");
var metadata = message.Metadata.ToString();
await File.WriteAllTextAsync(Path.Combine(downloadIdPath, "metadata.json"), metadata);
}
if (message.Data != null)
{
var bytes = message.Data.Memory;
Console.WriteLine($"Saving {bytes.Length} bytes to file");
await writeStream.WriteAsync(bytes);
}
}
Console.WriteLine();
Console.WriteLine("Files were saved in: " + downloadIdPath);
Console.WriteLine("Press any key to exit...");
Console.ReadKey();
클라이언트에서는 await foreach 안에서 call.ResponseStream.readAllAsync() 함수를 통해 서버로부터 전달받은 패킷들을 읽은 다음 저장 하고 있습니다.
서버에서는 클라이언트가 요청한 DownloadFile(..) 함수를 통해 처리 후 함수가 종료되면 클라이언트에서는 더 이상 읽을 데이터가 없기 때문에 foreach를 빠져나와 끝나는 로직입니다.
4. 결과
클라이언트 스트리밍 통신
예제는 MS에서 제공하는 샘플 예제를 이용해서 설명하겠습니다.
- 시나리오
- 클라이언트는 서버에게 업로드할 이미지 파일을 gRPC를 통해 업로드 요청
- 서버는 클라이언트로부터 전달받은 데이터들을 저장
1. upload.proto 파일 생성
syntax = "proto3";
package upload;
service Uploader {
rpc UploadFile (stream UploadFileRequest) returns (UploadFileResponse);
}
message UploadFileRequest {
FileMetadata metadata = 1;
bytes data = 2;
}
message FileMetadata {
string file_name = 1;
}
message UploadFileResponse {
string id = 1;
}
서버 스트리밍을 할 때는 returns 다음 stream 키워드를 사용했는데 클라이언트 스트리밍을 할 경우 매개변수의 인자 앞에 stream을 적었습니다.
2. UploaderService.cs 파일 생성
namespace GrpcGreeterServer.Services
{
using Grpc.Core;
using Upload;
namespace Server
{
public class UploaderService : Uploader.UploaderBase
{
private readonly ILogger _logger;
private readonly IConfiguration _config;
public UploaderService(ILoggerFactory loggerFactory, IConfiguration config)
{
_logger = loggerFactory.CreateLogger<UploaderService>();
_config = config;
}
public override async Task<UploadFileResponse> UploadFile(IAsyncStreamReader<UploadFileRequest> requestStream, ServerCallContext context)
{
var uploadId = Path.GetRandomFileName();
var uploadPath = Path.Combine(_config["StoredFilesPath"]!, uploadId);
Directory.CreateDirectory(uploadPath);
await using var writeStream = File.Create(Path.Combine(uploadPath, "data.bin"));
await foreach (var message in requestStream.ReadAllAsync())
{
if (message.Metadata != null)
{
await File.WriteAllTextAsync(Path.Combine(uploadPath, "metadata.json"), message.Metadata.ToString());
}
if (message.Data != null)
{
await writeStream.WriteAsync(message.Data.Memory);
}
}
return new UploadFileResponse { Id = uploadId };
}
}
}
}
public override async Task <UploadFileResponse> UploadFile(IAsyncStreamReader <UploadFileRequest> requestStream, ServerCallContext context) 함수는 서버가 클라이언트의 업로드 요청 시 처리할 함수입니다.
매개변수를 보면 IAsyncStreamReader requestStream를 눈여겨보셔야 되는데, 클라이언트에서 여러 번 패킷을 보낼 거기 때문에 requestStream이라는 변수가 추가된 걸 볼 수 있습니다.
requestStream.ReadAllAsync() 함수를 통해 클라이언트가 전달 한 이미지 데이터를 저장하고 있습니다.
3. 클라이언트 파일
using Google.Protobuf;
using Grpc.Net.Client;
using Upload;
const int ChunkSize = 1024 * 32; // 32 KB
using var channel = GrpcChannel.ForAddress("https://localhost:7042");
var client = new Uploader.UploaderClient(channel);
Console.WriteLine("Starting call");
var call = client.UploadFile();
Console.WriteLine("Sending file metadata");
await call.RequestStream.WriteAsync(new UploadFileRequest
{
Metadata = new FileMetadata
{
FileName = "pancakes.jpg"
}
});
var buffer = new byte[ChunkSize];
await using var readStream = File.OpenRead("pancakes.jpg");
while (true)
{
var count = await readStream.ReadAsync(buffer);
if (count == 0)
{
break;
}
Console.WriteLine("Sending file data chunk of length " + count);
await call.RequestStream.WriteAsync(new UploadFileRequest
{
Data = UnsafeByteOperations.UnsafeWrap(buffer.AsMemory(0, count))
});
}
Console.WriteLine("Complete request");
await call.RequestStream.CompleteAsync();
var response = await call;
Console.WriteLine("Upload id: " + response.Id);
Console.WriteLine("Shutting down");
Console.WriteLine("Press any key to exit...");
Console.ReadKey();
크게 4 부분으로 볼 수 있습니다.
1) var call = client.UploadFile();
2) call.RequestStream.WriteAsync(...)
3) await call.RequestStream.CompleteAsync();
4) var esponse = await call;
차례대로 보면 1) 클라이언트는 서버에게 업로드하겠다는 서비스를 요청하고 2) 서버에 업로드를 진행하다 3) 업로드가 끝나 완료 메서드를 호출 한 다음 4) 서버의 결과 메시지를 받아 처리하고 있습니다.
4. 결과
기본적인 흐름은 봤으니 자주 일어나는 예외 상황 처리에 대해 확인하고 마무리하겠다.
서버 스트리밍 중 클라이언트의 취소 요청
위 Download 예에서 클라이언트가 취소할 경우 ServerCallContext.CancellationToken이 발생한다.
서버에서는 위 에러 상황이 발생할 경우 처리하고 있던 함수에서 빠져나오기만 하면 된다.
위 소스 코드를 고치면
while (!context.CancellationToken.IsCancellationRequested)
이런 식으로 기존 while(true) 부분을 변경한다.
스트리밍 중에 함수 처리를 안전하게 하기 위한 방법
스트리밍 중인 서비스가 종료될 때 다중 스레드로 인해 사용 중인 함수가 있었다면 런타임 에러가 생길 수 있다.
public override async Task StreamingFromServer(ExampleRequest request,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
_ = Task.Run(async () =>
{
for (var i = 0; i < 5; i++)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1));
}
});
await PerformLongRunningWorkAsync();
}
예를 들어 위와 같이 Task.Run(..)으로 스레드가 실행 중에 StreamingFromServer함수가 종료 됐다면 responseStream객체 값의 에러가 발생할 수 있다.
public override async Task StreamingFromServer(ExampleRequest request,
IServerStreamWriter<ExampleResponse> responseStream, ServerCallContext context)
{
var writeTask = Task.Run(async () =>
{
for (var i = 0; i < 5; i++)
{
await responseStream.WriteAsync(new ExampleResponse());
await Task.Delay(TimeSpan.FromSeconds(1));
}
});
await PerformLongRunningWorkAsync();
await writeTask;
}
위와 같이 코드를 수정하면 에러 상황을 해결할 수 있다.
참고 자료