This commit is contained in:
CortexCore 2024-06-20 17:28:56 +08:00
parent 554af9ca4e
commit 0423dc7c48
6 changed files with 213 additions and 73 deletions

View File

@ -24,6 +24,10 @@ namespace BITKit
private static int _count; private static int _count;
public static async UniTask SwitchToMainThread() public static async UniTask SwitchToMainThread()
{ {
if (SynchronizationContext is null)
{
return;
}
await UniTask.SwitchToSynchronizationContext(SynchronizationContext); await UniTask.SwitchToSynchronizationContext(SynchronizationContext);
} }
public static class Time public static class Time
@ -178,7 +182,8 @@ namespace BITKit
/// <summary> /// <summary>
/// 主线程 /// 主线程
/// </summary> /// </summary>
public static SynchronizationContext SynchronizationContext { get; private set; } public static SynchronizationContext SynchronizationContext { get; set; } =
SynchronizationContext.Current;
[System.Serializable] [System.Serializable]
public class OpenPath : IAction, IDisposable public class OpenPath : IAction, IDisposable
@ -248,7 +253,18 @@ namespace BITKit
_count = 0; _count = 0;
Time.TimeAsDouble = 0; Time.TimeAsDouble = 0;
Time.DeltaTime = 1 / 60f; Time.DeltaTime = 1 / 60f;
SynchronizationContext=SynchronizationContext.Current;
SynchronizationContext = (SynchronizationContext, SynchronizationContext.Current) switch
{
(null, not null) => SynchronizationContext.Current,
_ => SynchronizationContext
};
if (SynchronizationContext is null)
{
BIT4Log.Warning<BITApp>("未找到主线程上下文,等待主线程将不可用");
}
Settings = settings??new AppSettings(); Settings = settings??new AppSettings();
CancellationTokenSource = new CancellationTokenSource(); CancellationTokenSource = new CancellationTokenSource();
AppName = appName; AppName = appName;

View File

@ -20,8 +20,11 @@ namespace BITKit
} }
#endif #endif
public static event Action<string> OnLog; public static event Action<string> OnLog;
public static event Action<string,Type> OnLogCallback;
public static event Action<Exception> OnException; public static event Action<Exception> OnException;
public static event Action<Exception,Type> OnExceptionCallback;
public static event Action<string> OnWarning; public static event Action<string> OnWarning;
public static event Action<string,Type> OnWarningCallback;
public static event Action<ConsoleColor> OnSetConsoleColor; public static event Action<ConsoleColor> OnSetConsoleColor;
public static event Action OnNextLine; public static event Action OnNextLine;
private static Type currentType; private static Type currentType;
@ -32,6 +35,7 @@ namespace BITKit
{ {
OnSetConsoleColor?.Invoke(color); OnSetConsoleColor?.Invoke(color);
OnLog?.Invoke(x?.ToString()); OnLog?.Invoke(x?.ToString());
OnLogCallback?.Invoke(x?.ToString(),currentType);
} }
#if UNITY_5_3_OR_NEWER #if UNITY_5_3_OR_NEWER
[HideInCallstack] [HideInCallstack]

View File

