dong_dl há 1 ano atrás
pai
commit
dd99eba9a2

+ 4 - 0
src/proto/ProtoParser.cpp

@@ -1976,6 +1976,10 @@ std::string ProtoParser::getMessages(std::string requestJson)
         count  = doc["count"].GetInt();
     }
 
+#ifdef DUMP
+    MqttLog(LOG_INFO, LOG_TYPE_FUNC, "call getMessages, fromIndex: %lld, before: %d, count: %d", idx, before ? 1 : 0, count);
+#endif
+
     rapidjson::StringBuffer                    buffer;
     rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
     writer.StartArray();

+ 55 - 10
src/proto/ProtoParserPrivate.cpp

@@ -18,6 +18,7 @@ ProtoParserPrivate::~ProtoParserPrivate()
 
 void ProtoParserPrivate::close()
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_SQLITE3
     if (_db != nullptr)
     {
@@ -40,6 +41,7 @@ void ProtoParserPrivate::close()
 
 int ProtoParserPrivate::open(const char *pwd)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_SQLITE3
     /* EM_ASM(
         FS.mkdir('/data');
@@ -123,6 +125,7 @@ int ProtoParserPrivate::check()
 
 int ProtoParserPrivate::getCurrent(uint64_t &msgCurrent, uint64_t &friendCurrent, uint64_t &friendRqCurrent, uint64_t &settingCurrent)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
     msgCurrent      = 0;
     friendCurrent   = 0;
     friendRqCurrent = 0;
@@ -142,6 +145,7 @@ int ProtoParserPrivate::getCurrent(uint64_t &msgCurrent, uint64_t &friendCurrent
 
 int ProtoParserPrivate::getFriendRequest(const char *from, const char *to, int status, int fromReadStatus, int toReadStatus, std::function<bool(const FriendRequest &item)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
     // TODO:
 #else
@@ -166,6 +170,7 @@ int ProtoParserPrivate::getFriendRequest(const char *from, const char *to, int s
 
 int ProtoParserPrivate::getUserSetting(int scope, const char *type, std::function<bool(const UserSettingEntry &item)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
     // TODO:
 #else
@@ -184,6 +189,7 @@ int ProtoParserPrivate::getUserSetting(int scope, const char *type, std::functio
 
 int ProtoParserPrivate::getFriend(std::function<bool(const Friend &item)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
     // TODO:
 #else
@@ -198,6 +204,7 @@ int ProtoParserPrivate::getFriend(std::function<bool(const Friend &item)> cb)
 
 int ProtoParserPrivate::getFriend(const char *uid, std::function<bool(const Friend &item)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
     // TODO:
 #else
@@ -217,6 +224,7 @@ int ProtoParserPrivate::getFriend(const char *uid, std::function<bool(const Frie
 
 int ProtoParserPrivate::getUser(const char *uid, const char *gid, std::function<bool(const User &item)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
     // TODO:
 #else
@@ -233,6 +241,7 @@ int ProtoParserPrivate::getUser(const char *uid, const char *gid, std::function<
 
 int ProtoParserPrivate::getGroup(const char *id, std::function<bool(const GroupInfo &item)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
     // TODO:
 #else
@@ -249,6 +258,7 @@ int ProtoParserPrivate::getGroup(const char *id, std::function<bool(const GroupI
 
 int ProtoParserPrivate::getGroupMembers(const char *id, std::function<bool(const GroupMember &item)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
     // TODO:
 #else
@@ -264,6 +274,7 @@ int ProtoParserPrivate::getGroupMembers(const char *id, std::function<bool(const
 
 void ProtoParserPrivate::updateMessage(uint64_t msgId, uint64_t mid, uint64_t ts)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_SQLITE3
 // TODO:
 #else
@@ -290,6 +301,7 @@ void ProtoParserPrivate::updateMessage(uint64_t msgId, uint64_t mid, uint64_t ts
 }
 void ProtoParserPrivate::recallMessage(uint64_t mid)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_SQLITE3
 // TODO:
 #else
@@ -316,6 +328,7 @@ void ProtoParserPrivate::recallMessage(uint64_t mid)
 
 void ProtoParserPrivate::sendMessage(Message &msg, uint32_t msgId)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
     if (msgId == 0)
         return;
 #ifdef USE_SQLITE3
@@ -333,6 +346,7 @@ void ProtoParserPrivate::sendMessage(Message &msg, uint32_t msgId)
 
 int ProtoParserPrivate::getMessage(uint64_t id, std::function<bool(const Message &msg)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_SQLITE3
     // TODO:
     return 0;
@@ -351,6 +365,7 @@ int ProtoParserPrivate::getMessage(uint64_t id, std::function<bool(const Message
 
 int ProtoParserPrivate::getMessage(bool before, uint64_t idx, int count, int type, int line, const char *target, std::function<bool(const Message &msg)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
     if (count <= 0)
         count = 100;
     int cnt = 0;
@@ -391,23 +406,36 @@ int ProtoParserPrivate::getMessage(bool before, uint64_t idx, int count, int typ
 #else
 
 #ifdef DUMP_MESSAGE
-    MqttLog(LOG_INFO, LOG_TYPE_FUNC, "_messages count: %d", _messages.size());
+    MqttLog(LOG_INFO, LOG_TYPE_FUNC, "all _messages count: %d", _messages.size());
 #endif
 
     if (before)
     {
-        for (auto &msg : _messages)
+        std::list<Message> msgs;
+        for (auto it = _messages.rbegin(); it != _messages.rend(); it++)
         {
+            auto &msg = *it;
+            //MqttLog(LOG_DEBUG, LOG_TYPE_FUNC, "check messageid: %lld", msg.message_id());
             if (msg.conversation().type() == type && msg.conversation().line() == line && strcmp(msg.conversation().target().c_str(), target) == 0)
             {
                 if (idx == 0 || msg.message_id() < idx)
                 {
+                    msgs.emplace_back(msg);
                     cnt++;
-                    if (!cb(msg) || cnt >= count)
+                    //MqttLog(LOG_DEBUG, LOG_TYPE_FUNC, "accept messageid: %lld", msg.message_id());
+                    if (cnt >= count)
                         break;
                 }
             }
         }
+        msgs.sort([](const auto &s1, const auto &s2) {
+            return s1.server_timestamp() < s2.server_timestamp();
+        });
+        for (auto &msg : msgs)
+        {
+            if (!cb(msg))
+                break;
+        }
     }
     else
     {
@@ -430,6 +458,7 @@ int ProtoParserPrivate::getMessage(bool before, uint64_t idx, int count, int typ
 
 bool ProtoParserPrivate::isSilent(Conversation &par)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
     // TODO:
 #else
@@ -447,6 +476,7 @@ bool ProtoParserPrivate::isSilent(Conversation &par)
 
 bool ProtoParserPrivate::isTop(Conversation &par)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
     // TODO:
 #else
@@ -464,6 +494,7 @@ bool ProtoParserPrivate::isTop(Conversation &par)
 
 void ProtoParserPrivate::getUnread(Conversation &par, std::string uid, int &unread, int &unreadMention, int &unreadMentionAll)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_SQLITE3
 #else
     for (auto &it : _messages)
@@ -495,6 +526,7 @@ void ProtoParserPrivate::getUnread(Conversation &par, std::string uid, int &unre
 
 int ProtoParserPrivate::getConversation(const char *keyword, const char *types, const char *lines, std::function<bool(const Message &msg)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_SQLITE3
     std::string str1        = types;
     str1[0]                 = '(';
@@ -682,6 +714,7 @@ int ProtoParserPrivate::insert(const char *sql, std::function<bool(struct sqlite
 #endif
 int ProtoParserPrivate::searchFriends(const char *keyword, std::function<bool(const Friend &item)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
 // TODO:
 #else
@@ -701,6 +734,7 @@ int ProtoParserPrivate::searchFriends(const char *keyword, std::function<bool(co
 
 int ProtoParserPrivate::searchGroups(const char *keyword, std::function<bool(const GroupInfo &item)> cb)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
     // TODO:
 #else
@@ -720,6 +754,7 @@ int ProtoParserPrivate::searchGroups(const char *keyword, std::function<bool(con
 
 int ProtoParserPrivate::getUnreadFriendRequestCount(const char *uid)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
 // TODO:
 #else
@@ -735,7 +770,8 @@ int ProtoParserPrivate::getUnreadFriendRequestCount(const char *uid)
 
 int ProtoParserPrivate::update(GetFriendsResult &result)
 {
-    int n = result.entry_size();
+    std::lock_guard<std::recursive_mutex> l(_lock);
+    int                                   n = result.entry_size();
 #ifdef USE_ALL_SQLITE3
     const char *sql = "insert into t_friend(_uid, _state,_dt,_alias,_blacked,_extra) values (?,?,?,?,?,?,?);";
 
@@ -770,7 +806,8 @@ int ProtoParserPrivate::update(GetFriendsResult &result)
 
 int ProtoParserPrivate::update(GetFriendRequestResult &result)
 {
-    int n = result.entry_size();
+    std::lock_guard<std::recursive_mutex> l(_lock);
+    int                                   n = result.entry_size();
 #ifdef USE_ALL_SQLITE3
     const char *sql = "insert into t_friend_request(_from_uid, _to_uid, _reason, _state, _dt, _from_read_status, _to_read_status, _extra) values (?,?,?,?,?,?,?,?);";
 
@@ -798,7 +835,8 @@ int ProtoParserPrivate::update(GetFriendRequestResult &result)
 
 int ProtoParserPrivate::update(GetUserSettingResult &result)
 {
-    int n = result.entry_size();
+    std::lock_guard<std::recursive_mutex> l(_lock);
+    int                                   n = result.entry_size();
 #ifdef USE_ALL_SQLITE3
     const char *sql = "insert into t_user_setting(_scope, _key, _value, _dt) values (?,?,?,?);";
     int         i   = 0;
@@ -838,7 +876,8 @@ int ProtoParserPrivate::update(GetUserSettingResult &result)
 
 int ProtoParserPrivate::update(PullGroupInfoResult &result)
 {
-    int n = result.info_size();
+    std::lock_guard<std::recursive_mutex> l(_lock);
+    int                                   n = result.info_size();
 #ifdef USE_ALL_SQLITE3
 // TODO:
 #else
@@ -865,7 +904,8 @@ int ProtoParserPrivate::update(PullGroupInfoResult &result)
 
 int ProtoParserPrivate::update(PullUserResult &result)
 {
-    int n = result.result_size();
+    std::lock_guard<std::recursive_mutex> l(_lock);
+    int                                   n = result.result_size();
 #ifdef USE_ALL_SQLITE3
 // TODO:
 #else
@@ -894,6 +934,7 @@ int ProtoParserPrivate::update(PullUserResult &result)
 
 int ProtoParserPrivate::update(GroupInfo &result)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_ALL_SQLITE3
 // TODO:
 #else
@@ -915,7 +956,8 @@ int ProtoParserPrivate::update(GroupInfo &result)
 
 int ProtoParserPrivate::update(PullGroupMemberResult &result)
 {
-    int n = result.member_size();
+    std::lock_guard<std::recursive_mutex> l(_lock);
+    int                                   n = result.member_size();
 #ifdef USE_ALL_SQLITE3
 // TODO:
 #else
@@ -936,6 +978,7 @@ int ProtoParserPrivate::update(PullGroupMemberResult &result)
 #ifndef USE_SQLITE3
 void ProtoParserPrivate::sortMessage(uint64_t *head)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
     _messages.sort([](const auto &s1, const auto &s2) {
         return s1.server_timestamp() < s2.server_timestamp();
     });
@@ -959,6 +1002,7 @@ void ProtoParserPrivate::sortMessage(uint64_t *head)
 
 int ProtoParserPrivate::update(NotifyMessage &result, std::string &uid, uint64_t &head)
 {
+    std::lock_guard<std::recursive_mutex> l(_lock);
 #ifdef USE_SQLITE3
     // TODO:
 #else
@@ -977,7 +1021,8 @@ int ProtoParserPrivate::update(NotifyMessage &result, std::string &uid, uint64_t
 }
 int ProtoParserPrivate::update(PullMessageResult &result, uint64_t &head)
 {
-    int n = result.message_size();
+    std::lock_guard<std::recursive_mutex> l(_lock);
+    int                                   n = result.message_size();
     if (n == 0)
         return 0;
 #ifdef USE_SQLITE3

+ 2 - 1
src/proto/ProtoParserPrivate.h

@@ -78,5 +78,6 @@ private:
     std::list<Conversation> _conversations;
 #endif
 #endif
-    struct sqlite3 *_db = nullptr;
+    struct sqlite3      *_db = nullptr;
+    std::recursive_mutex _lock;
 };