< Summary

Class:RPC.Services.CRDTServiceImpl
Assembly:RPC.Services.CRDTService
File(s):/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/CRDTService/CRDTService.cs
Covered lines:33
Uncovered lines:6
Coverable lines:39
Total lines:109
Line coverage:84.6% (33 of 39)
Covered branches:0
Total branches:0

Metrics

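| Method | Branch coverage | Crap Score | Cyclomatic complexity | NPath complexity | Sequence coverage |
|---|---|---|---|---|---|
| CRDTServiceImpl() | 0% | 1 | 1 | 0 | 100% |
| RegisterService(...) | 0% | 1 | 1 | 0 | 100% |
| OnCRDTReceived(...) | 0% | 5.04 | 5 | 0 | 88.24% |
| SendCRDT(...) | 0% | 2.05 | 2 | 0 | 76.92% |
| CrdtNotificationStream() | 0% | 6 | 2 | 0 | 0% |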

File(s)

/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/CRDTService/CRDTService.cs

# | Line | Line coverage
 1using System;
 2using System.Collections.Generic;
 3using System.IO;
 4using System.Threading;
 5using Cysharp.Threading.Tasks;
 6using DCL;
 7using DCL.CRDT;
 8using Google.Protobuf;
 9using KernelCommunication;
 10using rpc_csharp;
 11using UnityEngine;
 12using BinaryWriter = KernelCommunication.BinaryWriter;
 13
 14namespace RPC.Services
 15{
 16    public static class CRDTServiceImpl
 17    {
 118        private static readonly UniTask<CRDTResponse> defaultResponse = UniTask.FromResult(new CRDTResponse());
 119        private static readonly UniTask<CRDTManyMessages> emptyResponse = UniTask.FromResult(new CRDTManyMessages() { Sc
 20
 121        private static readonly CRDTManyMessages reusableCrdtMessage = new CRDTManyMessages();
 22
 123        private static readonly CRDTStream crdtStream = new CRDTStream();
 124        private static readonly MemoryStream memoryStream = new MemoryStream();
 125        private static readonly BinaryWriter binaryWriter = new BinaryWriter(memoryStream);
 26
 27        public static void RegisterService(RpcServerPort<RPCContext> port)
 28        {
 229            CRDTService<RPCContext>.RegisterService(
 30                port,
 31                sendCrdt: OnCRDTReceived,
 32                pullCrdt: SendCRDT,
 33                crdtNotificationStream: CrdtNotificationStream
 34            );
 235        }
 36
 37        private static UniTask<CRDTResponse> OnCRDTReceived(CRDTManyMessages messages, RPCContext context, CancellationT
 38        {
 139            messages.Payload.WriteTo(crdtStream);
 40
 141            var sceneMessagesPool = context.crdtContext.messageQueueHandler.sceneMessagesPool;
 42
 43            try
 44            {
 145                using (var iterator = KernelBinaryMessageDeserializer.Deserialize(crdtStream))
 46                {
 247                    while (iterator.MoveNext())
 48                    {
 149                        if (!(iterator.Current is CRDTMessage crdtMessage))
 50                            continue;
 51
 152                        if (!sceneMessagesPool.TryDequeue(out QueuedSceneMessage_Scene queuedMessage))
 53                        {
 154                            queuedMessage = new QueuedSceneMessage_Scene();
 55                        }
 156                        queuedMessage.method = MessagingTypes.CRDT_MESSAGE;
 157                        queuedMessage.type = QueuedSceneMessage.Type.SCENE_MESSAGE;
 158                        queuedMessage.sceneId = messages.SceneId;
 159                        queuedMessage.payload = crdtMessage;
 60
 161                        context.crdtContext.messageQueueHandler.EnqueueSceneMessage(queuedMessage);
 62                    }
 163                }
 164            }
 65            catch (Exception e)
 66            {
 067                Debug.LogError(e);
 068            }
 69
 170            return defaultResponse;
 71        }
 72
 73        private static UniTask<CRDTManyMessages> SendCRDT(PullCRDTRequest request, RPCContext context, CancellationToken
 74        {
 275            string sceneId = request.SceneId;
 76
 77            try
 78            {
 279                if (!context.crdtContext.scenesOutgoingCrdts.TryGetValue(sceneId, out CRDTProtocol sceneCrdtState))
 80                {
 081                    return emptyResponse;
 82                }
 83
 284                memoryStream.SetLength(0);
 85
 286                context.crdtContext.scenesOutgoingCrdts.Remove(sceneId);
 87
 288                KernelBinaryMessageSerializer.Serialize(binaryWriter, sceneCrdtState);
 289                sceneCrdtState.ClearOnUpdated();
 90
 291                reusableCrdtMessage.SceneId = sceneId;
 292                reusableCrdtMessage.Payload = ByteString.CopyFrom(memoryStream.ToArray());
 93
 294                return UniTask.FromResult(reusableCrdtMessage);
 95            }
 96            catch (Exception e)
 97            {
 098                Debug.LogError(e);
 099                return emptyResponse;
 100            }
 2101        }
 102
 103        [Obsolete("deprecated")]
 104        private static IEnumerator<CRDTManyMessages> CrdtNotificationStream(CRDTStreamRequest request, RPCContext contex
 105        {
 0106            yield break;
 107        }
 108    }
 109}