603 lines
22 KiB
C#
603 lines
22 KiB
C#
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Timers;
|
|
using Cysharp.Threading.Tasks;
|
|
using kcp2k;
|
|
using Timer = System.Timers.Timer;
|
|
using System.Threading.Tasks;
|
|
using System.IO;
|
|
using System.Numerics;
|
|
using System.Reflection;
|
|
using System.Text;
|
|
using BITKit.Net.Examples;
|
|
using Newtonsoft.Json;
|
|
using Unity.Mathematics;
|
|
|
|
namespace BITKit.Net
|
|
{
|
|
public class KcpNetClient:INetClient,INetProvider
|
|
{
|
|
public event Action OnStartConnect;
|
|
public event Action OnConnected;
|
|
public event Action OnDisconnected;
|
|
public event Action OnConnectedFailed;
|
|
public bool IsConnected => _isConnected;
|
|
public bool IsConnecting { get; private set; }
|
|
public float2 Traffic { get; set; }
|
|
public bool ManualTick { get; set; }
|
|
|
|
public int Ping { get; private set; }
|
|
public int Id { get; private set; } = -1;
|
|
private readonly KcpClient client;
|
|
|
|
private readonly ConcurrentQueue<byte[]> _commandQueue = new();
|
|
|
|
private DateTime _lastPingTime = DateTime.Now;
|
|
|
|
private readonly Timer _timer = new(100)
|
|
{
|
|
AutoReset = true
|
|
};
|
|
|
|
private readonly GenericEvent _events = new();
|
|
|
|
private readonly ValidHandle _isConnected = new();
|
|
|
|
private int _index = int.MinValue;
|
|
private readonly ConcurrentDictionary<int, object> _p2p = new();
|
|
private readonly ConcurrentDictionary<string,Func<object,UniTask<object>>> _rpc = new();
|
|
private readonly ConcurrentDictionary<string,MethodInfo> _rpcMethods = new();
|
|
private readonly ConcurrentDictionary<string,object> _rpcHandles = new();
|
|
private DateTime _lastHeartbeat = DateTime.Now;
|
|
private DateTime _now = DateTime.Now;
|
|
private TimeSpan _interval = TimeSpan.FromMilliseconds(100);
|
|
|
|
private readonly byte[] _heartBeat = new byte[] { (byte)NetCommandType.Heartbeat };
|
|
/// <summary>
/// Creates the underlying KCP client, wires its transport callbacks to this
/// instance and registers the internal listeners.
/// </summary>
public KcpNetClient()
{
    // Transport callbacks, passed in the order: connected, data, disconnected, error.
    client = new KcpClient(OnConnectedInternal, OnData, OnDisconnectInternal, OnError, KCPNet.Config);

    // The timer handler forwards to Tick() unless ManualTick is set.
    _timer.Elapsed += Tick;

    BIT4Log.Log<KcpNetClient>("已创建KCP客户端");

    // Remember the id carried by a NetClientAllocateIdCommand.
    AddCommandListener<NetClientAllocateIdCommand>(allocation =>
    {
        Id = allocation.Id;
    });

    // ConnectionCallback raises OnConnected / OnDisconnected.
    _isConnected.AddListener(ConnectionCallback);
}
|
|
|
|
/// <summary>
/// Listener registered on <c>_isConnected</c>; raises <see cref="OnConnected"/> or
/// <see cref="OnDisconnected"/> depending on the reported state.
/// </summary>
/// <param name="x"><c>true</c> raises OnConnected, <c>false</c> raises OnDisconnected.</param>
private async void ConnectionCallback(bool x)
{
    // Only hop to the application context when one is configured, matching the
    // guard used by Connect and OnDataInternel. Without it a null context would
    // throw inside this async void method.
    if (BITApp.SynchronizationContext is not null)
        await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext);

    if (x)
    {
        OnConnected?.Invoke();
        BIT4Log.Log<KcpNetClient>("连接成功");
    }
    else
    {
        OnDisconnected?.Invoke();
        BIT4Log.Log<KcpNetClient>("连接已断开");
    }
}
|
|
|
|
/// <summary>
/// Timer callback: runs <see cref="Tick()"/> unless <see cref="ManualTick"/> is set.
/// </summary>
/// <param name="sender">Timer that raised the event (unused).</param>
/// <param name="e">Elapsed event data (unused).</param>
private void Tick(object sender, ElapsedEventArgs e)
{
    if (ManualTick)
    {
        // The owner ticks the client itself.
        return;
    }

    Tick();
}
|
|
|
|
/// <summary>
/// Disconnects the KCP client, removes this instance from the connection handle,
/// stops the tick timer and raises <see cref="OnDisconnected"/>.
/// </summary>
/// <remarks>
/// NOTE(review): <c>ConnectionCallback</c> also raises <see cref="OnDisconnected"/>
/// when it is called with <c>false</c>; if <c>_isConnected.RemoveElement</c> triggers
/// that listener, subscribers may be notified twice — confirm.
/// </remarks>
public async void Disconnect()
{
    client.Disconnect();
    _isConnected.RemoveElement(this);
    _timer.Stop();
    try
    {
        // Guard against a missing context like the rest of the class does.
        // An unguarded null would throw here.
        if (BITApp.SynchronizationContext is not null)
            await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext, BITApp.CancellationToken);
        OnDisconnected?.Invoke();
    }
    catch (OperationCanceledException)
    {
        // Application is shutting down; nothing left to notify.
    }
    catch (Exception e)
    {
        // This method is async void, so an exception leaving it cannot be observed by the caller.
        BIT4Log.LogException(e);
    }
}
|
|
|
|
/// <summary>
/// Connects to a KCP server, sends the handshake plus a short burst of heartbeats
/// and reports whether the transport is connected afterwards.
/// </summary>
/// <param name="address">Server address.</param>
/// <param name="port">Server port.</param>
/// <returns>
/// <c>true</c> when the client is connected after the heartbeat burst; <c>false</c> when a
/// connect is already in progress, the client is already connected, or the attempt failed.
/// </returns>
/// <exception cref="OperationCanceledException">
/// May propagate from the initial context switch, which observes <c>BITApp.CancellationToken</c>.
/// </exception>
public async UniTask<bool> Connect(string address = "127.0.0.1", ushort port = 27014)
{
    // Check both conditions BEFORE raising the flag. The original set IsConnecting
    // first and then returned on client.connected, leaving the flag stuck at true.
    if (IsConnecting || client.connected) return false;

    IsConnecting = true;
    try
    {
        if (BITApp.SynchronizationContext is not null)
            await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext, BITApp.CancellationToken);
        OnStartConnect?.Invoke();
        await UniTask.SwitchToThreadPool();

        try
        {
            _lastHeartbeat = DateTime.Now;
            client.Connect(address, port);
            _timer.Start();
            _interval = TimeSpan.FromMilliseconds(_timer.Interval);

            HandShake();

            if (BITApp.SynchronizationContext is not null)
                await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext);

            Traffic += new float2(1, 0);

            // Pump the transport a few times while sending heartbeats.
            for (var i = 0; i < 5; i++)
            {
                client.Send(_heartBeat, KcpChannel.Reliable);
                client.Tick();
                await Task.Delay(100);
            }

            if (client.connected)
            {
                SendServerMessage(Environment.MachineName);
                return client.connected;
            }

            OnConnectedFailed?.Invoke();
            Disconnect();
            return false;
        }
        catch (Exception e)
        {
            BIT4Log.LogException(e);
            if (BITApp.SynchronizationContext is not null)
                await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext);
            OnConnectedFailed?.Invoke();
            _timer.Stop();
            return false;
        }
    }
    finally
    {
        // Always release the flag, including when the first context switch is cancelled.
        IsConnecting = false;
    }
}
|
|
|
|
/// <summary>
/// Transport data callback: adds the received byte count to the second component of
/// <see cref="Traffic"/> and hands the packet to <c>OnDataInternel</c>.
/// </summary>
/// <param name="bytes">Received payload.</param>
/// <param name="channel">KCP channel the payload arrived on.</param>
private void OnData(ArraySegment<byte> bytes, KcpChannel channel)
{
    try
    {
        var received = new float2(0, bytes.Count);
        Traffic += received;

        OnDataInternel(bytes, channel);
    }
    catch (Exception exception)
    {
        BIT4Log.LogException(exception);
    }
}
|
|
/// <summary>
/// Decodes one received packet and dispatches it according to its leading
/// <c>NetCommandType</c> byte.
/// </summary>
/// <remarks>
/// This method is <c>async void</c>, so the try/catch in <c>OnData</c> cannot observe
/// exceptions thrown here. The whole body is therefore wrapped and failures are logged.
/// The payload is copied with <c>ToArray()</c> before any await.
/// </remarks>
/// <param name="bytes">Received payload; the first byte is the command type.</param>
/// <param name="channel">KCP channel the payload arrived on (only used for logging).</param>
private async void OnDataInternel(ArraySegment<byte> bytes, KcpChannel channel)
{
    try
    {
        using var ms = new MemoryStream(bytes.ToArray());
        using var reader = new BinaryReader(ms);

        var type = (NetCommandType)ms.ReadByte();
        switch (type)
        {
            case NetCommandType.Message:
                BIT4Log.Log<KcpClient>($"已收到消息:{reader.ReadString()}");
                break;

            case NetCommandType.AllClientCommand:
            case NetCommandType.Command:
                var command = BITBinary.Read(reader);
                // A single-element array is unwrapped to the element itself.
                if (command is object[] { Length: 1 } objs) command = objs[0];
                try
                {
                    if (BITApp.SynchronizationContext is not null)
                        await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext,
                            BITApp.CancellationToken);
                    // Listeners are keyed by the full type name of the command.
                    _events.Invoke(command.GetType().FullName, command);
                }
                catch (OperationCanceledException)
                {
                    // Application is shutting down; drop the command.
                }
                catch (Exception e)
                {
                    BIT4Log.LogException(e);
                }
                break;

            case NetCommandType.Heartbeat:
                // Mark the connection as established only once an id has been assigned.
                if (Id is not -1)
                {
                    _isConnected.AddElement(this);
                }
                _lastHeartbeat = DateTime.Now;
                break;

            case NetCommandType.Ping:
                Ping = (int)(DateTime.Now - _lastPingTime).TotalMilliseconds;
                break;

            case NetCommandType.ReturnToClient:
                var id = reader.ReadInt32();
                try
                {
                    // The flag selects between a serialized value and an error message.
                    if (reader.ReadBoolean())
                    {
                        var value = BITBinary.Read(reader);
                        _p2p.TryAdd(id, value);
                    }
                    else
                    {
                        var message = reader.ReadString();
                        _p2p.TryAdd(id, new Exception(message));
                    }
                }
                catch (Exception e)
                {
                    BIT4Log.Warning<INetClient>($"请求返回失败:{id}");
                    BIT4Log.LogException(e);
                }
                break;

            case NetCommandType.GetFromClient:
                try
                {
                    var requestId = reader.ReadInt32();

                    using var returnMS = new MemoryStream();
                    await using var returnWriter = new BinaryWriter(returnMS);
                    returnWriter.Write((byte)NetCommandType.ReturnToServer);
                    returnWriter.Write(requestId);
                    returnWriter.Flush();
                    // Remember where the header ends so a failed write can be rolled back.
                    var headerLength = returnMS.Length;
                    try
                    {
                        object value = null;
                        string failure = null;
                        if (reader.ReadBoolean())
                        {
                            // Path-based request: look up a registered RPC method.
                            var path = reader.ReadString();
                            var pars = BITBinary.Read(reader).As<object[]>();

                            if (_rpcMethods.TryGetValue(path, out var methodInfo))
                            {
                                var isAwaitable = methodInfo.ReturnType.GetMethod(nameof(Task.GetAwaiter)) != null;
                                var handle = _rpcHandles[path];
                                if (isAwaitable)
                                {
                                    dynamic result = methodInfo.Invoke(handle, pars)!;
                                    value = await result;
                                }
                                else
                                {
                                    value = methodInfo.Invoke(handle, pars);
                                }
                            }
                            else
                            {
                                failure = $"未找到对应的Rpc方法:{path}";
                            }
                        }
                        else
                        {
                            // Command-based request: the handler is keyed by the command's full type name.
                            var commandObj = BITBinary.Read(reader)
                                .As<object[]>()[0];
                            var func = _rpc[commandObj.GetType()!.FullName!];
                            value = await func.As<Func<object, UniTask<object>>>().Invoke(commandObj);
                        }

                        // Write exactly ONE result record. The original wrote the result
                        // inside the RPC branch and then again here, so the response held
                        // a duplicate value or a bogus success after "not found".
                        if (failure is null)
                        {
                            returnWriter.Write(true);
                            BITBinary.Write(returnWriter, value);
                        }
                        else
                        {
                            returnWriter.Write(false);
                            returnWriter.Write(failure);
                        }
                    }
                    catch (Exception e)
                    {
                        BIT4Log.LogException(e);
                        // Drop any partially written result so the error record directly follows the header.
                        returnWriter.Flush();
                        returnMS.SetLength(headerLength);
                        returnWriter.Write(false);
                        returnWriter.Write(e.Message);
                    }

                    var _bytes = returnMS.ToArray();
                    _commandQueue.Enqueue(_bytes);
                }
                catch (Exception e)
                {
                    BIT4Log.LogException(e);
                }
                break;

            case NetCommandType.AllRpc:
            case NetCommandType.TargetRpc:
            {
                var rpcName = reader.ReadString();
                if (_rpcMethods.TryGetValue(rpcName, out var methodInfo))
                {
                    var pars = BITBinary.Read(reader).As<object[]>();
                    methodInfo.Invoke(_rpcHandles[rpcName], pars);
                }
                else
                {
                    BIT4Log.Warning<KcpClient>($"未找到对应的Rpc方法:{rpcName}");
                }
            }
                break;

            default:
                BIT4Log.Log<KcpClient>($"未知消息类型:{type},字节:{(byte)type}");
                if (bytes.Array != null)
                    BIT4Log.Log<KcpClient>(
                        $"已收到:({Id}, {BitConverter.ToString(bytes.Array, bytes.Offset, bytes.Count)} @ {channel})");
                break;
        }
    }
    catch (Exception e)
    {
        // Last line of defence for this async void method.
        BIT4Log.LogException(e);
    }
}
|
|
|
|
/// <summary>
/// Transport "connected" callback. Intentionally does nothing.
/// </summary>
/// <remarks>
/// <see cref="OnConnected"/> is raised by <c>ConnectionCallback</c> instead.
/// The <c>async</c> modifier was removed because the body contains no await.
/// </remarks>
private void OnConnectedInternal()
{
    // Previously (disabled): switch to BITApp.SynchronizationContext, raise OnConnected and log "已连接".
}
|
|
|
|
/// <summary>
/// Transport "disconnected" callback; forwards to <see cref="Disconnect"/>.
/// </summary>
/// <remarks>
/// The <c>async</c> modifier was removed because the body contains no await.
/// </remarks>
private void OnDisconnectInternal()
{
    Disconnect();
}
|
|
/// <summary>
/// Transport error callback; logs the remote endpoint, error code and message.
/// </summary>
/// <param name="errorCode">Error code reported by the KCP client.</param>
/// <param name="message">Error description reported by the KCP client.</param>
private void OnError(ErrorCode errorCode, string message)
{
    // Log under this class's category; the original used the server's category here.
    BIT4Log.Log<KcpNetClient>($"{client.remoteEndPoint}异常:{errorCode},{message}");
}
|
|
|
|
/// <summary>
/// Serializes <paramref name="command"/> behind a <c>NetCommandType.Command</c> tag and
/// queues the packet; queued packets are sent by <see cref="Tick()"/>.
/// </summary>
/// <typeparam name="T">Command type.</typeparam>
/// <param name="command">Command to send.</param>
public void ServerCommand<T>(T command = default)
{
    using var stream = new MemoryStream();
    using var output = new BinaryWriter(stream);

    output.Write((byte)NetCommandType.Command);
    BITBinary.Write(output, command);

    var packet = stream.ToArray();
    _commandQueue.Enqueue(packet);
}
|
|
|
|
/// <summary>
/// Serializes <paramref name="command"/> behind a <c>NetCommandType.AllClientCommand</c> tag
/// and queues the packet; queued packets are sent by <see cref="Tick()"/>.
/// </summary>
/// <typeparam name="T">Command type.</typeparam>
/// <param name="command">Command to send.</param>
public void AllClientCommand<T>(T command = default)
{
    using var ms = new MemoryStream();
    using var writer = new BinaryWriter(ms);
    writer.Write((byte)NetCommandType.AllClientCommand);
    BITBinary.Write(writer, command);
    var bytes = ms.ToArray();
    // The original built the packet and then discarded it; queue it like ServerCommand does.
    _commandQueue.Enqueue(bytes);
}
|
|
|
|
/// <summary>
/// Serializes <paramref name="command"/> behind a <c>NetCommandType.Command</c> tag and
/// queues the packet.
/// </summary>
/// <remarks>
/// NOTE(review): <paramref name="id"/> is not written to the packet, and the tag is the
/// same one <c>ServerCommand</c> uses — confirm whether targeting a client is intended.
/// </remarks>
/// <typeparam name="T">Command type.</typeparam>
/// <param name="id">Target client id (currently unused).</param>
/// <param name="command">Command to send.</param>
public void ClientCommand<T>(int id, T command)
{
    using var stream = new MemoryStream();
    using var output = new BinaryWriter(stream);

    output.Write((byte)NetCommandType.Command);
    BITBinary.Write(output, command);

    var packet = stream.ToArray();
    _commandQueue.Enqueue(packet);
}
|
|
|
|
/// <summary>
/// Queues a request and polls for the matching response.
/// </summary>
/// <typeparam name="T">Expected result type.</typeparam>
/// <param name="path">
/// Optional RPC path. When null or empty only a <c>false</c> flag is written before the parameters.
/// </param>
/// <param name="pars">Parameters serialized after the header.</param>
/// <returns>The response value converted to <typeparamref name="T"/>.</returns>
/// <exception cref="TimeoutException">
/// No response arrived within five seconds, or the client is not connected.
/// </exception>
/// <exception cref="Exception">The response carried an error message.</exception>
public async UniTask<T> GetFromServer<T>(string path = default,params object[] pars)
{
    // Atomic increment keeps ids unique under concurrent callers; "- 1" keeps the
    // original post-increment numbering.
    var id = System.Threading.Interlocked.Increment(ref _index) - 1;

    byte[] bytes;
    using (var ms = new MemoryStream())
    using (var writer = new BinaryWriter(ms))
    {
        writer.Write((byte)NetCommandType.GetFromServer);
        writer.Write(id);
        if (string.IsNullOrEmpty(path))
        {
            writer.Write(false);
        }
        else
        {
            writer.Write(true);
            writer.Write(path);
        }
        BITBinary.Write(writer, pars);
        writer.Flush();
        bytes = ms.ToArray();
    }

    _commandQueue.Enqueue(bytes);

    // Measure the timeout against the wall clock. The original used the cached _now
    // field, which only advances inside Tick() and starts out as local time while
    // Tick() stores UTC, so the timeout could be skewed or never elapse.
    var startTime = DateTime.UtcNow;

    while (true)
    {
        if ((DateTime.UtcNow - startTime).TotalSeconds > 5 || IsConnected is false)
        {
            if (string.IsNullOrEmpty(path))
            {
                throw new TimeoutException("请求超时或已断开连接");
            }
            throw new TimeoutException($"请求超时或已断开连接,请求为{path}");
        }

        if (_p2p.TryRemove(id, out var value))
        {
            await BITApp.SwitchToMainThread();
            // An error response is stored as an Exception instance.
            if (value is Exception e)
            {
                throw e;
            }
            // When T is the type of UniTask.CompletedTask, return that instead of converting the value.
            if (UniTask.CompletedTask is T t)
            {
                return t;
            }
            return value.As<T>();
        }

        await Task.Delay(_interval);
    }
}
|
|
|
|
/// <summary>
/// Not implemented by this client.
/// </summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public UniTask<T> GetFromClient<T>(int id,string path, params object[] pars)
    => throw new NotImplementedException();
