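How HTTP-FLV is implemented
HTTP has a convention around the Content-Length header, which gives the length of the HTTP body. If the server's response includes this header, the client reads exactly that many bytes and then treats the transfer as complete.
If the response has no Content-Length header, the client keeps reading data until the socket connection between server and client is closed (or, for an HTTP/1.1 response sent with Transfer-Encoding: chunked, until the terminating zero-length chunk arrives).
HTTP-FLV live streaming relies on exactly this: the server replies without a Content-Length header and, right after the HTTP response headers, starts sending FLV data, so the client keeps receiving for as long as the stream lasts.
The response returned by SRS looks like this: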
HTTP/1.1 200 OK
Connection: Keep-Alive
Content-Type: video/x-flv
Server: SRS/3.0.141(OuXuli)
Transfer-Encoding: chunked
Note: the Wireshark filter used was: http or tcp.port==8081.
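The framing rule described above can be sketched in a few lines. This is an illustrative sketch only, not SRS code: `BodyFraming`, `detect_framing` and `encode_chunk` are hypothetical names, and the headers are assumed to be already parsed into a map with lower-case keys. It shows how a client might decide where the body ends, and how one piece of data is framed as an HTTP/1.1 chunk when the response uses Transfer-Encoding: chunked, as the SRS response above does.

```cpp
#include <cassert>
#include <cctype>
#include <cstdio>
#include <map>
#include <string>

// How a client decides where an HTTP response body ends.
enum class BodyFraming { ContentLength, Chunked, UntilClose };

// Lower-case a header value so the comparison is case-insensitive.
static std::string to_lower(std::string s) {
    for (char& c : s) c = (char)std::tolower((unsigned char)c);
    return s;
}

// Hypothetical helper: pick the framing from already-parsed response headers.
// Header names are assumed to be stored in lower case.
BodyFraming detect_framing(const std::map<std::string, std::string>& headers) {
    auto te = headers.find("transfer-encoding");
    if (te != headers.end() && to_lower(te->second).find("chunked") != std::string::npos) {
        return BodyFraming::Chunked;        // takes precedence over Content-Length
    }
    if (headers.count("content-length")) {
        return BodyFraming::ContentLength;  // read exactly that many bytes
    }
    return BodyFraming::UntilClose;         // read until the socket closes
}

// Frame one piece of payload as an HTTP/1.1 chunk: hex size, CRLF, data, CRLF.
std::string encode_chunk(const std::string& data) {
    char size_line[32];
    std::snprintf(size_line, sizeof(size_line), "%zx\r\n", data.size());
    return std::string(size_line) + data + "\r\n";
}
```

For a live stream the server simply never sends the zero-length terminating chunk while the publisher is still active, so the player keeps reading.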
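Configuring the HTTP and HTTP-FLV services in SRS
The configuration has two parts:
- Configure the HTTP service
- Configure the HTTP-FLV service
The configuration file is shown below: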
listen 1935;
max_connections 1000;
#srs_log_tank file;
#srs_log_file ./objs/srs.log;
daemon off;
srs_log_tank console;
http_api {
enabled on;
listen 1985;
}
http_server {
enabled on;
listen 8081; # HTTP listening port of the built-in HTTP server; on a cloud server, make sure this port is opened
dir ./objs/nginx/html;
}
stats {
network 0;
disk sda sdb xvda xvdb;
}
vhost __defaultVhost__ {
hls {
enabled on;
hls_path ./objs/nginx/html;
hls_fragment 10;
hls_window 60;
}
http_remux {
enabled on;
mount [vhost]/[app]/[stream].flv;
hstrs on;
}
}
Verifying the configuration
Push a stream from the client:
ffmpeg -re -i source.200kbps.768x320.flv -vcodec copy -acodec copy -f flv -y rtmp://8.141.75.248/live/livestream
Pull the stream from the client:
ffplay http://8.141.75.248:8081/live/livestream.flv
ffplay rtmp://8.141.75.248/live/livestream
http_remux configuration
SRS inspects the extension of the mount value, e.g. [vhost]/[app]/[stream].flv, to decide which stream format to serve.
SrsLiveEntry::SrsLiveEntry(std::string m)
{
mount = m;
stream = NULL;
cache = NULL;
req = NULL;
source = NULL;
std::string ext = srs_path_filext(m); // extract the extension from mount, e.g. [vhost]/[app]/[stream].flv
_is_flv = (ext == ".flv");
_is_ts = (ext == ".ts");
_is_mp3 = (ext == ".mp3");
_is_aac = (ext == ".aac");
}
Besides FLV, the HTTP server can also serve TS, AAC and MP3 streams. The http_remux configuration is as follows.
http_remux {
enabled on;
mount [vhost]/[app]/[stream].flv; # the extension may be .flv, .ts, .aac or .mp3
hstrs on;
}
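vhost configuration
A vhost is the unit of application configuration: it isolates customers and lets each one use different settings.
Typical use cases for vhosts:
- One delivery network serving multiple customers: in a CDN, for example, N customers share one streaming system. How do you tell them apart for billing, monitoring and so on? By app name? Everyone is likely to call theirs something like live. Using each customer's own domain name is the better option.
- Different application configurations: for example, FMLE publishes h264+mp3; the audio can be transcoded and the result delivered as HLS from another vhost, so the vhost that ingests h264+mp3 does not need to cut HLS segments.
Reference wiki: https://github.com/ossrs/srs/wiki/v3_CN_RtmpUrlVhost
Call stack when loading the configuration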
#0 SrsHttpStreamServer::initialize_flv_entry (this=0xa11fd0, vhost="__defaultVhost__")
at src/app/srs_app_http_stream.cpp:1163
#1 0x00000000005028d3 in SrsHttpStreamServer::initialize_flv_streaming (this=0xa11fd0)
at src/app/srs_app_http_stream.cpp:1154
#2 0x0000000000500a2a in SrsHttpStreamServer::initialize (this=0xa11fd0) at
src/app/srs_app_http_stream.cpp:873
#3 0x0000000000561eb7 in SrsHttpServer::initialize (this=0xa11e00) at
src/app/srs_app_http_conn.cpp:279
#4 0x00000000004c84c0 in SrsServer::initialize (this=0xa11ea0, ch=0x0) at
src/app/srs_app_server.cpp:757
#5 0x00000000005bcb57 in run (svr=0xa11ea0) at src/main/srs_main_server.cpp:395
#6 0x00000000005bb769 in do_main (argc=3, argv=0x7fffffffe4f8) at
src/main/srs_main_server.cpp:184
#7 0x00000000005bb8ad in main (argc=3, argv=0x7fffffffe4f8) at
src/main/srs_main_server.cpp:192
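Basic information
SrsLiveStream::do_serve_http handles sending data to the client.
Each HTTP client connection corresponds to one SrsHttpConn, much like SrsRtmpConn for RTMP connections.
Each SrsHttpConn also has its own SrsConsumer. SrsConsumer is shared by RTMP and HTTP-FLV playback alike and acts as the intermediate buffer for stream data.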
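The one-consumer-per-connection idea can be illustrated with a heavily simplified model. The `Source` and `Consumer` classes below are hypothetical stand-ins, not the SRS classes: messages are plain strings, and there is no GOP cache, jitter correction or coroutine scheduling. The sketch only shows that every player, whatever its protocol, gets its own queue that the source fills and the connection drains.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>
#include <vector>

// Simplified stand-in for a per-connection consumer queue.
class Consumer {
public:
    void enqueue(const std::string& msg) { queue_.push_back(msg); }

    // Move up to `max` queued messages into `out`; returns how many were moved.
    std::size_t dump_packets(std::vector<std::string>& out, std::size_t max) {
        std::size_t n = 0;
        while (!queue_.empty() && n < max) {
            out.push_back(queue_.front());
            queue_.pop_front();
            ++n;
        }
        return n;
    }

private:
    std::deque<std::string> queue_;
};

// Simplified stand-in for a stream source that fans out to every consumer.
class Source {
public:
    // Called once per player connection (RTMP or HTTP-FLV alike).
    std::shared_ptr<Consumer> create_consumer() {
        auto c = std::make_shared<Consumer>();
        consumers_.push_back(c);
        return c;
    }

    // Called for each message received from the publisher.
    void on_message(const std::string& msg) {
        for (auto& c : consumers_) c->enqueue(msg);
    }

private:
    std::vector<std::shared_ptr<Consumer>> consumers_;
};
```

Because each consumer has its own queue, a slow HTTP-FLV player does not hold back an RTMP player on the same stream.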
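Related classes
- SrsBufferCache: cache for the HTTP live stream encoders
- SrsFlvStreamEncoder: converts RTMP to an HTTP FLV stream
- SrsTsStreamEncoder: converts RTMP to an HTTP TS stream
- SrsAacStreamEncoder: converts the AAC audio carried in RTMP to an HTTP AAC stream
- SrsMp3StreamEncoder: converts the MP3 audio carried in RTMP to an HTTP MP3 stream
- SrsBufferWriter: writes the stream directly into the HTTP response
- SrsLiveStream: the HTTP live stream; converts RTMP to HTTP-FLV or another format, and is in practice the handler
- SrsLiveEntry: the live entry, used to handle an HTTP live stream
- SrsHttpStreamServer: the HTTP live streaming service, serving FLV/TS/MP3/AAC streams
- SrsHttpResponseWriter: sends data to the client; under the hood it calls SrsStSocket to send
- SrsHttpServeMux: the HTTP request multiplexer; it records each path and its handler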
RTMP publishing
When a stream is published, SRS creates a handler for its URL; when a stream is played, SRS looks up the handler by URL.
Call stack for RTMP publishing:
(gdb) bt
#0 SrsLiveStream::SrsLiveStream (this=0xb159f0, s=0xadd3c0, r=0xadde30, c=0xaded50) at src/app/srs_app_http_stream.cpp:514
#1 0x0000000000501f3b in SrsHttpStreamServer::http_mount (this=0xa11db0, s=0xadd3c0, r=0xadde30) at src/app/srs_app_http_stream.cpp:912
#2 0x000000000056358d in SrsHttpServer::http_mount (this=0xa12220, s=0xadd3c0, r=0xadde30) at src/app/srs_app_http_conn.cpp:308
#3 0x00000000004ce06a in SrsServer::on_publish (this=0xa10370, s=0xadd3c0, r=0xadde30) at src/app/srs_app_server.cpp:1610
#4 0x00000000004e775e in SrsSource::on_publish (this=0xadd3c0) at src/app/srs_app_source.cpp:2463
#5 0x00000000004d96ca in SrsRtmpConn::acquire_publish (this=0xac0f10, source=0xadd3c0) at src/app/srs_app_rtmp_conn.cpp:940
#6 0x00000000004d874c in SrsRtmpConn::publishing (this=0xac0f10, source=0xadd3c0) at src/app/srs_app_rtmp_conn.cpp:822
#7 0x00000000004d5ee7 in SrsRtmpConn::stream_service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:534
#8 0x00000000004d4ddf in SrsRtmpConn::service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:388
#9 0x00000000004d3ba7 in SrsRtmpConn::do_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:209
#10 0x00000000004d1d99 in SrsConnection::cycle (this=0xac0f88) at src/app/srs_app_conn.cpp:171
#11 0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xac11f0) at src/app/srs_app_st.cpp:198
#12 0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xac11f0) at src/app/srs_app_st.cpp:213
#13 0x00000000005bed1a in _st_thread_main () at sched.c:337
#14 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x700000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
The corresponding code is in SrsHttpStreamServer::http_mount:
srs_error_t SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
{
srs_error_t err = srs_success;
// the id to identify stream.
std::string sid = r->get_stream_url(); // e.g. for rtmp://8.141.75.248:1935/live/livestream the stream URL is /live/livestream
SrsLiveEntry* entry = NULL; // SrsLiveEntry: the live entry that handles an HTTP live stream
// create stream from template when not found.
if (sflvs.find(sid) == sflvs.end()) { // no entry exists for this sid yet
if (tflvs.find(r->vhost) == tflvs.end()) { // look up the template for this vhost; return if there is none
return err;
}
SrsLiveEntry* tmpl = tflvs[r->vhost];
std::string mount = tmpl->mount; // the mount from the config: [vhost]/[app]/[stream].flv
// replace the vhost variable: [vhost]/[app]/[stream].flv becomes __defaultVhost__/live/livestream.flv
mount = srs_string_replace(mount, "[vhost]", r->vhost);
mount = srs_string_replace(mount, "[app]", r->app);
mount = srs_string_replace(mount, "[stream]", r->stream);
// remove the default vhost mount: __defaultVhost__/live/livestream.flv becomes /live/livestream.flv
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");
entry = new SrsLiveEntry(mount); // create the SrsLiveEntry and record its type (flv, ts, ...)
entry->source = s; // point at the source
entry->req = r->copy()->as_http();
entry->cache = new SrsBufferCache(s, r);
entry->stream = new SrsLiveStream(s, r, entry->cache); // create the SrsLiveStream (the HTTP live stream that converts RTMP to HTTP-FLV or another format); it is the actual handler
// TODO: FIXME: maybe refine the logic of http remux service.
// if user push streams followed:
// rtmp://test.com/live/stream1
// rtmp://test.com/live/stream2
// and they will using the same template, such as: [vhost]/[app]/[stream].flv
// so, need to free last request object, otherwise, it will cause memory leak.
srs_freep(tmpl->req);
tmpl->source = s;
tmpl->req = r->copy()->as_http();
sflvs[sid] = entry; // one entry per sid; the entry holds the SrsLiveStream
// mount the http flv stream.
// we must register the handler, then start the thread,
// for the thread will cause thread switch context.
// @see https://github.com/ossrs/srs/issues/404
if ((err = mux.handle(mount, entry->stream)) != srs_success) { // mount the handler (SrsLiveStream) at this path, i.e. register the HTTP-FLV route
return srs_error_wrap(err, "http: mount flv stream for vhost=%s failed", sid.c_str());
}
// start http stream cache thread
if ((err = entry->cache->start()) != srs_success) {
return srs_error_wrap(err, "http: start stream cache failed");
}
srs_trace("http: mount flv stream for sid=%s, mount=%s", sid.c_str(), mount.c_str());
} else {
// The entry exists, we reuse it and update the request of stream and cache.
entry = sflvs[sid];
entry->stream->update_auth(s, r);
entry->cache->update_auth(s, r);
}
if (entry->stream) {
entry->stream->entry->enabled = true;
return err;
}
return err;
}
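The main steps are:
- Create the SrsLiveEntry and record its type, e.g. flv or ts.
- Create the SrsLiveStream, the HTTP live stream that converts RTMP to HTTP-FLV or another format.
- Mount the handler (SrsLiveStream) at the given mount path.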
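Before these steps run, http_mount expands the mount template into a concrete path. The sketch below reproduces that substitution with the standard library only. `replace_all` and `expand_mount` are hypothetical names standing in for srs_string_replace and the inline logic, and the literal "__defaultVhost__" is assumed to be the value of SRS_CONSTS_RTMP_DEFAULT_VHOST, as the comments in the code above indicate.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Replace every occurrence of `from` in `s` with `to`.
std::string replace_all(std::string s, const std::string& from, const std::string& to) {
    if (from.empty()) return s;
    std::size_t pos = 0;
    while ((pos = s.find(from, pos)) != std::string::npos) {
        s.replace(pos, from.size(), to);
        pos += to.size();  // continue after the inserted text
    }
    return s;
}

// Hypothetical helper mirroring the substitutions done in http_mount.
std::string expand_mount(const std::string& tmpl, const std::string& vhost,
                         const std::string& app, const std::string& stream) {
    std::string mount = replace_all(tmpl, "[vhost]", vhost);
    mount = replace_all(mount, "[app]", app);
    mount = replace_all(mount, "[stream]", stream);
    // Drop the default vhost prefix so the resulting path starts with '/'.
    return replace_all(mount, "__defaultVhost__/", "/");
}
```

With the default vhost, app live and stream livestream, the template [vhost]/[app]/[stream].flv expands to /live/livestream.flv, which is the path the player requests. A non-default vhost keeps its name as a prefix.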
Creating the SrsLiveEntry and recording its type (flv, ts, ...)
See the SrsLiveEntry::SrsLiveEntry constructor:
SrsLiveEntry::SrsLiveEntry(std::string m)
{
mount = m;
stream = NULL;
cache = NULL;
req = NULL;
source = NULL;
std::string ext = srs_path_filext(m); // determine the type from the extension
_is_flv = (ext == ".flv");
_is_ts = (ext == ".ts");
_is_mp3 = (ext == ".mp3");
_is_aac = (ext == ".aac");
}
Creating the SrsLiveStream, the HTTP live stream that converts RTMP to HTTP-FLV or another format
See the SrsLiveStream::SrsLiveStream constructor:
SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r, SrsBufferCache* c)
{
source = s;
cache = c;
req = r->copy()->as_http();
}
Mounting the handler (SrsLiveStream) at the given mount path
See SrsHttpServeMux::handle:
srs_error_t SrsHttpServeMux::handle(std::string pattern, ISrsHttpHandler* handler) // handler is the SrsLiveStream object (one is bound to each source)
{ // pattern: /live/livestream.flv
srs_assert(handler);
if (pattern.empty()) {
return srs_error_new(ERROR_HTTP_PATTERN_EMPTY, "empty pattern");
}
if (entries.find(pattern) != entries.end()) {
SrsHttpMuxEntry* exists = entries[pattern];
if (exists->explicit_match) {
return srs_error_new(ERROR_HTTP_PATTERN_DUPLICATED, "pattern=%s exists", pattern.c_str());
}
}
std::string vhost = pattern;
if (pattern.at(0) != '/') {
if (pattern.find("/") != string::npos) {
vhost = pattern.substr(0, pattern.find("/"));
}
vhosts[vhost] = handler;
}
if (true) {
SrsHttpMuxEntry* entry = new SrsHttpMuxEntry(); // create a SrsHttpMuxEntry: a mux entry holding the matcher information, i.e. the pattern and its handler
entry->explicit_match = true;
entry->handler = handler; // SrsLiveStream
entry->pattern = pattern; // pattern: /live/livestream.flv
entry->handler->entry = entry;
if (entries.find(pattern) != entries.end()) {
SrsHttpMuxEntry* exists = entries[pattern];
srs_freep(exists);
}
entries[pattern] = entry; // add to the entries map; key: pattern, value: entry, which holds the handler (SrsLiveStream)
}
// Helpful behavior:
// If pattern is /tree/, insert an implicit permanent redirect for /tree.
// It can be overridden by an explicit registration.
if (pattern != "/" && !pattern.empty() && pattern.at(pattern.length() - 1) == '/') { // not taken for /live/livestream.flv
std::string rpattern = pattern.substr(0, pattern.length() - 1);
SrsHttpMuxEntry* entry = NULL;
// free the exists implicit entry
if (entries.find(rpattern) != entries.end()) {
entry = entries[rpattern];
}
// create implicit redirect.
if (!entry || !entry->explicit_match) {
srs_freep(entry);
entry = new SrsHttpMuxEntry();
entry->explicit_match = false;
entry->handler = new SrsHttpRedirectHandler(pattern, SRS_CONSTS_HTTP_Found);
entry->pattern = pattern;
entry->handler->entry = entry;
entries[rpattern] = entry;
}
}
return srs_success;
}
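Messages published by the client are received in SrsRtmpConn::process_publish_message, which takes the incoming audio/video data and stores it.
HTTP-FLV playback
When a player requests an HTTP-FLV stream, the request carries the stream URI, as can be seen in a Wireshark capture.
The main entry point is SrsHttpServeMux::serve_http: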
srs_error_t SrsHttpServeMux::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;
ISrsHttpHandler* h = NULL;
if ((err = find_handler(r, &h)) != srs_success) { // find the handler for this URI
return srs_error_wrap(err, "find handler");
}
srs_assert(h);
if ((err = h->serve_http(w, r)) != srs_success) { //SrsLiveStream::serve_http
return srs_error_wrap(err, "serve http");
}
return err;
}
Call stack:
(gdb) bt
#0 SrsLiveStream::do_serve_http (this=0xb159f0, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_stream.cpp:552
#1 0x00000000004fef4a in SrsLiveStream::serve_http (this=0xb159f0, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_stream.cpp:544
#2 0x000000000049d2af in SrsHttpServeMux::serve_http (this=0xa11dc0, w=0xb68810, r=0xb69e20) at src/protocol/srs_http_stack.cpp:711
#3 0x0000000000563518 in SrsHttpServer::serve_http (this=0xa12220, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_conn.cpp:300
#4 0x000000000049e0fe in SrsHttpCorsMux::serve_http (this=0xb4ca40, w=0xb68810, r=0xb69e20) at src/protocol/srs_http_stack.cpp:859
#5 0x000000000056251e in SrsHttpConn::process_request (this=0xb57740, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_conn.cpp:161
#6 0x0000000000562180 in SrsHttpConn::do_cycle (this=0xb57740) at src/app/srs_app_http_conn.cpp:133
#7 0x00000000004d1d99 in SrsConnection::cycle (this=0xb57740) at src/app/srs_app_conn.cpp:171
#8 0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xb577e0) at src/app/srs_app_st.cpp:198
#9 0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xb577e0) at src/app/srs_app_st.cpp:213
#10 0x00000000005bed1a in _st_thread_main () at sched.c:337
#11 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x900000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
The main steps in SRS are:
- SRS matches the URI to find the corresponding handler (SrsLiveStream).
- The handler sends the HTTP-FLV stream to the client.
Matching the URI to find the handler (SrsLiveStream)
See SrsHttpServeMux::find_handler:
srs_error_t SrsHttpServeMux::find_handler(ISrsHttpMessage* r, ISrsHttpHandler** ph)
{
srs_error_t err = srs_success;
// TODO: FIXME: support the path . and ..
if (r->url().find("..") != std::string::npos) {
return srs_error_new(ERROR_HTTP_URL_NOT_CLEAN, "url %s not canonical", r->url().c_str());
}
if ((err = match(r, ph)) != srs_success) { // match the URI to its handler (SrsLiveStream)
return srs_error_wrap(err, "http match");
}
// always hijack.
if (!hijackers.empty()) {
// notify all hijackers unless matching failed.
std::vector<ISrsHttpMatchHijacker*>::iterator it;
for (it = hijackers.begin(); it != hijackers.end(); ++it) {
ISrsHttpMatchHijacker* hijacker = *it;
if ((err = hijacker->hijack(r, ph)) != srs_success) { // enters SrsHttpStreamServer::hijack
return srs_error_wrap(err, "http hijack");
}
}
}
static ISrsHttpHandler* h404 = new SrsHttpNotFoundHandler();
if (*ph == NULL) {
*ph = h404;
}
return err;
}
The URI is matched to its handler (SrsLiveStream) in SrsHttpServeMux::match:
srs_error_t SrsHttpServeMux::match(ISrsHttpMessage* r, ISrsHttpHandler** ph)
{
std::string path = r->path(); // the URI carried by the HTTP request: /live/livestream.flv
// Host-specific pattern takes precedence over generic ones
if (!vhosts.empty() && vhosts.find(r->host()) != vhosts.end()) {
path = r->host() + path;
}
int nb_matched = 0;
ISrsHttpHandler* h = NULL;
std::map<std::string, SrsHttpMuxEntry*>::iterator it;
for (it = entries.begin(); it != entries.end(); ++it) { // iterate over each pattern and entry
std::string pattern = it->first;
SrsHttpMuxEntry* entry = it->second;
if (!entry->enabled) {
continue;
}
if (!path_match(pattern, path)) {
continue;
}
if (!h || (int)pattern.length() > nb_matched) { // on a match, keep the handler with the longest pattern
nb_matched = (int)pattern.length();
h = entry->handler;
}
}
*ph = h;
return srs_success;
}
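The longest-pattern rule in match can be tried in isolation with the sketch below. It is a simplified model: handlers are represented by plain strings, the enabled flag and vhost prefixing are left out, and `path_match` is an assumption about how the real helper behaves (exact match unless the pattern ends in '/', in which case it is a prefix match), since that function is not shown above.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Assumed matching rule: a pattern ending in '/' matches any path with that
// prefix; any other pattern must equal the path exactly.
bool path_match(const std::string& pattern, const std::string& path) {
    if (pattern.empty()) return false;
    if (pattern.back() != '/') return pattern == path;
    return path.compare(0, pattern.size(), pattern) == 0;
}

// Return the handler name registered under the longest matching pattern,
// or an empty string when nothing matches.
std::string match(const std::map<std::string, std::string>& entries, const std::string& path) {
    std::string best;
    std::size_t best_len = 0;
    bool found = false;
    for (const auto& kv : entries) {
        if (!path_match(kv.first, path)) continue;
        if (!found || kv.first.size() > best_len) {
            found = true;
            best_len = kv.first.size();
            best = kv.second;
        }
    }
    return best;
}
```

Under these assumptions a request for /live/livestream.flv matches both the root pattern / and the mounted stream pattern, and the stream handler wins because its pattern is longer.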
The call stack is as follows:
(gdb) bt
#0 SrsHttpServeMux::match (this=0xa11dc0, r=0xb634a0, ph=0xb61b68) at src/protocol/srs_http_stack.cpp:766
#1 0x000000000049d40a in SrsHttpServeMux::find_handler (this=0xa11dc0, r=0xb634a0, ph=0xb61b68) at src/protocol/srs_http_stack.cpp:727
#2 0x000000000056349f in SrsHttpServer::serve_http (this=0xa12220, w=0xb61d90, r=0xb634a0) at src/app/srs_app_http_conn.cpp:296
#3 0x000000000049e0fe in SrsHttpCorsMux::serve_http (this=0xb47b40, w=0xb61d90, r=0xb634a0) at src/protocol/srs_http_stack.cpp:859
#4 0x000000000056251e in SrsHttpConn::process_request (this=0xb50d50, w=0xb61d90, r=0xb634a0) at src/app/srs_app_http_conn.cpp:161
#5 0x0000000000562180 in SrsHttpConn::do_cycle (this=0xb50d50) at src/app/srs_app_http_conn.cpp:133
#6 0x00000000004d1d99 in SrsConnection::cycle (this=0xb50d50) at src/app/srs_app_conn.cpp:171
#7 0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xb50c20) at src/app/srs_app_st.cpp:198
#8 0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xb50c20) at src/app/srs_app_st.cpp:213
#9 0x00000000005bed1a in _st_thread_main () at sched.c:337
#10 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x900000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
Serving HTTP-FLV to the player
See SrsLiveStream::do_serve_http.
Call stack:
(gdb) bt
#0 SrsLiveStream::do_serve_http (this=0xb159f0, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_stream.cpp:620
#1 0x00000000004fef4a in SrsLiveStream::serve_http (this=0xb159f0, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_stream.cpp:544
#2 0x000000000049d2af in SrsHttpServeMux::serve_http (this=0xa11dc0, w=0xb68810, r=0xb69e20) at src/protocol/srs_http_stack.cpp:711
#3 0x0000000000563518 in SrsHttpServer::serve_http (this=0xa12220, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_conn.cpp:300
#4 0x000000000049e0fe in SrsHttpCorsMux::serve_http (this=0xb4ca40, w=0xb68810, r=0xb69e20) at src/protocol/srs_http_stack.cpp:859
#5 0x000000000056251e in SrsHttpConn::process_request (this=0xb57740, w=0xb68810, r=0xb69e20) at src/app/srs_app_http_conn.cpp:161
#6 0x0000000000562180 in SrsHttpConn::do_cycle (this=0xb57740) at src/app/srs_app_http_conn.cpp:133
#7 0x00000000004d1d99 in SrsConnection::cycle (this=0xb57740) at src/app/srs_app_conn.cpp:171
#8 0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xb577e0) at src/app/srs_app_st.cpp:198
#9 0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xb577e0) at src/app/srs_app_st.cpp:213
#10 0x00000000005bed1a in _st_thread_main () at sched.c:337
#11 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x900000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
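The function is shown below. Its main tasks are:
- Choose an ISrsBufferEncoder according to the pattern's extension; for flv it is SrsFlvStreamEncoder.
- Create a consumer and take messages from the queue that stores the audio/video data.
- Wrap the messages into tags and send them to the player.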
srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;
string enc_desc;
ISrsBufferEncoder* enc = NULL;
srs_assert(entry);
if (srs_string_ends_with(entry->pattern, ".flv")) { // choose the encoder by extension
w->header()->set_content_type("video/x-flv"); // set the Content-Type
enc_desc = "FLV";
enc = new SrsFlvStreamEncoder();
} else if (srs_string_ends_with(entry->pattern, ".aac")) {
w->header()->set_content_type("audio/x-aac");
enc_desc = "AAC";
enc = new SrsAacStreamEncoder();
} else if (srs_string_ends_with(entry->pattern, ".mp3")) {
w->header()->set_content_type("audio/mpeg");
enc_desc = "MP3";
enc = new SrsMp3StreamEncoder();
} else if (srs_string_ends_with(entry->pattern, ".ts")) {
w->header()->set_content_type("video/MP2T");
enc_desc = "TS";
enc = new SrsTsStreamEncoder();
} else {
return srs_error_new(ERROR_HTTP_LIVE_STREAM_EXT, "invalid pattern=%s", entry->pattern.c_str());
}
SrsAutoFree(ISrsBufferEncoder, enc);
// Enter chunked mode, because we didn't set the content-length.
w->write_header(SRS_CONSTS_HTTP_OK);
// create consumer of source, ignore gop cache, use the audio gop cache.
SrsConsumer* consumer = NULL;
if ((err = source->create_consumer(NULL, consumer, true, true, !enc->has_cache())) != srs_success) {
return srs_error_wrap(err, "create consumer");
}
SrsAutoFree(SrsConsumer, consumer);
srs_verbose("http: consumer created success.");
SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream();
SrsAutoFree(SrsPithyPrint, pprint);
SrsMessageArray msgs(SRS_PERF_MW_MSGS);
// Use receive thread to accept the close event to avoid FD leak.
// @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection());
// update the statistic when source disconveried.
SrsStatistic* stat = SrsStatistic::instance();
if ((err = stat->on_client(srs_int2str(_srs_context->get_id()), req, hc, SrsRtmpConnPlay)) != srs_success) {
return srs_error_wrap(err, "stat on client");
}
// the memory writer.
SrsBufferWriter writer(w);
if ((err = enc->initialize(&writer, cache)) != srs_success) {
return srs_error_wrap(err, "init encoder");
}
// if gop cache enabled for encoder, dump to consumer.
if (enc->has_cache()) {
if ((err = enc->dump_cache(consumer, source->jitter())) != srs_success) { // for SrsFlvStreamEncoder the encoder's gop cache is not enabled
return srs_error_wrap(err, "encoder dump cache");
}
}
SrsFlvStreamEncoder* ffe = dynamic_cast<SrsFlvStreamEncoder*>(enc); // cast to SrsFlvStreamEncoder; NULL when the encoder is not FLV
// Set the socket options for transport.
bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost);
if (tcp_nodelay) {
if ((err = hc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
return srs_error_wrap(err, "set tcp nodelay");
}
}
srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
if ((err = hc->set_socket_buffer(mw_sleep)) != srs_success) {
return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep);
}
SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc); // receive thread for the HTTP client connection, used to notice when the client closes
SrsAutoFree(SrsHttpRecvThread, trd);
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "start recv thread");
}
srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d",
entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, srsu2msi(mw_sleep), //FLV /live/livestream.flv, encoder=FLV, nodelay=0, mw_sleep=350ms, cache=0, msgs=128
enc->has_cache(), msgs.max);
// TODO: free and erase the disabled entry after all related connections is closed.
// TODO: FXIME: Support timeout for player, quit infinite-loop.
while (entry->enabled) {
// Whether client closed the FD.
if ((err = trd->pull()) != srs_success) { // check whether the receive coroutine reported an error
return srs_error_wrap(err, "recv thread");
}
pprint->elapse();
// get messages from consumer.
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
int count = 0;
if ((err = consumer->dump_packets(&msgs, count)) != srs_success) { // read messages from the consumer
return srs_error_wrap(err, "consumer dump packets");
}
if (count <= 0) {
// Directly use sleep, donot use consumer wait, because we couldn't awake consumer.
srs_usleep(mw_sleep);
// ignore when nothing got.
continue;
}
if (pprint->can_print()) {
srs_trace("-> " SRS_CONSTS_LOG_HTTP_STREAM " http: got %d msgs, age=%d, min=%d, mw=%d",
count, pprint->age(), SRS_PERF_MW_MIN_MSGS, srsu2msi(mw_sleep));
}
// sendout all messages.
if (ffe) {
err = ffe->write_tags(msgs.msgs, count); // for flv, ffe is the SrsFlvStreamEncoder
} else {
err = streaming_send_messages(enc, msgs.msgs, count);
}
// free the messages.
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
srs_freep(msg);
}
// check send error code.
if (err != srs_success) {
return srs_error_wrap(err, "send messages");
}
}
// Here, the entry is disabled by encoder un-publishing or reloading,
// so we must return a io.EOF error to disconnect the client, or the client will never quit.
return srs_error_new(ERROR_HTTP_STREAM_EOF, "Stream EOF");
}
1. Choose an ISrsBufferEncoder according to the pattern's extension; for flv it is SrsFlvStreamEncoder
if (srs_string_ends_with(entry->pattern, ".flv")) { // choose the encoder by extension
w->header()->set_content_type("video/x-flv"); // set the Content-Type
enc_desc = "FLV";
enc = new SrsFlvStreamEncoder();
}
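The extension-to-Content-Type mapping used in do_serve_http can be summarised as a small lookup. The sketch below uses hypothetical helper names (`ends_with`, `content_type_for`); the four Content-Type strings are the ones set in the function above, and returning an empty string for an unsupported extension is a simplification of the ERROR_HTTP_LIVE_STREAM_EXT error path.

```cpp
#include <cassert>
#include <string>

// True when `s` ends with `suffix`.
bool ends_with(const std::string& s, const std::string& suffix) {
    return s.size() >= suffix.size() &&
           s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
}

// Map a mount pattern to the Content-Type sent to the player.
// Returns "" for an extension that is not supported.
std::string content_type_for(const std::string& pattern) {
    if (ends_with(pattern, ".flv")) return "video/x-flv";
    if (ends_with(pattern, ".aac")) return "audio/x-aac";
    if (ends_with(pattern, ".mp3")) return "audio/mpeg";
    if (ends_with(pattern, ".ts")) return "video/MP2T";
    return "";
}
```

For the pattern /live/livestream.flv this yields video/x-flv, matching the Content-Type header in the SRS response shown at the start of the article.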
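2. Create a consumer and take messages from the queue that stores the audio/video data
- See SrsSource::create_consumer and SrsConsumer::dump_packets; both functions were covered in the analysis of pulling RTMP streams from an SRS server.
3. Wrap the messages into tags and send them to the player
See SrsFlvStreamEncoder::write_tags, which does two things:
- Send the FLV header to the player
- Send the FLV body to the player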
srs_error_t SrsFlvStreamEncoder::write_tags(SrsSharedPtrMessage** msgs, int count)
{
srs_error_t err = srs_success;
// For https://github.com/ossrs/srs/issues/939
if (!header_written) { // write the FLV header first
bool has_video = false;
bool has_audio = false;
for (int i = 0; i < count && (!has_video || !has_audio); i++) {
SrsSharedPtrMessage* msg = msgs[i];
if (msg->is_video()) {
has_video = true; // there is a video message
} else if (msg->is_audio()) {
has_audio = true; // there is an audio message
}
}
// Drop data if no A+V: return when the batch has neither audio nor video.
if (!has_video && !has_audio) {
return err;
}
if ((err = write_header(has_video, has_audio)) != srs_success) { // header first, then the tags
return srs_error_wrap(err, "write header");
}
}
return enc->write_tags(msgs, count); // write the tags
}
Sending the FLV header to the player
The FLV header is sent to the player first.
srs_error_t SrsFlvStreamEncoder::write_header(bool has_video, bool has_audio)
{
srs_error_t err = srs_success;
if (!header_written) {
header_written = true;
if ((err = enc->write_header(has_video, has_audio)) != srs_success) { // write the FLV header and the first previousTagSize
return srs_error_wrap(err, "write header");
}
srs_trace("FLV: write header audio=%d, video=%d", has_audio, has_video);
}
return err;
}
The function that actually does the work is SrsFlvTransmuxer::write_header:
srs_error_t SrsFlvTransmuxer::write_header(bool has_video, bool has_audio) // FLV header
{
srs_error_t err = srs_success;
uint8_t av_flag = 0;
av_flag += (has_audio? 4:0);
av_flag += (has_video? 1:0);
// 9bytes header and 4bytes first previous-tag-size
char flv_header[] = { // build the 9-byte FLV header; see the FLV format analysis for details
'F', 'L', 'V', // Signatures "FLV"
(char)0x01, // File version (for example, 0x01 for FLV version 1)
(char)av_flag, // 4, audio; 1, video; 5 audio+video.
(char)0x00, (char)0x00, (char)0x00, (char)0x09 // DataOffset UI32 The length of this header in bytes
};
// flv specification should set the audio and video flag,
// actually in practise, application generally ignore this flag,
// so we generally set the audio/video to 0.
// write 9bytes header.
if ((err = write_header(flv_header)) != srs_success) { // this writes the FLV header followed by the first previousTagSize
return srs_error_wrap(err, "write header");
}
return err;
}
The FLV header and the first previousTagSize are written as follows:
srs_error_t SrsFlvTransmuxer::write_header(char flv_header[9])
{
srs_error_t err = srs_success;
// write data.
if ((err = writer->write(flv_header, 9, NULL)) != srs_success) {
return srs_error_wrap(err, "write flv header failed");
}
// previous tag size.
char pts[] = { (char)0x00, (char)0x00, (char)0x00, (char)0x00 };
if ((err = writer->write(pts, 4, NULL)) != srs_success) {
return srs_error_wrap(err, "write pts");
}
return err;
}
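Taken together, the two write_header functions put 13 bytes on the wire before any tag: the 9-byte FLV header and a 4-byte previousTagSize of zero. The sketch below builds those bytes in one buffer so the layout is easy to inspect. `build_flv_header` is a hypothetical helper, not an SRS function.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Build the 9-byte FLV header followed by the first 4-byte previousTagSize (always 0).
std::vector<uint8_t> build_flv_header(bool has_video, bool has_audio) {
    uint8_t av_flag = 0;
    if (has_audio) av_flag |= 0x04;  // bit 2: audio tags present
    if (has_video) av_flag |= 0x01;  // bit 0: video tags present
    return {
        'F', 'L', 'V',           // signature
        0x01,                    // file version
        av_flag,                 // 4 = audio, 1 = video, 5 = audio + video
        0x00, 0x00, 0x00, 0x09,  // DataOffset: header length, big-endian UI32
        0x00, 0x00, 0x00, 0x00   // PreviousTagSize0
    };
}
```

These are the first bytes a capture of the response body should show once the HTTP chunk framing is stripped, for example 46 4C 56 01 05 00 00 00 09 00 00 00 00 for a stream with both audio and video.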
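Sending the FLV body to the player
The FLV body is a sequence of units, each made up of an FLV tag followed by a 4-byte previousTagSize.
Each FLV tag in turn consists of an 11-byte tag header and the tag data.
So each unit of the FLV body can be seen as three parts: tag header, tag data and previousTagSize.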
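The size arithmetic for one unit of the body is simple enough to write down. In the sketch below the constant and function names are hypothetical; the values 11 and 4 are the tag header and previousTagSize sizes stated above, and the previousTagSize value is the tag header size plus the payload size, written as a big-endian 32-bit integer as the FLV format requires.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

const std::size_t kTagHeaderSize = 11;        // FLV tag header
const std::size_t kPreviousTagSizeBytes = 4;  // trailing previousTagSize field

// Bytes sent for one message: tag header + tag data + previousTagSize.
std::size_t flv_unit_size(std::size_t payload_size) {
    return kTagHeaderSize + payload_size + kPreviousTagSizeBytes;
}

// previousTagSize covers the tag header and the tag data, as a big-endian UI32.
std::array<uint8_t, 4> encode_previous_tag_size(uint32_t payload_size) {
    uint32_t v = (uint32_t)kTagHeaderSize + payload_size;
    return {{ (uint8_t)(v >> 24), (uint8_t)(v >> 16), (uint8_t)(v >> 8), (uint8_t)v }};
}
```

A 100-byte audio payload therefore occupies 115 bytes in the stream, and its trailing previousTagSize field holds 111.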
The code of SrsFlvTransmuxer::write_tags is as follows:
srs_error_t SrsFlvTransmuxer::write_tags(SrsSharedPtrMessage** msgs, int count)
{
srs_error_t err = srs_success;
// realloc the iovss.
int nb_iovss = 3 * count; // three consecutive iovecs per message: tag header, tag data, previousTagSize
iovec* iovss = iovss_cache;
if (nb_iovss_cache < nb_iovss) {
srs_freepa(iovss_cache);
nb_iovss_cache = nb_iovss;
iovss = iovss_cache = new iovec[nb_iovss];
}
// realloc the tag headers; each tag header is 11 bytes.
char* cache = tag_headers;
if (nb_tag_headers < count) {
srs_freepa(tag_headers);
nb_tag_headers = count;
cache = tag_headers = new char[SRS_FLV_TAG_HEADER_SIZE * count];
}
// realloc the pts; each previousTagSize is 4 bytes.
char* pts = ppts;
if (nb_ppts < count) {
srs_freepa(ppts);
nb_ppts = count;
pts = ppts = new char[SRS_FLV_PREVIOUS_TAG_SIZE * count];
}
// the cache is ok, write each messages.
iovec* iovs = iovss;
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs[i];
// cache all flv header.
if (msg->is_audio()) { // build the audio tag header
cache_audio(msg->timestamp, msg->payload, msg->size, cache);
} else if (msg->is_video()) { // build the video tag header
cache_video(msg->timestamp, msg->payload, msg->size, cache);
} else { // build the metadata (script data) tag header
cache_metadata(SrsFrameTypeScript, msg->payload, msg->size, cache);
}
// cache all pts.
cache_pts(SRS_FLV_TAG_HEADER_SIZE + msg->size, pts);
// all ioves.
iovs[0].iov_base = cache; // tag header
iovs[0].iov_len = SRS_FLV_TAG_HEADER_SIZE;
iovs[1].iov_base = msg->payload; // tag body
iovs[1].iov_len = msg->size;
iovs[2].iov_base = pts; // previousTagSize
iovs[2].iov_len = SRS_FLV_PREVIOUS_TAG_SIZE;
// move next: advance to the next message.
cache += SRS_FLV_TAG_HEADER_SIZE;
pts += SRS_FLV_PREVIOUS_TAG_SIZE;
iovs += 3;
}
if ((err = writer->writev(iovss, nb_iovss, NULL)) != srs_success) {
return srs_error_wrap(err, "write flv tags failed");
}
return err;
}
Each tag type has a different tag header, so each is built separately.
Building the audio tag header:
void SrsFlvTransmuxer::cache_audio(int64_t timestamp, char* data, int size, char* cache)
{
srs_assert(data);
timestamp &= 0x7fffffff;
// 11bytes tag header
/*char tag_header[] = {
(char)SrsFrameTypeAudio, // TagType UB [5], 8 = audio
(char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
(char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
(char)0x00, // TimestampExtended UI8
(char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
};*/
SrsBuffer* tag_stream = new SrsBuffer(cache, 11);
SrsAutoFree(SrsBuffer, tag_stream);
// write data size.
tag_stream->write_1bytes(SrsFrameTypeAudio);
tag_stream->write_3bytes(size);
tag_stream->write_3bytes((int32_t)timestamp);
// default to little-endian
tag_stream->write_1bytes((timestamp >> 24) & 0xFF);
tag_stream->write_3bytes(0x00);
}
Building the video tag header:
void SrsFlvTransmuxer::cache_video(int64_t timestamp, char* data, int size, char* cache)
{
srs_assert(data);
timestamp &= 0x7fffffff;
// 11bytes tag header
/*char tag_header[] = {
(char)SrsFrameTypeVideo, // TagType UB [5], 9 = video
(char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
(char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
(char)0x00, // TimestampExtended UI8
(char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
};*/
SrsBuffer* tag_stream = new SrsBuffer(cache, 11);
SrsAutoFree(SrsBuffer, tag_stream);
// write data size.
tag_stream->write_1bytes(SrsFrameTypeVideo);
tag_stream->write_3bytes(size);
tag_stream->write_3bytes((int32_t)timestamp);
// default to little-endian
tag_stream->write_1bytes((timestamp >> 24) & 0xFF);
tag_stream->write_3bytes(0x00);
}
Building the metadata tag header:
void SrsFlvTransmuxer::cache_metadata(char type, char* data, int size, char* cache)
{
srs_assert(data);
// 11 bytes tag header
/*char tag_header[] = {
(char)type, // TagType UB [5], 18 = script data
(char)0x00, (char)0x00, (char)0x00, // DataSize UI24 Length of the message.
(char)0x00, (char)0x00, (char)0x00, // Timestamp UI24 Time in milliseconds at which the data in this tag applies.
(char)0x00, // TimestampExtended UI8
(char)0x00, (char)0x00, (char)0x00, // StreamID UI24 Always 0.
};*/
SrsBuffer* tag_stream = new SrsBuffer(cache, 11);
SrsAutoFree(SrsBuffer, tag_stream);
// write data size.
tag_stream->write_1bytes(type);
tag_stream->write_3bytes(size);
tag_stream->write_3bytes(0x00);
tag_stream->write_1bytes(0x00);
tag_stream->write_3bytes(0x00);
}
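The three cache_* functions produce the same 11-byte layout and differ only in the tag type and in whether a timestamp is written. The sketch below builds that layout with plain shifts so the byte order is explicit. `build_tag_header` is a hypothetical helper, and it assumes the multi-byte fields go out most significant byte first, as the FLV format specifies.

```cpp
#include <array>
#include <cassert>
#include <cstdint>

// Build an 11-byte FLV tag header.
// tag_type: 8 = audio, 9 = video, 18 = script data (metadata).
std::array<uint8_t, 11> build_tag_header(uint8_t tag_type, uint32_t data_size, uint32_t timestamp_ms) {
    timestamp_ms &= 0x7fffffff;             // same mask as cache_audio / cache_video
    std::array<uint8_t, 11> h{};            // zero-initialised, so StreamID stays 0
    h[0] = tag_type;
    h[1] = (uint8_t)(data_size >> 16);      // DataSize, UI24 big-endian
    h[2] = (uint8_t)(data_size >> 8);
    h[3] = (uint8_t)data_size;
    h[4] = (uint8_t)(timestamp_ms >> 16);   // Timestamp, lower 24 bits, big-endian
    h[5] = (uint8_t)(timestamp_ms >> 8);
    h[6] = (uint8_t)timestamp_ms;
    h[7] = (uint8_t)(timestamp_ms >> 24);   // TimestampExtended: upper 8 bits
    return h;
}
```

For a metadata tag the caller would pass a timestamp of 0, which corresponds to the zero bytes that cache_metadata writes.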
This completes the process of a player pulling a stream from the SRS server.
Original article: https://www.yuque.com/wahaha-0yfyj/mnfloz/hur2oe