实时音频同步(第 2 部分)

多设备音频实时同步是一项引人入胜的挑战,它涉及解决延迟问题和确保播放事件的精确定时。在本系列中,我们将深入探讨此类系统的复杂技术。在第 1 部分中,我们讨论了实现精确定时以确保无缝音频体验所面临的挑战和解决方案。本文我们将深入探讨如何在Google Cloud Platform(GCP)上实现该系统,重点关注 Google Cloud Storage、Terraform、Packer、WebSockets 和 REST API。

Google Cloud Storage 和分块抓取:

Google Cloud Storage(GCS)是一种强大且可扩展的存储解决方案,可让我们高效地存储和检索大型音频文件。我们使用的一项关键技术是分块下载音频文件。这种方法可减少内存消耗,并在需要时只获取音频文件的必要部分,从而确保流畅播放。

为什么要分块抓取?

分块抓之所以重要,有以下几个原因:

  • 高效内存使用:只有部分音频文件被加载到内存中,这对于资源有限的设备来说非常理想。
  • 减少延迟:较小的分块下载速度更快,可最大限度地减少播放延迟。
  • 可恢复下载:在网络中断的情况下,可从最后获取的片段恢复下载,确保可靠性。

下面是一个从 GCS 抓取音频文件块的实用程序示例:

const { Storage } = require('@google-cloud/storage');
const storage = new Storage();

async function downloadInPieces(bucketName, fileName, chunkSize, onChunk) {
  const bucket = storage.bucket(bucketName);
  const file = bucket.file(fileName);
  const fileSize = (await file.getMetadata())[0].size;
  let start = 0;

  while (start < fileSize) {
    const end = Math.min(start + chunkSize, fileSize);
    await file.createReadStream({ start, end }).on('data', onChunk);
    start = end;
  }
}

Terraform:基础设施即代码

Terraform 是一款开源工具,用于使用声明式配置语言配置和管理云基础设施。我们使用 Terraform 在 GCP 上建立虚拟私有云 (VPC) 环境。

为什么使用 Terraform?

  • 一致性:确保基础架构设置在不同环境中保持一致。
  • 自动化:自动化部署流程,减少人工干预。
  • 可扩展性:根据需要轻松扩展基础架构。

Terraform 允许我们将基础架构定义为代码,从而便于版本控制和团队成员之间的共享。下面,我们将详细介绍 Terraform 配置的每个部分是如何帮助建立基础架构的。

提供商配置:

我们首先指定 Google 云提供商及其凭据:

provider "google" {
  credentials = file(var.creds_file_path)
  project     = var.project
  region      = var.region
}

此配置块指定 GCP 提供商,并设置必要的凭证、项目 ID 和区域。凭证属性指向包含服务账户密钥的文件,用于与 GCP 进行身份验证。

VPC 和子网配置:

接下来,我们定义虚拟专用云(VPC)及其子网。VPC 隔离了我们的网络环境:

resource "google_compute_network" "vpc" {
  name                            = "vpc"
  routing_mode                    = var.routing_mode
  auto_create_subnetworks         = false
  delete_default_routes_on_create = true
}

这将创建一个名为 “vpc “的新 VPC。将 “auto_create_subnetworks “设置为 “false”,意味着我们将手动定义子网,从而对网络配置拥有更多控制权。delete_default_routes_on_create “属性可确保不自动创建默认路由,从而允许我们定义自定义路由规则。

然后,我们为主扬声器和从扬声器创建子网:

resource "google_compute_subnetwork" "master_speaker_subnet" {
  name          = "master-speaker"
  ip_cidr_range = "10.0.1.0/24"
  network       = google_compute_network.vpc.self_link
  region        = var.region
}

上述代码块定义了一个名为 “master-speaker “的子网,其 IP 范围为 10.0.1.0/24。它与我们的 VPC 相关联,并在指定区域内创建。

resource "google_compute_subnetwork" "slave_speaker_subnet" {
  count         = 3
  name          = "slave-speaker-${count.index}"
  ip_cidr_range = "10.0.${count.index + 1}.0/24"
  network       = google_compute_network.vpc.self_link
  region        = var.region
}

在这里,我们为从属扬声器创建了三个子网。每个子网都有一个唯一的名称和 IP 范围(10.0.2.0/24、10.0.3.0/24、10.0.4.0/24)。count 参数允许我们使用单个资源块创建多个子网。

防火墙规则:

我们定义了防火墙规则,以控制进出实例的流量。主扬声器需要可访问,以便进行控制和数据传输:

resource "google_compute_firewall" "master_speaker_firewall" {
  name    = "master-speaker-firewall"
  network = google_compute_network.vpc.self_link

  allow {
    protocol = "tcp"
    ports    = ["3001", "22"]
  }

  target_tags   = ["master_speaker"]
  source_ranges = ["0.0.0.0/0"]

  depends_on = [google_compute_subnetwork.master_speaker_subnet]
}

此防火墙规则允许端口 3001 和 22(SSH)上的传入 TCP 流量进入标记为 “master_speaker “的实例。source_ranges 属性允许来自任何 IP 地址的流量。
从属扬声器的访问权限受到更多限制:

