using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Timers;
using Cysharp.Threading.Tasks;
using kcp2k;
using Timer = System.Timers.Timer;
using System.Threading.Tasks;
using System.IO;
using System.Numerics;
using BITKit.Net.Examples;
using Newtonsoft.Json;

namespace BITKit.Net
{
    /// <summary>
    /// Network client built on a kcp2k <see cref="KcpClient"/>. Outgoing packets are queued and
    /// flushed from <see cref="Tick()"/>; incoming packets are dispatched by their leading
    /// <c>NetCommandType</c> byte in <c>OnData</c>.
    /// </summary>
    // NOTE(review): several declarations below use `T`, `Func>` or `>>` without a visible
    // generic parameter list (e.g. `Queue commandQueue`, `_rpc`, `func.As>>()`). The generic
    // arguments look like they were lost from this copy of the file; those tokens are kept
    // exactly as found - restore them from the canonical source before compiling.
    // NOTE(review): `_timer` is never disposed and this class does not implement IDisposable.
    public class KcpNetClient : INetClient, INetProvider
    {
        /// <summary>Raised at the start of <see cref="Connect"/>, before the socket is opened.</summary>
        public event Action OnStartConnect;

        /// <summary>Raised when the transport reports a connection and when the connected handle becomes valid.</summary>
        // NOTE(review): invoked from both OnConnectedInternal and ConnectionCallback, so
        // subscribers may be called twice per connection - confirm whether that is intended.
        public event Action OnConnected;

        /// <summary>Raised when the transport reports a disconnect and at the end of <see cref="Disconnect"/>.</summary>
        // NOTE(review): invoked from both Disconnect and OnDisconnectInternal - possible double fire.
        public event Action OnDisconnected;

        /// <summary>Raised when <see cref="Connect"/> fails with an exception.</summary>
        public event Action OnConnectedFailed;

        /// <summary>Whether the underlying kcp client currently reports itself as connected.</summary>
        public bool IsConnected => client.connected;

        /// <summary>When true, the internal timer does not call <see cref="Tick()"/>; the owner must call it.</summary>
        public bool ManualTick { get; set; }

        /// <summary>Milliseconds between the last ping sent and the last ping reply received.</summary>
        public int Ping { get; private set; }

        /// <summary>Client id; -1 until the command listener registered in the constructor assigns it.</summary>
        public int Id { get; private set; } = -1;

        private readonly KcpClient client;

        // Outgoing packets waiting to be sent on the next Tick().
        // NOTE(review): enqueued from callers/OnData and drained from the timer callback with no
        // synchronization visible here - confirm the queue type is safe for that, or guard it.
        private readonly Queue commandQueue = new();

        // UTC timestamp of the most recently sent ping (see Tick()).
        private DateTime _lastPingTime = DateTime.UtcNow;

        // Drives Tick() every 100 ms once started by Connect().
        private readonly Timer _timer = new(100) { AutoReset = true };

        // Listeners for received commands, invoked with the command's full type name as key.
        private readonly GenericEvent _events = new();

        private readonly ValidHandle _isConnected = new();

        // Source of request ids for GetFromServer.
        private int _index = int.MinValue;

        // Replies (value or Exception) received for GetFromServer requests, keyed by request id.
        private readonly ConcurrentDictionary _p2p = new();

        // Request handlers registered via AddCommandListener, keyed by command type full name.
        private readonly ConcurrentDictionary>> _rpc = new();

        /// <summary>
        /// Creates the kcp client with this instance's callbacks and <c>KCPNet.Config</c>, hooks the
        /// timer, and registers the listener that stores the received id in <see cref="Id"/>.
        /// </summary>
        public KcpNetClient()
        {
            client = new KcpClient(
                OnConnectedInternal,
                OnData,
                OnDisconnectInternal,
                OnError,
                KCPNet.Config
            );
            _timer.Elapsed += Tick;
            BIT4Log.Log("已创建KCP客户端");
            AddCommandListener(x => { Id = x.Id; });
            _isConnected.AddListener(ConnectionCallback);
        }

        // Listener on _isConnected: raises OnConnected on the app synchronization context when
        // called with true.
        private async void ConnectionCallback(bool x)
        {
            if (x is false) return;
            // Same null guard as every other context switch in this class.
            if (BITApp.SynchronizationContext is not null)
                await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext);
            OnConnected?.Invoke();
        }

