using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Timers; using Cysharp.Threading.Tasks; using kcp2k; using Newtonsoft.Json; using Timer = System.Timers.Timer; using System.IO; using System.Linq; using System.Net; using System.Net.Sockets; using System.Reflection; using System.Threading; using System.Threading.Tasks; using System.Windows.Markup; using BITKit.Net.Examples; using Microsoft.Extensions.Logging; namespace BITKit.Net { public class KCPNetServer:INetServer,INetProvider { private readonly NetProviderCommon _common = new(); private readonly ILogger _logger; public string Name { get; set; } = "Default"; public uint TickRate { get; set; } = 8; public bool ManualTick { get; set; } public event Action OnClientConnected; public event Action OnClientDisconnected; public event Action OnStartServer; public event Action OnStopServer; private readonly KcpServer server; private bool _isStarted; private readonly Timer _timer = new(100) { AutoReset = true }; private int _index = 1001; private DateTime _now=DateTime.UtcNow; [NetRpc] public event Action OnNetRpcTest; public KCPNetServer(ILogger logger) { _logger = logger; server = new KcpServer( OnConnected, OnData, OnDisconnect, OnError, KCPNet.Config ); _timer.Elapsed += Tick; //BIT4Log.Log("已创建KCP服务器"); AddCommandListener(F); AddRpcHandle(this); OnNetRpcTest += (_int, _float, _bool) => { _logger.LogInformation($"已收到Rpc测试:{_int},{_float},{_bool}"); }; return; UniTask F(SimplePing p) { p.EndTime = DateTime.Now; return UniTask.FromResult(p); } } private void Tick(object sender, ElapsedEventArgs e) { _now = DateTime.UtcNow; try { if (_isStarted && IsRunningServer is false) { StartServer(_port); } foreach (var id in Connections.Keys.ToArray()) { server.Send(id,_common.HeartBeat , KcpChannel.Unreliable); if (!_common.LastHeartbeat.TryGetValue(id, out var time)) continue; if (!((_now - time).TotalSeconds > 3)) continue; server.Disconnect(id); _common.LastHeartbeat.TryRemove(id); _logger.LogInformation($"{Name}:链接{id}超时,已断开"); } if (server.IsActive() is false) return; _common.DropCount.Clear(); while (_common.SendQueue.TryDequeue(out var value)) { if (server.connections.ContainsKey(value.id)) { server.Send(value.id, value.bytes, KcpChannel.Reliable); } else { int UpdateValueFactory(int i, int i1) => i1 + value.bytes.Length; _common.DropCount.AddOrUpdate(value.id,value.bytes.Length,UpdateValueFactory); } } foreach (var (id,length) in _common.DropCount) { _logger.LogInformation($"未找到链接:{id},已丢弃字节数量:{length}"); } server.Tick(); } catch (SocketException) { //丢失链接,有用户断开连接,通常是正常现象 } catch (Exception exception) { _logger.LogCritical(exception,exception.Message); } } private ushort _port; public void StartServer(ushort port = 27014) { _port = port; if (IsRunningServer is false) { if (TickRate > 0) { _timer.Interval = 1000f / TickRate; } OnStartServer?.Invoke(); server.Start(port); if (ManualTick is false) _timer.Start(); _isStarted = true; _logger.LogInformation($"已启动KCP服务器:{port}"); } else { BIT4Log.Warning($"KCP服务器已经启动,忽略此次请求"); } } public void StopServer(bool dispose = false) { if (IsRunningServer) { _isStarted = false; server.Stop(); OnStopServer?.Invoke(); _timer.Stop(); _logger.LogInformation($"已停止KCP服务器"); } else { BIT4Log.Warning($"KCP服务器未运行,忽略此次请求"); } } public bool IsRunningServer => server.IsActive(); public void SendMessageToClient(int id, string message) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.Message); writer.Write(message); _common.SendQueue.Enqueue((id,ms.ToArray())); } public void SendMessageToAll(string message) { foreach (var Id in server.connections.Keys) { SendMessageToClient(Id,message); } } public IDictionary Connections => new Dictionary( server.connections.Select(x => new KeyValuePair(x.Key, x.Value.remoteEndPoint) )); public void Tick() { try { Tick(null,null); } catch (SocketException) { _logger.LogInformation("有用户断开连接,如有异常请检查"); } } public void HandShake() { foreach (var id in server.connections.Keys) { server.Send(id, new byte[]{0x03, 0x04}, KcpChannel.Reliable); } } private void OnConnectedInternel(int id) { //server.connections[id].peer.kcp.SetInterval(TickRate); OnClientConnected?.Invoke(id); ClientCommand(id,new NetClientAllocateIdCommand { Id = id, Ip = server.connections[id].remoteEndPoint.ToString() }); _logger.LogInformation($"{id}已连接到:{Name}"); SendMessageToClient(id, $"成功连接到服务器:{Name}"); } private void OnConnected(int Id) { } private void OnDisconnect(int Id) { OnClientDisconnected?.Invoke(Id); _logger.LogInformation($"{Id}已断开"); } private void OnData(int Id, ArraySegment bytes, KcpChannel channel) { try { OnDataInternel(Id, bytes, channel); } catch (Exception e) { BIT4Log.LogException(e); } } private async void OnDataInternel(int Id, ArraySegment bytes, KcpChannel channel) { using var ms = new MemoryStream(bytes.ToArray()); using var reader = new BinaryReader(ms); try { var type = (NetCommandType)ms.ReadByte(); switch (type) { case NetCommandType.Message: _logger.LogInformation($"已收到消息,{Id}:{reader.ReadString()}"); break; case NetCommandType.Command: var command = BITBinary.Read(reader); if (command is object[] { Length: 1 } objs) command = objs[0]; //BIT4Log.Log($"已收到指令:{command},值:\n{JsonConvert.SerializeObject(command, Formatting.Indented)}"); _common.Events.Invoke(command.GetType().FullName, command); (int Id, object Command) tuple = (Id, command); _common.Events.InvokeDirect(command.GetType().FullName, tuple); break; case NetCommandType.Heartbeat: if (Connections.ContainsKey(Id)) { if (_common.LastHeartbeat.ContainsKey(Id)) { _common.LastHeartbeat[Id] = _now; } else { _common.LastHeartbeat.TryAdd(Id, _now); OnConnectedInternel(Id); } } break; case NetCommandType.AllClientCommand: foreach (var id in server.connections.Keys.ToArray()) { _common.SendQueue.Enqueue((id,bytes.ToArray())); } break; case NetCommandType.TargetCommand: var targetId = reader.ReadInt32(); _common.SendQueue.Enqueue((targetId,bytes.ToArray())); break; case NetCommandType.Ping: _common.SendQueue.Enqueue((Id,new byte[] { (byte)NetCommandType.Ping })); break; case NetCommandType.GetFromServer: { var requestId = reader.ReadInt32(); using var returnMS = new MemoryStream(); await using var returnWriter = new BinaryWriter(returnMS); returnWriter.Write((byte)NetCommandType.ReturnToClient); returnWriter.Write(requestId); if (reader.ReadBoolean()) { var path = reader.ReadString(); var pars = BITBinary.Read(reader).As(); object value = null; if (_common.RpcMethods.TryGetValue(path, out var methodInfo)) { var isAwaitable = methodInfo.ReturnType.GetMethod(nameof(Task.GetAwaiter)) != null; var handle = _common.RpcHandles[path]; if (methodInfo.GetParameters().Length is 0) { pars = new object[] { }; } try { if (isAwaitable) { dynamic result = methodInfo.Invoke(handle, pars)!; if (methodInfo.ReturnType == typeof(void) || methodInfo.ReturnType == typeof(UniTask) || methodInfo.ReturnType == typeof(UniTask<>) ) { await result; value = -1; } else { value = await result; } } else { value = methodInfo.Invoke(handle, pars); } } catch (Exception e) { if(e is TargetInvocationException tie) e = tie.InnerException; returnWriter.Write(false); returnWriter.Write(e.Message); var _bytes = returnMS.ToArray(); _common.SendQueue.Enqueue((Id,_bytes)); if (e is InGameException inGameException) { BIT4Log.Warning(inGameException.Message); return; } BIT4Log.LogException(e); return; } returnWriter.Write(true); }else if (_common.RpcEvents.TryGetValue(path, out var eventInfo)) { var handle = _common.RpcHandles[path]; var fieldInfo = handle.GetType().GetField(eventInfo.Name,ReflectionHelper.Flags)!; var eventDelegate = fieldInfo.GetValue(handle) as MulticastDelegate; foreach (var del in eventDelegate!.GetInvocationList()) { del.Method.Invoke(del.Target, pars); } } else { throw new Exception($"未找到对应的Rpc方法:{path}"); } if (value is not null) { BITBinary.Write(returnWriter, value); } } else { var commandObjs = BITBinary.Read(reader); var commandObj = commandObjs.As()[0]; var funcName = commandObj.GetType()!.FullName!; if (_common.Rpc.TryGetValue(funcName, out var func)) { var value = await func.As>>().Invoke(commandObj); returnWriter.Write(true); BITBinary.Write(returnWriter, value); } else { throw new Exception($"未找到对应的Rpc方法:{funcName}"); } } { _common.SendQueue.Enqueue((Id,returnMS.ToArray())); } } break; case NetCommandType.ReturnToServer: { var id = reader.ReadInt32(); if (_common.P2P.TryRemove(id, out var source)) { if (reader.ReadBoolean()) { var value = BITBinary.Read(reader); source.TrySetResult(value); } else { var message = reader.ReadString(); source.TrySetException(new Exception(message)); } } else { _logger.LogWarning($"ID为{id}的请求未注册回调"); } } break; default: _logger.LogInformation($"未知消息类型:{type},字节:{(byte)type}"); _logger.LogInformation( $"已收到:({Id}, {BitConverter.ToString(bytes.Array, bytes.Offset, bytes.Count)} @ {channel})"); break; } } catch (Exception e) { ms.Close(); reader.Close(); BIT4Log.LogException(e); } } private void OnError(int Id, ErrorCode errorCode, string message) { _logger.LogInformation($"异常:{errorCode},{message}"); } public void ServerCommand(T command = default) { _common. Events.Invoke(command); } public void AllClientCommand(T command = default) { foreach (var id in server.connections.Keys.ToArray()) { ClientCommand(id, command); } } public void ClientCommand(int id, T command) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.Command); BITBinary.Write(writer,command); _common.SendQueue.Enqueue((id,ms.ToArray())); } public UniTask GetFromServer(string path=default,params object[] pars) { throw new NotImplementedException(); } public async UniTask GetFromClient(int id,string path=default, params object[] pars) { await UniTask.SwitchToThreadPool(); var index = _index++; var ms = new MemoryStream(); var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.GetFromClient); writer.Write(index); if (string.IsNullOrEmpty(path)) { writer.Write(false); } else { writer.Write(true); writer.Write(path); } BITBinary.Write(writer,pars); var bytes = ms.ToArray(); await ms.DisposeAsync(); await writer.DisposeAsync(); _common.SendQueue.Enqueue((id,bytes)); var source = new UniTaskCompletionSource(); _common.P2P.TryAdd(index, source); var timeoutCts = new CancellationTokenSource(); timeoutCts.CancelAfter(5000); // 设置超时时间 var value =await source.Task.AttachExternalCancellation(timeoutCts.Token); //await BITApp.SwitchToMainThread(); if (value is Exception e) { throw e; } if (UniTask.CompletedTask is T t) { return t; } if (typeof(T) == typeof(UniTaskVoid)) { return default; } return (T)value; } public void AddRpcHandle(object rpcHandle) { foreach (var methodInfo in rpcHandle.GetType().GetMethods()) { var att = methodInfo.GetCustomAttribute(true); if(att is null)continue; _common.RpcMethods.TryAdd(methodInfo.Name, methodInfo); _common.RpcHandles.TryAdd(methodInfo.Name, rpcHandle); } foreach (var eventInfo in rpcHandle.GetType().GetEvents()) { var att = eventInfo.GetCustomAttribute(true); if(att is null)continue; _common. RpcEvents.TryAdd(eventInfo.Name, eventInfo); _common. RpcHandles.TryAdd(eventInfo.Name, rpcHandle); } } [NetRpc] public string MyRpcTest(string hello) { return "Hello World"; } [NetRpc] public async UniTask MyRpcTestAsync(string hello) { await Task.Delay(1000); return $"{hello} World"; } public void AddCommandListener(Action handle) { _common.Events.AddListener(handle); } public void AddCommandListener(Func> func) { _common.Rpc.TryAdd(typeof(T).FullName, F); return; async UniTask F(object o) { return await func.Invoke((T)o); } } public void RemoveCommandListener(Func> func) { _common. Rpc.TryRemove(typeof(T).FullName, out _); } public void AddCommandListenerWithId(Action handle) { _common.Events.AddListenerDirect(typeof(T).FullName, Callback); return; void Callback(object value) { if (value is ValueTuple tuple && tuple.Item2 is T) { handle.Invoke(tuple.Item1, (T)tuple.Item2); } else { Console.WriteLine(value); } } } public void KickClient(int id) { server.Disconnect(id); } public void RemoveCommandListener(Action handle) { _common. Events.RemoveListener(handle); } public void RemoveCommandListener(Func func) { throw new NotImplementedException(); } public void SendRT(string rpcName, params object[] pars) { throw new NotImplementedException("服务端不支持此方法"); } public void SendTargetRT(int id, string rpcName, params object[] pars) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.TargetRpc); writer.Write(rpcName); BITBinary.Write(writer,pars); var bytes = ms.ToArray(); _common.SendQueue.Enqueue((id,bytes)); } public void SendAllRT(string rpcName, params object[] pars) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.AllRpc); writer.Write(rpcName); BITBinary.Write(writer,pars); var bytes = ms.ToArray(); foreach (var id in server.connections.Keys) { _common.SendQueue.Enqueue((id,bytes)); } } } }