Forráskód Böngészése

优化getUserInfo和getGroupInfo

dong_dl 1 éve
szülő
commit
2e407c5310

+ 3 - 0
inc/proto/MqttLog.h

@@ -22,6 +22,9 @@ enum LOG_TYPE
 MQTTAPI void MqttSetLogType(LOG_TYPE type);
 MQTTAPI void MqttSetLogLevel(LOG_LEVEL level);
 
+MQTTAPI LOG_TYPE MqttGetLogType();
+MQTTAPI LOG_LEVEL MqttGetLogLevel();
+
 MQTTAPI void MqttLog(LOG_LEVEL level, LOG_TYPE type, const char *msg, ...);
 MQTTAPI void MqttLogHex(LOG_LEVEL level, LOG_TYPE type, std::string &str);
 

+ 9 - 3
inc/proto/ProtoParser.h

@@ -31,6 +31,7 @@ class ProtoParserPrivate;
 typedef void (*onConnectedCallback)();
 typedef void (*onDisconnectedCallback)();
 typedef void (*onMessageCallback)(int cmd, int rc, int msgId);
+typedef void (*onNotifyCallback)(int cmd, int rc, int msgId);
 
 class CLASSAPI ProtoParser
 {
@@ -191,11 +192,13 @@ public:
 
     void setLogLevel(int level);
     void setLogType(int type);
-
+    int  getLogLevel() const;
+    int  getLogType() const;
 #ifdef USE_WASM
     void setOnConnected(int ptr);
     void setOnDisconnected(int ptr);
     void setOnMessage(int ptr);
+    void setOnNotify(int ptr);
     void setUrl(std::string url)
     {
         _url = url;
@@ -298,6 +301,7 @@ private:
     onConnectedCallback               _onConnected    = nullptr;
     onDisconnectedCallback            _onDisconnected = nullptr;
     onMessageCallback                 _onMessage      = nullptr;
+    onNotifyCallback                   _onNotify       = nullptr;
     void                             *_userData;
     uint32_t                          _msgId     = 0;
     uint32_t                          _respMsgId = 0;
@@ -305,6 +309,7 @@ private:
     int                               _timeout   = 5000;
     unsigned char                     _rc        = 0;
     std::map<uint32_t, PublishInfo*>   _publishes;
+    std::map<std::string, uint32_t>    _pullIds;
     std::mutex                        _publishLock;
     ProtoParserPrivate               *p;
 #ifdef USE_WASM
@@ -387,10 +392,11 @@ EMSCRIPTEN_BINDINGS(Proto)
         .function("searchConversation", &ProtoParser::searchConversation)
         .function("setOnConnected", &ProtoParser::setOnConnected)
         .function("setOnMessage", &ProtoParser::setOnMessage)
+        .function("setOnNotify", &ProtoParser::setOnNotify)
         .function("setOnDisconnected", &ProtoParser::setOnDisconnected)
-        .function("setLogLevel", &ProtoParser::setLogLevel)
-        .function("setLogType", &ProtoParser::setLogType)
 
+        .property("logLevel", &ProtoParser::getLogLevel, &ProtoParser::setLogLevel)
+        .property("logType", &ProtoParser::getLogType, &ProtoParser::setLogType)
         .property("timeout", &ProtoParser::getTimeout, &ProtoParser::setTimeout)
         .property("secret", &ProtoParser::getSecret)
         .property("topic", &ProtoParser::getTopic)

+ 9 - 0
src/proto/MqttLog.cpp

@@ -14,6 +14,15 @@ void MqttSetLogLevel(LOG_LEVEL level)
     g_logLevel = level;
 }
 
+LOG_TYPE MqttGetLogType()
+{
+    return g_logType;
+}
+LOG_LEVEL MqttGetLogLevel()
+{
+    return g_logLevel;
+}
+
 void MqttLog(LOG_LEVEL level, LOG_TYPE type, const char *msg, ...)
 {
     if (level != LOG_ERROR)

+ 102 - 5
src/proto/ProtoParser.cpp

@@ -150,6 +150,16 @@ void ProtoParser::setLogType(int type)
     MqttSetLogType((LOG_TYPE)type);
 }
 
+int ProtoParser::getLogLevel() const
+{
+    return MqttGetLogLevel();
+}
+
+int ProtoParser::getLogType() const
+{
+    return MqttGetLogType();
+}
+
 bool ProtoParser::isConnected() const
 {
 #ifdef USE_WASM
@@ -309,6 +319,17 @@ std::string ProtoParser::publish(const std::string &topic, const std::string &pa
         {
             if ((now - it->second->time) > 60 * 1000)
             {
+                for (auto it2 = _pullIds.begin(); it2 != _pullIds.end(); )
+                {
+                    if (it2->second == it->first)
+                    {
+                        it2 = _pullIds.erase(it2);
+                    }
+                    else
+                    {
+                        it2++;
+                    }
+                }
                 MqttLog(LOG_WARN, LOG_TYPE_FUNC, "remove too early publish, topic: %s, msgId: %d, time: %lld", it->second->topic.c_str(), it->first, it->second->time);
                 it = _publishes.erase(it);
                 continue;
@@ -317,7 +338,7 @@ std::string ProtoParser::publish(const std::string &topic, const std::string &pa
             {
                 msgId = it->first;
 #ifdef DUMP
-                MqttLog(LOG_INFO, LOG_TYPE_MQTT, "already send topic: %s, msgId: %d, request: %s", topic.c_str(), msgId, request.c_str());
+                MqttLog(LOG_DEBUG, LOG_TYPE_MQTT, "already send topic: %s, msgId: %d, request: %s", topic.c_str(), msgId, request.c_str());
 #endif
                 return "";
             }
@@ -488,6 +509,17 @@ int ProtoParser::parse(std::vector<uint8_t> &data)
                 MqttLog(LOG_DEBUG, LOG_TYPE_MQTT, "_publishes size: %d", _publishes.size());
 #endif
             }
+            for (auto it2 = _pullIds.begin(); it2 != _pullIds.end();)
+            {
+                if (it2->second == pack.msgId)
+                {
+                    it2 = _pullIds.erase(it2);
+                }
+                else
+                {
+                    it2++;
+                }
+            }
         }
         if (_topic.length() > 0 && pack.payload.length() > 0)
         {
@@ -699,7 +731,11 @@ unsigned char ProtoParser::parsePayload(std::string &payload, bool pub)
             if (r.ParseFromArray(str.c_str(), str.length()))
             {
                 if (_topic.compare("MN") == 0)
+                {
                     p->update(r, _uid, _msgCurrent);
+
+                    
+                }
                 toJson(r, _payload);
             }
             else
@@ -900,16 +936,45 @@ std::string ProtoParser::dismissGroupPack(std::string requestJson, uint32_t &msg
 std::string ProtoParser::getGroupInfoPack(std::string requestJson, uint32_t &msgId)
 {
     PullUserRequest par;
-    if (!parseJson(requestJson, par))
+    if (!parseJson(requestJson, _pullIds, par))
     {
         MqttLog(LOG_ERROR, LOG_TYPE_FUNC, "getGroupInfoPack paseJson failed");
         msgId = 0;
         return "";
     }
+    if (par.request_size() == 0)
+        return "";
+
     std::string str;
     par.SerializeToString(&str);
-    str = dataEncrypt(str);
-    return publish("GPGI", str, requestJson, msgId);
+    str      = dataEncrypt(str);
+    auto ret = publish("GPGI", str, requestJson, msgId);
+    if (ret.length() > 0)
+    {
+        for (auto it = _pullIds.begin(); it != _pullIds.end(); it++)
+        {
+            if (it->second == 0)
+            {
+                it->second = msgId;
+            }
+        }
+    }
+    else
+    {
+        for (auto it = _pullIds.begin(); it != _pullIds.end(); )
+        {
+            if (it->second == 0)
+            {
+                it = _pullIds.erase(it);
+            }
+            else
+            {
+                it++;
+            }
+        }
+    }
+
+    return ret;
 }
 
 std::string ProtoParser::getGroupMemberPack(std::string requestJson, uint32_t &msgId)
@@ -931,12 +996,15 @@ std::string ProtoParser::getGroupMemberPack(std::string requestJson, uint32_t &m
 std::string ProtoParser::getUserInfoPack(std::string requestJson, uint32_t &msgId)
 {
     PullUserRequest par;
-    if (!parseJson(requestJson, par) || par.request().size() == 0)
+    if (!parseJson(requestJson, _pullIds, par))
     {
         MqttLog(LOG_ERROR, LOG_TYPE_FUNC, "getUserInfoPack paseJson failed");
         msgId = 0;
         return "";
     }
+    if(par.request_size()==0)
+        return "";
+
 #ifdef DUMP
     MqttLog(LOG_DEBUG, LOG_TYPE_FUNC, "getUserInfoPack: size: %d, first uid: %s", par.request().size(), par.request().at(0).uid().c_str());
 #endif
@@ -945,6 +1013,30 @@ std::string ProtoParser::getUserInfoPack(std::string requestJson, uint32_t &msgI
     par.SerializeToString(&str);
     str      = dataEncrypt(str);
     auto ret = publish("UPUI", str, requestJson, msgId);
+    if (ret.length() > 0)
+    {
+        for (auto it = _pullIds.begin(); it != _pullIds.end(); it++)
+        {
+            if (it->second == 0)
+            {
+                it->second = msgId;
+            }
+        }
+    }
+    else
+    {
+        for (auto it = _pullIds.begin(); it != _pullIds.end();)
+        {
+            if (it->second == 0)
+            {
+                it = _pullIds.erase(it);
+            }
+            else
+            {
+                it++;
+            }
+        }
+    }
     return ret;
 }
 
@@ -1920,6 +2012,10 @@ void ProtoParser::setOnMessage(int ptr)
 {
     _onMessage = (onMessageCallback)ptr;
 }
+void ProtoParser::setOnNotify(int ptr)
+{
+    _onNotify = (onNotifyCallback)ptr;
+}
 
 int ProtoParser::reconnectVal()
 {
@@ -1989,6 +2085,7 @@ EM_BOOL ProtoParser::onWebSocketOpen(int eventType, const EmscriptenWebSocketOpe
     {
         std::lock_guard<std::mutex> l(parser->_publishLock);
         parser->_publishes.clear();
+        parser->_pullIds.clear();
     }
     //MqttLog(MQTT_LOG_INFO, "ProtoParser::onWebSocketOpen, uid: %s", parser->_uid.c_str());
     auto res = parser->connectPack(parser->_uid);

+ 15 - 3
src/proto/jsonUtils.cpp

@@ -48,7 +48,14 @@ void copy(MessageContent &content, rapidjson::Value &mem)
         if (mem.HasMember("content"))
             content.set_content(mem["content"].GetString());
         if (mem.HasMember("binaryContent"))
-            content.set_data(mem["binaryContent"].GetString());
+        {
+            auto   bin      = mem["binaryContent"].GetString();
+            int    len     = base64DecodeLength(bin, strlen(bin));
+            uint8_t *cbytes   = new uint8_t[len + 1];
+            size_t   retLen   = base64Decode(cbytes, len, (const uint8_t*)bin, strlen(bin));
+            content.set_data(cbytes, retLen);
+            delete[] cbytes;
+        }
         if (mem.HasMember("mediaType"))
             content.set_mediatype(mem["mediaType"].GetInt());
         if (mem.HasMember("remoteMediaUrl"))
@@ -884,7 +891,7 @@ bool parseJson(std::string &str, DismissGroupRequest &par)
     return true;
 }
 
-bool parseJson(std::string &str, PullUserRequest &par)
+bool parseJson(std::string &str, std::map<std::string, uint32_t>& ids, PullUserRequest &par)
 {
     rapidjson::Document doc;
     if (str.length() <= 0 || doc.Parse(str.c_str()).HasParseError())
@@ -893,11 +900,16 @@ bool parseJson(std::string &str, PullUserRequest &par)
         return false;
     }
     auto arr = doc["request"].GetArray();
+
     for (int i = 0; i < arr.Size(); i++)
     {
         auto &item = arr[i];
+        const char *uid  = item["uid"].GetString();
+        if (ids.find(uid) != ids.end())
+            continue;
+        ids[uid] = 0;
         auto  req  = par.add_request();
-        req->set_uid(item["uid"].GetString());
+        req->set_uid(uid);
         if (item.HasMember("update_dt"))
             req->set_update_dt(item["update_dt"].GetInt64());
     }

+ 1 - 1
src/proto/jsonUtils.h

@@ -49,7 +49,7 @@ bool parseJson(std::string &str, AddGroupMemberRequest &par);
 bool parseJson(std::string &str, CreateGroupRequest &par);
 bool parseJson(std::string &str, IDBuf &par);
 bool parseJson(std::string &str, DismissGroupRequest &par);
-bool parseJson(std::string &str, PullUserRequest &par);
+bool parseJson(std::string &str, std::map<std::string, uint32_t> &ids, PullUserRequest &par);
 bool parseJson(std::string &str, QuitGroupRequest &par);
 bool parseJson(std::string &str, PullGroupMemberRequest &par);
 bool parseJson(std::string &str, HandleFriendRequest &par);