本文分享从SRS流媒体服务器拉流消息处理流程。
从SRS流媒体服务器拉流消息处理概述
1. 整体播放流程:
2. 从SRS流媒体服务器拉RTMP流的Wireshark抓包如下:
RTMP拉流消息前半部分(_result消息前)都已分析过,见:RTMP推流到SRS流媒体服务器消息处理。
重点分析从_result开始的消息。
_result
拉流消息中的_result消息回复的是客户端发送过来createStream消息,具体解析见第3小节。
代码:
1. 解析createStream:SrsProtocol::do_decode_message
} else if (command == RTMP_AMF0_COMMAND_CREATE_STREAM) {
*ppacket = packet = new SrsCreateStreamPacket(); //第五个接收到的message为"createStream"消息
return packet->decode(stream);
}
2. 反馈createStream消息:SrsRtmpServer::identify_client
if (dynamic_cast<SrsCreateStreamPacket*>(pkt)) {
return identify_create_stream_client(dynamic_cast<SrsCreateStreamPacket*>(pkt), stream_id, 3, type, stream_name, duration);
}
拉流客户端发送getStreamLength、play和Set Buffer Length消息
拉流客户端接收到SRS服务器的_result消息后,会连续发送三个消息:getStreamLength、play和Set Buffer Length消息给SRS服务器。
getStreamLength消息
Wireshark截图:
拉流客户端生成’getStreamLength’调用并将其发送到服务器。如果服务器知道所选流的持续时间,它将以秒为单位进行应答。
FFmpeg对应代码:
/**
 * Generate 'getStreamLength' call and send it to the server. If the server
 * knows the duration of the selected stream, it will reply with the duration
 * in seconds.
 */
static int gen_get_stream_length(URLContext *s, RTMPContext *rt)
{
RTMPPacket pkt;
uint8_t *p;
int ret;
// Allocate an INVOKE packet; 31 bytes covers the fixed AMF0 fields
// (the "getStreamLength" string, transaction number, null marker) and
// the string header for playpath, whose length is added on top.
if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE,
0, 31 + strlen(rt->playpath))) < 0)
return ret;
p = pkt.data;
// AMF0 body: command name, transaction id, null command object, stream name.
ff_amf_write_string(&p, "getStreamLength");
ff_amf_write_number(&p, ++rt->nb_invokes); // transaction id, incremented per invoke
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
return rtmp_send_packet(rt, &pkt, 1); // NOTE(review): last arg presumably tracks the pending _result — confirm in rtmp_send_packet
}
play消息
Wireshark截图:
由客户端向服务器发起请求从服务器端接受数据(如果传输的信息是视频的话就是请求开始播流),可以多次调用,这样本地就会形成一组数据流的接收者。注意其中有一个reset字段,表示是覆盖之前的播流(设为true)还是重新开始一路播放(设为false)。
play命令的结构如下:
FFmpeg对应代码:
/**
 * Generate 'play' call and send it to the server, then ping the server
 * to start actual playing.
 */
static int gen_play(URLContext *s, RTMPContext *rt)
{
RTMPPacket pkt;
uint8_t *p;
int ret;
av_log(s, AV_LOG_DEBUG, "Sending play command for '%s'\n", rt->playpath);
// Allocate an INVOKE packet; 29 bytes covers the fixed AMF0 fields
// ("play" string, transaction number, null, the trailing number) plus
// the string header for playpath, whose length is added on top.
if ((ret = ff_rtmp_packet_create(&pkt, RTMP_SOURCE_CHANNEL, RTMP_PT_INVOKE,
0, 29 + strlen(rt->playpath))) < 0)
return ret;
pkt.extra = rt->stream_id; // play is addressed to the stream created by createStream
p = pkt.data;
// AMF0 body: command name, transaction id, null object, stream name, start arg.
ff_amf_write_string(&p, "play");
ff_amf_write_number(&p, ++rt->nb_invokes); // transaction id, incremented per invoke
ff_amf_write_null(&p);
ff_amf_write_string(&p, rt->playpath);
ff_amf_write_number(&p, rt->live * 1000); // NOTE(review): presumably the play 'start' argument selecting live vs recorded — confirm against the RTMP spec
return rtmp_send_packet(rt, &pkt, 1);
}
Set Buffer Length消息
Wireshark截图:
生成客户端缓冲区时间并将其发送到服务器。
/**
 * Generate client buffer time and send it to the server.
 */
