From 2fee173d7a12df64bb643e4d73420c8282dea13b Mon Sep 17 00:00:00 2001 From: wxf Date: Tue, 30 Jun 2020 21:45:17 +0800 Subject: [PATCH 01/16] =?UTF-8?q?=E4=BC=98=E5=85=88=E4=BD=BF=E7=94=A8=20pk?= =?UTF-8?q?g-config=20=E6=9D=A5=E6=9F=A5=E6=89=BE=20AV*=20=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 部分主机上 ffmpeg 安装路径不标准, 默认查找方式不方便适应该方式 --- tests/CMakeLists.txt | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ee75592c..4458b8d1 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -8,20 +8,38 @@ if (SDL2_FOUND) message(STATUS "found library:${SDL2_LIBRARY}") endif (SDL2_FOUND) +find_package(PkgConfig QUIET) + #查找ffmpeg/libutil是否安装 -find_package(AVUTIL QUIET) -if(AVUTIL_FOUND) - include_directories(${AVUTIL_INCLUDE_DIR}) - list(APPEND LINK_LIB_LIST ${AVUTIL_LIBRARIES}) - message(STATUS "found library:${AVUTIL_LIBRARIES}") +if(PKG_CONFIG_FOUND) + pkg_check_modules(AVUTIL QUIET IMPORTED_TARGET libavutil) + if(AVUTIL_FOUND) + list(APPEND LINK_LIB_LIST PkgConfig::AVUTIL) + message(STATUS "found library:${AVUTIL_LIBRARY}") + endif() +else() + find_package(AVUTIL QUIET) + if(AVUTIL_FOUND) + include_directories(${AVUTIL_INCLUDE_DIR}) + list(APPEND LINK_LIB_LIST ${AVUTIL_LIBRARIES}) + message(STATUS "found library:${AVUTIL_LIBRARIES}") + endif() endif() #查找ffmpeg/libavcodec是否安装 -find_package(AVCODEC QUIET) -if(AVCODEC_FOUND) - include_directories(${AVCODEC_INCLUDE_DIR}) - list(APPEND LINK_LIB_LIST ${AVCODEC_LIBRARIES}) - message(STATUS "found library:${AVCODEC_LIBRARIES}") +if(PKG_CONFIG_FOUND) + pkg_check_modules(AVCODEC QUIET IMPORTED_TARGET libavcodec) + if(AVCODEC_FOUND) + list(APPEND LINK_LIB_LIST PkgConfig::AVCODEC) + message(STATUS "found library:${AVCODEC_LIBRARY}") + endif() +else() + find_package(AVCODEC QUIET) + if(AVCODEC_FOUND) + include_directories(${AVCODEC_INCLUDE_DIR}) + list(APPEND LINK_LIB_LIST ${AVCODEC_LIBRARIES}) + message(STATUS "found library:${AVCODEC_LIBRARIES}") + endif() endif() aux_source_directory(. TEST_SRC_LIST) From 21edbb0ebe6d5ab5d4238230200cac3a0b962739 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Thu, 2 Jul 2020 17:44:53 +0800 Subject: [PATCH 02/16] =?UTF-8?q?=E6=9B=B4=E6=96=B0ZLToolKit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3rdpart/ZLToolKit | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 5030af90..ed47015f 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 5030af90126ea8f01ded6744ae8abdf549d00a81 +Subproject commit ed47015f92cc79dfe3344b3666aafb54f1bbc2f4 From ac320ddc092c976df31d15a48ca7c0f0b85f266e Mon Sep 17 00:00:00 2001 From: wxf Date: Thu, 2 Jul 2020 16:58:21 +0800 Subject: [PATCH 03/16] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=9C=A8=E7=BC=96?= =?UTF-8?q?=E8=AF=91=E6=97=B6=E5=BC=80=E5=90=AF/=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E9=83=A8=E5=88=86=E7=89=B9=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index c23279c7..4cb5f70e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,13 +41,13 @@ INCLUDE_DIRECTORIES(${ToolKit_Root}) INCLUDE_DIRECTORIES(${MediaKit_Root}) INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/3rdpart) -set(ENABLE_HLS true) -set(ENABLE_OPENSSL true) -set(ENABLE_MYSQL false) -set(ENABLE_FAAC false) -set(ENABLE_X264 false) -set(ENABLE_MP4 true) -set(ENABLE_RTPPROXY true) +option(ENABLE_HLS "Enable HLS" true) +option(ENABLE_OPENSSL "Enable OpenSSL" true) +option(ENABLE_MYSQL "Enable MySQL" false) +option(ENABLE_FAAC "Enable FAAC" false) +option(ENABLE_X264 "Enable x264" false) +option(ENABLE_MP4 "Enable MP4" true) +option(ENABLE_RTPPROXY "Enable RTPPROXY" true) set(LINK_LIB_LIST zlmediakit zltoolkit) From e89e39704f937d53ec07ee46d717ab1076c18ace Mon Sep 17 00:00:00 2001 From: wxf Date: Thu, 2 Jul 2020 17:55:32 +0800 Subject: [PATCH 04/16] =?UTF-8?q?=E6=94=AF=E6=8C=81=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E6=98=AF=E5=90=A6=E7=BC=96=E8=AF=91=20api/tests/server?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4cb5f70e..2173561b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -48,6 +48,9 @@ option(ENABLE_FAAC "Enable FAAC" false) option(ENABLE_X264 "Enable x264" false) option(ENABLE_MP4 "Enable MP4" true) option(ENABLE_RTPPROXY "Enable RTPPROXY" true) +option(ENABLE_API "Enable C API SDK" true) +option(ENABLE_TESTS "Enable Tests" true) +option(ENABLE_SERVER "Enable Server" true) set(LINK_LIB_LIST zlmediakit zltoolkit) @@ -188,11 +191,17 @@ execute_process(COMMAND cp -r ${CMAKE_CURRENT_SOURCE_DIR}/www ${EXECUTABLE_OUTPU execute_process(COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/conf/config.ini ${EXECUTABLE_OUTPUT_PATH}/) #添加c库 -add_subdirectory(api) +if(ENABLE_API) + add_subdirectory(api) +endif() if (NOT IOS) #测试程序 - add_subdirectory(tests) + if(ENABLE_TESTS) + add_subdirectory(tests) + endif() #主服务器 - add_subdirectory(server) + if(ENABLE_SERVER) + add_subdirectory(server) + endif() endif () From dccb1e2a428058947f40327a6b54d01133646b7f Mon Sep 17 00:00:00 2001 From: wxf Date: Thu, 2 Jul 2020 17:56:18 +0800 Subject: [PATCH 05/16] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=AE=89=E8=A3=85=20C+?= =?UTF-8?q?+=20=E5=BA=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 17 +++++++++++++++++ api/CMakeLists.txt | 9 --------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2173561b..6a6fe11b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -49,6 +49,7 @@ option(ENABLE_X264 "Enable x264" false) option(ENABLE_MP4 "Enable MP4" true) option(ENABLE_RTPPROXY "Enable RTPPROXY" true) option(ENABLE_API "Enable C API SDK" true) +option(ENABLE_CXX_API "Enable C++ API SDK" false) option(ENABLE_TESTS "Enable Tests" true) option(ENABLE_SERVER "Enable Server" true) @@ -178,6 +179,22 @@ endif () add_library(zltoolkit STATIC ${ToolKit_src_list}) add_library(zlmediakit STATIC ${MediaKit_src_list}) +#安装目录 +if (WIN32) + set(INSTALL_PATH_LIB $ENV{HOME}/${CMAKE_PROJECT_NAME}/lib) + set(INSTALL_PATH_INCLUDE $ENV{HOME}/${CMAKE_PROJECT_NAME}/include) +else () + set(INSTALL_PATH_LIB lib) + set(INSTALL_PATH_INCLUDE include) +endif () + +if(ENABLE_CXX_API) + # 保留目录结构 + install(DIRECTORY ${ToolKit_Root}/ DESTINATION ${INSTALL_PATH_INCLUDE}/ZLToolKit REGEX "(.*[.](md|cpp)|win32)$" EXCLUDE) + install(DIRECTORY ${MediaKit_Root}/ DESTINATION ${INSTALL_PATH_INCLUDE}/ZLMediaKit REGEX ".*[.](md|cpp)$" EXCLUDE) + install(TARGETS zltoolkit zlmediakit DESTINATION ${INSTALL_PATH_LIB}) +endif() + if (WIN32) list(APPEND LINK_LIB_LIST WS2_32 Iphlpapi shlwapi) set_target_properties(zltoolkit PROPERTIES COMPILE_FLAGS ${VS_FALGS} ) diff --git a/api/CMakeLists.txt b/api/CMakeLists.txt index 6169e0f5..6f460300 100644 --- a/api/CMakeLists.txt +++ b/api/CMakeLists.txt @@ -22,15 +22,6 @@ else () target_link_libraries(mk_api ${LINK_LIB_LIST}) add_subdirectory(tests) - #安装目录 - if (WIN32) - set(INSTALL_PATH_LIB $ENV{HOME}/${CMAKE_PROJECT_NAME}/lib) - set(INSTALL_PATH_INCLUDE $ENV{HOME}/${CMAKE_PROJECT_NAME}/include) - else () - set(INSTALL_PATH_LIB lib) - set(INSTALL_PATH_INCLUDE include) - endif () - file(GLOB api_header_list include/*.h) install(FILES ${api_header_list} DESTINATION ${INSTALL_PATH_INCLUDE}) install(TARGETS mk_api ARCHIVE DESTINATION ${INSTALL_PATH_LIB} LIBRARY DESTINATION ${INSTALL_PATH_LIB}) From 3c858a835175f6d7f1c66b62a6cc0db2ed511156 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Thu, 2 Jul 2020 18:14:39 +0800 Subject: [PATCH 06/16] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=AA=92=E4=BD=93?= =?UTF-8?q?=E6=B3=A8=E5=86=8C=E5=9B=9E=E8=B0=83=E4=BA=8B=E4=BB=B6:#373?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/include/mk_media.h | 17 +++++++++++++++++ api/source/mk_media.cpp | 21 +++++++++++++++++++++ server/FFmpegSource.cpp | 7 +++++++ server/FFmpegSource.h | 1 + src/Common/MediaSource.cpp | 12 +++++++++++- src/Common/MediaSource.h | 2 ++ src/Common/MultiMediaSourceMuxer.cpp | 7 +++++++ src/Common/MultiMediaSourceMuxer.h | 7 +++++++ 8 files changed, 73 insertions(+), 1 deletion(-) diff --git a/api/include/mk_media.h b/api/include/mk_media.h index 3337b59f..87b8836d 100755 --- a/api/include/mk_media.h +++ b/api/include/mk_media.h @@ -12,6 +12,7 @@ #define MK_MEDIA_H_ #include "mk_common.h" +#include "mk_events_objects.h" #ifdef __cplusplus extern "C" { @@ -158,6 +159,22 @@ API_EXPORT void API_CALL mk_media_set_on_seek(mk_media ctx, on_mk_media_seek cb, */ API_EXPORT int API_CALL mk_media_total_reader_count(mk_media ctx); +/** + * 生成的MediaSource注册或注销事件 + * @param user_data 设置回调时的用户数据指针 + * @param sender 生成的MediaSource对象 + * @param regist 1为注册事件,0为注销事件 + */ +typedef void(API_CALL *on_mk_media_source_regist)(void *user_data, mk_media_source sender, int regist); + +/** + * 设置MediaSource注册或注销事件回调函数 + * @param ctx 对象指针 + * @param cb 回调指针 + * @param user_data 用户数据指针 + */ +API_EXPORT void API_CALL mk_media_set_on_regist(mk_media ctx, on_mk_media_source_regist cb, void *user_data); + #ifdef __cplusplus } #endif diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 91761829..bad5bf51 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -42,6 +42,12 @@ public: _on_seek = cb; _on_seek_data = user_data; } + + void setOnRegist(on_mk_media_source_regist cb, void *user_data){ + _on_regist = cb; + _on_regist_data = user_data; + } + protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override{ @@ -70,12 +76,21 @@ protected: int totalReaderCount(MediaSource &sender) override{ return _channel->totalReaderCount(); } + + void onRegist(MediaSource &sender, bool regist) override{ + if (_on_regist) { + _on_regist(_on_regist_data, &sender, regist); + } + } + private: DevChannel::Ptr _channel; on_mk_media_close _on_close = nullptr; on_mk_media_seek _on_seek = nullptr; + on_mk_media_source_regist _on_regist = nullptr; void *_on_seek_data; void *_on_close_data; + void *_on_regist_data; }; API_EXPORT void API_CALL mk_media_set_on_close(mk_media ctx, on_mk_media_close cb, void *user_data){ @@ -90,6 +105,12 @@ API_EXPORT void API_CALL mk_media_set_on_seek(mk_media ctx, on_mk_media_seek cb, (*obj)->setOnSeek(cb, user_data); } +API_EXPORT void API_CALL mk_media_set_on_regist(mk_media ctx, on_mk_media_source_regist cb, void *user_data){ + assert(ctx); + MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx; + (*obj)->setOnRegist(cb, user_data); +} + API_EXPORT int API_CALL mk_media_total_reader_count(mk_media ctx){ assert(ctx); MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx; diff --git a/server/FFmpegSource.cpp b/server/FFmpegSource.cpp index b6a7ef7e..fdf9b7f2 100644 --- a/server/FFmpegSource.cpp +++ b/server/FFmpegSource.cpp @@ -249,6 +249,13 @@ void FFmpegSource::onNoneReader(MediaSource &sender){ MediaSourceEvent::onNoneReader(sender); } +void FFmpegSource::onRegist(MediaSource &sender, bool regist){ + auto listener = _listener.lock(); + if(listener){ + listener->onRegist(sender, regist); + } +} + void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) { _listener = src->getListener(); src->setListener(shared_from_this()); diff --git a/server/FFmpegSource.h b/server/FFmpegSource.h index 06bcf410..00635ac5 100644 --- a/server/FFmpegSource.h +++ b/server/FFmpegSource.h @@ -62,6 +62,7 @@ private: bool close(MediaSource &sender,bool force) override; int totalReaderCount(MediaSource &sender) override; void onNoneReader(MediaSource &sender) override; + void onRegist(MediaSource &sender, bool regist) override; private: Process _process; diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 7d9ba2f8..e7bc6b14 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -295,7 +295,7 @@ void MediaSource::regist() { //注册该源,注册后服务器才能找到该源 { lock_guard lock(g_mtxMediaSrc); - g_mapMediaSrc[_strSchema][_strVhost][_strApp][_strId] = shared_from_this(); + g_mapMediaSrc[_strSchema][_strVhost][_strApp][_strId] = shared_from_this(); } _StrPrinter codec_info; auto tracks = getTracks(true); @@ -326,6 +326,11 @@ void MediaSource::regist() { InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId << " " << codec_info; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, true, *this); + + auto listener = _listener.lock(); + if (listener) { + listener->onRegist(*this, true); + } } //反注册该源 @@ -352,6 +357,11 @@ bool MediaSource::unregist() { if(ret){ InfoL << _strSchema << " " << _strVhost << " " << _strApp << " " << _strId; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, false, *this); + + auto listener = _listener.lock(); + if (listener) { + listener->onRegist(*this, false); + } } return ret; } diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 28330166..d4e44d4e 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -55,6 +55,8 @@ public: virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; }; // 通知无人观看 virtual void onNoneReader(MediaSource &sender); + //流注册或注销事件 + virtual void onRegist(MediaSource &sender, bool regist) {}; private: Timer::Ptr _async_close_timer; diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index b5f10da8..aeaa84bc 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -287,6 +287,13 @@ void MultiMediaSourceMuxer::onNoneReader(MediaSource &sender){ listener->onNoneReader(sender); } +void MultiMediaSourceMuxer::onRegist(MediaSource &sender, bool regist){ + auto listener = _listener.lock(); + if (listener) { + listener->onRegist(sender, regist); + } +} + bool MultiMediaSourceMuxer::setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) { return _muxer->setupRecord(sender,type,start,custom_path); } diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 679906ba..216ba443 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -137,6 +137,13 @@ public: */ void onNoneReader(MediaSource &sender) override; + /** + * 媒体注册注销事件 + * @param sender 触发者 + * @param regist 是否为注册事件 + */ + void onRegist(MediaSource &sender, bool regist) override; + /** * 设置录制状态 * @param type 录制类型 From b603b8a68d772099ce8b542bd4c9780bc8f33497 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Thu, 2 Jul 2020 22:23:43 +0800 Subject: [PATCH 07/16] =?UTF-8?q?=E6=94=AF=E6=8C=81http=20api=E5=8A=A8?= =?UTF-8?q?=E6=80=81=E6=B7=BB=E5=8A=A0=E6=88=96=E5=85=B3=E9=97=ADrtp?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/source/mk_common.cpp | 25 +++++----------- server/WebApi.cpp | 47 +++++++++++++++++++++++++++-- server/main.cpp | 15 +++------- src/Rtp/RtpServer.cpp | 64 ++++++++++++++++++++++++++++++++++++++++ src/Rtp/RtpServer.h | 61 ++++++++++++++++++++++++++++++++++++++ src/Rtp/UdpRecver.cpp | 44 --------------------------- src/Rtp/UdpRecver.h | 40 ------------------------- 7 files changed, 181 insertions(+), 115 deletions(-) create mode 100644 src/Rtp/RtpServer.cpp create mode 100644 src/Rtp/RtpServer.h delete mode 100644 src/Rtp/UdpRecver.cpp delete mode 100644 src/Rtp/UdpRecver.h diff --git a/api/source/mk_common.cpp b/api/source/mk_common.cpp index ce36924e..1b84707e 100644 --- a/api/source/mk_common.cpp +++ b/api/source/mk_common.cpp @@ -30,10 +30,8 @@ static TcpServer::Ptr http_server[2]; static TcpServer::Ptr shell_server; #ifdef ENABLE_RTPPROXY -#include "Rtp/UdpRecver.h" -#include "Rtp/RtpSession.h" -static std::shared_ptr udpRtpServer; -static TcpServer::Ptr tcpRtpServer; +#include "Rtp/RtpServer.h" +static std::shared_ptr rtpServer; #endif //////////////////////////environment init/////////////////////////// @@ -57,8 +55,7 @@ API_EXPORT void API_CALL mk_stop_all_server(){ CLEAR_ARR(rtmp_server); CLEAR_ARR(http_server); #ifdef ENABLE_RTPPROXY - udpRtpServer = nullptr; - tcpRtpServer = nullptr; + rtpServer = nullptr; #endif stopAllTcpServer(); } @@ -184,18 +181,12 @@ API_EXPORT uint16_t API_CALL mk_rtmp_server_start(uint16_t port, int ssl) { API_EXPORT uint16_t API_CALL mk_rtp_server_start(uint16_t port){ #ifdef ENABLE_RTPPROXY try { - //创建rtp tcp服务器 - tcpRtpServer = std::make_shared(); - tcpRtpServer->start(port); - - //创建rtp udp服务器 - auto ret = tcpRtpServer->getPort(); - udpRtpServer = std::make_shared(); - udpRtpServer->initSock(port); - return ret; + //创建rtp 服务器 + rtpServer = std::make_shared(); + rtpServer->start(port); + return rtpServer->getPort(); } catch (std::exception &ex) { - tcpRtpServer.reset(); - udpRtpServer.reset(); + rtpServer.reset(); WarnL << ex.what(); return 0; } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 20a653aa..cc98d596 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -34,6 +34,9 @@ #include "Thread/WorkThreadPool.h" #include "Rtp/RtpSelector.h" #include "FFmpegSource.h" +#if defined(ENABLE_RTPPROXY) +#include "Rtp/RtpServer.h" +#endif using namespace Json; using namespace toolkit; using namespace mediakit; @@ -244,15 +247,24 @@ bool checkArgs(Args &&args,First &&first,KeyTypes && ...keys){ } \ } +//拉流代理器列表 static unordered_map s_proxyMap; static recursive_mutex s_proxyMapMtx; + +//FFmpeg拉流代理器列表 +static unordered_map s_ffmpegMap; +static recursive_mutex s_ffmpegMapMtx; + +#if defined(ENABLE_RTPPROXY) +//rtp服务器列表 +static unordered_map s_rtpServerMap; +static recursive_mutex s_rtpServerMapMtx; +#endif + static inline string getProxyKey(const string &vhost,const string &app,const string &stream){ return vhost + "/" + app + "/" + stream; } -static unordered_map s_ffmpegMap; -static recursive_mutex s_ffmpegMapMtx; - /** * 安装api接口 * 所有api都支持GET和POST两种方式 @@ -745,6 +757,29 @@ void installWebApi() { val["peer_ip"] = process->get_peer_ip(); val["peer_port"] = process->get_peer_port(); }); + + api_regist1("/index/api/openRtpServer",[](API_ARGS1){ + CHECK_SECRET(); + CHECK_ARGS("port", "enable_tcp"); + + RtpServer::Ptr server = std::make_shared(); + server->start(allArgs["port"], allArgs["enable_tcp"].as()); + val["port"] = server->getPort(); + + //保存对象 + lock_guard lck(s_rtpServerMapMtx); + s_rtpServerMap.emplace(server->getPort(), server); + }); + + api_regist1("/index/api/closeRtpServer",[](API_ARGS1){ + CHECK_SECRET(); + CHECK_ARGS("port"); + + lock_guard lck(s_rtpServerMapMtx); + val["hit"] = (int)s_rtpServerMap.erase(allArgs["port"].as()); + }); + + #endif//ENABLE_RTPPROXY // 开始录制hls或MP4 @@ -1045,4 +1080,10 @@ void unInstallWebApi(){ lock_guard lck(s_ffmpegMapMtx); s_ffmpegMap.clear(); } + { +#if defined(ENABLE_RTPPROXY) + lock_guard lck(s_rtpServerMapMtx); + s_rtpServerMap.clear(); +#endif + } } \ No newline at end of file diff --git a/server/main.cpp b/server/main.cpp index cd1ee8a0..705092c4 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -20,13 +20,11 @@ #include "Network/TcpServer.h" #include "Poller/EventPoller.h" #include "Common/config.h" -#include "Rtsp/UDPServer.h" #include "Rtsp/RtspSession.h" -#include "Rtp/RtpSession.h" #include "Rtmp/RtmpSession.h" #include "Shell/ShellSession.h" #include "Http/WebSocketSession.h" -#include "Rtp/UdpRecver.h" +#include "Rtp/RtpServer.h" #include "WebApi.h" #include "WebHook.h" @@ -283,8 +281,7 @@ int start_main(int argc,char *argv[]) { #if defined(ENABLE_RTPPROXY) //GB28181 rtp推流端口,支持UDP/TCP - UdpRecver recver; - TcpServer::Ptr tcpRtpServer(new TcpServer()); + RtpServer::Ptr rtpServer = std::make_shared(); #endif//defined(ENABLE_RTPPROXY) try { @@ -307,12 +304,8 @@ int start_main(int argc,char *argv[]) { if(shellPort) { shellSrv->start(shellPort); } #if defined(ENABLE_RTPPROXY) - if(rtpPort){ - //创建rtp udp服务器 - recver.initSock(rtpPort); - //创建rtp tcp服务器 - tcpRtpServer->start(rtpPort); - } + //创建rtp服务器 + if(rtpPort){ rtpServer->start(rtpPort); } #endif//defined(ENABLE_RTPPROXY) }catch (std::exception &ex){ diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp new file mode 100644 index 00000000..5e90c1f7 --- /dev/null +++ b/src/Rtp/RtpServer.cpp @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#if defined(ENABLE_RTPPROXY) +#include "RtpServer.h" +#include "RtpSelector.h" +namespace mediakit{ + +RtpServer::RtpServer() { +} + +RtpServer::~RtpServer() { + if(_udp_server){ + _udp_server->setOnRead(nullptr); + } +} + +void RtpServer::start(uint16_t local_port, bool enable_tcp, const char *local_ip) { + _udp_server.reset(new Socket(nullptr, false)); + auto &ref = RtpSelector::Instance(); + auto sock = _udp_server; + _udp_server->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) { + ref.inputRtp(sock, buf->data(), buf->size(), addr); + }); + + //创建udp服务器 + if (!_udp_server->bindUdpSock(local_port, local_ip)) { + _udp_server = nullptr; + string err = (StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true)); + throw std::runtime_error(err); + } + //设置udp socket读缓存 + SockUtil::setRecvBuf(_udp_server->rawFD(), 4 * 1024 * 1024); + + if (enable_tcp) { + try { + //创建tcp服务器 + _tcp_server = std::make_shared(_udp_server->getPoller()); + _tcp_server->start(_udp_server->get_local_port(), local_ip); + } catch (...) { + _tcp_server = nullptr; + _udp_server = nullptr; + throw; + } + } +} + +EventPoller::Ptr RtpServer::getPoller() { + return _udp_server->getPoller(); +} + +uint16_t RtpServer::getPort() { + return _udp_server ? _udp_server->get_local_port() : 0; +} + +}//namespace mediakit +#endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h new file mode 100644 index 00000000..c81f9513 --- /dev/null +++ b/src/Rtp/RtpServer.h @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_RTPSERVER_H +#define ZLMEDIAKIT_RTPSERVER_H + +#if defined(ENABLE_RTPPROXY) +#include +#include "Network/Socket.h" +#include "Network/TcpServer.h" +#include "RtpSession.h" + +using namespace std; +using namespace toolkit; + +namespace mediakit{ + +/** + * RTP服务器,支持UDP/TCP + */ +class RtpServer { +public: + typedef std::shared_ptr Ptr; + typedef function onRecv; + + RtpServer(); + ~RtpServer(); + + /** + * 开启服务器,可能抛异常 + * @param local_port 本地端口,0时为随机端口 + * @param enable_tcp 是否启用tcp服务器 + * @param local_ip 绑定的本地网卡ip + */ + void start(uint16_t local_port, bool enable_tcp = true, const char *local_ip = "0.0.0.0"); + + /** + * 获取绑定的本地端口 + */ + uint16_t getPort(); + + /** + * 获取绑定的线程 + */ + EventPoller::Ptr getPoller(); + +protected: + Socket::Ptr _udp_server; + TcpServer::Ptr _tcp_server; +}; + +}//namespace mediakit +#endif//defined(ENABLE_RTPPROXY) +#endif //ZLMEDIAKIT_RTPSERVER_H diff --git a/src/Rtp/UdpRecver.cpp b/src/Rtp/UdpRecver.cpp deleted file mode 100644 index 2b4a6f3b..00000000 --- a/src/Rtp/UdpRecver.cpp +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). - * - * Use of this source code is governed by MIT license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#if defined(ENABLE_RTPPROXY) -#include "UdpRecver.h" -#include "RtpSelector.h" -namespace mediakit{ - -UdpRecver::UdpRecver() { -} - -UdpRecver::~UdpRecver() { - if(_sock){ - _sock->setOnRead(nullptr); - } -} - -bool UdpRecver::initSock(uint16_t local_port,const char *local_ip) { - _sock.reset(new Socket(nullptr, false)); - onceToken token(nullptr,[&](){ - SockUtil::setRecvBuf(_sock->rawFD(),4 * 1024 * 1024); - }); - - auto &ref = RtpSelector::Instance(); - auto sock = _sock; - _sock->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int ){ - ref.inputRtp(sock, buf->data(),buf->size(),addr); - }); - return _sock->bindUdpSock(local_port,local_ip); -} - -EventPoller::Ptr UdpRecver::getPoller() { - return _sock->getPoller(); -} - -}//namespace mediakit -#endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/UdpRecver.h b/src/Rtp/UdpRecver.h deleted file mode 100644 index 98361064..00000000 --- a/src/Rtp/UdpRecver.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). - * - * Use of this source code is governed by MIT license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef ZLMEDIAKIT_UDPRECVER_H -#define ZLMEDIAKIT_UDPRECVER_H - -#if defined(ENABLE_RTPPROXY) -#include -#include "Network/Socket.h" -using namespace std; -using namespace toolkit; - -namespace mediakit{ - -/** - * 组播接收器 - */ -class UdpRecver { -public: - typedef std::shared_ptr Ptr; - typedef function onRecv; - - UdpRecver(); - virtual ~UdpRecver(); - bool initSock(uint16_t local_port,const char *local_ip = "0.0.0.0"); - EventPoller::Ptr getPoller(); -protected: - Socket::Ptr _sock; -}; - -}//namespace mediakit -#endif//defined(ENABLE_RTPPROXY) -#endif //ZLMEDIAKIT_UDPRECVER_H From 2be19ffb31b60415cdbae87a2173aec1354c76f5 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Thu, 2 Jul 2020 22:26:38 +0800 Subject: [PATCH 08/16] =?UTF-8?q?=E6=B7=BB=E5=8A=A0listRtpServer=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index cc98d596..80de4984 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -779,6 +779,14 @@ void installWebApi() { val["hit"] = (int)s_rtpServerMap.erase(allArgs["port"].as()); }); + api_regist1("/index/api/listRtpServer",[](API_ARGS1){ + CHECK_SECRET(); + + lock_guard lck(s_rtpServerMapMtx); + for(auto &pr : s_rtpServerMap){ + val["data"].append(pr.first); + } + }); #endif//ENABLE_RTPPROXY From cd5dd6ffd01fc03fdbcbca4b955bb35f32a1e93c Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sat, 4 Jul 2020 19:30:03 +0800 Subject: [PATCH 09/16] =?UTF-8?q?=E6=9B=B4=E6=96=B0ps=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E5=BA=93=EF=BC=8C=E4=BF=AE=E5=A4=8D=E7=8C=9C=E6=B5=8B=E7=BC=96?= =?UTF-8?q?=E7=A0=81=E6=A0=BC=E5=BC=8F=E9=94=99=E8=AF=AF=E7=9A=84bug:#359?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3rdpart/media-server | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/3rdpart/media-server b/3rdpart/media-server index 576216c6..037571ec 160000 --- a/3rdpart/media-server +++ b/3rdpart/media-server @@ -1 +1 @@ -Subproject commit 576216c64bf3bcdc5e787da2adb3e169bdd97118 +Subproject commit 037571ecbf47085f39f4f208bfbf261570b6412f From 80d9f8ccaae99736a6b6116353fb0fcc12ebf70e Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Sat, 4 Jul 2020 21:44:44 +0800 Subject: [PATCH 10/16] =?UTF-8?q?=E6=9B=B4=E6=96=B0ps=E8=A7=A3=E6=9E=90?= =?UTF-8?q?=E5=BA=93=EF=BC=8C=E5=85=BC=E5=AE=B9=E4=B8=8D=E8=A7=84=E8=8C=83?= =?UTF-8?q?=E7=9A=84ps=E6=B5=81:#359?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3rdpart/media-server | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/3rdpart/media-server b/3rdpart/media-server index 037571ec..7f7906b0 160000 --- a/3rdpart/media-server +++ b/3rdpart/media-server @@ -1 +1 @@ -Subproject commit 037571ecbf47085f39f4f208bfbf261570b6412f +Subproject commit 7f7906b05d84c5efeceecb8d6f540a71c8153431 From 30260e5414d71922b7d4a5a2c2eff146e1e81540 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Tue, 7 Jul 2020 09:58:08 +0800 Subject: [PATCH 11/16] =?UTF-8?q?=E5=A4=8D=E7=94=A8printSSRC=E5=87=BD?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtsp/Rtsp.cpp | 10 ++++++++++ src/Rtsp/Rtsp.h | 1 + src/Rtsp/RtspSession.cpp | 9 --------- src/Rtsp/RtspSession.h | 2 -- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/Rtsp/Rtsp.cpp b/src/Rtsp/Rtsp.cpp index b55c1d0e..c93331d6 100644 --- a/src/Rtsp/Rtsp.cpp +++ b/src/Rtsp/Rtsp.cpp @@ -404,4 +404,14 @@ std::pair makeSockPair(const EventPoller::Ptr &poller, } } +string printSSRC(uint32_t ui32Ssrc) { + char tmp[9] = { 0 }; + ui32Ssrc = htonl(ui32Ssrc); + uint8_t *pSsrc = (uint8_t *) &ui32Ssrc; + for (int i = 0; i < 4; i++) { + sprintf(tmp + 2 * i, "%02X", pSsrc[i]); + } + return tmp; +} + }//namespace mediakit \ No newline at end of file diff --git a/src/Rtsp/Rtsp.h b/src/Rtsp/Rtsp.h index 3e7a22e7..ff31295c 100644 --- a/src/Rtsp/Rtsp.h +++ b/src/Rtsp/Rtsp.h @@ -272,6 +272,7 @@ private: }; std::pair makeSockPair(const EventPoller::Ptr &poller, const string &local_ip); +string printSSRC(uint32_t ui32Ssrc); } //namespace mediakit #endif //RTSP_RTSP_H_ diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index d73b11f1..d60941b4 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -1068,15 +1068,6 @@ bool RtspSession::sendRtspResponse(const string &res_code, return sendRtspResponse(res_code,header_map,sdp,protocol); } -inline string RtspSession::printSSRC(uint32_t ui32Ssrc) { - char tmp[9] = { 0 }; - ui32Ssrc = htonl(ui32Ssrc); - uint8_t *pSsrc = (uint8_t *) &ui32Ssrc; - for (int i = 0; i < 4; i++) { - sprintf(tmp + 2 * i, "%02X", pSsrc[i]); - } - return tmp; -} inline int RtspSession::getTrackIndexByTrackType(TrackType type) { for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { if (type == _aTrackInfo[i]->_type) { diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index c7e3160c..eeba0d85 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -136,8 +136,6 @@ private: void inline send_SessionNotFound(); //一般rtsp服务器打开端口失败时触发 void inline send_NotAcceptable(); - //ssrc转字符串 - inline string printSSRC(uint32_t ui32Ssrc); //获取track下标 inline int getTrackIndexByTrackType(TrackType type); From 477f99b756782e40fd67d769bc8d7eea768922d0 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Tue, 7 Jul 2020 10:01:12 +0800 Subject: [PATCH 12/16] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E5=88=9B=E5=BB=BAGB28181=E6=94=B6=E6=B5=81=E7=AB=AF=E5=8F=A3?= =?UTF-8?q?=E5=B9=B6=E5=8F=AF=E6=8C=87=E5=AE=9Astream=5Fid:#338?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 15 ++++------ src/Rtp/RtpProcess.cpp | 17 ++--------- src/Rtp/RtpProcess.h | 4 +-- src/Rtp/RtpSelector.cpp | 63 ++++++++++++++++++++++------------------- src/Rtp/RtpSelector.h | 21 +++++++++----- src/Rtp/RtpServer.cpp | 7 +++-- src/Rtp/RtpServer.h | 3 +- src/Rtp/RtpSession.cpp | 23 +++++++++++---- src/Rtp/RtpSession.h | 6 +++- tests/test_rtp.cpp | 3 +- 10 files changed, 89 insertions(+), 73 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 80de4984..1980ee02 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -741,15 +741,12 @@ void installWebApi() { }); #if defined(ENABLE_RTPPROXY) - api_regist1("/index/api/getSsrcInfo",[](API_ARGS1){ + api_regist1("/index/api/getRtpInfo",[](API_ARGS1){ CHECK_SECRET(); - CHECK_ARGS("ssrc"); - uint32_t ssrc = 0; - stringstream ss(allArgs["ssrc"]); - ss >> std::hex >> ssrc; + CHECK_ARGS("stream_id"); - auto process = RtpSelector::Instance().getProcess(ssrc,false); - if(!process){ + auto process = RtpSelector::Instance().getProcess(allArgs["stream_id"], false); + if (!process) { val["exist"] = false; return; } @@ -760,10 +757,10 @@ void installWebApi() { api_regist1("/index/api/openRtpServer",[](API_ARGS1){ CHECK_SECRET(); - CHECK_ARGS("port", "enable_tcp"); + CHECK_ARGS("port", "enable_tcp", "stream_id"); RtpServer::Ptr server = std::make_shared(); - server->start(allArgs["port"], allArgs["enable_tcp"].as()); + server->start(allArgs["port"], allArgs["stream_id"], allArgs["enable_tcp"].as()); val["port"] = server->getPort(); //保存对象 diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 08923216..b3f32d19 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -16,32 +16,21 @@ namespace mediakit{ -string printSSRC(uint32_t ui32Ssrc) { - char tmp[9] = { 0 }; - ui32Ssrc = htonl(ui32Ssrc); - uint8_t *pSsrc = (uint8_t *) &ui32Ssrc; - for (int i = 0; i < 4; i++) { - sprintf(tmp + 2 * i, "%02X", pSsrc[i]); - } - return tmp; -} - static string printAddress(const struct sockaddr *addr){ return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port); } -RtpProcess::RtpProcess(uint32_t ssrc) { - _ssrc = ssrc; +RtpProcess::RtpProcess(const string &stream_id) { _track = std::make_shared(); _track->_interleaved = 0; _track->_samplerate = 90000; _track->_type = TrackVideo; - _track->_ssrc = _ssrc; + _track->_ssrc = 0; _media_info._schema = RTP_APP_NAME; _media_info._vhost = DEFAULT_VHOST; _media_info._app = RTP_APP_NAME; - _media_info._streamid = printSSRC(_ssrc); + _media_info._streamid = stream_id; GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir); { diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 7b054292..0c55e375 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -22,11 +22,10 @@ using namespace mediakit; namespace mediakit{ -string printSSRC(uint32_t ui32Ssrc); class RtpProcess : public RtpReceiver , public RtpDecoder, public SockInfo, public MediaSinkInterface, public std::enable_shared_from_this{ public: typedef std::shared_ptr Ptr; - RtpProcess(uint32_t ssrc); + RtpProcess(const string &stream_id); ~RtpProcess(); bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); bool alive(); @@ -54,7 +53,6 @@ private: std::shared_ptr _save_file_rtp; std::shared_ptr _save_file_ps; std::shared_ptr _save_file_video; - uint32_t _ssrc; SdpTrack::Ptr _track; struct sockaddr *_addr = nullptr; uint16_t _sequence = 0; diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index d40e5ceb..36d8f8bc 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -15,37 +15,44 @@ namespace mediakit{ INSTANCE_IMP(RtpSelector); -bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { - uint32_t ssrc = 0; - if(!getSSRC(data,data_len,ssrc)){ - WarnL << "get ssrc from rtp failed:" << data_len; - return false; +bool RtpSelector::inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len, + const struct sockaddr *addr,uint32_t *dts_out) { + if (stream_id.empty()) { + //未指定流id,那么使用ssrc为流id + uint32_t ssrc = 0; + if (!getSSRC(data, data_len, ssrc)) { + WarnL << "get ssrc from rtp failed:" << data_len; + return false; + } + stream_id = printSSRC(ssrc); } - auto process = getProcess(ssrc, true); - if(process){ - return process->inputRtp(sock, data,data_len, addr,dts_out); + + //假定指定了流id,那么通过流id来区分是否为一路流(哪怕可能同时收到多路流) + auto process = getProcess(stream_id, true); + if (process) { + return process->inputRtp(sock, data, data_len, addr, dts_out); } return false; } bool RtpSelector::getSSRC(const char *data,int data_len, uint32_t &ssrc){ - if(data_len < 12){ + if (data_len < 12) { return false; } - uint32_t *ssrc_ptr = (uint32_t *)(data + 8); + uint32_t *ssrc_ptr = (uint32_t *) (data + 8); ssrc = ntohl(*ssrc_ptr); return true; } -RtpProcess::Ptr RtpSelector::getProcess(uint32_t ssrc,bool makeNew) { +RtpProcess::Ptr RtpSelector::getProcess(const string &stream_id,bool makeNew) { lock_guard lck(_mtx_map); - auto it = _map_rtp_process.find(ssrc); - if(it == _map_rtp_process.end() && !makeNew){ + auto it = _map_rtp_process.find(stream_id); + if (it == _map_rtp_process.end() && !makeNew) { return nullptr; } - RtpProcessHelper::Ptr &ref = _map_rtp_process[ssrc]; - if(!ref){ - ref = std::make_shared(ssrc,shared_from_this()); + RtpProcessHelper::Ptr &ref = _map_rtp_process[stream_id]; + if (!ref) { + ref = std::make_shared(stream_id, shared_from_this()); ref->attachEvent(); createTimer(); } @@ -67,17 +74,15 @@ void RtpSelector::createTimer() { } } -void RtpSelector::delProcess(uint32_t ssrc,const RtpProcess *ptr) { +void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) { lock_guard lck(_mtx_map); - auto it = _map_rtp_process.find(ssrc); - if(it == _map_rtp_process.end()){ + auto it = _map_rtp_process.find(stream_id); + if (it == _map_rtp_process.end()) { return; } - - if(it->second->getProcess().get() != ptr){ + if (it->second->getProcess().get() != ptr) { return; } - _map_rtp_process.erase(it); } @@ -88,7 +93,7 @@ void RtpSelector::onManager() { ++it; continue; } - WarnL << "RtpProcess timeout:" << printSSRC(it->first); + WarnL << "RtpProcess timeout:" << it->first; it = _map_rtp_process.erase(it); } } @@ -99,10 +104,10 @@ RtpSelector::RtpSelector() { RtpSelector::~RtpSelector() { } -RtpProcessHelper::RtpProcessHelper(uint32_t ssrc, const weak_ptr &parent) { - _ssrc = ssrc; +RtpProcessHelper::RtpProcessHelper(const string &stream_id, const weak_ptr &parent) { + _stream_id = stream_id; _parent = parent; - _process = std::make_shared(_ssrc); + _process = std::make_shared(stream_id); } RtpProcessHelper::~RtpProcessHelper() { @@ -114,14 +119,14 @@ void RtpProcessHelper::attachEvent() { bool RtpProcessHelper::close(MediaSource &sender, bool force) { //此回调在其他线程触发 - if(!_process || (!force && _process->totalReaderCount())){ + if (!_process || (!force && _process->totalReaderCount())) { return false; } auto parent = _parent.lock(); - if(!parent){ + if (!parent) { return false; } - parent->delProcess(_ssrc,_process.get()); + parent->delProcess(_stream_id, _process.get()); WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force; return true; } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index d6354f48..adb6f1a5 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -24,19 +24,21 @@ class RtpSelector; class RtpProcessHelper : public MediaSourceEvent , public std::enable_shared_from_this { public: typedef std::shared_ptr Ptr; - RtpProcessHelper(uint32_t ssrc,const weak_ptr &parent); + RtpProcessHelper(const string &stream_id, const weak_ptr &parent); ~RtpProcessHelper(); void attachEvent(); RtpProcess::Ptr & getProcess(); + protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; + private: weak_ptr _parent; RtpProcess::Ptr _process; - uint32_t _ssrc = 0; + string _stream_id; }; class RtpSelector : public std::enable_shared_from_this{ @@ -44,16 +46,21 @@ public: RtpSelector(); ~RtpSelector(); - static RtpSelector &Instance(); - bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len,const struct sockaddr *addr ,uint32_t *dts_out = nullptr ); static bool getSSRC(const char *data,int data_len, uint32_t &ssrc); - RtpProcess::Ptr getProcess(uint32_t ssrc,bool makeNew); - void delProcess(uint32_t ssrc,const RtpProcess *ptr); + static RtpSelector &Instance(); + + bool inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len, + const struct sockaddr *addr, uint32_t *dts_out = nullptr); + + RtpProcess::Ptr getProcess(const string &stream_id, bool makeNew); + void delProcess(const string &stream_id, const RtpProcess *ptr); + private: void onManager(); void createTimer(); + private: - unordered_map _map_rtp_process; + unordered_map _map_rtp_process; recursive_mutex _mtx_map; Timer::Ptr _timer; }; diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 5e90c1f7..466b6a0c 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -22,12 +22,12 @@ RtpServer::~RtpServer() { } } -void RtpServer::start(uint16_t local_port, bool enable_tcp, const char *local_ip) { +void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) { _udp_server.reset(new Socket(nullptr, false)); auto &ref = RtpSelector::Instance(); auto sock = _udp_server; - _udp_server->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) { - ref.inputRtp(sock, buf->data(), buf->size(), addr); + _udp_server->setOnRead([&ref, sock, stream_id](const Buffer::Ptr &buf, struct sockaddr *addr, int) { + ref.inputRtp(sock, const_cast(stream_id), buf->data(), buf->size(), addr); }); //创建udp服务器 @@ -43,6 +43,7 @@ void RtpServer::start(uint16_t local_port, bool enable_tcp, const char *local_i try { //创建tcp服务器 _tcp_server = std::make_shared(_udp_server->getPoller()); + (*_tcp_server)[RtpSession::kStreamID] = stream_id; _tcp_server->start(_udp_server->get_local_port(), local_ip); } catch (...) { _tcp_server = nullptr; diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index c81f9513..7c406903 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -36,10 +36,11 @@ public: /** * 开启服务器,可能抛异常 * @param local_port 本地端口,0时为随机端口 + * @param stream_id 流id,置空则使用ssrc * @param enable_tcp 是否启用tcp服务器 * @param local_ip 绑定的本地网卡ip */ - void start(uint16_t local_port, bool enable_tcp = true, const char *local_ip = "0.0.0.0"); + void start(uint16_t local_port, const string &stream_id = "", bool enable_tcp = true, const char *local_ip = "0.0.0.0"); /** * 获取绑定的本地端口 diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index b51bbda4..2e3f63fe 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -11,8 +11,15 @@ #if defined(ENABLE_RTPPROXY) #include "RtpSession.h" #include "RtpSelector.h" +#include "Network/TcpServer.h" namespace mediakit{ +const string RtpSession::kStreamID = "stream_id"; + +void RtpSession::attachServer(const TcpServer &server) { + _stream_id = const_cast(server)[kStreamID]; +} + RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) { DebugP(this); socklen_t addr_len = sizeof(addr); @@ -21,7 +28,7 @@ RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) { RtpSession::~RtpSession() { DebugP(this); if(_process){ - RtpSelector::Instance().delProcess(_ssrc,_process.get()); + RtpSelector::Instance().delProcess(_stream_id,_process.get()); } } @@ -36,7 +43,7 @@ void RtpSession::onRecv(const Buffer::Ptr &data) { } void RtpSession::onError(const SockException &err) { - WarnL << _ssrc << " " << err.what(); + WarnL << _stream_id << " " << err.what(); } void RtpSession::onManager() { @@ -51,13 +58,19 @@ void RtpSession::onManager() { void RtpSession::onRtpPacket(const char *data, uint64_t len) { if (!_process) { - if (!RtpSelector::getSSRC(data + 2, len - 2, _ssrc)) { + uint32_t ssrc; + if (!RtpSelector::getSSRC(data + 2, len - 2, ssrc)) { return; } - _process = RtpSelector::Instance().getProcess(_ssrc, true); + if (_stream_id.empty()) { + //未指定流id就使用ssrc为流id + _stream_id = printSSRC(ssrc); + } + //tcp情况下,一个tcp链接只可能是一路流,不需要通过多个ssrc来区分,所以不需要频繁getProcess + _process = RtpSelector::Instance().getProcess(_stream_id, true); _process->setListener(dynamic_pointer_cast(shared_from_this())); } - _process->inputRtp(_sock,data + 2, len - 2, &addr); + _process->inputRtp(_sock, data + 2, len - 2, &addr); _ticker.resetTime(); } diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index a3557029..07c9166b 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -22,22 +22,26 @@ namespace mediakit{ class RtpSession : public TcpSession , public RtpSplitter , public MediaSourceEvent{ public: + static const string kStreamID; RtpSession(const Socket::Ptr &sock); ~RtpSession() override; void onRecv(const Buffer::Ptr &) override; void onError(const SockException &err) override; void onManager() override; + void attachServer(const TcpServer &server) override; + protected: // 通知其停止推流 bool close(MediaSource &sender,bool force) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; void onRtpPacket(const char *data,uint64_t len) override; + private: - uint32_t _ssrc = 0; RtpProcess::Ptr _process; Ticker _ticker; struct sockaddr addr; + string _stream_id; }; }//namespace mediakit diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index d5d68511..7d6d0b42 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -38,6 +38,7 @@ static bool loadFile(const char *path){ uint16_t len; char rtp[2 * 1024]; struct sockaddr addr = {0}; + string stream_id; while (true) { if (2 != fread(&len, 1, 2, fp)) { WarnL; @@ -55,7 +56,7 @@ static bool loadFile(const char *path){ } uint32_t timeStamp; - RtpSelector::Instance().inputRtp(nullptr,rtp,len, &addr,&timeStamp); + RtpSelector::Instance().inputRtp(nullptr, stream_id, rtp, len, &addr, &timeStamp); if(timeStamp_last){ auto diff = timeStamp - timeStamp_last; if(diff > 0 && diff < 500){ From 248b2d5cb98cc7c853472b172d853f802644fa3e Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Wed, 8 Jul 2020 09:36:10 +0800 Subject: [PATCH 13/16] =?UTF-8?q?=E5=AE=8C=E5=96=84GB28181=E6=8E=A8?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtp/RtpProcess.h | 15 +++++++++++++++ src/Rtp/RtpSelector.cpp | 17 +++++++---------- src/Rtp/RtpSelector.h | 23 ++++++++++++++++++++++- src/Rtp/RtpServer.cpp | 36 ++++++++++++++++++++++++++++-------- src/Rtp/RtpServer.h | 1 + tests/test_rtp.cpp | 3 +-- 6 files changed, 74 insertions(+), 21 deletions(-) diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 0c55e375..1f53451b 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -27,9 +27,24 @@ public: typedef std::shared_ptr Ptr; RtpProcess(const string &stream_id); ~RtpProcess(); + + /** + * 输入rtp + * @param sock 本地监听的socket + * @param data rtp数据指针 + * @param data_len rtp数据长度 + * @param addr 数据源地址 + * @param dts_out 解析出最新的dts + * @return 是否解析成功 + */ bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); + + /** + * 是否超时,用于超时移除对象 + */ bool alive(); + /// SockInfo override string get_local_ip() override; uint16_t get_local_port() override; string get_peer_ip() override; diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 36d8f8bc..06979972 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -15,20 +15,17 @@ namespace mediakit{ INSTANCE_IMP(RtpSelector); -bool RtpSelector::inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len, +bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len, const struct sockaddr *addr,uint32_t *dts_out) { - if (stream_id.empty()) { - //未指定流id,那么使用ssrc为流id - uint32_t ssrc = 0; - if (!getSSRC(data, data_len, ssrc)) { - WarnL << "get ssrc from rtp failed:" << data_len; - return false; - } - stream_id = printSSRC(ssrc); + //使用ssrc为流id + uint32_t ssrc = 0; + if (!getSSRC(data, data_len, ssrc)) { + WarnL << "get ssrc from rtp failed:" << data_len; + return false; } //假定指定了流id,那么通过流id来区分是否为一路流(哪怕可能同时收到多路流) - auto process = getProcess(stream_id, true); + auto process = getProcess(printSSRC(ssrc), true); if (process) { return process->inputRtp(sock, data, data_len, addr, dts_out); } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index adb6f1a5..83f8bd2e 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -49,10 +49,31 @@ public: static bool getSSRC(const char *data,int data_len, uint32_t &ssrc); static RtpSelector &Instance(); - bool inputRtp(const Socket::Ptr &sock, string &stream_id, const char *data, int data_len, + /** + * 输入多个rtp流,根据ssrc分流 + * @param sock 本地socket + * @param data 收到的数据 + * @param data_len 收到的数据长度 + * @param addr rtp流源地址 + * @param dts_out 解析出最新的dts + * @return 是否成功 + */ + bool inputRtp(const Socket::Ptr &sock, const char *data, int data_len, const struct sockaddr *addr, uint32_t *dts_out = nullptr); + /** + * 获取一个rtp处理器 + * @param stream_id 流id + * @param makeNew 不存在时是否新建 + * @return rtp处理器 + */ RtpProcess::Ptr getProcess(const string &stream_id, bool makeNew); + + /** + * 删除rtp处理器 + * @param stream_id 流id + * @param ptr rtp处理器指针 + */ void delProcess(const string &stream_id, const RtpProcess *ptr); private: diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 466b6a0c..fd006c50 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -17,25 +17,20 @@ RtpServer::RtpServer() { } RtpServer::~RtpServer() { - if(_udp_server){ - _udp_server->setOnRead(nullptr); + if(_on_clearup){ + _on_clearup(); } } void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) { _udp_server.reset(new Socket(nullptr, false)); - auto &ref = RtpSelector::Instance(); - auto sock = _udp_server; - _udp_server->setOnRead([&ref, sock, stream_id](const Buffer::Ptr &buf, struct sockaddr *addr, int) { - ref.inputRtp(sock, const_cast(stream_id), buf->data(), buf->size(), addr); - }); - //创建udp服务器 if (!_udp_server->bindUdpSock(local_port, local_ip)) { _udp_server = nullptr; string err = (StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true)); throw std::runtime_error(err); } + //设置udp socket读缓存 SockUtil::setRecvBuf(_udp_server->rawFD(), 4 * 1024 * 1024); @@ -51,6 +46,31 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable throw; } } + + auto sock = _udp_server; + RtpProcess::Ptr process; + if (!stream_id.empty()) { + //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) + process = RtpSelector::Instance().getProcess(stream_id, true); + _udp_server->setOnRead([sock, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) { + process->inputRtp(sock, buf->data(), buf->size(), addr); + }); + } else { + //未指定流id,一个端口多个流,通过ssrc来分流 + auto &ref = RtpSelector::Instance(); + _udp_server->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) { + ref.inputRtp(sock, buf->data(), buf->size(), addr); + }); + } + + _on_clearup = [sock, process, stream_id]() { + //去除循环引用 + sock->setOnRead(nullptr); + if (process) { + //删除rtp处理器 + RtpSelector::Instance().delProcess(stream_id, process.get()); + } + }; } EventPoller::Ptr RtpServer::getPoller() { diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index 7c406903..2623ae2b 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -55,6 +55,7 @@ public: protected: Socket::Ptr _udp_server; TcpServer::Ptr _tcp_server; + function _on_clearup; }; }//namespace mediakit diff --git a/tests/test_rtp.cpp b/tests/test_rtp.cpp index 7d6d0b42..24d9ac83 100644 --- a/tests/test_rtp.cpp +++ b/tests/test_rtp.cpp @@ -38,7 +38,6 @@ static bool loadFile(const char *path){ uint16_t len; char rtp[2 * 1024]; struct sockaddr addr = {0}; - string stream_id; while (true) { if (2 != fread(&len, 1, 2, fp)) { WarnL; @@ -56,7 +55,7 @@ static bool loadFile(const char *path){ } uint32_t timeStamp; - RtpSelector::Instance().inputRtp(nullptr, stream_id, rtp, len, &addr, &timeStamp); + RtpSelector::Instance().inputRtp(nullptr, rtp, len, &addr, &timeStamp); if(timeStamp_last){ auto diff = timeStamp - timeStamp_last; if(diff > 0 && diff < 500){ From e58a63c528d21184a7b777c0f7e979888d3dd4a2 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Wed, 8 Jul 2020 10:09:16 +0800 Subject: [PATCH 14/16] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtp/RtpServer.cpp | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index fd006c50..365aceb2 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -23,54 +23,52 @@ RtpServer::~RtpServer() { } void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable_tcp, const char *local_ip) { - _udp_server.reset(new Socket(nullptr, false)); //创建udp服务器 - if (!_udp_server->bindUdpSock(local_port, local_ip)) { - _udp_server = nullptr; - string err = (StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true)); - throw std::runtime_error(err); + Socket::Ptr udp_server = std::make_shared(nullptr, false); + if (!udp_server->bindUdpSock(local_port, local_ip)) { + throw std::runtime_error(StrPrinter << "bindUdpSock on " << local_ip << ":" << local_port << " failed:" << get_uv_errmsg(true)); } - //设置udp socket读缓存 - SockUtil::setRecvBuf(_udp_server->rawFD(), 4 * 1024 * 1024); + SockUtil::setRecvBuf(udp_server->rawFD(), 4 * 1024 * 1024); + TcpServer::Ptr tcp_server; if (enable_tcp) { try { //创建tcp服务器 - _tcp_server = std::make_shared(_udp_server->getPoller()); - (*_tcp_server)[RtpSession::kStreamID] = stream_id; - _tcp_server->start(_udp_server->get_local_port(), local_ip); + tcp_server = std::make_shared(udp_server->getPoller()); + (*tcp_server)[RtpSession::kStreamID] = stream_id; + tcp_server->start(udp_server->get_local_port(), local_ip); } catch (...) { - _tcp_server = nullptr; - _udp_server = nullptr; throw; } } - auto sock = _udp_server; RtpProcess::Ptr process; if (!stream_id.empty()) { //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) process = RtpSelector::Instance().getProcess(stream_id, true); - _udp_server->setOnRead([sock, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) { - process->inputRtp(sock, buf->data(), buf->size(), addr); + udp_server->setOnRead([udp_server, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) { + process->inputRtp(udp_server, buf->data(), buf->size(), addr); }); } else { //未指定流id,一个端口多个流,通过ssrc来分流 auto &ref = RtpSelector::Instance(); - _udp_server->setOnRead([&ref, sock](const Buffer::Ptr &buf, struct sockaddr *addr, int) { - ref.inputRtp(sock, buf->data(), buf->size(), addr); + udp_server->setOnRead([&ref, udp_server](const Buffer::Ptr &buf, struct sockaddr *addr, int) { + ref.inputRtp(udp_server, buf->data(), buf->size(), addr); }); } - _on_clearup = [sock, process, stream_id]() { + _on_clearup = [udp_server, process, stream_id]() { //去除循环引用 - sock->setOnRead(nullptr); + udp_server->setOnRead(nullptr); if (process) { //删除rtp处理器 RtpSelector::Instance().delProcess(stream_id, process.get()); } }; + + _tcp_server = tcp_server; + _udp_server = udp_server; } EventPoller::Ptr RtpServer::getPoller() { From b2ff53037b99189853612c612b1e0d80c7f00243 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Wed, 8 Jul 2020 10:25:30 +0800 Subject: [PATCH 15/16] =?UTF-8?q?http=20api=E6=96=B0=E5=BB=BA=E7=9A=84rtp?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8=E5=8F=AF=E4=BB=A5=E8=87=AA=E5=8A=A8?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E7=A7=BB=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/WebApi.cpp | 13 +++++++++++-- src/Rtp/RtpProcess.cpp | 10 ++++++++++ src/Rtp/RtpProcess.h | 11 +++++++++++ src/Rtp/RtpSelector.cpp | 2 ++ src/Rtp/RtpServer.cpp | 7 +++++++ src/Rtp/RtpServer.h | 6 ++++++ 6 files changed, 47 insertions(+), 2 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 1980ee02..bbfc803d 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -761,11 +761,20 @@ void installWebApi() { RtpServer::Ptr server = std::make_shared(); server->start(allArgs["port"], allArgs["stream_id"], allArgs["enable_tcp"].as()); - val["port"] = server->getPort(); + + auto port = server->getPort(); + server->setOnDetach([port]() { + //设置rtp超时移除事件 + lock_guard lck(s_rtpServerMapMtx); + s_rtpServerMap.erase(port); + }); //保存对象 lock_guard lck(s_rtpServerMapMtx); - s_rtpServerMap.emplace(server->getPort(), server); + s_rtpServerMap.emplace(port, server); + + //回复json + val["port"] = port; }); api_regist1("/index/api/closeRtpServer",[](API_ARGS1){ diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index b3f32d19..0693ddae 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -177,6 +177,16 @@ bool RtpProcess::alive() { return false; } +void RtpProcess::onDetach(){ + if(_on_detach){ + _on_detach(); + } +} + +void RtpProcess::setOnDetach(const function &cb) { + _on_detach = cb; +} + string RtpProcess::get_peer_ip() { if(_addr){ return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 1f53451b..81dc7f2f 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -44,6 +44,16 @@ public: */ bool alive(); + /** + * 超时时被RtpSelector移除时触发 + */ + void onDetach(); + + /** + * 设置onDetach事件回调 + */ + void setOnDetach(const function &cb); + /// SockInfo override string get_local_ip() override; uint16_t get_local_port() override; @@ -79,6 +89,7 @@ private: MediaInfo _media_info; uint64_t _total_bytes = 0; Socket::Ptr _sock; + function _on_detach; }; }//namespace mediakit diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 06979972..fb790048 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -91,7 +91,9 @@ void RtpSelector::onManager() { continue; } WarnL << "RtpProcess timeout:" << it->first; + auto process = it->second->getProcess(); it = _map_rtp_process.erase(it); + process->onDetach(); } } diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 365aceb2..f7a4c9eb 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -69,6 +69,13 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable _tcp_server = tcp_server; _udp_server = udp_server; + _rtp_process = process; +} + +void RtpServer::setOnDetach(const function &cb){ + if(_rtp_process){ + _rtp_process->setOnDetach(cb); + } } EventPoller::Ptr RtpServer::getPoller() { diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index 2623ae2b..bcc045bc 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -52,9 +52,15 @@ public: */ EventPoller::Ptr getPoller(); + /** + * 设置RtpProcess onDetach事件回调 + */ + void setOnDetach(const function &cb); + protected: Socket::Ptr _udp_server; TcpServer::Ptr _tcp_server; + RtpProcess::Ptr _rtp_process; function _on_clearup; }; From 7bce212701443de20643f910de17b781b82face7 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Wed, 8 Jul 2020 12:42:05 +0800 Subject: [PATCH 16/16] =?UTF-8?q?=E4=BF=AE=E5=A4=8Drtsp=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E6=8E=A8=E6=B5=81=E5=88=A4=E6=96=AD=E6=97=A0=E6=95=88=E7=9A=84?= =?UTF-8?q?bug:#394?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtsp/RtspSession.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index d60941b4..4b2cabe0 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -218,7 +218,7 @@ void RtspSession::handleReq_Options(const Parser &parser) { } void RtspSession::handleReq_ANNOUNCE(const Parser &parser) { - auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA, + auto src = dynamic_pointer_cast(MediaSource::find(RTSP_SCHEMA, _mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid));