构建便宜的高性能无服务器 WebSocket 解决方案

为什么是无服务器?

如今,ServerLess(无服务器) 正在热炒。每个人都希望自己的服务部署到 ServerLess 中。因为对于数百万用户来说,扩展 ServerLess 部署非常容易。账单也相当简单。按使用付费。顺便说一下,这并不意味着 ServerLess 没有服务器。

多年来,WebSocket 的扩展过于艰难。即使是价值数千美元的 WebSocket 服务器,在最大并发连接数达到 10k 时也会崩溃。并非每项服务都如此糟糕,也有一些不错的 WebSocket 服务。但它们也很昂贵,而且价格还算合理。

需求

大约两周前,我在为一个网站寻找一个可扩展的 WebSocket 服务器解决方案,该网站一般有 20k 并发用户。该网站的管理机构对长时间的拉取和推送服务不再感兴趣。他们需要自己的解决方案。他们还准备为此支付任何费用,因为他们是非常知名的美国客户。

经过需求分析,我发现…

  • WebSocket 服务器需要处理至少 20k 并发连接,并向每个人广播消息。
  • 一个连接可以订阅多个频道,并像推送服务一样从这些频道接收数据。
  • 由于 WebSocket 协议不像 rest api 那样支持 CORS 保护,因此需要一些机制来验证和限制连接客户端的来源。
  • WebSocket 服务器必须能够每分钟广播至少 2kb 的数据。
  • 解决方案必须能够扩展到 100k 并发用户等。
  • 尽可能采用最佳安全实践。
  • 解决方案必须可以随时定制。

我在 WebSocket 和实时方面拥有丰富的工作经验。我决定使用 AWS api 网关 WebSocket 服务。

计划使用 AWS api 网关 WebSocket、4 个 lambda 函数来处理连接、断开、广播和通道功能。

不同意?

你可能会说这个方案太疯狂了,因为 lambda 服务是按使用量收费的,2 万个并发 WebSocket 连接的账单将高达数百万。

让我来解开你的疑惑。WebSocket 连接不由 lambda 维护。客户端连接将使用 api 网关服务,其费用接近 0 美元。因此,与 api 网关服务维护的持久 WebSocket 连接相比,lambda 函数的使用率相对较低。

为了保存通道信息和持久连接 ID,我们开始使用 DynamoDb。

AWS 堆栈模板

现在是编码时间,让我们来编码解决方案。查看下面的 AWS CloudFormation 模板。稍后我会解释。

