This commit is contained in:
CortexCore
2024-06-20 10:04:18 +08:00
parent d3fd104900
commit a0d95098b8
7 changed files with 251 additions and 189 deletions

View File

@@ -36,6 +36,8 @@ namespace BITKit.Net
private readonly ConcurrentDictionary<string,Func<object,UniTask<object>>> _rpc = new();
private readonly ConcurrentDictionary<string,MethodInfo> _rpcMethods = new();
private readonly ConcurrentDictionary<string,object> _rpcHandles = new();
private readonly ConcurrentQueue<(int id,byte[] bytes)> _sendQueue = new();
public KCPNetServer()
{
@@ -64,6 +66,19 @@ namespace BITKit.Net
{
try
{
while (_sendQueue.TryDequeue(out var value))
{
if (server.connections.ContainsKey(value.id))
{
server.Send(value.id,value.bytes,KcpChannel.Reliable);
}
else
{
BIT4Log.Log<KCPNetServer>($"链接{value.id}已丢失,丢弃了{value.bytes.Length}个字节");
}
}
if (server.IsActive() is false || ManualTick) return;
server.Tick();
}
@@ -113,7 +128,11 @@ namespace BITKit.Net
public bool IsRunningServer => server.IsActive();
public void SendMessageToClient(int id, string message)
{
Send(id,NetCommandType.Message,message);
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.Message);
writer.Write(message);
_sendQueue.Enqueue((id,ms.ToArray()));
}
public void SendMessageToAll(string message)
@@ -180,146 +199,156 @@ namespace BITKit.Net
BIT4Log.LogException(e);
}
}
private async void OnDataInternel(int Id, ArraySegment<byte> bytes, KcpChannel channel)
{
using var ms = new MemoryStream(bytes.ToArray());
using var reader = new BinaryReader(ms);
//BIT4Log.Log<INetServer>(Id);
var type = (NetCommandType)ms.ReadByte();
//BIT4Log.Log<INetServer>(type);
switch (type)
try
{
case NetCommandType.Message:
BIT4Log.Log<KCPNetServer>($"已收到消息,{Id}:{reader.ReadString()}");
break;
case NetCommandType.Command:
var command = BITBinary.Read(reader);
if (command is object[] { Length: 1 } objs) command = objs[0];
//BIT4Log.Log<KCPNetServer>($"已收到指令:{command},值:\n{JsonConvert.SerializeObject(command, Formatting.Indented)}");
_events.Invoke(command.GetType().FullName, command);
(int Id,object Command) tuple = (Id,command);
_events.InvokeDirect(command.GetType().FullName,tuple);
break;
case NetCommandType.Heartbeat:
server.Send(Id,new byte[]{(byte)NetCommandType.Heartbeat}, KcpChannel.Reliable);
break;
case NetCommandType.AllClientCommand:
foreach (var id in server.connections.Keys.ToArray())
{
server.Send(id,bytes,channel);
}
break;
case NetCommandType.TargetCommand:
var targetId = reader.ReadInt32();
server.Send(targetId,bytes,channel);
break;
case NetCommandType.Ping:
server.Send(Id,new byte[]{(byte)NetCommandType.Ping},channel);
break;
case NetCommandType.GetFromServer:
var type = (NetCommandType)ms.ReadByte();
switch (type)
{
var requestId = reader.ReadInt32();
case NetCommandType.Message:
BIT4Log.Log<KCPNetServer>($"已收到消息,{Id}:{reader.ReadString()}");
break;
case NetCommandType.Command:
var command = BITBinary.Read(reader);
if (command is object[] { Length: 1 } objs) command = objs[0];
//BIT4Log.Log<KCPNetServer>($"已收到指令:{command},值:\n{JsonConvert.SerializeObject(command, Formatting.Indented)}");
_events.Invoke(command.GetType().FullName, command);
using var returnMS = new MemoryStream();
await using var returnWriter = new BinaryWriter(returnMS);
returnWriter.Write((byte)NetCommandType.ReturnToClient);
returnWriter.Write(requestId);
try
{
if (reader.ReadBoolean())
(int Id, object Command) tuple = (Id, command);
_events.InvokeDirect(command.GetType().FullName, tuple);
break;
case NetCommandType.Heartbeat:
_sendQueue.Enqueue((Id,new byte[] { (byte)NetCommandType.Heartbeat }));
break;
case NetCommandType.AllClientCommand:
foreach (var id in server.connections.Keys.ToArray())
{
var path = reader.ReadString();
var pars = BITBinary.Read(reader).As<object[]>();
if (_rpcMethods.TryGetValue(path, out var methodInfo))
{
var isAwaitable = methodInfo.ReturnType.GetMethod(nameof(Task.GetAwaiter)) != null;
var handle = _rpcHandles[path];
object value = null;
if (methodInfo.GetParameters().Length is 0)
{
pars = new object[]{};
}
if (isAwaitable)
{
dynamic result = methodInfo.Invoke(handle, pars)!;
_sendQueue.Enqueue((id,bytes.ToArray()));
}
break;
case NetCommandType.TargetCommand:
var targetId = reader.ReadInt32();
_sendQueue.Enqueue((targetId,bytes.ToArray()));
break;
case NetCommandType.Ping:
_sendQueue.Enqueue((Id,new byte[] { (byte)NetCommandType.Ping }));
break;
case NetCommandType.GetFromServer:
{
var requestId = reader.ReadInt32();
value = await result;
using var returnMS = new MemoryStream();
await using var returnWriter = new BinaryWriter(returnMS);
returnWriter.Write((byte)NetCommandType.ReturnToClient);
returnWriter.Write(requestId);
try
{
if (reader.ReadBoolean())
{
var path = reader.ReadString();
var pars = BITBinary.Read(reader).As<object[]>();
if (_rpcMethods.TryGetValue(path, out var methodInfo))
{
var isAwaitable = methodInfo.ReturnType.GetMethod(nameof(Task.GetAwaiter)) != null;
var handle = _rpcHandles[path];
object value = null;
if (methodInfo.GetParameters().Length is 0)
{
pars = new object[] { };
}
if (isAwaitable)
{
dynamic result = methodInfo.Invoke(handle, pars)!;
value = await result;
}
else
{
value = methodInfo.Invoke(handle, pars);
}
returnWriter.Write(true);
BITBinary.Write(returnWriter, value);
}
else
{
value = methodInfo.Invoke(handle, pars);
throw new Exception($"未找到对应的Rpc方法:{path}");
}
returnWriter.Write(true);
BITBinary.Write(returnWriter, value);
}
else
{
throw new Exception($"未找到对应的Rpc方法:{path}");
var commandObj = BITBinary.Read(reader)
.As<object[]>()[0];
var funcName = commandObj.GetType()!.FullName!;
if (_rpc.TryGetValue(funcName, out var func))
{
var value = await func.As<Func<object, UniTask<object>>>().Invoke(commandObj);
returnWriter.Write(true);
BITBinary.Write(returnWriter, value);
}
else
{
throw new Exception($"未找到对应的Rpc方法:{funcName}");
}
}
{
var _bytes = returnMS.ToArray();
_sendQueue.Enqueue((Id,_bytes));
}
}
catch (Exception e)
{
returnWriter.Write(false);
returnWriter.Write(e.Message);
var _bytes = returnMS.ToArray();
_sendQueue.Enqueue((Id,_bytes));
BIT4Log.LogException(e);
}
}
break;
case NetCommandType.ReturnToServer:
{
var id = reader.ReadInt32();
if (reader.ReadBoolean())
{
var value = BITBinary.Read(reader);
_p2p.TryAdd(id, value);
}
else
{
var commandObj = BITBinary.Read(reader)
.As<object[]>()[0];
var funcName = commandObj.GetType()!.FullName!;
if (_rpc.TryGetValue(funcName, out var func))
{
var value = await func.As<Func<object, UniTask<object>>>().Invoke(commandObj);
returnWriter.Write(true);
BITBinary.Write(returnWriter, value);
}
else
{
throw new Exception($"未找到对应的Rpc方法:{funcName}");
}
}
{
var _bytes = returnMS.ToArray();
server.Send(Id, _bytes, KcpChannel.Reliable);
var message = reader.ReadString();
_p2p.TryAdd(id, new Exception(message));
}
}
catch (Exception e)
{
returnWriter.Write(false);
returnWriter.Write(e.Message);
var _bytes = returnMS.ToArray();
server.Send(Id, _bytes, KcpChannel.Reliable);
BIT4Log.LogException(e);
}
break;
default:
BIT4Log.Log<KCPNetServer>($"未知消息类型:{type},字节:{(byte)type}");
BIT4Log.Log<KCPNetServer>(
$"已收到:({Id}, {BitConverter.ToString(bytes.Array, bytes.Offset, bytes.Count)} @ {channel})");
break;
}
break;
case NetCommandType.ReturnToServer:
{
var id = reader.ReadInt32();
if (reader.ReadBoolean())
{
var value = BITBinary.Read(reader);
_p2p.TryAdd(id, value);
}
else
{
var message = reader.ReadString();
_p2p.TryAdd(id, new Exception(message));
}
}
break;
default:
BIT4Log.Log<KCPNetServer>($"未知消息类型:{type},字节:{(byte)type}");
BIT4Log.Log<KCPNetServer>($"已收到:({Id}, {BitConverter.ToString(bytes.Array, bytes.Offset, bytes.Count)} @ {channel})");
break;
}
catch (Exception e)
{
ms.Close();
reader.Close();
BIT4Log.LogException(e);
}
}
private void OnError(int Id, ErrorCode errorCode, string message)
{
BIT4Log.Log<KCPNetServer>($"异常:{errorCode},{message}");
@@ -367,7 +396,8 @@ namespace BITKit.Net
BITBinary.Write(writer,pars);
var bytes = ms.ToArray();
server.Send(id,bytes,KcpChannel.Reliable);
//server.Send(id,bytes,KcpChannel.Reliable);
_sendQueue.Enqueue((id,bytes));
var startTime = DateTime.Now;
while (true)
{
@@ -490,10 +520,8 @@ namespace BITKit.Net
.Write((byte)commandType)
.WriteObject(values)
.Build();
if (server.connections.ContainsKey(id))
{
server.Send(id,bytes, KcpChannel.Reliable);
}
//server.Send(id,bytes, KcpChannel.Reliable);
_sendQueue.Enqueue((id,bytes));
}
}