aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTulir Asokan <tulir@maunium.net>2020-04-19 15:57:49 +0300
committerTulir Asokan <tulir@maunium.net>2020-04-19 15:57:49 +0300
commitf668faa8940ba02e796e016ff27f2e4620b49a29 (patch)
tree020e48d747e576956c3a7581675bfda854922d21
parent5ee6aa72dbb2cc4b437624f7e2d5821234c2fc7c (diff)
Process different rooms in sync responses in goroutines
-rw-r--r--config/config.go8
-rw-r--r--matrix/rooms/room.go7
-rw-r--r--matrix/rooms/roomcache.go16
-rw-r--r--matrix/sync.go80
4 files changed, 80 insertions, 31 deletions
diff --git a/config/config.go b/config/config.go
index d85e243..37058cd 100644
--- a/config/config.go
+++ b/config/config.go
@@ -277,3 +277,11 @@ func (config *Config) LoadRoom(_ id.RoomID) *mautrix.Room {
func (config *Config) GetRoom(roomID id.RoomID) *rooms.Room {
return config.Rooms.GetOrCreate(roomID)
}
+
+func (config *Config) DisableUnloading() {
+ config.Rooms.DisableUnloading()
+}
+
+func (config *Config) EnableUnloading() {
+ config.Rooms.EnableUnloading()
+}
diff --git a/matrix/rooms/room.go b/matrix/rooms/room.go
index 87e63f0..df7160c 100644
--- a/matrix/rooms/room.go
+++ b/matrix/rooms/room.go
@@ -213,13 +213,6 @@ func (room *Room) Unload() bool {
debug.Print("Unloading", room.ID)
room.Save()
room.state = nil
- room.topicCache = ""
- room.CanonicalAliasCache = ""
- room.firstMemberCache = nil
- room.secondMemberCache = nil
- room.memberCache = nil
- room.exMemberCache = nil
- room.replacedByCache = nil
if room.postUnload != nil {
room.postUnload()
}
diff --git a/matrix/rooms/roomcache.go b/matrix/rooms/roomcache.go
index d442734..9da1922 100644
--- a/matrix/rooms/roomcache.go
+++ b/matrix/rooms/roomcache.go
@@ -39,6 +39,7 @@ type RoomCache struct {
maxSize int
maxAge int64
getOwner func() id.UserID
+ noUnload bool
Map map[id.RoomID]*Room
head *Room
@@ -58,6 +59,14 @@ func NewRoomCache(listPath, directory string, maxSize int, maxAge int64, getOwne
}
}
+func (cache *RoomCache) DisableUnloading() {
+ cache.noUnload = true
+}
+
+func (cache *RoomCache) EnableUnloading() {
+ cache.noUnload = false
+}
+
func (cache *RoomCache) LoadList() error {
cache.Lock()
defer cache.Unlock()
@@ -160,6 +169,9 @@ func (cache *RoomCache) Touch(roomID id.RoomID) {
}
func (cache *RoomCache) TouchNode(node *Room) {
+ if cache.noUnload || node.touch + 2 > time.Now().Unix() {
+ return
+ }
cache.Lock()
cache.touch(node)
cache.Unlock()
@@ -200,6 +212,7 @@ func (cache *RoomCache) get(roomID id.RoomID) *Room {
}
return nil
}
+
func (cache *RoomCache) Put(room *Room) {
cache.Lock()
node := cache.get(room.ID)
@@ -283,6 +296,9 @@ func (cache *RoomCache) ForceClean() {
}
func (cache *RoomCache) clean(force bool) {
+ if cache.noUnload && !force {
+ return
+ }
origSize := cache.size
maxTS := time.Now().Unix() - cache.maxAge
for cache.size > cache.maxSize {
diff --git a/matrix/sync.go b/matrix/sync.go
index cd6334f..ed9d137 100644
--- a/matrix/sync.go
+++ b/matrix/sync.go
@@ -20,6 +20,7 @@ package matrix
import (
"fmt"
+ "sync"
"time"
"maunium.net/go/mautrix"
@@ -33,6 +34,8 @@ import (
type SyncerSession interface {
GetRoom(id id.RoomID) *rooms.Room
GetUserID() id.UserID
+ DisableUnloading()
+ EnableUnloading()
}
type EventSource int
@@ -108,51 +111,80 @@ func NewGomuksSyncer(session SyncerSession) *GomuksSyncer {
// ProcessResponse processes a Matrix sync response.
func (s *GomuksSyncer) ProcessResponse(res *mautrix.RespSync, since string) (err error) {
+ if since == "" {
+ s.Session.DisableUnloading()
+ }
debug.Print("Received sync response")
s.processSyncEvents(nil, res.Presence.Events, EventSourcePresence)
s.processSyncEvents(nil, res.AccountData.Events, EventSourceAccountData)
+ wait := &sync.WaitGroup{}
+
+ wait.Add(len(res.Rooms.Join))
for roomID, roomData := range res.Rooms.Join {
- room := s.Session.GetRoom(roomID)
- room.UpdateSummary(roomData.Summary)
- s.processSyncEvents(room, roomData.State.Events, EventSourceJoin|EventSourceState)
- s.processSyncEvents(room, roomData.Timeline.Events, EventSourceJoin|EventSourceTimeline)
- s.processSyncEvents(room, roomData.Ephemeral.Events, EventSourceJoin|EventSourceEphemeral)
- s.processSyncEvents(room, roomData.AccountData.Events, EventSourceJoin|EventSourceAccountData)
-
- if len(room.PrevBatch) == 0 {
- room.PrevBatch = roomData.Timeline.PrevBatch
- }
- room.LastPrevBatch = roomData.Timeline.PrevBatch
+ go s.processJoinedRoom(roomID, roomData, wait)
}
+ wait.Add(len(res.Rooms.Invite))
for roomID, roomData := range res.Rooms.Invite {
- room := s.Session.GetRoom(roomID)
- room.UpdateSummary(roomData.Summary)
- s.processSyncEvents(room, roomData.State.Events, EventSourceInvite|EventSourceState)
+ go s.processInvitedRoom(roomID, roomData, wait)
}
+ wait.Add(len(res.Rooms.Leave))
for roomID, roomData := range res.Rooms.Leave {
- room := s.Session.GetRoom(roomID)
- room.HasLeft = true
- room.UpdateSummary(roomData.Summary)
- s.processSyncEvents(room, roomData.State.Events, EventSourceLeave|EventSourceState)
- s.processSyncEvents(room, roomData.Timeline.Events, EventSourceLeave|EventSourceTimeline)
-
- if len(room.PrevBatch) == 0 {
- room.PrevBatch = roomData.Timeline.PrevBatch
- }
- room.LastPrevBatch = roomData.Timeline.PrevBatch
+ go s.processLeftRoom(roomID, roomData, wait)
}
+ wait.Wait()
+
if since == "" && s.InitDoneCallback != nil {
s.InitDoneCallback()
+ s.Session.EnableUnloading()
}
s.FirstSyncDone = true
return
}
+func (s *GomuksSyncer) processJoinedRoom(roomID id.RoomID, roomData mautrix.SyncJoinedRoom, wait *sync.WaitGroup) {
+ defer debug.Recover()
+ room := s.Session.GetRoom(roomID)
+ room.UpdateSummary(roomData.Summary)
+ s.processSyncEvents(room, roomData.State.Events, EventSourceJoin|EventSourceState)
+ s.processSyncEvents(room, roomData.Timeline.Events, EventSourceJoin|EventSourceTimeline)
+ s.processSyncEvents(room, roomData.Ephemeral.Events, EventSourceJoin|EventSourceEphemeral)
+ s.processSyncEvents(room, roomData.AccountData.Events, EventSourceJoin|EventSourceAccountData)
+
+ if len(room.PrevBatch) == 0 {
+ room.PrevBatch = roomData.Timeline.PrevBatch
+ }
+ room.LastPrevBatch = roomData.Timeline.PrevBatch
+ wait.Done()
+}
+
+func (s *GomuksSyncer) processInvitedRoom(roomID id.RoomID, roomData mautrix.SyncInvitedRoom, wait *sync.WaitGroup) {
+ defer debug.Recover()
+ room := s.Session.GetRoom(roomID)
+ room.UpdateSummary(roomData.Summary)
+ s.processSyncEvents(room, roomData.State.Events, EventSourceInvite|EventSourceState)
+ wait.Done()
+}
+
+func (s *GomuksSyncer) processLeftRoom(roomID id.RoomID, roomData mautrix.SyncLeftRoom, wait *sync.WaitGroup) {
+ defer debug.Recover()
+ room := s.Session.GetRoom(roomID)
+ room.HasLeft = true
+ room.UpdateSummary(roomData.Summary)
+ s.processSyncEvents(room, roomData.State.Events, EventSourceLeave|EventSourceState)
+ s.processSyncEvents(room, roomData.Timeline.Events, EventSourceLeave|EventSourceTimeline)
+
+ if len(room.PrevBatch) == 0 {
+ room.PrevBatch = roomData.Timeline.PrevBatch
+ }
+ room.LastPrevBatch = roomData.Timeline.PrevBatch
+ wait.Done()
+}
+
func (s *GomuksSyncer) processSyncEvents(room *rooms.Room, events []*event.Event, source EventSource) {
for _, evt := range events {
s.processSyncEvent(room, evt, source)