dong_dl 1 год назад
Родитель
Сommit
94d88019ca

+ 5 - 14
inc/proto/ProtoParser.h

@@ -15,15 +15,13 @@
 
 class Message;
 
-struct TopicCallback
-{
-    std::function<void(int, std::string)> cb;
-};
-
 struct PublishInfo
 {
+    std::string par;
     std::string topic;
     std::string payload;
+    std::function<void(uint32_t, uint32_t, const std::string &, const std::string &, const std::string &)> cb;
+    uint64_t time;
 };
 
 typedef void (*parserCallback)(const char *topic, const char *json, void *userData);
@@ -49,7 +47,7 @@ public:
     std::string routeRequest(std::string request, std::string payload);
     std::string routeResponse(std::string response);
 
-    std::string publish(std::string topic, std::string payload, uint32_t &msgId);
+    std::string publish(const std::string &topic, const std::string &payload, const std::string &request, uint32_t &msgId, std::function<void(uint32_t, uint32_t, const std::string &, const std::string &, const std::string &)> cb = nullptr, const std::string &par = "");
 
 #if !defined USE_WASM
     void test();
@@ -116,9 +114,6 @@ public:
     }
     std::string getKey(std::string key);
 
-    void addCallback(uint32_t msgId, std::function<void(uint32_t, std::string)> cb);
-    void removeCallback(uint32_t msgId);
-
     bool     isConnected() const;
     uint64_t getMsgHead() const
     {
@@ -309,11 +304,7 @@ private:
     uint64_t                          _timestamp = 0;
     int                               _timeout   = 5000;
     unsigned char                     _rc        = 0;
-    std::mutex                        _msgIdLock;
-    std::map<uint32_t, std::string>   _topics;
-    std::mutex                        _callbackLock;
-    std::map<uint32_t, TopicCallback> _callbacks;
-    std::map<uint32_t, PublishInfo>   _publishes;
+    std::map<uint32_t, PublishInfo*>   _publishes;
     std::mutex                        _publishLock;
     ProtoParserPrivate               *p;
 #ifdef USE_WASM

+ 124 - 101
src/proto/ProtoParser.cpp

@@ -297,50 +297,63 @@ std::string ProtoParser::connectPack(std::string userName)
     return str;
 }
 
-std::string ProtoParser::publish(std::string topic, std::string payload, uint32_t &msgId)
+std::string ProtoParser::publish(const std::string &topic, const std::string &payload, const std::string &request, uint32_t &msgId, std::function<void(uint32_t, uint32_t, const std::string &, const std::string &, const std::string &)> cb, const std::string & par)
 {
+        auto                        now = timestamp_now();
     {
         std::lock_guard<std::mutex> l(_publishLock);
-        for (auto &it : _publishes)
+#ifdef DUMP
+        MqttLog(LOG_INFO, LOG_TYPE_MQTT, "_publishes size: %d", _publishes.size());
+#endif
+        for (auto it = _publishes.begin(); it != _publishes.end();)
         {
-            if (it.second.topic == topic && it.second.payload == payload)
+            if ((now - it->second->time) > 60 * 1000)
+            {
+                MqttLog(LOG_WARN, LOG_TYPE_FUNC, "remove too early publish, topic: %s, msgId: %d, time: %lld", it->second->topic.c_str(), it->first, it->second->time);
+                it = _publishes.erase(it);
+                continue;
+            }
+            if (it->second->topic == topic && it->second->payload == payload)
             {
-                msgId = it.first;
+                msgId = it->first;
 #ifdef DUMP
-                MqttLog(LOG_WARN, LOG_TYPE_FUNC, "already send topic: %s", topic.c_str());
+                MqttLog(LOG_INFO, LOG_TYPE_MQTT, "already send topic: %s, msgId: %d, request: %s", topic.c_str(), msgId, request.c_str());
 #endif
                 return "";
             }
+            it++;
         }
     }
 
-#ifdef DUMP
-    MqttLog(LOG_INFO, LOG_TYPE_MQTT, "publish topic: %s, payload size: %d", topic.c_str(), payload.size());
-#endif
     MqttPublishPacket pack;
     pack.header.byte      = 0;
     pack.header.bits.type = MqttMsgPublish;
     pack.header.bits.qos  = MqttQosExactlyOnce;
-
-    pack.topic = topic;
+    pack.topic   = topic;
+    pack.payload = payload;
     {
-        std::lock_guard<std::mutex> l(_msgIdLock);
+        std::lock_guard<std::mutex> l(_publishLock);
         pack.msgId = ++_msgId;
         if (pack.msgId == 0)
             pack.msgId = ++_msgId;
         msgId = pack.msgId;
-        _topics.emplace(pack.msgId, topic);
+        auto info = new PublishInfo();
+        info->cb  = cb;
+        info->payload = payload;
+        info->topic   = topic;
+        info->time    = timestamp_now();
+        info->par     = par;
+        _publishes.emplace(pack.msgId, info);
+        #ifdef DUMP
+        MqttLog(LOG_INFO, LOG_TYPE_MQTT, "_publishes size: %d", _publishes.size());
+        #endif
     }
-    pack.payload = payload;
-
+#ifdef DUMP
+    MqttLog(LOG_INFO, LOG_TYPE_MQTT, "publish topic: %s, msgId: %d, request: %s", topic.c_str(), pack.msgId, request.c_str());
+#endif
     std::string str;
     mqttToBuffer(&pack, str);
     //MqttLog(MQTT_LOG_INFO, "publish packet len: %d", str.length());
-
-    {
-        std::lock_guard<std::mutex> l(_publishLock);
-        _publishes[msgId] = PublishInfo{topic, payload};
-    }
     return str;
 }
 