resource "google_compute_firewall" "slave_speaker_firewall" {
  count = 3
  name    = "slave-speaker-firewall-${count.index}"
  network = google_compute_network.vpc.self_link

  allow {
    protocol = "tcp"
    ports    = []
  }

  target_tags = ["slave_speaker"]

  depends_on = [google_compute_subnetwork.slave_speaker_subnet]
}

该防火墙规则允许我们为每个从属扬声器子网创建单独的防火墙规则。“count”参数可确保我们为每个子网创建一条规则。这些规则目前不开放任何端口,但可以根据需要进行修改,以允许特定流量。

虚拟机实例:

最后,我们为主音箱和从音箱创建虚拟机实例。主扬声器配置了一个启动脚本,用于初始化其环境:

resource "google_compute_instance" "master_speaker" {
  name         = "master-speaker"
  machine_type = "n1-standard-1"
  zone         = var.zone

  boot_disk {
    initialize_params {
      image = var.machine_image
      size  = "100"
      type  = "pd-balanced"
    }
  }

  network_interface {
    subnetwork = google_compute_subnetwork.master_speaker_subnet.self_link
    access_config {}
  }

  depends_on = [google_compute_subnetwork.master_speaker_subnet]

  metadata_startup_script = <<-EOT
    #!/bin/bash
    sudo echo "SPEAKER_TYPE='master'" >> /home/user/speaker/.env
    node server.js
  EOT
}

该代码块使用指定的机器类型和映像创建名为 “master-speaker “的虚拟机。metadata_startup_script 会在启动时运行一个脚本,将虚拟机配置为主讲人。请注意,我们在 network_interface 块中添加了一个空的 access_config 块。这将使主扬声器对公众开放。

从属扬声器的配置与主扬声器类似,但会连接到主扬声器:

resource "google_compute_instance" "slave_speaker" {
  count = 3
  name         = "slave-speaker-${count.index}"
  machine_type = "n1-standard-1"
  zone         = var.zone

  boot_disk {
    initialize_params {
      image = var.machine_image
      size  = "100"
      type  = "pd-balanced"
    }
  }

  network_interface {
    subnetwork = google_compute_subnetwork.slave_speaker_subnet.self_link
  }

  depends_on = [
    google_compute_subnetwork.slave_speaker_subnet,
    google_compute_instance.master_speaker
  ]

  metadata_startup_script = <<-EOT
    #!/bin/bash
    sudo echo "SPEAKER_TYPE='slave'" >> /home/user/speaker/.env
    sudo echo "MASTER_SPEAKER_URL='http://10.0.1.0:3001'" >> /home/user/speaker/.env
    node server.js
  EOT
}

该模块为从属扬声器创建三个虚拟机实例。每个实例运行一个启动脚本,以配置虚拟机并将其连接到主扬声器。

Packer:自动创建虚拟机映像

Packer 是一种从单一源配置为多个平台创建相同机器映像的工具。我们使用 Packer 构建带有 Node.js 和预装必要库的自定义虚拟机映像。

为什么使用 Packer?

  • 自动镜像创建:简化虚拟机镜像的创建和配置。
  • 一致性:确保所有虚拟机具有相同的配置。
  • 效率:通过使用预建镜像,减少设置虚拟机所需的时间。

以下是我们的 Packer 配置示例:

packer {
  required_plugins {
    googlecompute = {
      version = "

>= 1.1.4"
      source  = "github.com/hashicorp/googlecompute"
    }
  }
}

source "googlecompute" "centos" {
  project_id          = var.project_id
  source_image_family = var.source_image_family
  zone                = var.zone
  ssh_username        = "packer"
}

build {
  name = "speaker-vm-image"
  sources = ["source.googlecompute.centos"]

  provisioner "file" {
    source      = "./infra/packer/scripts/"
    destination = "/tmp/"
  }

  provisioner "file" {
    source      = "./speaker.zip"
    destination = "/tmp/"
  }

  provisioner "shell" {
    inline = [
      "chmod +x /tmp/setup.sh",
      "/tmp/setup.sh"
    ]
  }
}

setup.sh(脚本文件,用于安装依赖项、解压签出的代码并启动后端):

#!/bin/bash
 sudo dnf module enable -y nodejs:18 
sudo dnf install -y nodejs 
sudo dnf install -y unzip 
sudo mkdir -p /home/user/speaker 
sudo unzip -o /tmp/speaker.zip -d /home/user/speaker

WebSockets 和实时音频处理

WebSockets 支持客户端和服务器之间的实时双向通信。我们使用 WebSockets 传输音频块,并处理播放、暂停和跳过等播放事件。

音频分块处理

音频文件分块处理和传输,以确保流畅播放和同步。每个分块由子进程处理,以分散工作量,避免阻塞主事件循环。

实现 WebSocket 通信

