From 27bc19dd645c6be0ae2936ff3269f0a375779924 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Tue, 30 Oct 2018 10:31:27 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3rtsp=E6=92=AD=E6=94=BE?= =?UTF-8?q?=E5=99=A8=E7=B2=98=E5=8C=85=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtsp/RtspPlayer.cpp | 167 ++++++++------------------------------ src/Rtsp/RtspPlayer.h | 24 ++++-- src/Rtsp/RtspSplitter.cpp | 82 +++++++++++++++++++ src/Rtsp/RtspSplitter.h | 72 ++++++++++++++++ 4 files changed, 203 insertions(+), 142 deletions(-) create mode 100644 src/Rtsp/RtspSplitter.cpp create mode 100644 src/Rtsp/RtspSplitter.h diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 18bc0bf1..e90cf3cb 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -55,11 +55,7 @@ RtspPlayer::RtspPlayer(void){ _pktPool.setSize(64); } RtspPlayer::~RtspPlayer(void) { - teardown(); - if (_pucRtpBuf) { - delete[] _pucRtpBuf; - _pucRtpBuf = nullptr; - } + RtspPlayer::teardown(); DebugL<data(); - int size = pBuf->size(); - if (_onHandshake) { - //rtsp回复 - int offset = 0; - while(offset < size - 4){ - char *pos = (char *)memchr(buf + offset, 'R', size - offset); - if(pos == NULL){ - break; - } - if(memcmp(pos, "RTSP", 4) == 0){ - try { - pos += onProcess(pos); - } catch (std::exception &err) { - SockException ex(Err_other, err.what()); - onPlayResult_l(ex); - onShutdown_l(ex); - teardown(); - return; - } - }else{ - pos += 1; - } - offset = pos - buf; - } - } - - if (_eType == RTP_TCP && _pucRtpBuf) { - //RTP data - while (size > 0) { - int added = RTP_BUF_SIZE - _uiRtpBufLen; - added = (added > size ? size : added); - memcpy(_pucRtpBuf + _uiRtpBufLen, buf, added); - _uiRtpBufLen += added; - size -= added; - buf += added; - splitRtp(_pucRtpBuf, _uiRtpBufLen); - } - } + input(pBuf->data(),pBuf->size()); } void RtspPlayer::onErr(const SockException &ex) { onShutdown_l (ex); @@ -260,7 +215,6 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { throw std::runtime_error( StrPrinter << "DESCRIBE:" << parser.Url() << " " << parser.Tail() << endl); } - auto strSdp = parser.Content(); _strContentBase = parser["Content-Base"]; if(_strContentBase.empty()){ @@ -270,19 +224,14 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { _strContentBase.pop_back(); } - auto iLen = atoi(parser["Content-Length"].data()); - if(iLen > 0){ - strSdp.erase(iLen); - } - //解析sdp - _sdpAttr.load(strSdp); + _sdpAttr.load(parser.Content()); _aTrackInfo = _sdpAttr.getAvailableTrack(); if (_aTrackInfo.empty()) { throw std::runtime_error("无有效的Sdp Track"); } - if (!onCheckSDP(strSdp, _sdpAttr)) { + if (!onCheckSDP(parser.Content(), _sdpAttr)) { throw std::runtime_error("onCheckSDP faied"); } @@ -346,6 +295,8 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) _eType = RTP_UDP; } + RtspSplitter::enableRecvRtp(_eType == RTP_TCP); + if(_eType == RTP_TCP) { string interleaved = FindField( FindField((strTransport + ";").c_str(), "interleaved=", ";").c_str(), NULL, "-"); _aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.c_str()); @@ -416,6 +367,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) bool RtspPlayer::sendOptions() { _onHandshake = [](const Parser& parser){ +// DebugL << "options response"; return true; }; return sendRtspRequest("OPTIONS",_strContentBase); @@ -494,88 +446,37 @@ void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) { } } -int RtspPlayer::onProcess(const char* pcBuf) { - auto strRtsp = FindField(pcBuf, "RTSP", "\r\n\r\n"); - if(strRtsp.empty()){ - return 4; +void RtspPlayer::onWholeRtspPacket(Parser &parser) { + try { + decltype(_onHandshake) fun; + _onHandshake.swap(fun); + if(fun){ + fun(parser); + } + parser.Clear(); + } catch (std::exception &err) { + SockException ex(Err_other, err.what()); + onPlayResult_l(ex); + onShutdown_l(ex); + teardown(); } - - strRtsp = string("RTSP") + strRtsp + "\r\n\r\n"; - Parser parser; - parser.Parse(strRtsp.data()); - int iLen = 0; - if (parser.Url() == "200") { - iLen = atoi(parser["Content-Length"].data()); - if (iLen) { - string strContent(pcBuf + strRtsp.size(), iLen); - parser.setContent(strContent); - } - } - auto fun = _onHandshake; - _onHandshake = nullptr; - if(fun){ - fun(parser); - } - parser.Clear(); - return strRtsp.size() + iLen; } - -void RtspPlayer::splitRtp(unsigned char* pucRtp, unsigned int uiLen) { - unsigned char* rtp_ptr = pucRtp; - while (uiLen >= 4) { - if (rtp_ptr[0] == '$') { - //通道0 - uint8_t interleaved = rtp_ptr[1]; - uint16_t length = (rtp_ptr[2] << 8) | rtp_ptr[3]; - if (length > 1600) { - //没有大于MTU的包 - //WarnL << "没有大于MTU的包:" << length; - rtp_ptr += 1; - uiLen -= 1; - continue; - } - if ((unsigned int) length + 4 + 4 > uiLen) { - //buf 太小,还没到该RTP包的结尾 - break; - } - auto nextPkt = rtp_ptr + length + 4; - if (*nextPkt != '$' && memcmp(nextPkt,"RTSP",4)!=0 ) { - //没有找到该包的尾部 - //WarnL << "没有找到该包的尾部"; - rtp_ptr += 1; - uiLen -= 1; - continue; - } - int trackIdx = -1; - if(interleaved %2 ==0){ - trackIdx = getTrackIndexByInterleaved(interleaved); - } - if (trackIdx != -1) { - handleOneRtp(trackIdx, rtp_ptr + 4, length); - } - rtp_ptr += (length + 4); - uiLen -= (length + 4); - continue; - } - unsigned char *pos = (unsigned char *) memchr(rtp_ptr + 1, '$', uiLen - 1); - if (pos == NULL) { - //缓存里面没有任何RTP包 - //WarnL << "缓存里面没有任何RTP包"; - uiLen = 0; - break; - } - //有RTP包起始头 - uiLen -= (pos - rtp_ptr); - rtp_ptr = pos; - } - _uiRtpBufLen = uiLen; - if (rtp_ptr != pucRtp) { - memmove(pucRtp, rtp_ptr, uiLen); - } +void RtspPlayer::onRtpPacket(const char *data, uint64_t len) { + if(len > 1600){ + //没有大于MTU的包 + return; + } + int trackIdx = -1; + uint8_t interleaved = data[1]; + if(interleaved %2 ==0){ + trackIdx = getTrackIndexByInterleaved(interleaved); + } + if (trackIdx != -1) { + handleOneRtp(trackIdx, (unsigned char *)data + 4, len - 4); + } } - # define AV_RB16(x) \ ((((const uint8_t*)(x))[0] << 8) | \ ((const uint8_t*)(x))[1]) diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index 54dd5b3c..cee1ffd3 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -39,6 +39,7 @@ #include "Poller/Timer.h" #include "Network/Socket.h" #include "Network/TcpClient.h" +#include "RtspSplitter.h" using namespace std; using namespace toolkit; @@ -46,7 +47,7 @@ using namespace toolkit; namespace mediakit { //实现了rtsp播放器协议部分的功能 -class RtspPlayer: public PlayerBase,public TcpClient { +class RtspPlayer: public PlayerBase,public TcpClient, public RtspSplitter { public: typedef std::shared_ptr Ptr; @@ -62,6 +63,19 @@ protected: virtual void onRecvRTP(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track) = 0; uint32_t getProgressMilliSecond() const; void seekToMilliSecond(uint32_t ms); + + /** + * 收到完整的rtsp包回调,包括sdp等content数据 + * @param parser rtsp包 + */ + void onWholeRtspPacket(Parser &parser) override ; + + /** + * 收到rtp包回调 + * @param data + * @param len + */ + void onRtpPacket(const char *data,uint64_t len) override ; private: void onShutdown_l(const SockException &ex); void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, int iTrackidx); @@ -81,11 +95,6 @@ private: void handleResDESCRIBE(const Parser &parser); bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr); void handleResPAUSE(const Parser &parser, bool bPause); - - //发数据给服务器 - int onProcess(const char* strBuf); - //生成rtp包结构体 - void splitRtp(unsigned char *pucData, unsigned int uiLen); //处理一个rtp包 bool handleOneRtp(int iTrackidx, unsigned char *ucData, unsigned int uiLen); @@ -103,9 +112,6 @@ private: function _onHandshake; RtspMediaSource::PoolType _pktPool; - - uint8_t *_pucRtpBuf = nullptr; - unsigned int _uiRtpBufLen = 0; Socket::Ptr _apUdpSock[2]; //rtsp info string _strSession; diff --git a/src/Rtsp/RtspSplitter.cpp b/src/Rtsp/RtspSplitter.cpp new file mode 100644 index 00000000..05212514 --- /dev/null +++ b/src/Rtsp/RtspSplitter.cpp @@ -0,0 +1,82 @@ +/* + * MIT License + * + * Copyright (c) 2016 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "RtspSplitter.h" + +namespace mediakit{ + +const char *RtspSplitter::onSearchPacketTail(const char *data, int len) { + if(!_enableRecvRtp){ + _isRtpPacket = false; + return HttpRequestSplitter::onSearchPacketTail(data, len); + } + if(data[0] != '$'){ + //这是rtsp包 + _isRtpPacket = false; + return HttpRequestSplitter::onSearchPacketTail(data, len); + } + //这是rtp包 + if(len < 4){ + //数据不够 + return nullptr; + } + uint16_t length = (((uint8_t *)data)[2] << 8) | ((uint8_t *)data)[3]; + if(len < length + 4){ + //数据不够 + return nullptr; + } + //返回rtp包末尾 + _isRtpPacket = true; + return data + 4 + length; +} + +int64_t RtspSplitter::onRecvHeader(const char *data, uint64_t len) { + if(_isRtpPacket){ + onRtpPacket(data,len); + return 0; + } + _parser.Parse(data); + auto ret = atoi(_parser["Content-Length"].data()); + if(ret == 0){ + onWholeRtspPacket(_parser); + } + return ret; +} + +void RtspSplitter::onRecvContent(const char *data, uint64_t len) { + _parser.setContent(string(data,len)); + onWholeRtspPacket(_parser); +} + +void RtspSplitter::enableRecvRtp(bool enable) { + _enableRecvRtp = enable; +} + + +}//namespace mediakit + + + diff --git a/src/Rtsp/RtspSplitter.h b/src/Rtsp/RtspSplitter.h new file mode 100644 index 00000000..9e0d9028 --- /dev/null +++ b/src/Rtsp/RtspSplitter.h @@ -0,0 +1,72 @@ +/* + * MIT License + * + * Copyright (c) 2016 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#ifndef ZLMEDIAKIT_RTSPSPLITTER_H +#define ZLMEDIAKIT_RTSPSPLITTER_H + +#include "Rtsp.h" +#include "Http/HttpRequestSplitter.h" + +namespace mediakit{ + +class RtspSplitter : public HttpRequestSplitter{ +public: + RtspSplitter(){} + virtual ~RtspSplitter(){} +protected: + /** + * 收到完整的rtsp包回调,包括sdp等content数据 + * @param parser rtsp包 + */ + virtual void onWholeRtspPacket(Parser &parser) = 0; + + /** + * 收到rtp包回调 + * @param data + * @param len + */ + virtual void onRtpPacket(const char *data,uint64_t len) = 0; + + /** + * 是否允许接收rtp包 + * @param enable + */ + void enableRecvRtp(bool enable); +protected: + const char *onSearchPacketTail(const char *data,int len) override ; + int64_t onRecvHeader(const char *data,uint64_t len) override; + void onRecvContent(const char *data,uint64_t len) override; +private: + bool _enableRecvRtp = false; + bool _isRtpPacket = false; + Parser _parser; +}; + +}//namespace mediakit + + + +#endif //ZLMEDIAKIT_RTSPSPLITTER_H