From a55ea42d7f5900bd5fc8fad047040c7865824f33 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sat, 15 Jun 2019 01:11:51 +0300 Subject: Unbreak things --- matrix/rooms/room.go | 215 +++++++++++++++++++++++--------- matrix/rooms/roomcache.go | 305 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 460 insertions(+), 60 deletions(-) create mode 100644 matrix/rooms/roomcache.go (limited to 'matrix/rooms') diff --git a/matrix/rooms/room.go b/matrix/rooms/room.go index 717636b..87c3780 100644 --- a/matrix/rooms/room.go +++ b/matrix/rooms/room.go @@ -17,6 +17,7 @@ package rooms import ( + "compress/gzip" "encoding/gob" "fmt" "os" @@ -31,17 +32,20 @@ import ( ) func init() { - gob.Register([]interface{}{}) gob.Register(map[string]interface{}{}) + gob.Register([]interface{}{}) + gob.Register(&Room{}) + gob.Register(0) } type RoomNameSource int const ( - ExplicitRoomName RoomNameSource = iota - CanonicalAliasRoomName - AliasRoomName + UnknownRoomName RoomNameSource = iota MemberRoomName + AliasRoomName + CanonicalAliasRoomName + ExplicitRoomName ) // RoomTag is a tag given to a specific room. @@ -60,7 +64,8 @@ type UnreadMessage struct { // Room represents a single Matrix room. type Room struct { - *mautrix.Room + // The room ID. + ID string // Whether or not the user has left the room. HasLeft bool @@ -79,19 +84,22 @@ type Room struct { // Whether or not this room is marked as a direct chat. IsDirect bool - // List of tags given to this room + // List of tags given to this room. RawTags []RoomTag // Timestamp of previously received actual message. LastReceivedMessage time.Time + // Room state cache. + state map[mautrix.EventType]map[string]*mautrix.Event // MXID -> Member cache calculated from membership events. memberCache map[string]*mautrix.Member - // The first non-SessionUserID member in the room. Calculated at + // The first two non-SessionUserID members in the room. Calculated at // the same time as memberCache. - firstMemberCache *mautrix.Member + firstMemberCache *mautrix.Member + secondMemberCache *mautrix.Member // The name of the room. Calculated from the state event name, // canonical_alias or alias or the member cache. - nameCache string + NameCache string // The event type from which the name cache was calculated from. nameCacheSource RoomNameSource // The topic of the room. Directly fetched from the m.room.topic state event. @@ -101,31 +109,98 @@ type Room struct { // The list of aliases. Directly fetched from the m.room.aliases state event. aliasesCache []string + // Path for state store file. + path string + // Room cache object + cache *RoomCache + // Lock for state and other room stuff. lock sync.RWMutex + + // Room state cache linked list. + prev *Room + next *Room + touch int64 } -func (room *Room) Load(path string) error { - file, err := os.OpenFile(path, os.O_RDONLY, 0600) - if err != nil { - return err +func debugPrintError(fn func() error, message string) { + if err := fn(); err != nil { + debug.Printf("%s: %v", message, err) + } +} + +func (room *Room) Loaded() bool { + return room.state != nil +} + +func (room *Room) Load() { + if room.Loaded() { + return } - defer file.Close() - dec := gob.NewDecoder(file) + room.cache.TouchNode(room) room.lock.Lock() - defer room.lock.Unlock() - return dec.Decode(room) + room.load() + room.lock.Unlock() } -func (room *Room) Save(path string) error { - file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600) +func (room *Room) load() { + if room.Loaded() { + return + } + debug.Print("Loading state for room", room.ID) + room.state = make(map[mautrix.EventType]map[string]*mautrix.Event) + file, err := os.OpenFile(room.path, os.O_RDONLY, 0600) if err != nil { - return err + if !os.IsNotExist(err) { + debug.Print("Failed to open room state file for reading:", err) + } else { + debug.Print("Room state file for", room.ID, "does not exist") + } + return } - defer file.Close() - enc := gob.NewEncoder(file) + defer debugPrintError(file.Close, "Failed to close room state file after reading") + cmpReader, err := gzip.NewReader(file) + if err != nil { + debug.Print("Failed to open room state gzip reader:", err) + return + } + defer debugPrintError(cmpReader.Close, "Failed to close room state gzip reader") + dec := gob.NewDecoder(cmpReader) + if err = dec.Decode(&room.state); err != nil { + debug.Print("Failed to decode room state:", err) + } +} + +func (room *Room) Unload() { + debug.Print("Unloading", room.ID) + room.Save() + room.state = nil + room.aliasesCache = nil + room.topicCache = "" + room.canonicalAliasCache = "" + room.firstMemberCache = nil + room.secondMemberCache = nil +} + +func (room *Room) Save() { + if !room.Loaded() { + debug.Print("Failed to save room state: room not loaded") + return + } + debug.Print("Saving state for room", room.ID) + file, err := os.OpenFile(room.path, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + debug.Print("Failed to open room state file for writing:", err) + return + } + defer debugPrintError(file.Close, "Failed to close room state file after writing") + cmpWriter := gzip.NewWriter(file) + defer debugPrintError(cmpWriter.Close, "Failed to close room state gzip writer") + enc := gob.NewEncoder(cmpWriter) room.lock.RLock() defer room.lock.RUnlock() - return enc.Encode(room) + if err := enc.Encode(&room.state); err != nil { + debug.Print("Failed to encode room state:", err) + } } // MarkRead clears the new message statuses on this room. @@ -220,30 +295,32 @@ func (room *Room) Tags() []RoomTag { // UpdateState updates the room's current state with the given Event. This will clobber events based // on the type/state_key combination. func (room *Room) UpdateState(event *mautrix.Event) { + room.Load() room.lock.Lock() defer room.lock.Unlock() - _, exists := room.State[event.Type] + _, exists := room.state[event.Type] if !exists { - room.State[event.Type] = make(map[string]*mautrix.Event) + room.state[event.Type] = make(map[string]*mautrix.Event) } switch event.Type { case mautrix.StateRoomName: - room.nameCache = "" + room.NameCache = "" case mautrix.StateCanonicalAlias: - if room.nameCacheSource >= CanonicalAliasRoomName { - room.nameCache = "" + if room.nameCacheSource <= CanonicalAliasRoomName { + room.NameCache = "" } room.canonicalAliasCache = "" case mautrix.StateAliases: - if room.nameCacheSource >= AliasRoomName { - room.nameCache = "" + if room.nameCacheSource <= AliasRoomName { + room.NameCache = "" } room.aliasesCache = nil case mautrix.StateMember: room.memberCache = nil room.firstMemberCache = nil - if room.nameCacheSource >= MemberRoomName { - room.nameCache = "" + room.secondMemberCache = nil + if room.nameCacheSource <= MemberRoomName { + room.NameCache = "" } case mautrix.StateTopic: room.topicCache = "" @@ -258,24 +335,25 @@ func (room *Room) UpdateState(event *mautrix.Event) { } if event.StateKey == nil { - room.State[event.Type][""] = event + room.state[event.Type][""] = event } else { - room.State[event.Type][*event.StateKey] = event + room.state[event.Type][*event.StateKey] = event } } // GetStateEvent returns the state event for the given type/state_key combo, or nil. func (room *Room) GetStateEvent(eventType mautrix.EventType, stateKey string) *mautrix.Event { + room.Load() room.lock.RLock() defer room.lock.RUnlock() - stateEventMap, _ := room.State[eventType] + stateEventMap, _ := room.state[eventType] event, _ := stateEventMap[stateKey] return event } // getStateEvents returns the state events for the given type. func (room *Room) getStateEvents(eventType mautrix.EventType) map[string]*mautrix.Event { - stateEventMap, _ := room.State[eventType] + stateEventMap, _ := room.state[eventType] return stateEventMap } @@ -323,7 +401,7 @@ func (room *Room) GetAliases() []string { func (room *Room) updateNameFromNameEvent() { nameEvt := room.GetStateEvent(mautrix.StateRoomName, "") if nameEvt != nil { - room.nameCache = nameEvt.Content.Name + room.NameCache = nameEvt.Content.Name } } @@ -336,7 +414,7 @@ func (room *Room) updateNameFromAliases() { aliases := room.GetAliases() if len(aliases) > 0 { sort.Sort(sort.StringSlice(aliases)) - room.nameCache = aliases[0] + room.NameCache = aliases[0] } } @@ -351,33 +429,40 @@ func (room *Room) updateNameFromAliases() { func (room *Room) updateNameFromMembers() { members := room.GetMembers() if len(members) <= 1 { - room.nameCache = "Empty room" + room.NameCache = "Empty room" } else if room.firstMemberCache == nil { - room.nameCache = "Room" + room.NameCache = "Room" } else if len(members) == 2 { - room.nameCache = room.firstMemberCache.Displayname + room.NameCache = room.firstMemberCache.Displayname + } else if len(members) == 3 && room.secondMemberCache != nil { + room.NameCache = fmt.Sprintf("%s and %s", room.firstMemberCache.Displayname, room.secondMemberCache.Displayname) } else { - firstMember := room.firstMemberCache.Displayname - room.nameCache = fmt.Sprintf("%s and %d others", firstMember, len(members)-2) + members := room.firstMemberCache.Displayname + count := len(members) - 2 + if room.secondMemberCache != nil { + members += ", " + room.secondMemberCache.Displayname + count-- + } + room.NameCache = fmt.Sprintf("%s and %d others", members, count) } } // updateNameCache updates the room display name based on the room state in the order // specified in spec section 11.2.2.5. func (room *Room) updateNameCache() { - if len(room.nameCache) == 0 { + if len(room.NameCache) == 0 { room.updateNameFromNameEvent() room.nameCacheSource = ExplicitRoomName } - if len(room.nameCache) == 0 { - room.nameCache = room.GetCanonicalAlias() + if len(room.NameCache) == 0 { + room.NameCache = room.GetCanonicalAlias() room.nameCacheSource = CanonicalAliasRoomName } - if len(room.nameCache) == 0 { + if len(room.NameCache) == 0 { room.updateNameFromAliases() room.nameCacheSource = AliasRoomName } - if len(room.nameCache) == 0 { + if len(room.NameCache) == 0 { room.updateNameFromMembers() room.nameCacheSource = MemberRoomName } @@ -389,15 +474,19 @@ func (room *Room) updateNameCache() { // If the cache is empty, it is updated first. func (room *Room) GetTitle() string { room.updateNameCache() - return room.nameCache + return room.NameCache } // createMemberCache caches all member events into a easily processable MXID -> *Member map. func (room *Room) createMemberCache() map[string]*mautrix.Member { + if len(room.memberCache) > 0 { + return room.memberCache + } cache := make(map[string]*mautrix.Member) room.lock.RLock() events := room.getStateEvents(mautrix.StateMember) room.firstMemberCache = nil + room.secondMemberCache = nil if events != nil { for userID, event := range events { member := &event.Content.Member @@ -405,8 +494,12 @@ func (room *Room) createMemberCache() map[string]*mautrix.Member { if len(member.Displayname) == 0 { member.Displayname = userID } - if room.firstMemberCache == nil && userID != room.SessionUserID { - room.firstMemberCache = member + if userID != room.SessionUserID { + if room.firstMemberCache == nil { + room.firstMemberCache = member + } else if room.secondMemberCache == nil { + room.secondMemberCache = member + } } if member.Membership == mautrix.MembershipJoin || member.Membership == mautrix.MembershipInvite { cache[userID] = member @@ -425,18 +518,16 @@ func (room *Room) createMemberCache() map[string]*mautrix.Member { // The members are returned from the cache. // If the cache is empty, it is updated first. func (room *Room) GetMembers() map[string]*mautrix.Member { - if len(room.memberCache) == 0 || room.firstMemberCache == nil { - room.createMemberCache() - } + room.Load() + room.createMemberCache() return room.memberCache } // GetMember returns the member with the given MXID. // If the member doesn't exist, nil is returned. func (room *Room) GetMember(userID string) *mautrix.Member { - if len(room.memberCache) == 0 { - room.createMemberCache() - } + room.Load() + room.createMemberCache() room.lock.RLock() member, _ := room.memberCache[userID] room.lock.RUnlock() @@ -449,9 +540,13 @@ func (room *Room) GetSessionOwner() string { } // NewRoom creates a new Room with the given ID -func NewRoom(roomID, owner string) *Room { +func NewRoom(roomID string, cache *RoomCache) *Room { return &Room{ - Room: mautrix.NewRoom(roomID), - SessionUserID: owner, + ID: roomID, + state: make(map[mautrix.EventType]map[string]*mautrix.Event), + path: cache.roomPath(roomID), + cache: cache, + + SessionUserID: cache.getOwner(), } } diff --git a/matrix/rooms/roomcache.go b/matrix/rooms/roomcache.go new file mode 100644 index 0000000..03e3ad8 --- /dev/null +++ b/matrix/rooms/roomcache.go @@ -0,0 +1,305 @@ +// gomuks - A terminal Matrix client written in Go. +// Copyright (C) 2019 Tulir Asokan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package rooms + +import ( + "compress/gzip" + "encoding/gob" + "os" + "path/filepath" + "time" + + "github.com/pkg/errors" + sync "github.com/sasha-s/go-deadlock" + + "maunium.net/go/gomuks/debug" +) + +// RoomCache contains room state info in a hashmap and linked list. +type RoomCache struct { + sync.Mutex + + listPath string + directory string + maxSize int + maxAge int64 + getOwner func() string + + Map map[string]*Room + head *Room + tail *Room + size int +} + +func NewRoomCache(listPath, directory string, maxSize int, maxAge int64, getOwner func() string) *RoomCache { + return &RoomCache{ + listPath: listPath, + directory: directory, + maxSize: maxSize, + maxAge: maxAge, + getOwner: getOwner, + + Map: make(map[string]*Room), + } +} + +func (cache *RoomCache) LoadList() error { + cache.Lock() + defer cache.Unlock() + + // Open room list file + file, err := os.OpenFile(cache.listPath, os.O_RDONLY, 0600) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return errors.Wrap(err, "failed to open room list file for reading") + } + defer debugPrintError(file.Close, "Failed to close room list file after reading") + + // Open gzip reader for room list file + cmpReader, err := gzip.NewReader(file) + if err != nil { + return errors.Wrap(err, "failed to read gzip room list") + } + defer debugPrintError(cmpReader.Close, "Failed to close room list gzip reader") + + // Open gob decoder for gzip reader + dec := gob.NewDecoder(cmpReader) + // Read number of items in list + var size int + err = dec.Decode(&size) + if err != nil { + return errors.Wrap(err, "failed to read size of room list") + } + + // Read list + cache.Map = make(map[string]*Room, size) + for i := 0; i < size; i++ { + room := &Room{} + err = dec.Decode(room) + if err != nil { + debug.Printf("Failed to decode %dth room list entry: %v", i+1, err) + continue + } + room.path = cache.roomPath(room.ID) + room.cache = cache + cache.Map[room.ID] = room + } + return nil +} + +func (cache *RoomCache) SaveLoadedRooms() { + cache.Lock() + defer cache.Unlock() + cache.clean() + for node := cache.head; node != nil; node = node.prev { + node.Save() + } +} + +func (cache *RoomCache) SaveList() error { + cache.Lock() + defer cache.Unlock() + + debug.Print("Saving room list...") + // Open room list file + file, err := os.OpenFile(cache.listPath, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return errors.Wrap(err, "failed to open room list file for writing") + } + defer debugPrintError(file.Close, "Failed to close room list file after writing") + + // Open gzip writer for room list file + cmpWriter := gzip.NewWriter(file) + defer debugPrintError(cmpWriter.Close, "Failed to close room list gzip writer") + + // Open gob encoder for gzip writer + enc := gob.NewEncoder(cmpWriter) + // Write number of items in list + err = enc.Encode(len(cache.Map)) + if err != nil { + return errors.Wrap(err, "failed to write size of room list") + } + + // Write list + for _, node := range cache.Map { + err = enc.Encode(node) + if err != nil { + debug.Printf("Failed to encode room list entry of %s: %v", node.ID, err) + } + } + debug.Print("Room list saved to", cache.listPath, len(cache.Map), cache.size) + return nil +} + +func (cache *RoomCache) Touch(roomID string) { + cache.Lock() + node, ok := cache.Map[roomID] + if !ok || node == nil { + cache.Unlock() + return + } + cache.touch(node) + cache.Unlock() +} + +func (cache *RoomCache) TouchNode(node *Room) { + cache.Lock() + cache.touch(node) + cache.Unlock() +} + +func (cache *RoomCache) touch(node *Room) { + if node == cache.head { + return + } + debug.Print("Touching", node.ID) + cache.llPop(node) + cache.llPush(node) + node.touch = time.Now().Unix() +} + +func (cache *RoomCache) Get(roomID string) *Room { + cache.Lock() + node := cache.get(roomID) + cache.Unlock() + return node +} + +func (cache *RoomCache) GetOrCreate(roomID string) *Room { + cache.Lock() + node := cache.get(roomID) + if node == nil { + node = cache.newRoom(roomID) + cache.llPush(node) + } + cache.Unlock() + return node +} + +func (cache *RoomCache) get(roomID string) *Room { + node, ok := cache.Map[roomID] + if ok && node != nil && node.Loaded() { + cache.touch(node) + return node + } + return nil +} +func (cache *RoomCache) Put(room *Room) { + cache.Lock() + node := cache.get(room.ID) + if node != nil { + cache.touch(node) + } else { + cache.Map[room.ID] = room + if room.Loaded() { + cache.llPush(room) + } + node = room + } + cache.Unlock() + node.Save() +} + +func (cache *RoomCache) roomPath(roomID string) string { + return filepath.Join(cache.directory, roomID+".gob.gz") +} + +func (cache *RoomCache) Load(roomID string) *Room { + cache.Lock() + defer cache.Unlock() + node, ok := cache.Map[roomID] + if ok { + return node + } + + node = NewRoom(roomID, cache) + node.Load() + return node +} + +func (cache *RoomCache) llPop(node *Room) { + if node.prev == nil && node.next == nil { + return + } + if node.prev != nil { + node.prev.next = node.next + } + if node.next != nil { + node.next.prev = node.prev + } + if node == cache.tail { + cache.tail = node.next + } + if node == cache.head { + cache.head = node.prev + } + node.next = nil + node.prev = nil + cache.size-- +} + +func (cache *RoomCache) llPush(node *Room) { + if node.next != nil || node.prev != nil { + debug.PrintStack() + debug.Print("Tried to llPush node that is already in stack") + return + } + if node == cache.head { + return + } + if cache.head != nil { + cache.head.next = node + } + node.prev = cache.head + node.next = nil + cache.head = node + if cache.tail == nil { + cache.tail = node + } + cache.size++ + cache.clean() +} + +func (cache *RoomCache) clean() { + origSize := cache.size + maxTS := time.Now().Unix() - cache.maxAge + for cache.size > cache.maxSize { + if cache.tail.touch > maxTS { + break + } + cache.tail.Unload() + cache.llPop(cache.tail) + } + if cleaned := origSize - cache.size; cleaned > 0 { + debug.Print("Cleaned", cleaned, "rooms") + } +} + +func (cache *RoomCache) Unload(node *Room) { + cache.Lock() + defer cache.Unlock() + cache.llPop(node) + node.Unload() +} + +func (cache *RoomCache) newRoom(roomID string) *Room { + node := NewRoom(roomID, cache) + cache.Map[node.ID] = node + return node +} -- cgit v1.2.3