| | 1 | | using Cysharp.Threading.Tasks; |
| | 2 | | using Cysharp.Threading.Tasks.Linq; |
| | 3 | | using Decentraland.Renderer.RendererServices; |
| | 4 | | using Google.Protobuf; |
| | 5 | | using rpc_csharp; |
| | 6 | | using rpc_csharp.protocol; |
| | 7 | | using rpc_csharp.transport; |
| | 8 | | using System; |
| | 9 | | using System.Threading; |
| | 10 | |
|
| | 11 | | namespace RPC.Services |
| | 12 | | { |
/// <summary>
/// Adapter that exposes a <c>ProtocolHelpers.AsyncQueue</c> through the
/// <c>IUniTaskAsyncEnumerable</c> interface.
/// </summary>
/// <typeparam name="T">Element type of the wrapped queue; constrained to reference types.</typeparam>
| | 13 | | public class AsyncQueueEnumerable<T> : IUniTaskAsyncEnumerable<T> where T: class |
| | 14 | | { |
// The wrapped queue; assigned once in the constructor.
| | 15 | | private readonly ProtocolHelpers.AsyncQueue<T> queue; |
| | 16 | |
|
/// <summary>Stores the queue that <c>GetAsyncEnumerator</c> will hand out.</summary>
/// <param name="queue">Queue to wrap; stored as-is, no null check is performed.</param>
| | 17 | | public AsyncQueueEnumerable(ProtocolHelpers.AsyncQueue<T> queue) |
| | 18 | | { |
| | 19 | | this.queue = queue; |
| | 20 | | } |
| | 21 | |
|
/// <summary>
/// Returns the wrapped queue itself as the enumerator, so every call yields the same instance.
/// </summary>
/// <param name="cancellationToken">Not used by the visible body.</param>
/// <returns>The queue passed to the constructor.</returns>
// NOTE(review): the signature row below is cut off at the right edge in this listing.
| | 22 | | public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken |
| | 23 | | { |
| | 24 | | return queue; |
| | 25 | | } |
| | 26 | | } |
| | 27 | |
|
/// <summary>
/// Transport service that is also an <c>ITransport</c>: payloads arriving on the request
/// stream are raised through <c>OnMessageEvent</c>, and bytes passed to <c>SendMessage</c>
/// are queued and yielded on the stream returned by <c>OpenTransportStream</c>.
/// </summary>
| | 28 | | public class TransportServiceImpl : ITransportService<RPCContext>, ITransport |
| | 29 | | { |
// Raised by the OpenTransportStream loop right after it closes the queue.
| | 30 | | public event Action OnCloseEvent; |
// Not raised anywhere in the visible code of this class.
| | 31 | | public event Action<string> OnErrorEvent; |
// Raised by HandleMessages with the bytes of each incoming Payload.
| | 32 | | public event Action<byte[]> OnMessageEvent; |
// Raised on every OpenTransportStream call, right after BuildClient is started.
| | 33 | | public event Action OnConnectEvent; |
| | 34 | |
|
// Outgoing payloads: filled by SendMessage, drained by the OpenTransportStream loop.
| | 35 | | private readonly ProtocolHelpers.AsyncQueue<Payload> queue; |
// Both fields below are assigned by BuildClient.
| | 36 | | private RpcClient client; |
| | 37 | | private RpcClientPort port; |
| | 38 | |
|
/// <summary>
/// Registers a freshly constructed <c>TransportServiceImpl</c> on the given server port
/// through <c>TransportServiceCodeGen.RegisterService</c>.
/// </summary>
/// <param name="port">Server port the service is registered on.</param>
| | 39 | | public static void RegisterService(RpcServerPort<RPCContext> port) |
| | 40 | | { |
| 0 | 41 | | TransportServiceCodeGen.RegisterService(port, new TransportServiceImpl()); |
| 0 | 42 | | } |
| | 43 | |
|
/// <summary>Creates the outgoing payload queue.</summary>
| 1 | 44 | | public TransportServiceImpl() |
| | 45 | | { |
// The queue is given a callback that does nothing.
// NOTE(review): what AsyncQueue uses this callback for is not visible here - confirm.
| 5 | 46 | | queue = new ProtocolHelpers.AsyncQueue<Payload>((_, __) => { }); |
| 1 | 47 | | } |
| | 48 | |
|
/// <summary>
/// Creates an <c>RpcClient</c> that uses this instance as its transport, awaits the
/// creation of the "renderer-protocol" port, then passes that port to
/// <c>context.transport.OnLoadModules</c> when a handler is set.
/// </summary>
/// <param name="context">Context whose <c>transport.OnLoadModules</c> receives the new port.</param>
| | 49 | | private async UniTaskVoid BuildClient(RPCContext context) |
| | 50 | | { |
| 1 | 51 | | client = new RpcClient(this); |
| 2 | 52 | | port = await client.CreatePort("renderer-protocol"); |
| | 53 | |
|
| 0 | 54 | | context.transport.OnLoadModules?.Invoke(port); |
| 0 | 55 | | } |
| | 56 | |
|
/// <summary>
/// Iterates the incoming stream and raises <c>OnMessageEvent</c> with the bytes of each payload.
/// The loop stops when the stream ends or when cancellation is seen at the start of an iteration.
/// </summary>
/// <param name="streamRequest">Incoming payload stream.</param>
/// <param name="token">Checked once per received item, before the event is raised.</param>
// NOTE(review): the signature row below is cut off at the right edge in this listing.
| | 57 | | private async UniTaskVoid HandleMessages(IUniTaskAsyncEnumerable<Payload> streamRequest, CancellationToken token |
| | 58 | | { |
| 15 | 59 | | await foreach (Payload request in streamRequest) |
| | 60 | | { |
| 3 | 61 | | if (token.IsCancellationRequested) |
| | 62 | | break; |
| | 63 | |
|
| 3 | 64 | | OnMessageEvent?.Invoke(request.Payload_.ToByteArray()); |
| | 65 | | } |
| 0 | 66 | | } |
| | 67 | |
|
/// <summary>
/// Starts <c>BuildClient</c>, raises <c>OnConnectEvent</c>, and returns a stream that yields
/// every payload taken from the outgoing queue. Iterating the returned stream also starts
/// <c>HandleMessages</c> on the incoming stream.
/// </summary>
/// <param name="streamRequest">Incoming payload stream, forwarded to <c>HandleMessages</c>.</param>
/// <param name="context">Forwarded to <c>BuildClient</c>.</param>
/// <returns>Stream built with <c>UniTaskAsyncEnumerable.Create</c> that yields queued payloads.</returns>
// NOTE(review): the signature row below is cut off at the right edge in this listing;
// any parameters after the context one are not visible.
| | 68 | | public IUniTaskAsyncEnumerable<Payload> OpenTransportStream(IUniTaskAsyncEnumerable<Payload> streamRequest, RPCC |
| | 69 | | { |
| | 70 | | // Start building the RPC client; the call is not awaited. |
| 1 | 71 | | BuildClient(context).Forget(); |
| | 72 | |
|
| 1 | 73 | | OnConnectEvent?.Invoke(); |
| | 74 | |
|
| 1 | 75 | | return UniTaskAsyncEnumerable.Create<Payload>(async (writer, token) => |
| | 76 | | { |
| | 77 | | // Start consuming the incoming stream; the call is not awaited. |
| 1 | 78 | | HandleMessages(streamRequest, token).Forget(); |
| | 79 | |
|
// Drain the outgoing queue until MoveNextAsync reports false or cancellation is seen.
| 4 | 80 | | while (true) |
| | 81 | | { |
| 5 | 82 | | var nextFuture = queue.MoveNextAsync(); |
| | 83 | |
|
// The token is only inspected after MoveNextAsync has completed.
| 12 | 84 | | if (await nextFuture == false || token.IsCancellationRequested) |
| | 85 | | { |
| 0 | 86 | | queue.Close(); |
| 0 | 87 | | OnCloseEvent?.Invoke(); |
| 0 | 88 | | break; |
| | 89 | | } |
| | 90 | |
|
| 6 | 91 | | await writer.YieldAsync(queue.Current); |
| | 92 | | } |
| 0 | 93 | | }); |
| | 94 | | } |
| | 95 | |
|
/// <summary>
/// Wraps <paramref name="data"/> in a <c>Payload</c> (via <c>ByteString.CopyFrom</c>) and
/// enqueues it on the outgoing queue.
/// </summary>
/// <param name="data">Raw message bytes to send.</param>
| | 96 | | public void SendMessage(byte[] data) |
| | 97 | | { |
| 4 | 98 | | queue.Enqueue(new Payload() { Payload_ = ByteString.CopyFrom(data) }); |
| 4 | 99 | | } |
| | 100 | |
|
/// <summary>
/// Closes the outgoing queue. This method does not raise <c>OnCloseEvent</c> itself.
/// </summary>
// NOTE(review): presumably closing the queue makes MoveNextAsync return false, which ends
// the OpenTransportStream loop and raises OnCloseEvent there - confirm against AsyncQueue.
| | 101 | | public void Close() |
| | 102 | | { |
| 0 | 103 | | queue.Close(); |
| 0 | 104 | | } |
| | 105 | |
|
/// <summary>Does nothing; the body is empty.</summary>
| 1 | 106 | | public void Dispose() { } |
| | 107 | | } |
| | 108 | | } |