Resources:
  Connections:
    Type: AWS::DynamoDB::Table
    Properties:
      KeySchema:
        - AttributeName: connectionId
          KeyType: HASH
        - AttributeName: channel
          KeyType: RANGE
      AttributeDefinitions:
        - AttributeName: connectionId
          AttributeType: S
        - AttributeName: channel
          AttributeType: S
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5
    UpdateReplacePolicy: Delete
    DeletionPolicy: Delete
  ConnectHandlerServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
        Version: "2012-10-17"
      ManagedPolicyArns:
        - Fn::Join:
            - ""
            - - "arn:"
              - Ref: AWS::Partition
              - :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
  ConnectHandlerServiceRoleDefaultPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyDocument:
        Statement:
          - Action:
              - dynamodb:BatchWriteItem
              - dynamodb:PutItem
              - dynamodb:UpdateItem
              - dynamodb:DeleteItem
              - dynamodb:DescribeTable
            Effect: Allow
            Resource:
              - Fn::GetAtt:
                  - Connections
                  - Arn
              - Ref: AWS::NoValue
        Version: "2012-10-17"
      PolicyName: ConnectHandlerServiceRoleDefaultPolicy
      Roles:
        - Ref: ConnectHandlerServiceRole
  ConnectHandler:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |-
          const AWS = require('aws-sdk');
                const ddb = new AWS.DynamoDB.DocumentClient();
                exports.handler = async function (event, context) {
                  try {
                    await ddb
                      .put({
                        TableName: process.env.table,
                        Item: {
                          connectionId: event.requestContext.connectionId,
                          channel: 'default'
                        },
                      })
                      .promise();
                  } catch (err) {
                    return {
                      statusCode: 500,
                    };
                  }
                  return {
                    statusCode: 200,
                  };
                };
      Role:
        Fn::GetAtt:
          - ConnectHandlerServiceRole
          - Arn
      Environment:
        Variables:
          table:
            Ref: Connections
      Handler: index.handler
      Runtime: nodejs16.x
    DependsOn:
      - ConnectHandlerServiceRoleDefaultPolicy
      - ConnectHandlerServiceRole
  DisconnectHandlerServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
        Version: "2012-10-17"
      ManagedPolicyArns:
        - Fn::Join:
            - ""
            - - "arn:"
              - Ref: AWS::Partition
              - :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
  DisconnectHandlerServiceRoleDefaultPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyDocument:
        Statement:
          - Action:
              - dynamodb:BatchWriteItem
              - dynamodb:PutItem
              - dynamodb:UpdateItem
              - dynamodb:DeleteItem
              - dynamodb:DescribeTable
              - dynamodb:BatchGetItem
              - dynamodb:GetRecords
              - dynamodb:GetShardIterator
              - dynamodb:Query
              - dynamodb:GetItem
              - dynamodb:Scan
              - dynamodb:ConditionCheckItem
            Effect: Allow
            Resource:
              - Fn::GetAtt:
                  - Connections
                  - Arn
              - Ref: AWS::NoValue
        Version: "2012-10-17"
      PolicyName: DisconnectHandlerServiceRoleDefaultPolicy
      Roles:
        - Ref: DisconnectHandlerServiceRole
  DisconnectHandler:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |-
          const AWS = require('aws-sdk');
                const ddb = new AWS.DynamoDB.DocumentClient();

                exports.handler = async function (event, context) {
                  try {
                    const rowsResult = await ddb.scan({
                      TableName: process.env.table,
                      FilterExpression: 'connectionId = :connectionId',
                      ExpressionAttributeValues: {
                          ":connectionId": event.requestContext.connectionId
                      }
                    }).promise();

                    const deleteableItems = rowsResult.Items.map(({ connectionId, channel }) => ({
                      DeleteRequest: {
                        Key: {
                          connectionId,
                          channel
                        }
                      }
                    }))

                    if (deleteableItems.length) {
                      await ddb.batchWrite({
                        RequestItems: {
                          [process.env.table]: deleteableItems,
                        }
                      }).promise();
                    }

                    return {
                      statusCode: 200
                    }
                  } catch (err) {
                    console.log(err)
                    return {
                      statusCode: 500,
                    };
                  }
                };
      Role:
        Fn::GetAtt:
          - DisconnectHandlerServiceRole
          - Arn
      Environment:
        Variables:
          table:
            Ref: Connections
      Handler: index.handler
      Runtime: nodejs16.x
    DependsOn:
      - DisconnectHandlerServiceRoleDefaultPolicy
      - DisconnectHandlerServiceRole
  SendMessageHandlerServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
        Version: "2012-10-17"
      ManagedPolicyArns:
        - Fn::Join:
            - ""
            - - "arn:"
              - Ref: AWS::Partition
              - :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
  SendMessageHandlerServiceRoleDefaultPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyDocument:
        Statement:
          - Action:
              - dynamodb:BatchGetItem
              - dynamodb:GetRecords
              - dynamodb:GetShardIterator
              - dynamodb:Query
              - dynamodb:GetItem
              - dynamodb:Scan
              - dynamodb:ConditionCheckItem
              - dynamodb:DescribeTable
            Effect: Allow
            Resource:
              - Fn::GetAtt:
                  - Connections
                  - Arn
              - Ref: AWS::NoValue
        Version: "2012-10-17"
      PolicyName: SendMessageHandlerServiceRoleDefaultPolicy
      Roles:
        - Ref: SendMessageHandlerServiceRole
  SendMessageHandler:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |-
          const AWS = require('aws-sdk');
                const ddb = new AWS.DynamoDB.DocumentClient();

                exports.handler = async function (event, context) {
                  const body = JSON.parse(event.body)
                  const message = body.message;
                  const channel = body.channel??'default';

                  let connections;
                  try {
                    connections = await ddb.scan({
                      TableName: process.env.table,
                      FilterExpression: 'channel = :channelId',
                      ExpressionAttributeValues: {
                          ":channelId": channel
                      }
                    }).promise();
                  } catch (err) {
                    console.log(err)
                    return {
                      statusCode: 500,
                    };
                  }
                  const callbackAPI = new AWS.ApiGatewayManagementApi({
                    apiVersion: '2018-11-29',
                    endpoint:
                      event.requestContext.domainName + '/' + event.requestContext.stage,
                  });

                  const sendMessages = connections.Items.map(async ({ connectionId }) => {
                    if (connectionId !== event.requestContext.connectionId) {
                      try {
                        await callbackAPI
                          .postToConnection({ ConnectionId: connectionId, Data: message })
                          .promise();
                      } catch (e) {
                        console.log(e);
                      }
                    }
                  });

                  try {
                    await Promise.allSettled(sendMessages);
                  } catch (e) {
                    console.log(e);
                    return {
                      statusCode: 500,
                    };
                  }

                  return { statusCode: 200 };
                };
      Role:
        Fn::GetAtt:
          - SendMessageHandlerServiceRole
          - Arn
      Environment:
        Variables:
          table:
            Ref: Connections
      Handler: index.handler
      Runtime: nodejs16.x
    DependsOn:
      - SendMessageHandlerServiceRoleDefaultPolicy
      - SendMessageHandlerServiceRole
  JoinChannelHandlerServiceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action: sts:AssumeRole
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
        Version: "2012-10-17"
      ManagedPolicyArns:
        - Fn::Join:
            - ""
            - - "arn:"
              - Ref: AWS::Partition
              - :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
  JoinChannelHandlerServiceRoleDefaultPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyDocument:
        Statement:
          - Action:
              - dynamodb:BatchWriteItem
              - dynamodb:PutItem
              - dynamodb:UpdateItem
              - dynamodb:DeleteItem
              - dynamodb:DescribeTable
              - dynamodb:BatchGetItem
              - dynamodb:GetRecords
              - dynamodb:GetShardIterator
              - dynamodb:Query
              - dynamodb:GetItem
              - dynamodb:Scan
              - dynamodb:ConditionCheckItem
            Effect: Allow
            Resource:
              - Fn::GetAtt:
                  - Connections
                  - Arn
              - Ref: AWS::NoValue
        Version: "2012-10-17"
      PolicyName: JoinChannelHandlerServiceRoleDefaultPolicy
      Roles:
        - Ref: JoinChannelHandlerServiceRole
  JoinChannelHandler:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |-
          const AWS = require('aws-sdk');
                const ddb = new AWS.DynamoDB.DocumentClient();

                exports.handler = async function (event, context) {
                  try {
                    const channel = JSON.parse(event.body).channel;
                    if (! channel.length) throw new Error("Channel Id is required")

                    await ddb
                      .put({
                        TableName: process.env.table,
                        Item: {
                          connectionId: event.requestContext.connectionId,
                          channel
                        },
                      })
                      .promise();

                    const callbackAPI = new AWS.ApiGatewayManagementApi({
                      apiVersion: '2018-11-29',
                      endpoint:
                        event.requestContext.domainName + '/' + event.requestContext.stage,
                    });

                    try {
                      await callbackAPI.postToConnection({ ConnectionId: event.requestContext.connectionId, Data: JSON.stringify({
                        channel,
                        connectionId: event.requestContext.connectionId
                      }) }).promise();
                    } catch (e) {}

                  } catch (err) {
                    console.log(err)
                    return {
                      statusCode: 500,
                    };
                  }
                  return {
                    statusCode: 200,
                  };
                };
      Role:
        Fn::GetAtt:
          - JoinChannelHandlerServiceRole
          - Arn
      Environment:
        Variables:
          table:
            Ref: Connections
      Handler: index.handler
      Runtime: nodejs16.x
    DependsOn:
      - JoinChannelHandlerServiceRoleDefaultPolicy
      - JoinChannelHandlerServiceRole
  ManageConnections:
    Type: AWS::IAM::Policy
    Properties:
      PolicyDocument:
        Statement:
          - Action: execute-api:ManageConnections
            Effect: Allow
            Resource:
              Fn::Join:
                - ""
                - - "arn:aws:execute-api:"
                  - Ref: AWS::Region
                  - ":"
                  - Ref: AWS::AccountId
                  - ":"
                  - "*/*/POST/@connections/*"
        Version: "2012-10-17"
      PolicyName: ManageConnections
      Roles:
        - Ref: SendMessageHandlerServiceRole
        - Ref: JoinChannelHandlerServiceRole

