dong_dl 1 рік тому
батько
коміт
aaa2ebae42
2 змінених файлів з 81 додано та 27 видалено
  1. 31 14
      inc/proto/ProtoParser.h
  2. 50 13
      src/proto/ProtoParser.cpp

+ 31 - 14
inc/proto/ProtoParser.h

@@ -15,13 +15,22 @@
 
 class Message;
 
-struct PublishInfo
+struct WasmMessage
 {
-    std::string par;
+    int         r;
+    int         rc;
+    uint32_t    msgId;
     std::string topic;
     std::string payload;
+};
+
+struct PublishInfo
+{
+    std::string                                                                                            par;
+    std::string                                                                                            topic;
+    std::string                                                                                            payload;
     std::function<void(uint32_t, uint32_t, const std::string &, const std::string &, const std::string &)> cb;
-    uint64_t time;
+    uint64_t                                                                                               time;
 };
 
 typedef void (*parserCallback)(const char *topic, const char *json, void *userData);
@@ -195,11 +204,14 @@ public:
     int  getLogLevel() const;
     int  getLogType() const;
 #ifdef USE_WASM
-    void setOnConnected(int ptr);
-    void setOnDisconnected(int ptr);
-    void setOnMessage(int ptr);
-    void setOnNotify(int ptr);
-    void setUrl(std::string url)
+    void        setOnConnected(int ptr);
+    void        setOnDisconnected(int ptr);
+    std::string getRespTopic(int msgId);
+    std::string getRespPayload(int msgId);
+    void        removeResp(int msgId);
+    void        setOnMessage(int ptr);
+    void        setOnNotify(int ptr);
+    void        setUrl(std::string url)
     {
         _url = url;
     }
@@ -301,21 +313,23 @@ private:
     onConnectedCallback               _onConnected    = nullptr;
     onDisconnectedCallback            _onDisconnected = nullptr;
     onMessageCallback                 _onMessage      = nullptr;
-    onNotifyCallback                   _onNotify       = nullptr;
+    onNotifyCallback                  _onNotify       = nullptr;
     void                             *_userData;
     uint32_t                          _msgId     = 0;
     uint32_t                          _respMsgId = 0;
     uint64_t                          _timestamp = 0;
     int                               _timeout   = 5000;
     unsigned char                     _rc        = 0;
-    std::map<uint32_t, PublishInfo*>   _publishes;
-    std::map<std::string, uint32_t>    _pullIds;
+    std::map<uint32_t, PublishInfo *> _publishes;
+    std::map<std::string, uint32_t>   _pullIds;
     std::mutex                        _publishLock;
     ProtoParserPrivate               *p;
 #ifdef USE_WASM
-    EMSCRIPTEN_WEBSOCKET_T _socket;
-    std::string            _url;
-    std::recursive_mutex   _sendLock;
+    EMSCRIPTEN_WEBSOCKET_T                    _socket;
+    std::string                               _url;
+    std::recursive_mutex                      _sendLock;
+    std::recursive_mutex                      _recvLock;
+    std::unordered_map<uint32_t, WasmMessage> _respMessages;
 #endif
 };
 
@@ -394,6 +408,9 @@ EMSCRIPTEN_BINDINGS(Proto)
         .function("setOnMessage", &ProtoParser::setOnMessage)
         .function("setOnNotify", &ProtoParser::setOnNotify)
         .function("setOnDisconnected", &ProtoParser::setOnDisconnected)
+        .function("getRespTopic", &ProtoParser::getRespTopic)
+        .function("getRespPayload", &ProtoParser::getRespPayload)
+        .function("removeResp", &ProtoParser::removeResp)
 
         .property("logLevel", &ProtoParser::getLogLevel, &ProtoParser::setLogLevel)
         .property("logType", &ProtoParser::getLogType, &ProtoParser::setLogType)

+ 50 - 13
src/proto/ProtoParser.cpp

