From 9a088f48256603e3bf01e9c6cc9c8a49dd7da269 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sat, 12 Sep 2020 18:53:51 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E9=AB=98PSRtpSender=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?=E7=9A=84=E7=BA=BF=E7=A8=8B=E5=AE=89=E5=85=A8=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtp/PSRtpSender.cpp | 91 +++++++++++++++++++++++++---------------- src/Rtp/PSRtpSender.h | 6 ++- 2 files changed, 60 insertions(+), 37 deletions(-) diff --git a/src/Rtp/PSRtpSender.cpp b/src/Rtp/PSRtpSender.cpp index b88e76dc..fe97b9a3 100644 --- a/src/Rtp/PSRtpSender.cpp +++ b/src/Rtp/PSRtpSender.cpp @@ -29,34 +29,36 @@ PSRtpSender::~PSRtpSender() { InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); } -void PSRtpSender::onPS(uint32_t stamp, void *packet, size_t bytes) { - //此函数在其他线程执行 - _rtp_encoder->inputFrame(std::make_shared((char *) packet, bytes, stamp)); -} - void PSRtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb){ _is_udp = is_udp; - //确保Socket对象的线程安全 - _socket = std::make_shared(_poller, true); + _socket = std::make_shared(_poller, false); _dst_url = dst_url; _dst_port = dst_port; weak_ptr weak_self = shared_from_this(); if (is_udp) { _socket->bindUdpSock(0); - WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self]() { - //切换线程目的是为了dns解析放在后台线程执行 + auto poller = _poller; + WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self, poller]() { struct sockaddr addr; + //切换线程目的是为了dns解析放在后台线程执行 if (!SockUtil::getDomainIP(dst_url.data(), dst_port, addr)) { - cb(SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url)); + poller->async([dst_url, cb]() { + //切回自己的线程 + cb(SockException(Err_dns, StrPrinter << "dns解析域名失败:" << dst_url)); + }); return; } - cb(SockException()); - auto strong_self = weak_self.lock(); - if (strong_self) { - //dns解析成功 - strong_self->_socket->setSendPeerAddr(&addr); - strong_self->onConnect(); - } + + //dns解析成功 + poller->async([addr, weak_self, cb]() { + //切回自己的线程 + cb(SockException()); + auto strong_self = weak_self.lock(); + if (strong_self) { + strong_self->_socket->setSendPeerAddr(&addr); + strong_self->onConnect(); + } + }); }); } else { _socket->connect(dst_url, dst_port, [cb, weak_self](const SockException &err) { @@ -74,7 +76,7 @@ void PSRtpSender::onConnect(){ _is_connect = true; //加大发送缓存,防止udp丢包之类的问题 SockUtil::setSendBuf(_socket->rawFD(), 4 * 1024 * 1024); - if(!_is_udp){ + if (!_is_udp) { //关闭tcp no_delay并开启MSG_MORE, 提高发送性能 SockUtil::setNoDelay(_socket->rawFD(), false); _socket->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); @@ -90,28 +92,46 @@ void PSRtpSender::onConnect(){ InfoL << "开始发送 ps rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp; } -void PSRtpSender::onRtp(const RtpPacket::Ptr &rtp, bool) { - //此函数在其他线程执行 - if(!_is_connect){ - return; +//此函数在其他线程执行 +void PSRtpSender::inputFrame(const Frame::Ptr &frame) { + if (_is_connect) { + //连接成功后才做实质操作(节省cpu资源) + PSEncoder::inputFrame(frame); } +} +//此函数在其他线程执行 +void PSRtpSender::onPS(uint32_t stamp, void *packet, size_t bytes) { + _rtp_encoder->inputFrame(std::make_shared((char *) packet, bytes, stamp)); +} + +//此函数在其他线程执行 +void PSRtpSender::onRtp(const RtpPacket::Ptr &rtp, bool) { //开启合并写提高发送性能 PacketCache::inputPacket(true, rtp, false); } -void PSRtpSender::onFlush(shared_ptr> &rtp_list, bool key_pos) { - //此函数在其他线程执行 - int i = 0; - int size = rtp_list->size(); - rtp_list->for_each([&](const RtpPacket::Ptr &packet) { - if (_is_udp) { - //udp模式,rtp over tcp前4个字节可以忽略 - _socket->send(std::make_shared(packet, 4), nullptr, 0, ++i == size); - } else { - //tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 - _socket->send(std::make_shared(packet, 2), nullptr, 0, ++i == size); - } +//此函数在其他线程执行 +void PSRtpSender::onFlush(shared_ptr> &rtp_list, bool) { + if(!_is_connect){ + //连接成功后才能发送数据 + return; + } + + auto is_udp = _is_udp; + auto socket = _socket; + _poller->async([rtp_list, is_udp, socket]() { + int i = 0; + int size = rtp_list->size(); + rtp_list->for_each([&](const RtpPacket::Ptr &packet) { + if (is_udp) { + //udp模式,rtp over tcp前4个字节可以忽略 + socket->send(std::make_shared(packet, 4), nullptr, 0, ++i == size); + } else { + //tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 + socket->send(std::make_shared(packet, 2), nullptr, 0, ++i == size); + } + }); }); } @@ -142,5 +162,4 @@ void PSRtpSender::onErr(const SockException &ex, bool is_connect) { }, _poller); } -}//namespace mediakit - +}//namespace mediakit \ No newline at end of file diff --git a/src/Rtp/PSRtpSender.h b/src/Rtp/PSRtpSender.h index 3f20f143..9cb9a653 100644 --- a/src/Rtp/PSRtpSender.h +++ b/src/Rtp/PSRtpSender.h @@ -54,6 +54,11 @@ public: */ void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb); + /** + * 输入帧数据 + */ + void inputFrame(const Frame::Ptr &frame) override; + protected: //mpeg-ps回调 void onPS(uint32_t stamp, void *packet, size_t bytes) override; @@ -65,7 +70,6 @@ protected: */ void onFlush(std::shared_ptr > &rtp_list, bool key_pos) override; - private: //rtp打包后回调 void onRtp(const RtpPacket::Ptr &in, bool is_key);