diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 1a598bb2..e836fdbe 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -128,212 +128,6 @@ private: }; -/** - * 通过该模板类可以透明化WebSocket协议, - * 用户只要实现WebSock协议下的具体业务协议,譬如基于WebSocket协议的Rtmp协议等 - * @tparam SessionType 业务协议的TcpSession类 - */ -template -class WebSocketSession : public HttpSession { -public: - WebSocketSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : HttpSession(pTh,pSock){} - virtual ~WebSocketSession(){} - - //收到eof或其他导致脱离TcpServer事件的回调 - void onError(const SockException &err) override{ - HttpSession::onError(err); - if(_session){ - _session->onError(err); - } - } - //每隔一段时间触发,用来做超时管理 - void onManager() override{ - HttpSession::onManager(); - if(_session){ - _session->onManager(); - } - } - - void attachServer(const TcpServer &server) override{ - HttpSession::attachServer(server); - _weakServer = const_cast(server).shared_from_this(); - } -protected: - /** - * 开始收到一个webSocket数据包 - * @param packet - */ - void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{ - //新包,原来的包残余数据清空掉 - _remian_data.clear(); - - if(!_firstPacket){ - return; - } - //这是个WebSocket会话而不是普通的Http会话 - _firstPacket = false; - _session = std::make_shared(getIdentifier(),nullptr,_sock); - - auto strongServer = _weakServer.lock(); - if(strongServer){ - _session->attachServer(*strongServer); - } - - //此处截取数据并进行websocket协议打包 - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _session->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){ - auto strongSelf = weakSelf.lock(); - if(strongSelf){ - bool mask_flag = strongSelf->_mask_flag; - strongSelf->_mask_flag = false; - strongSelf->WebSocketSplitter::encode((uint8_t *)buf->data(),buf->size()); - strongSelf->_mask_flag = mask_flag; - } - return buf->size(); - }); - - } - - /** - * 收到websocket数据包负载 - * @param packet - * @param ptr - * @param len - * @param recved - */ - void onWebSocketDecodePlayload(const WebSocketHeader &packet,const uint8_t *ptr,uint64_t len,uint64_t recved) override { - if(packet._playload_len == recved){ - //收到完整的包 - if(_remian_data.empty()){ - onRecvWholePacket((char *)ptr,len); - }else{ - _remian_data.append((char *)ptr,len); - onRecvWholePacket(_remian_data); - _remian_data.clear(); - } - } else { - //部分数据 - _remian_data.append((char *)ptr,len); - } - } - - /** - * 发送数据进行websocket协议打包后回调 - * @param ptr - * @param len - */ - void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len) override{ - _session->realSend(_session->obtainBuffer((char *)ptr,len)); - } - - /** - * 收到一个完整的websock数据包 - * @param data - * @param len - */ - void onRecvWholePacket(const char *data,uint64_t len){ - BufferRaw::Ptr buffer = _session->obtainBuffer(data,len); - _session->onRecv(buffer); - } - - /** - * 收到一个完整的websock数据包 - * @param str - */ - void onRecvWholePacket(const string &str){ - BufferString::Ptr buffer = std::make_shared(str); - _session->onRecv(buffer); - } - -private: - typedef function onBeforeSendCB; - /** - * 该类实现了TcpSession派生类发送数据的截取 - * 目的是发送业务数据前进行websocket协议的打包 - */ - class SessionImp : public SessionType{ - public: - SessionImp(const string &identifier, - const std::shared_ptr &pTh, - const Socket::Ptr &pSock) : - _identifier(identifier),SessionType(pTh,pSock){} - - ~SessionImp(){} - - /** - * 截取到数据后,再进行webSocket协议打包 - * 然后真正的发送数据到socket - * @param buf 数据 - * @return 数据字节数 - */ - int realSend(const Buffer::Ptr &buf){ - return SessionType::send(buf); - } - - /** - * 设置发送数据截取回调函数 - * @param cb 截取回调函数 - */ - void setOnBeforeSendCB(const onBeforeSendCB &cb){ - _beforeSendCB = cb; - } - protected: - /** - * 重载send函数截取数据 - * @param buf 需要截取的数据 - * @return 数据字节数 - */ - int send(const Buffer::Ptr &buf) override { - if(_beforeSendCB){ - return _beforeSendCB(buf); - } - return SessionType::send(buf); - } - string getIdentifier() const override{ - return _identifier; - } - private: - onBeforeSendCB _beforeSendCB; - string _identifier; - }; -private: - bool _firstPacket = true; - string _remian_data; - weak_ptr _weakServer; - std::shared_ptr _session; -}; - -/** - * 回显会话 - */ -class EchoSession : public TcpSession { -public: - EchoSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : - TcpSession(pTh,pSock){ - DebugL; - } - virtual ~EchoSession(){ - DebugL; - } - - void attachServer(const TcpServer &server) override{ - DebugL << getIdentifier() << " " << TcpSession::getIdentifier(); - } - void onRecv(const Buffer::Ptr &buffer) override { - send(buffer); - } - void onError(const SockException &err) override{ - WarnL << err.what(); - } - //每隔一段时间触发,用来做超时管理 - void onManager() override{ - DebugL; - } -}; - - -typedef WebSocketSession EchoWebSocketSession; - } /* namespace Http */ } /* namespace ZL */ diff --git a/src/Http/HttpsSession.h b/src/Http/HttpsSession.h index 0a131643..80c1d195 100644 --- a/src/Http/HttpsSession.h +++ b/src/Http/HttpsSession.h @@ -70,7 +70,7 @@ public: HttpSession::onRecv(data,len); } #endif//defined(__GNUC__) && (__GNUC__ < 5) -private: +protected: virtual int send(const Buffer::Ptr &buf) override{ TimeTicker(); m_sslBox.onSend(buf->data(), buf->size()); @@ -79,6 +79,201 @@ private: SSL_Box m_sslBox; }; + + +/** +* 通过该模板类可以透明化WebSocket协议, +* 用户只要实现WebSock协议下的具体业务协议,譬如基于WebSocket协议的Rtmp协议等 +* @tparam SessionType 业务协议的TcpSession类 +*/ +template +class WebSocketSession : public HttpSessionType { +public: + WebSocketSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : HttpSessionType(pTh,pSock){} + virtual ~WebSocketSession(){} + + //收到eof或其他导致脱离TcpServer事件的回调 + void onError(const SockException &err) override{ + HttpSession::onError(err); + if(_session){ + _session->onError(err); + } + } + //每隔一段时间触发,用来做超时管理 + void onManager() override{ + HttpSession::onManager(); + if(_session){ + _session->onManager(); + } + } + + void attachServer(const TcpServer &server) override{ + HttpSession::attachServer(server); + _weakServer = const_cast(server).shared_from_this(); + } +protected: + /** + * 开始收到一个webSocket数据包 + * @param packet + */ + void onWebSocketDecodeHeader(const WebSocketHeader &packet) override{ + //新包,原来的包残余数据清空掉 + _remian_data.clear(); + + if(_firstPacket){ + //这是个WebSocket会话而不是普通的Http会话 + _firstPacket = false; + _session = std::make_shared(HttpSessionType::getIdentifier(),nullptr,HttpSessionType::_sock); + + auto strongServer = _weakServer.lock(); + if(strongServer){ + _session->attachServer(*strongServer); + } + + //此处截取数据并进行websocket协议打包 + weak_ptr weakSelf = dynamic_pointer_cast(HttpSessionType::shared_from_this()); + _session->setOnBeforeSendCB([weakSelf](const Buffer::Ptr &buf){ + auto strongSelf = weakSelf.lock(); + if(strongSelf){ + bool mask_flag = strongSelf->_mask_flag; + strongSelf->_mask_flag = false; + strongSelf->WebSocketSplitter::encode(*strongSelf,(uint8_t *)buf->data(),buf->size()); + strongSelf->_mask_flag = mask_flag; + } + return buf->size(); + }); + } + } + + /** + * 收到websocket数据包负载 + * @param packet + * @param ptr + * @param len + * @param recved + */ + void onWebSocketDecodePlayload(const WebSocketHeader &packet,const uint8_t *ptr,uint64_t len,uint64_t recved) override { + _remian_data.append((char *)ptr,len); + } + + /** + * 接收到完整的一个webSocket数据包后回调 + * @param header 数据包包头 + */ + void onWebSocketDecodeComplete(const WebSocketHeader &header) override { + switch (header._opcode){ + case WebSocketHeader::CLOSE:{ + HttpSessionType::encode(header,nullptr,0); + } + break; + case WebSocketHeader::PING:{ + const_cast(header)._opcode = WebSocketHeader::PONG; + HttpSessionType::encode(header,(uint8_t *)_remian_data.data(),_remian_data.size()); + } + break; + case WebSocketHeader::CONTINUATION:{ + + } + break; + case WebSocketHeader::TEXT: + case WebSocketHeader::BINARY:{ + BufferString::Ptr buffer = std::make_shared(_remian_data); + _session->onRecv(buffer); + } + break; + default: + break; + } + _remian_data.clear(); + } + + /** + * 发送数据进行websocket协议打包后回调 + * @param ptr + * @param len + */ + void onWebSocketEncodeData(const uint8_t *ptr,uint64_t len) override{ + SocketHelper::send((char *)ptr,len); + } +private: + typedef function onBeforeSendCB; + /** + * 该类实现了TcpSession派生类发送数据的截取 + * 目的是发送业务数据前进行websocket协议的打包 + */ + class SessionImp : public SessionType{ + public: + SessionImp(const string &identifier, + const std::shared_ptr &pTh, + const Socket::Ptr &pSock) : + _identifier(identifier),SessionType(pTh,pSock){} + + ~SessionImp(){} + + /** + * 设置发送数据截取回调函数 + * @param cb 截取回调函数 + */ + void setOnBeforeSendCB(const onBeforeSendCB &cb){ + _beforeSendCB = cb; + } + protected: + /** + * 重载send函数截取数据 + * @param buf 需要截取的数据 + * @return 数据字节数 + */ + int send(const Buffer::Ptr &buf) override { + if(_beforeSendCB){ + return _beforeSendCB(buf); + } + return SessionType::send(buf); + } + string getIdentifier() const override{ + return _identifier; + } + private: + onBeforeSendCB _beforeSendCB; + string _identifier; + }; +private: + bool _firstPacket = true; + string _remian_data; + weak_ptr _weakServer; + std::shared_ptr _session; +}; + +/** +* 回显会话 +*/ +class EchoSession : public TcpSession { +public: + EchoSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : + TcpSession(pTh,pSock){ + DebugL; + } + virtual ~EchoSession(){ + DebugL; + } + + void attachServer(const TcpServer &server) override{ + DebugL << getIdentifier() << " " << TcpSession::getIdentifier(); + } + void onRecv(const Buffer::Ptr &buffer) override { + send(buffer); + } + void onError(const SockException &err) override{ + WarnL << err.what(); + } + //每隔一段时间触发,用来做超时管理 + void onManager() override{ + DebugL; + } +}; + + +typedef WebSocketSession EchoWebSocketSession; + } /* namespace Http */ } /* namespace ZL */ diff --git a/src/Http/WebSocketSplitter.cpp b/src/Http/WebSocketSplitter.cpp index 99612a24..d79e32f4 100644 --- a/src/Http/WebSocketSplitter.cpp +++ b/src/Http/WebSocketSplitter.cpp @@ -115,6 +115,9 @@ begin_decode: _mask_offset = 0; _playload_offset = 0; onWebSocketDecodeHeader(*this); + if(_playload_len == 0){ + onWebSocketDecodeComplete(*this); + } } //进入后面逻辑代表已经获取到了webSocket协议头, @@ -129,6 +132,8 @@ begin_decode: onPlayloadData(ptr,playload_slice_len); if(_playload_offset == _playload_len){ + onWebSocketDecodeComplete(*this); + //这是下一个包 remain -= playload_slice_len; ptr += playload_slice_len; @@ -157,47 +162,48 @@ void WebSocketSplitter::onPlayloadData(uint8_t *ptr, uint64_t len) { onWebSocketDecodePlayload(*this, _mask_flag ? ptr - len : ptr, len, _playload_offset); } -void WebSocketSplitter::encode(uint8_t *data, uint64_t len) { +void WebSocketSplitter::encode(const WebSocketHeader &header,uint8_t *data, uint64_t len) { string ret; - uint8_t byte = _fin << 7 | ((_reserved & 0x07) << 4) | (_opcode & 0x0F) ; + uint8_t byte = header._fin << 7 | ((header._reserved & 0x07) << 4) | (header._opcode & 0x0F) ; ret.push_back(byte); - _mask_flag = (_mask_flag && _mask.size() >= 4); - byte = _mask_flag << 7; + auto mask_flag = (header._mask_flag && header._mask.size() >= 4); + byte = mask_flag << 7; - _playload_len = len; - if(_playload_len < 126){ - byte |= _playload_len; + if(len < 126){ + byte |= len; ret.push_back(byte); - }else if(_playload_len <= 0xFFFF){ + }else if(len <= 0xFFFF){ byte |= 126; ret.push_back(byte); - uint16_t len = htons(_playload_len); + uint16_t len = htons(len); ret.append((char *)&len,2); }else{ byte |= 127; ret.push_back(byte); - uint32_t len_high = htonl(_playload_len >> 32) ; - uint32_t len_low = htonl(_playload_len & 0xFFFFFFFF); + uint32_t len_high = htonl(len >> 32) ; + uint32_t len_low = htonl(len & 0xFFFFFFFF); ret.append((char *)&len_high,4); ret.append((char *)&len_low,4); } - if(_mask_flag){ - ret.append((char *)_mask.data(),4); + if(mask_flag){ + ret.append((char *)header._mask.data(),4); } onWebSocketEncodeData((uint8_t*)ret.data(),ret.size()); - if(_mask_flag){ - uint8_t *ptr = data; - for(int i = 0; i < len ; ++i,++ptr){ - *(ptr) ^= _mask[i % 4]; + if(len > 0){ + if(mask_flag){ + uint8_t *ptr = data; + for(int i = 0; i < len ; ++i,++ptr){ + *(ptr) ^= header._mask[i % 4]; + } } + onWebSocketEncodeData(data,len); } - onWebSocketEncodeData(data,len); } diff --git a/src/Http/WebSocketSplitter.h b/src/Http/WebSocketSplitter.h index 4fa56fd9..37150b41 100644 --- a/src/Http/WebSocketSplitter.h +++ b/src/Http/WebSocketSplitter.h @@ -87,25 +87,33 @@ public: * 编码一个数据包 * 将触发2次onWebSocketEncodeData回调 * 第一次是数据头,第二次是负载数据 + * @param header 数据头 * @param data 负载数据 * @param len 负载数据长度 */ - void encode(uint8_t *data,uint64_t len); + void encode(const WebSocketHeader &header,uint8_t *data,uint64_t len); protected: /** * 收到一个webSocket数据包包头,后续将继续触发onWebSocketDecodePlayload回调 - * @param packet 数据包头 + * @param header 数据包头 */ - virtual void onWebSocketDecodeHeader(const WebSocketHeader &packet) {}; + virtual void onWebSocketDecodeHeader(const WebSocketHeader &header) {}; /** * 收到webSocket数据包负载 - * @param packet 数据包包头 + * @param header 数据包包头 * @param ptr 负载数据指针 * @param len 负载数据长度 - * @param recved 已接收数据长度(包含本次数据长度),等于packet._playload_len时则接受完毕 + * @param recved 已接收数据长度(包含本次数据长度),等于header._playload_len时则接受完毕 */ - virtual void onWebSocketDecodePlayload(const WebSocketHeader &packet, const uint8_t *ptr, uint64_t len, uint64_t recved) {}; + virtual void onWebSocketDecodePlayload(const WebSocketHeader &header, const uint8_t *ptr, uint64_t len, uint64_t recved) {}; + + + /** + * 接收到完整的一个webSocket数据包后回调 + * @param header 数据包包头 + */ + virtual void onWebSocketDecodeComplete(const WebSocketHeader &header) {}; /** * websocket数据编码回调