diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index c3acf2bd..fe572323 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit c3acf2bd7fff96651da4f77d47e7e0aeb728e5d0 +Subproject commit fe572323b10d72819a4d69b326dd70e73c7bf1a6 diff --git a/3rdpart/media-server b/3rdpart/media-server index 0e726dd4..e399b938 160000 --- a/3rdpart/media-server +++ b/3rdpart/media-server @@ -1 +1 @@ -Subproject commit 0e726dd4e06ab4ed3723deaf3f73386e100bb10d +Subproject commit e399b93802610dcf574ff64bcb7677572cd028c1 diff --git a/Android/app/src/main/cpp/test_server.cpp b/Android/app/src/main/cpp/test_server.cpp index 5c17b097..2867919b 100644 --- a/Android/app/src/main/cpp/test_server.cpp +++ b/Android/app/src/main/cpp/test_server.cpp @@ -102,112 +102,158 @@ static onceToken token1([](){ #define REALM "realm_zlmedaikit" static map s_mapFlvRecorder; static mutex s_mtxFlvRecorder; -static onceToken s_token([](){ - //监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastOnGetRtspRealm,[](BroadcastOnGetRtspRealmArgs){ - DebugL << "RTSP是否需要鉴权事件:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ; - if(string("1") == args._streamid ){ - // live/1需要认证 - //该流需要认证,并且设置realm - invoker(REALM); - }else{ - //有时我们要查询redis或数据库来判断该流是否需要认证,通过invoker的方式可以做到完全异步 - //该流我们不需要认证 - invoker(""); - } - }); - //监听kBroadcastOnRtspAuth事件返回正确的rtsp鉴权用户密码 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastOnRtspAuth,[](BroadcastOnRtspAuthArgs){ - DebugL << "RTSP播放鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ; - DebugL << "RTSP用户:" << user_name << (must_no_encrypt ? " Base64" : " MD5" )<< " 方式登录"; - string user = user_name; - //假设我们异步读取数据库 - if(user == "test0"){ - //假设数据库保存的是明文 - invoker(false,"pwd0"); - return; - } +static void initEvent() { + static onceToken s_token([]() { + //监听kBroadcastOnGetRtspRealm事件决定rtsp链接是否需要鉴权(传统的rtsp鉴权方案)才能访问 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnGetRtspRealm, + [](BroadcastOnGetRtspRealmArgs) { + DebugL << "RTSP是否需要鉴权事件:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " + << args._param_strs; + if (string("1") == args._streamid) { + // live/1需要认证 + //该流需要认证,并且设置realm + invoker(REALM); + } else { + //有时我们要查询redis或数据库来判断该流是否需要认证,通过invoker的方式可以做到完全异步 + //该流我们不需要认证 + invoker(""); + } + }); - if(user == "test1"){ - //假设数据库保存的是密文 - auto encrypted_pwd = MD5(user + ":" + REALM + ":" + "pwd1").hexdigest(); - invoker(true,encrypted_pwd); - return; - } - if(user == "test2" && must_no_encrypt){ - //假设登录的是test2,并且以base64方式登录,此时我们提供加密密码,那么会导致认证失败 - //可以通过这个方式屏蔽base64这种不安全的加密方式 - invoker(true,"pwd2"); - return; - } + //监听kBroadcastOnRtspAuth事件返回正确的rtsp鉴权用户密码 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastOnRtspAuth, + [](BroadcastOnRtspAuthArgs) { + DebugL << "RTSP播放鉴权:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " + << args._param_strs; + DebugL << "RTSP用户:" << user_name + << (must_no_encrypt ? " Base64" : " MD5") + << " 方式登录"; + string user = user_name; + //假设我们异步读取数据库 + if (user == "test0") { + //假设数据库保存的是明文 + invoker(false, "pwd0"); + return; + } - //其他用户密码跟用户名一致 - invoker(false,user); - }); + if (user == "test1") { + //假设数据库保存的是密文 + auto encrypted_pwd = MD5( + user + ":" + REALM + ":" + + "pwd1").hexdigest(); + invoker(true, encrypted_pwd); + return; + } + if (user == "test2" && must_no_encrypt) { + //假设登录的是test2,并且以base64方式登录,此时我们提供加密密码,那么会导致认证失败 + //可以通过这个方式屏蔽base64这种不安全的加密方式 + invoker(true, "pwd2"); + return; + } + + //其他用户密码跟用户名一致 + invoker(false, user); + }); - //监听rtsp/rtmp推流事件,返回结果告知是否有推流权限 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPublish,[](BroadcastMediaPublishArgs){ - DebugL << "推流鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ; - invoker("");//鉴权成功 - //invoker("this is auth failed message");//鉴权失败 - }); + //监听rtsp/rtmp推流事件,返回结果告知是否有推流权限 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPublish, + [](BroadcastMediaPublishArgs) { + DebugL << "推流鉴权:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " + << args._param_strs; + invoker("");//鉴权成功 + //invoker("this is auth failed message");//鉴权失败 + }); - //监听rtsp/rtsps/rtmp/http-flv播放事件,返回结果告知是否有播放权限(rtsp通过kBroadcastOnRtspAuth或此事件都可以实现鉴权) - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaPlayed,[](BroadcastMediaPlayedArgs){ - DebugL << "播放鉴权:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ; - invoker("");//鉴权成功 - //invoker("this is auth failed message");//鉴权失败 - }); + //监听rtsp/rtsps/rtmp/http-flv播放事件,返回结果告知是否有播放权限(rtsp通过kBroadcastOnRtspAuth或此事件都可以实现鉴权) + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaPlayed, + [](BroadcastMediaPlayedArgs) { + DebugL << "播放鉴权:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " + << args._param_strs; + invoker("");//鉴权成功 + //invoker("this is auth failed message");//鉴权失败 + }); - //shell登录事件,通过shell可以登录进服务器执行一些命令 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastShellLogin,[](BroadcastShellLoginArgs){ - DebugL << "shell login:" << user_name << " " << passwd; - invoker("");//鉴权成功 - //invoker("this is auth failed message");//鉴权失败 - }); + //shell登录事件,通过shell可以登录进服务器执行一些命令 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastShellLogin, + [](BroadcastShellLoginArgs) { + DebugL << "shell login:" << user_name << " " + << passwd; + invoker("");//鉴权成功 + //invoker("this is auth failed message");//鉴权失败 + }); - //监听rtsp、rtmp源注册或注销事件;此处用于测试rtmp保存为flv录像,保存在http根目录下 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastMediaChanged,[](BroadcastMediaChangedArgs){ - if(schema == RTMP_SCHEMA && app == "live"){ - lock_guard lck(s_mtxFlvRecorder); - if(bRegist){ - DebugL << "开始录制RTMP:" << schema << " " << vhost << " " << app << " " << stream; - GET_CONFIG(string,http_root,Http::kRootPath); - auto path = http_root + "/" + vhost + "/" + app + "/" + stream + "_" + to_string(time(NULL)) + ".flv"; - FlvRecorder::Ptr recorder(new FlvRecorder); - try{ - recorder->startRecord(EventPollerPool::Instance().getPoller(),dynamic_pointer_cast(sender.shared_from_this()),path); - s_mapFlvRecorder[vhost + "/" + app + "/" + stream] = recorder; - }catch(std::exception &ex){ - WarnL << ex.what(); - } - }else{ - s_mapFlvRecorder.erase(vhost + "/" + app + "/" + stream); - } - } - }); + //监听rtsp、rtmp源注册或注销事件;此处用于测试rtmp保存为flv录像,保存在http根目录下 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged, + [](BroadcastMediaChangedArgs) { + if (schema == RTMP_SCHEMA && app == "live") { + lock_guard lck(s_mtxFlvRecorder); + if (bRegist) { + DebugL << "开始录制RTMP:" << schema << " " + << vhost << " " << app << " " + << stream; + GET_CONFIG(string, http_root, + Http::kRootPath); + auto path = http_root + "/" + vhost + "/" + + app + "/" + stream + "_" + + to_string(time(NULL)) + ".flv"; + FlvRecorder::Ptr recorder(new FlvRecorder); + try { + recorder->startRecord( + EventPollerPool::Instance().getPoller(), + dynamic_pointer_cast( + sender.shared_from_this()), + path); + s_mapFlvRecorder[vhost + "/" + app + + "/" + + stream] = recorder; + } catch (std::exception &ex) { + WarnL << ex.what(); + } + } else { + s_mapFlvRecorder.erase( + vhost + "/" + app + "/" + stream); + } + } + }); - //监听播放失败(未找到特定的流)事件 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastNotFoundStream,[](BroadcastNotFoundStreamArgs){ - /** - * 你可以在这个事件触发时再去拉流,这样就可以实现按需拉流 - * 拉流成功后,ZLMediaKit会把其立即转发给播放器(最大等待时间约为5秒,如果5秒都未拉流成功,播放器会播放失败) - */ - DebugL << "未找到流事件:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs ; - }); + //监听播放失败(未找到特定的流)事件 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastNotFoundStream, + [](BroadcastNotFoundStreamArgs) { + /** + * 你可以在这个事件触发时再去拉流,这样就可以实现按需拉流 + * 拉流成功后,ZLMediaKit会把其立即转发给播放器(最大等待时间约为5秒,如果5秒都未拉流成功,播放器会播放失败) + */ + DebugL << "未找到流事件:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " + << args._param_strs; + }); - //监听播放或推流结束时消耗流量事件 - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastFlowReport,[](BroadcastFlowReportArgs){ - DebugL << "播放器(推流器)断开连接事件:" << args._schema << " " << args._vhost << " " << args._app << " " << args._streamid << " " << args._param_strs - << "\r\n使用流量:" << totalBytes << " bytes,连接时长:" << totalDuration << "秒" ; + //监听播放或推流结束时消耗流量事件 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastFlowReport, + [](BroadcastFlowReportArgs) { + DebugL << "播放器(推流器)断开连接事件:" << args._schema << " " + << args._vhost << " " << args._app << " " + << args._streamid << " " << args._param_strs + << "\r\n使用流量:" << totalBytes + << " bytes,连接时长:" << totalDuration << "秒"; - }); + }); -}, nullptr); + }, nullptr); +} #if !defined(SIGHUP) #define SIGHUP 1 @@ -296,6 +342,7 @@ static int do_main(string ini_file) { TcpServer::Ptr rtspSSLSrv(new TcpServer()); rtspSSLSrv->start(rtspsPort);//默认322 + initEvent(); //服务器支持动态切换端口(不影响现有连接) NoticeCenter::Instance().addListener(ReloadConfigTag,Broadcast::kBroadcastReloadConfig,[&](BroadcastReloadConfigArgs){ //重新创建服务器 diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index c591a376..5bc42a8d 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -9,7 +9,7 @@ else() file(GLOB MediaServer_src_list ./*.cpp ./*.h) endif() -message(STATUS ${MediaServer_src_list}) +#message(STATUS ${MediaServer_src_list}) add_executable(MediaServer ${MediaServer_src_list}) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 836d381c..5adac5c8 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -449,6 +449,8 @@ void installWebApi() { const string &app, const string &stream, const string &url, + bool enable_rtsp, + bool enable_rtmp, bool enable_hls, bool enable_mp4, int rtp_type, @@ -461,7 +463,7 @@ void installWebApi() { return; } //添加拉流代理 - PlayerProxy::Ptr player(new PlayerProxy(vhost,app,stream,enable_hls,enable_mp4)); + PlayerProxy::Ptr player(new PlayerProxy(vhost,app,stream,enable_rtsp,enable_rtmp,enable_hls,enable_mp4)); s_proxyMap[key] = player; //指定RTP over TCP(播放rtsp时有效) @@ -484,16 +486,18 @@ void installWebApi() { }; //动态添加rtsp/rtmp拉流代理 - //测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&stream=0&url=rtmp://127.0.0.1/live/obs + //测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs API_REGIST_INVOKER(api,addStreamProxy,{ CHECK_SECRET(); - CHECK_ARGS("vhost","app","stream","url"); + CHECK_ARGS("vhost","app","stream","url","enable_rtsp","enable_rtmp"); addStreamProxy(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["url"], - allArgs["enable_hls"], - allArgs["enable_mp4"], + allArgs["enable_rtsp"],/* 是否rtsp转发 */ + allArgs["enable_rtmp"],/* 是否rtmp转发 */ + allArgs["enable_hls"],/* 是否hls转发 */ + allArgs["enable_mp4"],/* 是否MP4录制 */ allArgs["rtp_type"], [invoker,val,headerOut](const SockException &ex,const string &key){ if(ex){ @@ -612,7 +616,7 @@ void installWebApi() { #if !defined(_WIN32) - API_REGIST_INVOKER(hook,on_stream_not_found,{ + API_REGIST_INVOKER(hook,on_stream_not_found_ffmpeg,{ //媒体未找到事件,我们都及时拉流hks作为替代品,目的是为了测试按需拉流 CHECK_SECRET(); CHECK_ARGS("vhost","app","stream"); @@ -640,8 +644,7 @@ void installWebApi() { invoker("200 OK", headerOut, val.toStyledString()); }); }); - -#else +#endif//!defined(_WIN32) API_REGIST_INVOKER(hook,on_stream_not_found,{ //媒体未找到事件,我们都及时拉流hks作为替代品,目的是为了测试按需拉流 @@ -652,9 +655,11 @@ void installWebApi() { allArgs["app"], allArgs["stream"], /** 支持rtsp和rtmp方式拉流 ,rtsp支持h265/h264/aac,rtmp仅支持h264/aac **/ - "rtsp://184.72.239.149/vod/mp4:BigBuckBunny_115k.mov",//rtmp://live.hkstv.hk.lxdns.com/live/hks2 - false, - false, + "rtsp://184.72.239.149/vod/mp4:BigBuckBunny_115k.mov", + true,/* 开启rtsp转发 */ + true,/* 开启rtmp转发 */ + true,/* 开启hls转发 */ + false,/* 禁用MP4录制 */ 0,//rtp over tcp方式拉流 [invoker,val,headerOut](const SockException &ex,const string &key){ if(ex){ @@ -666,7 +671,6 @@ void installWebApi() { invoker("200 OK", headerOut, val.toStyledString()); }); }); -#endif // !defined(_WIN32) API_REGIST(hook,on_record_mp4,{ //录制mp4分片完毕事件 diff --git a/src/Common/Device.cpp b/src/Common/Device.cpp index 5f450311..d98683e1 100644 --- a/src/Common/Device.cpp +++ b/src/Common/Device.cpp @@ -41,9 +41,11 @@ DevChannel::DevChannel(const string &strVhost, const string &strApp, const string &strId, float fDuration, + bool bEanbleRtsp, + bool bEanbleRtmp, bool bEanbleHls, bool bEnableMp4) : - MultiMediaSourceMuxer(strVhost, strApp, strId, fDuration, bEanbleHls, bEnableMp4) {} + MultiMediaSourceMuxer(strVhost, strApp, strId, fDuration, bEanbleRtsp, bEanbleRtmp, bEanbleHls, bEnableMp4) {} DevChannel::~DevChannel() {} @@ -101,7 +103,7 @@ void DevChannel::inputH264(const char* pcData, int iDataLen, uint32_t dts,uint32 } else { prefixeSize = 0; } - inputFrame(std::make_shared((char *)pcData,iDataLen,dts,pts,prefixeSize)); + inputFrame(std::make_shared((char *)pcData,iDataLen,dts,pts,prefixeSize)); } void DevChannel::inputAAC(const char* pcData, int iDataLen, uint32_t uiStamp,bool withAdtsHeader) { @@ -117,12 +119,12 @@ void DevChannel::inputAAC(const char *pcDataWithoutAdts,int iDataLen, uint32_t u uiStamp = (uint32_t)_aTicker[1].elapsedTime(); } if(pcAdtsHeader + 7 == pcDataWithoutAdts){ - inputFrame(std::make_shared((char *)pcDataWithoutAdts - 7,iDataLen + 7,uiStamp,7)); + inputFrame(std::make_shared((char *)pcDataWithoutAdts - 7,iDataLen + 7,uiStamp,7)); } else { char *dataWithAdts = new char[iDataLen + 7]; memcpy(dataWithAdts,pcAdtsHeader,7); memcpy(dataWithAdts + 7 , pcDataWithoutAdts , iDataLen); - inputFrame(std::make_shared(dataWithAdts,iDataLen + 7,uiStamp,7)); + inputFrame(std::make_shared(dataWithAdts,iDataLen + 7,uiStamp,7)); delete [] dataWithAdts; } } diff --git a/src/Common/Device.h b/src/Common/Device.h index e056f099..7e5e7684 100644 --- a/src/Common/Device.h +++ b/src/Common/Device.h @@ -74,6 +74,8 @@ public: const string &strApp, const string &strId, float fDuration = 0, + bool bEanbleRtsp = true, + bool bEanbleRtmp = true, bool bEanbleHls = true, bool bEnableMp4 = false); diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 0be8901a..7f1535a6 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -241,9 +241,17 @@ void MediaInfo::parse(const string &url){ } void MediaSourceEvent::onNoneReader(MediaSource &sender){ - WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId(); //没有任何读取器消费该源,表明该源可以关闭了 - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,sender); + WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId(); + weak_ptr weakPtr = sender.shared_from_this(); + + //异步广播该事件,防止同步调用sender.close()导致在接收rtp或rtmp包时清空包缓存等操作 + EventPollerPool::Instance().getPoller()->async([weakPtr](){ + auto strongPtr = weakPtr.lock(); + if(strongPtr){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader,*strongPtr); + } + },false); } diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index c6c51772..82eca138 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -39,12 +39,18 @@ public: const string &strApp, const string &strId, float dur_sec = 0.0, + bool bEanbleRtsp = true, + bool bEanbleRtmp = true, bool bEanbleHls = true, - bool bEnableMp4 = false){ - _rtmp = std::make_shared(vhost,strApp,strId,std::make_shared(dur_sec)); - _rtsp = std::make_shared(vhost,strApp,strId,std::make_shared(dur_sec)); + bool bEnableMp4 = false + ){ + if (bEanbleRtmp) { + _rtmp = std::make_shared(vhost, strApp, strId, std::make_shared(dur_sec)); + } + if (bEanbleRtsp) { + _rtsp = std::make_shared(vhost, strApp, strId, std::make_shared(dur_sec)); + } _record = std::make_shared(vhost,strApp,strId,bEanbleHls,bEnableMp4); - } virtual ~MultiMediaSourceMuxer(){} @@ -54,8 +60,12 @@ public: * @param track 媒体描述 */ void addTrack(const Track::Ptr & track) { - _rtmp->addTrack(track); - _rtsp->addTrack(track); + if(_rtmp){ + _rtmp->addTrack(track); + } + if(_rtsp){ + _rtsp->addTrack(track); + } _record->addTrack(track); } @@ -64,8 +74,12 @@ public: * @param frame 帧数据 */ void inputFrame(const Frame::Ptr &frame) override { - _rtmp->inputFrame(frame); - _rtsp->inputFrame(frame); + if(_rtmp) { + _rtmp->inputFrame(frame); + } + if(_rtsp) { + _rtsp->inputFrame(frame); + } _record->inputFrame(frame); } @@ -74,8 +88,12 @@ public: * @param listener */ void setListener(const std::weak_ptr &listener){ - _rtmp->setListener(listener); - _rtsp->setListener(listener); + if(_rtmp) { + _rtmp->setListener(listener); + } + if(_rtsp) { + _rtsp->setListener(listener); + } } /** @@ -83,11 +101,13 @@ public: * @return */ int readerCount() const{ - return _rtsp->readerCount() + _rtmp->readerCount(); + return (_rtsp ? _rtsp->readerCount() : 0) + (_rtmp ? _rtmp->readerCount() : 0); } void setTimeStamp(uint32_t stamp){ - _rtsp->setTimeStamp(stamp); + if(_rtsp){ + _rtsp->setTimeStamp(stamp); + } } private: RtmpMediaSourceMuxer::Ptr _rtmp; diff --git a/src/Common/Parser.h b/src/Common/Parser.h index fdbdc8f2..bfe2ceed 100644 --- a/src/Common/Parser.h +++ b/src/Common/Parser.h @@ -16,7 +16,9 @@ namespace mediakit{ string FindField(const char *buf, const char *start, const char *end, int bufSize = 0); struct StrCaseCompare { - bool operator()(const string &__x, const string &__y) const { return strcasecmp(__x.data(), __y.data()) < 0; } + bool operator()(const string &__x, const string &__y) const { + return strcasecmp(__x.data(), __y.data()) < 0; + } }; @@ -25,17 +27,19 @@ class StrCaseMap : public multimap{ typedef multimap Super ; StrCaseMap() = default; ~StrCaseMap() = default; - string &operator[](const string &key){ - auto it = find(key); + + template + string &operator[](K &&k){ + auto it = find(std::forward(k)); if(it == end()){ - it = Super::emplace(key,""); + it = Super::emplace(std::forward(k),""); } return it->second; } template void emplace(K &&k , V &&v) { - auto it = find(k); + auto it = find(std::forward(k)); if(it != end()){ return; } diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 0e19df23..42aedb2d 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -161,11 +161,13 @@ namespace Rtsp { const string kAuthBasic = RTSP_FIELD"authBasic"; const string kHandshakeSecond = RTSP_FIELD"handshakeSecond"; const string kKeepAliveSecond = RTSP_FIELD"keepAliveSecond"; +const string kDirectProxy = RTSP_FIELD"directProxy";; onceToken token([](){ //默认Md5方式认证 mINI::Instance()[kAuthBasic] = 0; mINI::Instance()[kHandshakeSecond] = 15; mINI::Instance()[kKeepAliveSecond] = 15; + mINI::Instance()[kDirectProxy] = 1; },nullptr); } //namespace Rtsp diff --git a/src/Common/config.h b/src/Common/config.h index d396b5c7..8807d175 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -202,6 +202,13 @@ extern const string kAuthBasic; extern const string kHandshakeSecond; //维持链接超时时间,默认15秒 extern const string kKeepAliveSecond; + +//rtsp拉流代理是否直接代理 +//直接代理后支持任意编码格式,但是会导致GOP缓存无法定位到I帧,可能会导致开播花屏 +//并且如果是tcp方式拉流,如果rtp大于mtu会导致无法使用udp方式代理 +//假定您的拉流源地址不是264或265或AAC,那么你可以使用直接代理的方式来支持rtsp代理 +//默认开启rtsp直接代理,rtmp由于没有这些问题,是强制开启直接代理的 +extern const string kDirectProxy; } //namespace Rtsp ////////////RTMP服务器配置/////////// diff --git a/src/Extension/AAC.h b/src/Extension/AAC.h index c47b0a9d..e445b7fe 100644 --- a/src/Extension/AAC.h +++ b/src/Extension/AAC.h @@ -105,11 +105,11 @@ public: uint32_t iPrefixSize = 7; } ; -class AACFrameNoCopyAble : public FrameNoCopyAble { +class AACFrameNoCacheAble : public FrameNoCacheAble { public: - typedef std::shared_ptr Ptr; + typedef std::shared_ptr Ptr; - AACFrameNoCopyAble(char *ptr,uint32_t size,uint32_t dts,int prefixeSize = 7){ + AACFrameNoCacheAble(char *ptr,uint32_t size,uint32_t dts,int prefixeSize = 7){ _ptr = ptr; _size = size; _dts = dts; diff --git a/src/Extension/Factory.cpp b/src/Extension/Factory.cpp index 1658f0df..ced72b6e 100644 --- a/src/Extension/Factory.cpp +++ b/src/Extension/Factory.cpp @@ -118,8 +118,20 @@ Track::Ptr Factory::getTrackByCodecId(CodecId codecId) { RtpCodec::Ptr Factory::getRtpEncoderBySdp(const Sdp::Ptr &sdp) { GET_CONFIG(uint32_t,audio_mtu,Rtp::kAudioMtuSize); GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize); - // ssrc不冲突即可 - uint32_t ssrc = ((uint64_t) sdp.get()) & 0xFFFFFFFF; + // ssrc不冲突即可,可以为任意的32位整形 + static atomic s_ssrc(0); + uint32_t ssrc = s_ssrc++; + if(!ssrc){ + //ssrc不能为0 + ssrc = 1; + } + if(sdp->getTrackType() == TrackVideo){ + //视频的ssrc是偶数,方便调试 + ssrc = 2 * ssrc; + }else{ + //音频ssrc是奇数 + ssrc = 2 * ssrc + 1; + } auto mtu = (sdp->getTrackType() == TrackVideo ? video_mtu : audio_mtu); auto sample_rate = sdp->getSampleRate(); auto pt = sdp->getPlayloadType(); diff --git a/src/Extension/Frame.cpp b/src/Extension/Frame.cpp new file mode 100644 index 00000000..0b6f5232 --- /dev/null +++ b/src/Extension/Frame.cpp @@ -0,0 +1,42 @@ +/* + * MIT License + * + * Copyright (c) 2016-2019 xiongziliang <771730766@qq.com> + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "Frame.h" + +using namespace std; +using namespace toolkit; + +namespace mediakit{ + +Frame::Ptr Frame::getCacheAbleFrame(const Frame::Ptr &frame){ + if(frame->cacheAble()){ + return frame; + } + return std::make_shared(frame); +} + +}//namespace mediakit + diff --git a/src/Extension/Frame.h b/src/Extension/Frame.h index 57b72433..5d49753a 100644 --- a/src/Extension/Frame.h +++ b/src/Extension/Frame.h @@ -77,7 +77,7 @@ public: /** * 帧类型的抽象接口 */ -class Frame : public Buffer, public CodecInfo{ +class Frame : public Buffer, public CodecInfo { public: typedef std::shared_ptr Ptr; virtual ~Frame(){} @@ -116,6 +116,17 @@ public: * @return */ virtual bool keyFrame() const = 0; + + /** + * 是否可以缓存 + */ + virtual bool cacheAble() const { return true; } + + /** + * 返回可缓存的frame + * @return + */ + static Ptr getCacheAbleFrame(const Ptr &frame); }; /** @@ -281,9 +292,12 @@ private: map _delegateMap; }; -class FrameNoCopyAble : public Frame{ +/** + * 通过Frame接口包装指针,方便使用者把自己的数据快速接入ZLMediaKit + */ +class FrameFromPtr : public Frame{ public: - typedef std::shared_ptr Ptr; + typedef std::shared_ptr Ptr; char *data() const override{ return _ptr; } @@ -305,7 +319,6 @@ public: uint32_t prefixSize() const override{ return _prefixSize; } - protected: char *_ptr; uint32_t _size; @@ -314,6 +327,81 @@ protected: uint32_t _prefixSize; }; +/** + * 不可缓存的帧,在DevChannel类中有用到。 + * 该帧类型用于防止内存拷贝,直接使用指针传递数据 + * 在大多数情况下,ZLMediaKit是同步对帧数据进行使用和处理的 + * 所以提供此类型的帧很有必要,但是有时又无法避免缓存帧做后续处理 + * 所以可以通过Frame::getCacheAbleFrame方法拷贝一个可缓存的帧 + */ +class FrameNoCacheAble : public FrameFromPtr{ +public: + typedef std::shared_ptr Ptr; + + /** + * 该帧不可缓存 + * @return + */ + bool cacheAble() const override { + return false; + } +}; + +/** + * 该对象的功能是把一个不可缓存的帧转换成可缓存的帧 + * @see FrameNoCacheAble + */ +class FrameCacheAble : public FrameFromPtr { +public: + typedef std::shared_ptr Ptr; + + FrameCacheAble(const Frame::Ptr &frame){ + if(frame->cacheAble()){ + _frame = frame; + _ptr = frame->data(); + }else{ + _buffer = std::make_shared(); + _buffer->assign(frame->data(),frame->size()); + _ptr = _buffer->data(); + } + _size = frame->size(); + _dts = frame->dts(); + _pts = frame->pts(); + _prefixSize = frame->prefixSize(); + _trackType = frame->getTrackType(); + _codec = frame->getCodecId(); + _key = frame->keyFrame(); + } + + virtual ~FrameCacheAble() = default; + + /** + * 可以被缓存 + * @return + */ + bool cacheAble() const override { + return true; + } + + TrackType getTrackType() const override{ + return _trackType; + } + + CodecId getCodecId() const override{ + return _codec; + } + + bool keyFrame() const override{ + return _key; + } +private: + Frame::Ptr _frame; + BufferRaw::Ptr _buffer; + TrackType _trackType; + CodecId _codec; + bool _key; +}; + }//namespace mediakit diff --git a/src/Extension/H264.h b/src/Extension/H264.h index e8e0f801..853c7f60 100644 --- a/src/Extension/H264.h +++ b/src/Extension/H264.h @@ -92,11 +92,16 @@ public: }; -class H264FrameNoCopyAble : public FrameNoCopyAble { +/** + * 防止内存拷贝的H264类 + * 用户可以通过该类型快速把一个指针无拷贝的包装成Frame类 + * 该类型在DevChannel中有使用 + */ +class H264FrameNoCacheAble : public FrameNoCacheAble { public: - typedef std::shared_ptr Ptr; + typedef std::shared_ptr Ptr; - H264FrameNoCopyAble(char *ptr,uint32_t size,uint32_t dts , uint32_t pts ,int prefixeSize = 4){ + H264FrameNoCacheAble(char *ptr,uint32_t size,uint32_t dts , uint32_t pts ,int prefixeSize = 4){ _ptr = ptr; _size = size; _dts = dts; @@ -117,17 +122,26 @@ public: } }; -class H264FrameSubFrame : public H264FrameNoCopyAble{ +/** + * 一个H264Frame类中可以有多个帧,他们通过 0x 00 00 01 分隔 + * ZLMediaKit会先把这种复合帧split成单个帧然后再处理 + * 一个复合帧可以通过无内存拷贝的方式切割成多个H264FrameSubFrame + * 提供该类的目的是切换复合帧时防止内存拷贝,提高性能 + */ +class H264FrameSubFrame : public H264FrameNoCacheAble{ public: typedef std::shared_ptr Ptr; - H264FrameSubFrame(const Frame::Ptr &strongRef, + H264FrameSubFrame(const Frame::Ptr &parent_frame, char *ptr, uint32_t size, - int prefixeSize) : H264FrameNoCopyAble(ptr,size,strongRef->dts(),strongRef->pts(),prefixeSize){ - _strongRef = strongRef; + int prefixeSize) : H264FrameNoCacheAble(ptr,size,parent_frame->dts(),parent_frame->pts(),prefixeSize){ + _parent_frame = parent_frame; + } + bool cacheAble() const override { + return _parent_frame->cacheAble(); } private: - Frame::Ptr _strongRef; + Frame::Ptr _parent_frame; }; /** diff --git a/src/Extension/H264Rtmp.cpp b/src/Extension/H264Rtmp.cpp index 3adb0b7e..8f735fe3 100644 --- a/src/Extension/H264Rtmp.cpp +++ b/src/Extension/H264Rtmp.cpp @@ -108,6 +108,7 @@ inline void H264RtmpDecoder::onGetH264_l(const char* pcData, int iLen, uint32_t } } inline void H264RtmpDecoder::onGetH264(const char* pcData, int iLen, uint32_t dts,uint32_t pts) { +#if 1 _h264frame->type = H264_TYPE(pcData[0]); _h264frame->timeStamp = dts; _h264frame->ptsStamp = pts; @@ -117,6 +118,11 @@ inline void H264RtmpDecoder::onGetH264(const char* pcData, int iLen, uint32_t dt //写入环形缓存 RtmpCodec::inputFrame(_h264frame); _h264frame = obtainFrame(); +#else + //防止内存拷贝,这样产生的264帧不会有0x00 00 01头 + auto frame = std::make_shared((char *)pcData,iLen,dts,pts,0); + RtmpCodec::inputFrame(frame); +#endif } diff --git a/src/Extension/H265.h b/src/Extension/H265.h index 5fd3a6af..9ba82458 100644 --- a/src/Extension/H265.h +++ b/src/Extension/H265.h @@ -121,11 +121,11 @@ public: }; -class H265FrameNoCopyAble : public FrameNoCopyAble { +class H265FrameNoCacheAble : public FrameNoCacheAble { public: - typedef std::shared_ptr Ptr; + typedef std::shared_ptr Ptr; - H265FrameNoCopyAble(char *ptr, uint32_t size, uint32_t dts,uint32_t pts, int prefixeSize = 4) { + H265FrameNoCacheAble(char *ptr, uint32_t size, uint32_t dts,uint32_t pts, int prefixeSize = 4) { _ptr = ptr; _size = size; _dts = dts; diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 938b24c7..24d83c70 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -918,7 +918,11 @@ void HttpSession::responseDelay(const string &Origin,bool bClose, headerOther["Access-Control-Allow-Origin"] = Origin; headerOther["Access-Control-Allow-Credentials"] = "true"; } - const_cast(headerOut).insert(headerOther.begin(), headerOther.end()); + + for (auto &pr : headerOther){ + //添加默认http头,默认http头不能覆盖用户自定义的头 + const_cast(headerOut).emplace(pr.first,pr.second); + } sendResponse(codeOut.data(), headerOut, contentOut); } inline void HttpSession::sendNotFound(bool bClose) { diff --git a/src/MediaFile/MediaReader.cpp b/src/MediaFile/MediaReader.cpp index 3d2b7e22..8bf100b8 100644 --- a/src/MediaFile/MediaReader.cpp +++ b/src/MediaFile/MediaReader.cpp @@ -37,6 +37,7 @@ namespace mediakit { #ifdef ENABLE_MP4V2 MediaReader::MediaReader(const string &strVhost,const string &strApp, const string &strId,const string &filePath ) { + _poller = EventPollerPool::Instance().getPoller(); auto strFileName = filePath; if(strFileName.empty()){ GET_CONFIG(string,recordPath,Record::kFilePath); @@ -137,7 +138,7 @@ MediaReader::MediaReader(const string &strVhost,const string &strApp, const stri } _iDuration = MAX(_video_ms,_audio_ms); - _mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost,strApp,strId,_iDuration/1000.0,false, false)); + _mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost, strApp, strId, _iDuration / 1000.0, true, true, false, false)); if (_audio_trId != MP4_INVALID_TRACK_ID) { AACTrack::Ptr track = std::make_shared(_strAacCfg); _mediaMuxer->addTrack(track); @@ -164,7 +165,7 @@ void MediaReader::startReadMP4() { _timer = std::make_shared(sampleMS / 1000.0f,[strongSelf](){ return strongSelf->readSample(0,false); - }, nullptr); + }, _poller); //先读sampleMS毫秒的数据用于产生MediaSouce readSample(sampleMS, false); @@ -260,11 +261,11 @@ inline bool MediaReader::readAudioSample(int iTimeInc,bool justSeekSyncFrame) { } inline void MediaReader::writeH264(uint8_t *pucData,int iLen,uint32_t dts,uint32_t pts) { - _mediaMuxer->inputFrame(std::make_shared((char*)pucData,iLen,dts,pts)); + _mediaMuxer->inputFrame(std::make_shared((char*)pucData,iLen,dts,pts)); } inline void MediaReader::writeAAC(uint8_t *pucData,int iLen,uint32_t uiStamp) { - _mediaMuxer->inputFrame(std::make_shared((char*)pucData,iLen,uiStamp)); + _mediaMuxer->inputFrame(std::make_shared((char*)pucData,iLen,uiStamp)); } inline MP4SampleId MediaReader::getVideoSampleId(int iTimeInc ) { diff --git a/src/MediaFile/MediaReader.h b/src/MediaFile/MediaReader.h index 8b5dac5c..a2917367 100644 --- a/src/MediaFile/MediaReader.h +++ b/src/MediaFile/MediaReader.h @@ -132,6 +132,7 @@ private: Ticker _alive; recursive_mutex _mtx; Timer::Ptr _timer; + EventPoller::Ptr _poller; #endif //ENABLE_MP4V2 }; diff --git a/src/MediaFile/Mp4Maker.cpp b/src/MediaFile/Mp4Maker.cpp index 5b24afe5..4afa2912 100644 --- a/src/MediaFile/Mp4Maker.cpp +++ b/src/MediaFile/Mp4Maker.cpp @@ -25,7 +25,7 @@ */ #ifdef ENABLE_MP4V2 - +#include #include #include "Common/config.h" #include "Mp4Maker.h" diff --git a/src/MediaFile/TsMuxer.cpp b/src/MediaFile/TsMuxer.cpp index 5a16438b..2eabd145 100644 --- a/src/MediaFile/TsMuxer.cpp +++ b/src/MediaFile/TsMuxer.cpp @@ -70,7 +70,12 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { if(_frameCached.size() != 1){ string merged; _frameCached.for_each([&](const Frame::Ptr &frame){ - merged.append(frame->data(),frame->size()); + if(frame->prefixSize()){ + merged.append(frame->data(),frame->size()); + } else{ + merged.append("\x00\x00\x00\x01",4); + merged.append(frame->data(),frame->size()); + } }); merged_frame = std::make_shared(std::move(merged)); } @@ -78,7 +83,7 @@ void TsMuxer::inputFrame(const Frame::Ptr &frame) { mpeg_ts_write(_context, it->second, back->keyFrame() ? 0x0001 : 0, back->pts() * 90LL, back->dts() * 90LL, merged_frame->data(), merged_frame->size()); _frameCached.clear(); } - _frameCached.emplace_back(frame); + _frameCached.emplace_back(Frame::getCacheAbleFrame(frame)); } break; default: { diff --git a/src/Player/PlayerBase.cpp b/src/Player/PlayerBase.cpp index 9c517595..4de82c30 100644 --- a/src/Player/PlayerBase.cpp +++ b/src/Player/PlayerBase.cpp @@ -40,12 +40,23 @@ PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller,const st ptr->teardown(); }; string prefix = FindField(strUrl.data(), NULL, "://"); + + if (strcasecmp("rtsps",prefix.data()) == 0) { + return PlayerBase::Ptr(new TcpClientWithSSL(poller),releasePlayer); + } + if (strcasecmp("rtsp",prefix.data()) == 0) { return PlayerBase::Ptr(new RtspPlayerImp(poller),releasePlayer); } + + if (strcasecmp("rtmps",prefix.data()) == 0) { + return PlayerBase::Ptr(new TcpClientWithSSL(poller),releasePlayer); + } + if (strcasecmp("rtmp",prefix.data()) == 0) { return PlayerBase::Ptr(new RtmpPlayerImp(poller),releasePlayer); } + return PlayerBase::Ptr(new RtspPlayerImp(poller),releasePlayer); } diff --git a/src/Player/PlayerBase.h b/src/Player/PlayerBase.h index 8338a0b2..cb115ee7 100644 --- a/src/Player/PlayerBase.h +++ b/src/Player/PlayerBase.h @@ -216,7 +216,7 @@ public: void setMediaSouce(const MediaSource::Ptr & src) override { if (_parser) { - return _parser->setMediaSouce(src); + _parser->setMediaSouce(src); } _pMediaSrc = src; } diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index b7656272..5b821d85 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -65,6 +65,8 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00, PlayerProxy::PlayerProxy(const string &strVhost, const string &strApp, const string &strSrc, + bool bEnableRtsp, + bool bEnableRtmp, bool bEnableHls, bool bEnableMp4, int iRetryCount, @@ -72,6 +74,8 @@ PlayerProxy::PlayerProxy(const string &strVhost, _strVhost = strVhost; _strApp = strApp; _strSrc = strSrc; + _bEnableRtsp = bEnableRtsp; + _bEnableRtmp = bEnableRtmp; _bEnableHls = bEnableHls; _bEnableMp4 = bEnableMp4; _iRetryCount = iRetryCount; @@ -126,13 +130,30 @@ void PlayerProxy::play(const string &strUrlTmp) { } }); MediaPlayer::play(strUrlTmp); + + MediaSource::Ptr mediaSource; + if(dynamic_pointer_cast(_parser)){ + //rtsp拉流 + GET_CONFIG(bool,directProxy,Rtsp::kDirectProxy); + if(directProxy && _bEnableRtsp){ + mediaSource = std::make_shared(_strVhost,_strApp,_strSrc); + } + }else if(dynamic_pointer_cast(_parser)){ + //rtmp拉流 + if(_bEnableRtmp){ + mediaSource = std::make_shared(_strVhost,_strApp,_strSrc); + } + } + if(mediaSource){ + setMediaSouce(mediaSource); + mediaSource->setListener(shared_from_this()); + } } PlayerProxy::~PlayerProxy() { _timer.reset(); } void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ - auto iTaskId = reinterpret_cast(this); auto iDelay = MAX(2 * 1000, MIN(iFailedCnt * 3000,60*1000)); weak_ptr weakSelf = shared_from_this(); _timer = std::make_shared(iDelay / 1000.0f,[weakSelf,strUrl,iFailedCnt]() { @@ -146,8 +167,13 @@ void PlayerProxy::rePlay(const string &strUrl,int iFailedCnt){ return false; }, getPoller()); } + +int PlayerProxy::readerCount(){ + return (_mediaMuxer ? _mediaMuxer->readerCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0); +} + bool PlayerProxy::close(MediaSource &sender,bool force) { - if(!_mediaMuxer || (!force && _mediaMuxer->readerCount() != 0)){ + if(!force && readerCount() != 0){ return false; } @@ -157,6 +183,7 @@ bool PlayerProxy::close(MediaSource &sender,bool force) { auto stronSelf = weakSlef.lock(); if (stronSelf) { stronSelf->_mediaMuxer.reset(); + stronSelf->setMediaSouce(nullptr); stronSelf->teardown(); if(stronSelf->_onClose){ stronSelf->_onClose(); @@ -185,7 +212,7 @@ public: auto iAudioIndex = frame->stamp() / MUTE_ADTS_DATA_MS; if(_iAudioIndex != iAudioIndex){ _iAudioIndex = iAudioIndex; - auto aacFrame = std::make_shared((char *)MUTE_ADTS_DATA, + auto aacFrame = std::make_shared((char *)MUTE_ADTS_DATA, MUTE_ADTS_DATA_LEN, _iAudioIndex * MUTE_ADTS_DATA_MS); FrameRingInterfaceDelegate::inputFrame(aacFrame); @@ -197,7 +224,16 @@ private: }; void PlayerProxy::onPlaySuccess() { - _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost,_strApp,_strSrc,getDuration(),_bEnableHls,_bEnableMp4)); + if (dynamic_pointer_cast(_pMediaSrc)) { + //rtsp拉流代理 + _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), false, _bEnableRtmp, _bEnableHls, _bEnableMp4)); + } else if (dynamic_pointer_cast(_pMediaSrc)) { + //rtmp拉流代理 + _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, false, _bEnableHls, _bEnableMp4)); + } else { + //其他拉流代理 + _mediaMuxer.reset(new MultiMediaSourceMuxer(_strVhost, _strApp, _strSrc, getDuration(), _bEnableRtsp, _bEnableRtmp, _bEnableHls, _bEnableMp4)); + } _mediaMuxer->setListener(shared_from_this()); auto videoTrack = getTrack(TrackVideo,false); diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index 6c25a157..aff0e3d4 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -49,6 +49,8 @@ public: PlayerProxy(const string &strVhost, const string &strApp, const string &strSrc, + bool bEnableRtsp = true, + bool bEnableRtmp = true, bool bEnableHls = true, bool bEnableMp4 = false, int iRetryCount = -1, @@ -84,9 +86,12 @@ private: void onNoneReader(MediaSource &sender) override; void rePlay(const string &strUrl,int iFailedCnt); void onPlaySuccess(); + int readerCount() ; private: - bool _bEnableHls; - bool _bEnableMp4; + bool _bEnableRtsp; + bool _bEnableRtmp; + bool _bEnableHls; + bool _bEnableMp4; int _iRetryCount; MultiMediaSourceMuxer::Ptr _mediaMuxer; string _strVhost; diff --git a/src/Pusher/PusherBase.cpp b/src/Pusher/PusherBase.cpp index 47d7226e..a8f47d92 100644 --- a/src/Pusher/PusherBase.cpp +++ b/src/Pusher/PusherBase.cpp @@ -44,12 +44,23 @@ PusherBase::Ptr PusherBase::createPusher(const EventPoller::Ptr &poller, ptr->teardown(); }; string prefix = FindField(strUrl.data(), NULL, "://"); + + if (strcasecmp("rtsps",prefix.data()) == 0) { + return PusherBase::Ptr(new TcpClientWithSSL(poller,dynamic_pointer_cast(src)),releasePusher); + } + if (strcasecmp("rtsp",prefix.data()) == 0) { return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast(src)),releasePusher); } + + if (strcasecmp("rtmps",prefix.data()) == 0) { + return PusherBase::Ptr(new TcpClientWithSSL(poller,dynamic_pointer_cast(src)),releasePusher); + } + if (strcasecmp("rtmp",prefix.data()) == 0) { return PusherBase::Ptr(new RtmpPusher(poller,dynamic_pointer_cast(src)),releasePusher); } + return PusherBase::Ptr(new RtspPusher(poller,dynamic_pointer_cast(src)),releasePusher); } diff --git a/src/Rtmp/RtmpToRtspMediaSource.h b/src/Rtmp/RtmpToRtspMediaSource.h index 22dfd076..98c059f2 100644 --- a/src/Rtmp/RtmpToRtspMediaSource.h +++ b/src/Rtmp/RtmpToRtspMediaSource.h @@ -38,8 +38,8 @@ #include "Rtmp.h" #include "RtmpMediaSource.h" #include "RtmpDemuxer.h" -#include "MediaFile/MediaRecorder.h" -#include "Rtsp/RtspMediaSourceMuxer.h" +#include "Common/MultiMediaSourceMuxer.h" + using namespace std; using namespace toolkit; @@ -54,49 +54,53 @@ public: const string &id, bool bEnableHls = true, bool bEnableMp4 = false, - int ringSize = 0):RtmpMediaSource(vhost, app, id,ringSize){ - _recorder = std::make_shared(vhost, app, id, bEnableHls, bEnableMp4); - _rtmpDemuxer = std::make_shared(); + int ringSize = 0) : RtmpMediaSource(vhost, app, id,ringSize){ + _bEnableHls = bEnableHls; + _bEnableMp4 = bEnableMp4; + _demuxer = std::make_shared(); } virtual ~RtmpToRtspMediaSource(){} void onGetMetaData(const AMFValue &metadata) override { - _rtmpDemuxer = std::make_shared(metadata); + _demuxer = std::make_shared(metadata); RtmpMediaSource::onGetMetaData(metadata); } void onWrite(const RtmpPacket::Ptr &pkt,bool key_pos) override { - _rtmpDemuxer->inputRtmp(pkt); - if(!_rtspMuxer && _rtmpDemuxer->isInited(2000)){ - _rtspMuxer = std::make_shared(getVhost(), - getApp(), - getId(), - std::make_shared(_rtmpDemuxer->getDuration())); - for (auto &track : _rtmpDemuxer->getTracks(false)){ - _rtspMuxer->addTrack(track); - _recorder->addTrack(track); - track->addDelegate(_rtspMuxer); - track->addDelegate(_recorder); + _demuxer->inputRtmp(pkt); + if(!_muxer && _demuxer->isInited(2000)){ + _muxer = std::make_shared(getVhost(), + getApp(), + getId(), + _demuxer->getDuration(), + true,//转rtsp + false,//不重复生成rtmp + _bEnableHls, + _bEnableMp4); + for (auto &track : _demuxer->getTracks(false)){ + _muxer->addTrack(track); + track->addDelegate(_muxer); } - _rtspMuxer->setListener(_listener); + _muxer->setListener(_listener); } RtmpMediaSource::onWrite(pkt,key_pos); } void setListener(const std::weak_ptr &listener) override { RtmpMediaSource::setListener(listener); - if(_rtspMuxer){ - _rtspMuxer->setListener(listener); + if(_muxer){ + _muxer->setListener(listener); } } int readerCount() override { - return RtmpMediaSource::readerCount() + (_rtspMuxer ? _rtspMuxer->readerCount() : 0); + return RtmpMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0); } private: - RtmpDemuxer::Ptr _rtmpDemuxer; - RtspMediaSourceMuxer::Ptr _rtspMuxer; - MediaRecorder::Ptr _recorder; + RtmpDemuxer::Ptr _demuxer; + MultiMediaSourceMuxer::Ptr _muxer; + bool _bEnableHls; + bool _bEnableMp4; }; } /* namespace mediakit */ diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index e803b051..df20c36b 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -232,6 +232,31 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) { sendSetup(0); } + +//有必要的情况下创建udp端口 +void RtspPlayer::createUdpSockIfNecessary(int track_idx){ + auto &rtpSockRef = _apRtpSock[track_idx]; + auto &rtcpSockRef = _apRtcpSock[track_idx]; + if(!rtpSockRef){ + rtpSockRef.reset(new Socket(getPoller())); + //rtp随机端口 + if (!rtpSockRef->bindUdpSock(0, get_local_ip().data())) { + rtpSockRef.reset(); + throw std::runtime_error("open rtp sock failed"); + } + } + + if(!rtcpSockRef){ + rtcpSockRef.reset(new Socket(getPoller())); + //rtcp端口为rtp端口+1,目的是为了兼容某些服务器,其实更推荐随机端口 + if (!rtcpSockRef->bindUdpSock(rtpSockRef->get_local_port() + 1, get_local_ip().data())) { + rtcpSockRef.reset(); + throw std::runtime_error("open rtcp sock failed"); + } + } +} + + //发送SETUP命令 void RtspPlayer::sendSetup(unsigned int trackIndex) { _onHandshake = std::bind(&RtspPlayer::handleResSETUP,this, placeholders::_1,trackIndex); @@ -247,16 +272,7 @@ void RtspPlayer::sendSetup(unsigned int trackIndex) { } break; case Rtsp::RTP_UDP: { - _apRtpSock[trackIndex].reset(new Socket(getPoller())); - if (!_apRtpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { - _apRtpSock[trackIndex].reset(); - throw std::runtime_error("open rtp sock err"); - } - _apRtcpSock[trackIndex].reset(new Socket(getPoller())); - if (!_apRtcpSock[trackIndex]->bindUdpSock(_apRtpSock[trackIndex]->get_local_port() + 1, get_local_ip().data())) { - _apRtcpSock[trackIndex].reset(); - throw std::runtime_error("open rtcp sock err"); - } + createUdpSockIfNecessary(trackIndex); sendRtspRequest("SETUP",baseUrl,{"Transport", StrPrinter << "RTP/AVP;unicast;client_port=" << _apRtpSock[trackIndex]->get_local_port() << "-" @@ -280,7 +296,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) } auto strTransport = parser["Transport"]; - if(strTransport.find("TCP") != string::npos){ + if(strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos){ _eType = Rtsp::RTP_TCP; }else if(strTransport.find("multicast") != string::npos){ _eType = Rtsp::RTP_MULTICAST; @@ -314,7 +330,8 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex) SockUtil::joinMultiAddr(fd, multiAddr.data(),get_local_ip().data()); } } else { - //udp单播 + createUdpSockIfNecessary(uiTrackIndex); + //udp单播 struct sockaddr_in rtpto; rtpto.sin_port = ntohs(rtp_port); rtpto.sin_family = AF_INET; diff --git a/src/Rtsp/RtspPlayer.h b/src/Rtsp/RtspPlayer.h index 68235d18..4023548b 100644 --- a/src/Rtsp/RtspPlayer.h +++ b/src/Rtsp/RtspPlayer.h @@ -93,6 +93,11 @@ protected: * @param uiLen */ virtual void onRtcpPacket(int iTrackidx, SdpTrack::Ptr &track, unsigned char *pucData, unsigned int uiLen); + + /////////////TcpClient override///////////// + void onConnect(const SockException &err) override; + void onRecv(const Buffer::Ptr &pBuf) override; + void onErr(const SockException &ex) override; private: void onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const SdpTrack::Ptr &track); void onPlayResult_l(const SockException &ex); @@ -102,10 +107,6 @@ private: int getTrackIndexByTrackType(TrackType trackType) const; void play(const string &strUrl, const string &strUser, const string &strPwd, Rtsp::eRtpType eType); - void onConnect(const SockException &err) override; - void onRecv(const Buffer::Ptr &pBuf) override; - void onErr(const SockException &ex) override; - void handleResSETUP(const Parser &parser, unsigned int uiTrackIndex); void handleResDESCRIBE(const Parser &parser); bool handleAuthenticationFailure(const string &wwwAuthenticateParamsStr); @@ -120,6 +121,7 @@ private: void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap()); void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header); void sendReceiverReport(bool overTcp,int iTrackIndex); + void createUdpSockIfNecessary(int track_idx); private: string _strUrl; SdpParser _sdpParser; diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index c30e6152..bb5f9431 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -242,6 +242,19 @@ bool RtspPusher::handleAuthenticationFailure(const string ¶msStr) { return false; } +//有必要的情况下创建udp端口 +void RtspPusher::createUdpSockIfNecessary(int track_idx){ + auto &rtpSockRef = _apUdpSock[track_idx]; + if(!rtpSockRef){ + rtpSockRef.reset(new Socket(getPoller())); + //rtp随机端口 + if (!rtpSockRef->bindUdpSock(0, get_local_ip().data())) { + rtpSockRef.reset(); + throw std::runtime_error("open rtp sock failed"); + } + } +} + void RtspPusher::sendSetup(unsigned int trackIndex) { _onHandshake = std::bind(&RtspPusher::handleResSetup,this, placeholders::_1,trackIndex); auto &track = _aTrackInfo[trackIndex]; @@ -252,11 +265,7 @@ void RtspPusher::sendSetup(unsigned int trackIndex) { } break; case Rtsp::RTP_UDP: { - _apUdpSock[trackIndex].reset(new Socket(getPoller())); - if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) { - _apUdpSock[trackIndex].reset(); - throw std::runtime_error("open udp sock err"); - } + createUdpSockIfNecessary(trackIndex); int port = _apUdpSock[trackIndex]->get_local_port(); sendRtspRequest("SETUP",baseUrl,{"Transport",StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1}); } @@ -266,6 +275,7 @@ void RtspPusher::sendSetup(unsigned int trackIndex) { } } + void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) { if (parser.Url() != "200") { throw std::runtime_error( @@ -278,7 +288,7 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) } auto strTransport = parser["Transport"]; - if(strTransport.find("TCP") != string::npos){ + if(strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos){ _eType = Rtsp::RTP_TCP; string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-"); _aTrackInfo[uiTrackIndex]->_interleaved = atoi(interleaved.data()); @@ -286,19 +296,15 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int uiTrackIndex) throw std::runtime_error("SETUP rtsp pusher can not support multicast!"); }else{ _eType = Rtsp::RTP_UDP; + createUdpSockIfNecessary(uiTrackIndex); const char *strPos = "server_port=" ; auto port_str = FindField((strTransport + ";").data(), strPos, ";"); uint16_t port = atoi(FindField(port_str.data(), NULL, "-").data()); - auto &pUdpSockRef = _apUdpSock[uiTrackIndex]; - if(!pUdpSockRef){ - pUdpSockRef.reset(new Socket(getPoller())); - } - struct sockaddr_in rtpto; rtpto.sin_port = ntohs(port); rtpto.sin_family = AF_INET; rtpto.sin_addr.s_addr = inet_addr(get_peer_ip().data()); - pUdpSockRef->setSendPeerAddr((struct sockaddr *)&(rtpto)); + _apUdpSock[uiTrackIndex]->setSendPeerAddr((struct sockaddr *)&(rtpto)); } RtspSplitter::enableRecvRtp(_eType == Rtsp::RTP_TCP); diff --git a/src/Rtsp/RtspPusher.h b/src/Rtsp/RtspPusher.h index 0d1c5890..b01af46f 100644 --- a/src/Rtsp/RtspPusher.h +++ b/src/Rtsp/RtspPusher.h @@ -65,6 +65,8 @@ private: void sendRtpPacket(const RtpPacket::Ptr & pkt) ; void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap(),const string &sdp = "" ); void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header,const string &sdp = ""); + + void createUdpSockIfNecessary(int track_idx); private: //rtsp鉴权相关 string _rtspMd5Nonce; diff --git a/src/Rtsp/RtspToRtmpMediaSource.h b/src/Rtsp/RtspToRtmpMediaSource.h index 1c1531cb..c5a165e4 100644 --- a/src/Rtsp/RtspToRtmpMediaSource.h +++ b/src/Rtsp/RtspToRtmpMediaSource.h @@ -29,10 +29,8 @@ #include "Rtmp/amf.h" #include "RtspMediaSource.h" -#include "MediaFile/MediaRecorder.h" -#include "Rtmp/RtmpMediaSource.h" #include "RtspDemuxer.h" -#include "Rtmp/RtmpMediaSourceMuxer.h" +#include "Common/MultiMediaSourceMuxer.h" using namespace toolkit; @@ -48,31 +46,34 @@ public: bool bEnableHls = true, bool bEnableMp4 = false, int ringSize = 0) : RtspMediaSource(vhost, app, id,ringSize) { - _recorder = std::make_shared(vhost, app, id, bEnableHls, bEnableMp4); + _bEnableHls = bEnableHls; + _bEnableMp4 = bEnableMp4; } virtual ~RtspToRtmpMediaSource() {} virtual void onGetSDP(const string &strSdp) override { - _rtspDemuxer = std::make_shared(strSdp); + _demuxer = std::make_shared(strSdp); RtspMediaSource::onGetSDP(strSdp); } virtual void onWrite(const RtpPacket::Ptr &rtp, bool bKeyPos) override { - if (_rtspDemuxer) { - bKeyPos = _rtspDemuxer->inputRtp(rtp); - if (!_rtmpMuxer && _rtspDemuxer->isInited(2000)) { - _rtmpMuxer = std::make_shared(getVhost(), - getApp(), - getId(), - std::make_shared(_rtspDemuxer->getDuration())); - for (auto &track : _rtspDemuxer->getTracks(false)) { - _rtmpMuxer->addTrack(track); - _recorder->addTrack(track); - track->addDelegate(_rtmpMuxer); - track->addDelegate(_recorder); + if (_demuxer) { + bKeyPos = _demuxer->inputRtp(rtp); + if (!_muxer && _demuxer->isInited(2000)) { + _muxer = std::make_shared(getVhost(), + getApp(), + getId(), + _demuxer->getDuration(), + false,//不重复生成rtsp + true,//转rtmp + _bEnableHls, + _bEnableMp4); + for (auto &track : _demuxer->getTracks(false)) { + _muxer->addTrack(track); + track->addDelegate(_muxer); } - _rtmpMuxer->setListener(_listener); + _muxer->setListener(_listener); } } RtspMediaSource::onWrite(rtp, bKeyPos); @@ -80,17 +81,18 @@ public: void setListener(const std::weak_ptr &listener) override { RtspMediaSource::setListener(listener); - if(_rtmpMuxer){ - _rtmpMuxer->setListener(listener); + if(_muxer){ + _muxer->setListener(listener); } } int readerCount() override { - return RtspMediaSource::readerCount() + (_rtmpMuxer ? _rtmpMuxer->readerCount() : 0); + return RtspMediaSource::readerCount() + (_muxer ? _muxer->readerCount() : 0); } private: - RtspDemuxer::Ptr _rtspDemuxer; - RtmpMediaSourceMuxer::Ptr _rtmpMuxer; - MediaRecorder::Ptr _recorder; + RtspDemuxer::Ptr _demuxer; + MultiMediaSourceMuxer::Ptr _muxer; + bool _bEnableHls; + bool _bEnableMp4; }; } /* namespace mediakit */ diff --git a/tests/test_httpApi.cpp b/tests/test_httpApi.cpp index 7f4cd525..bf8ff4d3 100644 --- a/tests/test_httpApi.cpp +++ b/tests/test_httpApi.cpp @@ -56,49 +56,50 @@ onceToken token1([](){ }//namespace Http } // namespace mediakit +void initEventListener(){ + static onceToken s_token([](){ + NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastHttpRequest,[](BroadcastHttpRequestArgs){ + //const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed + if(strstr(parser.Url().data(),"/api/") != parser.Url().data()){ + return; + } + //url以"/api/起始,说明是http api" + consumed = true;//该http请求已被消费 -static onceToken s_token([](){ - NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastHttpRequest,[](BroadcastHttpRequestArgs){ - //const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed - if(strstr(parser.Url().data(),"/api/") != parser.Url().data()){ - return; - } - //url以"/api/起始,说明是http api" - consumed = true;//该http请求已被消费 + _StrPrinter printer; + ////////////////method//////////////////// + printer << "\r\nmethod:\r\n\t" << parser.Method(); + ////////////////url///////////////// + printer << "\r\nurl:\r\n\t" << parser.Url(); + ////////////////protocol///////////////// + printer << "\r\nprotocol:\r\n\t" << parser.Tail(); + ///////////////args////////////////// + printer << "\r\nargs:\r\n"; + for(auto &pr : parser.getUrlArgs()){ + printer << "\t" << pr.first << " : " << pr.second << "\r\n"; + } + ///////////////header////////////////// + printer << "\r\nheader:\r\n"; + for(auto &pr : parser.getValues()){ + printer << "\t" << pr.first << " : " << pr.second << "\r\n"; + } + ////////////////content///////////////// + printer << "\r\ncontent:\r\n" << parser.Content(); + auto contentOut = printer << endl; - _StrPrinter printer; - ////////////////method//////////////////// - printer << "\r\nmethod:\r\n\t" << parser.Method(); - ////////////////url///////////////// - printer << "\r\nurl:\r\n\t" << parser.Url(); - ////////////////protocol///////////////// - printer << "\r\nprotocol:\r\n\t" << parser.Tail(); - ///////////////args////////////////// - printer << "\r\nargs:\r\n"; - for(auto &pr : parser.getUrlArgs()){ - printer << "\t" << pr.first << " : " << pr.second << "\r\n"; - } - ///////////////header////////////////// - printer << "\r\nheader:\r\n"; - for(auto &pr : parser.getValues()){ - printer << "\t" << pr.first << " : " << pr.second << "\r\n"; - } - ////////////////content///////////////// - printer << "\r\ncontent:\r\n" << parser.Content(); - auto contentOut = printer << endl; - - ////////////////我们测算异步回复,当然你也可以同步回复///////////////// - EventPollerPool::Instance().getPoller()->async([invoker,contentOut](){ - HttpSession::KeyValue headerOut; - //你可以自定义header,如果跟默认header重名,则会覆盖之 - //默认header有:Server,Connection,Date,Content-Type,Content-Length - //请勿覆盖Connection、Content-Length键 - //键名覆盖时不区分大小写 - headerOut["TestHeader"] = "HeaderValue"; - invoker("200 OK",headerOut,contentOut); - }); - }); -}, nullptr); + ////////////////我们测算异步回复,当然你也可以同步回复///////////////// + EventPollerPool::Instance().getPoller()->async([invoker,contentOut](){ + HttpSession::KeyValue headerOut; + //你可以自定义header,如果跟默认header重名,则会覆盖之 + //默认header有:Server,Connection,Date,Content-Type,Content-Length + //请勿覆盖Connection、Content-Length键 + //键名覆盖时不区分大小写 + headerOut["TestHeader"] = "HeaderValue"; + invoker("200 OK",headerOut,contentOut); + }); + }); + }, nullptr); +} int main(int argc,char *argv[]){ //设置退出信号处理函数 @@ -111,6 +112,7 @@ int main(int argc,char *argv[]){ //加载配置文件,如果配置文件不存在就创建一个 loadIniConfig(); + initEventListener(); //加载证书,证书包含公钥和私钥 SSL_Initor::Instance().loadCertificate((exeDir() + "ssl.p12").data()); diff --git a/tests/test_player.cpp b/tests/test_player.cpp index aeecf75d..62852565 100644 --- a/tests/test_player.cpp +++ b/tests/test_player.cpp @@ -25,6 +25,7 @@ */ #include #include +#include "Util/util.h" #include "Util/logger.h" #include #include "Poller/EventPoller.h" @@ -68,22 +69,27 @@ int main(int argc, char *argv[]) { WarnL << "没有视频或者视频不是264编码!"; return; } - SDLDisplayerHelper::Instance().doTask([viedoTrack]() { - std::shared_ptr decoder(new H264Decoder); - std::shared_ptr displayer(new YuvDisplayer); - viedoTrack->addDelegate(std::make_shared([decoder, displayer](const Frame::Ptr &frame) { - SDLDisplayerHelper::Instance().doTask([decoder, displayer, frame]() { - AVFrame *pFrame = nullptr; - bool flag = decoder->inputVideo((unsigned char *) frame->data(), frame->size(), - frame->stamp(), &pFrame); - if (flag) { - displayer->displayYUV(pFrame); - } - return true; - }); - })); - return true; - }); + + AnyStorage::Ptr storage(new AnyStorage); + viedoTrack->addDelegate(std::make_shared([storage](const Frame::Ptr &frame) { + SDLDisplayerHelper::Instance().doTask([frame,storage]() { + auto &decoder = (*storage)["decoder"]; + auto &displayer = (*storage)["displayer"]; + if(!decoder){ + decoder.set(); + } + if(!displayer){ + displayer.set(); + } + + AVFrame *pFrame = nullptr; + bool flag = decoder.get().inputVideo((unsigned char *) frame->data(), frame->size(), frame->stamp(), &pFrame); + if (flag) { + displayer.get().displayYUV(pFrame); + } + return true; + }); + })); }); diff --git a/tests/test_pusher.cpp b/tests/test_pusher.cpp index 36833ca5..c7a56aa0 100644 --- a/tests/test_pusher.cpp +++ b/tests/test_pusher.cpp @@ -90,7 +90,7 @@ int domain(const string &playUrl, const string &pushUrl) { //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码) - PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",false,false,-1 , poller)); + PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",true,true,false,false,-1 , poller)); //可以指定rtsp拉流方式,支持tcp和udp方式,默认tcp // (*player)[Client::kRtpType] = Rtsp::RTP_UDP; player->play(playUrl.data());