< Summary

Class:RPC.Services.TransportServiceImpl
Assembly:RPC.Services.TransportService
File(s):/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/TransportService/TransportServiceImpl.cs
Covered lines:18
Uncovered lines:11
Coverable lines:29
Total lines:106
Line coverage:62% (18 of 29)
Covered branches:0
Total branches:0

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity NPath complexity Sequence coverage
RegisterService(...)0%2100%
TransportServiceImpl()0%220100%
BuildClient()0%4.123050%
HandleMessages()0%15.8912070%
OpenTransportStream(...)0%220100%
>c__DisplayClass19_0/<<OpenTransportStream()0%9.868069.23%
SendMessage(...)0%110100%
Close()0%2100%

File(s)

/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/TransportService/TransportServiceImpl.cs

#LineLine coverage
 1using System;
 2using System.Threading;
 3using System.Threading.Tasks;
 4using Cysharp.Threading.Tasks;
 5using Cysharp.Threading.Tasks.Linq;
 6using DCL;
 7using Google.Protobuf;
 8using rpc_csharp;
 9using rpc_csharp.protocol;
 10using rpc_csharp.transport;
 11using UnityEngine;
 12using Environment = DCL.Environment;
 13
 14namespace RPC.Services
 15{
 16    public class AsyncQueueEnumerable<T> : IUniTaskAsyncEnumerable<T> where T : class
 17    {
 18        private readonly ProtocolHelpers.AsyncQueue<T> queue;
 19
 20        public AsyncQueueEnumerable(ProtocolHelpers.AsyncQueue<T> queue)
 21        {
 22            this.queue = queue;
 23        }
 24
 25        public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken
 26        {
 27            return queue;
 28        }
 29    }
 30
 31    public class TransportServiceImpl : ITransportService<RPCContext>, ITransport
 32    {
 33        public event Action OnCloseEvent;
 34        public event Action<string> OnErrorEvent;
 35        public event Action<byte[]> OnMessageEvent;
 36        public event Action OnConnectEvent;
 37
 38        private readonly ProtocolHelpers.AsyncQueue<Payload> queue;
 39        private RpcClient client;
 40        private RpcClientPort port;
 41
 42        public static void RegisterService(RpcServerPort<RPCContext> port)
 43        {
 044            TransportServiceCodeGen.RegisterService(port, new TransportServiceImpl());
 045        }
 46
 147        public TransportServiceImpl()
 48        {
 549            queue = new ProtocolHelpers.AsyncQueue<Payload>((_, __) => {});
 150        }
 51
 52        private async UniTaskVoid BuildClient(RPCContext context)
 53        {
 154            client = new RpcClient(this);
 255            port = await client.CreatePort("renderer-protocol");
 056            DCL.RPC.LoadModules(port, Environment.i.serviceLocator.Get<IRPC>());
 057        }
 58
 59        private async UniTaskVoid HandleMessages(IUniTaskAsyncEnumerable<Payload> streamRequest, CancellationToken token
 60        {
 1561            await foreach (Payload request in streamRequest)
 62            {
 363                if (token.IsCancellationRequested)
 64                    break;
 65
 366                OnMessageEvent?.Invoke(request.Payload_.ToByteArray());
 67            }
 068        }
 69
 70        public IUniTaskAsyncEnumerable<Payload> OpenTransportStream(IUniTaskAsyncEnumerable<Payload> streamRequest, RPCC
 71        {
 72            // Client builder...
 173            BuildClient(context).Forget();
 74
 175            OnConnectEvent?.Invoke();
 176            return UniTaskAsyncEnumerable.Create<Payload>(async (writer, token) =>
 77            {
 78                // Async call...
 179                HandleMessages(streamRequest, token).Forget();
 80
 481                while (true)
 82                {
 583                    var nextFuture = queue.MoveNextAsync();
 84
 1285                    if (await nextFuture == false || token.IsCancellationRequested)
 86                    {
 087                        queue.Close();
 088                        OnCloseEvent?.Invoke();
 089                        break;
 90                    }
 91
 692                    await writer.YieldAsync(queue.Current);
 93                }
 094            });
 95        }
 96
 97        public void SendMessage(byte[] data)
 98        {
 499            queue.Enqueue(new Payload() { Payload_ = ByteString.CopyFrom(data) });
 4100        }
 101        public void Close()
 102        {
 0103            queue.Close();
 0104        }
 105    }
 106}