|
|
|
|
|
|
/// <summary>
/// Registers every public method and event of <paramref name="rpcHandle"/> that carries
/// <c>NetRpcAttribute</c>, then logs a report of what was registered.
/// </summary>
/// <remarks>
/// NOTE(review): events are registered under the name of their delegate's <c>Invoke</c>
/// method, so every event shares the key "Invoke" and is paired with
/// <paramref name="rpcHandle"/> itself — confirm this is intended.
/// </remarks>
/// <param name="rpcHandle">Object whose attributed members are registered.</param>
public void AddRpcHandle(object rpcHandle)
{
    var handleType = rpcHandle.GetType();
    var report = new StringBuilder();
    report.AppendLine($"正在通过反射注册{handleType.Name}");

    // Methods: keyed by method name; a later registration replaces an earlier one.
    foreach (var method in handleType.GetMethods())
    {
        if (method.GetCustomAttribute<NetRpcAttribute>() is null) continue;

        _rpcMethods[method.Name] = method;
        _rpcHandles[method.Name] = rpcHandle;

        report.AppendLine($"Add [{method.Name}] as MethodInfo");
    }

    // Events: the delegate type's Invoke method is what gets stored.
    foreach (var evt in handleType.GetEvents())
    {
        if (evt.GetCustomAttribute<NetRpcAttribute>() is null) continue;

        var invoke = evt.EventHandlerType.GetMethod("Invoke");
        _rpcMethods[invoke.Name] = invoke;
        _rpcHandles[invoke.Name] = rpcHandle;

        report.AppendLine($"Add [{evt.Name} as EventInfo]");
    }

    BIT4Log.Log<KcpNetClient>(report);
}
|
|
|
|
/// <summary>
/// Registers <paramref name="handle"/> as a listener for commands of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Command type to listen for.</typeparam>
/// <param name="handle">Callback to register.</param>
public void AddCommandListener<T>(Action<T> handle) => _events.AddListener<T>(handle);
|
|
|
|
/// <summary>
/// Registers an asynchronous request handler keyed by the full name of <typeparamref name="T"/>.
/// An existing registration for the same key is left in place.
/// </summary>
/// <typeparam name="T">Request (and response) type.</typeparam>
/// <param name="func">Handler that produces the response.</param>
public void AddCommandListener<T>(Func<T,UniTask<T>> func)
{
    // Adapts the typed handler to the object-based signature stored in _rpc.
    async UniTask<object> Wrapper(object boxed)
    {
        var typed = (T)boxed;
        return await func.Invoke(typed);
    }

    _rpc.TryAdd(typeof(T).FullName, Wrapper);
}
|
|
|
|
/// <summary>
/// Removes the request handler registered under the full name of <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Request type whose handler is removed.</typeparam>
/// <param name="func">Not inspected; the entry is removed by type name only.</param>
public void RemoveCommandListener<T>(Func<T,UniTask<T>> func)
    => _rpc.TryRemove(typeof(T).FullName, out _);