我们的应用程序在主设备和从设备之间建立了 WebSocket 连接。主设备向从设备发送音频块流,从设备实时处理并播放这些音频块。通过这种设置,我们可以高效地处理控制事件(播放、暂停、跳过),确保所有设备保持同步。

示例:处理播放和暂停事件

触发播放事件时,主设备会向所有从设备发送当前播放位置。这可确保所有设备从同一位置开始播放音频。同样,当触发暂停事件时,主设备会发送一条暂停命令和时间戳,以确保所有设备在同一时刻暂停。这种方法可以补偿任何网络延迟,并保持所有设备同步播放。

以下是音频块管理的实施示例:

const { fork } = require('child_process');
const io = require('socket.io-client');
const socket = io(process.env.MASTER_SPEAKER_URL);

class AudioInfo {
    childProcessRef = null;
    filename = null;

    constructor(filename) {
        this.filename = filename;
    }

    getFileName() {
        return this.filename;
    }

    getChildProcessRef() {
        return this.childProcessRef;
    }

    setChildProcessRef(childProcessRef) {
        this.childProcessRef = childProcessRef;
    }
}

let audioInfoInstance = null;

socket.on("music-chunks", (data) => {
    const chunkBuffer = data.chunkBuffer;

    if (!audioInfoInstance || audioInfoInstance.getFileName() !== data.filename) {
        audioInfoInstance = new AudioInfo(data.filename);
        audioInfoInstance.setChildProcessRef(fork(__dirname + '/read-audio.js'));
    }

    audioInfoInstance.getChildProcessRef().send({ type: "chunk", payload: chunkBuffer });
});

socket.on("music-play-pause", (data) => {
    if (audioInfoInstance && !audioInfoInstance.getChildProcessRef().completed) {
        audioInfoInstance.getChildProcessRef().send({
            type: "pause_unpause",
            payload: { shouldPause: data.shouldPause }
        });
    }
});

音频播放的子进程:

let chunks = [];
let chunkIdx = 0;
let playLoop = null;

const getChunkIdx = () => chunkIdx;

const incrementChunkIdx = () => {
    chunkIdx += 1;
};

process.on("message", ({ type, payload }) => {
    if (type === "chunk") {
        chunks.push(payload);
        if (chunks.length === 2) {
            play();
        }
    } else if (type === "pause_unpause") {
        if (payload.shouldPause) {
            pause();
        } else {
            play();
        }
    }
});

const play = () => {
    playLoop = setInterval(() => {
        const currChunkIdx = getChunkIdx();
        if (currChunkIdx >= chunks.length) {
            clearInterval(playLoop);
        }

        // Apply effects if necessary

        incrementChunkIdx();
    }, 500);
};

const pause = () => {
    if (playLoop) {
        clearInterval(playLoop);
    }
};

控制事件的 REST API

我们使用 REST API 来管理播放、暂停和跳转等控制事件。这些 API 与 WebSocket 服务器交互,向所有连接的设备广播事件。

实现控制事件:

我们的 REST API 可处理开始、暂停或跳过音频播放的请求。收到请求后,服务器会进行处理,并通过 WebSockets 向所有连接设备发送相应命令。例如,播放请求将触发服务器获取当前播放位置,并向所有设备广播播放命令,确保它们从正确的位置开始播放。

示例:播放和暂停 API

const express = require("express");
const http = require("http");
const socket = require("socket.io");

const { checkFileExistence, downloadInPieces } = require("./utils/gcloud/storage");

let app = express();
let httpServer = app;
app.use(express.json());

const io = socket(httpServer);

io.on("connection", (socket) => {
    console.log("New connection", socket.id);
});

app.post("/music/", async (req, res) => {
    try {
        const { fileName } = req.body;

        if (!fileName) {
            return res.status(400).json({
                message: "File name missing in request body"
            });
        }

        const fileExists = await checkFileExistence(fileName);

        if (!fileExists) {
            return res.status(404).json({
                message: `File with the name ${req.body.fileName} missing in request body`
            });
        }

        downloadInPieces(fileName, io);

        res.status(200).json({
            message: "Playback and streaming started"
        });
    } catch (error) {
        console.log(error);
        return res.status(500).json({ error });
    }
});

app.post("/music/pause/", async (req, res) => {
    try {
        const { shouldPause } = req.body;

        if (shouldPause === undefined) {
            return res.status(400).json({
                message: "Pause status missing in request body"
            });
        }

        io.emit("music-play-pause", { shouldPause });

        res.status(200).json({
            message: "Action completed"
        });
    } catch (error) {
        console.log(error);
        return res.status(500).json({ error });
    }
});

const port = process.env.PORT || 3001;
httpServer.listen(port, () => {
    console.log(`Server started on port ${port}`);
});

最后,我们详细介绍了使用 GCS、Terraform、Packer、WebSockets 和 REST API 在 Google Cloud Platform 上实现实时音频同步系统的过程。我们讨论了分块抓取的重要性、使用 Terraform 和 Packer 的好处以及 WebSockets 如何实现实时通信。感谢您的阅读!

作者:Rishi Desai

本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/49305.html

(0)

相关推荐

发表回复

登录后才能评论