Net.Like.Xue.Tokyo/Assets/BITKit/Core/Kcp/KcpNetServer.cs

254 lines
7.7 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Timers;
using Cysharp.Threading.Tasks;
using kcp2k;
using Newtonsoft.Json;
using Timer = System.Timers.Timer;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Markup;
using BITKit.Net.Examples;
using Microsoft.Extensions.Logging;
namespace BITKit.Net
{
public class KCPNetServer:INetServer,INetProvider
{
private readonly NetProviderCommon _common = new();
private readonly ILogger<KCPNetServer> _logger;
public string Name { get; set; } = "Default";
public uint TickRate { get; set; } = 8;
public bool ManualTick { get; set; }
public event Action<int> OnClientConnected;
public event Action<int> OnClientDisconnected;
public event Action OnStartServer;
public event Action OnStopServer;
private readonly KcpServer server;
private bool _isStarted;
private readonly Timer _timer = new(100)
{
AutoReset = true
};
private int _index = 1001;
private DateTime _now=DateTime.UtcNow;
public KCPNetServer(ILogger<KCPNetServer> logger)
{
_logger = logger;
server = new KcpServer(
OnConnected,
OnData,
OnDisconnect,
OnError,
KCPNet.Config
);
_timer.Elapsed += Tick;
}
private void Tick(object sender, ElapsedEventArgs e)
{
_now = DateTime.UtcNow;
try
{
if (_isStarted && IsRunningServer is false)
{
StartServer(_port);
}
foreach (var id in Connections.Keys.ToArray())
{
server.Send(id,_common.HeartBeat , KcpChannel.Unreliable);
if (!_common.LastHeartbeat.TryGetValue(id, out var time)) continue;
if (!((_now - time).TotalSeconds > 3)) continue;
server.Disconnect(id);
_common.LastHeartbeat.TryRemove(id);
_logger.LogInformation($"{Name}:链接{id}超时,已断开");
}
if (server.IsActive() is false) return;
_common.DropCount.Clear();
while (_common.SendQueue.TryDequeue(out var value))
{
if (server.connections.ContainsKey(value.id))
{
server.Send(value.id, value.bytes, KcpChannel.Reliable);
}
else
{
int UpdateValueFactory(int i, int i1) => i1 + value.bytes.Length;
_common.DropCount.AddOrUpdate(value.id,value.bytes.Length,UpdateValueFactory);
}
}
foreach (var (id,length) in _common.DropCount)
{
_logger.LogInformation($"未找到链接:{id},已丢弃字节数量:{length}");
}
server.Tick();
}
catch (SocketException)
{
//丢失链接,有用户断开连接,通常是正常现象
}
catch (Exception exception)
{
_logger.LogCritical(exception,exception.Message);
}
}
private ushort _port;
public void StartServer(ushort port = 27014)
{
_port = port;
if (IsRunningServer is false)
{
if (TickRate > 0)
{
_timer.Interval = 1000f / TickRate;
}
OnStartServer?.Invoke();
server.Start(port);
if (ManualTick is false)
_timer.Start();
_isStarted = true;
_logger.LogInformation($"已启动KCP服务器:{port}");
}
else
{
BIT4Log.Warning<KCPNetServer>($"KCP服务器已经启动,忽略此次请求");
}
}
public void StopServer(bool dispose = false)
{
if (IsRunningServer)
{
_isStarted = false;
server.Stop();
OnStopServer?.Invoke();
_timer.Stop();
_logger.LogInformation($"已停止KCP服务器");
}
else
{
BIT4Log.Warning<KCPNetServer>($"KCP服务器未运行,忽略此次请求");
}
}
public bool IsRunningServer => server.IsActive();
public void SendMessageToClient(int id, string message)
{
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.Message);
writer.Write(message);
_common.SendQueue.Enqueue((id,ms.ToArray()));
}
public void SendMessageToAll(string message)
{
foreach (var Id in server.connections.Keys)
{
SendMessageToClient(Id,message);
}
}
public IDictionary<int, EndPoint> Connections =>
new Dictionary<int, EndPoint>(
server.connections.Select(x => new KeyValuePair<int, EndPoint>(x.Key, x.Value.remoteEndPoint)
));
public void Tick()
{
try
{
Tick(null,null);
}
catch (SocketException)
{
_logger.LogInformation("有用户断开连接,如有异常请检查");
}
}
public void HandShake()
{
foreach (var id in server.connections.Keys)
{
server.Send(id, new byte[]{0x03, 0x04}, KcpChannel.Reliable);
}
}
public T GetRemoteInterface<T>() => _common.GetRemoteInterface<T>();
public void Invoke(Memory<byte> bytes)
{
throw new NotImplementedException();
}
public UniTask<Memory<byte>> InvokeAsync(Memory<byte> bytes)
{
throw new NotImplementedException();
}
public void Invoke(IReadOnlyList<byte> bytes)
{
throw new NotImplementedException();
}
public UniTask<IReadOnlyList<byte>> InvokeAsync(IReadOnlyList<byte> bytes)
{
throw new NotImplementedException();
}
private void OnConnectedInternel(int id)
{
OnClientConnected?.Invoke(id);
_logger.LogInformation($"{id}已连接到:{Name}");
SendMessageToClient(id, $"成功连接到服务器:{Name}");
}
private void OnConnected(int Id)
{
}
private void OnDisconnect(int Id)
{
OnClientDisconnected?.Invoke(Id);
_logger.LogInformation($"{Id}已断开");
}
private void OnData(int Id, ArraySegment<byte> bytes, KcpChannel channel)
{
try
{
_common.OnDataInternal(Id, bytes, channel);
}
catch (Exception e)
{
_logger.LogCritical(e, e.Message);
}
}
private void OnError(int Id, ErrorCode errorCode, string message)
{
_logger.LogInformation($"异常:{errorCode},{message}");
}
public void KickClient(int id)
{
server.Disconnect(id);
}
}
}