using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.Tracing; using System.Timers; using Cysharp.Threading.Tasks; using kcp2k; using Timer = System.Timers.Timer; using System.Threading.Tasks; using System.IO; using System.Linq; using System.Net; using System.Numerics; using System.Reflection; using System.Text; using BITKit.Net.Examples; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Unity.Mathematics; namespace BITKit.Net { public class KcpNetClient:INetClient,INetProvider { private readonly ILogger _logger; public KcpNetClient(ILogger logger) { _logger = logger; _client = new KcpClient( OnConnectedInternal, OnData, OnDisconnectInternal, OnError, KCPNet.Config ); _timer.Elapsed += Tick; _logger.LogInformation("已创建KCP客户端"); AddCommandListener(x => { Id = x.Id; }); _isConnected.AddListener(ConnectionCallback); } public event Action OnStartConnect; public event Action OnConnected; public event Action OnDisconnected; public event Action OnConnectedFailed; public bool IsConnected => _isConnected; public bool IsConnecting { get; private set; } public double RpcTimeOut { get; set; } = 5; public bool AutoReconnect { get; set; } = true; public float2 Traffic { get; private set; } public bool ManualTick { get; set; } private readonly IntervalUpdate _reconnectInterval = new(1); public int Ping { get; private set; } public int Id { get; private set; } = -1; private readonly KcpClient _client; private readonly ConcurrentQueue _commandQueue = new(); private DateTime _lastPingTime = DateTime.Now; private readonly Timer _timer = new(100) { AutoReset = true }; private readonly GenericEvent _events = new(); private readonly ValidHandle _isConnected = new(); private bool _userConnected; private int _index = int.MinValue; private readonly ConcurrentDictionary _p2p = new(); private readonly ConcurrentDictionary>> _rpc = new(); private readonly ConcurrentDictionary _rpcMethods = new(); private readonly ConcurrentDictionary _rpcEvents = new(); private readonly ConcurrentDictionary _rpcHandles = new(); private DateTime _lastHeartbeat = DateTime.Now; private DateTime _now = DateTime.Now; private TimeSpan _interval = TimeSpan.FromMilliseconds(100); private string _connectedAddress = "127.0.0.1"; private ushort _connectedPort = 27014; private readonly byte[] _heartBeat = new byte[] { (byte)NetCommandType.Heartbeat }; private async void ConnectionCallback(bool x) { await BITApp.SwitchToMainThread(); if (x) { OnConnected?.Invoke(); _logger.LogInformation("连接成功"); } else { OnDisconnected?.Invoke(); _logger.LogInformation("连接已断开"); } } private void Tick(object sender, ElapsedEventArgs e) { if (!ManualTick) Tick(); } public async void Disconnect() { _userConnected = false; DisconnectInternal(); } private async void DisconnectInternal() { IsConnecting = false; _client.Disconnect(); _isConnected.RemoveElement(this); _timer.Stop(); try { await BITApp.SwitchToMainThread(); OnDisconnected?.Invoke(); } catch (OperationCanceledException){} } private string _lastHostName; public async UniTask Connect(string address = "127.0.0.1", ushort port = 27014) { if (IsConnecting) { BIT4Log.Warning("正在连接中"); return false; } _userConnected = true; //如果address是域名,解析为Ip if (address.Contains(".")) { var ip = await Dns.GetHostAddressesAsync(address); if (ip.Length > 0) { address = ip[0].ToString(); if (_lastHostName != address) { _logger.LogInformation($"解析域名:{address},IP:{ip}"); _lastHostName = address; } } } if (address is not "127.0.0.1") _connectedAddress = address; if (port is not 27014) _connectedPort = port; IsConnecting = true; if (_client.connected) return false; await BITApp.SwitchToMainThread(); OnStartConnect?.Invoke(); await UniTask.SwitchToThreadPool(); try { _lastHeartbeat = DateTime.Now; _client.Connect(address, port); _timer.Start(); _interval = TimeSpan.FromMilliseconds(_timer.Interval); HandShake(); await BITApp.SwitchToMainThread(); for (var i = 0; i < 5; i++) { _client.Send(new[] { (byte)NetCommandType.Heartbeat }, KcpChannel.Reliable); Traffic += new float2(1, 0); _client.Tick(); await Task.Delay(100); } if (_client.connected) { SendServerMessage(Environment.MachineName); IsConnecting = false; _connectedAddress = address; _connectedPort = port; return _client.connected; } OnConnectedFailed?.Invoke(); DisconnectInternal(); IsConnecting = false; return false; } catch (Exception e) { BIT4Log.LogException(e); if (BITApp.SynchronizationContext is not null) await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext); OnConnectedFailed?.Invoke(); _timer.Stop(); IsConnecting = false; return false; } } private void OnData(ArraySegment bytes, KcpChannel channel) { try { Traffic+=new float2(0,bytes.Count); OnDataInternel(bytes, channel); } catch (Exception e) { BIT4Log.LogException(e); } } private async void OnDataInternel(ArraySegment bytes, KcpChannel channel) { using var ms = new MemoryStream(bytes.ToArray()); using var reader = new BinaryReader(ms); var type = (NetCommandType)ms.ReadByte(); switch (type) { case NetCommandType.Message: _logger.LogInformation($"已收到消息:{reader.ReadString()}"); break; case NetCommandType.AllClientCommand: case NetCommandType.Command: var command = BITBinary.Read(reader); if (command is object[] { Length: 1 } objs) command = objs[0]; //BIT4Log.Log($"已收到指令:{command},值:\n{JsonConvert.SerializeObject(command, Formatting.Indented)}"); try { if (BITApp.SynchronizationContext is not null) await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext, BITApp.CancellationToken); _events.Invoke(command.GetType().FullName, command); } catch (OperationCanceledException) { } catch (Exception e) { BIT4Log.LogException(e); } break; case NetCommandType.Heartbeat: if (Id is not -1) { _isConnected.AddElement(this); } _lastHeartbeat = DateTime.Now; break; case NetCommandType.Ping: Ping = (int)(DateTime.Now - _lastPingTime).TotalMilliseconds; break; case NetCommandType.ReturnToClient: var id = reader.ReadInt32(); try { if (reader.ReadBoolean()) { var value = BITBinary.Read(reader); _p2p.TryAdd(id,value); } else { var message = reader.ReadString(); _p2p.TryAdd(id,new Exception(message)); } } catch (Exception e) { BIT4Log.Warning($"请求返回失败:{id}"); BIT4Log.LogException(e); } break; case NetCommandType.GetFromClient: try { var requestId = reader.ReadInt32(); using var returnMS = new MemoryStream(); await using var returnWriter = new BinaryWriter(returnMS); returnWriter.Write((byte)NetCommandType.ReturnToServer); returnWriter.Write(requestId); try { object value = null; if (reader.ReadBoolean()) { var path = reader.ReadString(); var pars = BITBinary.Read(reader).As(); if (_rpcMethods.TryGetValue(path, out var methodInfo)) { var isAwaitable = methodInfo.ReturnType.GetMethod(nameof(Task.GetAwaiter)) != null; var handle = _rpcHandles[path]; try { if (isAwaitable) { dynamic result = methodInfo.Invoke(handle, pars)!; value = await result; } else { value = methodInfo.Invoke(handle, pars); } } catch (Exception e) { BIT4Log.Warning(path); throw; } returnWriter.Write(true); BITBinary.Write(returnWriter, value); } else if (_rpcEvents.TryGetValue(path, out var eventInfo)) { var handle = _rpcHandles[path]; var fieldInfo = handle.GetType().GetField(eventInfo.Name,ReflectionHelper.Flags)!; var eventDelegate = fieldInfo.GetValue(handle) as MulticastDelegate; foreach (var del in eventDelegate!.GetInvocationList()) { del.Method.Invoke(del.Target, pars); } } else { returnWriter.Write(false); returnWriter.Write($"未找到对应的Rpc方法:{path}"); } } else { var commandObj = BITBinary.Read(reader) .As()[0]; var func = _rpc[commandObj.GetType()!.FullName!]; value = await func.As>>().Invoke(commandObj); } returnWriter.Write(true); BITBinary.Write(returnWriter, value); } catch (Exception e) { BIT4Log.LogException(e); returnWriter.Write(false); returnWriter.Write(e.Message); } var _bytes = returnMS.ToArray(); _commandQueue.Enqueue(_bytes); } catch(Exception e) { BIT4Log.LogException(e); } break; case NetCommandType.AllRpc: case NetCommandType.TargetRpc: { var rpcName = reader.ReadString(); var pars = BITBinary.Read(reader).As(); if (_rpcMethods.TryGetValue(rpcName, out var methodInfo)) { try { methodInfo.Invoke(_rpcHandles[rpcName], pars); } catch (TargetException targetException) { var reportBuilder = new StringBuilder(); reportBuilder.AppendLine("正在检查参数类型"); var parameterInfos = methodInfo.GetParameters(); if (parameterInfos.Length != pars.Length) { reportBuilder.AppendLine("参数数量不匹配,期望:" + parameterInfos.Length + ",实际:" + pars.Length); } for (var i = 0; i < pars.Length; i++) { var parameterInfo = parameterInfos[i]; var parameter = pars[i]; if (parameterInfo.ParameterType != parameter.GetType()) { reportBuilder.AppendLine($"参数{i}类型不匹配,期望:{parameterInfo.ParameterType},实际:{parameter.GetType()}"); } else { reportBuilder.AppendLine($"参数{parameter.GetType()}类型匹配:{parameterInfo.ParameterType}"); } } BIT4Log.Warning(reportBuilder); throw; } catch (Exception e) { BIT4Log.Warning(rpcName); throw; } } else if (_rpcEvents.TryGetValue(rpcName, out var eventInfo)) { var handle = _rpcHandles[rpcName]; var fieldInfo = handle.GetType().GetField(eventInfo.Name,ReflectionHelper.Flags)!; var eventDelegate = fieldInfo.GetValue(handle) as MulticastDelegate; if (eventDelegate is null) { //BIT4Log.Warning($"未找到对应的事件:{rpcName}"); } else { foreach (var del in eventDelegate.GetInvocationList()) { del.Method.Invoke(del.Target, pars); } } } else { BIT4Log.Warning($"未找到对应的Rpc方法:{rpcName}"); } } break; default: _logger.LogInformation($"未知消息类型:{type},字节:{(byte)type}"); if (bytes.Array != null) _logger.LogInformation( $"已收到:({Id}, {BitConverter.ToString(bytes.Array, bytes.Offset, bytes.Count)} @ {channel})"); break; } } private async void OnConnectedInternal() { // if (BITApp.SynchronizationContext is not null) // await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext); // OnConnected?.Invoke(); // _logger.LogInformation("已连接"); } private async void OnDisconnectInternal() { // _logger.LogInformation("连接被断开"); DisconnectInternal(); } private void OnError(ErrorCode errorCode, string message) { _logger.LogInformation($"{_client.remoteEndPoint}异常:{errorCode},{message}"); } public void ServerCommand(T command = default) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.Command); BITBinary.Write(writer,command); var bytes = ms.ToArray(); _commandQueue.Enqueue(bytes); } public void AllClientCommand(T command = default) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.AllClientCommand); BITBinary.Write(writer,command); var bytes = ms.ToArray(); } public void ClientCommand(int id, T command) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.Command); BITBinary.Write(writer,command); var bytes = ms.ToArray(); _commandQueue.Enqueue(bytes); } public async UniTask GetFromServer(string path = default,params object[] pars) { if (IsConnected is false) { throw new NetOfflineException(); } //await UniTask.SwitchToThreadPool(); var id = _index++; var ms = new MemoryStream(); var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.GetFromServer); writer.Write(id); if (string.IsNullOrEmpty(path)) { writer.Write(false); } else { writer.Write(true); writer.Write(path); } BITBinary.Write(writer,pars); var bytes = ms.ToArray(); await ms.DisposeAsync(); await writer.DisposeAsync(); _commandQueue.Enqueue(bytes); var startTime = _now; while (true) { if (IsConnected is false) { throw new NetOfflineException(); } if ((_now - startTime).TotalSeconds > RpcTimeOut) { await BITApp.SwitchToMainThread(); if (string.IsNullOrEmpty(path)) { throw new TimeoutException("请求超时或已断开连接"); } throw new TimeoutException($"请求超时或已断开连接,请求为{path}"); } if (_p2p.TryRemove(id, out var value)) { await BITApp.SwitchToMainThread(); if (value is Exception e) { throw new InGameException(e.Message); } if (UniTask.CompletedTask is T t) { return t; } return value.As(); } await Task.Delay(_interval); } } public UniTask GetFromClient(int id,string path, params object[] pars) { throw new NotImplementedException(); } public void AddRpcHandle(object rpcHandle) { var reportBuilder = new StringBuilder(); reportBuilder.AppendLine($"正在通过反射注册{rpcHandle.GetType().Name}"); foreach (var methodInfo in rpcHandle.GetType().GetMethods()) { var att = methodInfo.GetCustomAttribute(); if (att is null) continue; _rpcMethods.AddOrUpdate(methodInfo.Name, methodInfo, (s, info) => methodInfo); _rpcHandles.AddOrUpdate(methodInfo.Name, rpcHandle, (s, o) => rpcHandle); reportBuilder.AppendLine($"Add [{methodInfo.Name}] as MethodInfo"); } foreach (var eventInfo in rpcHandle.GetType().GetEvents()) { var att = eventInfo.GetCustomAttribute(); if (att is null) continue; _rpcEvents.TryAdd(eventInfo.Name, eventInfo); _rpcHandles.TryAdd(eventInfo.Name, rpcHandle); reportBuilder.AppendLine($"Add [{eventInfo.Name}] as EventInfo"); } _logger.LogInformation(reportBuilder.ToString()); } public void AddCommandListener(Action handle) { _events.AddListener(handle); } public void AddCommandListener(Func> func) { _rpc.TryAdd(typeof(T).FullName, F); return; async UniTask F(object o) { return await func.Invoke((T)o); } } public void RemoveCommandListener(Func> func) { _rpc.TryRemove(typeof(T).FullName, out _); } public void RemoveCommandListener(Action handle) { _events.RemoveListener(handle); } public void SendRT(string rpcName, params object[] pars) { if (IsConnected is false) { throw new NetOfflineException(); } using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.GetFromServer); writer.Write(++_index); writer.Write(true); writer.Write(rpcName); BITBinary.Write(writer,pars); var bytes = ms.ToArray(); _commandQueue.Enqueue(bytes); } public void SendTargetRT(int id, string rpcName, params object[] pars) { throw new NotImplementedException(); } public void SendAllRT(string rpcName, params object[] pars) { throw new NotImplementedException(); } public void SendServerMessage(string message) { var bytes = BinaryBuilder .Create() .Write(((byte)NetCommandType.Message)) .Write(message) .Build(); Traffic+=new float2(bytes.Length,0); _client.Send(bytes, KcpChannel.Reliable); } #if UNITY_EDITOR private readonly IntervalUpdate _pingInterval = new(1); #endif public void Tick() { _now = DateTime.Now; if(_userConnected is false)return; try { if (_client.connected) { if (DateTime.Now - _lastHeartbeat > TimeSpan.FromSeconds(5)) { BIT4Log.Warning("心跳超时,自动断开"); DisconnectInternal(); _commandQueue.Clear(); return; } while (_commandQueue.TryDequeue(out var bytes)) { Traffic += new float2(bytes.Length, 0); _client.Send(bytes, KcpChannel.Reliable); } Traffic+=new float2(1,0); _client.Send(_heartBeat, KcpChannel.Unreliable); } else { if (AutoReconnect && _reconnectInterval.AllowUpdate) { if (IsConnecting is false) { Connect(_connectedAddress,_connectedPort).Forget(); } } if (_commandQueue.Count > 0) { _commandQueue.Clear(); //BIT4Log.Warning("连接已断开,清空指令队列"); } } _client.Tick(); } catch (Exception e) { BIT4Log.LogException(e); } } public void HandShake() { // send client to server Traffic+=new float2(2,0); _client.Send(new byte[]{0x01, 0x02}, KcpChannel.Reliable); } } }