| | 1 | | using System; |
| | 2 | | using System.Threading; |
| | 3 | | using System.Threading.Tasks; |
| | 4 | | using Cysharp.Threading.Tasks; |
| | 5 | | using Cysharp.Threading.Tasks.Linq; |
| | 6 | | using DCL; |
| | 7 | | using Google.Protobuf; |
| | 8 | | using rpc_csharp; |
| | 9 | | using rpc_csharp.protocol; |
| | 10 | | using rpc_csharp.transport; |
| | 11 | | using UnityEngine; |
| | 12 | | using Environment = DCL.Environment; |
| | 13 | |
|
| | 14 | | namespace RPC.Services |
| | 15 | | { |
/// <summary>
/// Adapter that lets a <see cref="ProtocolHelpers.AsyncQueue{T}"/> be consumed wherever an
/// <see cref="IUniTaskAsyncEnumerable{T}"/> is expected.
/// </summary>
/// <typeparam name="T">Reference type of the queued items.</typeparam>
public class AsyncQueueEnumerable<T> : IUniTaskAsyncEnumerable<T> where T : class
{
    // The wrapped queue; it doubles as the enumerator handed out below.
    private readonly ProtocolHelpers.AsyncQueue<T> source;

    /// <summary>Wraps the given queue without copying it.</summary>
    /// <param name="queue">Queue to expose as an async enumerable.</param>
    public AsyncQueueEnumerable(ProtocolHelpers.AsyncQueue<T> queue) => source = queue;

    /// <summary>
    /// Returns the wrapped queue itself as the enumerator, so every call hands out the same instance.
    /// </summary>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>The queue passed to the constructor.</returns>
    public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken()) => source;
}
| | 30 | |
|
/// <summary>
/// RPC service that tunnels a second RPC connection through a bidirectional <see cref="Payload"/> stream.
/// The class is both the service implementation (<see cref="ITransportService{RPCContext}"/>) and the
/// <see cref="ITransport"/> used by an <see cref="RpcClient"/>: payloads received on the request stream are
/// raised through <see cref="OnMessageEvent"/>, and bytes passed to <see cref="SendMessage"/> are queued
/// and written to the response stream.
/// </summary>
public class TransportServiceImpl : ITransportService<RPCContext>, ITransport
{
    /// <summary>Raised when the response stream loop finishes, normally or because of an exception.</summary>
    public event Action OnCloseEvent;

    /// <summary>Declared for <see cref="ITransport"/>; this class never raises it.</summary>
    public event Action<string> OnErrorEvent;

    /// <summary>Raised with the raw bytes of every payload read from the request stream.</summary>
    public event Action<byte[]> OnMessageEvent;

    /// <summary>Raised once per <see cref="OpenTransportStream"/> call, before the response stream is returned.</summary>
    public event Action OnConnectEvent;

    // Outgoing payloads waiting to be written to the response stream.
    private readonly ProtocolHelpers.AsyncQueue<Payload> queue;
    private RpcClient client;
    private RpcClientPort port;

    /// <summary>Registers a fresh <see cref="TransportServiceImpl"/> on the given server port.</summary>
    /// <param name="port">Server port that will host the service.</param>
    public static void RegisterService(RpcServerPort<RPCContext> port)
    {
        TransportServiceCodeGen.RegisterService(port, new TransportServiceImpl());
    }

    /// <summary>Creates the service with an empty outgoing queue whose callback does nothing.</summary>
    public TransportServiceImpl()
    {
        queue = new ProtocolHelpers.AsyncQueue<Payload>((_, __) => {});
    }

    /// <summary>
    /// Creates the <see cref="RpcClient"/> that talks over this transport, opens the
    /// "renderer-protocol" port and loads the RPC modules into it.
    /// </summary>
    /// <param name="context">Not used by this method.</param>
    private async UniTaskVoid BuildClient(RPCContext context)
    {
        client = new RpcClient(this);
        port = await client.CreatePort("renderer-protocol");
        DCL.RPC.LoadModules(port, Environment.i.serviceLocator.Get<IRPC>());
    }

    /// <summary>
    /// Forwards every payload of the request stream to <see cref="OnMessageEvent"/> until the stream ends
    /// or <paramref name="token"/> is cancelled.
    /// </summary>
    /// <param name="streamRequest">Incoming payload stream.</param>
    /// <param name="token">Cancels the forwarding loop.</param>
    private async UniTaskVoid HandleMessages(IUniTaskAsyncEnumerable<Payload> streamRequest, CancellationToken token)
    {
        try
        {
            // Hand the token to the enumerator so a cancellation can be observed while waiting for the
            // next request, not only after one has arrived.
            await foreach (Payload request in streamRequest.WithCancellation(token))
            {
                if (token.IsCancellationRequested)
                    break;

                OnMessageEvent?.Invoke(request.Payload_.ToByteArray());
            }
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Cancellation is the expected way for this loop to stop; nothing to report.
        }
    }

    /// <summary>
    /// Opens the tunnel: starts building the client, raises <see cref="OnConnectEvent"/> and returns the
    /// response stream that yields every payload queued through <see cref="SendMessage"/>.
    /// </summary>
    /// <param name="streamRequest">Incoming payload stream from the remote side.</param>
    /// <param name="context">RPC context, passed on to the client builder.</param>
    /// <returns>Stream of outgoing payloads; it ends when the queue is closed or the token is cancelled.</returns>
    public IUniTaskAsyncEnumerable<Payload> OpenTransportStream(IUniTaskAsyncEnumerable<Payload> streamRequest, RPCContext context)
    {
        // BuildClient runs synchronously up to its first await, so the RpcClient has already been
        // constructed on this transport when the connect event below is raised.
        BuildClient(context).Forget();

        OnConnectEvent?.Invoke();
        return UniTaskAsyncEnumerable.Create<Payload>(async (writer, token) =>
        {
            // Read the request stream concurrently with writing the response stream.
            HandleMessages(streamRequest, token).Forget();

            try
            {
                // Stop as soon as the queue reports no more items or the stream is cancelled.
                while (await queue.MoveNextAsync() && !token.IsCancellationRequested)
                {
                    await writer.YieldAsync(queue.Current);
                }
            }
            finally
            {
                // Also runs when MoveNextAsync/YieldAsync throws, so the queue is always closed and
                // listeners are always told the transport is gone.
                queue.Close();
                OnCloseEvent?.Invoke();
            }
        });
    }

    /// <summary>Queues a copy of <paramref name="data"/> to be written to the response stream.</summary>
    /// <param name="data">Raw message bytes.</param>
    public void SendMessage(byte[] data)
    {
        queue.Enqueue(new Payload { Payload_ = ByteString.CopyFrom(data) });
    }

    /// <summary>Closes the outgoing queue.</summary>
    public void Close()
    {
        queue.Close();
    }
}
| | 106 | | } |