From f668faa8940ba02e796e016ff27f2e4620b49a29 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Sun, 19 Apr 2020 15:57:49 +0300 Subject: Process different rooms in sync responses in goroutines --- matrix/sync.go | 80 ++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 24 deletions(-) (limited to 'matrix/sync.go') diff --git a/matrix/sync.go b/matrix/sync.go index cd6334f..ed9d137 100644 --- a/matrix/sync.go +++ b/matrix/sync.go @@ -20,6 +20,7 @@ package matrix import ( "fmt" + "sync" "time" "maunium.net/go/mautrix" @@ -33,6 +34,8 @@ import ( type SyncerSession interface { GetRoom(id id.RoomID) *rooms.Room GetUserID() id.UserID + DisableUnloading() + EnableUnloading() } type EventSource int @@ -108,51 +111,80 @@ func NewGomuksSyncer(session SyncerSession) *GomuksSyncer { // ProcessResponse processes a Matrix sync response. func (s *GomuksSyncer) ProcessResponse(res *mautrix.RespSync, since string) (err error) { + if since == "" { + s.Session.DisableUnloading() + } debug.Print("Received sync response") s.processSyncEvents(nil, res.Presence.Events, EventSourcePresence) s.processSyncEvents(nil, res.AccountData.Events, EventSourceAccountData) + wait := &sync.WaitGroup{} + + wait.Add(len(res.Rooms.Join)) for roomID, roomData := range res.Rooms.Join { - room := s.Session.GetRoom(roomID) - room.UpdateSummary(roomData.Summary) - s.processSyncEvents(room, roomData.State.Events, EventSourceJoin|EventSourceState) - s.processSyncEvents(room, roomData.Timeline.Events, EventSourceJoin|EventSourceTimeline) - s.processSyncEvents(room, roomData.Ephemeral.Events, EventSourceJoin|EventSourceEphemeral) - s.processSyncEvents(room, roomData.AccountData.Events, EventSourceJoin|EventSourceAccountData) - - if len(room.PrevBatch) == 0 { - room.PrevBatch = roomData.Timeline.PrevBatch - } - room.LastPrevBatch = roomData.Timeline.PrevBatch + go s.processJoinedRoom(roomID, roomData, wait) } + wait.Add(len(res.Rooms.Invite)) for roomID, roomData := range res.Rooms.Invite { - room := s.Session.GetRoom(roomID) - room.UpdateSummary(roomData.Summary) - s.processSyncEvents(room, roomData.State.Events, EventSourceInvite|EventSourceState) + go s.processInvitedRoom(roomID, roomData, wait) } + wait.Add(len(res.Rooms.Leave)) for roomID, roomData := range res.Rooms.Leave { - room := s.Session.GetRoom(roomID) - room.HasLeft = true - room.UpdateSummary(roomData.Summary) - s.processSyncEvents(room, roomData.State.Events, EventSourceLeave|EventSourceState) - s.processSyncEvents(room, roomData.Timeline.Events, EventSourceLeave|EventSourceTimeline) - - if len(room.PrevBatch) == 0 { - room.PrevBatch = roomData.Timeline.PrevBatch - } - room.LastPrevBatch = roomData.Timeline.PrevBatch + go s.processLeftRoom(roomID, roomData, wait) } + wait.Wait() + if since == "" && s.InitDoneCallback != nil { s.InitDoneCallback() + s.Session.EnableUnloading() } s.FirstSyncDone = true return } +func (s *GomuksSyncer) processJoinedRoom(roomID id.RoomID, roomData mautrix.SyncJoinedRoom, wait *sync.WaitGroup) { + defer debug.Recover() + room := s.Session.GetRoom(roomID) + room.UpdateSummary(roomData.Summary) + s.processSyncEvents(room, roomData.State.Events, EventSourceJoin|EventSourceState) + s.processSyncEvents(room, roomData.Timeline.Events, EventSourceJoin|EventSourceTimeline) + s.processSyncEvents(room, roomData.Ephemeral.Events, EventSourceJoin|EventSourceEphemeral) + s.processSyncEvents(room, roomData.AccountData.Events, EventSourceJoin|EventSourceAccountData) + + if len(room.PrevBatch) == 0 { + room.PrevBatch = roomData.Timeline.PrevBatch + } + room.LastPrevBatch = roomData.Timeline.PrevBatch + wait.Done() +} + +func (s *GomuksSyncer) processInvitedRoom(roomID id.RoomID, roomData mautrix.SyncInvitedRoom, wait *sync.WaitGroup) { + defer debug.Recover() + room := s.Session.GetRoom(roomID) + room.UpdateSummary(roomData.Summary) + s.processSyncEvents(room, roomData.State.Events, EventSourceInvite|EventSourceState) + wait.Done() +} + +func (s *GomuksSyncer) processLeftRoom(roomID id.RoomID, roomData mautrix.SyncLeftRoom, wait *sync.WaitGroup) { + defer debug.Recover() + room := s.Session.GetRoom(roomID) + room.HasLeft = true + room.UpdateSummary(roomData.Summary) + s.processSyncEvents(room, roomData.State.Events, EventSourceLeave|EventSourceState) + s.processSyncEvents(room, roomData.Timeline.Events, EventSourceLeave|EventSourceTimeline) + + if len(room.PrevBatch) == 0 { + room.PrevBatch = roomData.Timeline.PrevBatch + } + room.LastPrevBatch = roomData.Timeline.PrevBatch + wait.Done() +} + func (s *GomuksSyncer) processSyncEvents(room *rooms.Room, events []*event.Event, source EventSource) { for _, evt := range events { s.processSyncEvent(room, evt, source) -- cgit v1.2.3