@@ -319,7 +319,7 @@ std::string ProtoParser::publish(const std::string &topic, const std::string &pa
         {
             if ((now - it->second->time) > 60 * 1000)
             {
-                for (auto it2 = _pullIds.begin(); it2 != _pullIds.end(); )
+                for (auto it2 = _pullIds.begin(); it2 != _pullIds.end();)
                 {
                     if (it2->second == it->first)
                     {
@@ -735,8 +735,6 @@ unsigned char ProtoParser::parsePayload(std::string &payload, bool pub)
                 if (_topic.compare("MN") == 0)
                 {
                     p->update(r, _uid, _msgCurrent);
-
-                    
                 }
                 toJson(r, _payload);
             }
@@ -963,7 +961,7 @@ std::string ProtoParser::getGroupInfoPack(std::string requestJson, uint32_t &msg
     }
     else
     {
-        for (auto it = _pullIds.begin(); it != _pullIds.end(); )
+        for (auto it = _pullIds.begin(); it != _pullIds.end();)
         {
             if (it->second == 0)
             {
@@ -1004,7 +1002,7 @@ std::string ProtoParser::getUserInfoPack(std::string requestJson, uint32_t &msgI
         msgId = 0;
         return "";
     }
-    if(par.request_size()==0)
+    if (par.request_size() == 0)
         return "";
 
 #ifdef DUMP
@@ -1118,7 +1116,7 @@ std::string ProtoParser::pullMessagePack(std::string requestJson, uint32_t &msgI
         msgId = 0;
         return "";
     }
-   
+
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
@@ -1325,7 +1323,7 @@ std::string ProtoParser::kickoffGroupMemberPack(std::string requestJson, uint32_
         for (int i = 0; i < arr.Size(); i++)
         {
             auto &item = arr[i];
-            par.add_removed_member( item.GetString());
+            par.add_removed_member(item.GetString());
         }
     }
     {
@@ -2069,6 +2067,30 @@ int ProtoParser::connectVal(std::string requestJson)
     return 0;
 }
 
+std::string ProtoParser::getRespTopic(int msgId)
+{
+    std::lock_guard<std::recursive_mutex> l(_recvLock);
+    auto                                  it = _respMessages.find((uint32_t)msgId);
+    if (it == _respMessages.end())
+        return "";
+    return it->second.topic;
+}
+
+void ProtoParser::removeResp(int msgId)
+{
+    std::lock_guard<std::recursive_mutex> l(_recvLock);
+    _respMessages.erase((uint32_t)msgId);
+}
+
+std::string ProtoParser::getRespPayload(int msgId)
+{
+    std::lock_guard<std::recursive_mutex> l(_recvLock);
+    auto                                  it = _respMessages.find((uint32_t)msgId);
+    if (it == _respMessages.end())
+        return "";
+    return it->second.payload;
+}
+
 EM_BOOL ProtoParser::onWebSocketClose(int eventType, const EmscriptenWebSocketCloseEvent *websocketEvent, void *userData)
 {
     MqttLog(LOG_WARN, LOG_TYPE_FUNC, "web socket closed");
@@ -2107,17 +2129,32 @@ EM_BOOL ProtoParser::onWebSocketOpen(int eventType, const EmscriptenWebSocketOpe
 EM_BOOL ProtoParser::onWebSocketMessage(int eventType, const EmscriptenWebSocketMessageEvent *websocketEvent, void *userData)
 {
 #ifdef DUMP
-    MqttLog(LOG_DEBUG, LOG_TYPE_FUNC, "ProtoParser::onWebSocketMessage, bin: %d, len: %d", !websocketEvent->isText, websocketEvent->numBytes);
+    MqttLog(LOG_DEBUG, LOG_TYPE_FUNC, "ProtoParser::onWebSocketMessage, bin: %d, len: %d", websocketEvent->isText ? 0 : 1, websocketEvent->numBytes);
 #endif
     /*if (websocketEvent->isText)
 		return EM_TRUE;*/
 
-    ProtoParser         *parser = (ProtoParser *)userData;
-    std::vector<uint8_t> data(websocketEvent->data, websocketEvent->data + websocketEvent->numBytes);
-    int                  r = parser->parse(data);
-    if (parser->_onMessage != nullptr)
+    ProtoParser                          *parser = (ProtoParser *)userData;
+    std::lock_guard<std::recursive_mutex> l(parser->_recvLock);
     {
-        parser->_onMessage(r, parser->_rc, parser->_respMsgId);
+        std::vector<uint8_t> data(websocketEvent->data, websocketEvent->data + websocketEvent->numBytes);
+        int                  r = parser->parse(data);
+        if (parser->_respMessages.size() > 100)
+        {
+            parser->_respMessages.erase(parser->_respMessages.begin());
+        }
+        WasmMessage msg;
+        msg.r       = r;
+        msg.rc      = parser->_rc;
+        msg.msgId   = parser->_respMsgId;
+        msg.topic   = parser->_topic;
+        msg.payload = parser->_payload;
+        parser->_respMessages.emplace(std::make_pair(msg.msgId, msg));
+
+        if (parser->_onMessage != nullptr)
+        {
+            parser->_onMessage(r, parser->_rc, parser->_respMsgId);
+        }
     }
     return EM_TRUE;
 }