WebSocket 的替代方案:本文将介绍 gRPC 流及其不同类型,并将其与 WebSockets 进行比较。
什么是 WebSockets?
我们都知道 HTTP/HTTPS,它是客户端和服务器之间的单向请求-响应协议。HTTP 协议中的连接生命周期就是请求和响应的跨度。
WebSockets 的概念与此类似,它是客户端-服务器架构中的双向流。WebSocket 协议中的连接生命周期可以无限长,直到某些外部干扰终止连接。
恢复性应用程序接口是 HTTP 协议的一个例子,而实时聊天系统则是 WebSocket 协议的一个例子。
WebSockets 的用途
WebSockets 有多种用途,包括实时网络应用程序、聊天应用程序和游戏应用程序。
在需要通过网络连续传输数据流的应用中,我们可以使用 WebSockets。在只想一次性获取数据的情况下,我们可以使用 HTTP。
什么是 gRPC 流?
GRPC 提供三种不同的流模式:客户端流、服务器流和全双工流(包括客户端和服务器)。
在客户端流模式下,服务器将接收来自客户端的一系列信息,并在没有更多信息时做出响应。
使用服务器流式传输时,客户端发出请求,服务器回应一串信息,客户端读取每一条信息,直到没有信息为止。
全双工使客户端和服务器都能互相发送一系列信息,直到没有信息为止。
双向流适用于客户端和服务器之间需要持续共享消息的情况。
为什么使用 gRPC 流?
为什么要使用 gRPC,答案很简单。为了更容易理解,我们可以将其与 WebSockets 进行比较。
gRPC 使用 HTTP/2.0,而 WebSockets 使用 HTTP/1.1。使用 gRPC 可以提供更高的安全系统,因为它们提供内置加密,而 WebSockets 则需要访问控制。此外,gRPC 使用二进制数据格式进行通信,而 WebSockets 使用 JSON、MTTQ 等数据格式。
因此,gRPC 提供了比 WebSockets 更安全、更快速的通信渠道。
此外,gRPC 支持多路复用,使客户端可以在单个连接上发送和接收多个信息,减少了每次创建 HTTP/2 连接的开销,从而形成了一个可重复使用的连接。
如何实现 gRPC 流?
客户端流
首先,让我们了解一下什么是客户端流?
- 它是客户端和服务器之间的通信,客户端向服务器发送信息流。
- 信息流结束后(由客户端提示),服务器响应一次。
原始文件
我们正在使用客户端流更新驱动程序的位置,并添加了一些虚拟变量。
syntax = 'proto3';
package locationTrackingApp;
service Location {
rpc updateLocation (stream LoactionReq) returns (LoactionRes);
}
message LoactionRepq {
string driverId = 1;
string longitude = 2;
string latitudes = 3;
}
message LoactionRes {
}
实施
- 我们将首先使用 NodeJs 实现服务器的解决方案。
- 注意:我已经添加了 SSL 证书,以确保客户端和服务器之间的通信安全。不过,如果你愿意,也可以忽略这一点。
服务器
// grpc-js is the grpc library for node js,
// where proto-loader is a library to compiler proto on runtime
const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
const fs = require("fs");
// load the certificates
function getServerCredentials() {
const serverCert = fs.readFileSync("./Path_To_Cert/server-cert.pem");
const serverKey = fs.readFileSync("./Path_To_Cert/server-key.pem");
const serverCredentials = grpc.ServerCredentials.createSsl(
null,
[
{
cert_chain: serverCert,
private_key: serverKey,
},
],
false
);
return serverCredentials;
}
function main() {
const server = new grpc.Server();
const packageDefinition = protoLoader.loadSync("./Path_To_Proto/locationStream.proto", {});
// we get the package name from the loaded proto file and register our
// service defination to it
const locationTrackingApp =
grpc.loadPackageDefinition(packageDefinition).locationTrackingApp;
// Add the service
server.addService(locationTrackingApp.Location.service, {
updateLocation: updateLocation,
});
const credentials = getServerCredentials();
// PS: to remove credentials use grpc.ServerCredentials.createInsecure()
server.bindAsync("0.0.0.0:50051", credentials, (err, port) => {
if (err) {
console.error(err)
return
}
server.start();
console.log(`Server running at http://0.0.0.0:${port}`);
});
}
var driverLocations = {};
function updateLocation(call, callback) {
call.on('data', (req) => {
if (!driverLocations[req.driverId]) {
driverLocations = {
...driverLocations,
[req.driverId]: [{
longitude: req.longitude,
latitudes: req.latitudes
}]
}
} else {
driverLocations[req.driverId].push({
longitude: req.longitude,
latitudes: req.latitudes
})
}
})
call.on('end', () => {
console.log("Stream Completed")
console.log(driverLocations)
callback(null, {});
});
}
客户端流提供两个事件:
- 数据:客户端写入信息的事件。
- 结束:来自客户端的流结束事件。
我将分享两个客户端代码示例:Node 和 JAVA(以防您在 Android 环境中工作)。
NodeJS 客户端
const grpc = require("@grpc/grpc-js");
const protoLoader = require("@grpc/proto-loader");
const fs = require("fs");
function getChannelCredentials() {
const rootCert = fs.readFileSync("./Path_To_Cert/ca-cert.pem");
const channelCredentials = grpc.ChannelCredentials.createSsl(rootCert);
return channelCredentials;
}
function main() {
const packageDefinition = protoLoader.loadSync("./proto/locationStream.proto", {});
const locationTrackingApp =
grpc.loadPackageDefinition(packageDefinition).locationTrackingApp;
const client = new locationTrackingApp.Location(
"localhost:50051",
getChannelCredentials()
);
// here we establish a link with our server
const stream = client.updateLocation((error, res) => {
if (error) {
console.error(error)
return;
}
console.log(res)
});
// with the same link / stream valiable we will write as many messages we want
for (let i = 0; i < 10; i++) {
// this is the "data" event
stream.write({
driverId: "driver",
longitude: "2." + i,
latitudes: "3.1"
});
}
// this send the "end" event
stream.end();
}
main();
Java客户端
// loading the certificates and building a secure channel with the server.
final SslContext sslCerts = loadTLSCredentials();
final ManagedChannel channel = NettyChannelBuilder
.forTarget("localhost:50051")
.sslContext(sslCerts)
.build();
final LocationGrpc.LocationStub stub = LocationGrpc.newStub(channel);
// are this is a stream we will use StreamObserver to
// observe response from server "responseObserver"
StreamObserver<LoactionRes> responseObserver = new StreamObserver<>() {
@Override
// function to handle response from server
public void onNext(LoactionRes res) {
System.out.println(res);
}
@Override
// handle error if any received from server
public void onError(Throwable throwable) {
throwable.printStackTrace();
finishLatch.countDown();
}
@Override
public void onCompleted() {
System.out.println("Finished UpdateLocation");
finishLatch.countDown();
}
};
// now we will create a stream with the secure channel
// we will use the responseObserver to send message stream
StreamObserver<LoactionReq> responseObserver= stub.updateLocation(responseObserver);
System.out.println("requestObserver stream started");
for(int i=0;i<10;i++){
// this emmit the "data" event
requestObserver.onNext(LoactionReq.
newBuilder()
.setDriverId("driver_java_1_tester")
.setLatitudes("lat"+i)
.setLongitude("long")
.build());
// java requires a minute wait as
// processing is done in asynchronous manner
Thread.sleep(5);
}
// this emits the "end" event
requestObserver.onCompleted();
// How to load certificates
public static SslContext loadTLSCredentials() throws SSLException {
File serverCACertFile = new File("Path_To_Certs/ca-cert.pem");
return GrpcSslContexts.forClient()
.trustManager(serverCACertFile)
.build();
}
在 Android 环境中,我们的证书和通道创建更新为:
Resources res = getResources();
CertificateFactory cf = CertificateFactory.getInstance("X.509");
InputStream mainstream = res.openRawResource(R.raw.ca_cert);
Certificate ca = cf.generateCertificate(mainstream);
KeyStore kStore = KeyStore.getInstance(KeyStore.getDefaultType());
kStore.load(null, null);
kStore.setCertificateEntry("ca", ca);
TrustManagerFactory tmf = TrustManagerFactory
.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(kStore);
TrustManager[] trustManagers = tmf.getTrustManagers();
if (trustManagers.length != 1 || !(trustManagers[0] instanceof X509TrustManager)) {
throw new IllegalStateException("Unexpected default trust managers:" + Arrays.toString(trustManagers));
}
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, tmf.getTrustManagers(), null);
SSLSocketFactory sslSocketFactory = context.getSocketFactory();
final ManagedChannel channel = OkHttpChannelBuilder
.forTarget("IP:PORT | domain ")
.useTransportSecurity()
.overrideAuthority("IP:PORT | domain")
.sslSocketFactory(sslSocketFactory)
.build();
在 Android 中,我们将证书添加为原始资源,并将其加载和设置为证书,我们使用证书初始化存根(gRPC 实例)。详细内容查看:如何在 Android 中使用 TLS 搭建 gRPC 客户端
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。