This commit is contained in:
CortexCore
2025-01-24 18:11:03 +08:00
parent 4b72602bfa
commit 41715e4413
12 changed files with 545 additions and 1391 deletions

View File

@@ -37,20 +37,7 @@ namespace BITKit.Net
);
_timer.Elapsed += Tick;
_logger.LogInformation("已创建KCP客户端");
AddCommandListener<NetClientAllocateIdCommand>(x =>
{
Id = x.Id;
});
_isConnected.AddListener(ConnectionCallback);
AddCommandListener<SimplePing>(F);
return;
UniTask<SimplePing> F(SimplePing p)
{
p.EndTime = DateTime.Now;
return UniTask.FromResult(p);
}
}
public event Action OnStartConnect;
@@ -83,8 +70,7 @@ namespace BITKit.Net
private readonly ValidHandle _isConnected = new();
private bool _userConnected;
private int _index = int.MinValue;
private DateTime _lastHeartbeat = DateTime.Now;
private DateTime _now = DateTime.Now;
private TimeSpan _interval = TimeSpan.FromMilliseconds(100);
private string _connectedAddress = "127.0.0.1";
@@ -168,8 +154,9 @@ namespace BITKit.Net
await UniTask.SwitchToThreadPool();
try
{
_lastHeartbeat = DateTime.Now;
_common.LastHeartbeat.GetOrCreate(0);
_common.LastHeartbeat[0] = DateTime.Now;
_client.Connect(address, port);
_timer.Start();
@@ -192,6 +179,8 @@ namespace BITKit.Net
_connectedAddress = address;
_connectedPort = port;
return _client.connected;
}
@@ -220,271 +209,33 @@ namespace BITKit.Net
try
{
Traffic+=new float2(0,bytes.Count);
OnDataInternel(bytes, channel);
OnDataInternal(bytes, channel);
}
catch (Exception e)
{
_logger.LogCritical(e,e.Message);
}
}
private async void OnDataInternel(ArraySegment<byte> bytes, KcpChannel channel)
private void OnDataInternal(ArraySegment<byte> bytes, KcpChannel channel)
{
using var ms = new MemoryStream(bytes.ToArray());
using var reader = new BinaryReader(ms);
var type = (NetCommandType)ms.ReadByte();
switch (type)
try
{
case NetCommandType.Message:
_logger.LogInformation($"已收到消息:{reader.ReadString()}");
break;
case NetCommandType.AllClientCommand:
case NetCommandType.Command:
var command = BITBinary.Read(reader);
if (command is object[] { Length: 1 } objs) command = objs[0];
//BIT4Log.Log<KcpClient>($"已收到指令:{command},值:\n{JsonConvert.SerializeObject(command, Formatting.Indented)}");
try
{
if (BITApp.SynchronizationContext is not null)
await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext,
BITApp.CancellationToken);
_common.Events.Invoke(command.GetType().FullName, command);
}
catch (OperationCanceledException)
{
}
catch (Exception e)
{
_logger.LogCritical(e,e.Message);
}
break;
case NetCommandType.Heartbeat:
if (Id is not -1)
{
_isConnected.AddElement(this);
}
_lastHeartbeat = DateTime.Now;
break;
case NetCommandType.Ping:
Ping = (int)(DateTime.Now - _lastPingTime).TotalMilliseconds;
break;
case NetCommandType.ReturnToClient:
var id = reader.ReadInt32();
try
{
if (_common.P2P.TryRemove(id, out var source))
{
if (reader.ReadBoolean())
{
var value = BITBinary.Read(reader);
source.TrySetResult(value);
}
else
{
var message = reader.ReadString();
source.TrySetException(new Exception(message));
}
}
else
{
_logger.LogWarning($"ID为{id}的请求未注册回调");
}
}
catch (Exception e)
{
_logger.LogWarning($"请求返回失败:{id}");
_logger.LogCritical(e,e.Message);
}
break;
case NetCommandType.GetFromClient:
try
{
var requestId = reader.ReadInt32();
using var returnMS = new MemoryStream();
await using var returnWriter = new BinaryWriter(returnMS);
returnWriter.Write((byte)NetCommandType.ReturnToServer);
returnWriter.Write(requestId);
try
{
object value = null;
if (reader.ReadBoolean())
{
var path = reader.ReadString();
var pars = BITBinary.Read(reader).As<object[]>();
if (_common.RpcMethods.TryGetValue(path, out var methodInfo))
{
var isAwaitable = methodInfo.ReturnType.GetMethod(nameof(Task.GetAwaiter)) != null;
var handle = _common.RpcHandles[path];
try
{
if (isAwaitable)
{
dynamic result = methodInfo.Invoke(handle, pars)!;
value = await result;
}
else
{
value = methodInfo.Invoke(handle, pars);
}
}
catch (Exception)
{
_logger.LogWarning(path);
throw;
}
returnWriter.Write(true);
BITBinary.Write(returnWriter, value);
}
else if (_common.RpcEvents.TryGetValue(path, out var eventInfo))
{
var handle = _common.RpcHandles[path];
var fieldInfo = handle.GetType().GetField(eventInfo.Name,ReflectionHelper.Flags)!;
var eventDelegate = fieldInfo.GetValue(handle) as MulticastDelegate;
foreach (var del in eventDelegate!.GetInvocationList())
{
del.Method.Invoke(del.Target, pars);
}
}
else
{
returnWriter.Write(false);
returnWriter.Write($"未找到对应的Rpc方法:{path}");
}
}
else
{
var commandObj = BITBinary.Read(reader)
.As<object[]>()[0];
var func = _common.Rpc[commandObj.GetType()!.FullName!];
value = await func.As<Func<object, UniTask<object>>>().Invoke(commandObj);
}
returnWriter.Write(true);
BITBinary.Write(returnWriter, value);
}
catch (Exception e)
{
_logger.LogCritical(e,e.Message);
returnWriter.Write(false);
returnWriter.Write(e.Message);
}
var _bytes = returnMS.ToArray();
_commandQueue.Enqueue(_bytes);
}
catch(Exception e)
{
_logger.LogCritical(e,e.Message);
}
break;
case NetCommandType.AllRpc:
case NetCommandType.TargetRpc:
{
var rpcName = reader.ReadString();
var pars = BITBinary.Read(reader).As<object[]>();
if (_common.RpcMethods.TryGetValue(rpcName, out var methodInfo))
{
try
{
methodInfo.Invoke(_common.RpcHandles[rpcName], pars);
}
catch (TargetException targetException)
{
var reportBuilder = new StringBuilder();
reportBuilder.AppendLine("正在检查参数类型");
var parameterInfos = methodInfo.GetParameters();
if (parameterInfos.Length != pars.Length)
{
reportBuilder.AppendLine("参数数量不匹配,期望:" + parameterInfos.Length + ",实际:" + pars.Length);
}
for (var i = 0; i < pars.Length; i++)
{
var parameterInfo = parameterInfos[i];
var parameter = pars[i];
if (parameterInfo.ParameterType != parameter.GetType())
{
reportBuilder.AppendLine($"参数{i}类型不匹配,期望:{parameterInfo.ParameterType},实际:{parameter.GetType()}");
}
else
{
reportBuilder.AppendLine($"参数{parameter.GetType()}类型匹配:{parameterInfo.ParameterType}");
}
}
_logger.LogWarning(reportBuilder.ToString());
throw;
}
catch (Exception e)
{
_logger.LogWarning(rpcName);
throw;
}
}
else if (_common.RpcEvents.TryGetValue(rpcName, out var eventInfo))
{
var handle = _common.RpcHandles[rpcName];
var fieldInfo = handle.GetType().GetField(eventInfo.Name,ReflectionHelper.Flags)!;
var eventDelegate = fieldInfo.GetValue(handle) as MulticastDelegate;
if (eventDelegate is null)
{
//BIT4Log.Warning<KcpNetClient>($"未找到对应的事件:{rpcName}");
}
else
{
foreach (var del in eventDelegate.GetInvocationList())
{
del.Method.Invoke(del.Target, pars);
}
}
}
else
{
_logger.LogWarning($"未找到对应的Rpc方法:{rpcName}");
}
}
break;
default:
_logger.LogWarning($"未知消息类型:{type},字节:{(byte)type}");
if (bytes.Array != null)
_logger.LogInformation(
$"已收到:({Id}, {BitConverter.ToString(bytes.Array, bytes.Offset, bytes.Count)} @ {channel})");
break;
_common.OnDataInternal(0,bytes,channel);
}
catch (Exception e)
{
_logger.LogCritical(e,e.Message);
}
}
private async void OnConnectedInternal()
{
// if (BITApp.SynchronizationContext is not null)
// await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext);
// OnConnected?.Invoke();
// _logger.LogInformation("已连接");
}
private async void OnDisconnectInternal()
{
// _logger.LogInformation("连接被断开");
DisconnectInternal();
}
private void OnError(ErrorCode errorCode, string message)
@@ -492,157 +243,6 @@ namespace BITKit.Net
_logger.LogInformation($"{_client.remoteEndPoint}异常:{errorCode},{message}");
}
public void ServerCommand<T>(T command = default)
{
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.Command);
BITBinary.Write(writer,command);
var bytes = ms.ToArray();
_commandQueue.Enqueue(bytes);
}
public void AllClientCommand<T>(T command = default)
{
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.AllClientCommand);
BITBinary.Write(writer,command);
var bytes = ms.ToArray();
}
public void ClientCommand<T>(int id, T command)
{
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.Command);
BITBinary.Write(writer,command);
var bytes = ms.ToArray();
_commandQueue.Enqueue(bytes);
}
public async UniTask<T> GetFromServer<T>(string path = default, params object[] pars)
{
if (IsConnected is false)
{
throw new NetOfflineException();
}
var id = _index++;
var ms = new MemoryStream();
var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.GetFromServer);
writer.Write(id);
if (string.IsNullOrEmpty(path))
{
writer.Write(false);
}
else
{
writer.Write(true);
writer.Write(path);
}
if (pars.Length is 0)
{
pars = new[] { System.Activator.CreateInstance<T>() as object };
}
BITBinary.Write(writer, pars);
var bytes = ms.ToArray();
await ms.DisposeAsync();
await writer.DisposeAsync();
_commandQueue.Enqueue(bytes);
var source = new UniTaskCompletionSource<object>();
_common.P2P.TryAdd(id, source);
var timeoutCts = new CancellationTokenSource();
timeoutCts.CancelAfter(5000); // 设置超时时间
var value = await source.Task.AttachExternalCancellation(timeoutCts.Token);
if (value is Exception e)
{
throw new InGameException(e.Message);
}
if (UniTask.CompletedTask is T t)
{
return t;
}
return value.As<T>();
}
public UniTask<T> GetFromClient<T>(int id,string path, params object[] pars)
{
throw new NotImplementedException();
}
public void AddRpcHandle(object rpcHandle)
{
_common.AddRpcHandle(rpcHandle);
}
public void AddCommandListener<T>(Action<T> handle)
{
_common. Events.AddListener<T>(handle);
}
public void AddCommandListener<T>(Func<T,UniTask<T>> func)
{
_common. Rpc.TryAdd(typeof(T).FullName, F);
return;
async UniTask<object> F(object o)
{
return await func.Invoke((T)o);
}
}
public void RemoveCommandListener<T>(Func<T,UniTask<T>> func)
{
_common. Rpc.TryRemove(typeof(T).FullName, out _);
}
public void RemoveCommandListener<T>(Action<T> handle)
{
_common. Events.RemoveListener<T>(handle);
}
public void SendRT(string rpcName, params object[] pars)
{
if (IsConnected is false)
{
throw new NetOfflineException();
}
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.GetFromServer);
writer.Write(++_index);
writer.Write(true);
writer.Write(rpcName);
BITBinary.Write(writer,pars);
var bytes = ms.ToArray();
_commandQueue.Enqueue(bytes);
}
public void SendTargetRT(int id, string rpcName, params object[] pars)
{
throw new NotImplementedException();
}
public void SendAllRT(string rpcName, params object[] pars)
{
throw new NotImplementedException();
}
public void SendServerMessage(string message)
{
@@ -655,11 +255,6 @@ namespace BITKit.Net
_client.Send(bytes, KcpChannel.Reliable);
}
#if UNITY_EDITOR
private readonly IntervalUpdate _pingInterval = new(1);
#endif
public void Tick()
{
_now = DateTime.Now;
@@ -668,7 +263,7 @@ namespace BITKit.Net
{
if (_client.connected)
{
if (DateTime.Now - _lastHeartbeat > TimeSpan.FromSeconds(5))
if (DateTime.Now - _common.LastHeartbeat.GetOrCreate(0) > TimeSpan.FromSeconds(5))
{
_logger.LogWarning("心跳超时,自动断开");
DisconnectInternal();
@@ -713,5 +308,25 @@ namespace BITKit.Net
Traffic+=new float2(2,0);
_client.Send(new byte[]{0x01, 0x02}, KcpChannel.Reliable);
}
public T GetRemoteInterface<T>() => _common.GetRemoteInterface<T>();
public void Invoke(Memory<byte> bytes)
{
throw new NotImplementedException();
}
public UniTask<Memory<byte>> InvokeAsync(Memory<byte> bytes)
{
throw new NotImplementedException();
}
public void Invoke(IReadOnlyList<byte> bytes)
{
throw new NotImplementedException();
}
public UniTask<IReadOnlyList<byte>> InvokeAsync(IReadOnlyList<byte> bytes)
{
throw new NotImplementedException();
}
}
}