持续的套接字连接对于确保正确的应用程序行为至关重要。无论是提供实时聊天更新、股票价格还是应用内指标,可靠的连接都至关重要。
套接字令人恼火的问题之一是突然失去连接。如果真正的原因并不明显,即互联网连接不稳定,那么中断原因往往会被很好地隐藏起来。为了解决这个问题,我们可以实施套接字自动重新连接策略。让我们看看 Dart 的行业标准套接字库——web_socket_channel 中有哪些选项。
经典方法
库中的示例非常简单:
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;
main() async {
final wsUrl = Uri.parse('ws://localhost:1234')
var channel = WebSocketChannel.connect(wsUrl);
channel.stream.listen((message) {
channel.sink.add('received!');
channel.sink.close(status.goingAway);
});
}
遗憾的是,WebSocketChannel
并不提供处理重新连接的内置配置选项。因此,我们需要手动对流错误做出反应。让我们模仿一下 WebSocket
突然出错的情况。以下是在流监听器中捕获错误的方法:
channel.stream.listen(
(message) {
channel.sink.add('received!');
},
onError: (error) {
// Handle error here
},
onDone: () {
// Handle socket disruption
},
);
典型的解决方案是再次调用 WebSocketChannel.connect
,并在回调中覆盖流。
onDone: () {
channel = WebSocketChannel.connect(Uri.parse(url));
stream = channel.stream.listen(
...
);
},
虽然这种方法行之有效,但在具有良好结构的生产应用程序中可能会变得繁琐。
简洁架构解决方案
典型的应用程序架构由不同的层、类和责任区组成。让我们来看看简洁架构示例:
套接字通常在数据层中初始化,在表现层中使用。理想情况下,表现层不应该知道套接字的内部工作,包括套接字是否正在尝试重新连接。
然而,前一种方法迫使我们在表现层中管理重新连接逻辑。因此,我们可以将重新连接逻辑整合到数据层,最好是整合到一个类中,而不是让用户界面层承担过多的数据责任。
双流策略
我们的想法是利用两个流:一个内流负责保持与套接字的连接,另一个外流作为其他类的入口,同时保持连接。内层流负责处理错误和重新连接,而外层流则保持不动,等待来自内层流的数据。
实现 SocketChannel 类
让我们深入了解 SocketChannel
类的实现,它将处理我们的重连接逻辑。首先,我们将把套接字配置传递给该类:
SocketChannel getChannel() {
return SocketChannel(
() => IOWebSocketChannel.connect(
'ws://localhost:1234' ,
),
);
}
SocketChannel 类将处理订阅、重新连接、消息发送和数据流。我们还需要一个传递消息的汇和 IOWebSocketChannel 本身,这将从构造函数参数中提取。
如前所述,我们将实现一个内流和一个外流,后者将由 rxdart 库中的 BehaviorSubject 呈现。因此,每次有人连接到我们的套接字类时,他们都将从套接字中获得最新数据。
class SocketChannel {
SocketChannel(this._getIOWebSocketChannel) {
_startConnection();
}
final IOWebSocketChannel Function() _getIOWebSocketChannel;
late IOWebSocketChannel _ioWebSocketChannel;
WebSocketSink get _sink => _ioWebSocketChannel.sink;
late Stream<dynamic> _innerStream;
final _outerStreamSubject = BehaviorSubject<dynamic>();
Stream<dynamic> get stream => _outerStreamSubject.stream;
}
现在,让我们添加 _startConnection()
方法,以便从构造函数启动套接字连接:
void _startConnection() {
_ioWebSocketChannel = _getIOWebSocketChannel();
_innerStream = _ioWebSocketChannel.stream;
_innerStream.listen(
(event) {
// Forward data to outer stream
_outerStreamSubject.add(event);
},
onError: (error) {
// Handle web socket connection error
_handleLostConnection();
},
onDone: () {
// Handle web socket connection break
_handleLostConnection();
},
);
}
void _handleLostConnection() {
_startConnection();
}
改进重新连接逻辑
为了增强我们的解决方案,让我们来解决套接字无法立即重新连接的情况。例如,如果互联网连接中断了几分钟,我们可以实施一种 ping 机制来定期检查服务器的状态。第一次重新连接尝试应在初始连接中断后立即进行,随后的尝试将被延迟。
bool _isFirstRestart = false;
bool _isFollowingRestart = false;
void _handleLostConnection() {
if (_isFirstRestart && !_isFollowingRestart) {
Future.delayed(const Duration(seconds: 3), () {
_isFollowingRestart = false;
_startConnection();
});
_isFollowingRestart = true;
} else {
_isFirstRestart = true;
_startConnection();
}
}
最后,我们可以添加 close() 方法来关闭套接字。关闭套接字将触发 onDone 回调,因此我们需要在方法中设置 _isManuallyClose = true 标志,并在回调中进行检查。
bool _isManuallyClosed = false;
void _startConnection() {
...
onDone: () {
if (!_isManuallyClosed) {
_handleLostConnection();
}
},
...
}
void close() {
_isManuallyClosed = true;
_sink.close();
}
最后结果:
import 'package:rxdart/rxdart.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
class SocketChannel {
SocketChannel(this._getIOWebSocketChannel) {
_startConnection();
}
final IOWebSocketChannel Function() _getIOWebSocketChannel;
late IOWebSocketChannel _ioWebSocketChannel;
WebSocketSink get _sink => _ioWebSocketChannel.sink;
late Stream<dynamic> _innerStream;
final _outerStreamSubject = BehaviorSubject<dynamic>();
Stream<dynamic> get stream => _outerStreamSubject.stream;
bool _isFirstRestart = false;
bool _isFollowingRestart = false;
bool _isManuallyClosed = false;
void _handleLostConnection() {
if (_isFirstRestart && !_isFollowingRestart) {
Future.delayed(const Duration(seconds: 3), () {
_isFollowingRestart = false;
_startConnection();
});
_isFollowingRestart = true;
} else {
_isFirstRestart = true;
_startConnection();
}
}
void _startConnection() {
_ioWebSocketChannel = _getIOWebSocketChannel();
_innerStream = _ioWebSocketChannel.stream;
_innerStream.listen(
(event) {
_isFirstRestart = false;
_outerStreamSubject.add(event);
},
onError: (error) {
_handleLostConnection();
},
onDone: () {
if (!_isManuallyClosed) {
_handleLostConnection();
}
},
);
}
void sendMessage(String message) => _sink.add(message);
void close() {
_isManuallyClosed = true;
_sink.close();
}
}
结论
在本文中,我们探讨了 Flutter 应用程序中的套接字重连接,并使用 SocketChannel 类实现了一个简洁高效的解决方案。通过在数据层中封装重连接逻辑,我们可以保持表现层的简洁。有了延迟重连接的附加功能,我们就为保持连续的套接字连接奠定了基础。
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/34195.html