< Summary

Class: RPC.Services.AsyncQueueEnumerable[T]
Assembly: RPC.Services.TransportService
File(s): /tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/TransportService/TransportServiceImpl.cs
Covered lines: 0
Uncovered lines: 4
Coverable lines: 4
Total lines: 106
Line coverage: 0% (0 of 4)
Covered branches: 0
Total branches: 0

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | NPath complexity | Sequence coverage |
| --- | --- | --- | --- | --- | --- |
| AsyncQueueEnumerable(...) | 0% | 2 | 1 | 0 | 0% |
| GetAsyncEnumerator(...) | 0% | 2 | 1 | 0 | 0% |

File(s)

/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/TransportService/TransportServiceImpl.cs

# | Line | Line coverage
 1using System;
 2using System.Threading;
 3using System.Threading.Tasks;
 4using Cysharp.Threading.Tasks;
 5using Cysharp.Threading.Tasks.Linq;
 6using DCL;
 7using Google.Protobuf;
 8using rpc_csharp;
 9using rpc_csharp.protocol;
 10using rpc_csharp.transport;
 11using UnityEngine;
 12using Environment = DCL.Environment;
 13
 14namespace RPC.Services
 15{
 16    public class AsyncQueueEnumerable<T> : IUniTaskAsyncEnumerable<T> where T : class
 17    {
 18        private readonly ProtocolHelpers.AsyncQueue<T> queue;
 19
 020        public AsyncQueueEnumerable(ProtocolHelpers.AsyncQueue<T> queue)
 21        {
 022            this.queue = queue;
 023        }
 24
 25        public IUniTaskAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken
 26        {
 027            return queue;
 28        }
 29    }
 30
 31    public class TransportServiceImpl : ITransportService<RPCContext>, ITransport
 32    {
 33        public event Action OnCloseEvent;
 34        public event Action<string> OnErrorEvent;
 35        public event Action<byte[]> OnMessageEvent;
 36        public event Action OnConnectEvent;
 37
 38        private readonly ProtocolHelpers.AsyncQueue<Payload> queue;
 39        private RpcClient client;
 40        private RpcClientPort port;
 41
 42        public static void RegisterService(RpcServerPort<RPCContext> port)
 43        {
 44            TransportServiceCodeGen.RegisterService(port, new TransportServiceImpl());
 45        }
 46
 47        public TransportServiceImpl()
 48        {
 49            queue = new ProtocolHelpers.AsyncQueue<Payload>((_, __) => {});
 50        }
 51
 52        private async UniTaskVoid BuildClient(RPCContext context)
 53        {
 54            client = new RpcClient(this);
 55            port = await client.CreatePort("renderer-protocol");
 56            DCL.RPC.LoadModules(port, Environment.i.serviceLocator.Get<IRPC>());
 57        }
 58
 59        private async UniTaskVoid HandleMessages(IUniTaskAsyncEnumerable<Payload> streamRequest, CancellationToken token
 60        {
 61            await foreach (Payload request in streamRequest)
 62            {
 63                if (token.IsCancellationRequested)
 64                    break;
 65
 66                OnMessageEvent?.Invoke(request.Payload_.ToByteArray());
 67            }
 68        }
 69
 70        public IUniTaskAsyncEnumerable<Payload> OpenTransportStream(IUniTaskAsyncEnumerable<Payload> streamRequest, RPCC
 71        {
 72            // Client builder...
 73            BuildClient(context).Forget();
 74
 75            OnConnectEvent?.Invoke();
 76            return UniTaskAsyncEnumerable.Create<Payload>(async (writer, token) =>
 77            {
 78                // Async call...
 79                HandleMessages(streamRequest, token).Forget();
 80
 81                while (true)
 82                {
 83                    var nextFuture = queue.MoveNextAsync();
 84
 85                    if (await nextFuture == false || token.IsCancellationRequested)
 86                    {
 87                        queue.Close();
 88                        OnCloseEvent?.Invoke();
 89                        break;
 90                    }
 91
 92                    await writer.YieldAsync(queue.Current);
 93                }
 94            });
 95        }
 96
 97        public void SendMessage(byte[] data)
 98        {
 99            queue.Enqueue(new Payload() { Payload_ = ByteString.CopyFrom(data) });
 100        }
 101        public void Close()
 102        {
 103            queue.Close();
 104        }
 105    }
 106}