< Summary

Class:RPC.Services.CRDTServiceImpl
Assembly:RPC.Services.CRDTService
File(s):/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/CRDTService/CRDTServiceImpl.cs
Covered lines:29
Uncovered lines:7
Coverable lines:36
Total lines:112
Line coverage:80.5% (29 of 36)
Covered branches:0
Total branches:0

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity NPath complexity Sequence coverage
CRDTServiceImpl()0%110100%
RegisterService(...)0%2100%
SendCrdt()0%16.3515081.82%
PullCrdt(...)0%2.052076.92%

File(s)

/tmp/workspace/unity-renderer/unity-renderer/Assets/Scripts/MainScripts/DCL/WorldRuntime/KernelCommunication/RPC/Services/CRDTService/CRDTServiceImpl.cs

#LineLine coverage
 1using Cysharp.Threading.Tasks;
 2using DCL;
 3using DCL.Controllers;
 4using DCL.CRDT;
 5using DCL.Models;
 6using Decentraland.Renderer.RendererServices;
 7using Google.Protobuf;
 8using KernelCommunication;
 9using rpc_csharp;
 10using System;
 11using System.IO;
 12using System.Threading;
 13using UnityEngine;
 14using BinaryWriter = KernelCommunication.BinaryWriter;
 15
 16namespace RPC.Services
 17{
 18    public class CRDTServiceImpl : ICRDTService<RPCContext>
 19    {
 120        private static readonly CRDTResponse defaultResponse = new CRDTResponse();
 121        private static readonly UniTask<CRDTManyMessages> emptyResponse = UniTask.FromResult(new CRDTManyMessages() { Sc
 22
 123        private static readonly CRDTManyMessages reusableCrdtMessage = new CRDTManyMessages();
 24
 125        private static readonly MemoryStream memoryStream = new MemoryStream();
 126        private static readonly BinaryWriter binaryWriter = new BinaryWriter(memoryStream);
 27
 28        public static void RegisterService(RpcServerPort<RPCContext> port)
 29        {
 030            CRDTServiceCodeGen.RegisterService(port, new CRDTServiceImpl());
 031        }
 32
 33        public async UniTask<CRDTResponse> SendCrdt(CRDTManyMessages messages, RPCContext context, CancellationToken ct)
 34        {
 35            // This line is to avoid a race condition because a CRDT message could be sent before the scene was loaded
 36            // more info: https://github.com/decentraland/sdk/issues/480#issuecomment-1331309908
 2037            await UniTask.WaitUntil(() => context.crdt.MessagingControllersManager.ContainsController(messages.SceneNumb
 38                cancellationToken: ct);
 39
 2040            await UniTask.WaitWhile(() => context.crdt.MessagingControllersManager.HasScenePendingMessages(messages.Scen
 41                cancellationToken: ct);
 42
 543            await UniTask.SwitchToMainThread(ct);
 44
 45            try
 46            {
 547                using (var iterator = CRDTDeserializer.DeserializeBatch(messages.Payload.Memory))
 48                {
 1049                    while (iterator.MoveNext())
 50                    {
 551                        if (!(iterator.Current is CRDTMessage crdtMessage))
 52                            continue;
 53
 554                        context.crdt.CrdtMessageReceived?.Invoke(messages.SceneNumber, crdtMessage);
 55                    }
 556                }
 57
 558                if (context.crdt.WorldState.TryGetScene(messages.SceneNumber, out IParcelScene scene))
 59                {
 60                    // When sdk7 scene receive it first crdt we set `InitMessagesDone` since
 61                    // kernel won't be sending that message for those scenes
 262                    if (scene.sceneData.sdk7 && !scene.IsInitMessageDone())
 63                    {
 164                        context.crdt.SceneController.EnqueueSceneMessage(new QueuedSceneMessage_Scene()
 65                        {
 66                            sceneNumber = messages.SceneNumber,
 67                            tag = "scene",
 68                            payload = new Protocol.SceneReady(),
 69                            method = MessagingTypes.INIT_DONE,
 70                            type = QueuedSceneMessage.Type.SCENE_MESSAGE
 71                        });
 72                    }
 73                }
 574            }
 75            catch (Exception e)
 76            {
 077                Debug.LogError(e);
 078            }
 79
 580            return defaultResponse;
 581        }
 82
 83        public UniTask<CRDTManyMessages> PullCrdt(PullCRDTRequest request, RPCContext context, CancellationToken ct)
 84        {
 85            try
 86            {
 287                if (!context.crdt.scenesOutgoingCrdts.TryGetValue(request.SceneNumber, out CRDTProtocol sceneCrdtState))
 88                {
 089                    return emptyResponse;
 90                }
 91
 292                memoryStream.SetLength(0);
 93
 294                context.crdt.scenesOutgoingCrdts.Remove(request.SceneNumber);
 95
 296                KernelBinaryMessageSerializer.Serialize(binaryWriter, sceneCrdtState);
 297                sceneCrdtState.ClearOnUpdated();
 98
 299                reusableCrdtMessage.SceneId = request.SceneId;
 2100                reusableCrdtMessage.SceneNumber = request.SceneNumber;
 2101                reusableCrdtMessage.Payload = ByteString.CopyFrom(memoryStream.ToArray());
 102
 2103                return UniTask.FromResult(reusableCrdtMessage);
 104            }
 105            catch (Exception e)
 106            {
 0107                Debug.LogError(e);
 0108                return emptyResponse;
 109            }
 2110        }
 111    }
 112}