This commit is contained in:
CortexCore
2024-06-08 15:12:48 +08:00
parent b9731c20a1
commit 3e8bd54a08
9 changed files with 155 additions and 182 deletions

View File

@@ -40,9 +40,9 @@ namespace BITKit.Net
private readonly ValidHandle _isConnected = new();
private int _index = 1001;
private int _index = int.MinValue;
private readonly ConcurrentDictionary<int, object> _p2p = new();
private readonly ConcurrentDictionary<string,object> _rpc = new();
private readonly ConcurrentDictionary<string,Func<object,UniTask<object>>> _rpc = new();
public KcpNetClient()
{
client = new KcpClient(
@@ -168,8 +168,42 @@ namespace BITKit.Net
break;
case NetCommandType.ReturnToClient:
var id = reader.ReadInt32();
var value = BITBinary.Read(reader);
_p2p.TryAdd(id,value);
try
{
var value = BITBinary.Read(reader);
_p2p.TryAdd(id,value);
}
catch (Exception e)
{
BIT4Log.Warning<INetClient>($"请求返回失败:{id}");
BIT4Log.LogException(e);
}
break;
case NetCommandType.GetFromClient:
try
{
var requestId = reader.ReadInt32();
var commandObj = BITBinary.Read(reader);
if (_rpc.TryGetValue(commandObj.GetType().FullName, out var func) is false)
{
throw new NotImplementedException($"未找到对应的方法:{commandObj.GetType().FullName}");
}
var value = await func.As<Func<object, UniTask<object>>>().Invoke(commandObj);
using var _ms = new MemoryStream();
using var _writer = new BinaryWriter(_ms);
_writer.Write((byte)NetCommandType.ReturnToServer);
_writer.Write(requestId);
BITBinary.Write(_writer, value);
var _bytes = _ms.ToArray();
commandQueue.Enqueue(_bytes);
}
catch(Exception e)
{
BIT4Log.LogException(e);
}
break;
default:
BIT4Log.Log<KcpClient>($"未知消息类型:{type},字节:{(byte)type}");
@@ -268,12 +302,18 @@ namespace BITKit.Net
_events.AddListener<T>(handle);
}
public void AddCommandListener<T>(Func<T, T> func)
public void AddCommandListener<T>(Func<T,UniTask<T>> func)
{
_rpc.GetOrAdd(typeof(T).FullName, func);
_rpc.TryAdd(typeof(T).FullName, F);
return;
async UniTask<object> F(object o)
{
return await func.Invoke((T)o);
}
}
public void RemoveCommandListener<T>(Func<T, T> func)
public void RemoveCommandListener<T>(Func<T,UniTask<T>> func)
{
_rpc.TryRemove(typeof(T).FullName, out _);
}

View File

@@ -9,6 +9,8 @@ using Timer = System.Timers.Timer;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
using BITKit.Net.Examples;
namespace BITKit.Net
@@ -29,7 +31,7 @@ namespace BITKit.Net
private int _index = 1001;
private readonly ConcurrentDictionary<int, object> _p2p = new();
private readonly ConcurrentDictionary<string,Func<object,object>> _rpc = new();
private readonly ConcurrentDictionary<string,Func<object,UniTask<object>>> _rpc = new();
public KCPNetServer()
@@ -44,11 +46,13 @@ namespace BITKit.Net
_timer.Elapsed += Tick;
BIT4Log.Log<KCPNetServer>("已创建KCP服务器");
AddCommandListener<SimplePing>(x =>
AddCommandListener<SimplePing>(F);
return;
UniTask<SimplePing> F(SimplePing p)
{
x.EndTime = DateTime.Now;
return x;
});
p.EndTime = DateTime.Now;
return UniTask.FromResult(p);
}
}
private void Tick(object sender, ElapsedEventArgs e)
@@ -107,7 +111,18 @@ namespace BITKit.Net
public void Tick() => server.Tick();
public void Tick()
{
try
{
server.Tick();
}
catch (SocketException)
{
BIT4Log.Log<INetServer>("有用户断开连接,如有异常请检查");
}
}
public void HandShake()
{
foreach (var Id in server.connections.Keys)
@@ -133,7 +148,7 @@ namespace BITKit.Net
OnClientDisconnected?.Invoke(Id);
BIT4Log.Log<KCPNetServer>($"{Id}已断开");
}
private void OnData(int Id, ArraySegment<byte> bytes, KcpChannel channel)
private async void OnData(int Id, ArraySegment<byte> bytes, KcpChannel channel)
{
using var ms = new MemoryStream(bytes.ToArray());
using var reader = new BinaryReader(ms);
@@ -179,15 +194,12 @@ namespace BITKit.Net
var requestId = reader.ReadInt32();
var commandObj = BITBinary.Read(reader);
BIT4Log.Log<KCPNetServer>($"已收到请求:{commandObj},请求ID:{requestId}");
if (_rpc.TryGetValue(commandObj.GetType().FullName, out var func) is false)
{
throw new NotImplementedException($"未找到对应的方法:{commandObj.GetType().FullName}");
}
var value = func.As<Func<object,object>>().Invoke(commandObj);
var value =await func.As<Func<object,UniTask<object>>>().Invoke(commandObj);
using var _ms = new MemoryStream();
using var _writer = new BinaryWriter(_ms);
_writer.Write((byte)NetCommandType.ReturnToClient);
@@ -202,6 +214,13 @@ namespace BITKit.Net
Send(Id,NetCommandType.ReturnToClient,-1,0);
}
break;
case NetCommandType.ReturnToServer:
{
var id = reader.ReadInt32();
var value = BITBinary.Read(reader);
_p2p.TryAdd(id, value);
}
break;
default:
BIT4Log.Log<KCPNetServer>($"未知消息类型:{type},字节:{(byte)type}");
BIT4Log.Log<KCPNetServer>($"已收到:({Id}, {BitConverter.ToString(bytes.Array, bytes.Offset, bytes.Count)} @ {channel})");
@@ -236,9 +255,29 @@ namespace BITKit.Net
throw new NotImplementedException();
}
public UniTask<T> GetFromClient<T>(int id, T command = default)
public async UniTask<T> GetFromClient<T>(int id, T command = default)
{
throw new NotImplementedException();
var index = _index++;
using var ms = new MemoryStream();
await using var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.GetFromClient);
writer.Write(index);
BITBinary.Write(writer,command);
var bytes = ms.ToArray();
server.Send(id,bytes,KcpChannel.Reliable);
var startTime = DateTime.Now;
while (true)
{
if(DateTime.Now-startTime>TimeSpan.FromSeconds(5) || server.connections.ContainsKey(id) is false)
throw new TimeoutException();
if (_p2p.TryRemove(index, out var value))
{
return (T)value;
}
await Task.Delay(100);
}
}
public void AddRpcHandle(object rpcHandle)
@@ -251,17 +290,17 @@ namespace BITKit.Net
_events.AddListener<T>(handle);
}
public void AddCommandListener<T>(Func<T, T> func)
public void AddCommandListener<T>(Func<T,UniTask<T>> func)
{
_rpc.TryAdd(typeof(T).FullName, F);
return;
object F(object o)
async UniTask<object> F(object o)
{
return func.Invoke((T)o);
return await func.Invoke((T)o);
}
}
public void RemoveCommandListener<T>(Func<T, T> func)
public void RemoveCommandListener<T>(Func<T,UniTask<T>> func)
{
_rpc.TryRemove(typeof(T).FullName, out _);
}
@@ -283,6 +322,11 @@ namespace BITKit.Net
}
}
public void KickClient(int id)
{
server.Disconnect(id);
}
public void RemoveCommandListener<T>(Action<T> handle)
{
_events.RemoveListener<T>(handle);

View File

@@ -97,19 +97,26 @@ namespace BITKit
/// <param name="handle">远程指令回调</param>
/// <typeparam name="T">远程指令类型</typeparam>
void AddCommandListener<T>(Action<T> handle);
/// <summary>
/// 取消监听远程指令
/// </summary>
/// <param name="handle"></param>
/// <typeparam name="T"></typeparam>
void RemoveCommandListener<T>(Action<T> handle);
/// <summary>
/// 监听远程func
/// </summary>
/// <param name="func"></param>
/// <typeparam name="T"></typeparam>
void AddCommandListener<T>(Func<T,T> func);
void AddCommandListener<T>(Func<T,UniTask<T>> func);
/// <summary>
/// 取消监听远程指令
/// </summary>
/// <param name="handle">远程指令回调</param>
/// <typeparam name="T">远程指令类型</typeparam>
void RemoveCommandListener<T>(Func<T,T> func);
void RemoveCommandListener<T>(Func<T,UniTask<T>> func);
/// <summary>
/// 向服务端发送Rpc
@@ -215,6 +222,11 @@ namespace BITKit
/// <param name="handle"></param>
/// <typeparam name="T"></typeparam>
void AddCommandListenerWithId<T>(Action<int,T> handle);
/// <summary>
/// 踢出客户端
/// </summary>
/// <param name="id"></param>
void KickClient(int id);
}
/// <summary>
/// 基本网络客户端的接口定义,包括了基本客户端的功能