SRS接收到客户端推流后是怎么进行处理、存储的?在进行RTMP握手,消息交互后,执行到SrsRtmpConn::publishing函数,主要包括两部分功能:根据给定的 mount 挂载 handle 和启动协程接受推流的音视频数据。
主体代码如下:
srs_error_t SrsRtmpConn::publishing(SrsSource* source)
{
srs_error_t err = srs_success;
SrsRequest* req = info->req;
if (_srs_config->get_refer_enabled(req->vhost)) {
if ((err = refer->check(req->pageUrl, _srs_config->get_refer_publish(req->vhost))) != srs_success) {
return srs_error_wrap(err, "rtmp: referer check");
}
}
if ((err = http_hooks_on_publish()) != srs_success) {
return srs_error_wrap(err, "rtmp: callback on publish");
}
// TODO: FIXME: Should refine the state of publishing.
if ((err = acquire_publish(source)) == srs_success) { //根据给的mount挂载handler (SrsLiveStream)
// use isolate thread to recv,
// @see: https://github.com/ossrs/srs/issues/237
SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
err = do_publishing(source, &rtrd); //启动协程接收客户端推送的音视频数据
rtrd.stop();
}
// whatever the acquire publish, always release publish.
// when the acquire error in the midlle-way, the publish state changed,
// but failed, so we must cleanup it.
// @see https://github.com/ossrs/srs/issues/474
// @remark when stream is busy, should never release it.
if (srs_error_code(err) != ERROR_SYSTEM_STREAM_BUSY) {
release_publish(source);
}
http_hooks_on_unpublish();
return err;
}
根据给定的mount挂载handle
首先会根据跟定的mount挂载到handle,handle指SrsLiveStream(HTTP直播流,将RTMP转成HTTP-FLV或者其他格式)
推流的时候根据url创建对应的handler,拉流的时候根据url找到对应处理的handler。
挂载handle代码分析
主代码在SrsHttpStreamServer::http_mount开始。
调用栈:
SrsLiveStream::SrsLiveStream (this=0xb15ea0, s=0xadd4a0, r=0xadde00, c=0xaf11e0) at src/app/srs_app_http_stream.cpp:514
514 SrsLiveStream::SrsLiveStream(SrsSource* s, SrsRequest* r, SrsBufferCache* c)
(gdb) bt
#0 SrsLiveStream::SrsLiveStream (this=0xb15ea0, s=0xadd4a0, r=0xadde00, c=0xaf11e0) at src/app/srs_app_http_stream.cpp:514
#1 0x0000000000501f3b in SrsHttpStreamServer::http_mount (this=0xa11db0, s=0xadd4a0, r=0xadde00) at src/app/srs_app_http_stream.cpp:912
#2 0x000000000056358d in SrsHttpServer::http_mount (this=0xa12220, s=0xadd4a0, r=0xadde00) at src/app/srs_app_http_conn.cpp:308
#3 0x00000000004ce06a in SrsServer::on_publish (this=0xa10370, s=0xadd4a0, r=0xadde00) at src/app/srs_app_server.cpp:1610
#4 0x00000000004e775e in SrsSource::on_publish (this=0xadd4a0) at src/app/srs_app_source.cpp:2463
#5 0x00000000004d96ca in SrsRtmpConn::acquire_publish (this=0xac0f10, source=0xadd4a0) at src/app/srs_app_rtmp_conn.cpp:940
#6 0x00000000004d874c in SrsRtmpConn::publishing (this=0xac0f10, source=0xadd4a0) at src/app/srs_app_rtmp_conn.cpp:822
#7 0x00000000004d5ee7 in SrsRtmpConn::stream_service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:534
#8 0x00000000004d4ddf in SrsRtmpConn::service_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:388
#9 0x00000000004d3ba7 in SrsRtmpConn::do_cycle (this=0xac0f10) at src/app/srs_app_rtmp_conn.cpp:209
#10 0x00000000004d1d99 in SrsConnection::cycle (this=0xac0f88) at src/app/srs_app_conn.cpp:171
#11 0x000000000050ab08 in SrsSTCoroutine::cycle (this=0xac11f0) at src/app/srs_app_st.cpp:198
#12 0x000000000050ab7d in SrsSTCoroutine::pfn (arg=0xac11f0) at src/app/srs_app_st.cpp:213
#13 0x00000000005bed1a in _st_thread_main () at sched.c:337
#14 0x00000000005bf492 in st_thread_create (start=0x5be696 <_st_vp_schedule+170>, arg=0x700000001, joinable=1, stk_size=1) at sched.c:616
Backtrace stopped: previous frame inner to this frame (corrupt stack?)
SrsHttpStreamServer::http_mount代码如下:
srs_error_t SrsHttpStreamServer::http_mount(SrsSource* s, SrsRequest* r)
{
srs_error_t err = srs_success;
// the id to identify stream.
std::string sid = r->get_stream_url(); // 比如rtmp://8.141.75.248:1935/live/livestream中streamUrl就是/live/stream
SrsLiveEntry* entry = NULL; //SrsLiveEntry,直播⼊⼝,⽤来处理HTTP直播流
// create stream from template when not found.
if (sflvs.find(sid) == sflvs.end()) { //找不到对于的sid
if (tflvs.find(r->vhost) == tflvs.end()) { //查找对应的vhost,找不到则返回
return err;
}
SrsLiveEntry* tmpl = tflvs[r->vhost];
std::string mount = tmpl->mount; //配置中的mount是[vhost]/[app]/[stream].flv
// replace the vhost variable 替换mount,由[vhost]/[app]/[stream].flv变为__defaultVhost__/live/livestream.flv
mount = srs_string_replace(mount, "[vhost]", r->vhost);
mount = srs_string_replace(mount, "[app]", r->app);
mount = srs_string_replace(mount, "[stream]", r->stream);
// remove the default vhost mount 由__defaultVhost__/live/livestream.flv替换为/live/livestream.flv
mount = srs_string_replace(mount, SRS_CONSTS_RTMP_DEFAULT_VHOST"/", "/");
entry = new SrsLiveEntry(mount); //创建SrsLiveEntry并标明类型,比如flv还是ts
entry->source = s; //指向source
entry->req = r->copy()->as_http();
entry->cache = new SrsBufferCache(s, r);
entry->stream = new SrsLiveStream(s, r, entry->cache); //创建SrsLiveStream,HTTP直播流,将RTMP转成HTTP-FLV或者其他格式,其实际是handler
// TODO: FIXME: maybe refine the logic of http remux service.
// if user push streams followed:
// rtmp://test.com/live/stream1
// rtmp://test.com/live/stream2
// and they will using the same template, such as: [vhost]/[app]/[stream].flv
// so, need to free last request object, otherwise, it will cause memory leak.
srs_freep(tmpl->req);
tmpl->source = s;
tmpl->req = r->copy()->as_http();
sflvs[sid] = entry; //每个sid对应一个entry,entry包含SrsLiveStream
// mount the http flv stream.
// we must register the handler, then start the thread, 根据给的mount挂载handler (SrsLiveStream)
// for the thread will cause thread switch context.
// @see https://github.com/ossrs/srs/issues/404 创建路由
if ((err = mux.handle(mount, entry->stream)) != srs_success) { //挂载http-flv流
return srs_error_wrap(err, "http: mount flv stream for vhost=%s failed", sid.c_str());
}
// start http stream cache thread 开启http stream 缓存线程
if ((err = entry->cache->start()) != srs_success) {
return srs_error_wrap(err, "http: start stream cache failed");
}
srs_trace("http: mount flv stream for sid=%s, mount=%s", sid.c_str(), mount.c_str());
} else {
// The entry exists, we reuse it and update the request of stream and cache.
entry = sflvs[sid];
entry->stream->update_auth(s, r);
entry->cache->update_auth(s, r);
}
if (entry->stream) {
entry->stream->entry->enabled = true;
return err;
}
return err;
}
实际挂载http flv stream在SrsHttpServeMux::handle
SrsHttpServeMux:HTTP请求多路复⽤器,⾥⾯记录了path以及对应的handler
srs_error_t SrsHttpServeMux::handle(std::string pattern, ISrsHttpHandler* handler) //handle是SrsLiveStream对象(每个source都会绑定一个)
{ // pattern:/live/livestream.flv
srs_assert(handler);
if (pattern.empty()) {
return srs_error_new(ERROR_HTTP_PATTERN_EMPTY, "empty pattern");
}
if (entries.find(pattern) != entries.end()) {
SrsHttpMuxEntry* exists = entries[pattern];
if (exists->explicit_match) {
return srs_error_new(ERROR_HTTP_PATTERN_DUPLICATED, "pattern=%s exists", pattern.c_str());
}
}
std::string vhost = pattern;
if (pattern.at(0) != '/') {
if (pattern.find("/") != string::npos) {
vhost = pattern.substr(0, pattern.find("/"));
}
vhosts[vhost] = handler;
}
if (true) {
SrsHttpMuxEntry* entry = new SrsHttpMuxEntry(); //创建SrsHttpMuxEntry,服务器多路复用器的多路复用条目,匹配器信息包含,例如模式和处理程序。
entry->explicit_match = true;
entry->handler = handler; //SrsLiveStream
entry->pattern = pattern; //pattern:/live/livestream.flv
entry->handler->entry = entry;
if (entries.find(pattern) != entries.end()) {
SrsHttpMuxEntry* exists = entries[pattern];
srs_freep(exists);
}
entries[pattern] = entry; //加入entries的map中,key:pattern,value:entry,entry包含了handle,即SrsLiveStream
}
// Helpful behavior:
// If pattern is /tree/, insert an implicit permanent redirect for /tree.
// It can be overridden by an explicit registration.
if (pattern != "/" && !pattern.empty() && pattern.at(pattern.length() - 1) == '/') { //没进来
std::string rpattern = pattern.substr(0, pattern.length() - 1);
SrsHttpMuxEntry* entry = NULL;
// free the exists implicit entry
if (entries.find(rpattern) != entries.end()) {
entry = entries[rpattern];
}
// create implicit redirect.
if (!entry || !entry->explicit_match) {
srs_freep(entry);
entry = new SrsHttpMuxEntry();
entry->explicit_match = false;
entry->handler = new SrsHttpRedirectHandler(pattern, SRS_CONSTS_HTTP_Found);
entry->pattern = pattern;
entry->handler->entry = entry;
entries[rpattern] = entry;
}
}
return srs_success;
}
启动协程接收推流的音视频数据
SrsRtmpConn::do_publishing会启动协程接收客户端推送的音视频数据。
srs_error_t SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* rtrd)
{
...
// start isolate recv thread.
if ((err = rtrd->start()) != srs_success) { //SrsPublishRecvThread::start
return srs_error_wrap(err, "rtmp: receive thread");
}
...
}
最后执行具体接收音视频数据操作的对象是SrsRecvThread::do_cycle,调用过程如下:
SrsPublishRecvThread::start->
SrsRecvThread::start->
SrsSTCoroutine::start->
SrsSTCoroutine::pfn->
SrsSTCoroutine::cycle->
SrsRecvThread::cycle->
SrsRecvThread::do_cycle
SrsRecvThread::do_cycle会完成两步操作:
- 处理接收到的message。
- 然后去消费这个message。
srs_error_t SrsRecvThread::do_cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "recv thread");
}
// When the pumper is interrupted, wait then retry.
if (pumper->interrupted()) {
srs_usleep(timeout);
continue;
}
SrsCommonMessage* msg = NULL;
// Process the received message. 处理收到的消息
if ((err = rtmp->recv_message(&msg)) == srs_success) {
err = pumper->consume(msg); //数据读取出来后去消费这个数据
}
if (err != srs_success) {
// Interrupt the receive thread for any error.
trd->interrupt();
// Notify the pumper to quit for error.
pumper->interrupt(err);
return srs_error_wrap(err, "recv thread");
}
}
return err;
}
处理接收到的message
- 从协议栈接收流式数据,解码成RTMP的message,即流式数据->chunk->message。
- 对于协议控制消息,会进行解析成packet进行相应处理。
- SrsProtocol::recv_message的代码如下:
srs_error_t SrsProtocol::recv_message(SrsCommonMessage** pmsg)
{
*pmsg = NULL;
srs_error_t err = srs_success;
while (true) {
SrsCommonMessage* msg = NULL;
//来自协议栈的面向Recv字节的RTMP消息。流式数据->chunk->message,返回的是message
if ((err = recv_interlaced_message(&msg)) != srs_success) {
srs_freep(msg);
return srs_error_wrap(err, "recv interlaced message");
}
if (!msg) { //若获取到一个空消息,则继续获取下一个消息
continue;
}
if (msg->size <= 0 || msg->header.payload_length <= 0) {
srs_trace("ignore empty message(type=%d, size=%d, time=%" PRId64 ", sid=%d).",
msg->header.message_type, msg->header.payload_length,
msg->header.timestamp, msg->header.stream_id);
srs_freep(msg);
continue;
}
//该函数首先检测当前接收到的字节数是否已经达到当前窗口大小,若是,则回应客户端窗口消息,然后接着对接收到的若为 应答窗口大小(5)、设置块大小(1)、用户控制消息(4) 则会进行解码,
if ((err = on_recv_message(msg)) != srs_success) { //并根据解析后的内容更新当前 rtmp 服务器的上下文信息
srs_freep(msg);
return srs_error_wrap(err, "on recv message");
}
*pmsg = msg;
break;
}
return err;
}
从协议栈接收流式数据,解码成RTMP的message,即流式数据->chunk->message。
RTMP传输时会对数据格式化为RTMP message,实际传输的时候为了更好实现多路复用,分包和信息的公平性,发送端会把message分为带message id的chunk,每个chunk可以是单独一个message,也可以是message的一部分。
chunk又称消息块,由四部分组成:
- basic header:标识此chunk。
- message header:标识此chunk负载所属消息。
- Extended Timestamp:当时间戳溢出时才出现。
- chunk data:此chunk的payload。
当时间戳不溢出时,解码成RTMP的chunk分为三个步骤:
- 解析basic header,见代码SrsProtocol::read_basic_header
- 解析message header,见代码SrsProtocol::read_message_header
- 解析message payload,见代码SrsProtocol::read_message_payload
入口代码:
srs_error_t SrsProtocol::recv_interlaced_message(SrsCommonMessage** pmsg)
{
srs_error_t err = srs_success;
// chunk stream basic header. 注:RTMP的chunk stream basic header
char fmt = 0;
int cid = 0;
if ((err = read_basic_header(fmt, cid)) != srs_success) { // 读取rtmp header
return srs_error_wrap(err, "read basic header");
}
// the cid must not negative.
srs_assert(cid >= 0);
// get the cached chunk stream.
SrsChunkStream* chunk = NULL; //传入的块流可能是交错的,使用块流缓存输入RTMP块流。
// use chunk stream cache to get the chunk info. 一个消息客户端可能会分成几个 chunk 发送,因此需要把每次读取的 chunk 的信息和负载缓存起来
// @see https://github.com/ossrs/srs/issues/249 使用块流缓存来获取块信息,有助用提高性能,在块大小较小时提高约10%的性能,在大块时提高5%。
if (cid < SRS_PERF_CHUNK_STREAM_CACHE) { // cid小于16时使用块缓存
// already init, use it direclty
chunk = cs_cache[cid]; //从cs_cache获取的chunk message是null的
} else {
// chunk stream cache miss, use map. 块缓存没有,使用map,key为cid,value为SrsChunkStream
if (chunk_streams.find(cid) == chunk_streams.end()) {
chunk = chunk_streams[cid] = new SrsChunkStream(cid); //没有找到则创建一个新的SrsChunkStream
// set the perfer cid of chunk,
// which will copy to the message received.
chunk->header.perfer_cid = cid;
} else {
chunk = chunk_streams[cid];
}
}
// chunk stream message header 读取RTMP的message header
if ((err = read_message_header(chunk, fmt)) != srs_success) {
return srs_error_wrap(err, "read message header");
}
// read msg payload from chunk stream. 读取msg payload
SrsCommonMessage* msg = NULL;
if ((err = read_message_payload(chunk, &msg)) != srs_success) {
return srs_error_wrap(err, "read message payload");
}
// not got an entire RTMP message, try next chunk.
if (!msg) {
return err;
}
//获取到完整的消息
*pmsg = msg;
return err;
}
解析basic header
basic header可能为1,2,3字节,包含 chunk type(chunk类型)和 chunk stream id(流通道id)
- chunk type(fmt),2bit,决定了后面message header的格式。
- chunk stream id简写为CSID,,唯一标识特定的流通道。
由于chunk type固定为2bit,所以CSID长度是6bit(8bit-2bit),14bit或者22bit,RTMP协议支持用户自定义3~65599之间的CSID,其中0,1,2由协议保留为特殊信息。
- 0表示basic header要占用2个字节,csid在64~319之间
- 1表示basic header要占用3个字节,csid在64~65599之间
- 2表示该chunk是控制信息和一些命令命令。
当basic header为1字节,CSID占6bit,csid范围在0到63之间,用户自定义范围是3到63之间。
* 0 1 2 3 4 5 6 7
* +-+-+-+-+-+-+-+-+
* |fmt| cs id |
* +-+-+-+-+-+-+-+-+
当basic header为2字节时,CSID占14bit,将第1字节的后6bit置为0,第2字节存储CSID,8bit可以表示0到255,所以csid的范围是64到319,319=255+64。
* 0 1
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* |fmt| 0 | cs id - 64 |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
当basic header为3字节时,CDID占22bit,将第1字节的后6bit置为1,剩下16bit表示CSID,16bit可以表示0到65535,所以csid的范围是64到65599,65599=65535+64
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
* |fmt| 1 | cs id - 64 |
* +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
代码分析:
srs_error_t SrsProtocol::read_basic_header(char& fmt, int& cid)
{
srs_error_t err = srs_success;
if ((err = in_buffer->grow(skt, 1)) != srs_success) {
return srs_error_wrap(err, "basic header requires 1 bytes");
}
fmt = in_buffer->read_1byte(); //从缓冲区读取一个字节,移动到下一个字节。
cid = fmt & 0x3f; //获取cid:chunk stream id
fmt = (fmt >> 6) & 0x03; //获取fmt
// 2-63, 1B chunk header
if (cid > 1) { // cid>1,说明chunk basic header表示1bit,返回即可,2bit和3bit的basic header的cid都大于1
return err;
// 64-319, 2B chunk header,第一字节的cid=0,说明是2bit的basic header
} else if (cid == 0) { // 第1byte的0-5bit值为0,说明basic header长度为2字节,再读取一个byte
if ((err = in_buffer->grow(skt, 1)) != srs_success) {
return srs_error_wrap(err, "basic header requires 2 bytes");
}
cid = 64;
cid += (uint8_t)in_buffer->read_1byte(); // cid计算为:64+第二字节的int值
// 64-65599, 3B chunk header
} else { //第1byte的0-5bit值为1,说明basic header长度为3字节,还需要读取两个byte
srs_assert(cid == 1);
if ((err = in_buffer->grow(skt, 2)) != srs_success) {
return srs_error_wrap(err, "basic header requires 3 bytes");
}
cid = 64; //cid计算公式为:cid = 64+第二字节的int值+(256x第三字节的int值)
cid += (uint8_t)in_buffer->read_1byte();
cid += ((uint8_t)in_buffer->read_1byte()) * 256;
}
return err;
}
解析message header
message header 包含发送信息的描述信息。
message header的格式和长度取决于 basic header的chunk type(即fmt,2bit),共有4种格式。
Type=0时,message header 占11个字节,其他三种数据都能表示。
在chunk stream 开始的第一个chunk和头信息中时间戳回退(即值与上一个chunk小,在回退播放会出现这种情况)的时候采用这种格式。
timestamp:占3字节,表示时间戳,单位是ms。最大值为2^24-1,即最多能播放4.6个小时,超过时置为1,去extended timestamp解析实际时间戳,extended timestamp最多能播放1193个小时。
message length:占3字节,表示要发送的数据的长度,比如音频或者视频数据长度。
message type id:占1字节,表示消息类型,比如8代表音频数据,9代表视频数据。
message stream id:占4字节,表示chunk所在的流id。
Type = 1时,message header占7字节,省去了message stream id的4字节,表示此chunk和上一个chunk所在流相同。
timestamp delta:占3字节,与type=0时timestamp不同,timestamp delta存储是和上一个chunk的时间戳差值。当值超过3个字节所能表达的最大值时都置为1,时间时间戳差值从extended timestamp字段中。
Type = 2时,message header占3字节,相对于type=1省去了3字节的消息长度和1字节的消息类型,表示此chunk和上一个chunk所在流,消息长度,类型都相同,余下3字节表示timestamp delta。
Type = 3时,message header占0字节,表示此chunk的message header和上一个完全相同。
当它跟在type=0的chunk后面,表示和前一个chunk的时间戳完全相同,即message拆分成chunk的情况。
当它跟在type=1或者type=2后面,表示和前一个chunk的时间戳差值是相同的。
比如第一个chunk type=0,timestamp=100,第二个chunk type=2,timestamp delta=20,表示时间戳120,第三个chunk type=3,表示timestamp delta=20,时间戳为140。
代码分析:
srs_error_t SrsProtocol::read_message_header(SrsChunkStream* chunk, char fmt)
{
srs_error_t err = srs_success;
/**
* we should not assert anything about fmt, for the first packet.
* (when first packet, the chunk->msg is NULL).
* the fmt maybe 0/1/2/3, the FMLE will send a 0xC4 for some audio packet.
* the previous packet is: 当前包格式如下:
* 04 // fmt=0, cid=4
* 00 00 1a // timestamp=26
* 00 00 9d // payload_length=157
* 08 // message_type=8(audio)
* 01 00 00 00 // stream_id=1
* the current packet maybe: 那么下一个包可以省略为:表示时间戳增值、payload_length、message_type、stream_id都和上一个相同
* c4 // fmt=3, cid=4
* it's ok, for the packet is audio, and timestamp delta is 26.
* the current packet must be parsed as:
* fmt=0, cid=4
* timestamp=26+26=52
* payload_length=157
* message_type=8(audio)
* stream_id=1
* so we must update the timestamp even fmt=3 for first packet. 需要注意的就是时间戳一定要更新
*/
// fresh packet used to update the timestamp even fmt=3 for first packet.
// fresh packet always means the chunk is the first one of message.
bool is_first_chunk_of_msg = !chunk->msg; // 是否是第一个chunk,是为true
// but, we can ensure that when a chunk stream is fresh,
// the fmt must be 0, a new stream. 新的stream的fmt必须为0
if (chunk->msg_count == 0 && fmt != RTMP_FMT_TYPE0) {
// for librtmp, if ping, it will send a fresh stream with fmt=1,
// 0x42 where: fmt=1, cid=2, protocol contorl user-control message 协议控制user-control消息
// 0x00 0x00 0x00 where: timestamp=0
// 0x00 0x00 0x06 where: payload_length=6
// 0x04 where: message_type=4(protocol control user-control message)
// 0x00 0x06 where: event Ping(0x06)
// 0x00 0x00 0x0d 0x0f where: event data 4bytes ping timestamp.
// @see: https://github.com/ossrs/srs/issues/98
if (fmt == RTMP_FMT_TYPE1) {
srs_warn("fresh chunk starts with fmt=1");
} else {
// must be a RTMP protocol level error.
return srs_error_new(ERROR_RTMP_CHUNK_START, "fresh chunk expect fmt=0, actual=%d, cid=%d", fmt, chunk->cid);
}
}
// when exists cache msg, means got an partial message,
// the fmt must not be type0 which means new message.
if (chunk->msg && fmt == RTMP_FMT_TYPE0) { //当存在缓存消息时,意味着得到了一个部分消息,FMT不能是type0,这意味着新的消息。
return srs_error_new(ERROR_RTMP_CHUNK_START, "for existed chunk, fmt should not be 0");
}
// create msg when new chunk stream start 如果是新的chunk stream则需要创建新的SrsCommonMessage
if (!chunk->msg) { //SrsCommonMessage:该消息是原始数据RTMP消息,面向字节,协议始终接收RTMP消息,可以发送RTMP消息或RTMP包。
chunk->msg = new SrsCommonMessage(); //公共消息是从底层协议sdk读取的。而共享的PTR消息用于复制和发送。
}
// read message header from socket to buffer.
static char mh_sizes[] = {11, 7, 3, 0};
int mh_size = mh_sizes[(int)fmt]; //fmt=0时,mh_size=11,表示message header的长度
if (mh_size > 0 && (err = in_buffer->grow(skt, mh_size)) != srs_success) {
return srs_error_wrap(err, "read %d bytes message header", mh_size);
}
/**
* parse the message header.
* 3bytes: timestamp delta, fmt=0,1,2
* 3bytes: payload length, fmt=0,1
* 1bytes: message type, fmt=0,1
* 4bytes: stream id, fmt=0
* where:
* fmt=0, 0x0X
* fmt=1, 0x4X
* fmt=2, 0x8X
* fmt=3, 0xCX
*/
// see also: ngx_rtmp_recv
if (fmt <= RTMP_FMT_TYPE2) {
char* p = in_buffer->read_slice(mh_size);
char* pp = (char*)&chunk->header.timestamp_delta;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
pp[3] = 0;
// fmt: 0 fmt=0
// timestamp: 3 bytes 时间戳:3字节
// If the timestamp is greater than or equal to 16777215 如果时间戳大于0x00ffffff,那么时间戳的值在extended timestamp header
// (hexadecimal 0x00ffffff), this value MUST be 16777215, and the 否则以这个时间戳为准
// 'extended timestamp header' MUST be present. Otherwise, this value
// SHOULD be the entire timestamp.
//
// fmt: 1 or 2 fmt=1或者2
// timestamp delta: 3 bytes 时间戳增量:3字节
// If the delta is greater than or equal to 16777215 (hexadecimal 如果时间戳增量大于0x00ffffff,时间戳见extended timestamp header
// 0x00ffffff), this value MUST be 16777215, and the 'extended 否则以这个时间戳增量为准
// timestamp header' MUST be present. Otherwise, this value SHOULD be
// the entire delta.
chunk->extended_timestamp = (chunk->header.timestamp_delta >= RTMP_EXTENDED_TIMESTAMP); //判断前3字节的值是否超过RTMP_EXTENDED_TIMESTAMP,如果超过需要到extended_timestamp读取时间戳
if (!chunk->extended_timestamp) { //如果不需要扩展时间戳,获取当前的时间戳
// Extended timestamp: 0 or 4 bytes
// This field MUST be sent when the normal timsestamp is set to
// 0xffffff, it MUST NOT be sent if the normal timestamp is set to
// anything else. So for values less than 0xffffff the normal
// timestamp field SHOULD be used in which case the extended timestamp
// MUST NOT be present. For values greater than or equal to 0xffffff
// the normal timestamp field MUST NOT be used and MUST be set to
// 0xffffff and the extended timestamp MUST be sent.
if (fmt == RTMP_FMT_TYPE0) {
// 6.1.2.1. Type 0
// For a type-0 chunk, the absolute timestamp of the message is sent
// here. 如果type=0,timestamp就是前3字节
chunk->header.timestamp = chunk->header.timestamp_delta;
} else {
// 6.1.2.2. Type 1
// 6.1.2.3. Type 2
// For a type-1 or type-2 chunk, the difference between the previous
// chunk's timestamp and the current chunk's timestamp is sent here.
chunk->header.timestamp += chunk->header.timestamp_delta; //type=1或者2时,timestamp+=timestamp_delta
}
}
if (fmt <= RTMP_FMT_TYPE1) { //如果fmt小于等于1,计算payload_length大小
int32_t payload_length = 0;
pp = (char*)&payload_length;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
pp[3] = 0;
// for a message, if msg exists in cache, the size must not changed.
// always use the actual msg size to compare, for the cache payload length can changed,
// for the fmt type1(stream_id not changed), user can change the payload
// length(it's not allowed in the continue chunks).
if (!is_first_chunk_of_msg && chunk->header.payload_length != payload_length) {
return srs_error_new(ERROR_RTMP_PACKET_SIZE, "msg in chunk cache, size=%d cannot change to %d", chunk->header.payload_length, payload_length);
}
chunk->header.payload_length = payload_length; //消息数据长度
chunk->header.message_type = *p++; //消息的类型id:占用1个字节,表示实际发送的数据的类型,如8代表音频数据、9代表视频数据。
if (fmt == RTMP_FMT_TYPE0) { //如果是type=0,还需要读取stream id
pp = (char*)&chunk->header.stream_id;
pp[0] = *p++;
pp[1] = *p++;
pp[2] = *p++;
pp[3] = *p++;
}
}
} else {
// update the timestamp even fmt=3 for first chunk packet
if (is_first_chunk_of_msg && !chunk->extended_timestamp) {
chunk->header.timestamp += chunk->header.timestamp_delta;
}
}
// read extended-timestamp
if (chunk->extended_timestamp) { //如果存在extended-timestamp,读取
mh_size += 4;
if ((err = in_buffer->grow(skt, 4)) != srs_success) {
return srs_error_wrap(err, "read 4 bytes ext timestamp");
}
// the ptr to the slice maybe invalid when grow()
// reset the p to get 4bytes slice.
char* p = in_buffer->read_slice(4);
uint32_t timestamp = 0x00;
char* pp = (char*)×tamp;
pp[3] = *p++;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
// always use 31bits timestamp, for some server may use 32bits extended timestamp.
// @see https://github.com/ossrs/srs/issues/111
timestamp &= 0x7fffffff;
/**
* RTMP specification and ffmpeg/librtmp is false, RTMP规范和ffmpeg/librtmp是假的,但是,adobe改变了规范,所以flash/FMLE/FMS总是真。
* but, adobe changed the specification, so flash/FMLE/FMS always true. 默认为true,以支持flash/FMLE/FMS。
* default to true to support flash/FMLE/FMS.
*
* ffmpeg/librtmp may donot send this filed, need to detect the value. Ffmpeg /librtmp可能不发送此文件,需要检测值
* @see also: http://blog.csdn.net/win_lin/article/details/13363699
* compare to the chunk timestamp, which is set by chunk message header 与chunk时间戳相比,它是由块消息头类型0、1或2设置的。
* type 0,1 or 2.
*
* @remark, nginx send the extended-timestamp in sequence-header, nginx在序列头中发送扩展的时间戳,在继续的C1块中发送时间戳增量,因此与ffmpeg兼容,
* and timestamp delta in continue C1 chunks, and so compatible with ffmpeg, 也就是说,在nginx-rtmp中没有继续的时间戳和扩展的时间戳。
* that is, there is no continue chunks and extended-timestamp in nginx-rtmp.
*
* @remark, srs always send the extended-timestamp, to keep simple,
* and compatible with adobe products. SRS总是发送扩展的时间戳,以保持简单,并与adobe产品兼容。
*/
uint32_t chunk_timestamp = (uint32_t)chunk->header.timestamp;
/**
* if chunk_timestamp<=0, the chunk previous packet has no extended-timestamp,
* always use the extended timestamp. 如果chunk_timestamp<=0,表示chunk的前一个包没有扩展时间戳,则始终使用扩展时间戳。
*/
/**
* about the is_first_chunk_of_msg. 对于消息的第一块,始终使用扩展的时间戳。
* @remark, for the first chunk of message, always use the extended timestamp.
*/
if (!is_first_chunk_of_msg && chunk_timestamp > 0 && chunk_timestamp != timestamp) {
mh_size -= 4;
in_buffer->skip(-4);
} else {
chunk->header.timestamp = timestamp;
}
}
// the extended-timestamp must be unsigned-int,
// 24bits timestamp: 0xffffff = 16777215ms = 16777.215s = 4.66h
// 32bits timestamp: 0xffffffff = 4294967295ms = 4294967.295s = 1193.046h = 49.71d
// because the rtmp protocol says the 32bits timestamp is about "50 days":
// 3. Byte Order, Alignment, and Time Format
// Because timestamps are generally only 32 bits long, they will roll
// over after fewer than 50 days.
//
// but, its sample says the timestamp is 31bits:
// An application could assume, for example, that all
// adjacent timestamps are within 2^31 milliseconds of each other, so
// 10000 comes after 4000000000, while 3000000000 comes before
// 4000000000.
// and flv specification says timestamp is 31bits:
// Extension of the Timestamp field to form a SI32 value. This
// field represents the upper 8 bits, while the previous
// Timestamp field represents the lower 24 bits of the time in
// milliseconds.
// in a word, 31bits timestamp is ok.
// convert extended timestamp to 31bits.
chunk->header.timestamp &= 0x7fffffff;
// valid message, the payload_length is 24bits,
// so it should never be negative.
srs_assert(chunk->header.payload_length >= 0);
// copy header to msg
chunk->msg->header = chunk->header;
// increase the msg count, the chunk stream can accept fmt=1/2/3 message now.
chunk->msg_count++;
return err;
}
解析message payload
message payload即Chunk Data,
获取message的payload,代码如下:
srs_error_t SrsProtocol::read_message_payload(SrsChunkStream* chunk, SrsCommonMessage** pmsg)
{
srs_error_t err = srs_success;
// empty message
if (chunk->header.payload_length <= 0) {
srs_trace("get an empty RTMP message(type=%d, size=%d, time=%" PRId64 ", sid=%d)", chunk->header.message_type,
chunk->header.payload_length, chunk->header.timestamp, chunk->header.stream_id);
*pmsg = chunk->msg;
chunk->msg = NULL;
return err;
}
srs_assert(chunk->header.payload_length > 0);
// the chunk payload size.
int payload_size = chunk->header.payload_length - chunk->msg->size;
payload_size = srs_min(payload_size, in_chunk_size);
// create msg payload if not initialized 创建msg payload如果没有被初始化
if (!chunk->msg->payload) {
chunk->msg->create_payload(chunk->header.payload_length);
}
// read payload to buffer
if ((err = in_buffer->grow(skt, payload_size)) != srs_success) {
return srs_error_wrap(err, "read %d bytes payload", payload_size);
}
memcpy(chunk->msg->payload + chunk->msg->size, in_buffer->read_slice(payload_size), payload_size);
chunk->msg->size += payload_size;
// got entire RTMP message? 如果payload_length=msg->size,表示收到完整的message
if (chunk->header.payload_length == chunk->msg->size) {
*pmsg = chunk->msg;
chunk->msg = NULL;
return err;
}
return err;
}
对于协议控制消息,会进行解析成packet进行相应处理
对于协议控制消息,会传入message进行解析成packet进行相应处理。
RTMP为协议控制消息保留消息类型id 1-7。这些消息包含RTM Chunk Stream协议或RTMP本身所需的信息。
- id为1和2的协议消息保留给RTM Chunk Stream协议使用。
- id为3-6的协议消息保留给RTMP使用。
- 在边缘服务器和源服务器之间使用ID为7的协议消息。
#define RTMP_MSG_SetChunkSize 0x01
#define RTMP_MSG_AbortMessage 0x02
#define RTMP_MSG_Acknowledgement 0x03
#define RTMP_MSG_UserControlMessage 0x04
#define RTMP_MSG_WindowAcknowledgementSize 0x05
#define RTMP_MSG_SetPeerBandwidth 0x06
#define RTMP_MSG_EdgeAndOriginServerCommand 0x07
代码如下:
srs_error_t SrsProtocol::on_recv_message(SrsCommonMessage* msg)
{
srs_error_t err = srs_success;
srs_assert(msg != NULL);
// try to response acknowledgement
if ((err = response_acknowledgement_message()) != srs_success) {
return srs_error_wrap(err, "response ack");
}
SrsPacket* packet = NULL;
switch (msg->header.message_type) {
case RTMP_MSG_SetChunkSize:
case RTMP_MSG_UserControlMessage:
case RTMP_MSG_WindowAcknowledgementSize:
if ((err = decode_message(msg, &packet)) != srs_success) {
return srs_error_wrap(err, "decode message");
}
break;
case RTMP_MSG_VideoMessage:
case RTMP_MSG_AudioMessage:
print_debug_info();
default:
return err; //message type=18无法匹配,直接返回
}
srs_assert(packet);
// always free the packet.
SrsAutoFree(SrsPacket, packet);
switch (msg->header.message_type) {
case RTMP_MSG_WindowAcknowledgementSize: {
SrsSetWindowAckSizePacket* pkt = dynamic_cast<SrsSetWindowAckSizePacket*>(packet);
srs_assert(pkt != NULL);
if (pkt->ackowledgement_window_size > 0) {
in_ack_size.window = (uint32_t)pkt->ackowledgement_window_size;
// @remark, we ignore this message, for user noneed to care.
// but it's important for dev, for client/server will block if required
// ack msg not arrived.
}
break;
}
case RTMP_MSG_SetChunkSize: {
SrsSetChunkSizePacket* pkt = dynamic_cast<SrsSetChunkSizePacket*>(packet);
srs_assert(pkt != NULL);
// for some server, the actual chunk size can greater than the max value(65536),
// so we just warning the invalid chunk size, and actually use it is ok,
// @see: https://github.com/ossrs/srs/issues/160
if (pkt->chunk_size < SRS_CONSTS_RTMP_MIN_CHUNK_SIZE || pkt->chunk_size > SRS_CONSTS_RTMP_MAX_CHUNK_SIZE) {
srs_warn("accept chunk=%d, should in [%d, %d], please see #160",
pkt->chunk_size, SRS_CONSTS_RTMP_MIN_CHUNK_SIZE, SRS_CONSTS_RTMP_MAX_CHUNK_SIZE);
}
// @see: https://github.com/ossrs/srs/issues/541
if (pkt->chunk_size < SRS_CONSTS_RTMP_MIN_CHUNK_SIZE) {
return srs_error_new(ERROR_RTMP_CHUNK_SIZE, "chunk size should be %d+, value=%d", SRS_CONSTS_RTMP_MIN_CHUNK_SIZE, pkt->chunk_size);
}
in_chunk_size = pkt->chunk_size;
break;
}
case RTMP_MSG_UserControlMessage: {
SrsUserControlPacket* pkt = dynamic_cast<SrsUserControlPacket*>(packet);
srs_assert(pkt != NULL);
if (pkt->event_type == SrcPCUCSetBufferLength) {
in_buffer_length = pkt->extra_data;
}
if (pkt->event_type == SrcPCUCPingRequest) {
if ((err = response_ping_message(pkt->event_data)) != srs_success) {
return srs_error_wrap(err, "response ping");
}
}
break;
}
default:
break;
}
return err;
}
消费message
1. 解析完成message后,就要消费这个message,具体执行函数在SrsPublishRecvThread::consume。
srs_error_t SrsPublishRecvThread::consume(SrsCommonMessage* msg) //rtmp推流时的数据处理入口
{
srs_error_t err = srs_success;
// when cid changed, change it.
if (ncid != cid) {
_srs_context->set_id(ncid);
cid = ncid;
}
_nb_msgs++; //每接收到一个消息,该将该消息计数值加 1
if (msg->header.is_video()) {
video_frames++; //若当前消息为视频,则视频帧数加 1
}
// log to show the time of recv thread.
srs_verbose("recv thread now=%" PRId64 "us, got msg time=%" PRId64 "ms, size=%d",
srs_update_system_time(), msg->header.timestamp, msg->size);
// the rtmp connection will handle this message RTMP连接将处理此消息
err = _conn->handle_publish_message(_source, msg);
// must always free it,
// the source will copy it if need to use.
srs_freep(msg);
if (err != srs_success) {
return srs_error_wrap(err, "handle publish message");
}
return err;
}
2. 接着调用SrsRtmpConn::handle_publish_message处理message。
3. 对应matedata、audio、video数据在SrsRtmpConn::process_publish_message函数中处理。
srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
{
srs_error_t err = srs_success;
// for edge, directly proxy message to origin.
if (info->edge) { //如果边缘节点,走edge proxy推流逻辑,推到SrsEdgeForwarder的queue
if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {
return srs_error_wrap(err, "rtmp: proxy publish");
}
return err; //直接返回,不走下面逻辑
}
// process audio packet
if (msg->header.is_audio()) {
if ((err = source->on_audio(msg)) != srs_success) { //处理音频message
return srs_error_wrap(err, "rtmp: consume audio");
}
return err;
}
// process video packet
if (msg->header.is_video()) {
if ((err = source->on_video(msg)) != srs_success) { //处理视频message
return srs_error_wrap(err, "rtmp: consume video");
}
return err;
}
// process aggregate packet
if (msg->header.is_aggregate()) {
if ((err = source->on_aggregate(msg)) != srs_success) {
return srs_error_wrap(err, "rtmp: consume aggregate");
}
return err;
}
// process onMetaData RTMP_MSG_AMF0DataMessage 18 或 RTMP_MSG_AMF3DataMessage 15
if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {
SrsPacket* pkt = NULL;
if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) { //解析元数据
return srs_error_wrap(err, "rtmp: decode message");
}
SrsAutoFree(SrsPacket, pkt);
if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {
SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);
if ((err = source->on_meta_data(msg, metadata)) != srs_success) { //处理元数据
return srs_error_wrap(err, "rtmp: consume metadata");
}
return err;
}
return err;
}
return err;
}
处理onMetaData message
如果message type是17或者20表示命令消息,即onMetaData数据包,通常接收到的第一个媒体数据包就是onMetaData。
命令消息在客户机和服务器之间携带amf编码的命令。对于AMF0编码,这些消息的消息类型值为20,对于AMF3编码,消息类型值为17。
- 这些消息被发送来执行一些操作,比如连接、createStream、发布、播放、暂停。onstatus、result等命令消息用于通知发送者所请求命令的状态。
- 命令消息由命令名、事务ID和包含相关参数的命令对象组成。客户端或服务器可以通过使用命令消息与对等端通信的流请求远程过程调用(RPC)。
#define RTMP_MSG_AMF3CommandMessage 17 // 0x11
#define RTMP_MSG_AMF0CommandMessage 20 // 0x14
抓包图如下图所示。
解析onMetaData
如果message type的值为17或者20,那么将调用SrsRtmpServer::decode_message,SrsRtmpServer::decode_message继而调用SrsProtocol::decode_message。
srs_error_t SrsProtocol::decode_message(SrsCommonMessage* msg, SrsPacket** ppacket)
{
*ppacket = NULL;
srs_error_t err = srs_success;
srs_assert(msg != NULL);
srs_assert(msg->payload != NULL);
srs_assert(msg->size > 0);
SrsBuffer stream(msg->payload, msg->size);
// decode the packet.
SrsPacket* packet = NULL; //根据message header的message type返回对应的packet
if ((err = do_decode_message(msg->header, &stream, &packet)) != srs_success) {
srs_freep(packet);
return srs_error_wrap(err, "decode message");
}
// set to output ppacket only when success.
*ppacket = packet;
return err;
}
其中SrsProtocol::do_decode_message函数会先从stream中获取command名称,然后根据command判断packet类型生成对应的packet进行解码。
srs_error_t SrsProtocol::do_decode_message(SrsMessageHeader& header, SrsBuffer* stream, SrsPacket** ppacket)
{
srs_error_t err = srs_success;
SrsPacket* packet = NULL;
// decode specified packet type
if (header.is_amf0_command() || header.is_amf3_command() || header.is_amf0_data() || header.is_amf3_data()) {
// skip 1bytes to decode the amf3 command.
if (header.is_amf3_command() && stream->require(1)) {
stream->skip(1);
}
// amf0 command message.
// need to read the command name.
std::string command;
if ((err = srs_amf0_read_string(stream, command)) != srs_success) { //从stream中获取command名称
return srs_error_wrap(err, "decode command name");
}
...
} else if (command == SRS_CONSTS_RTMP_SET_DATAFRAME) { //OnMetaData
*ppacket = packet = new SrsOnMetaDataPacket();
return packet->decode(stream);
}
...
}
实际解码函数是SrsOnMetaDataPacket::decode
srs_error_t SrsOnMetaDataPacket::decode(SrsBuffer* stream)
{ //解析 metadata 数据,然后将其保存在 SrsOnMetaDataPacket 类的成员 metadata 中。
srs_error_t err = srs_success;
if ((err = srs_amf0_read_string(stream, name)) != srs_success) { //读取一个amf0 name:@setDataFrame
return srs_error_wrap(err, "name");
}
// ignore the @setDataFrame
if (name == SRS_CONSTS_RTMP_SET_DATAFRAME) {
if ((err = srs_amf0_read_string(stream, name)) != srs_success) { //第二个amf0 name:onMetaData
return srs_error_wrap(err, "name");
}
}
// the metadata maybe object or ecma array 元数据可能是对象或ecma数组
SrsAmf0Any* any = NULL;
if ((err = srs_amf0_read_any(stream, &any)) != srs_success) {
return srs_error_wrap(err, "metadata");
}
srs_assert(any);
if (any->is_object()) {
srs_freep(metadata);
metadata = any->to_object();
return err;
}
SrsAutoFree(SrsAmf0Any, any);
if (any->is_ecma_array()) { //如果是RTMP_AMF0_EcmaArray
SrsAmf0EcmaArray* arr = any->to_ecma_array();
// if ecma array, copy to object.
for (int i = 0; i < arr->count(); i++) { //将解析出来的数据拷贝到metadata的properties中
metadata->set(arr->key_at(i), arr->value_at(i)->copy());
}
}
return err;
}
srs_amf0_read_any函数用于读取metadata携带的各项property
srs_error_t srs_amf0_read_any(SrsBuffer* stream, SrsAmf0Any** ppvalue)
{
srs_error_t err = srs_success;
// 读取marker类型进行匹配,如果是ecma array类型,返回一个SrsAmf0EcmaArray对象
if ((err = SrsAmf0Any::discovery(stream, ppvalue)) != srs_success) {
return srs_error_wrap(err, "discovery");
}
srs_assert(*ppvalue);
// 调用 SrsAmf0EcmaArray 类实现的 read 函数读取metadata携带的各项property
if ((err = (*ppvalue)->read(stream)) != srs_success) {
srs_freep(*ppvalue);
return srs_error_wrap(err, "parse elem");
}
return err;
}
其中SrsAmf0EcmaArray::read读取ECMA array的property内容。
srs_error_t SrsAmf0EcmaArray::read(SrsBuffer* stream)
{
srs_error_t err = srs_success;
// marker
if (!stream->require(1)) {
return srs_error_new(ERROR_RTMP_AMF0_DECODE, "requires 1 only %d bytes", stream->left());
}
char marker = stream->read_1bytes(); //读取 AMF0 type:ECMA array 为 0x08
if (marker != RTMP_AMF0_EcmaArray) {
return srs_error_new(ERROR_RTMP_AMF0_DECODE, "EcmaArray invalid marker=%#x", marker);
}
// count
if (!stream->require(4)) {
return srs_error_new(ERROR_RTMP_AMF0_DECODE, "requires 4 only %d bytes", stream->left());
}
int32_t count = stream->read_4bytes(); //读取该 ECMA array 中有多少个 property
// value
this->_count = count;
while (!stream->empty()) {
// detect whether is eof.
if (srs_amf0_is_object_eof(stream)) {
SrsAmf0ObjectEOF pbj_eof;
if ((err = pbj_eof.read(stream)) != srs_success) {
return srs_error_wrap(err, "read EOF");
}
break;
}
// property-name: utf8 string 读取 property 的名称
std::string property_name;
if ((err =srs_amf0_read_utf8(stream, property_name)) != srs_success) {
return srs_error_wrap(err, "read property name");
}
// property-value: any
SrsAmf0Any* property_value = NULL; //读取 property 的值:number or string or boolean
if ((err = srs_amf0_read_any(stream, &property_value)) != srs_success) {
return srs_error_wrap(err, "read property value, name=%s", property_name.c_str());
}
// add property 将获取到的每一个 property 以该 property 的名称为 key,保存到 SrsAmf0EcmaArray 类的
this->set(property_name, property_value);
}
return err;
}
处理onMetaData
解析metadata数据后,调用SrsSource::on_meta_data函数对解析后的metadata做进一步的处理。
srs_error_t SrsSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata)
{
srs_error_t err = srs_success;
// if allow atc_auto and bravo-atc detected, open atc for vhost.
SrsAmf0Any* prop = NULL;
atc = _srs_config->get_atc(req->vhost);
if (_srs_config->get_atc_auto(req->vhost)) {
if ((prop = metadata->metadata->get_property("bravo_atc")) != NULL) {
if (prop->is_string() && prop->to_str() == "true") {
atc = true;
}
}
}
// Update the meta cache. 更新metadata缓存
bool updated = false;
if ((err = meta->update_data(&msg->header, metadata, updated)) != srs_success) {
return srs_error_wrap(err, "update metadata");
}
if (!updated) {
return err;
}
// when already got metadata, drop when reduce sequence header.
bool drop_for_reduce = false;
if (meta->data() && _srs_config->get_reduce_sequence_header(req->vhost)) {
drop_for_reduce = true;
srs_warn("drop for reduce sh metadata, size=%d", msg->size);
}
// copy to all consumer
if (!drop_for_reduce) {
std::vector<SrsConsumer*>::iterator it; //如果有其他客户端订阅了该直播流,将metadata加入SrsMessageQueue队列
for (it = consumers.begin(); it != consumers.end(); ++it) { //注,如果没有其他客户端,不会进此逻辑,即不会存储metadata
SrsConsumer* consumer = *it;
if ((err = consumer->enqueue(meta->data(), atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume metadata");
}
}
}
// Copy to hub to all utilities.
return hub->on_meta_data(meta->data(), metadata); //如果设置了forward模式,将onMetaData转发给各个forward节点
}
更新metadata缓存
srs_error_t SrsMetaCache::update_data(SrsMessageHeader* header, SrsOnMetaDataPacket* metadata, bool& updated)
{
updated = false;
srs_error_t err = srs_success;
SrsAmf0Any* prop = NULL;
// when exists the duration, remove it to make ExoPlayer happy.
if (metadata->metadata->get_property("duration") != NULL) {
metadata->metadata->remove("duration");
}
// generate metadata info to print
std::stringstream ss;
if ((prop = metadata->metadata->ensure_property_number("width")) != NULL) {
ss << ", width=" << (int)prop->to_number();
}
if ((prop = metadata->metadata->ensure_property_number("height")) != NULL) {
ss << ", height=" << (int)prop->to_number();
}
if ((prop = metadata->metadata->ensure_property_number("videocodecid")) != NULL) {
ss << ", vcodec=" << (int)prop->to_number();
}
if ((prop = metadata->metadata->ensure_property_number("audiocodecid")) != NULL) {
ss << ", acodec=" << (int)prop->to_number();
}
srs_trace("got metadata%s", ss.str().c_str()); //got metadata, width=768, height=320, vcodec=7, acodec=10
// add server info to metadata
metadata->metadata->set("server", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER));
// version, for example, 1.0.0
// add version to metadata, please donot remove it, for debug.
metadata->metadata->set("server_version", SrsAmf0Any::str(RTMP_SIG_SRS_VERSION));
// encode the metadata to payload
int size = 0;
char* payload = NULL; //编码metadata为payload
if ((err = metadata->encode(size, payload)) != srs_success) {
return srs_error_wrap(err, "encode metadata");
}
if (size <= 0) {
srs_warn("ignore the invalid metadata. size=%d", size);
return err;
}
// create a shared ptr message. 创建一个共享的metadata
srs_freep(meta);
meta = new SrsSharedPtrMessage();
updated = true;
// dump message to shared ptr message.
// the payload/size managed by cache_metadata, user should not free it.
if ((err = meta->create(header, payload, size)) != srs_success) {
return srs_error_wrap(err, "create metadata");
}
return err;
}
如果有其他客户端订阅了该直播流,通知这些客户端,将metadata加入SrsMessageQueue队列,注:
- 如果没有其他客户端,不会进此逻辑,即不会存储metadata
- SrsMessageQueue队列包括onMetaData,video,audio数据
srs_error_t SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag)
{
srs_error_t err = srs_success;
//拷贝一个副本给msg
SrsSharedPtrMessage* msg = shared_msg->copy();
if (!atc) { //如果atc为false,检测时间抖动并校正。
if ((err = jitter->correct(msg, ag)) != srs_success) {
return srs_error_wrap(err, "consume message");
}
}
if ((err = queue->enqueue(msg, NULL)) != srs_success) { //将共享message添加到SrsMessageQueue队列,包括onMetaData,video,audio
return srs_error_wrap(err, "enqueue message");
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// fire the mw when msgs is enough.
if (mw_waiting) {
srs_utime_t duration = queue->duration();
bool match_min_msgs = queue->size() > mw_min_msgs;
// For ATC, maybe the SH timestamp bigger than A/V packet,
// when encoder republish or overflow.
// @see https://github.com/ossrs/srs/pull/749
if (atc && duration < 0) {
srs_cond_signal(mw_wait);
mw_waiting = false;
return err;
}
// when duration ok, signal to flush.
if (match_min_msgs && duration > mw_duration) {
srs_cond_signal(mw_wait);
mw_waiting = false;
return err;
}
}
#endif
return err;
}
如果atc为false,检测时间抖动并校正。若传入的第二个参数为 SrsRtmpJitterAlgorithmOFF,则禁止所有的 jitter 校正,构造 SrsSource 的时候默认初始化为 SrsRtmpJitterAlgorithmOFF。
srs_error_t SrsRtmpJitter::correct(SrsSharedPtrMessage* msg, SrsRtmpJitterAlgorithm ag)
{
srs_error_t err = srs_success;
// for performance issue
if (ag != SrsRtmpJitterAlgorithmFULL) {
// all jitter correct features is disabled, ignore.
if (ag == SrsRtmpJitterAlgorithmOFF) {
return err;
}
// start at zero, but donot ensure monotonically increasing.
if (ag == SrsRtmpJitterAlgorithmZERO) {
// for the first time, last_pkt_correct_time is -1.
if (last_pkt_correct_time == -1) {
last_pkt_correct_time = msg->timestamp;
}
msg->timestamp -= last_pkt_correct_time;
return err;
}
// other algorithm, ignore.
return err;
}
// full jitter algorithm, do jitter correct.
// set to 0 for metadata.
if (!msg->is_av()) {
msg->timestamp = 0;
return err;
}
/**
* we use a very simple time jitter detect/correct algorithm:
* 1. delta: ensure the delta is positive and valid,
* we set the delta to DEFAULT_FRAME_TIME_MS,
* if the delta of time is nagative or greater than CONST_MAX_JITTER_MS.
* 2. last_pkt_time: specifies the original packet time,
* is used to detect next jitter.
* 3. last_pkt_correct_time: simply add the positive delta,
* and enforce the time monotonically.
*/
int64_t time = msg->timestamp;
int64_t delta = time - last_pkt_time;
// if jitter detected, reset the delta.
if (delta < CONST_MAX_JITTER_MS_NEG || delta > CONST_MAX_JITTER_MS) {
// use default 10ms to notice the problem of stream.
// @see https://github.com/ossrs/srs/issues/425
delta = DEFAULT_FRAME_TIME_MS;
}
last_pkt_correct_time = srs_max(0, last_pkt_correct_time + delta);
msg->timestamp = last_pkt_correct_time;
last_pkt_time = time;
return err;
}
SrsMessageQueue::enqueue具体负责将onMetaData的message插入到SrsMessageQueue队列
srs_error_t SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow)
{
srs_error_t err = srs_success;
if (msg->is_av()) { //如果是视频或者音频message
if (av_start_time == -1) {
av_start_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
}
av_end_time = srs_utime_t(msg->timestamp * SRS_UTIME_MILLISECONDS);
}
msgs.push_back(msg); //加入SrsMessageQueue队列
while (av_end_time - av_start_time > max_queue_size) {
// notice the caller queue already overflow and shrinked.
if (is_overflow) {
*is_overflow = true;
}
shrink();
}
return err;
}
存储到SrsFastVector的SrsMessageQueue队列。
void SrsFastVector::push_back(SrsSharedPtrMessage* msg)
{
// increase vector.
if (count >= nb_msgs) {
int size = srs_max(SRS_PERF_MW_MSGS * 8, nb_msgs * 2);
SrsSharedPtrMessage** buf = new SrsSharedPtrMessage*[size];
for (int i = 0; i < nb_msgs; i++) {
buf[i] = msgs[i];
}
srs_info("fast vector incrase %d=>%d", nb_msgs, size);
// use new array.
srs_freepa(msgs);
msgs = buf;
nb_msgs = size;
}
msgs[count++] = msg;
}
最后看SrsSource::on_meta_data函数的SrsOriginHub::on_meta_data函数。
如果设置了forward模式,将onMetaData转发给各个forward节点,这部分等分析forward模式时再展开说明。
srs_error_t SrsOriginHub::on_meta_data(SrsSharedPtrMessage* shared_metadata, SrsOnMetaDataPacket* packet)
{
srs_error_t err = srs_success;
if ((err = format->on_metadata(packet)) != srs_success) {
return srs_error_wrap(err, "Format parse metadata");
}
// copy to all forwarders 将onMetaData拷贝到各个forward节点
if (true) {
std::vector<SrsForwarder*>::iterator it;
for (it = forwarders.begin(); it != forwarders.end(); ++it) {
SrsForwarder* forwarder = *it;
if ((err = forwarder->on_meta_data(shared_metadata)) != srs_success) {
return srs_error_wrap(err, "Forwarder consume metadata");
}
}
}
if ((err = dvr->on_meta_data(shared_metadata)) != srs_success) {
return srs_error_wrap(err, "DVR consume metadata");
}
return err;
}
处理video message
回到SrsRtmpConn::process_publish_message函数看处理video流程,SrsSource::on_video函数是具体处理video数据的逻辑。
srs_error_t SrsSource::on_video(SrsCommonMessage* shared_video)
{
srs_error_t err = srs_success;
// monotically increase detect. 判断流时间戳是否递增,不递增建议打开mix_correct
if (!mix_correct && is_monotonically_increase) {
if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {
is_monotonically_increase = false;
srs_warn("VIDEO: stream not monotonically increase, please open mix_correct.");
}
}
last_packet_time = shared_video->header.timestamp; //记录最后一个packet的时间戳
// drop any unknown header video.
// @see https://github.com/ossrs/srs/issues/421
if (!SrsFlvVideo::acceptable(shared_video->payload, shared_video->size)) {
char b0 = 0x00;
if (shared_video->size > 0) {
b0 = shared_video->payload[0];
}
srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);
return err;
}
// convert shared_video to msg, user should not use shared_video again.
// the payload is transfer to msg, and set to NULL in shared_video.
SrsSharedPtrMessage msg;
if ((err = msg.create(shared_video)) != srs_success) { //将视频message相关header、payload数据存储到
return srs_error_wrap(err, "create message");//SrsSharedPtrMessage的SrsSharedPtrPayload中
}
// directly process the audio message.
if (!mix_correct) { // mix_correct默认为false,所以直接处理
return on_video_imp(&msg);
}
// insert msg to the queue.
mix_queue->push(msg.copy()); //mix_correct为true,将msg插入到SrsMixQueue队列,包括音频和视频
// fetch someone from mix queue.
SrsSharedPtrMessage* m = mix_queue->pop(); //从SrsMixQueue队列取出最前的msg
if (!m) {
return err;
}
// consume the monotonically increase message.
if (m->is_audio()) {
err = on_audio_imp(m); //如果是音频,进入音频message处理逻辑
} else {
err = on_video_imp(m); //如果是视频,进入视频message处理逻辑
}
srs_freep(m);
return err;
}
SrsSource::on_video_imp函数中实现功能:
- 将msg发送到其他实体,比如转推到各个forward节点。
- 如果有消费客户端,将共享message发送到SrsMessageQueue队列,队列包括onMetaData,video,audio。如果没有消费客户端,不会进行存储。
- 如果是音频和视频数据,缓存gop cache。
srs_error_t SrsSource::on_video_imp(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
bool is_sequence_header = SrsFlvVideo::sh(msg->payload, msg->size);
// whether consumer should drop for the duplicated sequence header.
bool drop_for_reduce = false;
if (is_sequence_header && meta->previous_vsh() && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (meta->previous_vsh()->size == msg->size) {
drop_for_reduce = srs_bytes_equals(meta->previous_vsh()->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh video, size=%d", msg->size);
}
}
// cache the sequence header if h264 缓存序列头如果是h264,不要缓存序列头到gop_cache
// donot cache the sequence header to gop_cache, return here.
if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) {
return srs_error_wrap(err, "meta update video");
}
// Copy to hub to all utilities.
if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) { //复制msg到其他地方,如转推到各个forward节点。
return srs_error_wrap(err, "hub consume video");
}
// copy to all consumer 如果有消费客户端,将共享message发送到SrsMessageQueue队列,队列包括onMetaData,video,audio
if (!drop_for_reduce) {
for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
return srs_error_wrap(err, "consume video");
}
}
}
// when sequence header, donot push to gop cache and adjust the timestamp.
if (is_sequence_header) {
return err;
}
// cache the last gop packets 将msg缓存到go cache
if ((err = gop_cache->cache(msg)) != srs_success) {
return srs_error_wrap(err, "gop cache consume vdieo");
}
// if atc, update the sequence header to abs time.
if (atc) {
if (meta->vsh()) {
meta->vsh()->timestamp = msg->timestamp;
}
if (meta->data()) {
meta->data()->timestamp = msg->timestamp;
}
}
return err;
}
SrsConsumer::enqueue函数解析见onMetaData部分。
如果是音频和视频数据,缓存gop cache。
srs_error_t SrsGopCache::cache(SrsSharedPtrMessage* shared_msg)
{
srs_error_t err = srs_success;
if (!enable_gop_cache) {
return err;
}
// the gop cache know when to gop it.
SrsSharedPtrMessage* msg = shared_msg;
// got video, update the video count if acceptable 如果是视频,更新视频个数
if (msg->is_video()) {
// drop video when not h.264
if (!SrsFlvVideo::h264(msg->payload, msg->size)) {
return err;
}
cached_video_count++;
audio_after_last_video_count = 0;
}
// no acceptable video or pure audio, disable the cache.
if (pure_audio()) { //纯音频不做cache
return err;
}
// ok, gop cache enabled, and got an audio. 统计连续音频message个数
if (msg->is_audio()) {
audio_after_last_video_count++;
}
// clear gop cache when pure audio count overflow 如果连续音频message个数超过115,情况gop cache
if (audio_after_last_video_count > SRS_PURE_AUDIO_GUESS_COUNT) {
srs_warn("clear gop cache for guess pure audio overflow");
clear();
return err;
}
// clear gop cache when got key frame 新来一个关键帧,清空gop cache
if (msg->is_video() && SrsFlvVideo::keyframe(msg->payload, msg->size)) {
clear();
// curent msg is video frame, so we set to 1.
cached_video_count = 1;
}
// cache the frame. 将message插入gop cache
gop_cache.push_back(msg->copy());
return err;
}
处理audio message
回到SrsRtmpConn::process_publish_message函数看处理audio流程,SrsSource::on_audio函数是具体处理audio数据的逻辑。
srs_error_t SrsSource::on_audio(SrsCommonMessage* shared_audio)
{
srs_error_t err = srs_success;
// monotically increase detect.
if (!mix_correct && is_monotonically_increase) {
if (last_packet_time > 0 && shared_audio->header.timestamp < last_packet_time) {
is_monotonically_increase = false;
srs_warn("AUDIO: stream not monotonically increase, please open mix_correct.");
}
}
last_packet_time = shared_audio->header.timestamp;
// convert shared_audio to msg, user should not use shared_audio again.
// the payload is transfer to msg, and set to NULL in shared_audio.
SrsSharedPtrMessage msg; // 类似c++11智能指针,数据拷贝实际上是浅拷贝,通过引用计数的方式,引用计数为0时释放
if ((err = msg.create(shared_audio)) != srs_success) {
return srs_error_wrap(err, "create message");
}
// directly process the audio message. //在配置文件full.conf 496,mix_correct为off,所以会直接进行音频处理
if (!mix_correct) {
return on_audio_imp(&msg); //默认不做校正
}
// insert msg to the queue. 如果mix_correct为true
mix_queue->push(msg.copy()); //把message插入到队列
// fetch someone from mix queue.
SrsSharedPtrMessage* m = mix_queue->pop(); //pop时间timestamp最小的出来
if (!m) {
return err;
}
// consume the monotonically increase message.
if (m->is_audio()) {
err = on_audio_imp(m);
} else {
err = on_video_imp(m);
}
srs_freep(m);
return err;
}
SrsSource::on_audio_imp函数中实现功能:
- 将msg发送到其他实体,比如转推到各个forward节点。
- 如果有消费客户端,将共享message发送到SrsMessageQueue队列,队列包括onMetaData,video,audio。如果没有消费客户端,不会进行存储。
- 如果是音频和视频数据,缓存gop cache。
srs_error_t SrsSource::on_audio_imp(SrsSharedPtrMessage* msg)
{
srs_error_t err = srs_success;
bool is_aac_sequence_header = SrsFlvAudio::sh(msg->payload, msg->size);
bool is_sequence_header = is_aac_sequence_header;
// whether consumer should drop for the duplicated sequence header.
bool drop_for_reduce = false;
if (is_sequence_header && meta->previous_ash() && _srs_config->get_reduce_sequence_header(req->vhost)) {
if (meta->previous_ash()->size == msg->size) {
drop_for_reduce = srs_bytes_equals(meta->previous_ash()->payload, msg->payload, msg->size);
srs_warn("drop for reduce sh audio, size=%d", msg->size);
}
}
// copy to all consumer
if (!drop_for_reduce) { // 如果有消费客户端,把数据发给消费客户端
for (int i = 0; i < (int)consumers.size(); i++) {
SrsConsumer* consumer = consumers.at(i);
if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) { //把数据发给拉流者
return srs_error_wrap(err, "consume message");
}
}
}
// Copy to hub to all utilities.
if ((err = hub->on_audio(msg)) != srs_success) {
return srs_error_wrap(err, "consume audio");
}
// cache the sequence header of aac, or first packet of mp3.
// for example, the mp3 is used for hls to write the "right" audio codec.
// TODO: FIXME: to refine the stream info system.
if (is_aac_sequence_header || !meta->ash()) {
if ((err = meta->update_ash(msg)) != srs_success) {
return srs_error_wrap(err, "meta consume audio");
}
}
// when sequence header, donot push to gop cache and adjust the timestamp.
if (is_sequence_header) { //gop cache不缓存 sequence header
return err;
}
// cache the last gop packets 缓存最后一个gop
if ((err = gop_cache->cache(msg)) != srs_success) {
return srs_error_wrap(err, "gop cache consume audio");
}
// if atc, update the sequence header to abs time.
if (atc) {
if (meta->ash()) {
meta->ash()->timestamp = msg->timestamp;
}
if (meta->data()) {
meta->data()->timestamp = msg->timestamp;
}
}
return err;
}
SrsGopCache::cache解析见处理video message部分。
来源链接:https://www.yuque.com/wahaha-0yfyj/mnfloz/lkxg8o
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。