static int gen_buffer_time(URLContext *s, RTMPContext *rt)
{
RTMPPacket pkt;
uint8_t *p;
int ret;
// User Control message on the network channel; payload is exactly
// 10 bytes: 2 (event type) + 4 (stream id) + 4 (buffer time).
if ((ret = ff_rtmp_packet_create(&pkt, RTMP_NETWORK_CHANNEL, RTMP_PT_USER_CONTROL,
1, 10)) < 0)
return ret;
p = pkt.data;
bytestream_put_be16(&p, 3); // event type 3 = Set Buffer Length
bytestream_put_be32(&p, rt->stream_id);
bytestream_put_be32(&p, rt->client_buffer_time); // buffer length in milliseconds — TODO confirm unit from RTMPContext
return rtmp_send_packet(rt, &pkt, 0);
}
SRS服务器发送Stream Begin、onStatus(NetStream.Play.Reset)和onStatus(NetStream.Play.Start)等消息给拉流客户端
对应SRS代码在SrsRtmpServer::start_play:
// Reply sequence the server sends to a pull (play) client before media
// starts flowing: StreamBegin, onStatus(NetStream.Play.Reset),
// onStatus(NetStream.Play.Start), |RtmpSampleAccess, onStatus(NetStream.Data.Start).
// Returns srs_success, or a wrapped error naming the packet that failed to send.
srs_error_t SrsRtmpServer::start_play(int stream_id)
{
srs_error_t err = srs_success;
// StreamBegin: User Control event telling the client the stream is live.
if (true) {
SrsUserControlPacket* pkt = new SrsUserControlPacket();
pkt->event_type = SrcPCUCStreamBegin;
pkt->event_data = stream_id; // event data carries the stream id
if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) { // sent on stream 0 (control stream)
return srs_error_wrap(err, "send StreamBegin");
}
}
// onStatus(NetStream.Play.Reset): any previous play on this stream is reset.
if (true) {
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamReset));
pkt->data->set(StatusDescription, SrsAmf0Any::str("Playing and resetting stream."));
pkt->data->set(StatusDetails, SrsAmf0Any::str("stream"));
pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));
if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {
return srs_error_wrap(err, "send NetStream.Play.Reset");
}
}
// onStatus(NetStream.Play.Start): playback has officially started.
if (true) {
SrsOnStatusCallPacket* pkt = new SrsOnStatusCallPacket();
pkt->data->set(StatusLevel, SrsAmf0Any::str(StatusLevelStatus));
pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeStreamStart));
pkt->data->set(StatusDescription, SrsAmf0Any::str("Started playing stream."));
pkt->data->set(StatusDetails, SrsAmf0Any::str("stream"));
pkt->data->set(StatusClientId, SrsAmf0Any::str(RTMP_SIG_CLIENT_ID));
if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {
return srs_error_wrap(err, "send NetStream.Play.Start");
}
}
// |RtmpSampleAccess: both flags are set to true here (despite the historical
// name suggesting false/false) so Flash clients may access audio/video samples.
if (true) {
SrsSampleAccessPacket* pkt = new SrsSampleAccessPacket();
// allow audio/video sample.
// @see: https://github.com/ossrs/srs/issues/49
pkt->audio_sample_access = true;
pkt->video_sample_access = true;
if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {
return srs_error_wrap(err, "send |RtmpSampleAccess true");
}
}
// onStatus(NetStream.Data.Start): data messages will follow.
if (true) {
SrsOnStatusDataPacket* pkt = new SrsOnStatusDataPacket();
pkt->data->set(StatusCode, SrsAmf0Any::str(StatusCodeDataStart));
if ((err = protocol->send_and_free_packet(pkt, stream_id)) != srs_success) {
return srs_error_wrap(err, "send NetStream.Data.Start");
}
}
return err;
}
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。