< Summary

Class: RPC.Services.CRDTServiceImpl
Assembly: RPC.Services.CRDTService
File(s): /tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/CRDTService/CRDTServiceImpl.cs
Covered lines: 30
Uncovered lines: 7
Coverable lines: 37
Total lines: 114
Line coverage: 81% (30 of 37)
Covered branches: 0
Total branches: 0
Covered methods: 3
Total methods: 4
Method coverage: 75% (3 of 4)

Metrics

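| Method | Branch coverage | Crap Score | Cyclomatic complexity | NPath complexity | Sequence coverage |
|---|---|---|---|---|---|
| CRDTServiceImpl() | 0% | 1 | 1 | 0 | 100% |
| RegisterService(...) | 0% | 2 | 1 | 0 | 0% |
| SendCrdt() | 0% | 16.35 | 15 | 0 | 81.82% |
| PullCrdt(...) | 0% | 4.11 | 4 | 0 | 81.25% |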

File(s)

/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/CRDTService/CRDTServiceImpl.cs

#LineLine coverage
 1using Cysharp.Threading.Tasks;
 2using DCL;
 3using DCL.Controllers;
 4using DCL.CRDT;
 5using DCL.Models;
 6using Decentraland.Renderer.RendererServices;
 7using Google.Protobuf;
 8using rpc_csharp;
 9using System;
 10using System.IO;
 11using System.Threading;
 12using UnityEngine;
 13using BinaryWriter = KernelCommunication.BinaryWriter;
 14
 15namespace RPC.Services
 16{
 17    public class CRDTServiceImpl : ICRDTService<RPCContext>
 18    {
 119        private static readonly CRDTResponse defaultResponse = new CRDTResponse();
 120        private static readonly UniTask<CRDTManyMessages> emptyResponse = UniTask.FromResult(new CRDTManyMessages() { Sc
 21
 122        private static readonly CRDTManyMessages reusableCrdtMessage = new CRDTManyMessages();
 23
 124        private static readonly MemoryStream memoryStream = new MemoryStream();
 125        private static readonly BinaryWriter binaryWriter = new BinaryWriter(memoryStream);
 26
 27        public static void RegisterService(RpcServerPort<RPCContext> port)
 28        {
 029            CRDTServiceCodeGen.RegisterService(port, new CRDTServiceImpl());
 030        }
 31
 32        public async UniTask<CRDTResponse> SendCrdt(CRDTManyMessages messages, RPCContext context, CancellationToken ct)
 33        {
 34            // This line is to avoid a race condition because a CRDT message could be sent before the scene was loaded
 35            // more info: https://github.com/decentraland/sdk/issues/480#issuecomment-1331309908
 1236            await UniTask.WaitUntil(() => context.crdt.MessagingControllersManager.ContainsController(messages.SceneNumb
 37                cancellationToken: ct);
 38
 1239            await UniTask.WaitWhile(() => context.crdt.MessagingControllersManager.HasScenePendingMessages(messages.Scen
 40                cancellationToken: ct);
 41
 342            await UniTask.SwitchToMainThread(ct);
 43
 44            try
 45            {
 346                using (var iterator = CRDTDeserializer.DeserializeBatch(messages.Payload.Memory))
 47                {
 648                    while (iterator.MoveNext())
 49                    {
 350                        if (!(iterator.Current is CrdtMessage crdtMessage))
 51                            continue;
 52
 353                        context.crdt.CrdtMessageReceived?.Invoke(messages.SceneNumber, crdtMessage);
 54                    }
 355                }
 56
 357                if (context.crdt.WorldState.TryGetScene(messages.SceneNumber, out IParcelScene scene))
 58                {
 59                    // When sdk7 scene receive it first crdt we set `InitMessagesDone` since
 60                    // kernel won't be sending that message for those scenes
 261                    if (scene.sceneData.sdk7 && !scene.IsInitMessageDone())
 62                    {
 163                        context.crdt.SceneController.EnqueueSceneMessage(new QueuedSceneMessage_Scene()
 64                        {
 65                            sceneNumber = messages.SceneNumber,
 66                            tag = "scene",
 67                            payload = new Protocol.SceneReady(),
 68                            method = MessagingTypes.INIT_DONE,
 69                            type = QueuedSceneMessage.Type.SCENE_MESSAGE
 70                        });
 71                    }
 72                }
 373            }
 74            catch (Exception e)
 75            {
 076                Debug.LogError(e);
 077            }
 78
 379            return defaultResponse;
 380        }
 81
 82        public UniTask<CRDTManyMessages> PullCrdt(PullCRDTRequest request, RPCContext context, CancellationToken ct)
 83        {
 84            try
 85            {
 286                if (!context.crdt.scenesOutgoingCrdts.TryGetValue(request.SceneNumber, out DualKeyValueSet<int, long, Cr
 87                {
 088                    return emptyResponse;
 89                }
 90
 291                memoryStream.SetLength(0);
 92
 293                context.crdt.scenesOutgoingCrdts.Remove(request.SceneNumber);
 94
 895                foreach (var msg in sceneCrdtOutgoingMessage)
 96                {
 297                    CRDTSerializer.Serialize(binaryWriter, msg.value);
 98                }
 299                sceneCrdtOutgoingMessage.Clear();
 100
 2101                reusableCrdtMessage.SceneId = request.SceneId;
 2102                reusableCrdtMessage.SceneNumber = request.SceneNumber;
 2103                reusableCrdtMessage.Payload = ByteString.CopyFrom(memoryStream.ToArray());
 104
 2105                return UniTask.FromResult(reusableCrdtMessage);
 106            }
 107            catch (Exception e)
 108            {
 0109                Debug.LogError(e);
 0110                return emptyResponse;
 111            }
 2112        }
 113    }
 114}