|
|
|
|
/// <summary>
/// Unregisters a listener previously added for commands of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Command type.</typeparam>
/// <param name="handle">Callback to unregister.</param>
public void RemoveCommandListener<T>(Action<T> handle) => _events.RemoveListener<T>(handle);
|
|
|
|
/// <summary>
/// Queues a packet containing a command tag, <paramref name="rpcName"/> and the
/// serialized <paramref name="pars"/>.
/// </summary>
/// <remarks>
/// NOTE(review): the packet is tagged <c>NetCommandType.GetFromClient</c> and carries no
/// request id — confirm this matches what the receiving side expects.
/// </remarks>
/// <param name="rpcName">Name of the remote procedure.</param>
/// <param name="pars">Arguments for the remote procedure.</param>
public void SendRT(string rpcName, params object[] pars)
{
    using var stream = new MemoryStream();
    using var output = new BinaryWriter(stream);

    output.Write((byte)NetCommandType.GetFromClient);
    output.Write(rpcName);
    BITBinary.Write(output, pars);

    var packet = stream.ToArray();
    _commandQueue.Enqueue(packet);
}
|
|
|
|
/// <summary>
/// Not implemented by this client.
/// </summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public void SendTargetRT(int id, string rpcName, params object[] pars)
    => throw new NotImplementedException();
