本文介绍如何从SRS服务器拉RTMP流。
SrsRtmpConn::stream_service_cycle
客户端从SRS服务器拉流主要逻辑入口在SrsRtmpConn::stream_service_cycle:
srs_error_t SrsRtmpConn::stream_service_cycle()
{
...
switch (info->type) {
case SrsRtmpConnPlay: { //拉流
// response connection start play
if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: start play");
}
if ((err = http_hooks_on_play()) != srs_success) { // 回调接口通知vhost开始play
return srs_error_wrap(err, "rtmp: callback on play");
}
err = playing(source);
http_hooks_on_stop(); // 回调接口通知vhost play停止
return err;
}
...
}
如果客户端类型为SrsRtmpConnPlay表示拉流。其中:
- http_hooks_on_play() 方法中回调 on_play() 方法通知 vhost,用户已经开始 play。
- http_hooks_on_stop() 方法中回调 on_stop() 方法通知 vhost,用户已经停止 play。
- 最重要的函数是 playing(source),进入该函数。
SrsRtmpConn::playing
srs_error_t SrsRtmpConn::playing(SrsSource* source)
{
srs_error_t err = srs_success;
// Check page referer of player.
SrsRequest* req = info->req;
if (_srs_config->get_refer_enabled(req->vhost)) {
if ((err = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != srs_success) {
return srs_error_wrap(err, "rtmp: referer check");
}
}
// When origin cluster enabled, try to redirect to the origin which is active.
// A active origin is a server which is delivering stream. 启动origin集群时,先尝试重定向到有流信息的origin节点。
... //省略关于origin集群代码,后面介绍
// Set the socket options for transport.
set_sock_options();
// Create a consumer of source. 每来一个拉流请求,创建一个消费者
SrsConsumer* consumer = NULL;
if ((err = source->create_consumer(this, consumer)) != srs_success) {
return srs_error_wrap(err, "rtmp: create consumer");
}
SrsAutoFree(SrsConsumer, consumer);
// Use receiving thread to receive packets from peer.
// @see: https://github.com/ossrs/srs/issues/217
SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());
if ((err = trd.start()) != srs_success) {
return srs_error_wrap(err, "rtmp: start receive thread");
}
// Deliver packets to peer. 向拉流端发送音视频数据。
wakable = consumer;
err = do_playing(source, consumer, &trd);
wakable = NULL;
trd.stop();
// Drop all packets in receiving thread.
if (!trd.empty()) {
srs_warn("drop the received %d messages", trd.size());
}
return err;
}
主流程在do_playing(source, consumer, &trd),进入该函数。
SrsRtmpConn::do_playing
srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* rtrd)
{
srs_error_t err = srs_success;
SrsRequest* req = info->req;
srs_assert(req);
srs_assert(consumer);
// initialize other components
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
SrsAutoFree(SrsPithyPrint, pprint);
SrsMessageArray msgs(SRS_PERF_MW_MSGS); //message数组
bool user_specified_duration_to_stop = (req->duration > 0);
int64_t starttime = -1;
// setup the realtime.
realtime = _srs_config->get_realtime_enabled(req->vhost);
// setup the mw config.
// when mw_sleep changed, resize the socket send buffer.
mw_enabled = true;
change_mw_sleep(_srs_config->get_mw_sleep(req->vhost));
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
srs_trace("start play smi=%dms, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_enabled, realtime, tcp_nodelay);
while (true) {
// when source is set to expired, disconnect it.
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "rtmp: thread quit");
}
// collect elapse for pithy print.
pprint->elapse();
// to use isolate thread to recv, can improve about 33% performance. 使用单独协程接收,能提高大约33%的性能。
// @see: https://github.com/ossrs/srs/issues/196
// @see: https://github.com/ossrs/srs/issues/217
while (!rtrd->empty()) { //如果SrsQueueRecvThread的queue有message,取出处理
SrsCommonMessage* msg = rtrd->pump();
if ((err = process_play_control_msg(consumer, msg)) != srs_success) { //播放控制信息,比如结束,暂停等
return srs_error_wrap(err, "rtmp: play control message");
}
}
// quit when recv thread error.
if ((err = rtrd->error_code()) != srs_success) {
return srs_error_wrap(err, "rtmp: recv thread");
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// wait for message to incoming.
// @see https://github.com/ossrs/srs/issues/251
// @see https://github.com/ossrs/srs/issues/257
if (realtime) {
// for realtime, min required msgs is 0, send when got one+ msgs.
consumer->wait(0, mw_sleep);
} else {
// for no-realtime, got some msgs then send.
consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
}
#endif
// get messages from consumer. 从消费者处获取message。
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
// @remark when enable send_min_interval, only fetch one message a time.
int count = (send_min_interval > 0)? 1 : 0;
if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
return srs_error_wrap(err, "rtmp: consumer dump packets");
}
// reportable
if (pprint->can_print()) {
kbps->sample();
srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d",
(int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep));
}
if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
srs_usleep(mw_sleep);
#endif
// ignore when nothing got.
continue;
}
// only when user specifies the duration,
// we start to collect the durations for each message.
if (user_specified_duration_to_stop) {
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
// foreach msg, collect the duration.
// @remark: never use msg when sent it, for the protocol sdk will free it.
if (starttime < 0 || starttime > msg->timestamp) {
starttime = msg->timestamp;
}
duration += (msg->timestamp - starttime) * SRS_UTIME_MILLISECONDS;
starttime = msg->timestamp;
}
}
// sendout messages, all messages are freed by send_and_free_messages().
// no need to assert msg, for the rtmp will assert it. //发送音视频数据给play客户端
if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: send %d messages", count);
}
// if duration specified, and exceed it, stop play live.
// @see: https://github.com/ossrs/srs/issues/45
if (user_specified_duration_to_stop) {
if (duration >= req->duration) {
return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration));
}
}
// apply the minimal interval for delivery stream in srs_utime_t.
if (send_min_interval > 0) {
srs_usleep(send_min_interval);
}
}
return err;
}
srs_error_t SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* rtrd)
{
srs_error_t err = srs_success;
SrsRequest* req = info->req;
srs_assert(req);
srs_assert(consumer);
// initialize other components
SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
SrsAutoFree(SrsPithyPrint, pprint);
SrsMessageArray msgs(SRS_PERF_MW_MSGS); //message数组
bool user_specified_duration_to_stop = (req->duration > 0);
int64_t starttime = -1;
// setup the realtime.
realtime = _srs_config->get_realtime_enabled(req->vhost);
// setup the mw config.
// when mw_sleep changed, resize the socket send buffer.
mw_enabled = true;
change_mw_sleep(_srs_config->get_mw_sleep(req->vhost));
// initialize the send_min_interval
send_min_interval = _srs_config->get_send_min_interval(req->vhost);
srs_trace("start play smi=%dms, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
srsu2msi(send_min_interval), srsu2msi(mw_sleep), mw_enabled, realtime, tcp_nodelay);
while (true) {
// when source is set to expired, disconnect it.
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "rtmp: thread quit");
}
// collect elapse for pithy print.
pprint->elapse();
// to use isolate thread to recv, can improve about 33% performance. 使用单独协程接收,能提高大约33%的性能。
// @see: https://github.com/ossrs/srs/issues/196
// @see: https://github.com/ossrs/srs/issues/217
while (!rtrd->empty()) { //如果SrsQueueRecvThread的queue有message,取出处理
SrsCommonMessage* msg = rtrd->pump();
if ((err = process_play_control_msg(consumer, msg)) != srs_success) { //播放控制信息,比如结束,暂停等
return srs_error_wrap(err, "rtmp: play control message");
}
}
// quit when recv thread error.
if ((err = rtrd->error_code()) != srs_success) {
return srs_error_wrap(err, "rtmp: recv thread");
}
#ifdef SRS_PERF_QUEUE_COND_WAIT
// wait for message to incoming.
// @see https://github.com/ossrs/srs/issues/251
// @see https://github.com/ossrs/srs/issues/257
if (realtime) {
// for realtime, min required msgs is 0, send when got one+ msgs.
consumer->wait(0, mw_sleep);
} else {
// for no-realtime, got some msgs then send.
consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
}
#endif
// get messages from consumer. 从消费者处获取message。
// each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
// @remark when enable send_min_interval, only fetch one message a time.
int count = (send_min_interval > 0)? 1 : 0;
if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
return srs_error_wrap(err, "rtmp: consumer dump packets");
}
// reportable
if (pprint->can_print()) {
kbps->sample();
srs_trace("-> " SRS_CONSTS_LOG_PLAY " time=%d, msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d",
(int)pprint->age(), count, kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), srsu2msi(mw_sleep));
}
if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
srs_usleep(mw_sleep);
#endif
// ignore when nothing got.
continue;
}
// only when user specifies the duration,
// we start to collect the durations for each message.
if (user_specified_duration_to_stop) {
for (int i = 0; i < count; i++) {
SrsSharedPtrMessage* msg = msgs.msgs[i];
// foreach msg, collect the duration.
// @remark: never use msg when sent it, for the protocol sdk will free it.
if (starttime < 0 || starttime > msg->timestamp) {
starttime = msg->timestamp;
}
duration += (msg->timestamp - starttime) * SRS_UTIME_MILLISECONDS;
starttime = msg->timestamp;
}
}
// sendout messages, all messages are freed by send_and_free_messages().
// no need to assert msg, for the rtmp will assert it. //发送音视频数据给play客户端
if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {
return srs_error_wrap(err, "rtmp: send %d messages", count);
}
// if duration specified, and exceed it, stop play live.
// @see: https://github.com/ossrs/srs/issues/45
if (user_specified_duration_to_stop) {
if (duration >= req->duration) {
return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration));
}
}
// apply the minimal interval for delivery stream in srs_utime_t.
if (send_min_interval > 0) {
srs_usleep(send_min_interval);
}
}
return err;
}
SrsRtmpConn::do_playing主要有三个功能。
- 播放控制信息,比如结束,暂停等。
- 从消费者列表中取出Rtmp message(SrsMessageQueue)
- 发送message给拉流客户端。
播放控制信息(SrsRtmpConn::process_play_control_msg)
对应代码如下:
srs_error_t SrsRtmpConn::process_play_control_msg(SrsConsumer* consumer, SrsCommonMessage* msg)
{
srs_error_t err = srs_success;
if (!msg) {
return err;
}
SrsAutoFree(SrsCommonMessage, msg);
if (!msg->header.is_amf0_command() && !msg->header.is_amf3_command()) {
return err;
}
SrsPacket* pkt = NULL;
if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {
return srs_error_wrap(err, "rtmp: decode message");
}
SrsAutoFree(SrsPacket, pkt);
// for jwplayer/flowplayer, which send close as pause message.
// @see https://github.com/ossrs/srs/issues/6
SrsCloseStreamPacket* close = dynamic_cast<SrsCloseStreamPacket*>(pkt);
if (close) {
return srs_error_new(ERROR_CONTROL_RTMP_CLOSE, "rtmp: close stream");
}
// call msg,
// support response null first,
// @see https://github.com/ossrs/srs/issues/106
// TODO: FIXME: response in right way, or forward in edge mode.
SrsCallPacket* call = dynamic_cast<SrsCallPacket*>(pkt);
if (call) {
// only response it when transaction id not zero,
// for the zero means donot need response.
if (call->transaction_id > 0) {
SrsCallResPacket* res = new SrsCallResPacket(call->transaction_id);
res->command_object = SrsAmf0Any::null();
res->response = SrsAmf0Any::null();
if ((err = rtmp->send_and_free_packet(res, 0)) != srs_success) {
return srs_error_wrap(err, "rtmp: send packets");
}
}
return err;
}
// pause
SrsPausePacket* pause = dynamic_cast<SrsPausePacket*>(pkt);
if (pause) {
if ((err = rtmp->on_play_client_pause(info->res->stream_id, pause->is_pause)) != srs_success) {
return srs_error_wrap(err, "rtmp: pause");
}
if ((err = consumer->on_play_client_pause(pause->is_pause)) != srs_success) {
return srs_error_wrap(err, "rtmp: pause");
}
return err;
}
// other msg.
return err;
}
从消费者列表中取出Rtmp message(SrsConsumer::dump_packets)
从SrsMessageQueue获取messages,对应代码如下:
srs_error_t SrsConsumer::dump_packets(SrsMessageArray* msgs, int& count)
{
srs_error_t err = srs_success;
srs_assert(count >= 0);
srs_assert(msgs->max > 0);
// the count used as input to reset the max if positive.
int max = count? srs_min(count, msgs->max) : msgs->max;
// the count specifies the max acceptable count,
// here maybe 1+, and we must set to 0 when got nothing.
count = 0;
if (should_update_source_id) {
srs_trace("update source_id=%d/%d", source->source_id(), source->pre_source_id());
should_update_source_id = false;
}
// paused, return nothing.
if (paused) {
return err;
}
// pump msgs from queue. 从SrsMessageQueue获取messages
if ((err = queue->dump_packets(max, msgs->msgs, count)) != srs_success) {
return srs_error_wrap(err, "dump packets");
}
return err;
}
数据实际存储在:SrsMessageQueue的SrsFastVector中,注:
RTMP推流时,通过SrsMessageQueue::enqueue函数将onMetadata、audio和video数据加入到SrsFastVector数组中。详见:RTMP推流到SRS流媒体服务器metadata,video,audio数据处理
发送message给拉流客户端
SrsProtocol::send_and_free_messages函数负责将message发送给拉流客户端。
srs_error_t SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
{
// always not NULL msg.
srs_assert(msgs);
srs_assert(nb_msgs > 0);
// update the stream id in header.
for (int i = 0; i < nb_msgs; i++) { //遍历获取msg
SrsSharedPtrMessage* msg = msgs[i];
if (!msg) {
continue;
}
// check perfer cid and stream, 检查stream id
// when one msg stream id is ok, ignore left.
if (msg->check(stream_id)) {
break;
}
}
// donot use the auto free to free the msg,
// for performance issue. 不要使用auto free来释放msg,因为性能问题。
srs_error_t err = do_send_messages(msgs, nb_msgs);
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
srs_freep(msg);
}
// donot flush when send failed
if (err != srs_success) {
return srs_error_wrap(err, "send messages");
}
// flush messages in manual queue
if ((err = manual_response_flush()) != srs_success) {
return srs_error_wrap(err, "manual flush response");
}
print_debug_info();
return err;
}
发送消息。
srs_error_t SrsProtocol::do_iovs_send(iovec* iovs, int size)
{
return srs_write_large_iovs(skt, iovs, size);
}
srs_error_t srs_write_large_iovs(ISrsProtocolReadWriter* skt, iovec* iovs, int size, ssize_t* pnwrite)
{
srs_error_t err = srs_success;
// the limits of writev iovs.
// for srs-librtmp, @see https://github.com/ossrs/srs/issues/213
#ifndef _WIN32
// for linux, generally it's 1024.
static int limits = (int)sysconf(_SC_IOV_MAX);
#else
static int limits = 1024;
#endif
// send in a time.
if (size <= limits) {
if ((err = skt->writev(iovs, size, pnwrite)) != srs_success) {
return srs_error_wrap(err, "writev");
}
return err;
}
// send in multiple times.
int cur_iov = 0;
ssize_t nwrite = 0;
while (cur_iov < size) {
int cur_count = srs_min(limits, size - cur_iov);
if ((err = skt->writev(iovs + cur_iov, cur_count, &nwrite)) != srs_success) { //转发给拉流端的流转发出去。
return srs_error_wrap(err, "writev");
}
cur_iov += cur_count;
if (pnwrite) {
*pnwrite += nwrite;
}
}
return err;
}
原文:https://www.yuque.com/wahaha-0yfyj/mnfloz/shzn6x
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。