@@ -393,7 +406,9 @@ std::string ProtoParser::searchConversation(std::string keyword, std::string typ
 int ProtoParser::parse(std::vector<uint8_t> &data)
 {
     MqttHeader *header = (MqttHeader *)&data[0];
-    // MqttLog(MQTT_LOG_INFO, "parse type: %d", header->bits.type);
+#ifdef DUMP
+    MqttLog(LOG_DEBUG, LOG_TYPE_FUNC, "parse type: %d", header->bits.type);
+#endif
     switch (header->bits.type)
     {
     case MqttMsgConnAck:
@@ -441,11 +456,13 @@ int ProtoParser::parse(std::vector<uint8_t> &data)
         _payload   = "";
         _topic     = pack.topic;
 #ifdef DUMP
-        MqttLog(LOG_INFO, LOG_TYPE_MQTT, "recv publish from server topic: %s, len: %d", _topic.c_str(), pack.payload.length());
+        MqttLog(LOG_INFO, LOG_TYPE_FUNC, "recv publish from server topic: %s, respMsgId: %d, len: %d", _topic.c_str(), _respMsgId, pack.payload.length());
 #endif
         if (_topic.length() > 0 && pack.payload.length() > 0)
         {
-            MqttLogHex(LOG_INFO, LOG_TYPE_MQTT, pack.payload);
+#ifdef DUMP
+            MqttLogHex(LOG_DEBUG, LOG_TYPE_MQTT, pack.payload);
+#endif
             parsePayload(pack.payload, true);
         }
     }
@@ -458,30 +475,32 @@ int ProtoParser::parse(std::vector<uint8_t> &data)
         _rc        = pack.rc;
         _payload   = "";
         _topic     = "";
-        std::lock_guard<std::mutex> l(_msgIdLock);
+        PublishInfo * info=nullptr;
         {
-            auto it = _topics.find(pack.msgId);
-            if (it != _topics.end())
+            std::lock_guard<std::mutex> l(_publishLock);
+            auto it = _publishes.find(pack.msgId);
+            if (it != _publishes.end())
             {
-                _topic = it->second;
-                _topics.erase(it);
+                info = it->second;
+                _topic = info->topic;
+                _publishes.erase(it);
+                MqttLog(LOG_INFO, LOG_TYPE_MQTT, "_publishes size: %d", _publishes.size());
             }
         }
         if (_topic.length() > 0 && pack.payload.length() > 0)
         {
             _rc = parsePayload(pack.payload, false);
         }
+        if (info!=nullptr)
         {
-            std::lock_guard<std::mutex> l(_callbackLock);
-            auto                        it = _callbacks.find(_respMsgId);
-            if (it != _callbacks.end())
+            if (info->cb!=nullptr)
             {
-                it->second.cb(_rc, _payload);
-                _callbacks.erase(it);
+                info->cb(_respMsgId, _rc, pack.payload, _payload, info->par);
             }
+            delete info;
         }
 #ifdef DUMP
-        MqttLog(LOG_DEBUG, LOG_TYPE_MQTT, "recv MqttMsgPubAck from server topic: %s", _topic.c_str());
+        MqttLog(LOG_INFO, LOG_TYPE_MQTT, "recv MqttMsgPubAck from server topic: %s, msgId: %d, rc: %d", _topic.c_str(), _respMsgId, _rc);
 #endif
     }
     break;
@@ -506,7 +525,7 @@ unsigned char ProtoParser::parsePayload(std::string &payload, bool pub)
     {
         auto flag = payload[0];
 #ifdef DUMP
-        MqttLog(LOG_INFO, LOG_TYPE_MQTT, "flag: %d, len: %d", flag, payload.length());
+        MqttLog(LOG_DEBUG, LOG_TYPE_MQTT, "flag: %d, len: %d", flag, payload.length());
 #endif
         payload.erase(payload.begin());
         if ((flag & 1) != 0)
@@ -518,25 +537,19 @@ unsigned char ProtoParser::parsePayload(std::string &payload, bool pub)
             rc = payload[0];
             payload.erase(payload.begin());
 #ifdef DUMP
-            MqttLog(LOG_INFO, LOG_TYPE_MQTT, "rc: %d", rc);
+            MqttLog(LOG_DEBUG, LOG_TYPE_MQTT, "rc: %d", rc);
 #endif
         }
         str = dataDecrypt(payload);
 #ifdef DUMP
-        MqttLog(LOG_INFO, LOG_TYPE_FUNC, "decrypt size: %d, src: %d", str.length(), payload.length());
+        MqttLog(LOG_DEBUG, LOG_TYPE_FUNC, "decrypt size: %d, src: %d", str.length(), payload.length());
 #endif
+        payload = str;
     }
     else
     {
         str = payload;
     }
