From c16bb75c8890bbeb7d375beb110224ec0f14b115 Mon Sep 17 00:00:00 2001 From: dec05eba Date: Sat, 10 Dec 2022 19:39:08 +0100 Subject: Matrix: re-add sync from latest cache --- include/QuickMedia.hpp | 1 + plugins/Matrix.hpp | 34 ++-- plugins/utils/UniqueProcess.hpp | 7 + src/QuickMedia.cpp | 90 +++------- src/plugins/Matrix.cpp | 316 ++++++++++++------------------------ src/plugins/utils/UniqueProcess.cpp | 111 +++++++++++++ 6 files changed, 256 insertions(+), 303 deletions(-) create mode 100644 plugins/utils/UniqueProcess.hpp create mode 100644 src/plugins/utils/UniqueProcess.cpp diff --git a/include/QuickMedia.hpp b/include/QuickMedia.hpp index 0980b76..4edea6e 100644 --- a/include/QuickMedia.hpp +++ b/include/QuickMedia.hpp @@ -180,6 +180,7 @@ namespace QuickMedia { Display *disp; mgl::Window window; Matrix *matrix = nullptr; + bool matrix_instance_already_running = false; bool is_login_sync = false; mgl::vec2i window_size; const char *plugin_name = nullptr; diff --git a/plugins/Matrix.hpp b/plugins/Matrix.hpp index c1ee3bd..e0e789e 100644 --- a/plugins/Matrix.hpp +++ b/plugins/Matrix.hpp @@ -144,7 +144,6 @@ namespace QuickMedia { mgl::vec2i thumbnail_size; // Set to {0, 0} if not specified RelatedEventType related_event_type = RelatedEventType::NONE; bool notification_mentions_me = false; - bool cache = false; bool body_is_formatted = false; std::string transaction_id; time_t timestamp = 0; // In milliseconds @@ -309,13 +308,13 @@ namespace QuickMedia { virtual ~MatrixDelegate() = default; virtual void join_room(RoomData *room) = 0; - virtual void leave_room(RoomData *room, LeaveType leave_type, const std::string &reason, bool is_cache) = 0; + virtual void leave_room(RoomData *room, LeaveType leave_type, const std::string &reason) = 0; // Note: calling |room| methods inside this function is not allowed virtual void room_add_tag(RoomData *room, const std::string &tag) = 0; // Note: calling |room| methods inside this function is not allowed virtual void room_remove_tag(RoomData *room, const std::string &tag) = 0; - virtual void room_add_new_messages(RoomData *room, const Messages &messages, bool is_initial_sync, bool sync_is_cache, MessageDirection message_dir) = 0; + virtual void room_add_new_messages(RoomData *room, const Messages &messages, bool is_initial_sync, MessageDirection message_dir) = 0; virtual void add_invite(const std::string &room_id, const Invite &invite) = 0; virtual void remove_invite(const std::string &room_id) = 0; @@ -326,8 +325,6 @@ namespace QuickMedia { virtual void remove_user(MatrixEventUserInfo user_info) = 0; virtual void set_user_info(MatrixEventUserInfo user_info) = 0; virtual void set_room_info(MatrixEventRoomInfo room_info) = 0; - - virtual void clear_data() = 0; }; class Matrix; @@ -344,10 +341,10 @@ namespace QuickMedia { MatrixQuickMedia(Program *program, Matrix *matrix, MatrixRoomsPage *rooms_page, MatrixRoomTagsPage *room_tags_page, MatrixInvitesPage *invites_page, MatrixNotificationsPage *notifications_page); void join_room(RoomData *room) override; - void leave_room(RoomData *room, LeaveType leave_type, const std::string &reason, bool is_cache) override; + void leave_room(RoomData *room, LeaveType leave_type, const std::string &reason) override; void room_add_tag(RoomData *room, const std::string &tag) override; void room_remove_tag(RoomData *room, const std::string &tag) override; - void room_add_new_messages(RoomData *room, const Messages &messages, bool is_initial_sync, bool sync_is_cache, MessageDirection message_dir) override; + void room_add_new_messages(RoomData *room, const Messages &messages, bool is_initial_sync, MessageDirection message_dir) override; void add_invite(const std::string &room_id, const Invite &invite) override; void remove_invite(const std::string &room_id) override; @@ -362,8 +359,6 @@ namespace QuickMedia { void set_room_as_read(RoomData *room); - void clear_data() override; - Program *program; Matrix *matrix; MatrixChatPage *chat_page; @@ -372,7 +367,7 @@ namespace QuickMedia { MatrixInvitesPage *invites_page; MatrixNotificationsPage *notifications_page; private: - void update_room_description(RoomData *room, const Messages &new_messages, bool is_initial_sync, bool sync_is_cache); + void update_room_description(RoomData *room, const Messages &new_messages, bool is_initial_sync); private: std::map> room_body_item_by_room; std::map> last_message_by_room; @@ -401,8 +396,6 @@ namespace QuickMedia { void clear_search(); - void clear_data(); - MatrixQuickMedia *matrix_delegate = nullptr; Body *body = nullptr; private: @@ -428,8 +421,6 @@ namespace QuickMedia { void set_current_rooms_page(MatrixRoomsPage *rooms_page); - void clear_data(); - MatrixQuickMedia *matrix_delegate = nullptr; private: struct TagData { @@ -453,8 +444,6 @@ namespace QuickMedia { void add_body_item(std::shared_ptr body_item); void remove_body_item_by_room_id(const std::string &room_id); - - void clear_data(); private: Matrix *matrix; Body *body; @@ -570,7 +559,6 @@ namespace QuickMedia { const std::string room_id; MatrixRoomsPage *rooms_page = nullptr; - bool should_clear_data = false; Body *chat_body = nullptr; bool messages_tab_visible = false; @@ -654,6 +642,7 @@ namespace QuickMedia { class Matrix { public: + Matrix(bool matrix_instance_already_running); // TODO: Make this return the Matrix object instead, to force users to call start_sync bool start_sync(MatrixDelegate *delegate, bool &cached); void stop_sync(); @@ -762,16 +751,16 @@ namespace QuickMedia { PluginResult set_qm_last_read_message_timestamp(RoomData *room, int64_t timestamp); void load_qm_read_markers_from_account_data(); - PluginResult parse_sync_response(const rapidjson::Document &root, bool is_additional_messages_sync, bool initial_sync); + PluginResult parse_sync_response(const rapidjson::Document &root, bool initial_sync); PluginResult parse_notifications(const rapidjson::Value ¬ifications_json, std::function callback_func); PluginResult parse_sync_account_data(const rapidjson::Value &account_data_json); - PluginResult parse_sync_room_data(const rapidjson::Value &rooms_json, bool is_additional_messages_sync, bool initial_sync); + PluginResult parse_sync_room_data(const rapidjson::Value &rooms_json, bool initial_sync); void parse_custom_emoji(const rapidjson::Value &custom_emoji_json); void load_custom_emoji_from_cache(); PluginResult get_previous_room_messages(RoomData *room_data, bool latest_messages, size_t &num_new_messages, bool *reached_end = nullptr); void events_add_user_info(const rapidjson::Value &events_json, RoomData *room_data, int64_t timestamp); std::shared_ptr parse_user_info(const rapidjson::Value &json, const std::string &user_id, RoomData *room_data, int64_t timestamp); - void events_set_user_read_marker(const rapidjson::Value &events_json, RoomData *room_data, std::shared_ptr &me, bool is_additional_messages_sync); + void events_set_user_read_marker(const rapidjson::Value &events_json, RoomData *room_data, std::shared_ptr &me); // Returns the number of messages added size_t events_add_messages(const rapidjson::Value &events_json, RoomData *room_data, MessageDirection message_dir, bool has_unread_notifications); void events_set_room_info(const rapidjson::Value &events_json, RoomData *room_data, int64_t timestamp); @@ -793,7 +782,6 @@ namespace QuickMedia { std::string get_next_batch(); void set_next_notifications_token(std::string new_next_token); std::string get_next_notifications_token(); - void clear_sync_cache_for_new_sync(); std::shared_ptr get_user_by_id(RoomData *room, const std::string &user_id, bool *is_new_user = nullptr, bool create_if_not_found = true); std::string get_filter_cached(); void load_silenced_invites(); @@ -813,6 +801,7 @@ namespace QuickMedia { std::string next_notifications_token; std::mutex next_batch_mutex; bool initial_sync_finished = false; + bool matrix_instance_already_running = false; std::unordered_map invites; std::mutex invite_mutex; @@ -822,12 +811,9 @@ namespace QuickMedia { std::mutex notifications_mutex; std::thread sync_thread; - std::thread sync_additional_messages_thread; std::thread notification_thread; - MessageQueue additional_messages_queue; bool sync_running = false; bool sync_failed = false; - bool sync_is_cache = false; bool finished_fetching_notifications = false; std::string sync_fail_reason; MatrixDelegate *delegate = nullptr; diff --git a/plugins/utils/UniqueProcess.hpp b/plugins/utils/UniqueProcess.hpp new file mode 100644 index 0000000..f85704f --- /dev/null +++ b/plugins/utils/UniqueProcess.hpp @@ -0,0 +1,7 @@ +#pragma once + +namespace QuickMedia { + bool is_quickmedia_instance_already_running(const char *sock_file_dir, const char *plugin_name); + bool set_quickmedia_instance_unique(const char *sock_file_dir, const char *plugin_name); + void remove_quickmedia_instance_lock(const char *sock_file_dir, const char *plugin_name); +} \ No newline at end of file diff --git a/src/QuickMedia.cpp b/src/QuickMedia.cpp index 2c40580..a0324c0 100644 --- a/src/QuickMedia.cpp +++ b/src/QuickMedia.cpp @@ -39,6 +39,7 @@ #include "../include/Downloader.hpp" #include "../include/Storage.hpp" #include "../include/AsyncImageLoader.hpp" +#include "../plugins/utils/UniqueProcess.hpp" #include #include "../include/gui/Button.hpp" #include "../external/hash-library/sha256.h" @@ -407,7 +408,7 @@ namespace QuickMedia { video_max_height = 0; std::vector tabs; const char *url = nullptr; - std::string program_path = dirname(argv[0]); + std::string program_path = Path(argv[0]).parent().data; std::string instance; std::string download_filename; bool no_dialog = false; @@ -633,7 +634,7 @@ namespace QuickMedia { chat_login_page(); } after_matrix_login_page(); - return exit_code; + goto done; } page_loop(tabs, start_tab_index); @@ -650,6 +651,10 @@ namespace QuickMedia { } } + done: + if(plugin_name && strcmp(plugin_name, "matrix") == 0) + remove_quickmedia_instance_lock(get_cache_dir().join("matrix").data.c_str(), "matrix"); + return exit_code; } @@ -1327,7 +1332,7 @@ namespace QuickMedia { page_stack.push(current_page); current_page = PageType::IMAGE_BOARD_THREAD; image_board_thread_page(thread_page.get(), body.get()); - exit(0); + exit(exit_code); } else { auto boards_page = std::make_unique(this, resources_root); FourchanBoardsPage *boards_page_ptr = boards_page.get(); @@ -1424,11 +1429,23 @@ namespace QuickMedia { tabs.push_back(Tab{create_body(false, true), std::make_unique(this), create_search_bar("Search...", 500)}); } else if(strcmp(plugin_name, "matrix") == 0) { assert(!matrix); + if(create_directory_recursive(get_cache_dir().join("matrix").join("events")) != 0) { show_notification("QuickMedia", "Failed to create events cache directory", Urgency::CRITICAL); abort(); } - matrix = new Matrix(); + + if(is_quickmedia_instance_already_running(get_cache_dir().join("matrix").data.c_str(), "matrix")) { + matrix_instance_already_running = true; + } else { + matrix_instance_already_running = false; + if(!set_quickmedia_instance_unique(get_cache_dir().join("matrix").data.c_str(), "matrix")) { + show_notification("QuickMedia", "Failed to set quickmedia process as unique", Urgency::CRITICAL); + exit(exit_code); + } + } + + matrix = new Matrix(matrix_instance_already_running); } else { assert(false); } @@ -2100,7 +2117,7 @@ namespace QuickMedia { show_notification("QuickMedia", "Initial matrix sync failed, error: " + err_msg, Urgency::CRITICAL); matrix->logout(); delete matrix; - matrix = new Matrix(); + matrix = new Matrix(matrix_instance_already_running); current_page = PageType::CHAT_LOGIN; chat_login_page(); after_matrix_login_page(); @@ -7447,7 +7464,7 @@ namespace QuickMedia { matrix->stop_sync(); matrix->logout(); delete matrix; - matrix = new Matrix(); + matrix = new Matrix(matrix_instance_already_running); // TODO: Instead of doing this, exit this current function and navigate to chat login page instead. //delete current_plugin; //current_plugin = new Matrix(); @@ -7853,7 +7870,7 @@ namespace QuickMedia { show_notification("QuickMedia", "Initial matrix sync failed, error: " + err_msg, Urgency::CRITICAL); matrix->logout(); delete matrix; - matrix = new Matrix(); + matrix = new Matrix(matrix_instance_already_running); current_page = PageType::CHAT_LOGIN; chat_login_page(); after_matrix_login_page(); @@ -7872,63 +7889,6 @@ namespace QuickMedia { tabs[selected_tab].body->on_bottom_reached(); } - if(matrix_chat_page->should_clear_data) { - matrix_chat_page->should_clear_data = false; - - std::string err_msg; - while(!matrix->is_initial_sync_finished()) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - if(matrix->did_initial_sync_fail(err_msg)) { - matrix_chat_page->set_current_room(nullptr, nullptr, nullptr); - fetch_messages_future.cancel(); - cleanup_tasks(); - tabs.clear(); - unreferenced_events.clear(); - unresolved_reactions.clear(); - all_messages.clear(); - show_notification("QuickMedia", "Initial matrix sync failed, error: " + err_msg, Urgency::CRITICAL); - matrix->logout(); - delete matrix; - matrix = new Matrix(); - current_page = PageType::CHAT_LOGIN; - chat_login_page(); - after_matrix_login_page(); - window.close(); - goto chat_page_end; - } - } - - //all_messages.clear(); - - tabs[MESSAGES_TAB_INDEX].body->clear_items(); - - Messages all_messages_new; - matrix->get_all_synced_room_messages(current_room, all_messages_new); - for(auto &message : all_messages_new) { - fetched_messages_set.insert(message->event_id); - } - all_messages.insert(all_messages.end(), all_messages_new.begin(), all_messages_new.end()); - //me = matrix->get_me(current_room); - filter_provisional_messages(all_messages_new); - add_new_messages_to_current_room(all_messages_new); - modify_related_messages_in_current_room(all_messages_new); - unresolved_reactions.clear(); - after_token.clear(); - before_token.clear(), - fetched_enough_messages_top = false; - fetched_enough_messages_bottom = false; - fetch_messages_future.cancel(); - process_reactions(all_messages_new); - if(current_room->initial_prev_messages_fetch) { - current_room->initial_prev_messages_fetch = false; - tabs[MESSAGES_TAB_INDEX].body->select_last_item(); - } - - std::vector pinned_events; - matrix->get_all_pinned_events(current_room, pinned_events); - process_pinned_events(std::move(pinned_events)); - } - if(go_to_previous_page) { go_to_previous_page = false; goto chat_page_end; @@ -8014,7 +7974,7 @@ namespace QuickMedia { matrix->stop_sync(); if(go_to_login_page) { delete matrix; - matrix = new Matrix(); + matrix = new Matrix(matrix_instance_already_running); current_page = PageType::CHAT_LOGIN; chat_login_page(); after_matrix_login_page(); diff --git a/src/plugins/Matrix.cpp b/src/plugins/Matrix.cpp index 9bbf9bd..63027ab 100644 --- a/src/plugins/Matrix.cpp +++ b/src/plugins/Matrix.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include "../../include/QuickMedia.hpp" #include @@ -38,7 +39,6 @@ namespace QuickMedia { // then we cant see room message preview. TODO: Fix this somehow. // TODO: What about state events in initial sync in timeline? such as user display name change. static const char* INITIAL_FILTER = "{\"presence\":{\"limit\":0,\"types\":[\"\"]},\"account_data\":{\"types\":[\"qm.emoji\",\"m.direct\"]},\"room\":{\"state\":{\"not_types\":[\"m.room.related_groups\",\"m.room.power_levels\",\"m.room.join_rules\",\"m.room.history_visibility\",\"m.room.canonical_alias\",\"m.space.child\"],\"lazy_load_members\":true},\"timeline\":{\"types\":[\"m.room.message\"],\"limit\":1,\"lazy_load_members\":true},\"ephemeral\":{\"limit\":0,\"types\":[\"\"],\"lazy_load_members\":true},\"account_data\":{\"limit\":1,\"types\":[\"m.fully_read\",\"m.tag\",\"qm.last_read_message_timestamp\"],\"lazy_load_members\":true}}}"; - static const char* ADDITIONAL_MESSAGES_FILTER = "{\"presence\":{\"types\":[\"\"]},\"account_data\":{\"limit\":0,\"types\":[\"\"]},\"room\":{\"state\":{\"not_types\":[\"m.room.related_groups\",\"m.room.power_levels\",\"m.room.join_rules\",\"m.room.history_visibility\",\"m.room.canonical_alias\",\"m.space.child\"],\"lazy_load_members\":true},\"timeline\":{\"limit\":20,\"lazy_load_members\":true},\"ephemeral\":{\"limit\":0,\"types\":[\"\"],\"lazy_load_members\":true},\"account_data\":{\"limit\":0,\"types\":[\"\"],\"lazy_load_members\":true}}}"; static const char* CONTINUE_FILTER = "{\"presence\":{\"limit\":0,\"types\":[\"\"]},\"account_data\":{\"types\":[\"qm.emoji\",\"m.direct\"]},\"room\":{\"state\":{\"not_types\":[\"m.room.related_groups\",\"m.room.power_levels\",\"m.room.join_rules\",\"m.room.history_visibility\",\"m.room.canonical_alias\",\"m.space.child\"],\"lazy_load_members\":true},\"timeline\":{\"lazy_load_members\":true},\"ephemeral\":{\"limit\":0,\"types\":[\"\"],\"lazy_load_members\":true},\"account_data\":{\"types\":[\"m.fully_read\",\"m.tag\",\"qm.last_read_message_timestamp\"],\"lazy_load_members\":true}}}"; static bool is_gpg_installed = false; @@ -581,18 +581,6 @@ namespace QuickMedia { //fetched_messages_by_event_id.clear(); //userdata = nullptr; //user_info_by_user_id.clear(); - size_t i = 0; - for(auto it = messages.begin(); it != messages.end();) { - if((*it)->cache) { - message_by_event_id.erase((*it)->event_id); - it = messages.erase(it); - if(i <= messages_read_index) - --messages_read_index; - } else { - ++it; - } - ++i; - } //messages.clear(); //messages_read_index = 0; //message_by_event_id.clear(); @@ -629,11 +617,11 @@ namespace QuickMedia { rooms_page->add_body_item(body_item); } - void MatrixQuickMedia::leave_room(RoomData *room, LeaveType leave_type, const std::string &reason, bool is_cache) { + void MatrixQuickMedia::leave_room(RoomData *room, LeaveType leave_type, const std::string &reason) { room_body_item_by_room.erase(room); rooms_page->remove_body_item_by_room_id(room->id); room_tags_page->remove_body_item_by_room_id(room->id); - if(!is_cache && leave_type != LeaveType::LEAVE) + if(leave_type != LeaveType::LEAVE) show_notification("QuickMedia", reason); } @@ -651,11 +639,11 @@ namespace QuickMedia { room_tags_page->remove_room_body_item_from_tag(it->second, tag); } - void MatrixQuickMedia::room_add_new_messages(RoomData *room, const Messages &messages, bool is_initial_sync, bool sync_is_cache, MessageDirection message_dir) { + void MatrixQuickMedia::room_add_new_messages(RoomData *room, const Messages &messages, bool is_initial_sync, MessageDirection message_dir) { bool is_window_focused = program->is_window_focused(); RoomData *current_room = program->get_current_chat_room(); - if(!sync_is_cache && message_dir == MessageDirection::AFTER) { + if(message_dir == MessageDirection::AFTER) { for(auto &message : messages) { if(message->notification_mentions_me) { std::string body = remove_reply_formatting(matrix, message->body); @@ -679,7 +667,7 @@ namespace QuickMedia { } } - update_room_description(room, messages, is_initial_sync, sync_is_cache); + update_room_description(room, messages, is_initial_sync); } void MatrixQuickMedia::add_invite(const std::string &room_id, const Invite &invite) { @@ -802,15 +790,6 @@ namespace QuickMedia { body->set_selected_item(found_item_index, false); } - void MatrixQuickMedia::clear_data() { - //room_body_item_by_room.clear(); - //pending_room_messages.clear(); - //rooms_page->clear_data(); - //room_tags_page->clear_data(); - invites_page->clear_data(); - //unread_notifications.clear(); - } - static std::shared_ptr get_last_message_by_timestamp(const Messages &messages) { if(messages.empty()) return nullptr; @@ -870,7 +849,7 @@ namespace QuickMedia { return !get_config().matrix.gpg_user_id.empty() && text.find("-----BEGIN PGP MESSAGE-----") != std::string::npos && text.find("-----END PGP MESSAGE-----") != std::string::npos; } - void MatrixQuickMedia::update_room_description(RoomData *room, const Messages &new_messages, bool is_initial_sync, bool sync_is_cache) { + void MatrixQuickMedia::update_room_description(RoomData *room, const Messages &new_messages, bool is_initial_sync) { time_t read_marker_message_timestamp = 0; std::shared_ptr me = matrix->get_me(room); std::string my_user_read_marker; @@ -915,7 +894,7 @@ namespace QuickMedia { if(!room->body_item) return; - if(!sync_is_cache && last_unread_message) { + if(last_unread_message) { bool is_window_focused = program->is_window_focused(); RoomData *current_room = program->get_current_chat_room(); Body *chat_body = chat_page ? chat_page->chat_body : nullptr; @@ -1038,12 +1017,6 @@ namespace QuickMedia { body->select_first_item(); } - void MatrixRoomsPage::clear_data() { - body->clear_items(); - if(current_chat_page) - current_chat_page->should_clear_data = true; - } - PluginResult MatrixRoomTagsPage::submit(const SubmitArgs &args, std::vector &result_tabs) { auto body = create_body(true); Body *body_ptr = body.get(); @@ -1131,13 +1104,6 @@ namespace QuickMedia { current_rooms_page = rooms_page; } - void MatrixRoomTagsPage::clear_data() { - tag_body_items_by_name.clear(); - body->clear_items(); - if(current_rooms_page) - current_rooms_page->clear_data(); - } - MatrixInvitesPage::MatrixInvitesPage(Program *program, Matrix *matrix, Body *body) : Page(program), matrix(matrix), body(body) { } @@ -1198,12 +1164,6 @@ namespace QuickMedia { } } - void MatrixInvitesPage::clear_data() { - body->clear_items(); - prev_invite_count = 0; - title = "Invites (0)"; - } - PluginResult MatrixSettingsPage::submit(const SubmitArgs &args, std::vector &result_tabs) { if(args.url == "join") { result_tabs.push_back(Tab{create_body(), std::make_unique(program, matrix), create_search_bar("Enter room id...", SEARCH_DELAY_FILTER)}); @@ -1677,43 +1637,7 @@ namespace QuickMedia { "M_MISSING_PARAM" }; - static void remove_empty_fields_in_sync_rooms_response(rapidjson::Value &rooms_json) { - for(const char *member_name : {"join", "invite", "leave"}) { - auto join_it = rooms_json.FindMember(member_name); - if(join_it != rooms_json.MemberEnd() && join_it->value.IsObject() && join_it->value.MemberCount() == 0) - rooms_json.RemoveMember(join_it); - } - } - - static void remove_empty_fields_in_sync_account_data_response(rapidjson::Value &account_data_json) { - for(const char *member_name : {"events"}) { - auto join_it = account_data_json.FindMember(member_name); - if(join_it != account_data_json.MemberEnd() && join_it->value.IsObject() && join_it->value.MemberCount() == 0) - account_data_json.RemoveMember(join_it); - } - } - - static void remove_unused_sync_data_fields(rapidjson::Value &json_root) { - for(auto it = json_root.MemberBegin(); it != json_root.MemberEnd();) { - if(strcmp(it->name.GetString(), "account_data") == 0 && it->value.IsObject()) { - remove_empty_fields_in_sync_account_data_response(it->value); - if(it->value.MemberCount() == 0) - it = json_root.RemoveMember(it); - else - ++it; - } else if(strcmp(it->name.GetString(), "rooms") == 0 && it->value.IsObject()) { - // TODO: Call this, but dont remove our read marker (needed for notifications on mentions for example). Or maybe we can get it from "account_data"? - //remove_ephemeral_field_in_sync_rooms_response(rooms_it->value); - remove_empty_fields_in_sync_rooms_response(it->value); - if(it->value.MemberCount() == 0) - it = json_root.RemoveMember(it); - else - ++it; - } else { - it = json_root.EraseMember(it); - } - } - } + Matrix::Matrix(bool matrix_instance_already_running) : matrix_instance_already_running(matrix_instance_already_running) {} bool Matrix::start_sync(MatrixDelegate *delegate, bool &cached) { cached = true; @@ -1734,31 +1658,39 @@ namespace QuickMedia { cached = (get_file_type(matrix_cache_dir) == FileType::REGULAR); - sync_is_cache = false; sync_running = true; load_silenced_invites(); load_custom_emoji_from_cache(); sync_thread = std::thread([this, matrix_cache_dir]() { - sync_is_cache = true; - FILE *sync_cache_file = fopen(matrix_cache_dir.data.c_str(), "rb"); - if(sync_cache_file) { + FILE *sync_cache_file; + const rapidjson::Value *next_batch_json = nullptr; + + load_qm_read_markers_from_account_data(); // TODO: Remove when https://github.com/matrix-org/synapse/issues/14444 is fixed, if ever. + + std::ifstream sync_cache_file_stream; + sync_cache_file_stream.open(matrix_cache_dir.data.c_str(), std::ifstream::in | std::ifstream::binary); + if(sync_cache_file_stream.good() && get_file_type(get_cache_dir().join("matrix").join("updated-cache-version1")) == FileType::REGULAR) { rapidjson::Document doc; - char read_buffer[8192]; - rapidjson::FileReadStream is(sync_cache_file, read_buffer, sizeof(read_buffer)); - while(true) { - rapidjson::ParseResult parse_result = doc.ParseStream(is); + std::string line; + while(std::getline(sync_cache_file_stream, line)) { + rapidjson::ParseResult parse_result = doc.Parse(line.c_str(), line.size()); if(parse_result.IsError()) - break; - if(parse_sync_response(doc, false, false) != PluginResult::OK) + continue; // This should NEVER happen. Do initial sync if it does and remove cache? :( TODO + + if(parse_sync_response(doc, false) != PluginResult::OK) fprintf(stderr, "Failed to parse cached sync response\n"); + + next_batch_json = &GetMember(doc, "next_batch"); + if(next_batch_json->IsString()) { + set_next_batch(next_batch_json->GetString()); + //fprintf(stderr, "Matrix: next batch: %s\n", next_batch.c_str()); + } } - fclose(sync_cache_file); + malloc_trim(0); } - sync_is_cache = false; - - load_qm_read_markers_from_account_data(); // TODO: Remove when https://github.com/matrix-org/synapse/issues/14444 is fixed, if ever. + sync_cache_file_stream.close(); // Filter with account data // {"presence":{"limit":0,"types":[""]},"account_data":{"not_types":["im.vector.setting.breadcrumbs","m.push_rules","im.vector.setting.allowed_widgets","io.element.recent_emoji"]},"room":{"state":{"limit":1,"not_types":["m.room.related_groups","m.room.power_levels","m.room.join_rules","m.room.history_visibility"],"lazy_load_members":true},"timeline":{"limit":3,"lazy_load_members":true},"ephemeral":{"limit":0,"types":[""],"lazy_load_members":true},"account_data":{"limit":1,"types":["m.fully_read"],"lazy_load_members":true}}} @@ -1777,16 +1709,35 @@ namespace QuickMedia { else filter = FILTER; #endif - std::string filter_encoded = url_param_encode(INITIAL_FILTER); - std::vector additional_args = { { "-H", "Authorization: Bearer " + access_token }, { "-m", "35" } }; - const rapidjson::Value *next_batch_json; + next_batch_json = nullptr; PluginResult result; - bool initial_sync = true; + bool initial_sync = next_batch.empty(); + bool first_sync = true; + + notification_thread = std::thread([this]() { + get_previous_notifications([this](const MatrixNotification ¬ification) { + if(notification.read) + return; + + MatrixDelegate *delegate = this->delegate; + ui_thread_tasks.push([delegate, notification] { + delegate->add_unread_notification(std::move(notification)); + }); + }); + finished_fetching_notifications = true; + }); + + std::string filter_encoded; + if(initial_sync) + filter_encoded = url_param_encode(INITIAL_FILTER); + else + filter_encoded = url_param_encode(CONTINUE_FILTER); + while(sync_running) { char url[2048]; if(next_batch.empty()) @@ -1821,10 +1772,7 @@ namespace QuickMedia { } } - if(next_batch.empty()) - clear_sync_cache_for_new_sync(); - - result = parse_sync_response(json_root, false, initial_sync); + result = parse_sync_response(json_root, initial_sync); if(result != PluginResult::OK) { fprintf(stderr, "Failed to parse sync response\n"); initial_sync = false; @@ -1842,50 +1790,9 @@ namespace QuickMedia { goto sync_end; } - if(initial_sync) { - notification_thread = std::thread([this]() { - get_previous_notifications([this](const MatrixNotification ¬ification) { - if(notification.read) - return; - - MatrixDelegate *delegate = this->delegate; - ui_thread_tasks.push([delegate, notification] { - delegate->add_unread_notification(std::move(notification)); - }); - }); - - finished_fetching_notifications = true; - - { - std::vector additional_args = { - { "-H", "Authorization: Bearer " + access_token }, - { "-m", "35" } - }; - - char url[1024]; - std::string filter_encoded = url_param_encode(ADDITIONAL_MESSAGES_FILTER); - snprintf(url, sizeof(url), "%s/_matrix/client/r0/sync?filter=%s&timeout=0", homeserver.c_str(), filter_encoded.c_str()); - - rapidjson::Document json_root; - std::string err_msg; - DownloadResult download_result = download_json(json_root, url, additional_args, true, &err_msg); - if(download_result != DownloadResult::OK) { - fprintf(stderr, "/sync for additional messages failed\n"); - return; - } - - // TODO: Test? - //if(next_batch.empty()) - // clear_sync_cache_for_new_sync(); - - additional_messages_queue.pop_wait(); - parse_sync_response(json_root, true, false); - } - }); - + if(first_sync) { + first_sync = false; filter_encoded = url_param_encode(CONTINUE_FILTER); - additional_messages_queue.push(true); - malloc_trim(0); } #if 0 @@ -1895,18 +1802,33 @@ namespace QuickMedia { } #endif - // TODO: Circulate file - sync_cache_file = fopen(matrix_cache_dir.data.c_str(), initial_sync ? "wb" : "ab"); - initial_sync = false; - if(sync_cache_file) { - if(json_root.IsObject()) { - char buffer[4096]; - rapidjson::FileWriteStream file_write_stream(sync_cache_file, buffer, sizeof(buffer)); - rapidjson::Writer writer(file_write_stream); - remove_unused_sync_data_fields(json_root); - json_root.Accept(writer); + // TODO: Use a NoSQL database. + // TODO: Remove very old cache. + // TODO: Find a way to remove this? this makes sync work like other clients but we dont want that! + // If the last sync was long ago then it has to sync ALL messages again. Maybe check if sync file + // is XX days old and then ignore it? + // TODO: Remove this matrix_instance_already_running check when the matrix sync is moved to a daemon. + // Then the daemon will do the sync and matrix processes will ask that daemon for the cached data + // and fetch previous messages etc themselves. + if(!matrix_instance_already_running) { + sync_cache_file = fopen(matrix_cache_dir.data.c_str(), initial_sync ? "wb" : "ab"); + initial_sync = false; + if(sync_cache_file) { + if(json_root.IsObject()) { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + json_root.Accept(writer); + + std::string json_data(buffer.GetString(), buffer.GetSize()); + string_replace_all(json_data, '\n', ' '); + json_data += '\n'; + + fwrite(json_data.data(), 1, json_data.size(), sync_cache_file); + file_overwrite(get_cache_dir().join("matrix").join("updated-cache-version1"), "1"); // To make sure the cache format is up to date + malloc_trim(0); + } + fclose(sync_cache_file); } - fclose(sync_cache_file); } sync_end: @@ -1925,13 +1847,6 @@ namespace QuickMedia { program_kill_in_thread(sync_thread.get_id()); sync_thread.join(); } - - if(sync_additional_messages_thread.joinable()) { - program_kill_in_thread(sync_additional_messages_thread.get_id()); - additional_messages_queue.close(); - sync_additional_messages_thread.join(); - additional_messages_queue.restart(); - } if(notification_thread.joinable()) { program_kill_in_thread(notification_thread.get_id()); @@ -2113,12 +2028,12 @@ namespace QuickMedia { } } - PluginResult Matrix::parse_sync_response(const rapidjson::Document &root, bool is_additional_messages_sync, bool initial_sync) { + PluginResult Matrix::parse_sync_response(const rapidjson::Document &root, bool initial_sync) { if(!root.IsObject()) return PluginResult::ERR; const rapidjson::Value &rooms_json = GetMember(root, "rooms"); - parse_sync_room_data(rooms_json, is_additional_messages_sync, initial_sync); + parse_sync_room_data(rooms_json, initial_sync); const rapidjson::Value &account_data_json = GetMember(root, "account_data"); parse_sync_account_data(account_data_json); @@ -2300,7 +2215,7 @@ namespace QuickMedia { return PluginResult::OK; } - PluginResult Matrix::parse_sync_room_data(const rapidjson::Value &rooms_json, bool is_additional_messages_sync, bool initial_sync) { + PluginResult Matrix::parse_sync_room_data(const rapidjson::Value &rooms_json, bool initial_sync) { if(!rooms_json.IsObject()) return PluginResult::OK; @@ -2342,16 +2257,9 @@ namespace QuickMedia { const rapidjson::Value &timeline_json = GetMember(it.value, "timeline"); if(timeline_json.IsObject()) { - if(is_additional_messages_sync) { - // This may be non-existent if this is the first event in the room - const rapidjson::Value &prev_batch_json = GetMember(timeline_json, "prev_batch"); - if(prev_batch_json.IsString()) - room->set_prev_batch(prev_batch_json.GetString()); - } - bool has_unread_notifications = false; const rapidjson::Value &unread_notification_json = GetMember(it.value, "unread_notifications"); - if(unread_notification_json.IsObject() && !is_additional_messages_sync && !sync_is_cache) { + if(unread_notification_json.IsObject()) { const rapidjson::Value &highlight_count_json = GetMember(unread_notification_json, "highlight_count"); if(highlight_count_json.IsInt64() && (highlight_count_json.GetInt64() > 0 || initial_sync)) { room->unread_notification_count = highlight_count_json.GetInt64(); @@ -2368,7 +2276,7 @@ namespace QuickMedia { if(account_data_json.IsObject()) { const rapidjson::Value &events_json = GetMember(account_data_json, "events"); auto me = get_me(room); - events_set_user_read_marker(events_json, room, me, is_additional_messages_sync); + events_set_user_read_marker(events_json, room, me); } if(is_new_room) @@ -2381,7 +2289,7 @@ namespace QuickMedia { if(account_data_json.IsObject()) { const rapidjson::Value &events_json = GetMember(account_data_json, "events"); auto me = get_me(room); - events_set_user_read_marker(events_json, room, me, is_additional_messages_sync); + events_set_user_read_marker(events_json, room, me); } if(is_new_room) @@ -2410,20 +2318,18 @@ namespace QuickMedia { } } - if(!is_additional_messages_sync) { - const rapidjson::Value &leave_json = GetMember(rooms_json, "leave"); - remove_rooms(leave_json); + const rapidjson::Value &leave_json = GetMember(rooms_json, "leave"); + remove_rooms(leave_json); - const rapidjson::Value &invite_json = GetMember(rooms_json, "invite"); - add_invites(invite_json); - } + const rapidjson::Value &invite_json = GetMember(rooms_json, "invite"); + add_invites(invite_json); if(initial_sync) { std::lock_guard lock(room_data_mutex); for(auto &room : rooms) { if(existing_rooms.find(room.get()) == existing_rooms.end()) { RoomData *room_p = room.get(); - ui_thread_tasks.push([this, room_p]{ delegate->leave_room(room_p, LeaveType::LEAVE, "", true); }); + ui_thread_tasks.push([this, room_p]{ delegate->leave_room(room_p, LeaveType::LEAVE, ""); }); remove_room(room->id); } } @@ -2549,7 +2455,7 @@ namespace QuickMedia { return user_info; } - void Matrix::events_set_user_read_marker(const rapidjson::Value &events_json, RoomData *room_data, std::shared_ptr &me, bool is_additional_messages_sync) { + void Matrix::events_set_user_read_marker(const rapidjson::Value &events_json, RoomData *room_data, std::shared_ptr &me) { assert(me); // TODO: Remove read marker from user and set it for the room instead. We need that in the matrix pages also if(!events_json.IsArray() || !me) return; @@ -2571,8 +2477,7 @@ namespace QuickMedia { if(!event_id_json.IsString()) continue; - if(!sync_is_cache && !is_additional_messages_sync) - room_data->set_user_read_marker(me, std::string(event_id_json.GetString(), event_id_json.GetStringLength())); + room_data->set_user_read_marker(me, std::string(event_id_json.GetString(), event_id_json.GetStringLength())); } else if(strcmp(type_json.GetString(), "qm.last_read_message_timestamp") == 0) { // TODO: Remove qm.last_read_message_timestamp in room level eventually when everybody has data in global level const rapidjson::Value &content_json = GetMember(event_json, "content"); if(!content_json.IsObject()) @@ -2711,7 +2616,7 @@ namespace QuickMedia { new_messages.push_back(std::move(message)); room_data->append_messages(new_messages); ui_thread_tasks.push([this, room_data, new_messages{std::move(new_messages)}]{ - delegate->room_add_new_messages(room_data, new_messages, false, false, MessageDirection::AFTER); + delegate->room_add_new_messages(room_data, new_messages, false, MessageDirection::AFTER); }); } @@ -2726,10 +2631,8 @@ namespace QuickMedia { for(const rapidjson::Value &event_item_json : events_json.GetArray()) { std::shared_ptr new_message = parse_message_event(event_item_json, room_data); - if(new_message) { - new_message->cache = sync_is_cache; + if(new_message) new_messages.push_back(std::move(new_message)); - } } if(new_messages.empty()) @@ -2766,10 +2669,9 @@ namespace QuickMedia { } } - bool cache_sync = sync_is_cache; bool is_initial_sync = next_batch.empty(); - ui_thread_tasks.push([this, room_data, cache_sync, new_messages{std::move(new_messages)}, is_initial_sync, message_dir]{ - delegate->room_add_new_messages(room_data, new_messages, is_initial_sync, cache_sync, message_dir); + ui_thread_tasks.push([this, room_data, new_messages{std::move(new_messages)}, is_initial_sync, message_dir]{ + delegate->room_add_new_messages(room_data, new_messages, is_initial_sync, message_dir); }); return num_new_messages; @@ -3775,7 +3677,6 @@ namespace QuickMedia { invite.room_avatar_url = room->get_avatar_url(); invite.invited_by = invited_by; invite.timestamp = timestamp; - invite.new_invite = !sync_is_cache; std::string room_id_str(room_id.GetString(), room_id.GetStringLength()); if(set_invite(room_id_str, invite)) @@ -3861,8 +3762,7 @@ namespace QuickMedia { if(!reason_str.empty()) desc += ", reason: " + reason_str; - const bool is_cache = sync_is_cache; - ui_thread_tasks.push([this, room, leave_type, desc{std::move(desc)}, is_cache]{ delegate->leave_room(room, leave_type, desc, is_cache); }); + ui_thread_tasks.push([this, room, leave_type, desc{std::move(desc)}]{ delegate->leave_room(room, leave_type, desc); }); remove_room(room_id_str); break; } @@ -5523,7 +5423,7 @@ namespace QuickMedia { if(download_result == DownloadResult::OK) { RoomData *room = get_room_by_id(room_id); if(room) { - ui_thread_tasks.push([this, room]{ delegate->leave_room(room, LeaveType::LEAVE, "", false); }); + ui_thread_tasks.push([this, room]{ delegate->leave_room(room, LeaveType::LEAVE, ""); }); remove_room(room_id); } } @@ -5887,18 +5787,6 @@ namespace QuickMedia { return next_notifications_token; } - void Matrix::clear_sync_cache_for_new_sync() { - std::lock_guard room_data_lock(room_data_mutex); - std::lock_guard invites_lock(invite_mutex); - for(auto &room : rooms) { - room->clear_data(); - } - // We intentionally dont clear |rooms| here because we want the objects inside it to still be valid. TODO: Clear |rooms| here - //room_data_by_id.clear(); - invites.clear(); - ui_thread_tasks.push([this]{ delegate->clear_data(); }); - } - std::shared_ptr Matrix::get_user_by_id(RoomData *room, const std::string &user_id, bool *is_new_user, bool create_if_not_found) { auto user = room->get_user_by_id(user_id); if(user) { diff --git a/src/plugins/utils/UniqueProcess.cpp b/src/plugins/utils/UniqueProcess.cpp new file mode 100644 index 0000000..d2025f5 --- /dev/null +++ b/src/plugins/utils/UniqueProcess.cpp @@ -0,0 +1,111 @@ +#include "../../../plugins/utils/UniqueProcess.hpp" +#include "../../../include/Storage.hpp" +#include +#include +#include +#include +#include +#include +#include + +namespace QuickMedia { + bool is_quickmedia_instance_already_running(const char *sock_file_dir, const char *plugin_name) { + char sock_file[PATH_MAX]; + snprintf(sock_file, sizeof(sock_file), "%s/quickmedia.%s.sock", sock_file_dir, plugin_name); + + std::string resolved_path; + if(file_get_content(sock_file, resolved_path) != 0) + return false; + + resolved_path.resize(108); // sizeof(addr.sun_path) is 108 + + int fd = socket(AF_UNIX, SOCK_STREAM, 0); + if(fd == -1) { + fprintf(stderr, "Error: failed to create unix domain socket, error: %s\n", strerror(errno)); + return true; + } + + fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); + + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, resolved_path.c_str()); + + bool running = connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0; + int err = errno; + if(err == EAGAIN) + running = true; + else if(err == ENOENT) + running = false; + close(fd); + return running; + } + + bool set_quickmedia_instance_unique(const char *sock_file_dir, const char *plugin_name) { + char socket_file[] = "/tmp/quickmedia.XXXXXX"; + int tmp_file_fd = mkstemp(socket_file); + if(tmp_file_fd == -1) { + fprintf(stderr, "Error: failed to create temporary file for unix domain socket, error: %s\n", strerror(errno)); + return false; + } + unlink(socket_file); + close(tmp_file_fd); + + int fd = socket(AF_UNIX, SOCK_STREAM, 0); + if(fd == -1) { + fprintf(stderr, "Error: failed to create unix domain socket, error: %s\n", strerror(errno)); + return false; + } + + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, socket_file); + + if(bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { + fprintf(stderr, "Error: failed to bind unix domain socket, error: %s\n", strerror(errno)); + unlink(socket_file); + close(fd); + return false; + } + + if(listen(fd, 0) == -1) { + fprintf(stderr, "Error: failed to listen to unix domain socket, error: %s\n", strerror(errno)); + unlink(socket_file); + close(fd); + return false; + } + + char sock_file[PATH_MAX]; + snprintf(sock_file, sizeof(sock_file), "%s/quickmedia.%s.sock", sock_file_dir, plugin_name); + bool success = file_overwrite(sock_file, socket_file) == 0; + if(!success) { + fprintf(stderr, "Error: failed to create %s unix domain socket link file\n", sock_file); + unlink(socket_file); + close(fd); + } + return success; + } + + void remove_quickmedia_instance_lock(const char *sock_file_dir, const char *plugin_name) { + char sock_file[PATH_MAX]; + snprintf(sock_file, sizeof(sock_file), "%s/quickmedia.%s.sock", sock_file_dir, plugin_name); + + std::string resolved_path; + if(file_get_content(sock_file, resolved_path) != 0) { + unlink(sock_file); + return; + } + + resolved_path.resize(108); // sizeof(addr.sun_path) is 108 + + if(resolved_path.size() < 4 || memcmp(resolved_path.data(), "/tmp", 4) != 0) { + unlink(sock_file); + return; + } + + unlink(sock_file); + unlink(resolved_path.c_str()); + } +} \ No newline at end of file -- cgit v1.2.3