|
|
|
|
/// <summary>
/// Not implemented by this client.
/// </summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
public void SendAllRT(string rpcName, params object[] pars)
    => throw new NotImplementedException();
|
|
|
|
/// <summary>
/// Sends a text message immediately on the reliable channel, bypassing the command queue,
/// and adds its size to the first component of <see cref="Traffic"/>.
/// </summary>
/// <param name="message">Text to send.</param>
public void SendServerMessage(string message)
{
    // Packet layout: one tag byte followed by the string.
    var packet = BinaryBuilder.Create().Write(((byte)NetCommandType.Message)).Write(message).Build();

    var sent = new float2(packet.Length, 0);
    Traffic += sent;

    client.Send(packet, KcpChannel.Reliable);
}
|
|
|
|
#if UNITY_EDITOR
|
|
private readonly IntervalUpdate _pingInterval = new(1);
|
|
#endif
|
|
|
|
/// <summary>
/// Runs one update of the client. While connected it checks the heartbeat timeout,
/// sends every queued packet and a heartbeat, then ticks the transport.
/// While disconnected it discards queued packets.
/// </summary>
/// <remarks>
/// Invoked by the internal timer unless <see cref="ManualTick"/> is set.
/// All exceptions are caught and logged.
/// </remarks>
public void Tick()
{
    try
    {
        _now = DateTime.UtcNow;
        if (client.connected)
        {
            // No heartbeat for more than five seconds: treat the link as dead.
            if (DateTime.Now - _lastHeartbeat > TimeSpan.FromSeconds(5))
            {
                BIT4Log.Warning<KcpNetClient>("心跳超时,自动断开");
                Disconnect();
                _commandQueue.Clear();
                return;
            }

            // Send everything that was queued since the previous tick.
            while (_commandQueue.TryDequeue(out var bytes))
            {
                Traffic += new float2(bytes.Length, 0);
                client.Send(bytes, KcpChannel.Reliable);
            }

            client.Send(_heartBeat, KcpChannel.Unreliable);

#if UNITY_EDITOR
            // Ping only while connected; the original also sent it when disconnected.
            if (_pingInterval.AllowUpdate)
            {
                _lastPingTime = DateTime.Now;
                client.Send(new[] { (byte)NetCommandType.Ping }, KcpChannel.Reliable);
            }
#endif
        }
        else
        {
            // Not connected: discard anything that was queued.
            if (_commandQueue.Count > 0)
            {
                _commandQueue.Clear();
            }
        }

        client.Tick();
    }
    catch (Exception e)
    {
        BIT4Log.LogException(e);
    }
}
|
|
|
|
/// <summary>
/// Sends the two-byte handshake packet (0x01, 0x02) on the reliable channel and
/// adds two bytes to the first component of <see cref="Traffic"/>.
/// </summary>
public void HandShake()
{
    var handshake = new byte[] { 0x01, 0x02 };

    Traffic += new float2(2, 0);
    client.Send(handshake, KcpChannel.Reliable);
}
|
|
}
|
|
} |