From 4262ae6c87b4c11f7fe958a91fdd3333a8c5138c Mon Sep 17 00:00:00 2001 From: dec05eba Date: Sun, 1 Nov 2020 23:29:01 +0100 Subject: Matrix: cache sync --- src/plugins/Matrix.cpp | 261 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 225 insertions(+), 36 deletions(-) (limited to 'src/plugins') diff --git a/src/plugins/Matrix.cpp b/src/plugins/Matrix.cpp index 22c2447..de33c3c 100644 --- a/src/plugins/Matrix.cpp +++ b/src/plugins/Matrix.cpp @@ -7,6 +7,8 @@ #include #include #include +#include +#include #include #include #include "../../include/QuickMedia.hpp" @@ -230,6 +232,7 @@ namespace QuickMedia { } void MatrixQuickMedia::join_room(RoomData *room) { + std::lock_guard lock(room_body_items_mutex); std::string room_name = room->get_name(); if(room_name.empty()) room_name = room->id; @@ -246,6 +249,7 @@ namespace QuickMedia { } void MatrixQuickMedia::leave_room(RoomData *room, LeaveType leave_type, const std::string &reason) { + std::lock_guard lock(room_body_items_mutex); room_body_item_by_room.erase(room); rooms_page->remove_body_item_by_room_id(room->id); room_tags_page->remove_body_item_by_room_id(room->id); @@ -254,10 +258,12 @@ namespace QuickMedia { } void MatrixQuickMedia::room_add_tag(RoomData *room, const std::string &tag) { + std::lock_guard lock(room_body_items_mutex); room_tags_page->add_room_body_item_to_tag(room_body_item_by_room[room], tag); } void MatrixQuickMedia::room_remove_tag(RoomData *room, const std::string &tag) { + std::lock_guard lock(room_body_items_mutex); room_tags_page->remove_room_body_item_from_tag(room_body_item_by_room[room], tag); } @@ -317,6 +323,16 @@ namespace QuickMedia { update_pending_room_messages(page_type); } + void MatrixQuickMedia::clear_data() { + std::lock_guard lock(pending_room_messages_mutex); + std::lock_guard room_body_lock(room_body_items_mutex); + room_body_item_by_room.clear(); + pending_room_messages.clear(); + rooms_page->clear_data(); + room_tags_page->clear_data(); + invites_page->clear_data(); + } + void MatrixQuickMedia::update_pending_room_messages(MatrixPageType page_type) { std::lock_guard lock(pending_room_messages_mutex); bool is_window_focused = program->is_window_focused(); @@ -405,6 +421,11 @@ namespace QuickMedia { void MatrixRoomsPage::update() { { std::lock_guard lock(mutex); + if(clear_data_on_update) { + clear_data_on_update = false; + body->clear_items(); + } + for(const std::string &room_id : pending_remove_body_items) { remove_body_item_by_url(body->items, room_id); // TODO: There can be a race condition where current_chat_page is set after entering a room and then we will enter a room we left @@ -414,6 +435,7 @@ namespace QuickMedia { current_chat_page = nullptr; } } + pending_remove_body_items.clear(); body->clamp_selection(); body->append_items(std::move(room_body_items)); @@ -459,6 +481,15 @@ namespace QuickMedia { current_chat_page = chat_page; } + void MatrixRoomsPage::clear_data() { + std::lock_guard lock(mutex); + room_body_items.clear(); + pending_remove_body_items.clear(); + clear_data_on_update = true; + if(current_chat_page) + current_chat_page->should_clear_data = true; + } + PluginResult MatrixRoomTagsPage::submit(const std::string &title, const std::string &url, std::vector &result_tabs) { (void)title; std::lock_guard lock(mutex); @@ -477,6 +508,11 @@ namespace QuickMedia { void MatrixRoomTagsPage::update() { { std::lock_guard lock(mutex); + if(clear_data_on_update) { + clear_data_on_update = false; + body->clear_items(); + } + for(auto &it : remove_room_body_items_by_tags) { auto tag_body_it = tag_body_items_by_name.find(it.first); if(tag_body_it == tag_body_items_by_name.end()) @@ -558,6 +594,16 @@ namespace QuickMedia { current_rooms_page = rooms_page; } + void MatrixRoomTagsPage::clear_data() { + std::lock_guard lock(mutex); + tag_body_items_by_name.clear(); + add_room_body_items_by_tags.clear(); + remove_room_body_items_by_tags.clear(); + clear_data_on_update = true; + if(current_rooms_page) + current_rooms_page->clear_data(); + } + MatrixInvitesPage::MatrixInvitesPage(Program *program, Matrix *matrix, Body *body) : Page(program), matrix(matrix), body(body) { } @@ -597,6 +643,11 @@ namespace QuickMedia { void MatrixInvitesPage::update() { std::lock_guard lock(mutex); + if(clear_data_on_update) { + clear_data_on_update = false; + body->clear_items(); + } + for(const std::string &room_id : pending_remove_body_items) { remove_body_item_by_url(body->items, room_id); } @@ -621,6 +672,15 @@ namespace QuickMedia { pending_remove_body_items.push_back(room_id); } + void MatrixInvitesPage::clear_data() { + std::lock_guard lock(mutex); + body_items.clear(); + pending_remove_body_items.clear(); + title = "Invites (0)"; + prev_invite_count = 0; + clear_data_on_update = true; + } + MatrixChatPage::MatrixChatPage(Program *program, std::string room_id, MatrixRoomsPage *rooms_page) : Page(program), room_id(std::move(room_id)), rooms_page(rooms_page) { if(rooms_page) rooms_page->set_current_chat_page(this); @@ -637,6 +697,66 @@ namespace QuickMedia { rooms_page->update(); } + static std::array sync_fail_error_codes = { + "M_FORBIDDEN", + "M_UNKNOWN_TOKEN", + "M_MISSING_TOKEN", + "M_UNAUTHORIZED", + "M_USER_DEACTIVATED", + "M_CAPTCHA_NEEDED", + "M_MISSING_PARAM" + }; + + static void remove_empty_fields_in_sync_rooms_response(rapidjson::Value &rooms_json) { + for(const char *member_name : {"join", "invite", "leave"}) { + auto join_it = rooms_json.FindMember(member_name); + if(join_it != rooms_json.MemberEnd() && join_it->value.IsObject() && join_it->value.MemberCount() == 0) + rooms_json.RemoveMember(join_it); + } + } + + static void remove_ephemeral_field_in_sync_rooms_response(rapidjson::Value &rooms_json) { + auto join_it = rooms_json.FindMember("join"); + if(join_it == rooms_json.MemberEnd() || !join_it->value.IsObject()) + return; + + for(auto &it : join_it->value.GetObject()) { + if(!it.value.IsObject()) + continue; + it.value.RemoveMember("ephemeral"); + } + } + + static void remove_empty_fields_in_sync_account_data_response(rapidjson::Value &account_data_json) { + for(const char *member_name : {"events"}) { + auto join_it = account_data_json.FindMember(member_name); + if(join_it != account_data_json.MemberEnd() && join_it->value.IsObject() && join_it->value.MemberCount() == 0) + account_data_json.RemoveMember(join_it); + } + } + + static void remove_unused_sync_data_fields(rapidjson::Value &json_root) { + for(auto it = json_root.MemberBegin(); it != json_root.MemberEnd();) { + if(strcmp(it->name.GetString(), "account_data") == 0 && it->value.IsObject()) { + remove_empty_fields_in_sync_account_data_response(it->value); + if(it->value.MemberCount() == 0) + it = json_root.RemoveMember(it); + else + ++it; + } else if(strcmp(it->name.GetString(), "rooms") == 0 && it->value.IsObject()) { + // TODO: Call this, but dont remove our read marker (needed for notifications on mentions for example). Or maybe we can get it from "account_data"? + //remove_ephemeral_field_in_sync_rooms_response(rooms_it->value); + remove_empty_fields_in_sync_rooms_response(it->value); + if(it->value.MemberCount() == 0) + it = json_root.RemoveMember(it); + else + ++it; + } else { + it = json_root.EraseMember(it); + } + } + } + void Matrix::start_sync(MatrixDelegate *delegate) { if(sync_running) return; @@ -644,10 +764,31 @@ namespace QuickMedia { assert(!this->delegate); this->delegate = delegate; + Path matrix_cache_dir = get_cache_dir().join("matrix"); + if(create_directory_recursive(matrix_cache_dir) != 0) + fprintf(stderr, "Failed to create matrix cache directory\n"); + matrix_cache_dir.join("sync_data.json"); + sync_running = true; - sync_thread = std::thread([this, delegate]() { + sync_thread = std::thread([this, delegate, matrix_cache_dir]() { + FILE *sync_cache_file = fopen(matrix_cache_dir.data.c_str(), "rb"); + if(sync_cache_file) { + rapidjson::Document doc; + char read_buffer[8192]; + rapidjson::FileReadStream is(sync_cache_file, read_buffer, sizeof(read_buffer)); + while(true) { + rapidjson::ParseResult parse_result = doc.ParseStream(is); + if(parse_result.IsError()) + break; + if(parse_sync_response(doc, delegate) != PluginResult::OK) + fprintf(stderr, "Failed to parse cached sync response\n"); + } + fclose(sync_cache_file); + } + const rapidjson::Value *next_batch_json; PluginResult result; + bool initial_sync = true; while(sync_running) { std::vector additional_args = { { "-H", "Authorization: Bearer " + access_token }, @@ -661,12 +802,32 @@ namespace QuickMedia { snprintf(url, sizeof(url), "%s/_matrix/client/r0/sync?timeout=30000&since=%s", homeserver.c_str(), next_batch.c_str()); rapidjson::Document json_root; - DownloadResult download_result = download_json(json_root, url, std::move(additional_args), true); + std::string err_msg; + DownloadResult download_result = download_json(json_root, url, std::move(additional_args), true, &err_msg); if(download_result != DownloadResult::OK) { - fprintf(stderr, "Fetch response failed\n"); + fprintf(stderr, "/sync failed\n"); + if(initial_sync && json_root.IsObject()) { + const rapidjson::Value &errcode_json = GetMember(json_root, "errcode"); + if(errcode_json.IsString()) { + for(const char *sync_fail_error_code : sync_fail_error_codes) { + if(strcmp(errcode_json.GetString(), sync_fail_error_code) == 0) { + sync_fail_reason = sync_fail_error_code; + const rapidjson::Value &error_json = GetMember(json_root, "error"); + if(error_json.IsString()) + sync_fail_reason = error_json.GetString(); + sync_failed = true; + sync_running = false; + break; + } + } + } + } goto sync_end; } + if(next_batch.empty()) + clear_sync_cache_for_new_sync(); + result = parse_sync_response(json_root, delegate); if(result != PluginResult::OK) { fprintf(stderr, "Failed to parse sync response\n"); @@ -675,15 +836,31 @@ namespace QuickMedia { next_batch_json = &GetMember(json_root, "next_batch"); if(next_batch_json->IsString()) { - next_batch = next_batch_json->GetString(); + set_next_batch(next_batch_json->GetString()); fprintf(stderr, "Matrix: next batch: %s\n", next_batch.c_str()); } else { + set_next_batch("Invalid"); fprintf(stderr, "Matrix: missing next batch\n"); } sync_end: if(sync_running) std::this_thread::sleep_for(std::chrono::seconds(1)); + + if(!json_root.IsObject()) + continue; + + // TODO: Circulate file + FILE *sync_cache_file = fopen(matrix_cache_dir.data.c_str(), initial_sync ? "wb" : "ab"); + initial_sync = false; + if(sync_cache_file) { + char buffer[4096]; + rapidjson::FileWriteStream file_write_stream(sync_cache_file, buffer, sizeof(buffer)); + rapidjson::Writer writer(file_write_stream); + remove_unused_sync_data_fields(json_root); + json_root.Accept(writer); + fclose(sync_cache_file); + } } }); } @@ -694,12 +871,23 @@ namespace QuickMedia { if(sync_thread.joinable()) sync_thread.join(); delegate = nullptr; + sync_failed = false; + sync_fail_reason.clear(); } bool Matrix::is_initial_sync_finished() const { return !next_batch.empty(); } + bool Matrix::did_initial_sync_fail(std::string &err_msg) { + if(sync_failed) { + err_msg = sync_fail_reason; + return true; + } else { + return false; + } + } + void Matrix::get_room_sync_data(RoomData *room, SyncData &sync_data) { room->acquire_room_lock(); auto &room_messages = room->get_messages_thread_unsafe(); @@ -1692,7 +1880,7 @@ namespace QuickMedia { if(from.empty()) { fprintf(stderr, "Info: missing previous batch for room: %s, using /sync next batch\n", room_data->id.c_str()); // TODO: When caching /sync, remember to add lock around getting next_batch! - from = next_batch; + from = get_next_batch(); if(from.empty()) { fprintf(stderr, "Error: missing next batch!\n"); return PluginResult::OK; @@ -2105,22 +2293,13 @@ namespace QuickMedia { auto fetched_message_it = room->fetched_messages_by_event_id.find(event_id); if(fetched_message_it != room->fetched_messages_by_event_id.end()) return fetched_message_it->second; - - rapidjson::Document request_data(rapidjson::kObjectType); - request_data.AddMember("lazy_load_members", true, request_data.GetAllocator()); - - rapidjson::StringBuffer buffer; - rapidjson::Writer writer(buffer); - request_data.Accept(writer); std::vector additional_args = { { "-H", "Authorization: Bearer " + access_token } }; - std::string filter = url_param_encode(buffer.GetString()); - char url[512]; - snprintf(url, sizeof(url), "%s/_matrix/client/r0/rooms/%s/context/%s?limit=0&filter=%s", homeserver.c_str(), room->id.c_str(), event_id.c_str(), filter.c_str()); + snprintf(url, sizeof(url), "%s/_matrix/client/r0/rooms/%s/event/%s", homeserver.c_str(), room->id.c_str(), event_id.c_str()); std::string err_msg; rapidjson::Document json_root; @@ -2136,8 +2315,7 @@ namespace QuickMedia { } } - const rapidjson::Value &event_json = GetMember(json_root, "event"); - std::shared_ptr new_message = parse_message_event(event_json, room); + std::shared_ptr new_message = parse_message_event(json_root, room); room->fetched_messages_by_event_id.insert(std::make_pair(event_id, new_message)); return new_message; } @@ -2282,6 +2460,9 @@ namespace QuickMedia { DownloadResult download_result = download_json(json_root, homeserver + "/_matrix/client/r0/login", std::move(additional_args), true, &err_msg); if(download_result != DownloadResult::OK) return download_result_to_plugin_result(download_result); + Path matrix_sync_data_path = get_cache_dir().join("matrix").join("sync_data.json"); + remove(matrix_sync_data_path.data.c_str()); + if(!json_root.IsObject()) { err_msg = "Failed to parse matrix login response"; return PluginResult::ERR; @@ -2310,7 +2491,6 @@ namespace QuickMedia { json_root.AddMember("homeserver", rapidjson::StringRef(homeserver.c_str()), request_data.GetAllocator()); this->user_id = user_id_json.GetString(); - this->username = extract_user_name_from_user_id(this->user_id); this->access_token = access_token_json.GetString(); this->homeserver = homeserver; @@ -2330,6 +2510,7 @@ namespace QuickMedia { } PluginResult Matrix::logout() { + assert(!sync_running); Path session_path = get_storage_dir().join(SERVICE_NAME).join("session.json"); remove(session_path.data.c_str()); @@ -2345,11 +2526,10 @@ namespace QuickMedia { rooms.clear(); room_data_by_id.clear(); user_id.clear(); - username.clear(); access_token.clear(); homeserver.clear(); upload_limit.reset(); - next_batch.clear(); + set_next_batch(""); invites.clear(); return PluginResult::OK; @@ -2403,7 +2583,7 @@ namespace QuickMedia { return PluginResult::OK; } - PluginResult Matrix::load_and_verify_cached_session() { + PluginResult Matrix::load_cached_session() { Path session_path = get_storage_dir().join(SERVICE_NAME).join("session.json"); std::string session_json_content; if(file_get_content(session_path, session_json_content) != 0) { @@ -2443,20 +2623,7 @@ namespace QuickMedia { std::string access_token = access_token_json.GetString(); std::string homeserver = homeserver_json.GetString(); - std::vector additional_args = { - { "-H", "Authorization: Bearer " + access_token } - }; - - std::string server_response; - // We want to make any request to the server that can verify that our token is still valid, doesn't matter which call - DownloadResult download_result = download_to_string(homeserver + "/_matrix/client/r0/account/whoami", server_response, std::move(additional_args), use_tor, true); - if(download_result != DownloadResult::OK) { - fprintf(stderr, "Matrix whoami response: %s\n", server_response.c_str()); - return download_result_to_plugin_result(download_result); - } - this->user_id = std::move(user_id); - this->username = extract_user_name_from_user_id(this->user_id); this->access_token = std::move(access_token); this->homeserver = std::move(homeserver); return PluginResult::OK; @@ -2582,7 +2749,7 @@ namespace QuickMedia { DownloadResult download_result = download_to_string(homeserver + "/_matrix/client/r0/rooms/" + room_id + "/leave", server_response, std::move(additional_args), use_tor, true); if(download_result == DownloadResult::OK) { RoomData *room = get_room_by_id(room_id); - if(!room) { + if(room) { delegate->leave_room(room, LeaveType::LEAVE, ""); remove_room(room_id); } @@ -2678,9 +2845,31 @@ namespace QuickMedia { return false; } + void Matrix::set_next_batch(std::string new_next_batch) { + std::lock_guard lock(next_batch_mutex); + next_batch = std::move(new_next_batch); + } + + std::string Matrix::get_next_batch() { + std::lock_guard lock(next_batch_mutex); + return next_batch; + } + + void Matrix::clear_sync_cache_for_new_sync() { + std::lock_guard room_data_lock(room_data_mutex); + std::lock_guard invites_lock(invite_mutex); + for(auto &room : rooms) { + room->clear_data(); + } + // We intentionally dont clear |rooms| here because we want the objects inside it to still be valid. TODO: Clear |rooms| here + room_data_by_id.clear(); + invites.clear(); + delegate->clear_data(); + } + DownloadResult Matrix::download_json(rapidjson::Document &result, const std::string &url, std::vector additional_args, bool use_browser_useragent, std::string *err_msg) const { if(download_to_json(url, result, std::move(additional_args), use_tor, use_browser_useragent, err_msg == nullptr) != DownloadResult::OK) { - // Cant get error since we parse directory to json. TODO: Make this work somehow? + // Cant get error since we parse directly to json. TODO: Make this work somehow? if(err_msg) *err_msg = "Failed to download/parse json"; return DownloadResult::NET_ERR; -- cgit v1.2.3