diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index d4e44d4e..9dcab1ff 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -174,7 +174,7 @@ public: } uint32_t getStamp(const RtmpPacket::Ptr &packet) { - return packet->timeStamp; + return packet->time_stamp; } bool isFlushAble(bool is_video, bool is_key, uint32_t new_stamp, int cache_size); diff --git a/src/Extension/AAC.cpp b/src/Extension/AAC.cpp index 0a1e082e..1a820e42 100644 --- a/src/Extension/AAC.cpp +++ b/src/Extension/AAC.cpp @@ -144,7 +144,7 @@ int dumpAacConfig(const string &config, int length, uint8_t *out, int out_size) #ifndef ENABLE_MP4 AdtsHeader header; parseAacConfig(config, header); - header.aac_frame_length = length; + header.aac_frame_length = ADTS_HEADER_LEN + length; dumpAdtsHeader(header, out); return ADTS_HEADER_LEN; #else diff --git a/src/Extension/AACRtmp.cpp b/src/Extension/AACRtmp.cpp index 30e60e02..ec9bd7e0 100644 --- a/src/Extension/AACRtmp.cpp +++ b/src/Extension/AACRtmp.cpp @@ -21,11 +21,11 @@ static string getAacCfg(const RtmpPacket &thiz) { if (!thiz.isCfgFrame()) { return ret; } - if (thiz.strBuf.size() < 4) { + if (thiz.buffer.size() < 4) { WarnL << "bad aac cfg!"; return ret; } - ret = thiz.strBuf.substr(2); + ret = thiz.buffer.substr(2); return ret; } @@ -37,7 +37,7 @@ bool AACRtmpDecoder::inputRtmp(const RtmpPacket::Ptr &pkt, bool) { } if (!_aac_cfg.empty()) { - onGetAAC(pkt->strBuf.data() + 2, pkt->strBuf.size() - 2, pkt->timeStamp); + onGetAAC(pkt->buffer.data() + 2, pkt->buffer.size() - 2, pkt->time_stamp); } return false; } @@ -97,21 +97,21 @@ void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) { if(!_aac_cfg.empty()){ RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper::obtainObj(); - rtmpPkt->strBuf.clear(); + rtmpPkt->buffer.clear(); //header uint8_t is_config = false; - rtmpPkt->strBuf.push_back(_audio_flv_flags); - rtmpPkt->strBuf.push_back(!is_config); + rtmpPkt->buffer.push_back(_audio_flv_flags); + rtmpPkt->buffer.push_back(!is_config); //aac data - rtmpPkt->strBuf.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize()); + rtmpPkt->buffer.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize()); - rtmpPkt->bodySize = rtmpPkt->strBuf.size(); - rtmpPkt->chunkId = CHUNK_AUDIO; - rtmpPkt->streamId = STREAM_MEDIA; - rtmpPkt->timeStamp = frame->dts(); - rtmpPkt->typeId = MSG_AUDIO; + rtmpPkt->body_size = rtmpPkt->buffer.size(); + rtmpPkt->chunk_id = CHUNK_AUDIO; + rtmpPkt->stream_index = STREAM_MEDIA; + rtmpPkt->time_stamp = frame->dts(); + rtmpPkt->type_id = MSG_AUDIO; RtmpCodec::inputRtmp(rtmpPkt, false); } } @@ -119,20 +119,20 @@ void AACRtmpEncoder::inputFrame(const Frame::Ptr &frame) { void AACRtmpEncoder::makeAudioConfigPkt() { _audio_flv_flags = getAudioRtmpFlags(std::make_shared(_aac_cfg)); RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper::obtainObj(); - rtmpPkt->strBuf.clear(); + rtmpPkt->buffer.clear(); //header uint8_t is_config = true; - rtmpPkt->strBuf.push_back(_audio_flv_flags); - rtmpPkt->strBuf.push_back(!is_config); + rtmpPkt->buffer.push_back(_audio_flv_flags); + rtmpPkt->buffer.push_back(!is_config); //aac config - rtmpPkt->strBuf.append(_aac_cfg); + rtmpPkt->buffer.append(_aac_cfg); - rtmpPkt->bodySize = rtmpPkt->strBuf.size(); - rtmpPkt->chunkId = CHUNK_AUDIO; - rtmpPkt->streamId = STREAM_MEDIA; - rtmpPkt->timeStamp = 0; - rtmpPkt->typeId = MSG_AUDIO; + rtmpPkt->body_size = rtmpPkt->buffer.size(); + rtmpPkt->chunk_id = CHUNK_AUDIO; + rtmpPkt->stream_index = STREAM_MEDIA; + rtmpPkt->time_stamp = 0; + rtmpPkt->type_id = MSG_AUDIO; RtmpCodec::inputRtmp(rtmpPkt, false); } diff --git a/src/Extension/CommonRtmp.cpp b/src/Extension/CommonRtmp.cpp index 3405f352..783c4f37 100644 --- a/src/Extension/CommonRtmp.cpp +++ b/src/Extension/CommonRtmp.cpp @@ -31,8 +31,8 @@ void CommonRtmpDecoder::obtainFrame() { bool CommonRtmpDecoder::inputRtmp(const RtmpPacket::Ptr &rtmp, bool) { //拷贝负载 - _frame->_buffer.assign(rtmp->strBuf.data() + 1, rtmp->strBuf.size() - 1); - _frame->_dts = rtmp->timeStamp; + _frame->_buffer.assign(rtmp->buffer.data() + 1, rtmp->buffer.size() - 1); + _frame->_dts = rtmp->time_stamp; //写入环形缓存 RtmpCodec::inputFrame(_frame); //创建下一帧 @@ -51,16 +51,16 @@ void CommonRtmpEncoder::inputFrame(const Frame::Ptr &frame) { return; } RtmpPacket::Ptr rtmp = ResourcePoolHelper::obtainObj(); - rtmp->strBuf.clear(); + rtmp->buffer.clear(); //header - rtmp->strBuf.push_back(_audio_flv_flags); + rtmp->buffer.push_back(_audio_flv_flags); //data - rtmp->strBuf.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize()); - rtmp->bodySize = rtmp->strBuf.size(); - rtmp->chunkId = CHUNK_AUDIO; - rtmp->streamId = STREAM_MEDIA; - rtmp->timeStamp = frame->dts(); - rtmp->typeId = MSG_AUDIO; + rtmp->buffer.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize()); + rtmp->body_size = rtmp->buffer.size(); + rtmp->chunk_id = CHUNK_AUDIO; + rtmp->stream_index = STREAM_MEDIA; + rtmp->time_stamp = frame->dts(); + rtmp->type_id = MSG_AUDIO; RtmpCodec::inputRtmp(rtmp, false); } diff --git a/src/Extension/H264Rtmp.cpp b/src/Extension/H264Rtmp.cpp index 798de496..9a24a8b9 100644 --- a/src/Extension/H264Rtmp.cpp +++ b/src/Extension/H264Rtmp.cpp @@ -39,18 +39,18 @@ static string getH264SPS(const RtmpPacket &thiz) { if (!thiz.isCfgFrame()) { return ret; } - if (thiz.strBuf.size() < 13) { + if (thiz.buffer.size() < 13) { WarnL << "bad H264 cfg!"; return ret; } uint16_t sps_size ; - memcpy(&sps_size, thiz.strBuf.data() + 11,2); + memcpy(&sps_size, thiz.buffer.data() + 11, 2); sps_size = ntohs(sps_size); - if ((int) thiz.strBuf.size() < 13 + sps_size) { + if ((int) thiz.buffer.size() < 13 + sps_size) { WarnL << "bad H264 cfg!"; return ret; } - ret.assign(thiz.strBuf.data() + 13, sps_size); + ret.assign(thiz.buffer.data() + 13, sps_size); return ret; } @@ -66,27 +66,27 @@ static string getH264PPS(const RtmpPacket &thiz) { if (!thiz.isCfgFrame()) { return ret; } - if (thiz.strBuf.size() < 13) { + if (thiz.buffer.size() < 13) { WarnL << "bad H264 cfg!"; return ret; } uint16_t sps_size ; - memcpy(&sps_size,thiz.strBuf.data() + 11,2); + memcpy(&sps_size, thiz.buffer.data() + 11, 2); sps_size = ntohs(sps_size); - if ((int) thiz.strBuf.size() < 13 + sps_size + 1 + 2) { + if ((int) thiz.buffer.size() < 13 + sps_size + 1 + 2) { WarnL << "bad H264 cfg!"; return ret; } uint16_t pps_size ; - memcpy(&pps_size, thiz.strBuf.data() + 13 + sps_size + 1,2); + memcpy(&pps_size, thiz.buffer.data() + 13 + sps_size + 1, 2); pps_size = ntohs(pps_size); - if ((int) thiz.strBuf.size() < 13 + sps_size + 1 + 2 + pps_size) { + if ((int) thiz.buffer.size() < 13 + sps_size + 1 + 2 + pps_size) { WarnL << "bad H264 cfg!"; return ret; } - ret.assign(thiz.strBuf.data() + 13 + sps_size + 1 + 2, pps_size); + ret.assign(thiz.buffer.data() + 13 + sps_size + 1 + 2, pps_size); return ret; } @@ -95,27 +95,27 @@ bool H264RtmpDecoder::decodeRtmp(const RtmpPacket::Ptr &pkt) { //缓存sps pps,后续插入到I帧之前 _sps = getH264SPS(*pkt); _pps = getH264PPS(*pkt); - onGetH264(_sps.data(), _sps.size(), pkt->timeStamp , pkt->timeStamp); - onGetH264(_pps.data(), _pps.size(), pkt->timeStamp , pkt->timeStamp); + onGetH264(_sps.data(), _sps.size(), pkt->time_stamp , pkt->time_stamp); + onGetH264(_pps.data(), _pps.size(), pkt->time_stamp , pkt->time_stamp); return false; } - if (pkt->strBuf.size() > 9) { - uint32_t iTotalLen = pkt->strBuf.size(); + if (pkt->buffer.size() > 9) { + uint32_t iTotalLen = pkt->buffer.size(); uint32_t iOffset = 5; - uint8_t *cts_ptr = (uint8_t *) (pkt->strBuf.data() + 2); + uint8_t *cts_ptr = (uint8_t *) (pkt->buffer.data() + 2); int32_t cts = (((cts_ptr[0] << 16) | (cts_ptr[1] << 8) | (cts_ptr[2])) + 0xff800000) ^ 0xff800000; - auto pts = pkt->timeStamp + cts; + auto pts = pkt->time_stamp + cts; while(iOffset + 4 < iTotalLen){ uint32_t iFrameLen; - memcpy(&iFrameLen, pkt->strBuf.data() + iOffset, 4); + memcpy(&iFrameLen, pkt->buffer.data() + iOffset, 4); iFrameLen = ntohl(iFrameLen); iOffset += 4; if(iFrameLen + iOffset > iTotalLen){ break; } - onGetH264(pkt->strBuf.data() + iOffset, iFrameLen, pkt->timeStamp , pts); + onGetH264(pkt->buffer.data() + iOffset, iFrameLen, pkt->time_stamp , pts); iOffset += iFrameLen; } } @@ -190,7 +190,7 @@ void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) { } } - if(_lastPacket && _lastPacket->timeStamp != frame->dts()) { + if(_lastPacket && _lastPacket->time_stamp != frame->dts()) { RtmpCodec::inputRtmp(_lastPacket, _lastPacket->isVideoKeyFrame()); _lastPacket = nullptr; } @@ -202,23 +202,23 @@ void H264RtmpEncoder::inputFrame(const Frame::Ptr &frame) { flags |= (((frame->configFrame() || frame->keyFrame()) ? FLV_KEY_FRAME : FLV_INTER_FRAME) << 4); _lastPacket = ResourcePoolHelper::obtainObj(); - _lastPacket->strBuf.clear(); - _lastPacket->strBuf.push_back(flags); - _lastPacket->strBuf.push_back(!is_config); + _lastPacket->buffer.clear(); + _lastPacket->buffer.push_back(flags); + _lastPacket->buffer.push_back(!is_config); auto cts = frame->pts() - frame->dts(); cts = htonl(cts); - _lastPacket->strBuf.append((char *)&cts + 1, 3); + _lastPacket->buffer.append((char *)&cts + 1, 3); - _lastPacket->chunkId = CHUNK_VIDEO; - _lastPacket->streamId = STREAM_MEDIA; - _lastPacket->timeStamp = frame->dts(); - _lastPacket->typeId = MSG_VIDEO; + _lastPacket->chunk_id = CHUNK_VIDEO; + _lastPacket->stream_index = STREAM_MEDIA; + _lastPacket->time_stamp = frame->dts(); + _lastPacket->type_id = MSG_VIDEO; } auto size = htonl(iLen); - _lastPacket->strBuf.append((char *) &size, 4); - _lastPacket->strBuf.append(pcData, iLen); - _lastPacket->bodySize = _lastPacket->strBuf.size(); + _lastPacket->buffer.append((char *) &size, 4); + _lastPacket->buffer.append(pcData, iLen); + _lastPacket->body_size = _lastPacket->buffer.size(); } void H264RtmpEncoder::makeVideoConfigPkt() { @@ -227,38 +227,38 @@ void H264RtmpEncoder::makeVideoConfigPkt() { bool is_config = true; RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper::obtainObj(); - rtmpPkt->strBuf.clear(); + rtmpPkt->buffer.clear(); //header - rtmpPkt->strBuf.push_back(flags); - rtmpPkt->strBuf.push_back(!is_config); + rtmpPkt->buffer.push_back(flags); + rtmpPkt->buffer.push_back(!is_config); //cts - rtmpPkt->strBuf.append("\x0\x0\x0", 3); + rtmpPkt->buffer.append("\x0\x0\x0", 3); //AVCDecoderConfigurationRecord start - rtmpPkt->strBuf.push_back(1); // version - rtmpPkt->strBuf.push_back(_sps[1]); // profile - rtmpPkt->strBuf.push_back(_sps[2]); // compat - rtmpPkt->strBuf.push_back(_sps[3]); // level - rtmpPkt->strBuf.push_back(0xff); // 6 bits reserved + 2 bits nal size length - 1 (11) - rtmpPkt->strBuf.push_back(0xe1); // 3 bits reserved + 5 bits number of sps (00001) + rtmpPkt->buffer.push_back(1); // version + rtmpPkt->buffer.push_back(_sps[1]); // profile + rtmpPkt->buffer.push_back(_sps[2]); // compat + rtmpPkt->buffer.push_back(_sps[3]); // level + rtmpPkt->buffer.push_back(0xff); // 6 bits reserved + 2 bits nal size length - 1 (11) + rtmpPkt->buffer.push_back(0xe1); // 3 bits reserved + 5 bits number of sps (00001) //sps uint16_t size = _sps.size(); size = htons(size); - rtmpPkt->strBuf.append((char *) &size, 2); - rtmpPkt->strBuf.append(_sps); + rtmpPkt->buffer.append((char *) &size, 2); + rtmpPkt->buffer.append(_sps); //pps - rtmpPkt->strBuf.push_back(1); // version + rtmpPkt->buffer.push_back(1); // version size = _pps.size(); size = htons(size); - rtmpPkt->strBuf.append((char *) &size, 2); - rtmpPkt->strBuf.append(_pps); + rtmpPkt->buffer.append((char *) &size, 2); + rtmpPkt->buffer.append(_pps); - rtmpPkt->bodySize = rtmpPkt->strBuf.size(); - rtmpPkt->chunkId = CHUNK_VIDEO; - rtmpPkt->streamId = STREAM_MEDIA; - rtmpPkt->timeStamp = 0; - rtmpPkt->typeId = MSG_VIDEO; + rtmpPkt->body_size = rtmpPkt->buffer.size(); + rtmpPkt->chunk_id = CHUNK_VIDEO; + rtmpPkt->stream_index = STREAM_MEDIA; + rtmpPkt->time_stamp = 0; + rtmpPkt->type_id = MSG_VIDEO; RtmpCodec::inputRtmp(rtmpPkt, false); } diff --git a/src/Extension/H265Rtmp.cpp b/src/Extension/H265Rtmp.cpp index 150ac1a8..f91d83f2 100644 --- a/src/Extension/H265Rtmp.cpp +++ b/src/Extension/H265Rtmp.cpp @@ -43,13 +43,13 @@ static bool getH265ConfigFrame(const RtmpPacket &thiz,string &frame) { if (!thiz.isCfgFrame()) { return false; } - if (thiz.strBuf.size() < 6) { + if (thiz.buffer.size() < 6) { WarnL << "bad H265 cfg!"; return false; } - auto extra = thiz.strBuf.data() + 5; - auto bytes = thiz.strBuf.size() - 5; + auto extra = thiz.buffer.data() + 5; + auto bytes = thiz.buffer.size() - 5; struct mpeg4_hevc_t hevc = {0}; if (mpeg4_hevc_decoder_configuration_record_load((uint8_t *) extra, bytes, &hevc) > 0) { @@ -70,7 +70,7 @@ bool H265RtmpDecoder::decodeRtmp(const RtmpPacket::Ptr &pkt) { #ifdef ENABLE_MP4 string config; if(getH265ConfigFrame(*pkt,config)){ - onGetH265(config.data(), config.size(), pkt->timeStamp , pkt->timeStamp); + onGetH265(config.data(), config.size(), pkt->time_stamp , pkt->time_stamp); } #else WarnL << "请开启MP4相关功能并使能\"ENABLE_MP4\",否则对H265-RTMP支持不完善"; @@ -78,22 +78,22 @@ bool H265RtmpDecoder::decodeRtmp(const RtmpPacket::Ptr &pkt) { return false; } - if (pkt->strBuf.size() > 9) { - uint32_t iTotalLen = pkt->strBuf.size(); + if (pkt->buffer.size() > 9) { + uint32_t iTotalLen = pkt->buffer.size(); uint32_t iOffset = 5; - uint8_t *cts_ptr = (uint8_t *) (pkt->strBuf.data() + 2); + uint8_t *cts_ptr = (uint8_t *) (pkt->buffer.data() + 2); int32_t cts = (((cts_ptr[0] << 16) | (cts_ptr[1] << 8) | (cts_ptr[2])) + 0xff800000) ^ 0xff800000; - auto pts = pkt->timeStamp + cts; + auto pts = pkt->time_stamp + cts; while(iOffset + 4 < iTotalLen){ uint32_t iFrameLen; - memcpy(&iFrameLen, pkt->strBuf.data() + iOffset, 4); + memcpy(&iFrameLen, pkt->buffer.data() + iOffset, 4); iFrameLen = ntohl(iFrameLen); iOffset += 4; if(iFrameLen + iOffset > iTotalLen){ break; } - onGetH265(pkt->strBuf.data() + iOffset, iFrameLen, pkt->timeStamp , pts); + onGetH265(pkt->buffer.data() + iOffset, iFrameLen, pkt->time_stamp , pts); iOffset += iFrameLen; } } @@ -176,7 +176,7 @@ void H265RtmpEncoder::inputFrame(const Frame::Ptr &frame) { return; } - if(_lastPacket && _lastPacket->timeStamp != frame->dts()) { + if(_lastPacket && _lastPacket->time_stamp != frame->dts()) { RtmpCodec::inputRtmp(_lastPacket, _lastPacket->isVideoKeyFrame()); _lastPacket = nullptr; } @@ -188,23 +188,23 @@ void H265RtmpEncoder::inputFrame(const Frame::Ptr &frame) { flags |= (((frame->configFrame() || frame->keyFrame()) ? FLV_KEY_FRAME : FLV_INTER_FRAME) << 4); _lastPacket = ResourcePoolHelper::obtainObj(); - _lastPacket->strBuf.clear(); - _lastPacket->strBuf.push_back(flags); - _lastPacket->strBuf.push_back(!is_config); + _lastPacket->buffer.clear(); + _lastPacket->buffer.push_back(flags); + _lastPacket->buffer.push_back(!is_config); auto cts = frame->pts() - frame->dts(); cts = htonl(cts); - _lastPacket->strBuf.append((char *)&cts + 1, 3); + _lastPacket->buffer.append((char *)&cts + 1, 3); - _lastPacket->chunkId = CHUNK_VIDEO; - _lastPacket->streamId = STREAM_MEDIA; - _lastPacket->timeStamp = frame->dts(); - _lastPacket->typeId = MSG_VIDEO; + _lastPacket->chunk_id = CHUNK_VIDEO; + _lastPacket->stream_index = STREAM_MEDIA; + _lastPacket->time_stamp = frame->dts(); + _lastPacket->type_id = MSG_VIDEO; } auto size = htonl(iLen); - _lastPacket->strBuf.append((char *) &size, 4); - _lastPacket->strBuf.append(pcData, iLen); - _lastPacket->bodySize = _lastPacket->strBuf.size(); + _lastPacket->buffer.append((char *) &size, 4); + _lastPacket->buffer.append(pcData, iLen); + _lastPacket->body_size = _lastPacket->buffer.size(); } void H265RtmpEncoder::makeVideoConfigPkt() { @@ -214,13 +214,13 @@ void H265RtmpEncoder::makeVideoConfigPkt() { bool is_config = true; RtmpPacket::Ptr rtmpPkt = ResourcePoolHelper::obtainObj(); - rtmpPkt->strBuf.clear(); + rtmpPkt->buffer.clear(); //header - rtmpPkt->strBuf.push_back(flags); - rtmpPkt->strBuf.push_back(!is_config); + rtmpPkt->buffer.push_back(flags); + rtmpPkt->buffer.push_back(!is_config); //cts - rtmpPkt->strBuf.append("\x0\x0\x0", 3); + rtmpPkt->buffer.append("\x0\x0\x0", 3); struct mpeg4_hevc_t hevc = {0}; string vps_sps_pps = string("\x00\x00\x00\x01", 4) + _vps + @@ -235,13 +235,13 @@ void H265RtmpEncoder::makeVideoConfigPkt() { } //HEVCDecoderConfigurationRecord - rtmpPkt->strBuf.append((char *)extra_data, extra_data_size); + rtmpPkt->buffer.append((char *)extra_data, extra_data_size); - rtmpPkt->bodySize = rtmpPkt->strBuf.size(); - rtmpPkt->chunkId = CHUNK_VIDEO; - rtmpPkt->streamId = STREAM_MEDIA; - rtmpPkt->timeStamp = 0; - rtmpPkt->typeId = MSG_VIDEO; + rtmpPkt->body_size = rtmpPkt->buffer.size(); + rtmpPkt->chunk_id = CHUNK_VIDEO; + rtmpPkt->stream_index = STREAM_MEDIA; + rtmpPkt->time_stamp = 0; + rtmpPkt->type_id = MSG_VIDEO; RtmpCodec::inputRtmp(rtmpPkt, false); #else WarnL << "请开启MP4相关功能并使能\"ENABLE_MP4\",否则对H265-RTMP支持不完善"; diff --git a/src/Rtmp/FlvMuxer.cpp b/src/Rtmp/FlvMuxer.cpp index 35564867..7418a436 100644 --- a/src/Rtmp/FlvMuxer.cpp +++ b/src/Rtmp/FlvMuxer.cpp @@ -73,10 +73,10 @@ void FlvMuxer::onWriteFlvHeader(const RtmpMediaSource::Ptr &mediaSrc) { bool is_have_audio = false,is_have_video = false; mediaSrc->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ - if(pkt->typeId == MSG_VIDEO){ + if(pkt->type_id == MSG_VIDEO){ is_have_video = true; } - if(pkt->typeId == MSG_AUDIO){ + if(pkt->type_id == MSG_AUDIO){ is_have_audio = true; } }); @@ -133,16 +133,16 @@ public: #pragma pack(pop) #endif // defined(_WIN32) -void FlvMuxer::onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp , bool flush) { - onWriteFlvTag(pkt->typeId,pkt,ui32TimeStamp, flush); +void FlvMuxer::onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t time_stamp , bool flush) { + onWriteFlvTag(pkt->type_id, pkt, time_stamp, flush); } -void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, bool flush) { +void FlvMuxer::onWriteFlvTag(uint8_t type, const Buffer::Ptr &buffer, uint32_t time_stamp, bool flush) { RtmpTagHeader header; - header.type = ui8Type; + header.type = type; set_be24(header.data_size, buffer->size()); - header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff); - set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF); + header.timestamp_ex = (uint8_t) ((time_stamp >> 24) & 0xff); + set_be24(header.timestamp, time_stamp & 0xFFFFFF); //tag header onWrite(std::make_shared((char *)&header, sizeof(header)), false); //tag data @@ -154,7 +154,7 @@ void FlvMuxer::onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_ void FlvMuxer::onWriteRtmp(const RtmpPacket::Ptr &pkt,bool flush) { int64_t dts_out; - _stamp[pkt->typeId % 2].revise(pkt->timeStamp, 0, dts_out, dts_out); + _stamp[pkt->type_id % 2].revise(pkt->time_stamp, 0, dts_out, dts_out); onWriteFlvTag(pkt, dts_out,flush); } diff --git a/src/Rtmp/FlvMuxer.h b/src/Rtmp/FlvMuxer.h index 86be5ee3..fa16a1c8 100644 --- a/src/Rtmp/FlvMuxer.h +++ b/src/Rtmp/FlvMuxer.h @@ -25,21 +25,23 @@ public: FlvMuxer(); virtual ~FlvMuxer(); void stop(); + protected: - void start(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media); + void start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr &media); virtual void onWrite(const Buffer::Ptr &data, bool flush) = 0; virtual void onDetach() = 0; virtual std::shared_ptr getSharedPtr() = 0; + private: void onWriteFlvHeader(const RtmpMediaSource::Ptr &media); - void onWriteRtmp(const RtmpPacket::Ptr &pkt,bool flush); - void onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp, bool flush); - void onWriteFlvTag(uint8_t ui8Type, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, bool flush); + void onWriteRtmp(const RtmpPacket::Ptr &pkt, bool flush); + void onWriteFlvTag(const RtmpPacket::Ptr &pkt, uint32_t time_stamp, bool flush); + void onWriteFlvTag(uint8_t type, const Buffer::Ptr &buffer, uint32_t time_stamp, bool flush); + private: - RtmpMediaSource::RingType::RingReader::Ptr _ring_reader; //时间戳修整器 Stamp _stamp[2]; - + RtmpMediaSource::RingType::RingReader::Ptr _ring_reader; }; class FlvRecorder : public FlvMuxer , public std::enable_shared_from_this{ @@ -47,12 +49,14 @@ public: typedef std::shared_ptr Ptr; FlvRecorder(); virtual ~FlvRecorder(); - void startRecord(const EventPoller::Ptr &poller,const string &vhost,const string &app,const string &stream,const string &file_path); - void startRecord(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &media,const string &file_path); + void startRecord(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr &media, const string &file_path); + void startRecord(const EventPoller::Ptr &poller, const string &vhost, const string &app, const string &stream, const string &file_path); + private: virtual void onWrite(const Buffer::Ptr &data, bool flush) override ; virtual void onDetach() override; virtual std::shared_ptr getSharedPtr() override; + private: std::shared_ptr _file; recursive_mutex _file_mtx; @@ -60,5 +64,4 @@ private: }//namespace mediakit - #endif //ZLMEDIAKIT_FLVMUXER_H diff --git a/src/Rtmp/Rtmp.h b/src/Rtmp/Rtmp.h index 284536a2..0d2e5fb3 100644 --- a/src/Rtmp/Rtmp.h +++ b/src/Rtmp/Rtmp.h @@ -23,9 +23,6 @@ using namespace toolkit; -#define PORT 1935 -#define DEFAULT_CHUNK_LEN 128 - #if !defined(_WIN32) #define PACKED __attribute__((packed)) #else @@ -33,6 +30,7 @@ using namespace toolkit; #endif //!defined(_WIN32) +#define DEFAULT_CHUNK_LEN 128 #define HANDSHAKE_PLAINTEXT 0x03 #define RANDOM_LEN (1536 - 8) @@ -91,22 +89,24 @@ class RtmpHandshake { public: RtmpHandshake(uint32_t _time, uint8_t *_random = nullptr) { _time = htonl(_time); - memcpy(timeStamp, &_time, 4); + memcpy(time_stamp, &_time, 4); if (!_random) { random_generate((char *) random, sizeof(random)); } else { memcpy(random, _random, sizeof(random)); } } - uint8_t timeStamp[4]; + + uint8_t time_stamp[4]; uint8_t zero[4] = {0}; uint8_t random[RANDOM_LEN]; - void random_generate(char* bytes, int size) { - static char cdata[] = { 0x73, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x2d, 0x72, - 0x74, 0x6d, 0x70, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x2d, 0x77, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x2d, 0x77, 0x69, - 0x6e, 0x74, 0x65, 0x72, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x40, 0x31, 0x32, 0x36, 0x2e, 0x63, 0x6f, 0x6d }; + + void random_generate(char *bytes, int size) { + static char cdata[] = {0x73, 0x69, 0x6d, 0x70, 0x6c, 0x65, 0x2d, 0x72, + 0x74, 0x6d, 0x70, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x2d, 0x77, 0x69, 0x6e, 0x6c, 0x69, 0x6e, 0x2d, 0x77, 0x69, + 0x6e, 0x74, 0x65, 0x72, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, + 0x40, 0x31, 0x32, 0x36, 0x2e, 0x63, 0x6f, 0x6d}; for (int i = 0; i < size; i++) { bytes[i] = cdata[rand() % (sizeof(cdata) - 1)]; } @@ -116,10 +116,10 @@ public: class RtmpHeader { public: uint8_t flags; - uint8_t timeStamp[3]; - uint8_t bodySize[3]; - uint8_t typeId; - uint8_t streamId[4]; /* Note, this is little-endian while others are BE */ + uint8_t time_stamp[3]; + uint8_t body_size[3]; + uint8_t type_id; + uint8_t stream_index[4]; /* Note, this is little-endian while others are BE */ }PACKED; #if defined(_WIN32) @@ -129,21 +129,23 @@ public: class RtmpPacket : public Buffer{ public: typedef std::shared_ptr Ptr; - uint8_t typeId; - uint32_t bodySize = 0; - uint32_t timeStamp = 0; - bool hasAbsStamp = false; - uint32_t tsField = 0; - uint32_t streamId; - uint32_t chunkId; - std::string strBuf; + uint8_t type_id; + uint32_t body_size = 0; + uint32_t time_stamp = 0; + bool is_abs_stamp = false; + uint32_t ts_field = 0; + uint32_t stream_index; + uint32_t chunk_id; + std::string buffer; + public: char *data() const override{ - return (char*)strBuf.data(); + return (char*)buffer.data(); } uint32_t size() const override { - return strBuf.size(); - }; + return buffer.size(); + } + public: RtmpPacket() = default; RtmpPacket(const RtmpPacket &that) = delete; @@ -151,58 +153,64 @@ public: RtmpPacket &operator=(RtmpPacket &&that) = delete; RtmpPacket(RtmpPacket &&that){ - typeId = that.typeId; - bodySize = that.bodySize; - timeStamp = that.timeStamp; - hasAbsStamp = that.hasAbsStamp; - tsField = that.tsField; - streamId = that.streamId; - chunkId = that.chunkId; - strBuf = std::move(that.strBuf); + type_id = that.type_id; + body_size = that.body_size; + time_stamp = that.time_stamp; + is_abs_stamp = that.is_abs_stamp; + ts_field = that.ts_field; + stream_index = that.stream_index; + chunk_id = that.chunk_id; + buffer = std::move(that.buffer); } + bool isVideoKeyFrame() const { - return typeId == MSG_VIDEO && (uint8_t) strBuf[0] >> 4 == FLV_KEY_FRAME && (uint8_t) strBuf[1] == 1; + return type_id == MSG_VIDEO && (uint8_t) buffer[0] >> 4 == FLV_KEY_FRAME && (uint8_t) buffer[1] == 1; } + bool isCfgFrame() const { - switch (typeId){ - case MSG_VIDEO : return strBuf[1] == 0; + switch (type_id){ + case MSG_VIDEO : return buffer[1] == 0; case MSG_AUDIO : { switch (getMediaType()){ - case FLV_CODEC_AAC : return strBuf[1] == 0; + case FLV_CODEC_AAC : return buffer[1] == 0; default : return false; } } default : return false; } } + int getMediaType() const { - switch (typeId) { - case MSG_VIDEO : return (uint8_t) strBuf[0] & 0x0F; - case MSG_AUDIO : return (uint8_t) strBuf[0] >> 4; + switch (type_id) { + case MSG_VIDEO : return (uint8_t) buffer[0] & 0x0F; + case MSG_AUDIO : return (uint8_t) buffer[0] >> 4; default : return 0; } } + int getAudioSampleRate() const { - if (typeId != MSG_AUDIO) { + if (type_id != MSG_AUDIO) { return 0; } - int flvSampleRate = ((uint8_t) strBuf[0] & 0x0C) >> 2; + int flvSampleRate = ((uint8_t) buffer[0] & 0x0C) >> 2; const static int sampleRate[] = { 5512, 11025, 22050, 44100 }; return sampleRate[flvSampleRate]; } + int getAudioSampleBit() const { - if (typeId != MSG_AUDIO) { + if (type_id != MSG_AUDIO) { return 0; } - int flvSampleBit = ((uint8_t) strBuf[0] & 0x02) >> 1; + int flvSampleBit = ((uint8_t) buffer[0] & 0x02) >> 1; const static int sampleBit[] = { 8, 16 }; return sampleBit[flvSampleBit]; } + int getAudioChannel() const { - if (typeId != MSG_AUDIO) { + if (type_id != MSG_AUDIO) { return 0; } - int flvStereoOrMono = (uint8_t) strBuf[0] & 0x01; + int flvStereoOrMono = (uint8_t) buffer[0] & 0x01; const static int channel[] = { 1, 2 }; return channel[flvStereoOrMono]; } diff --git a/src/Rtmp/RtmpCodec.h b/src/Rtmp/RtmpCodec.h index bb74fb69..29cdc768 100644 --- a/src/Rtmp/RtmpCodec.h +++ b/src/Rtmp/RtmpCodec.h @@ -23,14 +23,14 @@ public: typedef std::shared_ptr Ptr; typedef RingBuffer RingType; - RtmpRing(){} - virtual ~RtmpRing(){} + RtmpRing() {} + virtual ~RtmpRing() {} /** * 获取rtmp环形缓存 * @return */ - virtual RingType::Ptr getRtmpRing() const{ + virtual RingType::Ptr getRtmpRing() const { return _rtmpRing; } @@ -38,7 +38,7 @@ public: * 设置rtmp环形缓存 * @param ring */ - virtual void setRtmpRing(const RingType::Ptr &ring){ + virtual void setRtmpRing(const RingType::Ptr &ring) { _rtmpRing = ring; } @@ -48,17 +48,17 @@ public: * @param key_pos 是否为关键帧 * @return 是否为关键帧 */ - virtual bool inputRtmp(const RtmpPacket::Ptr &rtmp, bool key_pos){ - if(_rtmpRing){ - _rtmpRing->write(rtmp,key_pos); + virtual bool inputRtmp(const RtmpPacket::Ptr &rtmp, bool key_pos) { + if (_rtmpRing) { + _rtmpRing->write(rtmp, key_pos); } return key_pos; } + protected: RingType::Ptr _rtmpRing; }; - class RtmpCodec : public RtmpRing, public FrameDispatcher , public CodecInfo{ public: typedef std::shared_ptr Ptr; @@ -69,5 +69,4 @@ public: }//namespace mediakit - #endif //ZLMEDIAKIT_RTMPCODEC_H diff --git a/src/Rtmp/RtmpDemuxer.cpp b/src/Rtmp/RtmpDemuxer.cpp index 6e38b0d7..f50c4920 100644 --- a/src/Rtmp/RtmpDemuxer.cpp +++ b/src/Rtmp/RtmpDemuxer.cpp @@ -66,27 +66,27 @@ bool RtmpDemuxer::loadMetaData(const AMFValue &val){ } bool RtmpDemuxer::inputRtmp(const RtmpPacket::Ptr &pkt) { - switch (pkt->typeId) { + switch (pkt->type_id) { case MSG_VIDEO: { - if(!_tryedGetVideoTrack){ - _tryedGetVideoTrack = true; + if(!_try_get_video_track){ + _try_get_video_track = true; auto codec = AMFValue(pkt->getMediaType()); makeVideoTrack(codec); } - if(_videoRtmpDecoder){ - return _videoRtmpDecoder->inputRtmp(pkt, true); + if(_video_rtmp_decoder){ + return _video_rtmp_decoder->inputRtmp(pkt, true); } return false; } case MSG_AUDIO: { - if(!_tryedGetAudioTrack) { - _tryedGetAudioTrack = true; + if(!_try_get_audio_track) { + _try_get_audio_track = true; auto codec = AMFValue(pkt->getMediaType()); makeAudioTrack(codec, pkt->getAudioSampleRate(), pkt->getAudioChannel(), pkt->getAudioSampleBit()); } - if(_audioRtmpDecoder){ - _audioRtmpDecoder->inputRtmp(pkt, false); + if(_audio_rtmp_decoder){ + _audio_rtmp_decoder->inputRtmp(pkt, false); return false; } return false; @@ -101,12 +101,12 @@ void RtmpDemuxer::makeVideoTrack(const AMFValue &videoCodec) { _videoTrack = dynamic_pointer_cast(Factory::getVideoTrackByAmf(videoCodec)); if (_videoTrack) { //生成rtmpCodec对象以便解码rtmp - _videoRtmpDecoder = Factory::getRtmpCodecByTrack(_videoTrack, false); - if (_videoRtmpDecoder) { + _video_rtmp_decoder = Factory::getRtmpCodecByTrack(_videoTrack, false); + if (_video_rtmp_decoder) { //设置rtmp解码器代理,生成的frame写入该Track - _videoRtmpDecoder->addDelegate(_videoTrack); + _video_rtmp_decoder->addDelegate(_videoTrack); onAddTrack(_videoTrack); - _tryedGetVideoTrack = true; + _try_get_video_track = true; } else { //找不到相应的rtmp解码器,该track无效 _videoTrack.reset(); @@ -119,12 +119,12 @@ void RtmpDemuxer::makeAudioTrack(const AMFValue &audioCodec,int sample_rate, int _audioTrack = dynamic_pointer_cast(Factory::getAudioTrackByAmf(audioCodec, sample_rate, channels, sample_bit)); if (_audioTrack) { //生成rtmpCodec对象以便解码rtmp - _audioRtmpDecoder = Factory::getRtmpCodecByTrack(_audioTrack, false); - if (_audioRtmpDecoder) { + _audio_rtmp_decoder = Factory::getRtmpCodecByTrack(_audioTrack, false); + if (_audio_rtmp_decoder) { //设置rtmp解码器代理,生成的frame写入该Track - _audioRtmpDecoder->addDelegate(_audioTrack); + _audio_rtmp_decoder->addDelegate(_audioTrack); onAddTrack(_audioTrack); - _tryedGetAudioTrack = true; + _try_get_audio_track = true; } else { //找不到相应的rtmp解码器,该track无效 _audioTrack.reset(); diff --git a/src/Rtmp/RtmpDemuxer.h b/src/Rtmp/RtmpDemuxer.h index 7c6bdfa7..90f98cff 100644 --- a/src/Rtmp/RtmpDemuxer.h +++ b/src/Rtmp/RtmpDemuxer.h @@ -28,7 +28,7 @@ public: typedef std::shared_ptr Ptr; RtmpDemuxer() = default; - virtual ~RtmpDemuxer() = default; + ~RtmpDemuxer() override = default; bool loadMetaData(const AMFValue &metadata); @@ -38,14 +38,16 @@ public: * @return true 代表是i帧 */ bool inputRtmp(const RtmpPacket::Ptr &pkt); + private: void makeVideoTrack(const AMFValue &val); void makeAudioTrack(const AMFValue &val, int sample_rate, int channels, int sample_bit); + private: - bool _tryedGetVideoTrack = false; - bool _tryedGetAudioTrack = false; - RtmpCodec::Ptr _audioRtmpDecoder; - RtmpCodec::Ptr _videoRtmpDecoder; + bool _try_get_video_track = false; + bool _try_get_audio_track = false; + RtmpCodec::Ptr _audio_rtmp_decoder; + RtmpCodec::Ptr _video_rtmp_decoder; }; } /* namespace mediakit */ diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index 5718c94f..03975fcc 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -122,17 +122,17 @@ public: */ void onWrite(const RtmpPacket::Ptr &pkt, bool key = true) override { lock_guard lock(_mtx); - if(pkt->typeId == MSG_VIDEO){ + if(pkt->type_id == MSG_VIDEO){ //有视频,那么启用GOP缓存 _have_video = true; } if (pkt->isCfgFrame()) { - _config_frame_map[pkt->typeId] = pkt; + _config_frame_map[pkt->type_id] = pkt; return; } //保存当前时间戳 - _track_stamps_map[pkt->typeId] = pkt->timeStamp; + _track_stamps_map[pkt->type_id] = pkt->time_stamp; if (!_ring) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); @@ -154,7 +154,7 @@ public: regist(); } } - PacketCache::inputPacket(pkt->typeId == MSG_VIDEO, pkt, key); + PacketCache::inputPacket(pkt->type_id == MSG_VIDEO, pkt, key); } /** diff --git a/src/Rtmp/RtmpMediaSourceImp.h b/src/Rtmp/RtmpMediaSourceImp.h index cae09c5b..53976a88 100644 --- a/src/Rtmp/RtmpMediaSourceImp.h +++ b/src/Rtmp/RtmpMediaSourceImp.h @@ -161,11 +161,12 @@ public: } private: - RtmpDemuxer::Ptr _demuxer; - MultiMediaSourceMuxer::Ptr _muxer; - AMFValue _metadata; bool _all_track_ready = false; bool _recreate_metadata = false; + AMFValue _metadata; + RtmpDemuxer::Ptr _demuxer; + MultiMediaSourceMuxer::Ptr _muxer; + }; } /* namespace mediakit */ diff --git a/src/Rtmp/RtmpMediaSourceMuxer.h b/src/Rtmp/RtmpMediaSourceMuxer.h index c1ab506c..6c8c44d7 100644 --- a/src/Rtmp/RtmpMediaSourceMuxer.h +++ b/src/Rtmp/RtmpMediaSourceMuxer.h @@ -24,34 +24,35 @@ public: const string &strApp, const string &strId, const TitleMeta::Ptr &title = nullptr) : RtmpMuxer(title){ - _mediaSouce = std::make_shared(vhost,strApp,strId); - getRtmpRing()->setDelegate(_mediaSouce); + _media_src = std::make_shared(vhost, strApp, strId); + getRtmpRing()->setDelegate(_media_src); } virtual ~RtmpMediaSourceMuxer(){} void setListener(const std::weak_ptr &listener){ - _mediaSouce->setListener(listener); + _media_src->setListener(listener); } void setTimeStamp(uint32_t stamp){ - _mediaSouce->setTimeStamp(stamp); + _media_src->setTimeStamp(stamp); } int readerCount() const{ - return _mediaSouce->readerCount(); + return _media_src->readerCount(); } void onAllTrackReady(){ makeConfigPacket(); - _mediaSouce->setMetaData(getMetadata()); + _media_src->setMetaData(getMetadata()); } // 设置TrackSource void setTrackSource(const std::weak_ptr &track_src){ - _mediaSouce->setTrackSource(track_src); + _media_src->setTrackSource(track_src); } + private: - RtmpMediaSource::Ptr _mediaSouce; + RtmpMediaSource::Ptr _media_src; }; diff --git a/src/Rtmp/RtmpMuxer.cpp b/src/Rtmp/RtmpMuxer.cpp index 50686d01..a44266c2 100644 --- a/src/Rtmp/RtmpMuxer.cpp +++ b/src/Rtmp/RtmpMuxer.cpp @@ -19,7 +19,7 @@ RtmpMuxer::RtmpMuxer(const TitleMeta::Ptr &title) { }else{ _metadata = title->getMetadata(); } - _rtmpRing = std::make_shared(); + _rtmp_ring = std::make_shared(); } void RtmpMuxer::addTrack(const Track::Ptr &track) { @@ -31,7 +31,7 @@ void RtmpMuxer::addTrack(const Track::Ptr &track) { } //设置rtmp输出环形缓存 - encoder->setRtmpRing(_rtmpRing); + encoder->setRtmpRing(_rtmp_ring); //添加metadata Metadata::addTrack(_metadata,track); @@ -57,7 +57,7 @@ const AMFValue &RtmpMuxer::getMetadata() const { } RtmpRing::RingType::Ptr RtmpMuxer::getRtmpRing() const { - return _rtmpRing; + return _rtmp_ring; } void RtmpMuxer::resetTracks() { diff --git a/src/Rtmp/RtmpMuxer.h b/src/Rtmp/RtmpMuxer.h index ad6f27e6..6533e9a4 100644 --- a/src/Rtmp/RtmpMuxer.h +++ b/src/Rtmp/RtmpMuxer.h @@ -61,7 +61,7 @@ public: */ void makeConfigPacket(); private: - RtmpRing::RingType::Ptr _rtmpRing; + RtmpRing::RingType::Ptr _rtmp_ring; AMFValue _metadata; RtmpCodec::Ptr _encoder[TrackMax]; }; diff --git a/src/Rtmp/RtmpPlayer.cpp b/src/Rtmp/RtmpPlayer.cpp index 09556454..d00c8e7a 100644 --- a/src/Rtmp/RtmpPlayer.cpp +++ b/src/Rtmp/RtmpPlayer.cpp @@ -18,8 +18,7 @@ using namespace mediakit::Client; namespace mediakit { -RtmpPlayer::RtmpPlayer(const EventPoller::Ptr &poller) : TcpClient(poller) { -} +RtmpPlayer::RtmpPlayer(const EventPoller::Ptr &poller) : TcpClient(poller) {} RtmpPlayer::~RtmpPlayer() { DebugL << endl; @@ -29,97 +28,82 @@ void RtmpPlayer::teardown() { if (alive()) { shutdown(SockException(Err_shutdown,"teardown")); } - _strApp.clear(); - _strStream.clear(); - _strTcUrl.clear(); - _pBeatTimer.reset(); - _pPlayTimer.reset(); - _pMediaTimer.reset(); - _iSeekTo = 0; + _app.clear(); + _stream_id.clear(); + _tc_url.clear(); + _beat_timer.reset(); + _play_timer.reset(); + _rtmp_recv_timer.reset(); + _seek_ms = 0; RtmpProtocol::reset(); - CLEAR_ARR(_aiFistStamp); - CLEAR_ARR(_aiNowStamp); + CLEAR_ARR(_fist_stamp); + CLEAR_ARR(_now_stamp); - lock_guard lck(_mtxOnResultCB); - _mapOnResultCB.clear(); - lock_guard lck2(_mtxOnStatusCB); - _dqOnStatusCB.clear(); + lock_guard lck(_mtx_on_result); + _map_on_result.clear(); + lock_guard lck2(_mtx_on_status); + _deque_on_status.clear(); } void RtmpPlayer::play(const string &strUrl) { teardown(); - string strHost = FindField(strUrl.data(), "://", "/"); - _strApp = FindField(strUrl.data(), (strHost + "/").data(), "/"); - _strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL); - _strTcUrl = string("rtmp://") + strHost + "/" + _strApp; + string host_url = FindField(strUrl.data(), "://", "/"); + _app = FindField(strUrl.data(), (host_url + "/").data(), "/"); + _stream_id = FindField(strUrl.data(), (host_url + "/" + _app + "/").data(), NULL); + _tc_url = string("rtmp://") + host_url + "/" + _app; - if (!_strApp.size() || !_strStream.size()) { - onPlayResult_l(SockException(Err_other,"rtmp url非法"),false); + if (!_app.size() || !_stream_id.size()) { + onPlayResult_l(SockException(Err_other, "rtmp url非法"), false); return; } - DebugL << strHost << " " << _strApp << " " << _strStream; + DebugL << host_url << " " << _app << " " << _stream_id; - auto iPort = atoi(FindField(strHost.data(), ":", NULL).data()); + auto iPort = atoi(FindField(host_url.data(), ":", NULL).data()); if (iPort <= 0) { //rtmp 默认端口1935 iPort = 1935; } else { //服务器域名 - strHost = FindField(strHost.data(), NULL, ":"); + host_url = FindField(host_url.data(), NULL, ":"); } - if(!(*this)[kNetAdapter].empty()){ + if (!(*this)[kNetAdapter].empty()) { setNetAdapter((*this)[kNetAdapter]); } - weak_ptr weakSelf= dynamic_pointer_cast(shared_from_this()); - float playTimeOutSec = (*this)[kTimeoutMS].as() / 1000.0; - _pPlayTimer.reset( new Timer(playTimeOutSec, [weakSelf]() { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + float play_timeout_sec = (*this)[kTimeoutMS].as() / 1000.0; + _play_timer.reset(new Timer(play_timeout_sec, [weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return false; } - strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtmp timeout"),false); + strong_self->onPlayResult_l(SockException(Err_timeout, "play rtmp timeout"), false); return false; - },getPoller())); + }, getPoller())); _metadata_got = false; - startConnect(strHost, iPort , playTimeOutSec); + startConnect(host_url, iPort, play_timeout_sec); } + void RtmpPlayer::onErr(const SockException &ex){ //定时器_pPlayTimer为空后表明握手结束了 - onPlayResult_l(ex, !_pPlayTimer); + onPlayResult_l(ex, !_play_timer); } -void RtmpPlayer::onPlayResult_l(const SockException &ex , bool handshakeCompleted) { - WarnL << ex.getErrCode() << " " << ex.what(); - - if(!ex){ - //播放成功,恢复rtmp接收超时定时器 - _mediaTicker.resetTime(); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - int timeoutMS = (*this)[kMediaTimeoutMS].as(); - //创建rtmp数据接收超时检测定时器 - _pMediaTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { - return false; - } - if(strongSelf->_mediaTicker.elapsedTime()> timeoutMS) { - //接收rtmp媒体数据超时 - strongSelf->onPlayResult_l(SockException(Err_timeout,"receive rtmp timeout"),true); - return false; - } - return true; - },getPoller())); +void RtmpPlayer::onPlayResult_l(const SockException &ex, bool handshakeCompleted) { + if (ex.getErrCode() == Err_shutdown) { + //主动shutdown的,不触发回调 + return; } + WarnL << ex.getErrCode() << " " << ex.what(); if (!handshakeCompleted) { //开始播放阶段 - _pPlayTimer.reset(); - onPlayResult(ex); + _play_timer.reset(); //是否为性能测试模式 _benchmark_mode = (*this)[Client::kBenchmarkMode].as(); + onPlayResult(ex); } else if (ex) { //播放成功后异常断开回调 onShutdown(ex); @@ -128,36 +112,58 @@ void RtmpPlayer::onPlayResult_l(const SockException &ex , bool handshakeComplete onResume(); } - if(ex){ + if (!ex) { + //播放成功,恢复rtmp接收超时定时器 + _rtmp_recv_ticker.resetTime(); + int timeout_ms = (*this)[kMediaTimeoutMS].as(); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + auto lam = [weakSelf, timeout_ms]() { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { + return false; + } + if (strongSelf->_rtmp_recv_ticker.elapsedTime() > timeout_ms) { + //接收rtmp媒体数据超时 + SockException ex(Err_timeout, "receive rtmp timeout"); + strongSelf->onPlayResult_l(ex, true); + return false; + } + return true; + }; + //创建rtmp数据接收超时检测定时器 + _rtmp_recv_timer = std::make_shared(timeout_ms / 2000.0, lam, getPoller()); + } else { teardown(); } } + void RtmpPlayer::onConnect(const SockException &err){ - if(err.getErrCode() != Err_success) { + if (err.getErrCode() != Err_success) { onPlayResult_l(err, false); return; } - weak_ptr weakSelf= dynamic_pointer_cast(shared_from_this()); - startClientSession([weakSelf](){ - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + startClientSession([weakSelf]() { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { return; } strongSelf->send_connect(); }); } + void RtmpPlayer::onRecv(const Buffer::Ptr &pBuf){ try { - if(_benchmark_mode && !_pPlayTimer){ + if (_benchmark_mode && !_play_timer) { //在性能测试模式下,如果rtmp握手完毕后,不再解析rtmp包 - _mediaTicker.resetTime(); + _rtmp_recv_ticker.resetTime(); return; } onParseRtmp(pBuf->data(), pBuf->size()); } catch (exception &e) { SockException ex(Err_other, e.what()); //定时器_pPlayTimer为空后表明握手结束了 - onPlayResult_l(ex, !_pPlayTimer); + onPlayResult_l(ex, !_play_timer); } } @@ -167,8 +173,8 @@ void RtmpPlayer::pause(bool bPause) { inline void RtmpPlayer::send_connect() { AMFValue obj(AMF_OBJECT); - obj.set("app", _strApp); - obj.set("tcUrl", _strTcUrl); + obj.set("app", _app); + obj.set("tcUrl", _tc_url); //未使用代理 obj.set("fpad", false); //参考librtmp,什么作用? @@ -176,18 +182,18 @@ inline void RtmpPlayer::send_connect() { //SUPPORT_VID_CLIENT_SEEK 支持seek obj.set("videoFunction", 1); //只支持aac - obj.set("audioCodecs", (double)(0x0400)); + obj.set("audioCodecs", (double) (0x0400)); //只支持H264 - obj.set("videoCodecs", (double)(0x0080)); + obj.set("videoCodecs", (double) (0x0080)); sendInvoke("connect", obj); - addOnResultCB([this](AMFDecoder &dec){ + addOnResultCB([this](AMFDecoder &dec) { //TraceL << "connect result"; dec.load(); auto val = dec.load(); auto level = val["level"].as_string(); auto code = val["code"].as_string(); - if(level != "status"){ - throw std::runtime_error(StrPrinter <<"connect 失败:" << level << " " << code << endl); + if (level != "status") { + throw std::runtime_error(StrPrinter << "connect 失败:" << level << " " << code << endl); } send_createStream(); }); @@ -196,24 +202,24 @@ inline void RtmpPlayer::send_connect() { inline void RtmpPlayer::send_createStream() { AMFValue obj(AMF_NULL); sendInvoke("createStream", obj); - addOnResultCB([this](AMFDecoder &dec){ + addOnResultCB([this](AMFDecoder &dec) { //TraceL << "createStream result"; dec.load(); - _ui32StreamId = dec.load(); + _stream_index = dec.load(); send_play(); }); } inline void RtmpPlayer::send_play() { AMFEncoder enc; - enc << "play" << ++_iReqID << nullptr << _strStream << (double)_ui32StreamId; + enc << "play" << ++_send_req_id << nullptr << _stream_id << (double) _stream_index; sendRequest(MSG_CMD, enc.data()); - auto fun = [this](AMFValue &val){ + auto fun = [this](AMFValue &val) { //TraceL << "play onStatus"; auto level = val["level"].as_string(); auto code = val["code"].as_string(); - if(level != "status"){ - throw std::runtime_error(StrPrinter <<"play 失败:" << level << " " << code << endl); + if (level != "status") { + throw std::runtime_error(StrPrinter << "play 失败:" << level << " " << code << endl); } }; addOnStatusCB(fun); @@ -222,76 +228,77 @@ inline void RtmpPlayer::send_play() { inline void RtmpPlayer::send_pause(bool bPause) { AMFEncoder enc; - enc << "pause" << ++_iReqID << nullptr << bPause; + enc << "pause" << ++_send_req_id << nullptr << bPause; sendRequest(MSG_CMD, enc.data()); - auto fun = [this,bPause](AMFValue &val){ + auto fun = [this, bPause](AMFValue &val) { //TraceL << "pause onStatus"; auto level = val["level"].as_string(); auto code = val["code"].as_string(); - if(level != "status") { - if(!bPause){ - throw std::runtime_error(StrPrinter <<"pause 恢复播放失败:" << level << " " << code << endl); + if (level != "status") { + if (!bPause) { + throw std::runtime_error(StrPrinter << "pause 恢复播放失败:" << level << " " << code << endl); } - }else{ - _bPaused = bPause; - if(!bPause){ + } else { + _paused = bPause; + if (!bPause) { onPlayResult_l(SockException(Err_success, "resum rtmp success"), true); - }else{ + } else { //暂停播放 - _pMediaTimer.reset(); + _rtmp_recv_timer.reset(); } } }; addOnStatusCB(fun); - _pBeatTimer.reset(); - if(bPause){ + _beat_timer.reset(); + if (bPause) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as() / 1000.0,[weakSelf](){ + _beat_timer.reset(new Timer((*this)[kBeatIntervalMS].as() / 1000.0, [weakSelf]() { auto strongSelf = weakSelf.lock(); - if (!strongSelf){ + if (!strongSelf) { return false; } uint32_t timeStamp = ::time(NULL); strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp); return true; - },getPoller())); + }, getPoller())); } } void RtmpPlayer::onCmd_result(AMFDecoder &dec){ - auto iReqId = dec.load(); - lock_guard lck(_mtxOnResultCB); - auto it = _mapOnResultCB.find(iReqId); - if(it != _mapOnResultCB.end()){ + auto req_id = dec.load(); + lock_guard lck(_mtx_on_result); + auto it = _map_on_result.find(req_id); + if (it != _map_on_result.end()) { it->second(dec); - _mapOnResultCB.erase(it); - }else{ + _map_on_result.erase(it); + } else { WarnL << "unhandled _result"; } } + void RtmpPlayer::onCmd_onStatus(AMFDecoder &dec) { AMFValue val; - while(true){ + while (true) { val = dec.load(); - if(val.type() == AMF_OBJECT){ + if (val.type() == AMF_OBJECT) { break; } } - if(val.type() != AMF_OBJECT){ + if (val.type() != AMF_OBJECT) { throw std::runtime_error("onStatus:the result object was not found"); } - - lock_guard lck(_mtxOnStatusCB); - if(_dqOnStatusCB.size()){ - _dqOnStatusCB.front()(val); - _dqOnStatusCB.pop_front(); - }else{ + + lock_guard lck(_mtx_on_status); + if (_deque_on_status.size()) { + _deque_on_status.front()(val); + _deque_on_status.pop_front(); + } else { auto level = val["level"]; auto code = val["code"].as_string(); - if(level.type() == AMF_STRING){ - if(level.as_string() != "status"){ - throw std::runtime_error(StrPrinter <<"onStatus 失败:" << level.as_string() << " " << code << endl); + if (level.type() == AMF_STRING) { + if (level.as_string() != "status") { + throw std::runtime_error(StrPrinter << "onStatus 失败:" << level.as_string() << " " << code << endl); } } //WarnL << "unhandled onStatus:" << code; @@ -301,31 +308,31 @@ void RtmpPlayer::onCmd_onStatus(AMFDecoder &dec) { void RtmpPlayer::onCmd_onMetaData(AMFDecoder &dec) { //TraceL; auto val = dec.load(); - if(!onCheckMeta(val)){ + if (!onCheckMeta(val)) { throw std::runtime_error("onCheckMeta failed"); } _metadata_got = true; } -void RtmpPlayer::onStreamDry(uint32_t ui32StreamId) { - //TraceL << ui32StreamId; - onPlayResult_l(SockException(Err_other,"rtmp stream dry"), true); +void RtmpPlayer::onStreamDry(uint32_t stream_id) { + //TraceL << stream_id; + onPlayResult_l(SockException(Err_other, "rtmp stream dry"), true); } void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) { - _mediaTicker.resetTime(); - if(!_pPlayTimer){ + _rtmp_recv_ticker.resetTime(); + if (!_play_timer) { //已经触发了onPlayResult事件,直接触发onMediaData事件 onMediaData(packet); return; } - if(packet->isCfgFrame()){ + if (packet->isCfgFrame()) { //输入配置帧以便初始化完成各个track onMediaData(packet); - }else{ + } else { //先触发onPlayResult事件,这个时候解码器才能初始化完毕 - onPlayResult_l(SockException(Err_success,"play rtmp success"), false); + onPlayResult_l(SockException(Err_success, "play rtmp success"), false); //触发onPlayResult事件后,再把帧数据输入到解码器 onMediaData(packet); } @@ -336,77 +343,76 @@ void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) { typedef void (RtmpPlayer::*rtmp_func_ptr)(AMFDecoder &dec); static unordered_map s_func_map; static onceToken token([]() { - s_func_map.emplace("_error",&RtmpPlayer::onCmd_result); - s_func_map.emplace("_result",&RtmpPlayer::onCmd_result); - s_func_map.emplace("onStatus",&RtmpPlayer::onCmd_onStatus); - s_func_map.emplace("onMetaData",&RtmpPlayer::onCmd_onMetaData); - }, []() {}); + s_func_map.emplace("_error", &RtmpPlayer::onCmd_result); + s_func_map.emplace("_result", &RtmpPlayer::onCmd_result); + s_func_map.emplace("onStatus", &RtmpPlayer::onCmd_onStatus); + s_func_map.emplace("onMetaData", &RtmpPlayer::onCmd_onMetaData); + }); - switch (chunkData.typeId) { + switch (chunkData.type_id) { case MSG_CMD: case MSG_CMD3: case MSG_DATA: case MSG_DATA3: { - AMFDecoder dec(chunkData.strBuf, 0); + AMFDecoder dec(chunkData.buffer, 0); std::string type = dec.load(); auto it = s_func_map.find(type); - if(it != s_func_map.end()){ + if (it != s_func_map.end()) { auto fun = it->second; (this->*fun)(dec); - }else{ + } else { WarnL << "can not support cmd:" << type; } - } break; + } + case MSG_AUDIO: case MSG_VIDEO: { - auto idx = chunkData.typeId%2; - if (_aNowStampTicker[idx].elapsedTime() > 500) { + auto idx = chunkData.type_id % 2; + if (_now_stamp_ticker[idx].elapsedTime() > 500) { //计算播放进度时间轴用 - _aiNowStamp[idx] = chunkData.timeStamp; + _now_stamp[idx] = chunkData.time_stamp; } - if(!_metadata_got){ - if(!onCheckMeta(TitleMeta().getMetadata())){ + if (!_metadata_got) { + if (!onCheckMeta(TitleMeta().getMetadata())) { throw std::runtime_error("onCheckMeta failed"); } _metadata_got = true; } onMediaData_l(std::make_shared(std::move(chunkData))); - } - break; - default: - //WarnL << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size()); break; } + + default: break; + } } uint32_t RtmpPlayer::getProgressMilliSecond() const{ - uint32_t iTime[2] = {0,0}; - for(auto i = 0 ;i < 2 ;i++){ - iTime[i] = _aiNowStamp[i] - _aiFistStamp[i]; + uint32_t stamp[2] = {0, 0}; + for (auto i = 0; i < 2; i++) { + stamp[i] = _now_stamp[i] - _fist_stamp[i]; } - return _iSeekTo + MAX(iTime[0],iTime[1]); + return _seek_ms + MAX(stamp[0], stamp[1]); } + void RtmpPlayer::seekToMilliSecond(uint32_t seekMS){ - if (_bPaused) { + if (_paused) { pause(false); } AMFEncoder enc; - enc << "seek" << ++_iReqID << nullptr << seekMS * 1.0; + enc << "seek" << ++_send_req_id << nullptr << seekMS * 1.0; sendRequest(MSG_CMD, enc.data()); - addOnStatusCB([this,seekMS](AMFValue &val) { + addOnStatusCB([this, seekMS](AMFValue &val) { //TraceL << "seek result"; - _aNowStampTicker[0].resetTime(); - _aNowStampTicker[1].resetTime(); + _now_stamp_ticker[0].resetTime(); + _now_stamp_ticker[1].resetTime(); int iTimeInc = seekMS - getProgressMilliSecond(); - for(auto i = 0 ;i < 2 ;i++){ - _aiFistStamp[i] = _aiNowStamp[i] + iTimeInc; - _aiNowStamp[i] = _aiFistStamp[i]; + for (auto i = 0; i < 2; i++) { + _fist_stamp[i] = _now_stamp[i] + iTimeInc; + _now_stamp[i] = _fist_stamp[i]; } - _iSeekTo = seekMS; + _seek_ms = seekMS; }); - } } /* namespace mediakit */ - diff --git a/src/Rtmp/RtmpPlayer.h b/src/Rtmp/RtmpPlayer.h index 9b7f1b66..51a42228 100644 --- a/src/Rtmp/RtmpPlayer.h +++ b/src/Rtmp/RtmpPlayer.h @@ -8,8 +8,8 @@ * may be found in the AUTHORS file in the root of the source tree. */ -#ifndef SRC_RTMP_RtmpPlayer2_H_ -#define SRC_RTMP_RtmpPlayer2_H_ +#ifndef SRC_RTMP_RtmpPlayer_H_ +#define SRC_RTMP_RtmpPlayer_H_ #include #include @@ -28,21 +28,24 @@ using namespace toolkit; using namespace mediakit::Client; namespace mediakit { + //实现了rtmp播放器协议部分的功能,及数据接收功能 -class RtmpPlayer:public PlayerBase, public TcpClient, public RtmpProtocol{ +class RtmpPlayer : public PlayerBase, public TcpClient, public RtmpProtocol { public: typedef std::shared_ptr Ptr; RtmpPlayer(const EventPoller::Ptr &poller); - virtual ~RtmpPlayer(); + ~RtmpPlayer() override; void play(const string &strUrl) override; void pause(bool bPause) override; void teardown() override; + protected: virtual bool onCheckMeta(const AMFValue &val) =0; virtual void onMediaData(const RtmpPacket::Ptr &chunkData) =0; uint32_t getProgressMilliSecond() const; void seekToMilliSecond(uint32_t ms); + protected: void onMediaData_l(const RtmpPacket::Ptr &chunkData); //在获取config帧后才触发onPlayResult_l(而不是收到play命令回复),所以此时所有track都初始化完毕了 @@ -59,15 +62,15 @@ protected: send(buffer); } - template - inline void addOnResultCB(const FUN &fun) { - lock_guard lck(_mtxOnResultCB); - _mapOnResultCB.emplace(_iReqID, fun); + template + inline void addOnResultCB(const FUNC &func) { + lock_guard lck(_mtx_on_result); + _map_on_result.emplace(_send_req_id, func); } - template - inline void addOnStatusCB(const FUN &fun) { - lock_guard lck(_mtxOnStatusCB); - _dqOnStatusCB.emplace_back(fun); + template + inline void addOnStatusCB(const FUNC &func) { + lock_guard lck(_mtx_on_status); + _deque_on_status.emplace_back(func); } void onCmd_result(AMFDecoder &dec); @@ -78,34 +81,37 @@ protected: inline void send_createStream(); inline void send_play(); inline void send_pause(bool bPause); + private: - string _strApp; - string _strStream; - string _strTcUrl; - bool _bPaused = false; + string _app; + string _stream_id; + string _tc_url; - unordered_map > _mapOnResultCB; - recursive_mutex _mtxOnResultCB; - deque > _dqOnStatusCB; - recursive_mutex _mtxOnStatusCB; - - //超时功能实现 - Ticker _mediaTicker; - std::shared_ptr _pMediaTimer; - std::shared_ptr _pPlayTimer; - //心跳定时器 - std::shared_ptr _pBeatTimer; - - //播放进度控制 - uint32_t _iSeekTo = 0; - uint32_t _aiFistStamp[2] = { 0, 0 }; - uint32_t _aiNowStamp[2] = { 0, 0 }; - Ticker _aNowStampTicker[2]; + bool _paused = false; bool _metadata_got = false; //是否为性能测试模式 bool _benchmark_mode = false; + + //播放进度控制 + uint32_t _seek_ms = 0; + uint32_t _fist_stamp[2] = {0, 0}; + uint32_t _now_stamp[2] = {0, 0}; + Ticker _now_stamp_ticker[2]; + + recursive_mutex _mtx_on_result; + recursive_mutex _mtx_on_status; + deque > _deque_on_status; + unordered_map > _map_on_result; + + //rtmp接收超时计时器 + Ticker _rtmp_recv_ticker; + //心跳发送定时器 + std::shared_ptr _beat_timer; + //播放超时定时器 + std::shared_ptr _play_timer; + //rtmp接收超时定时器 + std::shared_ptr _rtmp_recv_timer; }; } /* namespace mediakit */ - -#endif /* SRC_RTMP_RtmpPlayer2_H_ */ +#endif /* SRC_RTMP_RtmpPlayer_H_ */ diff --git a/src/Rtmp/RtmpPlayerImp.h b/src/Rtmp/RtmpPlayerImp.h index f7678dbe..4ac573c6 100644 --- a/src/Rtmp/RtmpPlayerImp.h +++ b/src/Rtmp/RtmpPlayerImp.h @@ -27,51 +27,59 @@ namespace mediakit { class RtmpPlayerImp: public PlayerImp { public: typedef std::shared_ptr Ptr; - RtmpPlayerImp(const EventPoller::Ptr &poller) : PlayerImp(poller){}; - virtual ~RtmpPlayerImp(){ - DebugL< 0){ + + RtmpPlayerImp(const EventPoller::Ptr &poller) : PlayerImp(poller) {}; + + ~RtmpPlayerImp() override { + DebugL << endl; + } + + float getProgress() const override { + if (getDuration() > 0) { return getProgressMilliSecond() / (getDuration() * 1000); } return PlayerBase::getProgress(); - }; - void seekTo(float fProgress) override{ - fProgress = MAX(float(0),MIN(fProgress,float(1.0))); - seekToMilliSecond(fProgress * getDuration() * 1000); - }; - void play(const string &strUrl) override { - PlayerImp::play(strUrl); } + + void seekTo(float fProgress) override { + fProgress = MAX(float(0), MIN(fProgress, float(1.0))); + seekToMilliSecond(fProgress * getDuration() * 1000); + } + + void play(const string &strUrl) override { + PlayerImp::play(strUrl); + } + private: //派生类回调函数 bool onCheckMeta(const AMFValue &val) override { - _pRtmpMediaSrc = dynamic_pointer_cast(_pMediaSrc); - if(_pRtmpMediaSrc){ - _pRtmpMediaSrc->setMetaData(val); + _rtmp_src = dynamic_pointer_cast(_pMediaSrc); + if (_rtmp_src) { + _rtmp_src->setMetaData(val); _set_meta_data = true; } _delegate.reset(new RtmpDemuxer); _delegate->loadMetaData(val); return true; } + void onMediaData(const RtmpPacket::Ptr &chunkData) override { - if(_pRtmpMediaSrc){ - if(!_set_meta_data && !chunkData->isCfgFrame()){ + if (_rtmp_src) { + if (!_set_meta_data && !chunkData->isCfgFrame()) { _set_meta_data = true; - _pRtmpMediaSrc->setMetaData(TitleMeta().getMetadata()); + _rtmp_src->setMetaData(TitleMeta().getMetadata()); } - _pRtmpMediaSrc->onWrite(chunkData); + _rtmp_src->onWrite(chunkData); } - if(!_delegate){ + if (!_delegate) { //这个流没有metadata _delegate.reset(new RtmpDemuxer()); } _delegate->inputRtmp(chunkData); } + private: - RtmpMediaSource::Ptr _pRtmpMediaSrc; + RtmpMediaSource::Ptr _rtmp_src; bool _set_meta_data = false; }; diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index 6bd803cc..d1ed4d5a 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -20,9 +20,17 @@ using namespace toolkit; #include #include -static string openssl_HMACsha256(const void *key,unsigned int key_len, - const void *data,unsigned int data_len){ - std::shared_ptr out(new char[32],[](char *ptr){delete [] ptr;}); +#define C1_DIGEST_SIZE 32 +#define C1_KEY_SIZE 128 +#define C1_SCHEMA_SIZE 764 +#define C1_HANDSHARK_SIZE (RANDOM_LEN + 8) +#define C1_FPKEY_SIZE 30 +#define S1_FMS_KEY_SIZE 36 +#define S2_FMS_KEY_SIZE 68 +#define C1_OFFSET_SIZE 4 + +static string openssl_HMACsha256(const void *key, unsigned int key_len, const void *data,unsigned int data_len){ + std::shared_ptr out(new char[32], [](char *ptr) { delete[] ptr; }); unsigned int out_len; #if defined(OPENSSL_VERSION_NUMBER) && (OPENSSL_VERSION_NUMBER > 0x10100000L) @@ -46,273 +54,271 @@ static string openssl_HMACsha256(const void *key,unsigned int key_len, } #endif //ENABLE_OPENSSL - -#define C1_DIGEST_SIZE 32 -#define C1_KEY_SIZE 128 -#define C1_SCHEMA_SIZE 764 -#define C1_HANDSHARK_SIZE (RANDOM_LEN + 8) -#define C1_FPKEY_SIZE 30 -#define S1_FMS_KEY_SIZE 36 -#define S2_FMS_KEY_SIZE 68 -#define C1_OFFSET_SIZE 4 - namespace mediakit { RtmpProtocol::RtmpProtocol() { - _nextHandle = [this](){ + _next_step_func = [this]() { handle_C0C1(); }; } + RtmpProtocol::~RtmpProtocol() { reset(); } + void RtmpProtocol::reset() { ////////////ChunkSize//////////// - _iChunkLenIn = DEFAULT_CHUNK_LEN; - _iChunkLenOut = DEFAULT_CHUNK_LEN; + _chunk_size_in = DEFAULT_CHUNK_LEN; + _chunk_size_out = DEFAULT_CHUNK_LEN; ////////////Acknowledgement//////////// - _ui32ByteSent = 0; - _ui32LastSent = 0; - _ui32WinSize = 0; + _bytes_sent = 0; + _bytes_sent_last = 0; + _windows_size = 0; ///////////PeerBandwidth/////////// - _ui32Bandwidth = 2500000; - _ui8LimitType = 2; + _bandwidth = 2500000; + _band_limit_type = 2; ////////////Chunk//////////// - _mapChunkData.clear(); - _iNowStreamID = 0; - _iNowChunkID = 0; + _map_chunk_data.clear(); + _now_stream_index = 0; + _now_chunk_id = 0; //////////Invoke Request////////// - _iReqID = 0; + _send_req_id = 0; //////////Rtmp parser////////// - _strRcvBuf.clear(); - _ui32StreamId = STREAM_CONTROL; - _nextHandle = [this]() { + _recv_data_buf.clear(); + _stream_index = STREAM_CONTROL; + _next_step_func = [this]() { handle_C0C1(); }; } -void RtmpProtocol::sendAcknowledgement(uint32_t ui32Size) { - std::string control; - uint32_t stream = htonl(ui32Size); - control.append((char *) &stream, 4); - sendRequest(MSG_ACK, control); +void RtmpProtocol::sendAcknowledgement(uint32_t size) { + size = htonl(size); + std::string acknowledgement((char *) &size, 4); + sendRequest(MSG_ACK, acknowledgement); } -void RtmpProtocol::sendAcknowledgementSize(uint32_t ui32Size) { - uint32_t windowSize = htonl(ui32Size); - std::string set_windowSize((char *) &windowSize, 4); +void RtmpProtocol::sendAcknowledgementSize(uint32_t size) { + size = htonl(size); + std::string set_windowSize((char *) &size, 4); sendRequest(MSG_WIN_SIZE, set_windowSize); } -void RtmpProtocol::sendPeerBandwidth(uint32_t ui32Size) { - uint32_t peerBandwidth = htonl(ui32Size); - std::string set_peerBandwidth((char *) &peerBandwidth, 4); +void RtmpProtocol::sendPeerBandwidth(uint32_t size) { + size = htonl(size); + std::string set_peerBandwidth((char *) &size, 4); set_peerBandwidth.push_back((char) 0x02); sendRequest(MSG_SET_PEER_BW, set_peerBandwidth); } -void RtmpProtocol::sendChunkSize(uint32_t ui32Size) { - uint32_t len = htonl(ui32Size); +void RtmpProtocol::sendChunkSize(uint32_t size) { + uint32_t len = htonl(size); std::string set_chunk((char *) &len, 4); sendRequest(MSG_SET_CHUNK, set_chunk); - _iChunkLenOut = ui32Size; + _chunk_size_out = size; } -void RtmpProtocol::sendPingRequest(uint32_t ui32TimeStamp) { - sendUserControl(CONTROL_PING_REQUEST, ui32TimeStamp); +void RtmpProtocol::sendPingRequest(uint32_t stamp) { + sendUserControl(CONTROL_PING_REQUEST, stamp); } -void RtmpProtocol::sendPingResponse(uint32_t ui32TimeStamp) { - sendUserControl(CONTROL_PING_RESPONSE, ui32TimeStamp); +void RtmpProtocol::sendPingResponse(uint32_t time_stamp) { + sendUserControl(CONTROL_PING_RESPONSE, time_stamp); } -void RtmpProtocol::sendSetBufferLength(uint32_t ui32StreamId, - uint32_t ui32Length) { +void RtmpProtocol::sendSetBufferLength(uint32_t stream_index, uint32_t len) { std::string control; - ui32StreamId = htonl(ui32StreamId); - control.append((char *) &ui32StreamId, 4); - ui32Length = htonl(ui32Length); - control.append((char *) &ui32Length, 4); + stream_index = htonl(stream_index); + control.append((char *) &stream_index, 4); + + len = htonl(len); + control.append((char *) &len, 4); sendUserControl(CONTROL_SETBUFFER, control); } -void RtmpProtocol::sendUserControl(uint16_t ui16EventType, - uint32_t ui32EventData) { +void RtmpProtocol::sendUserControl(uint16_t event_type, uint32_t event_data) { std::string control; - uint16_t type = htons(ui16EventType); - control.append((char *) &type, 2); - uint32_t stream = htonl(ui32EventData); - control.append((char *) &stream, 4); + event_type = htons(event_type); + control.append((char *) &event_type, 2); + + event_data = htonl(event_data); + control.append((char *) &event_data, 4); sendRequest(MSG_USER_CONTROL, control); } -void RtmpProtocol::sendUserControl(uint16_t ui16EventType, - const string& strEventData) { +void RtmpProtocol::sendUserControl(uint16_t event_type, const string &event_data) { std::string control; - uint16_t type = htons(ui16EventType); - control.append((char *) &type, 2); - control.append(strEventData); + event_type = htons(event_type); + control.append((char *) &event_type, 2); + control.append(event_data); sendRequest(MSG_USER_CONTROL, control); } -void RtmpProtocol::sendResponse(int iType, const string& str) { - if(!_bDataStarted && (iType == MSG_DATA)){ - _bDataStarted = true; +void RtmpProtocol::sendResponse(int type, const string &str) { + if(!_data_started && (type == MSG_DATA)){ + _data_started = true; } - sendRtmp(iType, _iNowStreamID, str, 0, _bDataStarted ? CHUNK_CLIENT_REQUEST_AFTER : CHUNK_CLIENT_REQUEST_BEFORE); + sendRtmp(type, _now_stream_index, str, 0, _data_started ? CHUNK_CLIENT_REQUEST_AFTER : CHUNK_CLIENT_REQUEST_BEFORE); } -void RtmpProtocol::sendInvoke(const string& strCmd, const AMFValue& val) { +void RtmpProtocol::sendInvoke(const string &cmd, const AMFValue &val) { AMFEncoder enc; - enc << strCmd << ++_iReqID << val; + enc << cmd << ++_send_req_id << val; sendRequest(MSG_CMD, enc.data()); } -void RtmpProtocol::sendRequest(int iCmd, const string& str) { - sendRtmp(iCmd, _ui32StreamId, str, 0, CHUNK_SERVER_REQUEST); +void RtmpProtocol::sendRequest(int cmd, const string& str) { + sendRtmp(cmd, _stream_index, str, 0, CHUNK_SERVER_REQUEST); } class BufferPartial : public Buffer { public: - BufferPartial(const Buffer::Ptr &buffer,uint32_t offset,uint32_t size){ + BufferPartial(const Buffer::Ptr &buffer, uint32_t offset, uint32_t size){ _buffer = buffer; _data = buffer->data() + offset; _size = size; } - ~BufferPartial(){} + ~BufferPartial() override{} char *data() const override { return _data; } + uint32_t size() const override{ return _size; } + private: - Buffer::Ptr _buffer; char *_data; uint32_t _size; + Buffer::Ptr _buffer; }; -void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, - const std::string& strBuf, uint32_t ui32TimeStamp, int iChunkId) { - sendRtmp(ui8Type,ui32StreamId,std::make_shared(strBuf),ui32TimeStamp,iChunkId); +void RtmpProtocol::sendRtmp(uint8_t type, uint32_t stream_index, const std::string &buffer, uint32_t stamp, int chunk_id) { + sendRtmp(type, stream_index, std::make_shared(buffer), stamp, chunk_id); } -void RtmpProtocol::sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, - const Buffer::Ptr &buf, uint32_t ui32TimeStamp, int iChunkId){ - if (iChunkId < 2 || iChunkId > 63) { - auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << iChunkId << endl; +void RtmpProtocol::sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::Ptr &buf, uint32_t stamp, int chunk_id){ + if (chunk_id < 2 || chunk_id > 63) { + auto strErr = StrPrinter << "不支持发送该类型的块流 ID:" << chunk_id << endl; throw std::runtime_error(strErr); } //是否有扩展时间戳 - bool bExtStamp = ui32TimeStamp >= 0xFFFFFF; + bool ext_stamp = stamp >= 0xFFFFFF; //rtmp头 - BufferRaw::Ptr bufferHeader = obtainBuffer(); - bufferHeader->setCapacity(sizeof(RtmpHeader)); - bufferHeader->setSize(sizeof(RtmpHeader)); + BufferRaw::Ptr buffer_header = obtainBuffer(); + buffer_header->setCapacity(sizeof(RtmpHeader)); + buffer_header->setSize(sizeof(RtmpHeader)); //对rtmp头赋值,如果使用整形赋值,在arm android上可能由于数据对齐导致总线错误的问题 - RtmpHeader *header = (RtmpHeader*) bufferHeader->data(); - header->flags = (iChunkId & 0x3f) | (0 << 6); - header->typeId = ui8Type; - set_be24(header->timeStamp, bExtStamp ? 0xFFFFFF : ui32TimeStamp); - set_be24(header->bodySize, buf->size()); - set_le32(header->streamId, ui32StreamId); + RtmpHeader *header = (RtmpHeader *) buffer_header->data(); + header->flags = (chunk_id & 0x3f) | (0 << 6); + header->type_id = type; + set_be24(header->time_stamp, ext_stamp ? 0xFFFFFF : stamp); + set_be24(header->body_size, buf->size()); + set_le32(header->stream_index, stream_index); //发送rtmp头 - onSendRawData(bufferHeader); + onSendRawData(buffer_header); //扩展时间戳字段 - BufferRaw::Ptr bufferExtStamp; - if (bExtStamp) { + BufferRaw::Ptr buffer_ext_stamp; + if (ext_stamp) { //生成扩展时间戳 - bufferExtStamp = obtainBuffer(); - bufferExtStamp->setCapacity(4); - bufferExtStamp->setSize(4); - set_be32(bufferExtStamp->data(), ui32TimeStamp); + buffer_ext_stamp = obtainBuffer(); + buffer_ext_stamp->setCapacity(4); + buffer_ext_stamp->setSize(4); + set_be32(buffer_ext_stamp->data(), stamp); } //生成一个字节的flag,标明是什么chunkId - BufferRaw::Ptr bufferFlags = obtainBuffer(); - bufferFlags->setCapacity(1); - bufferFlags->setSize(1); - bufferFlags->data()[0] = (iChunkId & 0x3f) | (3 << 6); - + BufferRaw::Ptr buffer_flags = obtainBuffer(); + buffer_flags->setCapacity(1); + buffer_flags->setSize(1); + buffer_flags->data()[0] = (chunk_id & 0x3f) | (3 << 6); + size_t offset = 0; uint32_t totalSize = sizeof(RtmpHeader); while (offset < buf->size()) { if (offset) { - onSendRawData(bufferFlags); + onSendRawData(buffer_flags); totalSize += 1; } - if (bExtStamp) { + if (ext_stamp) { //扩展时间戳 - onSendRawData(bufferExtStamp); + onSendRawData(buffer_ext_stamp); totalSize += 4; } - size_t chunk = min(_iChunkLenOut, buf->size() - offset); - onSendRawData(std::make_shared(buf,offset,chunk)); + size_t chunk = min(_chunk_size_out, buf->size() - offset); + onSendRawData(std::make_shared(buf, offset, chunk)); totalSize += chunk; offset += chunk; } - _ui32ByteSent += totalSize; - if (_ui32WinSize > 0 && _ui32ByteSent - _ui32LastSent >= _ui32WinSize) { - _ui32LastSent = _ui32ByteSent; - sendAcknowledgement(_ui32ByteSent); + _bytes_sent += totalSize; + if (_windows_size > 0 && _bytes_sent - _bytes_sent_last >= _windows_size) { + _bytes_sent_last = _bytes_sent; + sendAcknowledgement(_bytes_sent); } } +void RtmpProtocol::onParseRtmp(const char *data, int size) { + _recv_data_buf.append(data, size); + //移动拷贝提高性能 + function next_step_func(std::move(_next_step_func)); + //执行下一步 + next_step_func(); -void RtmpProtocol::onParseRtmp(const char *pcRawData, int iSize) { - _strRcvBuf.append(pcRawData, iSize); - auto cb = _nextHandle; - cb(); + if (!_next_step_func) { + //为设置下一步,恢复之 + next_step_func.swap(_next_step_func); + } } ////for client//// -void RtmpProtocol::startClientSession(const function &callBack) { +void RtmpProtocol::startClientSession(const function &func) { //发送 C0C1 char handshake_head = HANDSHAKE_PLAINTEXT; onSendRawData(obtainBuffer(&handshake_head, 1)); RtmpHandshake c1(0); onSendRawData(obtainBuffer((char *) (&c1), sizeof(c1))); - _nextHandle = [this,callBack]() { + _next_step_func = [this, func]() { //等待 S0+S1+S2 - handle_S0S1S2(callBack); + handle_S0S1S2(func); }; } -void RtmpProtocol::handle_S0S1S2(const function &callBack) { - if (_strRcvBuf.size() < 1 + 2 * C1_HANDSHARK_SIZE) { + +void RtmpProtocol::handle_S0S1S2(const function &func) { + if (_recv_data_buf.size() < 1 + 2 * C1_HANDSHARK_SIZE) { //数据不够 return; } - if (_strRcvBuf[0] != HANDSHAKE_PLAINTEXT) { + if (_recv_data_buf[0] != HANDSHAKE_PLAINTEXT) { throw std::runtime_error("only plaintext[0x03] handshake supported"); } //发送 C2 - const char *pcC2 = _strRcvBuf.data() + 1; + const char *pcC2 = _recv_data_buf.data() + 1; onSendRawData(obtainBuffer(pcC2, C1_HANDSHARK_SIZE)); - _strRcvBuf.erase(0, 1 + 2 * C1_HANDSHARK_SIZE); + _recv_data_buf.erase(0, 1 + 2 * C1_HANDSHARK_SIZE); //握手结束 - _nextHandle = [this]() { + _next_step_func = [this]() { //握手结束并且开始进入解析命令模式 handle_rtmp(); }; - callBack(); + func(); } + ////for server //// void RtmpProtocol::handle_C0C1() { - if (_strRcvBuf.size() < 1 + C1_HANDSHARK_SIZE) { + if (_recv_data_buf.size() < 1 + C1_HANDSHARK_SIZE) { //need more data! return; } - if (_strRcvBuf[0] != HANDSHAKE_PLAINTEXT) { + if (_recv_data_buf[0] != HANDSHAKE_PLAINTEXT) { throw std::runtime_error("only plaintext[0x03] handshake supported"); } - if(memcmp(_strRcvBuf.data() + 5,"\x00\x00\x00\x00",4) ==0 ){ + if (memcmp(_recv_data_buf.data() + 5, "\x00\x00\x00\x00", 4) == 0) { //simple handsharke handle_C1_simple(); - }else{ + } else { #ifdef ENABLE_OPENSSL //complex handsharke handle_C1_complex(); @@ -321,8 +327,9 @@ void RtmpProtocol::handle_C0C1() { handle_C1_simple(); #endif//ENABLE_OPENSSL } - _strRcvBuf.erase(0, 1 + C1_HANDSHARK_SIZE); + _recv_data_buf.erase(0, 1 + C1_HANDSHARK_SIZE); } + void RtmpProtocol::handle_C1_simple(){ //发送S0 char handshake_head = HANDSHAKE_PLAINTEXT; @@ -331,51 +338,52 @@ void RtmpProtocol::handle_C1_simple(){ RtmpHandshake s1(0); onSendRawData(obtainBuffer((char *) &s1, C1_HANDSHARK_SIZE)); //发送S2 - onSendRawData(obtainBuffer(_strRcvBuf.data() + 1, C1_HANDSHARK_SIZE)); + onSendRawData(obtainBuffer(_recv_data_buf.data() + 1, C1_HANDSHARK_SIZE)); //等待C2 - _nextHandle = [this]() { + _next_step_func = [this]() { handle_C2(); }; } + #ifdef ENABLE_OPENSSL void RtmpProtocol::handle_C1_complex(){ //参考自:http://blog.csdn.net/win_lin/article/details/13006803 //skip c0,time,version - const char *c1_start = _strRcvBuf.data() + 1; + const char *c1_start = _recv_data_buf.data() + 1; const char *schema_start = c1_start + 8; char *digest_start; - try{ + try { /* c1s1 schema0 time: 4bytes version: 4bytes key: 764bytes digest: 764bytes */ - auto digest = get_C1_digest((uint8_t *)schema_start + C1_SCHEMA_SIZE,&digest_start); - string c1_joined(c1_start,C1_HANDSHARK_SIZE); - c1_joined.erase(digest_start - c1_start , C1_DIGEST_SIZE ); - check_C1_Digest(digest,c1_joined); + auto digest = get_C1_digest((uint8_t *) schema_start + C1_SCHEMA_SIZE, &digest_start); + string c1_joined(c1_start, C1_HANDSHARK_SIZE); + c1_joined.erase(digest_start - c1_start, C1_DIGEST_SIZE); + check_C1_Digest(digest, c1_joined); - send_complex_S0S1S2(0,digest); + send_complex_S0S1S2(0, digest); // InfoL << "schema0"; - }catch(std::exception &ex){ + } catch (std::exception &ex) { //貌似flash从来都不用schema1 // WarnL << "try rtmp complex schema0 failed:" << ex.what(); - try{ + try { /* c1s1 schema1 time: 4bytes version: 4bytes digest: 764bytes key: 764bytes */ - auto digest = get_C1_digest((uint8_t *)schema_start,&digest_start); - string c1_joined(c1_start,C1_HANDSHARK_SIZE); - c1_joined.erase(digest_start - c1_start , C1_DIGEST_SIZE ); - check_C1_Digest(digest,c1_joined); + auto digest = get_C1_digest((uint8_t *) schema_start, &digest_start); + string c1_joined(c1_start, C1_HANDSHARK_SIZE); + c1_joined.erase(digest_start - c1_start, C1_DIGEST_SIZE); + check_C1_Digest(digest, c1_joined); - send_complex_S0S1S2(1,digest); + send_complex_S0S1S2(1, digest); // InfoL << "schema1"; - }catch(std::exception &ex){ + } catch (std::exception &ex) { // WarnL << "try rtmp complex schema1 failed:" << ex.what(); handle_C1_simple(); } @@ -408,14 +416,16 @@ static u_int8_t FPKey[] = { 0x6E, 0xEC, 0x5D, 0x2D, 0x29, 0x80, 0x6F, 0xAB, 0x93, 0xB8, 0xE6, 0x36, 0xCF, 0xEB, 0x31, 0xAE }; // 62 + void RtmpProtocol::check_C1_Digest(const string &digest,const string &data){ - auto sha256 = openssl_HMACsha256(FPKey,C1_FPKEY_SIZE,data.data(),data.size()); - if(sha256 != digest){ + auto sha256 = openssl_HMACsha256(FPKey, C1_FPKEY_SIZE, data.data(), data.size()); + if (sha256 != digest) { throw std::runtime_error("digest mismatched"); - }else{ + } else { InfoL << "check rtmp complex handshark success!"; } } + string RtmpProtocol::get_C1_digest(const uint8_t *ptr,char **digestPos){ /* 764bytes digest结构 offset: 4bytes @@ -424,15 +434,16 @@ string RtmpProtocol::get_C1_digest(const uint8_t *ptr,char **digestPos){ random-data: (764-4-offset-32)bytes */ int offset = 0; - for(int i=0;i> 6]; - _iNowChunkID = flags & 0x3f; - switch (_iNowChunkID) { + while (!_recv_data_buf.empty()) { + int offset = 0; + uint8_t flags = _recv_data_buf[0]; + size_t header_len = HEADER_LENGTH[flags >> 6]; + _now_chunk_id = flags & 0x3f; + switch (_now_chunk_id) { case 0: { //0 值表示二字节形式,并且 ID 范围 64 - 319 //(第二个字节 + 64)。 - if (_strRcvBuf.size() < 2) { + if (_recv_data_buf.size() < 2) { //need more data return; } - _iNowChunkID = 64 + (uint8_t) (_strRcvBuf[1]); - iOffset = 1; - } + _now_chunk_id = 64 + (uint8_t) (_recv_data_buf[1]); + offset = 1; break; + } + case 1: { //1 值表示三字节形式,并且 ID 范围为 64 - 65599 //((第三个字节) * 256 + 第二个字节 + 64)。 - if (_strRcvBuf.size() < 3) { + if (_recv_data_buf.size() < 3) { //need more data return; } - _iNowChunkID = 64 + ((uint8_t) (_strRcvBuf[2]) << 8) + (uint8_t) (_strRcvBuf[1]); - iOffset = 2; + _now_chunk_id = 64 + ((uint8_t) (_recv_data_buf[2]) << 8) + (uint8_t) (_recv_data_buf[1]); + offset = 2; + break; } - break; - default: - //带有 2 值的块流 ID 被保留,用于下层协议控制消息和命令。 - break; + + //带有 2 值的块流 ID 被保留,用于下层协议控制消息和命令。 + default : break; } - if (_strRcvBuf.size() < iHeaderLen + iOffset) { + if (_recv_data_buf.size() < header_len + offset) { //need more data return; } - RtmpHeader &header = *((RtmpHeader *) (_strRcvBuf.data() + iOffset)); - auto &chunkData = _mapChunkData[_iNowChunkID]; - chunkData.chunkId = _iNowChunkID; - switch (iHeaderLen) { + + RtmpHeader &header = *((RtmpHeader *) (_recv_data_buf.data() + offset)); + auto &chunk_data = _map_chunk_data[_now_chunk_id]; + chunk_data.chunk_id = _now_chunk_id; + switch (header_len) { case 12: - chunkData.hasAbsStamp = true; - chunkData.streamId = load_le32(header.streamId); + chunk_data.is_abs_stamp = true; + chunk_data.stream_index = load_le32(header.stream_index); case 8: - chunkData.bodySize = load_be24(header.bodySize); - chunkData.typeId = header.typeId; + chunk_data.body_size = load_be24(header.body_size); + chunk_data.type_id = header.type_id; case 4: - chunkData.tsField = load_be24(header.timeStamp); + chunk_data.ts_field = load_be24(header.time_stamp); } - auto timeStamp = chunkData.tsField; - if (chunkData.tsField == 0xFFFFFF) { - if (_strRcvBuf.size() < iHeaderLen + iOffset + 4) { + auto time_stamp = chunk_data.ts_field; + if (chunk_data.ts_field == 0xFFFFFF) { + if (_recv_data_buf.size() < header_len + offset + 4) { //need more data return; } - timeStamp = load_be32(_strRcvBuf.data() + iOffset + iHeaderLen); - iOffset += 4; + time_stamp = load_be32(_recv_data_buf.data() + offset + header_len); + offset += 4; } - if (chunkData.bodySize < chunkData.strBuf.size()) { + if (chunk_data.body_size < chunk_data.buffer.size()) { throw std::runtime_error("非法的bodySize"); } - auto iMore = min(_iChunkLenIn, chunkData.bodySize - chunkData.strBuf.size()); - if (_strRcvBuf.size() < iHeaderLen + iOffset + iMore) { + auto iMore = min(_chunk_size_in, chunk_data.body_size - chunk_data.buffer.size()); + if (_recv_data_buf.size() < header_len + offset + iMore) { //need more data return; } - chunkData.strBuf.append(_strRcvBuf, iHeaderLen + iOffset, iMore); - _strRcvBuf.erase(0, iHeaderLen + iOffset + iMore); - if (chunkData.strBuf.size() == chunkData.bodySize) { + chunk_data.buffer.append(_recv_data_buf, header_len + offset, iMore); + _recv_data_buf.erase(0, header_len + offset + iMore); + if (chunk_data.buffer.size() == chunk_data.body_size) { //frame is ready - _iNowStreamID = chunkData.streamId; - chunkData.timeStamp = timeStamp + (chunkData.hasAbsStamp ? 0 : chunkData.timeStamp); - if(chunkData.bodySize){ - handle_rtmpChunk(chunkData); + _now_stream_index = chunk_data.stream_index; + chunk_data.time_stamp = time_stamp + (chunk_data.is_abs_stamp ? 0 : chunk_data.time_stamp); + if (chunk_data.body_size) { + handle_rtmpChunk(chunk_data); } - chunkData.strBuf.clear(); - chunkData.hasAbsStamp = false; + chunk_data.buffer.clear(); + chunk_data.is_abs_stamp = false; } } } -void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunkData) { - switch (chunkData.typeId) { +void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunk_data) { + switch (chunk_data.type_id) { case MSG_ACK: { - if (chunkData.strBuf.size() < 4) { + if (chunk_data.buffer.size() < 4) { throw std::runtime_error("MSG_ACK: Not enough data"); } - //auto bytePeerRecv = load_be32(&chunkData.strBuf[0]); + //auto bytePeerRecv = load_be32(&chunk_data.buffer[0]); //TraceL << "MSG_ACK:" << bytePeerRecv; - } break; + } + case MSG_SET_CHUNK: { - if (chunkData.strBuf.size() < 4) { + if (chunk_data.buffer.size() < 4) { throw std::runtime_error("MSG_SET_CHUNK :Not enough data"); } - _iChunkLenIn = load_be32(&chunkData.strBuf[0]); - TraceL << "MSG_SET_CHUNK:" << _iChunkLenIn; - } + _chunk_size_in = load_be32(&chunk_data.buffer[0]); + TraceL << "MSG_SET_CHUNK:" << _chunk_size_in; break; + } + case MSG_USER_CONTROL: { //user control message - if (chunkData.strBuf.size() < 2) { + if (chunk_data.buffer.size() < 2) { throw std::runtime_error("MSG_USER_CONTROL: Not enough data."); } - uint16_t event_type = load_be16(&chunkData.strBuf[0]); - chunkData.strBuf.erase(0, 2); + uint16_t event_type = load_be16(&chunk_data.buffer[0]); + chunk_data.buffer.erase(0, 2); switch (event_type) { - case CONTROL_PING_REQUEST: { - if (chunkData.strBuf.size() < 4) { + case CONTROL_PING_REQUEST: { + if (chunk_data.buffer.size() < 4) { throw std::runtime_error("CONTROL_PING_REQUEST: Not enough data."); } - uint32_t timeStamp = load_be32(&chunkData.strBuf[0]); - //TraceL << "CONTROL_PING_REQUEST:" << timeStamp; + uint32_t timeStamp = load_be32(&chunk_data.buffer[0]); + //TraceL << "CONTROL_PING_REQUEST:" << time_stamp; sendUserControl(CONTROL_PING_RESPONSE, timeStamp); - } break; - case CONTROL_PING_RESPONSE: { - if (chunkData.strBuf.size() < 4) { - throw std::runtime_error("CONTROL_PING_RESPONSE: Not enough data."); } - //uint32_t timeStamp = load_be32(&chunkData.strBuf[0]); - //TraceL << "CONTROL_PING_RESPONSE:" << timeStamp; - } - break; - case CONTROL_STREAM_BEGIN: { - //开始播放 - if (chunkData.strBuf.size() < 4) { - throw std::runtime_error("CONTROL_STREAM_BEGIN: Not enough data."); - } - uint32_t stramId = load_be32(&chunkData.strBuf[0]); - onStreamBegin(stramId); - TraceL << "CONTROL_STREAM_BEGIN:" << stramId; - } - break; - case CONTROL_STREAM_EOF: { - //暂停 - if (chunkData.strBuf.size() < 4) { - throw std::runtime_error("CONTROL_STREAM_EOF: Not enough data."); + case CONTROL_PING_RESPONSE: { + if (chunk_data.buffer.size() < 4) { + throw std::runtime_error("CONTROL_PING_RESPONSE: Not enough data."); + } + //uint32_t time_stamp = load_be32(&chunk_data.buffer[0]); + //TraceL << "CONTROL_PING_RESPONSE:" << time_stamp; + break; } - uint32_t stramId = load_be32(&chunkData.strBuf[0]); - onStreamEof(stramId); - TraceL << "CONTROL_STREAM_EOF:" << stramId; - } - break; - case CONTROL_STREAM_DRY: { - //停止播放 - if (chunkData.strBuf.size() < 4) { - throw std::runtime_error("CONTROL_STREAM_DRY: Not enough data."); + + case CONTROL_STREAM_BEGIN: { + //开始播放 + if (chunk_data.buffer.size() < 4) { + throw std::runtime_error("CONTROL_STREAM_BEGIN: Not enough data."); + } + uint32_t stream_index = load_be32(&chunk_data.buffer[0]); + onStreamBegin(stream_index); + TraceL << "CONTROL_STREAM_BEGIN:" << stream_index; + break; } - uint32_t stramId = load_be32(&chunkData.strBuf[0]); - onStreamDry(stramId); - TraceL << "CONTROL_STREAM_DRY:" << stramId; + + case CONTROL_STREAM_EOF: { + //暂停 + if (chunk_data.buffer.size() < 4) { + throw std::runtime_error("CONTROL_STREAM_EOF: Not enough data."); + } + uint32_t stream_index = load_be32(&chunk_data.buffer[0]); + onStreamEof(stream_index); + TraceL << "CONTROL_STREAM_EOF:" << stream_index; + break; + } + + case CONTROL_STREAM_DRY: { + //停止播放 + if (chunk_data.buffer.size() < 4) { + throw std::runtime_error("CONTROL_STREAM_DRY: Not enough data."); + } + uint32_t stream_index = load_be32(&chunk_data.buffer[0]); + onStreamDry(stream_index); + TraceL << "CONTROL_STREAM_DRY:" << stream_index; + break; + } + + default: /*WarnL << "unhandled user control:" << event_type; */ break; } - break; - default: - //WarnL << "unhandled user control:" << event_type; - break; - } - } break; + } case MSG_WIN_SIZE: { - _ui32WinSize = load_be32(&chunkData.strBuf[0]); - TraceL << "MSG_WIN_SIZE:" << _ui32WinSize; - } + _windows_size = load_be32(&chunk_data.buffer[0]); + TraceL << "MSG_WIN_SIZE:" << _windows_size; break; + } + case MSG_SET_PEER_BW: { - _ui32Bandwidth = load_be32(&chunkData.strBuf[0]); - _ui8LimitType = chunkData.strBuf[4]; - TraceL << "MSG_SET_PEER_BW:" << _ui32WinSize; - } + _bandwidth = load_be32(&chunk_data.buffer[0]); + _band_limit_type = chunk_data.buffer[4]; + TraceL << "MSG_SET_PEER_BW:" << _windows_size; break; + } + case MSG_AGGREGATE: { - auto ptr = (uint8_t*)chunkData.strBuf.data(); - auto ptr_tail = ptr + chunkData.strBuf.length() ; - while(ptr + 8 + 3 < ptr_tail){ + auto ptr = (uint8_t *) chunk_data.buffer.data(); + auto ptr_tail = ptr + chunk_data.buffer.length(); + while (ptr + 8 + 3 < ptr_tail) { auto type = *ptr; ptr += 1; auto size = load_be24(ptr); @@ -701,56 +723,28 @@ void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunkData) { ptr += 3; ts |= (*ptr << 24); ptr += 1; - - //参考ffmpeg忽略了3个字节 - /** - * while (next - pkt->data < pkt->size - RTMP_HEADER) { - type = bytestream_get_byte(&next); - size = bytestream_get_be24(&next); - cts = bytestream_get_be24(&next); - cts |= bytestream_get_byte(&next) << 24; - if (!pts) - pts = cts; - ts += cts - pts; - pts = cts; - if (size + 3 + 4 > pkt->data + pkt->size - next) - break; - bytestream_put_byte(&p, type); - bytestream_put_be24(&p, size); - bytestream_put_be24(&p, ts); - bytestream_put_byte(&p, ts >> 24); - memcpy(p, next, size + 3 + 4); - p += size + 3; - bytestream_put_be32(&p, size + RTMP_HEADER); - next += size + 3 + 4; - } - */ ptr += 3; //参考FFmpeg多拷贝了4个字节 size += 4; - if(ptr + size > ptr_tail){ -// ErrorL << ptr + size << " " << ptr_tail << " " << ptr_tail - ptr - size; + if (ptr + size > ptr_tail) { break; } -// DebugL << (int)type << " " << size << " " << ts << " " << chunkData.timeStamp << " " << ptr_tail - ptr; - RtmpPacket sub_packet ; - sub_packet.strBuf.resize(size); - memcpy((char *)sub_packet.strBuf.data(),ptr,size); - sub_packet.typeId = type; - sub_packet.bodySize = size; - sub_packet.timeStamp = ts; - sub_packet.streamId = chunkData.streamId; - sub_packet.chunkId = chunkData.chunkId; + RtmpPacket sub_packet; + sub_packet.buffer.resize(size); + memcpy((char *) sub_packet.buffer.data(), ptr, size); + sub_packet.type_id = type; + sub_packet.body_size = size; + sub_packet.time_stamp = ts; + sub_packet.stream_index = chunk_data.stream_index; + sub_packet.chunk_id = chunk_data.chunk_id; handle_rtmpChunk(sub_packet); ptr += size; } -// InfoL << ptr_tail - ptr; - } - break; - default: - onRtmpChunk(chunkData); break; } + + default: onRtmpChunk(chunk_data); break; + } } BufferRaw::Ptr RtmpProtocol::obtainBuffer() { @@ -759,7 +753,7 @@ BufferRaw::Ptr RtmpProtocol::obtainBuffer() { BufferRaw::Ptr RtmpProtocol::obtainBuffer(const void *data, int len) { auto buffer = obtainBuffer(); - buffer->assign((const char *)data,len); + buffer->assign((const char *) data, len); return buffer; } diff --git a/src/Rtmp/RtmpProtocol.h b/src/Rtmp/RtmpProtocol.h index 3eb2d07f..6f2df9dd 100644 --- a/src/Rtmp/RtmpProtocol.h +++ b/src/Rtmp/RtmpProtocol.h @@ -31,45 +31,42 @@ class RtmpProtocol { public: RtmpProtocol(); virtual ~RtmpProtocol(); + + void onParseRtmp(const char *data, int size); //作为客户端发送c0c1,等待s0s1s2并且回调 void startClientSession(const function &cb); - void onParseRtmp(const char *pcRawData,int iSize); - void reset(); + protected: virtual void onSendRawData(const Buffer::Ptr &buffer) = 0; - virtual void onRtmpChunk(RtmpPacket &chunkData) = 0; - virtual void onStreamBegin(uint32_t ui32StreamId){ - _ui32StreamId = ui32StreamId; + virtual void onRtmpChunk(RtmpPacket &chunk_data) = 0; + virtual void onStreamBegin(uint32_t stream_index){ + _stream_index = stream_index; } - virtual void onStreamEof(uint32_t ui32StreamId){}; - virtual void onStreamDry(uint32_t ui32StreamId){}; -protected: - void sendAcknowledgement(uint32_t ui32Size); - void sendAcknowledgementSize(uint32_t ui32Size); - void sendPeerBandwidth(uint32_t ui32Size); - void sendChunkSize(uint32_t ui32Size); - void sendPingRequest(uint32_t ui32TimeStamp = ::time(NULL)); - void sendPingResponse(uint32_t ui32TimeStamp = ::time(NULL)); - void sendSetBufferLength(uint32_t ui32StreamId, uint32_t ui32Length); - void sendUserControl(uint16_t ui16EventType, uint32_t ui32EventData); - void sendUserControl(uint16_t ui16EventType, const string &strEventData); + virtual void onStreamEof(uint32_t stream_index){}; + virtual void onStreamDry(uint32_t stream_index){}; - void sendInvoke(const string &strCmd, const AMFValue &val); - void sendRequest(int iCmd, const string &str); - void sendResponse(int iType, const string &str); - void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const std::string &strBuf, uint32_t ui32TimeStamp, int iChunkID); - void sendRtmp(uint8_t ui8Type, uint32_t ui32StreamId, const Buffer::Ptr &buffer, uint32_t ui32TimeStamp, int iChunkID); protected: - int _iReqID = 0; - uint32_t _ui32StreamId = STREAM_CONTROL; - int _iNowStreamID = 0; - int _iNowChunkID = 0; - bool _bDataStarted = false; - inline BufferRaw::Ptr obtainBuffer(); - inline BufferRaw::Ptr obtainBuffer(const void *data, int len); - //ResourcePool _bufferPool; + void reset(); + BufferRaw::Ptr obtainBuffer(); + BufferRaw::Ptr obtainBuffer(const void *data, int len); + + void sendAcknowledgement(uint32_t size); + void sendAcknowledgementSize(uint32_t size); + void sendPeerBandwidth(uint32_t size); + void sendChunkSize(uint32_t size); + void sendPingRequest(uint32_t ti = ::time(NULL)); + void sendPingResponse(uint32_t time_stamp = ::time(NULL)); + void sendSetBufferLength(uint32_t stream_index, uint32_t len); + void sendUserControl(uint16_t event_type, uint32_t event_data); + void sendUserControl(uint16_t event_type, const string &event_data); + void sendInvoke(const string &cmd, const AMFValue &val); + void sendRequest(int cmd, const string &str); + void sendResponse(int type, const string &str); + void sendRtmp(uint8_t type, uint32_t stream_index, const std::string &buffer, uint32_t stamp, int chunk_id); + void sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::Ptr &buffer, uint32_t stamp, int chunk_id); + private: - void handle_S0S1S2(const function &cb); + void handle_S0S1S2(const function &func); void handle_C0C1(); void handle_C1_simple(); #ifdef ENABLE_OPENSSL @@ -82,26 +79,32 @@ private: void handle_C2(); void handle_rtmp(); - void handle_rtmpChunk(RtmpPacket &chunkData); + void handle_rtmpChunk(RtmpPacket &chunk_data); + +protected: + int _send_req_id = 0; + uint32_t _stream_index = STREAM_CONTROL; private: + int _now_stream_index = 0; + int _now_chunk_id = 0; + bool _data_started = false; ////////////ChunkSize//////////// - size_t _iChunkLenIn = DEFAULT_CHUNK_LEN; - size_t _iChunkLenOut = DEFAULT_CHUNK_LEN; + size_t _chunk_size_in = DEFAULT_CHUNK_LEN; + size_t _chunk_size_out = DEFAULT_CHUNK_LEN; ////////////Acknowledgement//////////// - uint32_t _ui32ByteSent = 0; - uint32_t _ui32LastSent = 0; - uint32_t _ui32WinSize = 0; + uint32_t _bytes_sent = 0; + uint32_t _bytes_sent_last = 0; + uint32_t _windows_size = 0; ///////////PeerBandwidth/////////// - uint32_t _ui32Bandwidth = 2500000; - uint8_t _ui8LimitType = 2; - ////////////Chunk//////////// - unordered_map _mapChunkData; + uint32_t _bandwidth = 2500000; + uint8_t _band_limit_type = 2; //////////Rtmp parser////////// - string _strRcvBuf; - function _nextHandle; + string _recv_data_buf; + function _next_step_func; + ////////////Chunk//////////// + unordered_map _map_chunk_data; }; } /* namespace mediakit */ - #endif /* SRC_RTMP_RTMPPROTOCOL_H_ */ diff --git a/src/Rtmp/RtmpPusher.cpp b/src/Rtmp/RtmpPusher.cpp index 725868b5..e8dddac7 100644 --- a/src/Rtmp/RtmpPusher.cpp +++ b/src/Rtmp/RtmpPusher.cpp @@ -164,13 +164,13 @@ inline void RtmpPusher::send_createStream() { addOnResultCB([this](AMFDecoder &dec){ //TraceL << "createStream result"; dec.load(); - _ui32StreamId = dec.load(); + _stream_index = dec.load(); send_publish(); }); } inline void RtmpPusher::send_publish() { AMFEncoder enc; - enc << "publish" << ++_iReqID << nullptr << _strStream << _strApp ; + enc << "publish" << ++_send_req_id << nullptr << _strStream << _strApp ; sendRequest(MSG_CMD, enc.data()); addOnStatusCB([this](AMFValue &val) { @@ -195,7 +195,7 @@ inline void RtmpPusher::send_metaData(){ sendRequest(MSG_DATA, enc.data()); src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){ - sendRtmp(pkt->typeId, _ui32StreamId, pkt, pkt->timeStamp, pkt->chunkId ); + sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id ); }); _pRtmpReader = src->getRing()->attach(getPoller()); @@ -213,7 +213,7 @@ inline void RtmpPusher::send_metaData(){ if(++i == size){ strongSelf->setSendFlushFlag(true); } - strongSelf->sendRtmp(rtmp->typeId, strongSelf->_ui32StreamId, rtmp, rtmp->timeStamp, rtmp->chunkId); + strongSelf->sendRtmp(rtmp->type_id, strongSelf->_stream_index, rtmp, rtmp->time_stamp, rtmp->chunk_id); }); }); _pRtmpReader->setDetachCB([weakSelf](){ @@ -275,7 +275,7 @@ void RtmpPusher::onCmd_onStatus(AMFDecoder &dec) { } void RtmpPusher::onRtmpChunk(RtmpPacket &chunkData) { - switch (chunkData.typeId) { + switch (chunkData.type_id) { case MSG_CMD: case MSG_CMD3: { typedef void (RtmpPusher::*rtmpCMDHandle)(AMFDecoder &dec); @@ -284,9 +284,9 @@ void RtmpPusher::onRtmpChunk(RtmpPacket &chunkData) { g_mapCmd.emplace("_error",&RtmpPusher::onCmd_result); g_mapCmd.emplace("_result",&RtmpPusher::onCmd_result); g_mapCmd.emplace("onStatus",&RtmpPusher::onCmd_onStatus); - }, []() {}); + }); - AMFDecoder dec(chunkData.strBuf, 0); + AMFDecoder dec(chunkData.buffer, 0); std::string type = dec.load(); auto it = g_mapCmd.find(type); if(it != g_mapCmd.end()){ @@ -298,7 +298,7 @@ void RtmpPusher::onRtmpChunk(RtmpPacket &chunkData) { } break; default: - //WarnL << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size()); + //WarnL << "unhandled message:" << (int) chunkData.type_id << hexdump(chunkData.buffer.data(), chunkData.buffer.size()); break; } } diff --git a/src/Rtmp/RtmpPusher.h b/src/Rtmp/RtmpPusher.h index af825968..9614c9f7 100644 --- a/src/Rtmp/RtmpPusher.h +++ b/src/Rtmp/RtmpPusher.h @@ -52,7 +52,7 @@ private: template inline void addOnResultCB(const FUN &fun) { lock_guard lck(_mtxOnResultCB); - _mapOnResultCB.emplace(_iReqID, fun); + _mapOnResultCB.emplace(_send_req_id, fun); } template inline void addOnStatusCB(const FUN &fun) { diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 777f3a13..8c123fc3 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -13,12 +13,12 @@ #include "Util/onceToken.h" namespace mediakit { -RtmpSession::RtmpSession(const Socket::Ptr &pSock) : TcpSession(pSock) { +RtmpSession::RtmpSession(const Socket::Ptr &sock) : TcpSession(sock) { DebugP(this); GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond); - pSock->setSendTimeOutSecond(keep_alive_sec); + sock->setSendTimeOutSecond(keep_alive_sec); //起始接收buffer缓存设置为4K,节省内存 - pSock->setReadBuffer(std::make_shared(4 * 1024)); + sock->setReadBuffer(std::make_shared(4 * 1024)); } RtmpSession::~RtmpSession() { @@ -26,20 +26,20 @@ RtmpSession::~RtmpSession() { } void RtmpSession::onError(const SockException& err) { - bool isPlayer = !_pPublisherSrc; + bool isPlayer = !_publisher_src; uint64_t duration = _ticker.createdTime()/1000; WarnP(this) << (isPlayer ? "RTMP播放器(" : "RTMP推流器(") - << _mediaInfo._vhost << "/" - << _mediaInfo._app << "/" - << _mediaInfo._streamid + << _media_info._vhost << "/" + << _media_info._app << "/" + << _media_info._streamid << ")断开:" << err.what() << ",耗时(s):" << duration; //流量统计事件广播 GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); - if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration, isPlayer, static_cast(*this)); + if(_total_bytes > iFlowThreshold * 1024){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, isPlayer, static_cast(*this)); } } @@ -48,11 +48,11 @@ void RtmpSession::onManager() { GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond); if (_ticker.createdTime() > handshake_sec * 1000) { - if (!_pRingReader && !_pPublisherSrc) { + if (!_ring_reader && !_publisher_src) { shutdown(SockException(Err_timeout,"illegal connection")); } } - if (_pPublisherSrc) { + if (_publisher_src) { //publisher if (_ticker.elapsedTime() > keep_alive_sec * 1000) { shutdown(SockException(Err_timeout,"recv data from rtmp pusher timeout")); @@ -60,22 +60,22 @@ void RtmpSession::onManager() { } } -void RtmpSession::onRecv(const Buffer::Ptr &pBuf) { +void RtmpSession::onRecv(const Buffer::Ptr &buf) { _ticker.resetTime(); try { - _ui64TotalBytes += pBuf->size(); - onParseRtmp(pBuf->data(), pBuf->size()); - } catch (exception &e) { - shutdown(SockException(Err_shutdown, e.what())); + _total_bytes += buf->size(); + onParseRtmp(buf->data(), buf->size()); + } catch (exception &ex) { + shutdown(SockException(Err_shutdown, ex.what())); } } void RtmpSession::onCmd_connect(AMFDecoder &dec) { auto params = dec.load(); - double amfVer = 0; + double amf_ver = 0; AMFValue objectEncoding = params["objectEncoding"]; if(objectEncoding){ - amfVer = objectEncoding.as_number(); + amf_ver = objectEncoding.as_number(); } ///////////set chunk size//////////////// sendChunkSize(60000); @@ -84,11 +84,11 @@ void RtmpSession::onCmd_connect(AMFDecoder &dec) { ///////////set peerBandwidth//////////////// sendPeerBandwidth(5000000); - _mediaInfo._app = params["app"].as_string(); - _strTcUrl = params["tcUrl"].as_string(); - if(_strTcUrl.empty()){ + _media_info._app = params["app"].as_string(); + _tc_url = params["tcUrl"].as_string(); + if(_tc_url.empty()){ //defaultVhost:默认vhost - _strTcUrl = string(RTMP_SCHEMA) + "://" + DEFAULT_VHOST + "/" + _mediaInfo._app; + _tc_url = string(RTMP_SCHEMA) + "://" + DEFAULT_VHOST + "/" + _media_info._app; } bool ok = true; //(app == APP_NAME); AMFValue version(AMF_OBJECT); @@ -98,10 +98,10 @@ void RtmpSession::onCmd_connect(AMFDecoder &dec) { status.set("level", ok ? "status" : "error"); status.set("code", ok ? "NetConnection.Connect.Success" : "NetConnection.Connect.InvalidApp"); status.set("description", ok ? "Connection succeeded." : "InvalidApp."); - status.set("objectEncoding", amfVer); + status.set("objectEncoding", amf_ver); sendReply(ok ? "_result" : "_error", version, status); if (!ok) { - throw std::runtime_error("Unsupported application: " + _mediaInfo._app); + throw std::runtime_error("Unsupported application: " + _media_info._app); } AMFEncoder invoke; @@ -114,75 +114,75 @@ void RtmpSession::onCmd_createStream(AMFDecoder &dec) { } void RtmpSession::onCmd_publish(AMFDecoder &dec) { - std::shared_ptr pTicker(new Ticker); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - std::shared_ptr pToken(new onceToken(nullptr,[pTicker,weakSelf](){ - auto strongSelf = weakSelf.lock(); - if(strongSelf){ - DebugP(strongSelf.get()) << "publish 回复时间:" << pTicker->elapsedTime() << "ms"; + std::shared_ptr ticker(new Ticker); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + std::shared_ptr pToken(new onceToken(nullptr,[ticker,weak_self](){ + auto strong_self = weak_self.lock(); + if(strong_self){ + DebugP(strong_self.get()) << "publish 回复时间:" << ticker->elapsedTime() << "ms"; } })); dec.load();/* NULL */ - _mediaInfo.parse(_strTcUrl + "/" + getStreamId(dec.load())); - _mediaInfo._schema = RTMP_SCHEMA; + _media_info.parse(_tc_url + "/" + getStreamId(dec.load())); + _media_info._schema = RTMP_SCHEMA; - auto onRes = [this,pToken](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){ + auto on_res = [this,pToken](const string &err, bool enableRtxp, bool enableHls, bool enableMP4){ auto src = dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA, - _mediaInfo._vhost, - _mediaInfo._app, - _mediaInfo._streamid)); - bool authSuccess = err.empty(); - bool ok = (!src && !_pPublisherSrc && authSuccess); + _media_info._vhost, + _media_info._app, + _media_info._streamid)); + bool auth_success = err.empty(); + bool ok = (!src && !_publisher_src && auth_success); AMFValue status(AMF_OBJECT); status.set("level", ok ? "status" : "error"); - status.set("code", ok ? "NetStream.Publish.Start" : (authSuccess ? "NetStream.Publish.BadName" : "NetStream.Publish.BadAuth")); - status.set("description", ok ? "Started publishing stream." : (authSuccess ? "Already publishing." : err.data())); + status.set("code", ok ? "NetStream.Publish.Start" : (auth_success ? "NetStream.Publish.BadName" : "NetStream.Publish.BadAuth")); + status.set("description", ok ? "Started publishing stream." : (auth_success ? "Already publishing." : err.data())); status.set("clientid", "0"); sendReply("onStatus", nullptr, status); if (!ok) { - string errMsg = StrPrinter << (authSuccess ? "already publishing:" : err.data()) << " " - << _mediaInfo._vhost << " " - << _mediaInfo._app << " " - << _mediaInfo._streamid; + string errMsg = StrPrinter << (auth_success ? "already publishing:" : err.data()) << " " + << _media_info._vhost << " " + << _media_info._app << " " + << _media_info._streamid; shutdown(SockException(Err_shutdown,errMsg)); return; } - _pPublisherSrc.reset(new RtmpMediaSourceImp(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid)); - _pPublisherSrc->setListener(dynamic_pointer_cast(shared_from_this())); + _publisher_src.reset(new RtmpMediaSourceImp(_media_info._vhost, _media_info._app, _media_info._streamid)); + _publisher_src->setListener(dynamic_pointer_cast(shared_from_this())); //设置转协议 - _pPublisherSrc->setProtocolTranslation(enableRtxp,enableHls,enableMP4); + _publisher_src->setProtocolTranslation(enableRtxp, enableHls, enableMP4); //如果是rtmp推流客户端,那么加大TCP接收缓存,这样能提升接收性能 _sock->setReadBuffer(std::make_shared(256 * 1024)); setSocketFlags(); }; - if(_mediaInfo._app.empty() || _mediaInfo._streamid.empty()){ + if(_media_info._app.empty() || _media_info._streamid.empty()){ //不允许莫名其妙的推流url - onRes("rtmp推流url非法", false, false, false); + on_res("rtmp推流url非法", false, false, false); return; } - Broadcast::PublishAuthInvoker invoker = [weakSelf,onRes,pToken](const string &err,bool enableRtxp,bool enableHls,bool enableMP4){ - auto strongSelf = weakSelf.lock(); + Broadcast::PublishAuthInvoker invoker = [weak_self,on_res,pToken](const string &err, bool enableRtxp, bool enableHls, bool enableMP4){ + auto strongSelf = weak_self.lock(); if(!strongSelf){ return; } - strongSelf->async([weakSelf,onRes,err,pToken,enableRtxp,enableHls,enableMP4](){ - auto strongSelf = weakSelf.lock(); + strongSelf->async([weak_self,on_res,err,pToken,enableRtxp,enableHls,enableMP4](){ + auto strongSelf = weak_self.lock(); if(!strongSelf){ return; } - onRes(err,enableRtxp,enableHls,enableMP4); + on_res(err, enableRtxp, enableHls, enableMP4); }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,static_cast(*this)); + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast(*this)); if(!flag){ //该事件无人监听,默认鉴权成功 - GET_CONFIG(bool,toRtxp,General::kPublishToRtxp); - GET_CONFIG(bool,toHls,General::kPublishToHls); - GET_CONFIG(bool,toMP4,General::kPublishToMP4); - onRes("",toRtxp,toHls,toMP4); + GET_CONFIG(bool,to_rtxp,General::kPublishToRtxp); + GET_CONFIG(bool,to_hls,General::kPublishToHls); + GET_CONFIG(bool,to_mp4,General::kPublishToMP4); + on_res("", to_rtxp, to_hls, to_mp4); } } @@ -196,8 +196,8 @@ void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) { } void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr &src){ - bool authSuccess = err.empty(); - bool ok = (src.operator bool() && authSuccess); + bool auth_success = err.empty(); + bool ok = (src.operator bool() && auth_success); if (ok) { //stream begin sendUserControl(CONTROL_STREAM_BEGIN, STREAM_MEDIA); @@ -205,17 +205,17 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr // onStatus(NetStream.Play.Reset) AMFValue status(AMF_OBJECT); status.set("level", ok ? "status" : "error"); - status.set("code", ok ? "NetStream.Play.Reset" : (authSuccess ? "NetStream.Play.StreamNotFound" : "NetStream.Play.BadAuth")); - status.set("description", ok ? "Resetting and playing." : (authSuccess ? "No such stream." : err.data())); - status.set("details", _mediaInfo._streamid); + status.set("code", ok ? "NetStream.Play.Reset" : (auth_success ? "NetStream.Play.StreamNotFound" : "NetStream.Play.BadAuth")); + status.set("description", ok ? "Resetting and playing." : (auth_success ? "No such stream." : err.data())); + status.set("details", _media_info._streamid); status.set("clientid", "0"); sendReply("onStatus", nullptr, status); if (!ok) { - string errMsg = StrPrinter << (authSuccess ? "no such stream:" : err.data()) << " " - << _mediaInfo._vhost << " " - << _mediaInfo._app << " " - << _mediaInfo._streamid; - shutdown(SockException(Err_shutdown,errMsg)); + string err_msg = StrPrinter << (auth_success ? "no such stream:" : err.data()) << " " + << _media_info._vhost << " " + << _media_info._app << " " + << _media_info._streamid; + shutdown(SockException(Err_shutdown, err_msg)); return; } @@ -224,7 +224,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr status.set("level", "status"); status.set("code", "NetStream.Play.Start"); status.set("description", "Started playing."); - status.set("details", _mediaInfo._streamid); + status.set("details", _media_info._streamid); status.set("clientid", "0"); sendReply("onStatus", nullptr, status); @@ -245,7 +245,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr status.set("level", "status"); status.set("code", "NetStream.Play.PublishNotify"); status.set("description", "Now published."); - status.set("details", _mediaInfo._streamid); + status.set("details", _media_info._streamid); status.set("clientid", "0"); sendReply("onStatus", nullptr, status); @@ -273,9 +273,9 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr //音频同步于视频 _stamp[0].syncTo(_stamp[1]); - _pRingReader = src->getRing()->attach(getPoller()); + _ring_reader = src->getRing()->attach(getPoller()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRingReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) { + _ring_reader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt) { auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; @@ -293,14 +293,14 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr strongSelf->onSendMedia(rtmp); }); }); - _pRingReader->setDetachCB([weakSelf]() { + _ring_reader->setDetachCB([weakSelf]() { auto strongSelf = weakSelf.lock(); if (!strongSelf) { return; } strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached")); }); - _pPlayerSrc = src; + _player_src = src; if (src->totalReaderCount() == 1) { src->seekTo(0); } @@ -317,46 +317,47 @@ void RtmpSession::doPlayResponse(const string &err,const std::function weakSelf = dynamic_pointer_cast(shared_from_this()); - MediaSource::findAsync(_mediaInfo,weakSelf.lock(),[weakSelf,cb](const MediaSource::Ptr &src){ + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + MediaSource::findAsync(_media_info, weak_self.lock(), [weak_self,cb](const MediaSource::Ptr &src){ auto rtmp_src = dynamic_pointer_cast(src); - auto strongSelf = weakSelf.lock(); - if(strongSelf){ - strongSelf->sendPlayResponse("", rtmp_src); + auto strong_self = weak_self.lock(); + if(strong_self){ + strong_self->sendPlayResponse("", rtmp_src); } cb(rtmp_src.operator bool()); }); } void RtmpSession::doPlay(AMFDecoder &dec){ - std::shared_ptr pTicker(new Ticker); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - std::shared_ptr pToken(new onceToken(nullptr,[pTicker,weakSelf](){ - auto strongSelf = weakSelf.lock(); - if(strongSelf) { - DebugP(strongSelf.get()) << "play 回复时间:" << pTicker->elapsedTime() << "ms"; + std::shared_ptr ticker(new Ticker); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + std::shared_ptr token(new onceToken(nullptr, [ticker,weak_self](){ + auto strongSelf = weak_self.lock(); + if (strongSelf) { + DebugP(strongSelf.get()) << "play 回复时间:" << ticker->elapsedTime() << "ms"; } })); - Broadcast::AuthInvoker invoker = [weakSelf,pToken](const string &err){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ + Broadcast::AuthInvoker invoker = [weak_self,token](const string &err){ + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->async([weakSelf,err,pToken](){ - auto strongSelf = weakSelf.lock(); - if(!strongSelf){ + strong_self->async([weak_self, err, token]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return; } - strongSelf->doPlayResponse(err,[pToken](bool){}); + strong_self->doPlayResponse(err, [token](bool) {}); }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,static_cast(*this)); + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, _media_info, invoker, static_cast(*this)); if(!flag){ //该事件无人监听,默认不鉴权 - doPlayResponse("",[pToken](bool){}); + doPlayResponse("",[token](bool){}); } } + void RtmpSession::onCmd_play2(AMFDecoder &dec) { doPlay(dec); } @@ -365,30 +366,30 @@ string RtmpSession::getStreamId(const string &str){ string stream_id; string params; auto pos = str.find('?'); - if(pos != string::npos){ + if (pos != string::npos) { //有url参数 - stream_id = str.substr(0,pos); + stream_id = str.substr(0, pos); //获取url参数 params = str.substr(pos + 1); - }else{ + } else { //没有url参数 stream_id = str; } pos = stream_id.find(":"); - if(pos != string::npos){ + if (pos != string::npos) { //vlc和ffplay在播放 rtmp://127.0.0.1/record/0.mp4时, //传过来的url会是rtmp://127.0.0.1/record/mp4:0, //我们在这里还原成0.mp4 //实际使用时发现vlc,mpv等会传过来rtmp://127.0.0.1/record/mp4:0.mp4,这里做个判断 - auto ext = stream_id.substr(0,pos); + auto ext = stream_id.substr(0, pos); stream_id = stream_id.substr(pos + 1); - if(stream_id.find(ext) == string::npos){ + if (stream_id.find(ext) == string::npos) { stream_id = stream_id + "." + ext; } } - if(params.empty()){ + if (params.empty()) { //没有url参数 return stream_id; } @@ -399,8 +400,8 @@ string RtmpSession::getStreamId(const string &str){ void RtmpSession::onCmd_play(AMFDecoder &dec) { dec.load();/* NULL */ - _mediaInfo.parse(_strTcUrl + "/" + getStreamId(dec.load())); - _mediaInfo._schema = RTMP_SCHEMA; + _media_info.parse(_tc_url + "/" + getStreamId(dec.load())); + _media_info._schema = RTMP_SCHEMA; doPlay(dec); } @@ -419,7 +420,7 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) { } void RtmpSession::setMetaData(AMFDecoder &dec) { - if (!_pPublisherSrc) { + if (!_publisher_src) { throw std::runtime_error("not a publisher"); } std::string type = dec.load(); @@ -428,22 +429,23 @@ void RtmpSession::setMetaData(AMFDecoder &dec) { } auto metadata = dec.load(); // dumpMetadata(metadata); - _pPublisherSrc->setMetaData(metadata); + _publisher_src->setMetaData(metadata); _set_meta_data = true; } void RtmpSession::onProcessCmd(AMFDecoder &dec) { - typedef void (RtmpSession::*rtmpCMDHandle)(AMFDecoder &dec); - static unordered_map s_cmd_functions; + typedef void (RtmpSession::*cmd_function)(AMFDecoder &dec); + static unordered_map s_cmd_functions; static onceToken token([]() { - s_cmd_functions.emplace("connect",&RtmpSession::onCmd_connect); - s_cmd_functions.emplace("createStream",&RtmpSession::onCmd_createStream); - s_cmd_functions.emplace("publish",&RtmpSession::onCmd_publish); - s_cmd_functions.emplace("deleteStream",&RtmpSession::onCmd_deleteStream); - s_cmd_functions.emplace("play",&RtmpSession::onCmd_play); - s_cmd_functions.emplace("play2",&RtmpSession::onCmd_play2); - s_cmd_functions.emplace("seek",&RtmpSession::onCmd_seek); - s_cmd_functions.emplace("pause",&RtmpSession::onCmd_pause);}, []() {}); + s_cmd_functions.emplace("connect", &RtmpSession::onCmd_connect); + s_cmd_functions.emplace("createStream", &RtmpSession::onCmd_createStream); + s_cmd_functions.emplace("publish", &RtmpSession::onCmd_publish); + s_cmd_functions.emplace("deleteStream", &RtmpSession::onCmd_deleteStream); + s_cmd_functions.emplace("play", &RtmpSession::onCmd_play); + s_cmd_functions.emplace("play2", &RtmpSession::onCmd_play2); + s_cmd_functions.emplace("seek", &RtmpSession::onCmd_seek); + s_cmd_functions.emplace("pause", &RtmpSession::onCmd_pause); + }); std::string method = dec.load(); auto it = s_cmd_functions.find(method); @@ -451,52 +453,54 @@ void RtmpSession::onProcessCmd(AMFDecoder &dec) { // TraceP(this) << "can not support cmd:" << method; return; } - _dNowReqID = dec.load(); + _recv_req_id = dec.load(); auto fun = it->second; (this->*fun)(dec); } -void RtmpSession::onRtmpChunk(RtmpPacket &chunkData) { - switch (chunkData.typeId) { +void RtmpSession::onRtmpChunk(RtmpPacket &chunk_data) { + switch (chunk_data.type_id) { case MSG_CMD: case MSG_CMD3: { - AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_CMD3 ? 1 : 0); + AMFDecoder dec(chunk_data.buffer, chunk_data.type_id == MSG_CMD3 ? 1 : 0); onProcessCmd(dec); - } break; + } case MSG_DATA: case MSG_DATA3: { - AMFDecoder dec(chunkData.strBuf, chunkData.typeId == MSG_CMD3 ? 1 : 0); + AMFDecoder dec(chunk_data.buffer, chunk_data.type_id == MSG_CMD3 ? 1 : 0); std::string type = dec.load(); if (type == "@setDataFrame") { setMetaData(dec); - }else{ + } else { TraceP(this) << "unknown notify:" << type; } - } break; + } + case MSG_AUDIO: case MSG_VIDEO: { - if (!_pPublisherSrc) { + if (!_publisher_src) { throw std::runtime_error("Not a rtmp publisher!"); } - GET_CONFIG(bool,rtmp_modify_stamp,Rtmp::kModifyStamp); - if(rtmp_modify_stamp){ + GET_CONFIG(bool, rtmp_modify_stamp, Rtmp::kModifyStamp); + if (rtmp_modify_stamp) { int64_t dts_out; - _stamp[chunkData.typeId % 2].revise(chunkData.timeStamp, chunkData.timeStamp, dts_out, dts_out, true); - chunkData.timeStamp = dts_out; + _stamp[chunk_data.type_id % 2].revise(chunk_data.time_stamp, chunk_data.time_stamp, dts_out, dts_out, true); + chunk_data.time_stamp = dts_out; } - if(!_set_meta_data && !chunkData.isCfgFrame()){ + if (!_set_meta_data && !chunk_data.isCfgFrame()) { _set_meta_data = true; - _pPublisherSrc->setMetaData(TitleMeta().getMetadata()); + _publisher_src->setMetaData(TitleMeta().getMetadata()); } - _pPublisherSrc->onWrite(std::make_shared(std::move(chunkData))); - } + _publisher_src->onWrite(std::make_shared(std::move(chunk_data))); break; + } + default: - WarnP(this) << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size()); + WarnP(this) << "unhandled message:" << (int) chunk_data.type_id << hexdump(chunk_data.buffer.data(), chunk_data.buffer.size()); break; } } @@ -512,23 +516,23 @@ void RtmpSession::onCmd_seek(AMFDecoder &dec) { auto milliSeconds = dec.load().as_number(); InfoP(this) << "rtmp seekTo(ms):" << milliSeconds; - auto stongSrc = _pPlayerSrc.lock(); - if (stongSrc) { - stongSrc->seekTo(milliSeconds); + auto strong_src = _player_src.lock(); + if (strong_src) { + strong_src->seekTo(milliSeconds); } } void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) { //rtmp播放器时间戳从零开始 int64_t dts_out; - _stamp[pkt->typeId % 2].revise(pkt->timeStamp, 0, dts_out, dts_out); - sendRtmp(pkt->typeId, pkt->streamId, pkt, dts_out, pkt->chunkId); + _stamp[pkt->type_id % 2].revise(pkt->time_stamp, 0, dts_out, dts_out); + sendRtmp(pkt->type_id, pkt->stream_index, pkt, dts_out, pkt->chunk_id); } bool RtmpSession::close(MediaSource &sender,bool force) { //此回调在其他线程触发 - if(!_pPublisherSrc || (!force && _pPublisherSrc->totalReaderCount())){ + if(!_publisher_src || (!force && _publisher_src->totalReaderCount())){ return false; } string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; @@ -537,12 +541,12 @@ bool RtmpSession::close(MediaSource &sender,bool force) { } int RtmpSession::totalReaderCount(MediaSource &sender) { - return _pPublisherSrc ? _pPublisherSrc->totalReaderCount() : sender.readerCount(); + return _publisher_src ? _publisher_src->totalReaderCount() : sender.readerCount(); } void RtmpSession::setSocketFlags(){ - GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS); - if(mergeWriteMS > 0) { + GET_CONFIG(int, merge_write_ms, General::kMergeWriteMS); + if (merge_write_ms > 0) { //推流模式下,关闭TCP_NODELAY会增加推流端的延时,但是服务器性能将提高 SockUtil::setNoDelay(_sock->rawFD(), false); //播放模式下,开启MSG_MORE会增加延时,但是能提高发送性能 @@ -551,14 +555,14 @@ void RtmpSession::setSocketFlags(){ } void RtmpSession::dumpMetadata(const AMFValue &metadata) { - if(metadata.type() != AMF_OBJECT && metadata.type() != AMF_ECMA_ARRAY){ + if (metadata.type() != AMF_OBJECT && metadata.type() != AMF_ECMA_ARRAY) { WarnL << "invalid metadata type:" << metadata.type(); - return ; + return; } _StrPrinter printer; - metadata.object_for_each([&](const string &key, const AMFValue &val){ - printer << "\r\n" << key << "\t:" << val.to_string() ; + metadata.object_for_each([&](const string &key, const AMFValue &val) { + printer << "\r\n" << key << "\t:" << val.to_string(); }); - InfoL << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid << (string)printer; + InfoL << _media_info._vhost << " " << _media_info._app << " " << _media_info._streamid << (string) printer; } } /* namespace mediakit */ diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index f88d2eb6..860422f3 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -30,11 +30,13 @@ namespace mediakit { class RtmpSession: public TcpSession ,public RtmpProtocol , public MediaSourceEvent{ public: typedef std::shared_ptr Ptr; - RtmpSession(const Socket::Ptr &_sock); - virtual ~RtmpSession(); - void onRecv(const Buffer::Ptr &pBuf) override; + RtmpSession(const Socket::Ptr &sock); + ~RtmpSession() override; + + void onRecv(const Buffer::Ptr &buf) override; void onError(const SockException &err) override; void onManager() override; + private: void onProcessCmd(AMFDecoder &dec); void onCmd_connect(AMFDecoder &dec); @@ -55,15 +57,15 @@ private: void onSendMedia(const RtmpPacket::Ptr &pkt); void onSendRawData(const Buffer::Ptr &buffer) override{ - _ui64TotalBytes += buffer->size(); + _total_bytes += buffer->size(); send(buffer); } - void onRtmpChunk(RtmpPacket &chunkData) override; + void onRtmpChunk(RtmpPacket &chunk_data) override; template inline void sendReply(const char *str, const first &reply, const second &status) { AMFEncoder invoke; - invoke << str << _dNowReqID << reply << status; + invoke << str << _recv_req_id << reply << status; sendResponse(MSG_CMD, invoke.data()); } @@ -74,29 +76,30 @@ private: void setSocketFlags(); string getStreamId(const string &str); void dumpMetadata(const AMFValue &metadata); + private: - std::string _strTcUrl; - MediaInfo _mediaInfo; - double _dNowReqID = 0; + bool _paused = false; bool _set_meta_data = false; - Ticker _ticker;//数据接收时间 - RtmpMediaSource::RingType::RingReader::Ptr _pRingReader; - std::shared_ptr _pPublisherSrc; - std::weak_ptr _pPlayerSrc; + double _recv_req_id = 0; + //消耗的总流量 + uint64_t _total_bytes = 0; + + std::string _tc_url; //时间戳修整器 Stamp _stamp[2]; - //消耗的总流量 - uint64_t _ui64TotalBytes = 0; - bool _paused = false; + //数据接收超时计时器 + Ticker _ticker; + MediaInfo _media_info; + std::weak_ptr _player_src; + std::shared_ptr _publisher_src; + RtmpMediaSource::RingType::RingReader::Ptr _ring_reader; }; - /** * 支持ssl加密的rtmp服务器 */ typedef TcpSessionWithSSL RtmpSessionWithSSL; } /* namespace mediakit */ - #endif /* SRC_RTMP_RTMPSESSION_H_ */ diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index f1fa3a9e..6c960cb9 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -733,28 +733,12 @@ void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &rtp, const SdpTrack::Ptr &tra } void RtspPlayer::onPlayResult_l(const SockException &ex , bool handshakeCompleted) { - WarnL << ex.getErrCode() << " " << ex.what(); - - if(!ex){ - //播放成功,恢复rtp接收超时定时器 - _rtp_recv_ticker.resetTime(); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - int timeoutMS = (*this)[kMediaTimeoutMS].as(); - //创建rtp数据接收超时检测定时器 - _rtp_check_timer.reset(new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { - return false; - } - if(strongSelf->_rtp_recv_ticker.elapsedTime() > timeoutMS) { - //接收rtp媒体数据包超时 - strongSelf->onPlayResult_l(SockException(Err_timeout,"receive rtp timeout"), true); - return false; - } - return true; - }, getPoller())); + if (ex.getErrCode() == Err_shutdown) { + //主动shutdown的,不触发回调 + return; } + WarnL << ex.getErrCode() << " " << ex.what(); if (!handshakeCompleted) { //开始播放阶段 _play_check_timer.reset(); @@ -769,7 +753,26 @@ void RtspPlayer::onPlayResult_l(const SockException &ex , bool handshakeComplete onResume(); } - if(ex){ + if (!ex) { + //播放成功,恢复rtp接收超时定时器 + _rtp_recv_ticker.resetTime(); + int timeoutMS = (*this)[kMediaTimeoutMS].as(); + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + auto lam = [weakSelf, timeoutMS]() { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { + return false; + } + if (strongSelf->_rtp_recv_ticker.elapsedTime() > timeoutMS) { + //接收rtp媒体数据包超时 + strongSelf->onPlayResult_l(SockException(Err_timeout, "receive rtp timeout"), true); + return false; + } + return true; + }; + //创建rtp数据接收超时检测定时器 + _rtp_check_timer = std::make_shared(timeoutMS / 2000.0, lam, getPoller()); + } else { teardown(); } } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index c0b49472..9eef2b26 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -1123,7 +1123,7 @@ inline void RtspSession::onSendRtpPacket(const RtpPacket::Ptr &pkt){ //send rtcp every 5 second ticker.resetTime(); //直接保存网络字节序 - memcpy(&counter.timeStamp, pkt->data() + 8, 4); + memcpy(&counter.time_stamp, pkt->data() + 8, 4); sendSenderReport(_rtp_type == Rtsp::RTP_TCP, track_index); } #endif