-    if (!pub)
-    {
-        std::lock_guard<std::mutex> l(_publishLock);
-        auto                        it = _publishes.find(_respMsgId);
-        if (it != _publishes.end())
-            _publishes.erase(it);
-    }
     if (str.length() > 0)
     {
         if (_topic.compare("FP") == 0) // pullFriend
@@ -788,7 +801,7 @@ std::string ProtoParser::pullFriendPack(std::string requestJson, uint32_t &msgId
     v.SerializeToString(&str);
 
     str = dataEncrypt(str);
-    return publish("FP", str, msgId);
+    return publish("FP", str, requestJson, msgId);
 }
 
 std::string ProtoParser::pullFriendRequestPack(std::string requestJson, uint32_t &msgId)
@@ -800,7 +813,7 @@ std::string ProtoParser::pullFriendRequestPack(std::string requestJson, uint32_t
     v.SerializeToString(&str);
 
     str = dataEncrypt(str);
-    return publish("FRP", str, msgId);
+    return publish("FRP", str, requestJson, msgId);
 }
 
 std::string ProtoParser::addFriendPack(std::string requestJson, uint32_t &msgId)
@@ -816,7 +829,7 @@ std::string ProtoParser::addFriendPack(std::string requestJson, uint32_t &msgId)
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("FAR", str, msgId);
+    return publish("FAR", str, requestJson, msgId);
 }
 
 std::string ProtoParser::addGroupMemberPack(std::string requestJson, uint32_t &msgId)
@@ -831,7 +844,7 @@ std::string ProtoParser::addGroupMemberPack(std::string requestJson, uint32_t &m
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GAM", str, msgId);
+    return publish("GAM", str, requestJson, msgId);
 }
 
 std::string ProtoParser::createGroupPack(std::string requestJson, uint32_t &msgId)
@@ -847,7 +860,7 @@ std::string ProtoParser::createGroupPack(std::string requestJson, uint32_t &msgI
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GC", str, msgId);
+    return publish("GC", str, requestJson, msgId);
 }
 
 std::string ProtoParser::deleteFriendPack(std::string requestJson, uint32_t &msgId)
@@ -863,7 +876,7 @@ std::string ProtoParser::deleteFriendPack(std::string requestJson, uint32_t &msg
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("FDL", str, msgId);
+    return publish("FDL", str, requestJson, msgId);
 }
 
 std::string ProtoParser::dismissGroupPack(std::string requestJson, uint32_t &msgId)