        // Timer callback; skipped while the owner ticks manually.
        private void Tick(object sender, ElapsedEventArgs e)
        {
            if (!ManualTick) Tick();
        }

        /// <summary>
        /// Disconnects the kcp client, then raises <see cref="OnDisconnected"/> on the app
        /// synchronization context. Cancellation via <c>BITApp.CancellationToken</c> is swallowed.
        /// </summary>
        public async void Disconnect()
        {
            client.Disconnect();
            try
            {
                if (BITApp.SynchronizationContext is not null)
                    await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext, BITApp.CancellationToken);
                OnDisconnected?.Invoke();
            }
            catch (OperationCanceledException)
            {
                // Application is shutting down; nothing left to notify.
            }
        }

        /// <summary>
        /// Connects to <paramref name="address"/>:<paramref name="port"/>. Raises
        /// <see cref="OnStartConnect"/>, ticks the client five times at 100 ms intervals, starts the
        /// timer, sends a heartbeat, sends the machine name as a server message when connected, ticks
        /// five more times and returns whether the client is connected.
        /// </summary>
        /// <param name="address">Server address.</param>
        /// <param name="port">Server port.</param>
        /// <returns><c>client.connected</c> after the handshake ticks; false if an exception occurred.</returns>
        // NOTE(review): when no exception is thrown but the client never connects, this returns
        // false without raising OnConnectedFailed and leaves _timer running - confirm intent.
        public async UniTask Connect(string address = "127.0.0.1", ushort port = 27014)
        {
            if (BITApp.SynchronizationContext is not null)
                await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext, BITApp.CancellationToken);
            OnStartConnect?.Invoke();
            await UniTask.SwitchToThreadPool();
            try
            {
                client.Connect(address, port);
                for (var i = 0; i < 5; i++)
                {
                    client.Tick();
                    await Task.Delay(100);
                }

                _timer.Start();
                client.Send(new[] { (byte)NetCommandType.Heartbeat }, KcpChannel.Reliable);
                if (BITApp.SynchronizationContext is not null)
                    await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext);
                if (client.connected)
                {
                    SendServerMessage(Environment.MachineName);
                }

                for (var i = 0; i < 5; i++)
                {
                    client.Tick();
                    await Task.Delay(100);
                }

                return client.connected;
            }
            catch (Exception e)
            {
                BIT4Log.LogException(e);
                if (BITApp.SynchronizationContext is not null)
                    await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext);
                OnConnectedFailed?.Invoke();
                return false;
            }
        }

        // Dispatches one received packet. The first byte selects the NetCommandType; the rest is
        // read with a BinaryReader according to that type.
        private async void OnData(ArraySegment bytes, KcpChannel channel)
        {
            // This is an async void callback: anything escaping it is unobserved by the caller,
            // so every branch is covered by a catch-and-log.
            try
            {
                using var ms = new MemoryStream(bytes.ToArray());
                using var reader = new BinaryReader(ms);
                var type = (NetCommandType)ms.ReadByte();
                switch (type)
                {
                    case NetCommandType.Message:
                        // Two leading fields are read and discarded; only the third is logged.
                        reader.ReadBoolean();
                        reader.ReadString();
                        BIT4Log.Log($"已收到消息:{reader.ReadString()}");
                        break;
                    case NetCommandType.AllClientCommand:
                    case NetCommandType.Command:
                        var command = BITBinary.Read(reader);
                        // A single-element object[] is unwrapped to its element.
                        if (command is object[] { Length: 1 } objs) command = objs[0];
                        try
                        {
                            if (BITApp.SynchronizationContext is not null)
                                await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext, BITApp.CancellationToken);
                            _events.Invoke(command.GetType().FullName, command);
                        }
                        catch (OperationCanceledException)
                        {
                            // Application is shutting down; drop the command.
                        }
                        catch (Exception e)
                        {
                            BIT4Log.LogException(e);
                        }
                        break;
                    case NetCommandType.Heartbeat:
                        // Echo the heartbeat and mark this client as connected.
                        client.Send(new[] { (byte)NetCommandType.Heartbeat }, KcpChannel.Reliable);
                        _isConnected.AddElement(this);
                        break;
                    case NetCommandType.Ping:
                        Ping = (int)(DateTime.UtcNow - _lastPingTime).TotalMilliseconds;
                        break;
                    case NetCommandType.ReturnToClient:
                        // Reply to a GetFromServer request: id, success flag, then value or error text.
                        var id = reader.ReadInt32();
                        try
                        {
                            if (reader.ReadBoolean())
                            {
                                var value = BITBinary.Read(reader);
                                _p2p.TryAdd(id, value);
                            }
                            else
                            {
                                var message = reader.ReadString();
                                _p2p.TryAdd(id, new Exception(message));
                            }
                        }
                        catch (Exception e)
                        {
                            BIT4Log.Warning($"请求返回失败:{id}");
                            BIT4Log.LogException(e);
                        }
                        break;
                    case NetCommandType.GetFromClient:
                    {
                        // Request from the server: request id, then the command object.
                        var requestId = reader.ReadInt32();
                        try
                        {
                            var commandObj = BITBinary.Read(reader);
                            if (_rpc.TryGetValue(commandObj.GetType().FullName, out var func) is false)
                            {
                                throw new NotImplementedException($"未找到对应的方法:{commandObj.GetType().FullName}");
                            }

                            var result = await func.As>>().Invoke(commandObj);

                            using var _ms = new MemoryStream();
                            using var _writer = new BinaryWriter(_ms);
                            _writer.Write((byte)NetCommandType.ReturnToServer);
                            _writer.Write(requestId);
                            _writer.Write(true);
                            BITBinary.Write(_writer, result);
                            commandQueue.Enqueue(_ms.ToArray());
                        }
                        catch (Exception e)
                        {
                            BIT4Log.LogException(e);
                            // Always answer the request. The failure reply is built in a fresh
                            // buffer so a half-written success payload is never sent.
                            EnqueueRpcFailure(requestId, e);
                        }
                        break;
                    }
                    default:
                        BIT4Log.Log($"未知消息类型:{type},字节:{(byte)type}");
                        if (bytes.Array != null)
                            BIT4Log.Log(
                                $"已收到:({Id}, {BitConverter.ToString(bytes.Array, bytes.Offset, bytes.Count)} @ {channel})");
                        break;
                }
            }
            catch (Exception ex)
            {
                BIT4Log.LogException(ex);
            }
        }

        // Queues a ReturnToServer packet reporting failure: request id, false, error message.
        private void EnqueueRpcFailure(int requestId, Exception error)
        {
            using var ms = new MemoryStream();
            using var writer = new BinaryWriter(ms);
            writer.Write((byte)NetCommandType.ReturnToServer);
            writer.Write(requestId);
            writer.Write(false);
            writer.Write(error.Message ?? string.Empty);
            writer.Flush();
            commandQueue.Enqueue(ms.ToArray());
        }

        // kcp "connected" callback: raises OnConnected on the app synchronization context.
        private async void OnConnectedInternal()
        {
            if (BITApp.SynchronizationContext is not null)
                await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext);
            OnConnected?.Invoke();
            BIT4Log.Log("已连接");
        }

        // kcp "disconnected" callback: stops the timer, clears the connected handle and raises
        // OnDisconnected on the app synchronization context.
        private async void OnDisconnectInternal()
        {
            BIT4Log.Log("断开连接");
            _timer.Stop();
            try
            {
                if (BITApp.SynchronizationContext is not null)
                    await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext, BITApp.CancellationToken);
                _isConnected.RemoveElement(this);
                OnDisconnected?.Invoke();
            }
            catch (OperationCanceledException)
            {
                // Application is shutting down; nothing left to notify.
            }
        }

        // kcp error callback: logs the remote endpoint, error code and message.
        private void OnError(ErrorCode errorCode, string message)
        {
            BIT4Log.Log($"{client.remoteEndPoint}异常:{errorCode},{message}");
        }

        /// <summary>Queues <paramref name="command"/> as a <c>NetCommandType.Command</c> packet.</summary>
        public void ServerCommand(T command = default)
        {
            Send(NetCommandType.Command, command);
        }

        /// <summary>Queues <paramref name="command"/> as a <c>NetCommandType.AllClientCommand</c> packet.</summary>
        public void AllClientCommand(T command = default)
        {
            Send(NetCommandType.AllClientCommand, command);
        }

        /// <summary>Queues <paramref name="command"/> as a <c>NetCommandType.TargetCommand</c> packet preceded by <paramref name="id"/>.</summary>
        public void ClientCommand(int id, T command)
        {
            Send(NetCommandType.TargetCommand, id, command);
        }

        /// <summary>
        /// Queues a <c>GetFromServer</c> request carrying <paramref name="command"/> and polls every
        /// 100 ms for the matching reply.
        /// </summary>
        /// <returns>The reply value cast to <c>T</c>.</returns>
        /// <exception cref="TimeoutException">No reply within 5 seconds, or the client is not connected.</exception>
        /// <exception cref="Exception">The server answered with a failure message.</exception>
        // NOTE(review): a reply that arrives after the timeout stays in _p2p forever.
        public async UniTask GetFromServer(T command = default)
        {
            // Atomic so concurrent callers can never share a request id.
            var id = System.Threading.Interlocked.Increment(ref _index);
            using var ms = new MemoryStream();
            await using var writer = new BinaryWriter(ms);
            writer.Write((byte)NetCommandType.GetFromServer);
            writer.Write(id);
            BITBinary.Write(writer, command);
            var bytes = ms.ToArray();
            commandQueue.Enqueue(bytes);
            var startTime = DateTime.UtcNow;
            while (true)
            {
                if (DateTime.UtcNow - startTime > TimeSpan.FromSeconds(5) || IsConnected is false)
                    throw new TimeoutException();
                if (_p2p.TryRemove(id, out var value))
                {
                    if (value is Exception e)
                    {
                        throw e;
                    }

                    return (T)value;
                }

                await Task.Delay(100);
            }
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public UniTask GetFromClient(int id, T command = default)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public void AddRpcHandle(object rpcHandle)
        {
            throw new NotImplementedException();
        }

        /// <summary>Registers <paramref name="handle"/> as a listener for received commands.</summary>
        public void AddCommandListener(Action handle)
        {
            _events.AddListener(handle);
        }

        /// <summary>
        /// Registers <paramref name="func"/> as the request handler for commands of type <c>T</c>,
        /// keyed by the type's full name. An existing handler for that type is kept (TryAdd).
        /// </summary>
        public void AddCommandListener(Func> func)
        {
            _rpc.TryAdd(typeof(T).FullName, F);
            return;

            // Adapter from the object-typed dispatch in OnData to the typed handler.
            async UniTask F(object o)
            {
                return await func.Invoke((T)o);
            }
        }

        /// <summary>Removes whatever request handler is registered for type <c>T</c>; <paramref name="func"/> itself is not compared.</summary>
        public void RemoveCommandListener(Func> func)
        {
            _rpc.TryRemove(typeof(T).FullName, out _);
        }

        /// <summary>Unregisters <paramref name="handle"/> from received commands.</summary>
        public void RemoveCommandListener(Action handle)
        {
            _events.RemoveListener(handle);
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public void SendRT(string rpcName, params object[] pars)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public void SendTargetRT(int id, string rpcName, params object[] pars)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public void SendAllRT(string rpcName, params object[] pars)
        {
            throw new NotImplementedException();
        }

        /// <summary>Sends <paramref name="message"/> immediately (not queued) as a <c>NetCommandType.Message</c> packet on the reliable channel.</summary>
        public void SendServerMessage(string message)
        {
            var bytes = BinaryBuilder
                .Create()
                .Write(((byte)NetCommandType.Message))
                .Write(message)
                .Build();
            client.Send(bytes, KcpChannel.Reliable);
        }

#if UNITY_EDITOR
        private readonly IntervalUpdate _pingInterval = new(1);
#endif

        /// <summary>
        /// Sends every queued packet on the reliable channel, sends a ping when the editor-only
        /// interval allows it, then ticks the kcp client.
        /// </summary>
        public void Tick()
        {
            while (commandQueue.TryDequeue(out var bytes))
            {
                client.Send(bytes, KcpChannel.Reliable);
            }
#if UNITY_EDITOR
            if (_pingInterval.AllowUpdate)
            {
                _lastPingTime = DateTime.UtcNow;
                client.Send(new[] { (byte)NetCommandType.Ping }, KcpChannel.Reliable);
            }
#endif
            client.Tick();
        }

        /// <summary>Sends the two bytes 0x01, 0x02 to the server on the reliable channel.</summary>
        public void HandShake()
        {
            // send client to server
            client.Send(new byte[]{0x01, 0x02}, KcpChannel.Reliable);
        }

        // Serializes the command type byte followed by `values` and queues the packet; it is
        // sent on the next Tick().
        private void Send(NetCommandType commandType, params object[] values)
        {
            var bytes = BinaryBuilder
                .Create()
                .Write((byte)commandType)
                .WriteObject(values)
                .Build();
            commandQueue.Enqueue(bytes);
        }
    }
}