为什么是无服务器?
如今,ServerLess(无服务器) 正在热炒。每个人都希望自己的服务部署到 ServerLess 中。因为对于数百万用户来说,扩展 ServerLess 部署非常容易。账单也相当简单。按使用付费。顺便说一下,这并不意味着 ServerLess 没有服务器。
多年来,WebSocket 的扩展过于艰难。即使是价值数千美元的 WebSocket 服务器,在最大并发连接数达到 10k 时也会崩溃。并非每项服务都如此糟糕,也有一些不错的 WebSocket 服务。但它们也很昂贵,而且价格还算合理。
需求
大约两周前,我在为一个网站寻找一个可扩展的 WebSocket 服务器解决方案,该网站一般有 20k 并发用户。该网站的管理机构对长时间的拉取和推送服务不再感兴趣。他们需要自己的解决方案。他们还准备为此支付任何费用,因为他们是非常知名的美国客户。
经过需求分析,我发现…
- WebSocket 服务器需要处理至少 20k 并发连接,并向每个人广播消息。
- 一个连接可以订阅多个频道,并像推送服务一样从这些频道接收数据。
- 由于 WebSocket 协议不像 rest api 那样支持 CORS 保护,因此需要一些机制来验证和限制连接客户端的来源。
- WebSocket 服务器必须能够每分钟广播至少 2kb 的数据。
- 解决方案必须能够扩展到 100k 并发用户等。
- 尽可能采用最佳安全实践。
- 解决方案必须可以随时定制。
我在 WebSocket 和实时方面拥有丰富的工作经验。我决定使用 AWS api 网关 WebSocket 服务。
计划使用 AWS api 网关 WebSocket、4 个 lambda 函数来处理连接、断开、广播和通道功能。
不同意?
你可能会说这个方案太疯狂了,因为 lambda 服务是按使用量收费的,2 万个并发 WebSocket 连接的账单将高达数百万。
让我来解开你的疑惑。WebSocket 连接不由 lambda 维护。客户端连接将使用 api 网关服务,其费用接近 0 美元。因此,与 api 网关服务维护的持久 WebSocket 连接相比,lambda 函数的使用率相对较低。
为了保存通道信息和持久连接 ID,我们开始使用 DynamoDb。
AWS 堆栈模板
现在是编码时间,让我们来编码解决方案。查看下面的 AWS CloudFormation 模板。稍后我会解释。
Resources:
Connections:
Type: AWS::DynamoDB::Table
Properties:
KeySchema:
- AttributeName: connectionId
KeyType: HASH
- AttributeName: channel
KeyType: RANGE
AttributeDefinitions:
- AttributeName: connectionId
AttributeType: S
- AttributeName: channel
AttributeType: S
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
UpdateReplacePolicy: Delete
DeletionPolicy: Delete
ConnectHandlerServiceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Action: sts:AssumeRole
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Version: "2012-10-17"
ManagedPolicyArns:
- Fn::Join:
- ""
- - "arn:"
- Ref: AWS::Partition
- :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
ConnectHandlerServiceRoleDefaultPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyDocument:
Statement:
- Action:
- dynamodb:BatchWriteItem
- dynamodb:PutItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
- dynamodb:DescribeTable
Effect: Allow
Resource:
- Fn::GetAtt:
- Connections
- Arn
- Ref: AWS::NoValue
Version: "2012-10-17"
PolicyName: ConnectHandlerServiceRoleDefaultPolicy
Roles:
- Ref: ConnectHandlerServiceRole
ConnectHandler:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |-
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
exports.handler = async function (event, context) {
try {
await ddb
.put({
TableName: process.env.table,
Item: {
connectionId: event.requestContext.connectionId,
channel: 'default'
},
})
.promise();
} catch (err) {
return {
statusCode: 500,
};
}
return {
statusCode: 200,
};
};
Role:
Fn::GetAtt:
- ConnectHandlerServiceRole
- Arn
Environment:
Variables:
table:
Ref: Connections
Handler: index.handler
Runtime: nodejs16.x
DependsOn:
- ConnectHandlerServiceRoleDefaultPolicy
- ConnectHandlerServiceRole
DisconnectHandlerServiceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Action: sts:AssumeRole
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Version: "2012-10-17"
ManagedPolicyArns:
- Fn::Join:
- ""
- - "arn:"
- Ref: AWS::Partition
- :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
DisconnectHandlerServiceRoleDefaultPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyDocument:
Statement:
- Action:
- dynamodb:BatchWriteItem
- dynamodb:PutItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
- dynamodb:DescribeTable
- dynamodb:BatchGetItem
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:Query
- dynamodb:GetItem
- dynamodb:Scan
- dynamodb:ConditionCheckItem
Effect: Allow
Resource:
- Fn::GetAtt:
- Connections
- Arn
- Ref: AWS::NoValue
Version: "2012-10-17"
PolicyName: DisconnectHandlerServiceRoleDefaultPolicy
Roles:
- Ref: DisconnectHandlerServiceRole
DisconnectHandler:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |-
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
exports.handler = async function (event, context) {
try {
const rowsResult = await ddb.scan({
TableName: process.env.table,
FilterExpression: 'connectionId = :connectionId',
ExpressionAttributeValues: {
":connectionId": event.requestContext.connectionId
}
}).promise();
const deleteableItems = rowsResult.Items.map(({ connectionId, channel }) => ({
DeleteRequest: {
Key: {
connectionId,
channel
}
}
}))
if (deleteableItems.length) {
await ddb.batchWrite({
RequestItems: {
[process.env.table]: deleteableItems,
}
}).promise();
}
return {
statusCode: 200
}
} catch (err) {
console.log(err)
return {
statusCode: 500,
};
}
};
Role:
Fn::GetAtt:
- DisconnectHandlerServiceRole
- Arn
Environment:
Variables:
table:
Ref: Connections
Handler: index.handler
Runtime: nodejs16.x
DependsOn:
- DisconnectHandlerServiceRoleDefaultPolicy
- DisconnectHandlerServiceRole
SendMessageHandlerServiceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Action: sts:AssumeRole
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Version: "2012-10-17"
ManagedPolicyArns:
- Fn::Join:
- ""
- - "arn:"
- Ref: AWS::Partition
- :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
SendMessageHandlerServiceRoleDefaultPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyDocument:
Statement:
- Action:
- dynamodb:BatchGetItem
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:Query
- dynamodb:GetItem
- dynamodb:Scan
- dynamodb:ConditionCheckItem
- dynamodb:DescribeTable
Effect: Allow
Resource:
- Fn::GetAtt:
- Connections
- Arn
- Ref: AWS::NoValue
Version: "2012-10-17"
PolicyName: SendMessageHandlerServiceRoleDefaultPolicy
Roles:
- Ref: SendMessageHandlerServiceRole
SendMessageHandler:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |-
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
exports.handler = async function (event, context) {
const body = JSON.parse(event.body)
const message = body.message;
const channel = body.channel??'default';
let connections;
try {
connections = await ddb.scan({
TableName: process.env.table,
FilterExpression: 'channel = :channelId',
ExpressionAttributeValues: {
":channelId": channel
}
}).promise();
} catch (err) {
console.log(err)
return {
statusCode: 500,
};
}
const callbackAPI = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint:
event.requestContext.domainName + '/' + event.requestContext.stage,
});
const sendMessages = connections.Items.map(async ({ connectionId }) => {
if (connectionId !== event.requestContext.connectionId) {
try {
await callbackAPI
.postToConnection({ ConnectionId: connectionId, Data: message })
.promise();
} catch (e) {
console.log(e);
}
}
});
try {
await Promise.allSettled(sendMessages);
} catch (e) {
console.log(e);
return {
statusCode: 500,
};
}
return { statusCode: 200 };
};
Role:
Fn::GetAtt:
- SendMessageHandlerServiceRole
- Arn
Environment:
Variables:
table:
Ref: Connections
Handler: index.handler
Runtime: nodejs16.x
DependsOn:
- SendMessageHandlerServiceRoleDefaultPolicy
- SendMessageHandlerServiceRole
JoinChannelHandlerServiceRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Action: sts:AssumeRole
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Version: "2012-10-17"
ManagedPolicyArns:
- Fn::Join:
- ""
- - "arn:"
- Ref: AWS::Partition
- :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
JoinChannelHandlerServiceRoleDefaultPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyDocument:
Statement:
- Action:
- dynamodb:BatchWriteItem
- dynamodb:PutItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
- dynamodb:DescribeTable
- dynamodb:BatchGetItem
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:Query
- dynamodb:GetItem
- dynamodb:Scan
- dynamodb:ConditionCheckItem
Effect: Allow
Resource:
- Fn::GetAtt:
- Connections
- Arn
- Ref: AWS::NoValue
Version: "2012-10-17"
PolicyName: JoinChannelHandlerServiceRoleDefaultPolicy
Roles:
- Ref: JoinChannelHandlerServiceRole
JoinChannelHandler:
Type: AWS::Lambda::Function
Properties:
Code:
ZipFile: |-
const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
exports.handler = async function (event, context) {
try {
const channel = JSON.parse(event.body).channel;
if (! channel.length) throw new Error("Channel Id is required")
await ddb
.put({
TableName: process.env.table,
Item: {
connectionId: event.requestContext.connectionId,
channel
},
})
.promise();
const callbackAPI = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint:
event.requestContext.domainName + '/' + event.requestContext.stage,
});
try {
await callbackAPI.postToConnection({ ConnectionId: event.requestContext.connectionId, Data: JSON.stringify({
channel,
connectionId: event.requestContext.connectionId
}) }).promise();
} catch (e) {}
} catch (err) {
console.log(err)
return {
statusCode: 500,
};
}
return {
statusCode: 200,
};
};
Role:
Fn::GetAtt:
- JoinChannelHandlerServiceRole
- Arn
Environment:
Variables:
table:
Ref: Connections
Handler: index.handler
Runtime: nodejs16.x
DependsOn:
- JoinChannelHandlerServiceRoleDefaultPolicy
- JoinChannelHandlerServiceRole
ManageConnections:
Type: AWS::IAM::Policy
Properties:
PolicyDocument:
Statement:
- Action: execute-api:ManageConnections
Effect: Allow
Resource:
Fn::Join:
- ""
- - "arn:aws:execute-api:"
- Ref: AWS::Region
- ":"
- Ref: AWS::AccountId
- ":"
- "*/*/POST/@connections/*"
Version: "2012-10-17"
PolicyName: ManageConnections
Roles:
- Ref: SendMessageHandlerServiceRole
- Ref: JoinChannelHandlerServiceRole
配置和安装
使用此模板:
1、创建一个 YAML 文件,文件名为 api-gateway-websocket.yaml 。复制上述代码块并粘贴到 YAML 文件中。
2、 登录您的 AWS 控制台账户。搜索 CloudFormation Stacks。单击“Create Stack with new resources (standard)”。
- 如下图所示进行设置
- 使用箭头3所示的上传器,上传之前保存的API Gateway WebSocket YAML文件
- 提供堆栈名称并保留其余内容默认不变。查看并创建堆栈并等待其完成。
3、堆栈完成后,需要创建 AWS API Gateway WebSocket服务。
- 导航至 API Gateway 并单击创建 API。在 WebSocket API 卡中单击 “构建 “按钮。
- 在路由选择表达式输入中提供应用程序名称和 request.body.action。
- 在 “Add Routes “页面,从预定义路由中添加 $connect、$disconnect,并添加 sendmessage、joinchannel 键以创建另外 2 个自定义路由。单击下一步。
- 在 “Attach Integration “页面,为所有路由的集成类型选择 lambda。搜索并为 $connect 选择 “ConnectHandler “函数,为 $disconnect 选择 “DisconnectHandler “函数,为 sendmessage 路由选择 “SendMessageHandler “函数,为 joinchannel 路由选择 “JoinChannelHandler “函数。
- 添加一个阶段,如 dev 或 production,随你怎么命名。
- 审核并创建。部署完成后,你会得到一个以 wss://* 开头的 WebSocket 端点。复制 WebSocket 网址并尝试通过 PieHost 进行连接。也可以使用 wscat cli 工具进行测试。使用此命令 wscat -c wss://your-api-gateway-websocket-url
使用方法
接下来是如何使用 URL 进行 WebSocket 连接。我将使用浏览器中的 javascript WebSocket 和 NodeJs WebSocket 连接方法,使用相同的代码库重新连接 WebSocket。查看下面的代码。
const airportRws = new ReconnectingWebSocket(
"wss://abcdefg.execute-api.us-east-1.amazonaws.com/dev"
);
airportRws.onopen = (e) => {
// you will be able to join multiple channel and wait for the messages
airportRws.send(
JSON.stringify({
action: "joinchannel",
channel: "friends-channel", // your channel name
})
);
airportRws.send(
JSON.stringify({
action: "joinchannel",
channel: "family-channel",
})
);
airportRws.send(
JSON.stringify({
action: "sendmessage",
channel: "friends-channel",
message: "Hello, everyone",
})
); // this will broadcast your joining message to everyone already connected with this channel
};
airportRws.onclose = (e) => {
console.log("Websocket close", e);
};
airportRws.onerror = (e) => {
console.log("Websocket error", e);
};
airportRws.onmessage = (e) => {
const message = e.data;
const channel = e.channel;
console.log("Channel", parsed.channel)
console.log("Message", parsed)
};
通过使用上述代码,您将能够从浏览器以及 NodeJs/NextJs 服务器连接到该 WebSocket。
最后,我没有在本指南中包含授权和验证机制。因为这取决于您想如何实现这些功能。
作者:Yousuf Hossain
译自:https://dev.to/yhshanto/build-cheapest-high-performance-serverless-websocket-solution-4h17
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/47337.html