配置和安装

使用此模板:

1、创建一个 YAML 文件,文件名为 api-gateway-websocket.yaml 。复制上述代码块并粘贴到 YAML 文件中。

2、 登录您的 AWS 控制台账户。搜索 CloudFormation Stacks。单击“Create Stack with new resources (standard)”。

  • 如下图所示进行设置
构建便宜的高性能无服务器 WebSocket 解决方案
  • 使用箭头3所示的上传器,上传之前保存的API Gateway WebSocket YAML文件
  • 提供堆栈名称并保留其余内容默认不变。查看并创建堆栈并等待其完成。

3、堆栈完成后,需要创建 AWS API Gateway WebSocket服务。

  • 导航至 API Gateway 并单击创建 API。在 WebSocket API 卡中单击 “构建 “按钮。
  • 在路由选择表达式输入中提供应用程序名称和 request.body.action。
  • 在 “Add Routes “页面,从预定义路由中添加 $connect、$disconnect,并添加 sendmessage、joinchannel 键以创建另外 2 个自定义路由。单击下一步。
  • 在 “Attach Integration “页面,为所有路由的集成类型选择 lambda。搜索并为 $connect 选择 “ConnectHandler “函数,为 $disconnect 选择 “DisconnectHandler “函数,为 sendmessage 路由选择 “SendMessageHandler “函数,为 joinchannel 路由选择 “JoinChannelHandler “函数。
  • 添加一个阶段,如 dev 或 production,随你怎么命名。
  • 审核并创建。部署完成后,你会得到一个以 wss://* 开头的 WebSocket 端点。复制 WebSocket 网址并尝试通过 PieHost 进行连接。也可以使用 wscat cli 工具进行测试。使用此命令 wscat -c wss://your-api-gateway-websocket-url