@@ -879,7 +892,7 @@ std::string ProtoParser::dismissGroupPack(std::string requestJson, uint32_t &msg
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GD", str, msgId);
+    return publish("GD", str, requestJson, msgId);
 }
 
 std::string ProtoParser::getGroupInfoPack(std::string requestJson, uint32_t &msgId)
@@ -894,7 +907,7 @@ std::string ProtoParser::getGroupInfoPack(std::string requestJson, uint32_t &msg
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GPGI", str, msgId);
+    return publish("GPGI", str, requestJson, msgId);
 }
 
 std::string ProtoParser::getGroupMemberPack(std::string requestJson, uint32_t &msgId)
@@ -910,7 +923,7 @@ std::string ProtoParser::getGroupMemberPack(std::string requestJson, uint32_t &m
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GPGM", str, msgId);
+    return publish("GPGM", str, requestJson, msgId);
 }
 
 std::string ProtoParser::getUserInfoPack(std::string requestJson, uint32_t &msgId)
@@ -923,13 +936,13 @@ std::string ProtoParser::getUserInfoPack(std::string requestJson, uint32_t &msgI
         return "";
     }
 #ifdef DUMP
-    MqttLog(LOG_INFO, LOG_TYPE_FUNC, "getUserInfoPack: %d, %s", par.request().size(), par.request().at(0).uid().c_str());
+    MqttLog(LOG_DEBUG, LOG_TYPE_FUNC, "getUserInfoPack: size: %d, first uid: %s", par.request().size(), par.request().at(0).uid().c_str());
 #endif
 
     std::string str;
     par.SerializeToString(&str);
     str      = dataEncrypt(str);
-    auto ret = publish("UPUI", str, msgId);
+    auto ret = publish("UPUI", str, requestJson, msgId);
     return ret;
 }
 
@@ -949,7 +962,7 @@ std::string ProtoParser::getUserSettingPack(std::string requestJson, uint32_t &m
     v.SerializeToString(&str);
 
     str = dataEncrypt(str);
-    return publish("UG", str, msgId);
+    return publish("UG", str, requestJson, msgId);
 }
 
 std::string ProtoParser::handleFriendRequestPack(std::string requestJson, uint32_t &msgId)
@@ -965,7 +978,7 @@ std::string ProtoParser::handleFriendRequestPack(std::string requestJson, uint32
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("FHR", str, msgId);
+    return publish("FHR", str, requestJson, msgId);
 }
 
 std::string ProtoParser::loadRemoteMessagesPack(std::string requestJson, uint32_t &msgId)
@@ -981,7 +994,7 @@ std::string ProtoParser::loadRemoteMessagesPack(std::string requestJson, uint32_
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("LRM", str, msgId);
+    return publish("LRM", str, requestJson, msgId);
 }
 
 std::string ProtoParser::modifyMyInfoPack(std::string requestJson, uint32_t &msgId)
@@ -997,7 +1010,7 @@ std::string ProtoParser::modifyMyInfoPack(std::string requestJson, uint32_t &msg
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("MMI", str, msgId);
+    return publish("MMI", str, requestJson, msgId);
 }
 
 std::string ProtoParser::pullMessagePack(std::string requestJson, uint32_t &msgId)
@@ -1014,7 +1027,7 @@ std::string ProtoParser::pullMessagePack(std::string requestJson, uint32_t &msgI
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("MP", str, msgId);
+    return publish("MP", str, requestJson, msgId);
 }
 
 std::string ProtoParser::putUserSettingPack(std::string requestJson, uint32_t &msgId)
@@ -1030,7 +1043,7 @@ std::string ProtoParser::putUserSettingPack(std::string requestJson, uint32_t &m
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("UP", str, msgId);
+    return publish("UP", str, requestJson, msgId);
 }
 
 std::string ProtoParser::quitGroupPack(std::string requestJson, uint32_t &msgId)
@@ -1046,7 +1059,7 @@ std::string ProtoParser::quitGroupPack(std::string requestJson, uint32_t &msgId)
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GQ", str, msgId);
+    return publish("GQ", str, requestJson, msgId);
 }
 
 std::string ProtoParser::recallMessagePack(std::string requestJson, uint32_t &msgId)
@@ -1059,12 +1072,20 @@ std::string ProtoParser::recallMessagePack(std::string requestJson, uint32_t &ms
         msgId = 0;
         return "";
     }
