< Summary

Class: RPC.Services.AsyncQueueEnumerable[T]
Assembly: RPC.Services.TransportService
File(s): /tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/TransportService/TransportServiceImpl.cs
Covered lines: 0
Uncovered lines: 4
Coverable lines: 4
Total lines: 108
Line coverage: 0% (0 of 4)
Covered branches: 0
Total branches: 0

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | NPath complexity | Sequence coverage
AsyncQueueEnumerable(...) | 0% | 2 | 1 | 0 | 0%
GetAsyncEnumerator(...) | 0% | 2 | 1 | 0 | 0%

File(s)

/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/TransportService/TransportServiceImpl.cs

# | Line | Line coverage
 1using Cysharp.Threading.Tasks;
 2using Cysharp.Threading.Tasks.Linq;
 3using Decentraland.Renderer.RendererServices;
 4using Google.Protobuf;
 5using rpc_csharp;
 6using rpc_csharp.protocol;
 7using rpc_csharp.transport;
 8using System;
 9using System.Threading;
 10
 11namespace RPC.Services
 12{
 13    public class AsyncQueueEnumerable<T> : IUniTaskAsyncEnumerable<T> where T: class
 14    {
 15        private readonly ProtocolHelpers.AsyncQueue<T> queue;
 16
 017        public AsyncQueueEnumerable(ProtocolHelpers.AsyncQueue<T> queue)
 18        {
 019            this.queue = queue;
 020        }
 21
 22        public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken
 23        {
 024            return queue;
 25        }
 26    }
 27
 28    public class TransportServiceImpl : ITransportService<RPCContext>, ITransport
 29    {
 30        public event Action OnCloseEvent;
 31        public event Action<string> OnErrorEvent;
 32        public event Action<byte[]> OnMessageEvent;
 33        public event Action OnConnectEvent;
 34
 35        private readonly ProtocolHelpers.AsyncQueue<Payload> queue;
 36        private RpcClient client;
 37        private RpcClientPort port;
 38
 39        public static void RegisterService(RpcServerPort<RPCContext> port)
 40        {
 41            TransportServiceCodeGen.RegisterService(port, new TransportServiceImpl());
 42        }
 43
 44        public TransportServiceImpl()
 45        {
 46            queue = new ProtocolHelpers.AsyncQueue<Payload>((_, __) => { });
 47        }
 48
 49        private async UniTaskVoid BuildClient(RPCContext context)
 50        {
 51            client = new RpcClient(this);
 52            port = await client.CreatePort("renderer-protocol");
 53
 54            context.transport.OnLoadModules?.Invoke(port);
 55        }
 56
 57        private async UniTaskVoid HandleMessages(IUniTaskAsyncEnumerable<Payload> streamRequest, CancellationToken token
 58        {
 59            await foreach (Payload request in streamRequest)
 60            {
 61                if (token.IsCancellationRequested)
 62                    break;
 63
 64                OnMessageEvent?.Invoke(request.Payload_.ToByteArray());
 65            }
 66        }
 67
 68        public IUniTaskAsyncEnumerable<Payload> OpenTransportStream(IUniTaskAsyncEnumerable<Payload> streamRequest, RPCC
 69        {
 70            // Client builder...
 71            BuildClient(context).Forget();
 72
 73            OnConnectEvent?.Invoke();
 74
 75            return UniTaskAsyncEnumerable.Create<Payload>(async (writer, token) =>
 76            {
 77                // Async call...
 78                HandleMessages(streamRequest, token).Forget();
 79
 80                while (true)
 81                {
 82                    var nextFuture = queue.MoveNextAsync();
 83
 84                    if (await nextFuture == false || token.IsCancellationRequested)
 85                    {
 86                        queue.Close();
 87                        OnCloseEvent?.Invoke();
 88                        break;
 89                    }
 90
 91                    await writer.YieldAsync(queue.Current);
 92                }
 93            });
 94        }
 95
 96        public void SendMessage(byte[] data)
 97        {
 98            queue.Enqueue(new Payload() { Payload_ = ByteString.CopyFrom(data) });
 99        }
 100
 101        public void Close()
 102        {
 103            queue.Close();
 104        }
 105
 106        public void Dispose() { }
 107    }
 108}