有时,我会偶然发现一些网络广播电台,这些电台播放的音乐种类繁多,但往往被主流流媒体平台所忽视。由于经常能发现意想不到的好曲目,我越来越喜欢这种音频消费模式。一般来说,虽然像 Icecast 这样的软件为实时音频流实现了自己的自定义协议,但通过 HTTP 直接进行流式传输相对更简单、更易用。本文将深入探讨如何以纯粹的方式构建这样一个服务器。
前提条件
- Go >= v1.21(首选)
- 支持 AAC 音频格式的网络浏览器或 url 音乐播放器。
- 音频文件 – 将其编码为 AAC 音频文件。
开始
首先,初始化项目并创建 main.go:
go mod init radio
touch main.go
然后,将音频文件移至项目目录。目录结构如下所示:
├── file.aac
├── go.mod
├── go.sum
└── main.go
在 main.go 中,导入所有必需的包:
import (
"bytes"
"flag"
"io"
"log"
"net/http"
"os"
"sync"
"time"
)
在 main 函数中,添加一些命令行参数来解析流文件并使用 os.Open() 打开它:
fname := flag.String("filename", "file.aac", "path of the audio file")
flag.Parse()
file, err := os.Open(*fname)
if err != nil {
log.Fatal(err)
}
由于每次迭代都需要复制数据流,因此数据流将以字节切片的形式存储。
ctn, err := io.ReadAll(file)
if err != nil {
log.Fatal(err)
}
注意:
- 为方便起见,我将在直播中使用一个连续循环的音频文件。
- 使用 AAC 作为首选文件格式,因为它具有自包含帧且不含任何元数据。
管理连接
为每个连接到我们服务器的听众创建一个新的音频流是非常耗费资源的。因此,服务器必须设计为多个客户端提供单一音频流服务。
由于有多个客户端,因此必须在必要时打开、跟踪和关闭它们与音频流的连接。
为此,我们将创建一个连接池:
type ConnectionPool struct {
bufferChannelMap map [ chan [] byte ] struct {}
musync.Mutex
}
ConnectionPool
结构有两个字段,一个是缓冲通道Map(bufferChannelMap,用于存储发送和接收音频缓冲的通道),另一个是sync.Mutex
,用于保护缓冲通道,避免并发访问时出现竞争情况。
我们可以通过这些连接实现添加、删除和广播的方法:
func (cp *ConnectionPool) AddConnection(bufferChannel chan []byte) {
defer cp.mu.Unlock()
cp.mu.Lock()
cp.bufferChannelMap[bufferChannel] = struct{}{}
}
func (cp *ConnectionPool) DeleteConnection(bufferChannel chan []byte) {
defer cp.mu.Unlock()
cp.mu.Lock()
delete(cp.bufferChannelMap, bufferChannel)
}
func (cp *ConnectionPool) Broadcast(buffer []byte) {
defer cp.mu.Unlock()
cp.mu.Lock()
for bufferChannel, _ := range cp.bufferChannelMap {
clonedBuffer := make([]byte, 4096)
copy(clonedBuffer, buffer)
select {
case bufferChannel <- clonedBuffer:
default:
}
}
}
在 Broadcast()
方法中,我们遍历每个缓冲通道并执行非阻塞发送。这将确保数据流不会因为单个连接上的缓慢写入而出现瓶颈。我们每次都会克隆缓冲区,以避免缓冲区最终被读取时出现竞争情况。
除了上述方法,我们还要创建一个函数 NewConnectionPool()
来初始化连接池:
func NewConnectionPool () *ConnectionPool {
bufferChannelMap := make ( map [ chan [] byte ] struct {})
return &ConnectionPool{bufferChannelMap: bufferChannelMap}
}
Stream Goroutine
为了实际播放音频,让我们创建一个 stream() 函数。它将包含主流循环和另一个总体循环,后者用于复制音频流,并在音频流结束后立即重新启动。
func stream(connectionPool *ConnectionPool, content []byte) {
for {
// duplicates the stream and creates a new ticker
for {
// consumes the stream and uses connectionPool.Broadcast
// to brodcast it on every tick
}
}
}
在外循环中:
for {
tempfile := bytes.NewReader(content)
// clear() is a builtin function introduced in go 1.21.
// Reinitialize the buffer if on a lower version.
clear(buffer)
ticker := time.NewTicker(time.Millisecond * 250)
for range ticker.C {
// inner loop
}
}
在这里,我们将之前存储为字节片段的音频内容转换为 io.Reader,并将其封装在字节.Reader 中。我们将其存储为临时流,每次内循环中的流读取完毕后都会创建临时流。
我们清空缓冲区,使其可以重复使用,并创建一个 250 毫秒的定时器,使流有一个输出延迟,不会在较小的时间间隔内发送过多数据。
在内循环中:
for rangeticker.C {
_, err := tempfile.Read(buffer)
if err == io.EOF {
ticker.Stop()
break
}
connectionPool.Broadcast(buffer)
}
在此,我们从临时缓冲区读取数据并向连接池广播,直到达到 EOF(音频流结束的信号),然后停止滴答,跳出内循环,重新启动音频流。
启动 HTTP 服务器
回到 main 函数,初始化连接池并启动音频流。
connPool := NewConnectionPool()
go stream(connPool, ctn)
现在,我们使用 net/http 的默认处理程序,监听 / 上的传入请求。
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// handle request
}
添加所需的标头:
w.Header().Add( "Content-Type" , "audio/aac" )
w.Header().Add( "Connection" , "keep-alive" )
访问ResponseWriter
的lusher来刷新每个接收到的缓冲区上的写入:
flusher, ok := w.(http.Flusher)
if !ok {
log.Println("Could not create flusher")
}
现在,创建客户端缓冲通道,将其添加到连接池,并无限期监听广播。
bufferChannel := make(chan []byte)
connPool.AddConnection(bufferChannel)
log.Printf("%s has connected\n", r.Host)
for {
buf := <-bufferChannel
if _, err := w.Write(buf); err != nil {
connPool.DeleteConnection(bufferChannel)
log.Printf("%s's connection has been closed\n", r.Host)
return
}
flusher.Flush()
}
当 w.Write
返回错误时,我们可以认为连接已断开,并将其从池中删除。
最后,在 8080 端口启动服务器:
log.Println("Listening on port 8080...")
log.Fatal(http.ListenAndServe(":8080", nil))
如果我们通过音乐播放器或浏览器访问 localhost:8080,就能收听到流媒体音频。如果用多个标签页或播放器实例打开它,它就会在每个设备上同步运行。
结论
我们已经成功创建了实时音频流服务器,代码可在 Github 上获取。
作者:icelain
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/jishu/42582.html