-    par.set_id(GetInt64(doc["id"]));
+    auto mid = GetInt64(doc["id"]);
+    par.set_id(mid);
 
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("MR", str, msgId);
+    std::string s = std::to_string(mid);
+    return publish("MR", str, requestJson, msgId, [&](auto msgId, auto rc, auto& org, auto& payload, auto& par) 
+    {
+        if (rc == 0)
+        {
+            p->recallMessage(atoll(par.c_str()));
+        }
+    }, s);
 }
 
 std::string ProtoParser::syncFriendRequestUnreadPack(std::string requestJson, uint32_t &msgId)
@@ -1080,7 +1101,7 @@ std::string ProtoParser::syncFriendRequestUnreadPack(std::string requestJson, ui
     std::string str;
     v.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("FRUS", str, msgId);
+    return publish("FRUS", str, requestJson, msgId);
 }
 
 std::string ProtoParser::sendMessagePack(std::string requestJson, uint32_t &msgId)
@@ -1099,17 +1120,30 @@ std::string ProtoParser::sendMessagePack(std::string requestJson, uint32_t &msgI
     std::string str;
     par.SerializeToString(&str);
     str      = dataEncrypt(str);
-    auto ret = publish("MS", str, msgId);
-    p->sendMessage(par, msgId);
-
-    addCallback(msgId, [&](auto i, auto s) {
-        if (s.length() == 16)
+    auto ret = publish("MS", str, requestJson, msgId, [&](auto msgId, auto rc, auto& org, auto& payload, auto& par) {
+        if (rc == 0)
         {
-            auto mid = htonll(*(int64_t *)s.c_str());
-            auto ts  = htonll(*((int64_t *)str.c_str()) + 1);
+            if (org.length() == 16)
+            {
+                int64_t *ptr = (int64_t *)org.c_str();
+                auto     mid = htonll(*ptr);
+                auto     ts  = htonll(*(ptr + 1));
+#ifdef DUMP
+                MqttLog(LOG_INFO, LOG_TYPE_FUNC, "update message: %d, mid: %lld, ts: %lld", msgId, mid, ts);
+#endif
+                p->updateMessage(msgId, mid, ts);
+            }
+            else
+            {
+                MqttLog(LOG_WARN, LOG_TYPE_FUNC, "unexpect send message callback: %d", org.length());
+            }
         }
     });
-
+    if (ret.length() > 0)
+    {
+        MqttLog(LOG_INFO, LOG_TYPE_FUNC, "send message with msgId: %d", msgId);
+        p->sendMessage(par, msgId);
+    }
     return ret;
 }
 
@@ -1134,7 +1168,7 @@ std::string ProtoParser::uploadMediaPack(std::string requestJson, uint32_t &msgI
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GMUT", str, msgId);
+    return publish("GMUT", str, requestJson, msgId);
 }
 
 std::string ProtoParser::RriendRequestUnreadSyncPack(std::string requestJson, uint32_t &msgId)
@@ -1151,7 +1185,7 @@ std::string ProtoParser::RriendRequestUnreadSyncPack(std::string requestJson, ui
     v.SerializeToString(&str);
 
     str = dataEncrypt(str);
-    return publish("FRUS", str, msgId);
+    return publish("FRUS", str, requestJson, msgId);
 }
 
 std::string ProtoParser::userSearchPack(std::string requestJson, uint32_t &msgId)
@@ -1174,7 +1208,7 @@ std::string ProtoParser::userSearchPack(std::string requestJson, uint32_t &msgId
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("US", str, msgId);
+    return publish("US", str, requestJson, msgId);
 }
 
 std::string ProtoParser::kickoffGroupMemberPack(std::string requestJson, uint32_t &msgId)
@@ -1218,7 +1252,7 @@ std::string ProtoParser::kickoffGroupMemberPack(std::string requestJson, uint32_
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GKM", str, msgId);
+    return publish("GKM", str, requestJson, msgId);
 }
 
 std::string ProtoParser::modifyGroupInfoPack(std::string requestJson, uint32_t &msgId)
@@ -1256,7 +1290,7 @@ std::string ProtoParser::modifyGroupInfoPack(std::string requestJson, uint32_t &
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GMI", str, msgId);
+    return publish("GMI", str, requestJson, msgId);
 }
 
 std::string ProtoParser::modifyGroupAliasPack(std::string requestJson, uint32_t &msgId)
@@ -1295,7 +1329,7 @@ std::string ProtoParser::modifyGroupAliasPack(std::string requestJson, uint32_t
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GMA", str, msgId);
+    return publish("GMA", str, requestJson, msgId);
 }
 
 std::string ProtoParser::setFriendAliasPack(std::string requestJson, uint32_t &msgId)
@@ -1317,7 +1351,7 @@ std::string ProtoParser::setFriendAliasPack(std::string requestJson, uint32_t &m
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("FALS", str, msgId);
+    return publish("FALS", str, requestJson, msgId);
 }
 
 std::string ProtoParser::transferGroupPack(std::string requestJson, uint32_t &msgId)
@@ -1354,7 +1388,7 @@ std::string ProtoParser::transferGroupPack(std::string requestJson, uint32_t &ms
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GTG", str, msgId);
+    return publish("GTG", str, requestJson, msgId);
 }
 
 std::string ProtoParser::setGroupManagerPack(std::string requestJson, uint32_t &msgId)
@@ -1397,7 +1431,7 @@ std::string ProtoParser::setGroupManagerPack(std::string requestJson, uint32_t &
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GTG", str, msgId);
+    return publish("GTG", str, requestJson, msgId);
 }
 
 std::string ProtoParser::clearMessagesPack(std::string requestJson, uint32_t &msgId)
@@ -1418,7 +1452,7 @@ std::string ProtoParser::clearMessagesPack(std::string requestJson, uint32_t &ms
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("CMD", str, msgId);
+    return publish("CMD", str, requestJson, msgId);
 }
 
 std::string ProtoParser::modifyGroupMemberAliasPack(std::string requestJson, uint32_t &msgId)
@@ -1456,7 +1490,7 @@ std::string ProtoParser::modifyGroupMemberAliasPack(std::string requestJson, uin
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GMMA", str, msgId);
+    return publish("GMMA", str, requestJson, msgId);
 }
 
 std::string ProtoParser::modifyGroupMemberExtraPack(std::string requestJson, uint32_t &msgId)
@@ -1495,23 +1529,7 @@ std::string ProtoParser::modifyGroupMemberExtraPack(std::string requestJson, uin
     std::string str;
     par.SerializeToString(&str);
     str = dataEncrypt(str);
-    return publish("GMME", str, msgId);
-}
-
-void ProtoParser::addCallback(uint32_t msgId, std::function<void(uint32_t, std::string)> cb)
-{
-    std::lock_guard<std::mutex> l(_callbackLock);
-    TopicCallback               tcb;
-    tcb.cb = cb;
-    _callbacks.emplace(std::make_pair(msgId, tcb));
-}
-
-void ProtoParser::removeCallback(uint32_t msgId)
-{
-    std::lock_guard<std::mutex> l(_callbackLock);
-    auto                        it = _callbacks.find(msgId);
-    if (it != _callbacks.end())
-        _callbacks.erase(it);
+    return publish("GMME", str, requestJson, msgId);
 }
 
 std::string ProtoParser::getUserInfo(std::string userId, std::string groupId)
@@ -1964,6 +1982,11 @@ EM_BOOL ProtoParser::onWebSocketError(int eventType, const EmscriptenWebSocketEr
 EM_BOOL ProtoParser::onWebSocketOpen(int eventType, const EmscriptenWebSocketOpenEvent *websocketEvent, void *userData)
 {
     ProtoParser *parser = (ProtoParser *)userData;
+        
+    {
+        std::lock_guard<std::mutex> l(parser->_publishLock);
+        parser->_publishes.clear();
+    }
     //MqttLog(MQTT_LOG_INFO, "ProtoParser::onWebSocketOpen, uid: %s", parser->_uid.c_str());
     auto res = parser->connectPack(parser->_uid);
     parser->send(res);
@@ -1978,7 +2001,7 @@ EM_BOOL ProtoParser::onWebSocketOpen(int eventType, const EmscriptenWebSocketOpe
 EM_BOOL ProtoParser::onWebSocketMessage(int eventType, const EmscriptenWebSocketMessageEvent *websocketEvent, void *userData)
 {
 #ifdef DUMP
-    MqttLog(MQTT_LOG_INFO, "ProtoParser::onWebSocketMessage, bin: %d, len: %d", !websocketEvent->isText, websocketEvent->numBytes);
+    MqttLog(LOG_DEBUG, LOG_TYPE_FUNC, "ProtoParser::onWebSocketMessage, bin: %d, len: %d", !websocketEvent->isText, websocketEvent->numBytes);
 #endif
     /*if (websocketEvent->isText)
 		return EM_TRUE;*/

+ 85 - 25
src/proto/ProtoParserPrivate.cpp

@@ -135,7 +135,7 @@ int ProtoParserPrivate::getCurrent(uint64_t &msgCurrent, uint64_t &friendCurrent
 #endif
 
 #if defined DUMP || defined DUMP_MESSAGE
-    MqttLog(MQTT_LOG_INFO, "msgCurrent: %lld, friendCurrent: %lld, friendRqCurrrent: %lld, settingCurrent: %lld", msgCurrent, friendCurrent, friendRqCurrent, settingCurrent);
+    MqttLog(LOG_INFO, LOG_TYPE_FUNC, "msgCurrent: %lld, friendCurrent: %lld, friendRqCurrrent: %lld, settingCurrent: %lld", msgCurrent, friendCurrent, friendRqCurrent, settingCurrent);
 #endif
     return 0;
 }
@@ -262,6 +262,58 @@ int ProtoParserPrivate::getGroupMembers(const char *id, std::function<bool(const
 #endif
 }
 
+void ProtoParserPrivate::updateMessage(uint64_t msgId, uint64_t mid, uint64_t ts)
+{
+#ifdef USE_SQLITE3
+// TODO:
+#else
+    bool found = false;
+    for (auto &msg : _messages)
+    {
+        if (msg.message_id() == msgId)
+        {
+            found = true;
+            msg.set_message_id(mid);
+            msg.set_server_timestamp(ts);
+            break;
+        }
+    }
+    if (found)
+    {
+        sortMessage(nullptr);
+    }
+    else
+    {
+        MqttLog(LOG_WARN, LOG_TYPE_FUNC, "can't find message with id: %d", msgId);
+    }
+#endif
+}
+void ProtoParserPrivate::recallMessage(uint64_t mid)
+{
+#ifdef USE_SQLITE3
+// TODO:
+#else
+    bool found = false;
+    for (auto it = _messages.begin(); it != _messages.end() ; it++)
+    {
+        if (it->message_id() == mid)
+        {
+            found = true;
+            _messages.erase(it);
+            break;
+        }
+    }
+    if (found)
+    {
+        sortMessage(nullptr);
+    }
+    else
+    {
+        MqttLog(LOG_WARN, LOG_TYPE_FUNC, "recall can't find message with id: %lld", mid);
+    }
+    #endif
+}
+
 void ProtoParserPrivate::sendMessage(Message &msg, uint32_t msgId)
 {
     if (msgId == 0)
@@ -270,16 +322,12 @@ void ProtoParserPrivate::sendMessage(Message &msg, uint32_t msgId)
 // TODO:
 #else
 #ifdef DUMP_MESSAGE
-    MqttLog(MQTT_LOG_INFO, "current message count: %d", _messages.size());
-    MqttLog(MQTT_LOG_INFO, "send message, line: %d, type: %d, target: %s", msg.conversation().line(), msg.conversation().type(), msg.conversation().target().c_str());
+    MqttLog(LOG_INFO, LOG_TYPE_DATA, "current message count: %d", _messages.size());
+    MqttLog(LOG_INFO, LOG_TYPE_FUNC, "send message, line: %d, type: %d, target: %s, msgId: %d", msg.conversation().line(), msg.conversation().type(), msg.conversation().target().c_str(), msgId);
 #endif
+    msg.set_message_id(msgId);
     _messages.emplace_back(msg);
-    _messages.sort([](const auto &s1, const auto &s2) {
-        return s1.server_timestamp() < s2.server_timestamp();
-    });
-#ifdef DUMP_MESSAGE
-    MqttLog(MQTT_LOG_INFO, "current message count: %d", _messages.size());
-#endif
+    sortMessage(nullptr);
 #endif
 }
 
@@ -341,7 +389,7 @@ int ProtoParserPrivate::getMessage(bool before, uint64_t idx, int count, int typ
 #else
 
 #ifdef DUMP_MESSAGE
-    MqttLog(MQTT_LOG_INFO, "_messages count: %d", _messages.size());
+    MqttLog(LOG_INFO, LOG_TYPE_FUNC, "_messages count: %d", _messages.size());
 #endif
 
     if (before)
@@ -823,7 +871,7 @@ int ProtoParserPrivate::update(PullUserResult &result)
     {
         auto entry = result.result(i);
 #ifdef DUMP
-        MqttLog(MQTT_LOG_INFO, "update user: %s", entry.user().uid().c_str());
+        MqttLog(LOG_INFO, LOG_TYPE_FUNC, "update user: %s", entry.user().uid().c_str());
 #endif
         for (auto it = _users.begin(); it != _users.end(); it++)
         {
@@ -893,6 +941,30 @@ int ProtoParserPrivate::update(PullGroupMemberResult &result)
 #endif
 }
 
+#ifndef USE_SQLITE3
+void ProtoParserPrivate::sortMessage(uint64_t *head)
+{
+    _messages.sort([](const auto &s1, const auto &s2) {
+        return s1.server_timestamp() < s2.server_timestamp();
+    });
+    if (head != nullptr)
+    {
+        for (auto it = _messages.rbegin(); it != _messages.rend(); it++)
+        {
+            if (it->message_id() > 0x100000000L)
+            {
+                *head = _messages.rbegin()->message_id();
+                break;
+            }
+        }
+        // TODO: update head
+#if defined DUMP || defined DUMP_MESSAGE
+        MqttLog(LOG_DEBUG, LOG_TYPE_FUNC, "msg current: %lld", head);
+#endif
+    }
+}
+#endif
+
 int ProtoParserPrivate::update(NotifyMessage &result, std::string &uid, uint64_t &head)
 {
 #ifdef USE_SQLITE3
@@ -906,13 +978,7 @@ int ProtoParserPrivate::update(NotifyMessage &result, std::string &uid, uint64_t
             conv->set_target(result.message().from_user());
         }
         _messages.emplace_back(result.message());
-        _messages.sort([](const auto &s1, const auto &s2) {
-            return s1.server_timestamp() < s2.server_timestamp();
-        });
-        // TODO: update head
-#if defined DUMP || defined DUMP_MESSAGE
-        MqttLog(MQTT_LOG_INFO, "msg current: %lld", head);
-#endif
+        sortMessage(&head);
     }
 #endif
     return 0;
@@ -995,13 +1061,7 @@ int ProtoParserPrivate::update(PullMessageResult &result, uint64_t &head)
         }
         _messages.emplace_back(entry);
     }
-    _messages.sort([](const auto &s1, const auto &s2) {
-        return s1.server_timestamp() < s2.server_timestamp();
-    });
-    // TODO: update head
-#endif
-#if defined DUMP || defined DUMP_MESSAGE
-    MqttLog(MQTT_LOG_INFO, "msg current: %lld", head);
+    sortMessage(&head);
 #endif
     return 0;
 }

+ 4 - 0
src/proto/ProtoParserPrivate.h

@@ -42,7 +42,9 @@ public:
     bool isSilent(Conversation &par);
     void getUnread(Conversation &par, std::string uid, int &unread, int &unreadMention, int &unreadMentionAll);
 
+    void recallMessage(uint64_t mid);
     void sendMessage(Message &msg, uint32_t msgId);
+    void updateMessage(uint64_t msgId, uint64_t mid, uint64_t ts);
 
 protected:
 #ifdef USE_SQLITE3
@@ -52,6 +54,8 @@ protected:
     static void toMessage(struct sqlite3_stmt *stmt, Message &msg);
     void        bindText(struct sqlite3_stmt *stmt, int col, const char *str);
     static void freeStr(void *);
+#else
+    void sortMessage(uint64_t *head);
 #endif
 private:
 #ifndef USE_ALL_SQLITE3

+ 2 - 2
src/proto/pch.h

@@ -33,7 +33,7 @@
 #define stricmp strcasecmp
 #endif
 
-//#define DUMP
-//#define DUMP_MESSAGE
+#define DUMP
+#define DUMP_MESSAGE
 
 uint64_t htonll(uint64_t Value);

+ 1 - 1
src/zlib/zconf.h.included

@@ -8,7 +8,7 @@
 #ifndef ZCONF_H
 #define ZCONF_H
 /* #undef Z_PREFIX */
-/* #undef Z_HAVE_UNISTD_H */
+#define Z_HAVE_UNISTD_H
 
 /*
  * If you *really* need a unique prefix for all types and library functions,