使用方法

接下来是如何使用 URL 进行 WebSocket 连接。我将使用浏览器中的 javascript WebSocket 和 NodeJs WebSocket 连接方法,使用相同的代码库重新连接 WebSocket。查看下面的代码。

const airportRws = new ReconnectingWebSocket(
  "wss://abcdefg.execute-api.us-east-1.amazonaws.com/dev"
);
airportRws.onopen = (e) => {
    // you will be able to join multiple channel and wait for the messages
  airportRws.send(
    JSON.stringify({
      action: "joinchannel",
      channel: "friends-channel", // your channel name
    })
  );
  airportRws.send(
    JSON.stringify({
      action: "joinchannel",
      channel: "family-channel",
    })
  );
  airportRws.send(
    JSON.stringify({
      action: "sendmessage",
      channel: "friends-channel",
      message: "Hello, everyone",
    })
  ); // this will broadcast your joining message to everyone already connected with this channel
};
airportRws.onclose = (e) => {
  console.log("Websocket close", e);
};
airportRws.onerror = (e) => {
  console.log("Websocket error", e);
};
airportRws.onmessage = (e) => {
  const message = e.data;
  const channel = e.channel;
  console.log("Channel", parsed.channel)
  console.log("Message", parsed)
};

通过使用上述代码,您将能够从浏览器以及 NodeJs/NextJs 服务器连接到该 WebSocket。

最后,我没有在本指南中包含授权和验证机制。因为这取决于您想如何实现这些功能。

作者:Yousuf Hossain
译自:https://dev.to/yhshanto/build-cheapest-high-performance-serverless-websocket-solution-4h17

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/im/47337.html

(0)

相关推荐

发表回复

登录后才能评论