< Summary

Class:RPC.Services.TransportServiceImpl
Assembly:RPC.Services.TransportService
File(s):/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/TransportService/TransportServiceImpl.cs
Covered lines:19
Uncovered lines:11
Coverable lines:30
Total lines:108
Line coverage:63.3% (19 of 30)
Covered branches:0
Total branches:0

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity NPath complexity Sequence coverage
RegisterService(...)0%2100%
TransportServiceImpl()0%220100%
BuildClient()0%64050%
HandleMessages()0%15.8912070%
OpenTransportStream(...)0%220100%
>c__DisplayClass19_0/<<OpenTransportStream()0%9.868069.23%
SendMessage(...)0%110100%
Close()0%2100%
Dispose()0%110100%

File(s)

/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/TransportService/TransportServiceImpl.cs

#LineLine coverage
 1using Cysharp.Threading.Tasks;
 2using Cysharp.Threading.Tasks.Linq;
 3using Decentraland.Renderer.RendererServices;
 4using Google.Protobuf;
 5using rpc_csharp;
 6using rpc_csharp.protocol;
 7using rpc_csharp.transport;
 8using System;
 9using System.Threading;
 10
 11namespace RPC.Services
 12{
 13    public class AsyncQueueEnumerable<T> : IUniTaskAsyncEnumerable<T> where T: class
 14    {
 15        private readonly ProtocolHelpers.AsyncQueue<T> queue;
 16
 17        public AsyncQueueEnumerable(ProtocolHelpers.AsyncQueue<T> queue)
 18        {
 19            this.queue = queue;
 20        }
 21
 22        public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken
 23        {
 24            return queue;
 25        }
 26    }
 27
 28    public class TransportServiceImpl : ITransportService<RPCContext>, ITransport
 29    {
 30        public event Action OnCloseEvent;
 31        public event Action<string> OnErrorEvent;
 32        public event Action<byte[]> OnMessageEvent;
 33        public event Action OnConnectEvent;
 34
 35        private readonly ProtocolHelpers.AsyncQueue<Payload> queue;
 36        private RpcClient client;
 37        private RpcClientPort port;
 38
 39        public static void RegisterService(RpcServerPort<RPCContext> port)
 40        {
 041            TransportServiceCodeGen.RegisterService(port, new TransportServiceImpl());
 042        }
 43
 144        public TransportServiceImpl()
 45        {
 546            queue = new ProtocolHelpers.AsyncQueue<Payload>((_, __) => { });
 147        }
 48
 49        private async UniTaskVoid BuildClient(RPCContext context)
 50        {
 151            client = new RpcClient(this);
 252            port = await client.CreatePort("renderer-protocol");
 53
 054            context.transport.OnLoadModules?.Invoke(port);
 055        }
 56
 57        private async UniTaskVoid HandleMessages(IUniTaskAsyncEnumerable<Payload> streamRequest, CancellationToken token
 58        {
 1559            await foreach (Payload request in streamRequest)
 60            {
 361                if (token.IsCancellationRequested)
 62                    break;
 63
 364                OnMessageEvent?.Invoke(request.Payload_.ToByteArray());
 65            }
 066        }
 67
 68        public IUniTaskAsyncEnumerable<Payload> OpenTransportStream(IUniTaskAsyncEnumerable<Payload> streamRequest, RPCC
 69        {
 70            // Client builder...
 171            BuildClient(context).Forget();
 72
 173            OnConnectEvent?.Invoke();
 74
 175            return UniTaskAsyncEnumerable.Create<Payload>(async (writer, token) =>
 76            {
 77                // Async call...
 178                HandleMessages(streamRequest, token).Forget();
 79
 480                while (true)
 81                {
 582                    var nextFuture = queue.MoveNextAsync();
 83
 1284                    if (await nextFuture == false || token.IsCancellationRequested)
 85                    {
 086                        queue.Close();
 087                        OnCloseEvent?.Invoke();
 088                        break;
 89                    }
 90
 691                    await writer.YieldAsync(queue.Current);
 92                }
 093            });
 94        }
 95
 96        public void SendMessage(byte[] data)
 97        {
 498            queue.Enqueue(new Payload() { Payload_ = ByteString.CopyFrom(data) });
 499        }
 100
 101        public void Close()
 102        {
 0103            queue.Close();
 0104        }
 105
 1106        public void Dispose() { }
 107    }
 108}