@ -22,6 +22,7 @@ namespace BITKit.Net
public event Action OnDisconnected; public event Action OnDisconnected;
public event Action OnConnectedFailed; public event Action OnConnectedFailed;
public bool IsConnected => _isConnected; public bool IsConnected => _isConnected;
public bool IsConnecting { get; private set; }
public float2 Traffic { get; set; } public float2 Traffic { get; set; }
public bool ManualTick { get; set; } public bool ManualTick { get; set; }
@ -51,6 +52,7 @@ namespace BITKit.Net
private DateTime _now = DateTime.Now; private DateTime _now = DateTime.Now;
private TimeSpan _interval = TimeSpan.FromMilliseconds(100); private TimeSpan _interval = TimeSpan.FromMilliseconds(100);
private readonly byte[] _heartBeat = new byte[] { (byte)NetCommandType.Heartbeat };
public KcpNetClient() public KcpNetClient()
{ {
client = new KcpClient( client = new KcpClient(
@ -80,7 +82,7 @@ namespace BITKit.Net
else else
{ {
OnDisconnected?.Invoke(); OnDisconnected?.Invoke();
BIT4Log.Warning<KcpNetClient>("连接已断开"); BIT4Log.Log<KcpNetClient>("连接已断开");
} }
} }
@ -105,6 +107,9 @@ namespace BITKit.Net
public async UniTask<bool> Connect(string address = "127.0.0.1", ushort port = 27014) public async UniTask<bool> Connect(string address = "127.0.0.1", ushort port = 27014)
{ {
if (IsConnecting) return false;
IsConnecting = true;
if (client.connected) return false;
if (BITApp.SynchronizationContext is not null) if (BITApp.SynchronizationContext is not null)
await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext, BITApp.CancellationToken); await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext, BITApp.CancellationToken);
OnStartConnect?.Invoke(); OnStartConnect?.Invoke();
@ -120,29 +125,39 @@ namespace BITKit.Net
// client.Tick(); // client.Tick();
// await Task.Delay(100); // await Task.Delay(100);
// } // }
//_commandQueue.Enqueue(new []{(byte)NetCommandType.Heartbeat });
_commandQueue.Enqueue(new []{(byte)NetCommandType.Heartbeat }); //client.Send(new []{(byte)NetCommandType.Heartbeat }, KcpChannel.Reliable);
HandShake();
if (BITApp.SynchronizationContext is not null) if (BITApp.SynchronizationContext is not null)
await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext); await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext);
if (client.connected)
{ Traffic += new float2(1, 0);
SendServerMessage(Environment.MachineName); //_commandQueue.Enqueue(new []{(byte)NetCommandType.Heartbeat});
Traffic+=new float2(1,0);
_commandQueue.Enqueue(new []{(byte)NetCommandType.Heartbeat});
}
for (var i = 0; i < 5; i++) for (var i = 0; i < 5; i++)
{ {
client.Send(new []{(byte)NetCommandType.Heartbeat }, KcpChannel.Reliable);
client.Tick(); client.Tick();
await Task.Delay(100); await Task.Delay(100);
} }
if (client.connected is false)
if (client.connected)
{ {
OnConnectedFailed?.Invoke(); SendServerMessage(Environment.MachineName);
Disconnect();
return false; IsConnecting = false;
return client.connected;
} }
return client.connected;
OnConnectedFailed?.Invoke();
Disconnect();
IsConnecting = false;
return false;
} }
catch (Exception e) catch (Exception e)
{ {
@ -151,6 +166,8 @@ namespace BITKit.Net
await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext); await UniTask.SwitchToSynchronizationContext(BITApp.SynchronizationContext);
OnConnectedFailed?.Invoke(); OnConnectedFailed?.Invoke();
_timer.Stop(); _timer.Stop();
IsConnecting = false;
return false; return false;
} }
} }
@ -201,8 +218,10 @@ namespace BITKit.Net
break; break;
case NetCommandType.Heartbeat: case NetCommandType.Heartbeat:
_commandQueue.Enqueue(new []{(byte)NetCommandType.Heartbeat }); if (Id is not -1)
_isConnected.AddElement(this); {
_isConnected.AddElement(this);
}
_lastHeartbeat = DateTime.Now; _lastHeartbeat = DateTime.Now;
break; break;
case NetCommandType.Ping: case NetCommandType.Ping:
@ -299,6 +318,21 @@ namespace BITKit.Net
BIT4Log.LogException(e); BIT4Log.LogException(e);
} }
break; break;
case NetCommandType.AllRpc:
case NetCommandType.TargetRpc:
{
var rpcName = reader.ReadString();
if (_rpcMethods.TryGetValue(rpcName, out var methodInfo))
{
var pars = BITBinary.Read(reader).As<object[]>();
methodInfo.Invoke(_rpcHandles[rpcName], pars);
}
else
{
BIT4Log.Warning<KcpClient>($"未找到对应的Rpc方法:{rpcName}");
}
}
break;
default: default:
BIT4Log.Log<KcpClient>($"未知消息类型:{type},字节:{(byte)type}"); BIT4Log.Log<KcpClient>($"未知消息类型:{type},字节:{(byte)type}");
if (bytes.Array != null) if (bytes.Array != null)
@ -318,7 +352,7 @@ namespace BITKit.Net
private async void OnDisconnectInternal() private async void OnDisconnectInternal()
{ {
BIT4Log.Log<KcpNetClient>("连接被断开"); //BIT4Log.Log<KcpNetClient>("连接被断开");
Disconnect(); Disconnect();
} }
private void OnError(ErrorCode errorCode, string message) private void OnError(ErrorCode errorCode, string message)
@ -357,10 +391,10 @@ namespace BITKit.Net
public async UniTask<T> GetFromServer<T>(string path = default,params object[] pars) public async UniTask<T> GetFromServer<T>(string path = default,params object[] pars)
{ {
await UniTask.SwitchToThreadPool(); //await UniTask.SwitchToThreadPool();
var id = _index++; var id = _index++;
using var ms = new MemoryStream(); var ms = new MemoryStream();
await using var writer = new BinaryWriter(ms); var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.GetFromServer); writer.Write((byte)NetCommandType.GetFromServer);
writer.Write(id); writer.Write(id);
if (string.IsNullOrEmpty(path)) if (string.IsNullOrEmpty(path))
@ -375,14 +409,27 @@ namespace BITKit.Net
BITBinary.Write(writer,pars); BITBinary.Write(writer,pars);
var bytes = ms.ToArray(); var bytes = ms.ToArray();
await ms.DisposeAsync();
await writer.DisposeAsync();
_commandQueue.Enqueue(bytes); _commandQueue.Enqueue(bytes);
var startTime = _now; var startTime = _now;
while (true) while (true)
{ {
if((_now-startTime).TotalSeconds>5 || IsConnected is false) if ((_now - startTime).TotalSeconds > 5 || IsConnected is false)
throw new TimeoutException("请求超时或已断开连接"); {
//await BITApp.SwitchToMainThread();
if (string.IsNullOrEmpty(path))
{
throw new TimeoutException("请求超时或已断开连接");
}
throw new TimeoutException($"请求超时或已断开连接,请求为{path}");
}
if (_p2p.TryRemove(id, out var value)) if (_p2p.TryRemove(id, out var value))
{ {
@ -493,19 +540,29 @@ namespace BITKit.Net
try try
{ {
_now = DateTime.UtcNow; _now = DateTime.UtcNow;
if (IsConnected) if (client.connected)
{ {
if (DateTime.Now - _lastHeartbeat > TimeSpan.FromSeconds(5)) if (DateTime.Now - _lastHeartbeat > TimeSpan.FromSeconds(5))
{ {
BIT4Log.Warning<KcpNetClient>("心跳超时,自动断开"); BIT4Log.Warning<KcpNetClient>("心跳超时,自动断开");
Disconnect(); Disconnect();
_commandQueue.Clear();
return;
} }
while (_commandQueue.TryDequeue(out var bytes))
{
Traffic += new float2(bytes.Length, 0);
client.Send(bytes, KcpChannel.Reliable);
}
client.Send(_heartBeat, KcpChannel.Unreliable);
} }
else
while (_commandQueue.TryDequeue(out var bytes))
{ {
Traffic+=new float2(bytes.Length,0); if (_commandQueue.Count > 0)
client.Send(bytes, KcpChannel.Reliable); {
_commandQueue.Clear();
//BIT4Log.Warning<KcpNetClient>("连接已断开,清空指令队列");
}
} }
#if UNITY_EDITOR #if UNITY_EDITOR

View File

@ -18,6 +18,7 @@ namespace BITKit.Net
{ {
public class KCPNetServer:INetServer,INetProvider public class KCPNetServer:INetServer,INetProvider
{ {
public string Name { get; set; } = "Default";
public int TickRate { get; set; } = 16; public int TickRate { get; set; } = 16;
public bool ManualTick { get; set; } public bool ManualTick { get; set; }
public event Action<int> OnClientConnected; public event Action<int> OnClientConnected;
@ -36,12 +37,15 @@ namespace BITKit.Net
private readonly ConcurrentDictionary<string,Func<object,UniTask<object>>> _rpc = new(); private readonly ConcurrentDictionary<string,Func<object,UniTask<object>>> _rpc = new();
private readonly ConcurrentDictionary<string,MethodInfo> _rpcMethods = new(); private readonly ConcurrentDictionary<string,MethodInfo> _rpcMethods = new();
private readonly ConcurrentDictionary<string,object> _rpcHandles = new(); private readonly ConcurrentDictionary<string,object> _rpcHandles = new();
private readonly ConcurrentDictionary<int,DateTime> _lastHeartbeat = new();
private readonly ConcurrentQueue<(int id,byte[] bytes)> _sendQueue = new(); private readonly ConcurrentQueue<(int id,byte[] bytes)> _sendQueue = new();
private DateTime _now=DateTime.Now; private DateTime _now=DateTime.Now;
private TimeSpan _interval=TimeSpan.FromSeconds(0.32); private TimeSpan _interval=TimeSpan.FromSeconds(0.32);
private byte[] _heartBeat = new byte[] { (byte)NetCommandType.Heartbeat };
public KCPNetServer() public KCPNetServer()
{ {
server = new KcpServer( server = new KcpServer(
@ -70,21 +74,35 @@ namespace BITKit.Net
_now = DateTime.UtcNow; _now = DateTime.UtcNow;
try try
{ {
foreach (var id in Connections.Keys.ToArray())
{
server.Send(id,_heartBeat , KcpChannel.Unreliable);
if (!_lastHeartbeat.TryGetValue(id, out var time)) continue;
if (!((_now - time).TotalSeconds > 1)) continue;
server.Disconnect(id);
_lastHeartbeat.TryRemove(id);
BIT4Log.Log<KCPNetServer>($"{Name}:链接{id}超时,已断开");
}
if (server.IsActive() is false || ManualTick) return;
server.Tick();
//BIT4Log.Log<KCPNetServer>($"{Name}目前有{server.connections.Count}个链接");
while (_sendQueue.TryDequeue(out var value)) while (_sendQueue.TryDequeue(out var value))
{ {
if (server.connections.ContainsKey(value.id)) if (server.connections.ContainsKey(value.id))
{ {
server.Send(value.id,value.bytes,KcpChannel.Reliable); server.Send(value.id, value.bytes, KcpChannel.Reliable);
} }
else else
{ {
BIT4Log.Log<KCPNetServer>($"链接{value.id}已丢失,丢弃了{value.bytes.Length}个字节"); BIT4Log.Log<KCPNetServer>($"链接{value.id}已丢失,丢弃了{value.bytes.Length}个字节");
} }
} }
if (server.IsActive() is false || ManualTick) return;
server.Tick();
} }
catch (SocketException) catch (SocketException)
{ {
@ -103,8 +121,8 @@ namespace BITKit.Net
if (TickRate > 0) if (TickRate > 0)
{ {
_timer.Interval = 1000f / TickRate; _timer.Interval = 1000f / TickRate;
_interval = TimeSpan.FromSeconds(1.0 / TickRate);
} }
_interval = TimeSpan.FromSeconds(1.0 / TickRate);
OnStartServer?.Invoke(); OnStartServer?.Invoke();
server.Start(port); server.Start(port);
_timer.Start(); _timer.Start();
@ -154,12 +172,11 @@ namespace BITKit.Net
)); ));
public void Tick() public void Tick()
{ {
try try
{ {
server.Tick(); Tick(null,null);
} }
catch (SocketException) catch (SocketException)
{ {
@ -169,23 +186,33 @@ namespace BITKit.Net
public void HandShake() public void HandShake()
{ {
foreach (var Id in server.connections.Keys) foreach (var id in server.connections.Keys)
{ {
server.Send(Id, new byte[]{0x03, 0x04}, KcpChannel.Reliable); server.Send(id, new byte[]{0x03, 0x04}, KcpChannel.Reliable);
} }
} }
private void OnConnectedInternel(int id)
{
OnClientConnected?.Invoke(id);
ClientCommand(id,new NetClientAllocateIdCommand
{
Id = id,
Ip = server.connections[id].remoteEndPoint.ToString()
});
BIT4Log.Log<KCPNetServer>($"{id}已连接到:{Name}");
SendMessageToClient(id, $"成功连接到服务器:{Name}");
}
private void OnConnected(int Id) private void OnConnected(int Id)
{ {
OnClientConnected?.Invoke(Id); // OnClientConnected?.Invoke(Id);
ClientCommand(Id,new NetClientAllocateIdCommand // ClientCommand(Id,new NetClientAllocateIdCommand
{ // {
Id = Id, // Id = Id,
Ip = server.connections[Id].remoteEndPoint.ToString() // Ip = server.connections[Id].remoteEndPoint.ToString()
}); // });
BIT4Log.Log<KCPNetServer>($"{Id}已加入"); // BIT4Log.Log<KCPNetServer>($"{Id}已连接到:{Name}");
SendMessageToClient(Id, "成功连接到服务器"); // SendMessageToClient(Id, $"成功连接到服务器:{Name}");
} }
private void OnDisconnect(int Id) private void OnDisconnect(int Id)
{ {
@ -229,8 +256,20 @@ namespace BITKit.Net
_events.InvokeDirect(command.GetType().FullName, tuple); _events.InvokeDirect(command.GetType().FullName, tuple);
break; break;
case NetCommandType.Heartbeat: case NetCommandType.Heartbeat:
_sendQueue.Enqueue((Id,new byte[] { (byte)NetCommandType.Heartbeat })); if (Connections.ContainsKey(Id))
{
_lastHeartbeat.AddOrUpdate(Id,OnAdd,OnUpdate);
}
break; break;
DateTime OnAdd(int arg)
{
OnConnectedInternel(Id);
return _now;
}
DateTime OnUpdate(int arg1, DateTime arg2)
{
return _now;
}
case NetCommandType.AllClientCommand: case NetCommandType.AllClientCommand:
foreach (var id in server.connections.Keys.ToArray()) foreach (var id in server.connections.Keys.ToArray())
{ {
@ -354,6 +393,9 @@ namespace BITKit.Net
} }
} }
private void OnError(int Id, ErrorCode errorCode, string message) private void OnError(int Id, ErrorCode errorCode, string message)
{ {
BIT4Log.Log<KCPNetServer>($"异常:{errorCode},{message}"); BIT4Log.Log<KCPNetServer>($"异常:{errorCode},{message}");
@ -374,7 +416,11 @@ namespace BITKit.Net
public void ClientCommand<T>(int id, T command) public void ClientCommand<T>(int id, T command)
{ {
Send(id,NetCommandType.Command,command); using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.Command);
BITBinary.Write(writer,command);
_sendQueue.Enqueue((id,ms.ToArray()));
} }
public UniTask<T> GetFromServer<T>(string path=default,params object[] pars) public UniTask<T> GetFromServer<T>(string path=default,params object[] pars)
@ -386,8 +432,8 @@ namespace BITKit.Net
{ {
await UniTask.SwitchToThreadPool(); await UniTask.SwitchToThreadPool();
var index = _index++; var index = _index++;
using var ms = new MemoryStream(); var ms = new MemoryStream();
await using var writer = new BinaryWriter(ms); var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.GetFromClient); writer.Write((byte)NetCommandType.GetFromClient);
writer.Write(index); writer.Write(index);
if (string.IsNullOrEmpty(path)) if (string.IsNullOrEmpty(path))
@ -402,17 +448,21 @@ namespace BITKit.Net
BITBinary.Write(writer,pars); BITBinary.Write(writer,pars);
var bytes = ms.ToArray(); var bytes = ms.ToArray();
//server.Send(id,bytes,KcpChannel.Reliable);
await ms.DisposeAsync();
await writer.DisposeAsync();
_sendQueue.Enqueue((id,bytes)); _sendQueue.Enqueue((id,bytes));
var startTime = _now; var startTime = _now;
while (true) while (true)
{ {
if((_now-startTime).TotalSeconds>5) var time = _now - startTime;
throw new TimeoutException($"等待超时,Id:{id},时间{DateTime.Now-startTime}"); if(time.TotalSeconds>5)
throw new TimeoutException($"等待超时,Id:{id},时间{time.TotalSeconds}");
if (_p2p.TryRemove(index, out var value)) if (_p2p.TryRemove(index, out var value))
{ {
await BITApp.SwitchToMainThread(); //await BITApp.SwitchToMainThread();
if (value is Exception e) if (value is Exception e)
{ {
throw e; throw e;
@ -507,28 +557,32 @@ namespace BITKit.Net
public void SendRT(string rpcName, params object[] pars) public void SendRT(string rpcName, params object[] pars)
{ {
throw new NotImplementedException(); throw new NotImplementedException("服务端不支持此方法");
} }
public void SendTargetRT(int id, string rpcName, params object[] pars) public void SendTargetRT(int id, string rpcName, params object[] pars)
{ {
throw new NotImplementedException(); using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write((byte)NetCommandType.TargetRpc);
writer.Write(rpcName);
BITBinary.Write(writer,pars);
var bytes = ms.ToArray();
_sendQueue.Enqueue((id,bytes));
} }
public void SendAllRT(string rpcName, params object[] pars) public void SendAllRT(string rpcName, params object[] pars)
{ {
throw new NotImplementedException(); using var ms = new MemoryStream();
} using var writer = new BinaryWriter(ms);
private void Send(int id,NetCommandType commandType,params object[] values) writer.Write((byte)NetCommandType.AllRpc);
{ writer.Write(rpcName);
var bytes = BinaryBuilder BITBinary.Write(writer,pars);
.Create() var bytes = ms.ToArray();
.Write((byte)commandType) foreach (var id in server.connections.Keys)
.WriteObject(values) {
.Build();
//server.Send(id,bytes, KcpChannel.Reliable);
_sendQueue.Enqueue((id,bytes)); _sendQueue.Enqueue((id,bytes));
}
} }
} }
} }

View File

@ -168,7 +168,8 @@ namespace kcp2k
// at least log a message for easier debugging. // at least log a message for easier debugging.
//sure I go next somewhere record it //sure I go next somewhere record it
Log.Info($"KcpServer: ReceiveFrom failed: {e}"); //Log.Info($"KcpServer: ReceiveFrom failed: {e}");
Disconnect(connectionId);
} }
return false; return false;

View File

@ -16,8 +16,11 @@ namespace BITKit.Net.Kcp
[SerializeField] private ushort m_port; [SerializeField] private ushort m_port;
[SerializeField] private bool connectOnStart; [SerializeField] private bool connectOnStart;
[SerializeField] private bool autoReconnect; [SerializeField] private bool autoReconnect;
[Header(Constant.Header.Debug)] [Header(Constant.Header.Debug)]
[SerializeField] [SerializeField]
[ReadOnly] private int id;
[SerializeField]
[ReadOnly]private Vector2 traffic; [ReadOnly]private Vector2 traffic;
[SerializeField] [SerializeField]
[ReadOnly]private string upTraffic; [ReadOnly]private string upTraffic;
@ -36,7 +39,7 @@ namespace BITKit.Net.Kcp
private INetClient _netClientImplementation=>client; private INetClient _netClientImplementation=>client;
private INetProvider _netProviderImplementation=>client; private INetProvider _netProviderImplementation=>client;
private readonly IntervalUpdate _reconnectInterval = new(1); private readonly IntervalUpdate _reconnectInterval = new(3);
public event Action OnStartConnect public event Action OnStartConnect
{ {
@ -212,9 +215,14 @@ namespace BITKit.Net.Kcp
if (client.IsConnected is false && autoReconnect) if (client.IsConnected is false && autoReconnect)
{ {
if (_reconnectInterval.AllowUpdate) if (_reconnectInterval.AllowUpdate)
client.Connect(m_host, m_port).Forget(); {
if (client.IsConnecting is false)
{
client.Connect(m_host, m_port).Forget();
}
}
} }
id = Id;
timer.Update(obj); timer.Update(obj);
rate = timer; rate = timer;
Tick(); Tick();