diff options
Diffstat (limited to 'matrix')
-rw-r--r-- | matrix/history.go | 25 | ||||
-rw-r--r-- | matrix/matrix.go | 121 | ||||
-rw-r--r-- | matrix/rooms/room.go | 316 | ||||
-rw-r--r-- | matrix/rooms/roomcache.go | 319 | ||||
-rw-r--r-- | matrix/sync.go | 6 |
5 files changed, 680 insertions, 107 deletions
diff --git a/matrix/history.go b/matrix/history.go index 767cace..7275c15 100644 --- a/matrix/history.go +++ b/matrix/history.go @@ -18,6 +18,7 @@ package matrix import ( "bytes" + "compress/gzip" "encoding/binary" "encoding/gob" @@ -28,6 +29,10 @@ import ( "maunium.net/go/mautrix" ) +func init() { + gob.Register(&mautrix.Event{}) +} + type HistoryManager struct { sync.Mutex @@ -226,13 +231,27 @@ func btoi(b []byte) uint64 { func marshalEvent(event *mautrix.Event) ([]byte, error) { var buf bytes.Buffer - err := gob.NewEncoder(&buf).Encode(event) - return buf.Bytes(), err + enc := gzip.NewWriter(&buf) + if err := gob.NewEncoder(enc).Encode(event); err != nil { + _ = enc.Close() + return nil, err + } else if err := enc.Close(); err != nil { + return nil, err + } + return buf.Bytes(), nil } func unmarshalEvent(data []byte) (*mautrix.Event, error) { event := &mautrix.Event{} - return event, gob.NewDecoder(bytes.NewReader(data)).Decode(event) + if cmpReader, err := gzip.NewReader(bytes.NewReader(data)); err != nil { + return nil, err + } else if err := gob.NewDecoder(cmpReader).Decode(event); err != nil { + _ = cmpReader.Close() + return nil, err + } else if err := cmpReader.Close(); err != nil { + return nil, err + } + return event, nil } func put(streams, eventIDs *bolt.Bucket, event *mautrix.Event, key uint64) error { diff --git a/matrix/matrix.go b/matrix/matrix.go index ef272b0..cfd5614 100644 --- a/matrix/matrix.go +++ b/matrix/matrix.go @@ -29,7 +29,9 @@ import ( "path" "path/filepath" "regexp" + "runtime" "time" + dbg "runtime/debug" "maunium.net/go/mautrix" "maunium.net/go/mautrix/format" @@ -204,6 +206,9 @@ func (c *Container) OnLogin() { debug.Print("Initializing syncer") c.syncer = NewGomuksSyncer(c.config) c.syncer.OnEventType(mautrix.EventMessage, c.HandleMessage) + // Just pass encrypted events as messages, they'll show up with an encryption unsupported message. + c.syncer.OnEventType(mautrix.EventEncrypted, c.HandleMessage) + c.syncer.OnEventType(mautrix.EventSticker, c.HandleMessage) c.syncer.OnEventType(mautrix.StateAliases, c.HandleMessage) c.syncer.OnEventType(mautrix.StateCanonicalAlias, c.HandleMessage) c.syncer.OnEventType(mautrix.StateTopic, c.HandleMessage) @@ -218,9 +223,22 @@ func (c *Container) OnLogin() { c.syncer.InitDoneCallback = func() { debug.Print("Initial sync done") c.config.AuthCache.InitialSyncDone = true - c.config.SaveAuthCache() - c.ui.MainView().InitialSyncDone() + debug.Print("Updating title caches") + for _, room := range c.config.Rooms.Map { + room.GetTitle() + } + debug.Print("Cleaning cached rooms from memory") + c.config.Rooms.ForceClean() + debug.Print("Saving all data") + c.config.SaveAll() + debug.Print("Adding rooms to UI") + c.ui.MainView().SetRooms(c.config.Rooms) c.ui.Render() + // The initial sync can be a bit heavy, so we force run the GC here + // after cleaning up rooms from memory above. + debug.Print("Running GC") + runtime.GC() + dbg.FreeOSMemory() } c.client.Syncer = c.syncer @@ -274,7 +292,9 @@ func (c *Container) HandlePreferences(source EventSource, evt *mautrix.Event) { return } debug.Print("Updated preferences:", orig, "->", c.config.Preferences) - c.ui.HandleNewPreferences() + if c.config.AuthCache.InitialSyncDone { + c.ui.HandleNewPreferences() + } } func (c *Container) SendPreferencesToMatrix() { @@ -289,9 +309,24 @@ func (c *Container) SendPreferencesToMatrix() { // HandleMessage is the event handler for the m.room.message timeline event. func (c *Container) HandleMessage(source EventSource, evt *mautrix.Event) { - if source&EventSourceLeave != 0 || source&EventSourceState != 0 { + room := c.GetOrCreateRoom(evt.RoomID) + if source&EventSourceLeave != 0 { + room.HasLeft = true + return + } else if source&EventSourceState != 0 { + return + } + + err := c.history.Append(room, []*mautrix.Event{evt}) + if err != nil { + debug.Printf("Failed to add event %s to history: %v", evt.ID, err) + } + + if !c.config.AuthCache.InitialSyncDone { + room.LastReceivedMessage = time.Unix(evt.Timestamp/1000, evt.Timestamp%1000*1000) return } + mainView := c.ui.MainView() roomView := mainView.GetRoom(evt.RoomID) @@ -300,23 +335,29 @@ func (c *Container) HandleMessage(source EventSource, evt *mautrix.Event) { return } - err := c.history.Append(roomView.MxRoom(), []*mautrix.Event{evt}) - if err != nil { - debug.Printf("Failed to add event %s to history: %v", evt.ID, err) + if !room.Loaded() { + pushRules := c.PushRules().GetActions(room, evt).Should() + shouldNotify := pushRules.Notify || !pushRules.NotifySpecified + if !shouldNotify { + room.LastReceivedMessage = time.Unix(evt.Timestamp/1000, evt.Timestamp%1000*1000) + room.AddUnread(evt.ID, shouldNotify, pushRules.Highlight) + mainView.Bump(room) + return + } } // TODO switch to roomView.AddEvent message := roomView.ParseEvent(evt) if message != nil { roomView.AddMessage(message) - roomView.MxRoom().LastReceivedMessage = message.Timestamp() + roomView.MxRoom().LastReceivedMessage = message.Time() if c.syncer.FirstSyncDone { pushRules := c.PushRules().GetActions(roomView.MxRoom(), evt).Should() mainView.NotifyMessage(roomView.MxRoom(), message, pushRules) c.ui.Render() } } else { - debug.Printf("Parsing event %s type %s %v from %s in %s failed (ParseEvent() returned nil).", evt.ID, evt.Type, evt.Content.Raw, evt.Sender, evt.RoomID) + debug.Printf("Parsing event %s type %s %v from %s in %s failed (ParseEvent() returned nil).", evt.ID, evt.Type.String(), evt.Content.Raw, evt.Sender, evt.RoomID) } } @@ -324,6 +365,9 @@ func (c *Container) HandleMessage(source EventSource, evt *mautrix.Event) { func (c *Container) HandleMembership(source EventSource, evt *mautrix.Event) { isLeave := source&EventSourceLeave != 0 isTimeline := source&EventSourceTimeline != 0 + if isLeave { + c.GetOrCreateRoom(evt.RoomID).HasLeft = true + } isNonTimelineLeave := isLeave && !isTimeline if !c.config.AuthCache.InitialSyncDone && isNonTimelineLeave { return @@ -350,11 +394,16 @@ func (c *Container) processOwnMembershipChange(evt *mautrix.Event) { room := c.GetRoom(evt.RoomID) switch membership { case "join": - c.ui.MainView().AddRoom(room) + if c.config.AuthCache.InitialSyncDone { + c.ui.MainView().AddRoom(room) + } room.HasLeft = false case "leave": - c.ui.MainView().RemoveRoom(room) + if c.config.AuthCache.InitialSyncDone { + c.ui.MainView().RemoveRoom(room) + } room.HasLeft = true + room.Unload() case "invite": // TODO handle debug.Printf("%s invited the user to %s", evt.Sender, evt.RoomID) @@ -399,8 +448,12 @@ func (c *Container) HandleReadReceipt(source EventSource, evt *mautrix.Event) { } room := c.GetRoom(evt.RoomID) - room.MarkRead(lastReadEvent) - c.ui.Render() + if room != nil { + room.MarkRead(lastReadEvent) + if c.config.AuthCache.InitialSyncDone { + c.ui.Render() + } + } } func (c *Container) parseDirectChatInfo(evt *mautrix.Event) map[*rooms.Room]bool { @@ -417,7 +470,7 @@ func (c *Container) parseDirectChatInfo(evt *mautrix.Event) map[*rooms.Room]bool continue } - room := c.GetRoom(roomID) + room := c.GetOrCreateRoom(roomID) if room != nil && !room.HasLeft { directChats[room] = true } @@ -428,11 +481,13 @@ func (c *Container) parseDirectChatInfo(evt *mautrix.Event) map[*rooms.Room]bool func (c *Container) HandleDirectChatInfo(source EventSource, evt *mautrix.Event) { directChats := c.parseDirectChatInfo(evt) - for _, room := range c.config.Rooms { + for _, room := range c.config.Rooms.Map { shouldBeDirect := directChats[room] if shouldBeDirect != room.IsDirect { room.IsDirect = shouldBeDirect - c.ui.MainView().UpdateTags(room) + if c.config.AuthCache.InitialSyncDone { + c.ui.MainView().UpdateTags(room) + } } } } @@ -451,7 +506,7 @@ func (c *Container) HandlePushRules(source EventSource, evt *mautrix.Event) { // HandleTag is the event handler for the m.tag account data event. func (c *Container) HandleTag(source EventSource, evt *mautrix.Event) { - room := c.config.GetRoom(evt.RoomID) + room := c.GetOrCreateRoom(evt.RoomID) newTags := make([]rooms.RoomTag, len(evt.Content.RoomTags)) index := 0 @@ -466,14 +521,19 @@ func (c *Container) HandleTag(source EventSource, evt *mautrix.Event) { } index++ } - - mainView := c.ui.MainView() room.RawTags = newTags - mainView.UpdateTags(room) + + if c.config.AuthCache.InitialSyncDone { + mainView := c.ui.MainView() + mainView.UpdateTags(room) + } } // HandleTyping is the event handler for the m.typing event. func (c *Container) HandleTyping(source EventSource, evt *mautrix.Event) { + if !c.config.AuthCache.InitialSyncDone { + return + } c.ui.MainView().SetTyping(evt.RoomID, evt.Content.TypingUserIDs) } @@ -544,7 +604,7 @@ func (c *Container) CreateRoom(req *mautrix.ReqCreateRoom) (*rooms.Room, error) if err != nil { return nil, err } - room := c.GetRoom(resp.RoomID) + room := c.GetOrCreateRoom(resp.RoomID) return room, nil } @@ -557,7 +617,6 @@ func (c *Container) JoinRoom(roomID, server string) (*rooms.Room, error) { room := c.GetRoom(resp.RoomID) room.HasLeft = false - return room, nil } @@ -568,8 +627,9 @@ func (c *Container) LeaveRoom(roomID string) error { return err } - room := c.GetRoom(roomID) - room.HasLeft = true + node := c.GetOrCreateRoom(roomID) + node.HasLeft = true + node.Unload() return nil } @@ -593,9 +653,9 @@ func (c *Container) GetHistory(room *rooms.Room, limit int) ([]*mautrix.Event, e return nil, err } } - room.PrevBatch = resp.End - c.config.PutRoom(room) debug.Printf("Loaded %d events for %s from server from %s to %s", len(resp.Chunk), room.ID, resp.Start, resp.End) + room.PrevBatch = resp.End + c.config.Rooms.Put(room) return resp.Chunk, nil } @@ -613,9 +673,14 @@ func (c *Container) GetEvent(room *rooms.Room, eventID string) (*mautrix.Event, return event, nil } +// GetOrCreateRoom gets the room instance stored in the session. +func (c *Container) GetOrCreateRoom(roomID string) *rooms.Room { + return c.config.Rooms.GetOrCreate(roomID) +} + // GetRoom gets the room instance stored in the session. func (c *Container) GetRoom(roomID string) *rooms.Room { - return c.config.GetRoom(roomID) + return c.config.Rooms.Get(roomID) } var mxcRegex = regexp.MustCompile("mxc://(.+)/(.+)") @@ -642,7 +707,7 @@ func (c *Container) Download(mxcURL string) (data []byte, hs, id string, err err } } - data, err = c.download(hs, id, cacheFile) + //FIXME data, err = c.download(hs, id, cacheFile) return } diff --git a/matrix/rooms/room.go b/matrix/rooms/room.go index 717636b..4928871 100644 --- a/matrix/rooms/room.go +++ b/matrix/rooms/room.go @@ -17,6 +17,7 @@ package rooms import ( + "compress/gzip" "encoding/gob" "fmt" "os" @@ -31,17 +32,18 @@ import ( ) func init() { - gob.Register([]interface{}{}) gob.Register(map[string]interface{}{}) + gob.Register([]interface{}{}) } type RoomNameSource int const ( - ExplicitRoomName RoomNameSource = iota - CanonicalAliasRoomName - AliasRoomName + UnknownRoomName RoomNameSource = iota MemberRoomName + AliasRoomName + CanonicalAliasRoomName + ExplicitRoomName ) // RoomTag is a tag given to a specific room. @@ -60,7 +62,8 @@ type UnreadMessage struct { // Room represents a single Matrix room. type Room struct { - *mautrix.Room + // The room ID. + ID string // Whether or not the user has left the room. HasLeft bool @@ -70,6 +73,7 @@ type Room struct { PrevBatch string // The MXID of the user whose session this room was created for. SessionUserID string + SessionMember *mautrix.Member // The number of unread messages that were notified about. UnreadMessages []UnreadMessage @@ -79,19 +83,22 @@ type Room struct { // Whether or not this room is marked as a direct chat. IsDirect bool - // List of tags given to this room + // List of tags given to this room. RawTags []RoomTag // Timestamp of previously received actual message. LastReceivedMessage time.Time + // Room state cache. + state map[mautrix.EventType]map[string]*mautrix.Event // MXID -> Member cache calculated from membership events. memberCache map[string]*mautrix.Member - // The first non-SessionUserID member in the room. Calculated at + // The first two non-SessionUserID members in the room. Calculated at // the same time as memberCache. - firstMemberCache *mautrix.Member + firstMemberCache *mautrix.Member + secondMemberCache *mautrix.Member // The name of the room. Calculated from the state event name, // canonical_alias or alias or the member cache. - nameCache string + NameCache string // The event type from which the name cache was calculated from. nameCacheSource RoomNameSource // The topic of the room. Directly fetched from the m.room.topic state event. @@ -101,31 +108,143 @@ type Room struct { // The list of aliases. Directly fetched from the m.room.aliases state event. aliasesCache []string + // Path for state store file. + path string + // Room cache object + cache *RoomCache + // Lock for state and other room stuff. lock sync.RWMutex + // Pre/post un/load hooks + preUnload func() bool + preLoad func() bool + postUnload func() + postLoad func() + // Whether or not the room state has changed + changed bool + + // Room state cache linked list. + prev *Room + next *Room + touch int64 } -func (room *Room) Load(path string) error { - file, err := os.OpenFile(path, os.O_RDONLY, 0600) - if err != nil { - return err +func debugPrintError(fn func() error, message string) { + if err := fn(); err != nil { + debug.Printf("%s: %v", message, err) + } +} + +func (room *Room) Loaded() bool { + return room.state != nil +} + +func (room *Room) Load() { + room.cache.TouchNode(room) + if room.Loaded() { + return + } + if room.preLoad != nil && !room.preLoad() { + return } - defer file.Close() - dec := gob.NewDecoder(file) room.lock.Lock() - defer room.lock.Unlock() - return dec.Decode(room) + room.load() + room.lock.Unlock() + if room.postLoad != nil { + room.postLoad() + } +} + +func (room *Room) load() { + if room.Loaded() { + return + } + debug.Print("Loading state for room", room.ID, "from disk") + room.state = make(map[mautrix.EventType]map[string]*mautrix.Event) + file, err := os.OpenFile(room.path, os.O_RDONLY, 0600) + if err != nil { + if !os.IsNotExist(err) { + debug.Print("Failed to open room state file for reading:", err) + } else { + debug.Print("Room state file for", room.ID, "does not exist") + } + return + } + defer debugPrintError(file.Close, "Failed to close room state file after reading") + cmpReader, err := gzip.NewReader(file) + if err != nil { + debug.Print("Failed to open room state gzip reader:", err) + return + } + defer debugPrintError(cmpReader.Close, "Failed to close room state gzip reader") + dec := gob.NewDecoder(cmpReader) + if err = dec.Decode(&room.state); err != nil { + debug.Print("Failed to decode room state:", err) + } + room.changed = false +} + +func (room *Room) Touch() { + room.cache.TouchNode(room) +} + +func (room *Room) Unload() bool { + if room.preUnload != nil && !room.preUnload() { + return false + } + debug.Print("Unloading", room.ID) + room.Save() + room.state = nil + room.aliasesCache = nil + room.topicCache = "" + room.canonicalAliasCache = "" + room.firstMemberCache = nil + room.secondMemberCache = nil + if room.postUnload != nil { + room.postUnload() + } + return true } -func (room *Room) Save(path string) error { - file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600) +func (room *Room) SetPreUnload(fn func() bool) { + room.preUnload = fn +} + +func (room *Room) SetPreLoad(fn func() bool) { + room.preLoad = fn +} + +func (room *Room) SetPostUnload(fn func()) { + room.postUnload = fn +} + +func (room *Room) SetPostLoad(fn func()) { + room.postLoad = fn +} + +func (room *Room) Save() { + if !room.Loaded() { + debug.Print("Failed to save room", room.ID, "state: room not loaded") + return + } + if !room.changed { + debug.Print("Not saving", room.ID, "as state hasn't changed") + return + } + debug.Print("Saving state for room", room.ID, "to disk") + file, err := os.OpenFile(room.path, os.O_WRONLY|os.O_CREATE, 0600) if err != nil { - return err + debug.Print("Failed to open room state file for writing:", err) + return } - defer file.Close() - enc := gob.NewEncoder(file) + defer debugPrintError(file.Close, "Failed to close room state file after writing") + cmpWriter := gzip.NewWriter(file) + defer debugPrintError(cmpWriter.Close, "Failed to close room state gzip writer") + enc := gob.NewEncoder(cmpWriter) room.lock.RLock() defer room.lock.RUnlock() - return enc.Encode(room) + if err := enc.Encode(&room.state); err != nil { + debug.Print("Failed to encode room state:", err) + } } // MarkRead clears the new message statuses on this room. @@ -220,62 +339,79 @@ func (room *Room) Tags() []RoomTag { // UpdateState updates the room's current state with the given Event. This will clobber events based // on the type/state_key combination. func (room *Room) UpdateState(event *mautrix.Event) { + room.Load() room.lock.Lock() defer room.lock.Unlock() - _, exists := room.State[event.Type] + room.changed = true + _, exists := room.state[event.Type] if !exists { - room.State[event.Type] = make(map[string]*mautrix.Event) + room.state[event.Type] = make(map[string]*mautrix.Event) } switch event.Type { case mautrix.StateRoomName: - room.nameCache = "" + room.NameCache = event.Content.Name + room.nameCacheSource = ExplicitRoomName case mautrix.StateCanonicalAlias: - if room.nameCacheSource >= CanonicalAliasRoomName { - room.nameCache = "" + if room.nameCacheSource <= CanonicalAliasRoomName { + room.NameCache = event.Content.Alias + room.nameCacheSource = CanonicalAliasRoomName } - room.canonicalAliasCache = "" + room.canonicalAliasCache = event.Content.Alias case mautrix.StateAliases: - if room.nameCacheSource >= AliasRoomName { - room.nameCache = "" + if room.nameCacheSource <= AliasRoomName { + room.NameCache = "" } room.aliasesCache = nil case mautrix.StateMember: - room.memberCache = nil - room.firstMemberCache = nil - if room.nameCacheSource >= MemberRoomName { - room.nameCache = "" + userID := event.GetStateKey() + if userID == room.SessionUserID { + room.SessionMember = room.eventToMember(userID, &event.Content) + } + if room.memberCache != nil { + if event.Content.Membership == mautrix.MembershipLeave || event.Content.Membership == mautrix.MembershipBan { + delete(room.memberCache, userID) + } else if event.Content.Membership == mautrix.MembershipInvite || event.Content.Membership == mautrix.MembershipJoin { + member := room.eventToMember(userID, &event.Content) + existingMember, ok := room.memberCache[userID] + if ok { + *existingMember = *member + } else { + room.memberCache[userID] = member + room.updateNthMemberCache(userID, member) + } + } + } + if room.nameCacheSource <= MemberRoomName { + room.NameCache = "" } case mautrix.StateTopic: - room.topicCache = "" + room.topicCache = event.Content.Topic } - stateKey := "" - if event.StateKey != nil { - stateKey = *event.StateKey - } if event.Type != mautrix.StateMember { - debug.Printf("Updating state %s#%s for %s", event.Type, stateKey, room.ID) + debug.Printf("Updating state %s#%s for %s", event.Type.String(), event.GetStateKey(), room.ID) } if event.StateKey == nil { - room.State[event.Type][""] = event + room.state[event.Type][""] = event } else { - room.State[event.Type][*event.StateKey] = event + room.state[event.Type][*event.StateKey] = event } } // GetStateEvent returns the state event for the given type/state_key combo, or nil. func (room *Room) GetStateEvent(eventType mautrix.EventType, stateKey string) *mautrix.Event { + room.Load() room.lock.RLock() defer room.lock.RUnlock() - stateEventMap, _ := room.State[eventType] + stateEventMap, _ := room.state[eventType] event, _ := stateEventMap[stateKey] return event } // getStateEvents returns the state events for the given type. func (room *Room) getStateEvents(eventType mautrix.EventType) map[string]*mautrix.Event { - stateEventMap, _ := room.State[eventType] + stateEventMap, _ := room.state[eventType] return stateEventMap } @@ -323,7 +459,7 @@ func (room *Room) GetAliases() []string { func (room *Room) updateNameFromNameEvent() { nameEvt := room.GetStateEvent(mautrix.StateRoomName, "") if nameEvt != nil { - room.nameCache = nameEvt.Content.Name + room.NameCache = nameEvt.Content.Name } } @@ -336,7 +472,7 @@ func (room *Room) updateNameFromAliases() { aliases := room.GetAliases() if len(aliases) > 0 { sort.Sort(sort.StringSlice(aliases)) - room.nameCache = aliases[0] + room.NameCache = aliases[0] } } @@ -351,33 +487,40 @@ func (room *Room) updateNameFromAliases() { func (room *Room) updateNameFromMembers() { members := room.GetMembers() if len(members) <= 1 { - room.nameCache = "Empty room" + room.NameCache = "Empty room" } else if room.firstMemberCache == nil { - room.nameCache = "Room" + room.NameCache = "Room" } else if len(members) == 2 { - room.nameCache = room.firstMemberCache.Displayname + room.NameCache = room.firstMemberCache.Displayname + } else if len(members) == 3 && room.secondMemberCache != nil { + room.NameCache = fmt.Sprintf("%s and %s", room.firstMemberCache.Displayname, room.secondMemberCache.Displayname) } else { - firstMember := room.firstMemberCache.Displayname - room.nameCache = fmt.Sprintf("%s and %d others", firstMember, len(members)-2) + members := room.firstMemberCache.Displayname + count := len(members) - 2 + if room.secondMemberCache != nil { + members += ", " + room.secondMemberCache.Displayname + count-- + } + room.NameCache = fmt.Sprintf("%s and %d others", members, count) } } // updateNameCache updates the room display name based on the room state in the order // specified in spec section 11.2.2.5. func (room *Room) updateNameCache() { - if len(room.nameCache) == 0 { + if len(room.NameCache) == 0 { room.updateNameFromNameEvent() room.nameCacheSource = ExplicitRoomName } - if len(room.nameCache) == 0 { - room.nameCache = room.GetCanonicalAlias() + if len(room.NameCache) == 0 { + room.NameCache = room.GetCanonicalAlias() room.nameCacheSource = CanonicalAliasRoomName } - if len(room.nameCache) == 0 { + if len(room.NameCache) == 0 { room.updateNameFromAliases() room.nameCacheSource = AliasRoomName } - if len(room.nameCache) == 0 { + if len(room.NameCache) == 0 { room.updateNameFromMembers() room.nameCacheSource = MemberRoomName } @@ -389,27 +532,47 @@ func (room *Room) updateNameCache() { // If the cache is empty, it is updated first. func (room *Room) GetTitle() string { room.updateNameCache() - return room.nameCache + return room.NameCache +} + +func (room *Room) eventToMember(userID string, content *mautrix.Content) *mautrix.Member { + member := &content.Member + member.Membership = content.Membership + if len(member.Displayname) == 0 { + member.Displayname = userID + } + return member +} + +func (room *Room) updateNthMemberCache(userID string, member *mautrix.Member) { + if userID != room.SessionUserID { + if room.firstMemberCache == nil { + room.firstMemberCache = member + } else if room.secondMemberCache == nil { + room.secondMemberCache = member + } + } } // createMemberCache caches all member events into a easily processable MXID -> *Member map. func (room *Room) createMemberCache() map[string]*mautrix.Member { + if len(room.memberCache) > 0 { + return room.memberCache + } cache := make(map[string]*mautrix.Member) room.lock.RLock() events := room.getStateEvents(mautrix.StateMember) room.firstMemberCache = nil + room.secondMemberCache = nil if events != nil { for userID, event := range events { - member := &event.Content.Member - member.Membership = event.Content.Membership - if len(member.Displayname) == 0 { - member.Displayname = userID - } - if room.firstMemberCache == nil && userID != room.SessionUserID { - room.firstMemberCache = member - } + member := room.eventToMember(userID, &event.Content) if member.Membership == mautrix.MembershipJoin || member.Membership == mautrix.MembershipInvite { cache[userID] = member + room.updateNthMemberCache(userID, member) + } + if userID == room.SessionUserID { + room.SessionMember = member } } } @@ -425,18 +588,19 @@ func (room *Room) createMemberCache() map[string]*mautrix.Member { // The members are returned from the cache. // If the cache is empty, it is updated first. func (room *Room) GetMembers() map[string]*mautrix.Member { - if len(room.memberCache) == 0 || room.firstMemberCache == nil { - room.createMemberCache() - } + room.Load() + room.createMemberCache() return room.memberCache } // GetMember returns the member with the given MXID. // If the member doesn't exist, nil is returned. func (room *Room) GetMember(userID string) *mautrix.Member { - if len(room.memberCache) == 0 { - room.createMemberCache() + if userID == room.SessionUserID && room.SessionMember != nil { + return room.SessionMember } + room.Load() + room.createMemberCache() room.lock.RLock() member, _ := room.memberCache[userID] room.lock.RUnlock() @@ -449,9 +613,13 @@ func (room *Room) GetSessionOwner() string { } // NewRoom creates a new Room with the given ID -func NewRoom(roomID, owner string) *Room { +func NewRoom(roomID string, cache *RoomCache) *Room { return &Room{ - Room: mautrix.NewRoom(roomID), - SessionUserID: owner, + ID: roomID, + state: make(map[mautrix.EventType]map[string]*mautrix.Event), + path: cache.roomPath(roomID), + cache: cache, + + SessionUserID: cache.getOwner(), } } diff --git a/matrix/rooms/roomcache.go b/matrix/rooms/roomcache.go new file mode 100644 index 0000000..6fc400c --- /dev/null +++ b/matrix/rooms/roomcache.go @@ -0,0 +1,319 @@ +// gomuks - A terminal Matrix client written in Go. +// Copyright (C) 2019 Tulir Asokan +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +package rooms + +import ( + "compress/gzip" + "encoding/gob" + "os" + "path/filepath" + "time" + + "github.com/pkg/errors" + sync "github.com/sasha-s/go-deadlock" + + "maunium.net/go/gomuks/debug" +) + +// RoomCache contains room state info in a hashmap and linked list. +type RoomCache struct { + sync.Mutex + + listPath string + directory string + maxSize int + maxAge int64 + getOwner func() string + + Map map[string]*Room + head *Room + tail *Room + size int +} + +func NewRoomCache(listPath, directory string, maxSize int, maxAge int64, getOwner func() string) *RoomCache { + return &RoomCache{ + listPath: listPath, + directory: directory, + maxSize: maxSize, + maxAge: maxAge, + getOwner: getOwner, + + Map: make(map[string]*Room), + } +} + +func (cache *RoomCache) LoadList() error { + cache.Lock() + defer cache.Unlock() + + // Open room list file + file, err := os.OpenFile(cache.listPath, os.O_RDONLY, 0600) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return errors.Wrap(err, "failed to open room list file for reading") + } + defer debugPrintError(file.Close, "Failed to close room list file after reading") + + // Open gzip reader for room list file + cmpReader, err := gzip.NewReader(file) + if err != nil { + return errors.Wrap(err, "failed to read gzip room list") + } + defer debugPrintError(cmpReader.Close, "Failed to close room list gzip reader") + + // Open gob decoder for gzip reader + dec := gob.NewDecoder(cmpReader) + // Read number of items in list + var size int + err = dec.Decode(&size) + if err != nil { + return errors.Wrap(err, "failed to read size of room list") + } + + // Read list + cache.Map = make(map[string]*Room, size) + for i := 0; i < size; i++ { + room := &Room{} + err = dec.Decode(room) + if err != nil { + debug.Printf("Failed to decode %dth room list entry: %v", i+1, err) + continue + } + room.path = cache.roomPath(room.ID) + room.cache = cache + cache.Map[room.ID] = room + } + return nil +} + +func (cache *RoomCache) SaveLoadedRooms() { + cache.Lock() + defer cache.Unlock() + cache.clean(false) + for node := cache.head; node != nil; node = node.prev { + node.Save() + } +} + +func (cache *RoomCache) SaveList() error { + cache.Lock() + defer cache.Unlock() + + debug.Print("Saving room list...") + // Open room list file + file, err := os.OpenFile(cache.listPath, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return errors.Wrap(err, "failed to open room list file for writing") + } + defer debugPrintError(file.Close, "Failed to close room list file after writing") + + // Open gzip writer for room list file + cmpWriter := gzip.NewWriter(file) + defer debugPrintError(cmpWriter.Close, "Failed to close room list gzip writer") + + // Open gob encoder for gzip writer + enc := gob.NewEncoder(cmpWriter) + // Write number of items in list + err = enc.Encode(len(cache.Map)) + if err != nil { + return errors.Wrap(err, "failed to write size of room list") + } + + // Write list + for _, node := range cache.Map { + err = enc.Encode(node) + if err != nil { + debug.Printf("Failed to encode room list entry of %s: %v", node.ID, err) + } + } + debug.Print("Room list saved to", cache.listPath, len(cache.Map), cache.size) + return nil +} + +func (cache *RoomCache) Touch(roomID string) { + cache.Lock() + node, ok := cache.Map[roomID] + if !ok || node == nil { + cache.Unlock() + return + } + cache.touch(node) + cache.Unlock() +} + +func (cache *RoomCache) TouchNode(node *Room) { + cache.Lock() + cache.touch(node) + cache.Unlock() +} + +func (cache *RoomCache) touch(node *Room) { + if node == cache.head { + return + } + debug.Print("Touching", node.ID) + cache.llPop(node) + cache.llPush(node) + node.touch = time.Now().Unix() +} + +func (cache *RoomCache) Get(roomID string) *Room { + cache.Lock() + node := cache.get(roomID) + cache.Unlock() + return node +} + +func (cache *RoomCache) GetOrCreate(roomID string) *Room { + cache.Lock() + node := cache.get(roomID) + if node == nil { + node = cache.newRoom(roomID) + cache.llPush(node) + } + cache.Unlock() + return node +} + +func (cache *RoomCache) get(roomID string) *Room { + node, ok := cache.Map[roomID] + if ok && node != nil { + return node + } + return nil +} +func (cache *RoomCache) Put(room *Room) { + cache.Lock() + node := cache.get(room.ID) + if node != nil { + cache.touch(node) + } else { + cache.Map[room.ID] = room + if room.Loaded() { + cache.llPush(room) + } + node = room + } + cache.Unlock() + node.Save() +} + +func (cache *RoomCache) roomPath(roomID string) string { + return filepath.Join(cache.directory, roomID+".gob.gz") +} + +func (cache *RoomCache) Load(roomID string) *Room { + cache.Lock() + defer cache.Unlock() + node, ok := cache.Map[roomID] + if ok { + return node + } + + node = NewRoom(roomID, cache) + node.Load() + return node +} + +func (cache *RoomCache) llPop(node *Room) { + if node.prev == nil && node.next == nil { + return + } + if node.prev != nil { + node.prev.next = node.next + } + if node.next != nil { + node.next.prev = node.prev + } + if node == cache.tail { + cache.tail = node.next + } + if node == cache.head { + cache.head = node.prev + } + node.next = nil + node.prev = nil + cache.size-- +} + +func (cache *RoomCache) llPush(node *Room) { + if node.next != nil || node.prev != nil { + debug.PrintStack() + debug.Print("Tried to llPush node that is already in stack") + return + } + if node == cache.head { + return + } + if cache.head != nil { + cache.head.next = node + } + node.prev = cache.head + node.next = nil + cache.head = node + if cache.tail == nil { + cache.tail = node + } + cache.size++ + cache.clean(false) +} + +func (cache *RoomCache) ForceClean() { + cache.Lock() + cache.clean(true) + cache.Unlock() +} + +func (cache *RoomCache) clean(force bool) { + origSize := cache.size + maxTS := time.Now().Unix() - cache.maxAge + for cache.size > cache.maxSize { + if cache.tail.touch > maxTS && !force { + break + } + ok := cache.tail.Unload() + node := cache.tail + cache.llPop(node) + if !ok { + debug.Print("Unload returned false, pushing node back") + cache.llPush(node) + } + } + if cleaned := origSize - cache.size; cleaned > 0 { + debug.Print("Cleaned", cleaned, "rooms") + } +} + +func (cache *RoomCache) Unload(node *Room) { + cache.Lock() + defer cache.Unlock() + cache.llPop(node) + ok := node.Unload() + if !ok { + debug.Print("Unload returned false, pushing node back") + cache.llPush(node) + } +} + +func (cache *RoomCache) newRoom(roomID string) *Room { + node := NewRoom(roomID, cache) + cache.Map[node.ID] = node + return node +} diff --git a/matrix/sync.go b/matrix/sync.go index 7f9c902..ab0e9f8 100644 --- a/matrix/sync.go +++ b/matrix/sync.go @@ -107,8 +107,6 @@ func NewGomuksSyncer(session SyncerSession) *GomuksSyncer { // ProcessResponse processes a Matrix sync response. func (s *GomuksSyncer) ProcessResponse(res *mautrix.RespSync, since string) (err error) { debug.Print("Received sync response") -// dat, _ := json.MarshalIndent(res, "", " ") -// debug.Print(string(dat)) s.processSyncEvents(nil, res.Presence.Events, EventSourcePresence) s.processSyncEvents(nil, res.AccountData.Events, EventSourceAccountData) @@ -215,6 +213,10 @@ func (s *GomuksSyncer) GetFilterJSON(userID string) json.RawMessage { Timeline: mautrix.FilterPart{ Types: []string{ "m.room.message", + "m.room.encrypted", + "m.sticker", + "m.reaction", + "m.room.member", "m.room.name", "m.room.topic", |