diff --git a/src/Rtmp/RtmpPlayer.cpp b/src/Rtmp/RtmpPlayer.cpp index ceddaa01..e0185ba3 100644 --- a/src/Rtmp/RtmpPlayer.cpp +++ b/src/Rtmp/RtmpPlayer.cpp @@ -44,11 +44,16 @@ void RtmpPlayer::teardown() { _deque_on_status.clear(); } -void RtmpPlayer::play(const string &strUrl) { +void RtmpPlayer::play(const string &url) { teardown(); - string host_url = FindField(strUrl.data(), "://", "/"); - _app = FindField(strUrl.data(), (host_url + "/").data(), "/"); - _stream_id = FindField(strUrl.data(), (host_url + "/" + _app + "/").data(), NULL); + string host_url = FindField(url.data(), "://", "/"); + { + auto pos = url.find_last_of('/'); + if (pos != string::npos) { + _stream_id = url.substr(pos + 1); + } + } + _app = FindField(url.data(), (host_url + "/").data(), ("/" + _stream_id).data()); _tc_url = string("rtmp://") + host_url + "/" + _app; if (!_app.size() || !_stream_id.size()) { @@ -109,16 +114,16 @@ void RtmpPlayer::onPlayResult_l(const SockException &ex, bool handshake_done) { //播放成功,恢复rtmp接收超时定时器 _rtmp_recv_ticker.resetTime(); auto timeout_ms = (*this)[Client::kMediaTimeoutMS].as(); - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - auto lam = [weakSelf, timeout_ms]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + auto lam = [weak_self, timeout_ms]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return false; } - if (strongSelf->_rtmp_recv_ticker.elapsedTime() > timeout_ms) { + if (strong_self->_rtmp_recv_ticker.elapsedTime() > timeout_ms) { //接收rtmp媒体数据超时 SockException ex(Err_timeout, "receive rtmp timeout"); - strongSelf->onPlayResult_l(ex, true); + strong_self->onPlayResult_l(ex, true); return false; } return true; @@ -130,19 +135,17 @@ void RtmpPlayer::onPlayResult_l(const SockException &ex, bool handshake_done) { } } -void RtmpPlayer::onConnect(const SockException &err){ +void RtmpPlayer::onConnect(const SockException &err) { if (err.getErrCode() != Err_success) { onPlayResult_l(err, false); return; } - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - startClientSession([weakSelf]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - return; + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + startClientSession([weak_self]() { + if (auto strong_self = weak_self.lock()) { + strong_self->send_connect(); } - strongSelf->send_connect(); - }); + },_app.find("vod") != 0); // 实测发现vod点播时,使用复杂握手fms无响应:issue #2007 } void RtmpPlayer::onRecv(const Buffer::Ptr &buf){ @@ -249,14 +252,14 @@ inline void RtmpPlayer::send_pause(bool pause) { _beat_timer.reset(); if (pause) { - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _beat_timer.reset(new Timer((*this)[Client::kBeatIntervalMS].as() / 1000.0f, [weakSelf]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _beat_timer.reset(new Timer((*this)[Client::kBeatIntervalMS].as() / 1000.0f, [weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { return false; } uint32_t timeStamp = (uint32_t)::time(NULL); - strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp); + strong_self->sendUserControl(CONTROL_PING_REQUEST, timeStamp); return true; }, getPoller())); } diff --git a/src/Rtmp/RtmpProtocol.cpp b/src/Rtmp/RtmpProtocol.cpp index ee391b61..dc536c03 100644 --- a/src/Rtmp/RtmpProtocol.cpp +++ b/src/Rtmp/RtmpProtocol.cpp @@ -288,12 +288,14 @@ const char *RtmpProtocol::onSearchPacketTail(const char *data,size_t len){ } ////for client//// -void RtmpProtocol::startClientSession(const function &func) { +void RtmpProtocol::startClientSession(const function &func, bool complex) { //发送 C0C1 char handshake_head = HANDSHAKE_PLAINTEXT; onSendRawData(obtainBuffer(&handshake_head, 1)); RtmpHandshake c1(0); - c1.create_complex_c0c1(); + if (complex) { + c1.create_complex_c0c1(); + } onSendRawData(obtainBuffer((char *) (&c1), sizeof(c1))); _next_step_func = [this, func](const char *data, size_t len) { //等待 S0+S1+S2 @@ -754,7 +756,8 @@ void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet) { case MSG_WIN_SIZE: { //如果窗口太小,会导致发送sendAcknowledgement时无限递归:https://github.com/ZLMediaKit/ZLMediaKit/issues/1839 - _windows_size = max(load_be32(&chunk_data.buffer[0]), 32 * 1024U); + //窗口太大,也可能导致fms服务器认为播放器心跳超时 + _windows_size = min(max(load_be32(&chunk_data.buffer[0]), 32 * 1024U), 1280 * 1024U); TraceL << "MSG_WIN_SIZE:" << _windows_size; break; } @@ -806,7 +809,15 @@ void RtmpProtocol::handle_chunk(RtmpPacket::Ptr packet) { break; } - default: onRtmpChunk(std::move(packet)); break; + default: { + _bytes_recv += packet->size(); + if (_windows_size > 0 && _bytes_recv - _bytes_recv_last >= _windows_size) { + _bytes_recv_last = _bytes_recv; + sendAcknowledgement(_bytes_recv); + } + onRtmpChunk(std::move(packet)); + break; + } } } diff --git a/src/Rtmp/RtmpProtocol.h b/src/Rtmp/RtmpProtocol.h index ccd0e4c4..c94a6c36 100644 --- a/src/Rtmp/RtmpProtocol.h +++ b/src/Rtmp/RtmpProtocol.h @@ -32,7 +32,7 @@ public: void onParseRtmp(const char *data, size_t size); //作为客户端发送c0c1,等待s0s1s2并且回调 - void startClientSession(const std::function &cb); + void startClientSession(const std::function &cb, bool complex = true); protected: virtual void onSendRawData(toolkit::Buffer::Ptr buffer) = 0; @@ -94,8 +94,10 @@ private: size_t _chunk_size_in = DEFAULT_CHUNK_LEN; size_t _chunk_size_out = DEFAULT_CHUNK_LEN; ////////////Acknowledgement//////////// - uint32_t _bytes_sent = 0; - uint32_t _bytes_sent_last = 0; + uint64_t _bytes_sent = 0; + uint64_t _bytes_sent_last = 0; + uint64_t _bytes_recv = 0; + uint64_t _bytes_recv_last = 0; uint32_t _windows_size = 0; ///////////PeerBandwidth/////////// uint32_t _bandwidth = 2500000;