using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Timers; using Cysharp.Threading.Tasks; using kcp2k; using Newtonsoft.Json; using Timer = System.Timers.Timer; using System.IO; using System.Linq; using System.Net; using System.Net.Sockets; using System.Reflection; using System.Threading.Tasks; using System.Windows.Markup; using BITKit.Net.Examples; namespace BITKit.Net { public class KCPNetServer:INetServer,INetProvider { public string Name { get; set; } = "Default"; public int TickRate { get; set; } = 16; public bool ManualTick { get; set; } public event Action OnClientConnected; public event Action OnClientDisconnected; public event Action OnStartServer; public event Action OnStopServer; private readonly KcpServer server; private readonly GenericEvent _events = new(); private bool _isStarted; private readonly Timer _timer = new(100) { AutoReset = true }; private int _index = 1001; private readonly ConcurrentDictionary _p2p = new(); private readonly ConcurrentDictionary>> _rpc = new(); private readonly ConcurrentDictionary _rpcMethods = new(); private readonly ConcurrentDictionary _rpcEvents = new(); private readonly ConcurrentDictionary _rpcHandles = new(); private readonly ConcurrentDictionary _lastHeartbeat = new(); private readonly ConcurrentQueue<(int id,byte[] bytes)> _sendQueue = new(); private readonly ConcurrentDictionary _dropCount = new(); private DateTime _now=DateTime.Now; private TimeSpan _interval=TimeSpan.FromSeconds(0.32); private readonly byte[] _heartBeat = new byte[] { (byte)NetCommandType.Heartbeat }; [NetRpc] public event Action OnNetRpcTest; public KCPNetServer() { server = new KcpServer( OnConnected, OnData, OnDisconnect, OnError, KCPNet.Config ); _timer.Elapsed += Tick; BIT4Log.Log("已创建KCP服务器"); AddCommandListener(F); AddRpcHandle(this); OnNetRpcTest += (_int, _float, _bool) => { BIT4Log.Log($"已收到Rpc测试:{_int},{_float},{_bool}"); }; return; UniTask F(SimplePing p) { p.EndTime = DateTime.Now; return UniTask.FromResult(p); } } private void Tick(object sender, ElapsedEventArgs e) { _now = DateTime.UtcNow; try { if (_isStarted && IsRunningServer is false) { StartServer(_port); } foreach (var id in Connections.Keys.ToArray()) { server.Send(id,_heartBeat , KcpChannel.Unreliable); if (!_lastHeartbeat.TryGetValue(id, out var time)) continue; if (!((_now - time).TotalSeconds > 3)) continue; server.Disconnect(id); _lastHeartbeat.TryRemove(id); BIT4Log.Log($"{Name}:链接{id}超时,已断开"); } if (server.IsActive() is false) return; server.Tick(); //BIT4Log.Log($"{Name}目前有{server.connections.Count}个链接"); _dropCount.Clear(); while (_sendQueue.TryDequeue(out var value)) { if (server.connections.ContainsKey(value.id)) { server.Send(value.id, value.bytes, KcpChannel.Reliable); } else { int UpdateValueFactory(int i, int i1) => i1 + value.bytes.Length; _dropCount.AddOrUpdate(value.id,value.bytes.Length,UpdateValueFactory); } } foreach (var (id,length) in _dropCount) { BIT4Log.Log($"未找到链接:{id},已丢弃字节数量:{length}"); } } catch (SocketException) { //丢失链接,有用户断开连接,通常是正常现象 } catch (Exception exception) { BIT4Log.LogException(exception); } } private ushort _port; public void StartServer(ushort port = 27014) { _port = port; if (IsRunningServer is false) { if (TickRate > 0) { _timer.Interval = 1000f / TickRate; _interval = TimeSpan.FromSeconds(1.0 / TickRate); } OnStartServer?.Invoke(); server.Start(port); if (ManualTick is false) _timer.Start(); _isStarted = true; BIT4Log.Log($"已启动KCP服务器:{port}"); } else { BIT4Log.Warning($"KCP服务器已经启动,忽略此次请求"); } } public void StopServer(bool dispose = false) { if (IsRunningServer) { _isStarted = false; server.Stop(); OnStopServer?.Invoke(); _timer.Stop(); BIT4Log.Log($"已停止KCP服务器"); } else { BIT4Log.Warning($"KCP服务器未运行,忽略此次请求"); } } public bool IsRunningServer => server.IsActive(); public void SendMessageToClient(int id, string message) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.Message); writer.Write(message); _sendQueue.Enqueue((id,ms.ToArray())); } public void SendMessageToAll(string message) { foreach (var Id in server.connections.Keys) { SendMessageToClient(Id,message); } } public IDictionary Connections => new Dictionary( server.connections.Select(x => new KeyValuePair(x.Key, x.Value.remoteEndPoint) )); public void Tick() { try { Tick(null,null); } catch (SocketException) { BIT4Log.Log("有用户断开连接,如有异常请检查"); } } public void HandShake() { foreach (var id in server.connections.Keys) { server.Send(id, new byte[]{0x03, 0x04}, KcpChannel.Reliable); } } private void OnConnectedInternel(int id) { OnClientConnected?.Invoke(id); ClientCommand(id,new NetClientAllocateIdCommand { Id = id, Ip = server.connections[id].remoteEndPoint.ToString() }); BIT4Log.Log($"{id}已连接到:{Name}"); SendMessageToClient(id, $"成功连接到服务器:{Name}"); } private void OnConnected(int Id) { // OnClientConnected?.Invoke(Id); // ClientCommand(Id,new NetClientAllocateIdCommand // { // Id = Id, // Ip = server.connections[Id].remoteEndPoint.ToString() // }); // BIT4Log.Log($"{Id}已连接到:{Name}"); // SendMessageToClient(Id, $"成功连接到服务器:{Name}"); } private void OnDisconnect(int Id) { OnClientDisconnected?.Invoke(Id); BIT4Log.Log($"{Id}已断开"); } private void OnData(int Id, ArraySegment bytes, KcpChannel channel) { try { OnDataInternel(Id, bytes, channel); } catch (Exception e) { BIT4Log.LogException(e); } } private async void OnDataInternel(int Id, ArraySegment bytes, KcpChannel channel) { using var ms = new MemoryStream(bytes.ToArray()); using var reader = new BinaryReader(ms); try { var type = (NetCommandType)ms.ReadByte(); switch (type) { case NetCommandType.Message: BIT4Log.Log($"已收到消息,{Id}:{reader.ReadString()}"); break; case NetCommandType.Command: var command = BITBinary.Read(reader); if (command is object[] { Length: 1 } objs) command = objs[0]; //BIT4Log.Log($"已收到指令:{command},值:\n{JsonConvert.SerializeObject(command, Formatting.Indented)}"); _events.Invoke(command.GetType().FullName, command); (int Id, object Command) tuple = (Id, command); _events.InvokeDirect(command.GetType().FullName, tuple); break; case NetCommandType.Heartbeat: if (Connections.ContainsKey(Id)) { _lastHeartbeat.AddOrUpdate(Id,OnAdd,OnUpdate); } break; DateTime OnAdd(int arg) { OnConnectedInternel(Id); return _now; } DateTime OnUpdate(int arg1, DateTime arg2) { return _now; } case NetCommandType.AllClientCommand: foreach (var id in server.connections.Keys.ToArray()) { _sendQueue.Enqueue((id,bytes.ToArray())); } break; case NetCommandType.TargetCommand: var targetId = reader.ReadInt32(); _sendQueue.Enqueue((targetId,bytes.ToArray())); break; case NetCommandType.Ping: _sendQueue.Enqueue((Id,new byte[] { (byte)NetCommandType.Ping })); break; case NetCommandType.GetFromServer: { var requestId = reader.ReadInt32(); using var returnMS = new MemoryStream(); await using var returnWriter = new BinaryWriter(returnMS); returnWriter.Write((byte)NetCommandType.ReturnToClient); returnWriter.Write(requestId); if (reader.ReadBoolean()) { var path = reader.ReadString(); var pars = BITBinary.Read(reader).As(); object value = null; if (_rpcMethods.TryGetValue(path, out var methodInfo)) { var isAwaitable = methodInfo.ReturnType.GetMethod(nameof(Task.GetAwaiter)) != null; var handle = _rpcHandles[path]; if (methodInfo.GetParameters().Length is 0) { pars = new object[] { }; } try { if (isAwaitable) { dynamic result = methodInfo.Invoke(handle, pars)!; if (methodInfo.ReturnType == typeof(void) || methodInfo.ReturnType == typeof(UniTask) || methodInfo.ReturnType == typeof(UniTask<>) ) { await result; value = -1; } else { value = await result; } } else { value = methodInfo.Invoke(handle, pars); } } catch (Exception e) { if(e is TargetInvocationException tie) e = tie.InnerException; returnWriter.Write(false); returnWriter.Write(e.Message); var _bytes = returnMS.ToArray(); _sendQueue.Enqueue((Id,_bytes)); if (e is InGameException inGameException) { BIT4Log.Warning(inGameException.Message); return; } BIT4Log.LogException(e); return; } returnWriter.Write(true); }else if (_rpcEvents.TryGetValue(path, out var eventInfo)) { var handle = _rpcHandles[path]; var fieldInfo = handle.GetType().GetField(eventInfo.Name,ReflectionHelper.Flags)!; var eventDelegate = fieldInfo.GetValue(handle) as MulticastDelegate; foreach (var del in eventDelegate!.GetInvocationList()) { del.Method.Invoke(del.Target, pars); } } else { throw new Exception($"未找到对应的Rpc方法:{path}"); } if (value is not null) { BITBinary.Write(returnWriter, value); } } else { var commandObj = BITBinary.Read(reader) .As()[0]; var funcName = commandObj.GetType()!.FullName!; if (_rpc.TryGetValue(funcName, out var func)) { var value = await func.As>>().Invoke(commandObj); returnWriter.Write(true); BITBinary.Write(returnWriter, value); } else { throw new Exception($"未找到对应的Rpc方法:{funcName}"); } } { var _bytes = returnMS.ToArray(); _sendQueue.Enqueue((Id,_bytes)); } } break; case NetCommandType.ReturnToServer: { var id = reader.ReadInt32(); if (reader.ReadBoolean()) { var value = BITBinary.Read(reader); _p2p.TryAdd(id, value); } else { var message = reader.ReadString(); _p2p.TryAdd(id, new Exception(message)); } } break; default: BIT4Log.Log($"未知消息类型:{type},字节:{(byte)type}"); BIT4Log.Log( $"已收到:({Id}, {BitConverter.ToString(bytes.Array, bytes.Offset, bytes.Count)} @ {channel})"); break; } } catch (Exception e) { ms.Close(); reader.Close(); BIT4Log.LogException(e); } } private void OnError(int Id, ErrorCode errorCode, string message) { BIT4Log.Log($"异常:{errorCode},{message}"); } public void ServerCommand(T command = default) { _events.Invoke(command); } public void AllClientCommand(T command = default) { foreach (var id in server.connections.Keys.ToArray()) { ClientCommand(id, command); } } public void ClientCommand(int id, T command) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.Command); BITBinary.Write(writer,command); _sendQueue.Enqueue((id,ms.ToArray())); } public UniTask GetFromServer(string path=default,params object[] pars) { throw new NotImplementedException(); } public async UniTask GetFromClient(int id,string path=default, params object[] pars) { await UniTask.SwitchToThreadPool(); var index = _index++; var ms = new MemoryStream(); var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.GetFromClient); writer.Write(index); if (string.IsNullOrEmpty(path)) { writer.Write(false); } else { writer.Write(true); writer.Write(path); } BITBinary.Write(writer,pars); var bytes = ms.ToArray(); await ms.DisposeAsync(); await writer.DisposeAsync(); _sendQueue.Enqueue((id,bytes)); var startTime = _now; while (true) { var time = _now - startTime; if(time.TotalSeconds>5) throw new TimeoutException($"等待超时,Id:{id},时间{time.TotalSeconds}"); if (_p2p.TryRemove(index, out var value)) { //await BITApp.SwitchToMainThread(); if (value is Exception e) { throw e; } if (UniTask.CompletedTask is T t) { return t; } if (typeof(T) == typeof(UniTaskVoid)) { return default; } return (T)value; } await Task.Delay(_interval); } } public void AddRpcHandle(object rpcHandle) { foreach (var methodInfo in rpcHandle.GetType().GetMethods()) { var att = methodInfo.GetCustomAttribute(true); if(att is null)continue; _rpcMethods.TryAdd(methodInfo.Name, methodInfo); _rpcHandles.TryAdd(methodInfo.Name, rpcHandle); } foreach (var eventInfo in rpcHandle.GetType().GetEvents()) { var att = eventInfo.GetCustomAttribute(true); if(att is null)continue; _rpcEvents.TryAdd(eventInfo.Name, eventInfo); _rpcHandles.TryAdd(eventInfo.Name, rpcHandle); } } [NetRpc] public string MyRpcTest(string hello) { return "Hello World"; } [NetRpc] public async UniTask MyRpcTestAsync(string hello) { await Task.Delay(1000); return $"{hello} World"; } public void AddCommandListener(Action handle) { _events.AddListener(handle); } public void AddCommandListener(Func> func) { _rpc.TryAdd(typeof(T).FullName, F); return; async UniTask F(object o) { return await func.Invoke((T)o); } } public void RemoveCommandListener(Func> func) { _rpc.TryRemove(typeof(T).FullName, out _); } public void AddCommandListenerWithId(Action handle) { _events.AddListenerDirect(typeof(T).FullName, Callback); return; void Callback(object value) { if (value is ValueTuple tuple && tuple.Item2 is T) { handle.Invoke(tuple.Item1, (T)tuple.Item2); } else { Console.WriteLine(value); } } } public void KickClient(int id) { server.Disconnect(id); } public void RemoveCommandListener(Action handle) { _events.RemoveListener(handle); } public void RemoveCommandListener(Func func) { throw new NotImplementedException(); } public void SendRT(string rpcName, params object[] pars) { throw new NotImplementedException("服务端不支持此方法"); } public void SendTargetRT(int id, string rpcName, params object[] pars) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.TargetRpc); writer.Write(rpcName); BITBinary.Write(writer,pars); var bytes = ms.ToArray(); _sendQueue.Enqueue((id,bytes)); } public void SendAllRT(string rpcName, params object[] pars) { using var ms = new MemoryStream(); using var writer = new BinaryWriter(ms); writer.Write((byte)NetCommandType.AllRpc); writer.Write(rpcName); BITBinary.Write(writer,pars); var bytes = ms.ToArray(); foreach (var id in server.connections.Keys) { _sendQueue.Enqueue((id,bytes)); } } } }