mirror of https://github.com/mautrix/go.git
247 lines
7.4 KiB
Go
247 lines
7.4 KiB
Go
// Copyright (c) 2024 Tulir Asokan
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package bridgev2
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"runtime/debug"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog"
|
|
|
|
"maunium.net/go/mautrix/bridgev2/database"
|
|
)
|
|
|
|
const BackfillMinBackoffAfterRoomCreate = 1 * time.Minute
|
|
const BackfillQueueErrorBackoff = 1 * time.Minute
|
|
const BackfillQueueMaxEmptyBackoff = 10 * time.Minute
|
|
|
|
func (br *Bridge) WakeupBackfillQueue() {
|
|
select {
|
|
case br.wakeupBackfillQueue <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (br *Bridge) RunBackfillQueue() {
|
|
if !br.Config.Backfill.Queue.Enabled || !br.Config.Backfill.Enabled {
|
|
return
|
|
}
|
|
log := br.Log.With().Str("component", "backfill queue").Logger()
|
|
if !br.Matrix.GetCapabilities().BatchSending {
|
|
log.Warn().Msg("Backfill queue is enabled in config, but Matrix server doesn't support batch sending")
|
|
return
|
|
}
|
|
ctx, cancel := context.WithCancel(log.WithContext(context.Background()))
|
|
go func() {
|
|
<-br.stopBackfillQueue
|
|
cancel()
|
|
}()
|
|
batchDelay := time.Duration(br.Config.Backfill.Queue.BatchDelay) * time.Second
|
|
log.Info().Stringer("batch_delay", batchDelay).Msg("Backfill queue starting")
|
|
noTasksFoundCount := 0
|
|
for {
|
|
nextDelay := batchDelay
|
|
if noTasksFoundCount > 0 {
|
|
extraDelay := batchDelay * time.Duration(noTasksFoundCount)
|
|
nextDelay += min(BackfillQueueMaxEmptyBackoff, extraDelay)
|
|
}
|
|
timer := time.NewTimer(nextDelay)
|
|
select {
|
|
case <-br.wakeupBackfillQueue:
|
|
if !timer.Stop() {
|
|
select {
|
|
case <-timer.C:
|
|
default:
|
|
}
|
|
}
|
|
noTasksFoundCount = 0
|
|
case <-br.stopBackfillQueue:
|
|
if !timer.Stop() {
|
|
select {
|
|
case <-timer.C:
|
|
default:
|
|
}
|
|
}
|
|
log.Info().Msg("Stopping backfill queue")
|
|
return
|
|
case <-timer.C:
|
|
}
|
|
backfillTask, err := br.DB.BackfillTask.GetNext(ctx)
|
|
if err != nil {
|
|
log.Err(err).Msg("Failed to get next backfill queue entry")
|
|
time.Sleep(BackfillQueueErrorBackoff)
|
|
continue
|
|
} else if backfillTask != nil {
|
|
br.doBackfillTask(ctx, backfillTask)
|
|
noTasksFoundCount = 0
|
|
}
|
|
}
|
|
}
|
|
|
|
func (br *Bridge) doBackfillTask(ctx context.Context, task *database.BackfillTask) {
|
|
log := zerolog.Ctx(ctx).With().
|
|
Object("portal_key", task.PortalKey).
|
|
Str("login_id", string(task.UserLoginID)).
|
|
Logger()
|
|
defer func() {
|
|
err := recover()
|
|
if err != nil {
|
|
logEvt := log.Error().
|
|
Bytes(zerolog.ErrorStackFieldName, debug.Stack())
|
|
if realErr, ok := err.(error); ok {
|
|
logEvt = logEvt.Err(realErr)
|
|
} else {
|
|
logEvt = logEvt.Any(zerolog.ErrorFieldName, err)
|
|
}
|
|
logEvt.Msg("Panic in backfill queue")
|
|
}
|
|
}()
|
|
ctx = log.WithContext(ctx)
|
|
err := br.DB.BackfillTask.MarkDispatched(ctx, task)
|
|
if err != nil {
|
|
log.Err(err).Msg("Failed to mark backfill task as dispatched")
|
|
time.Sleep(BackfillQueueErrorBackoff)
|
|
return
|
|
}
|
|
completed, err := br.actuallyDoBackfillTask(ctx, task)
|
|
if err != nil {
|
|
log.Err(err).Msg("Failed to do backfill task")
|
|
time.Sleep(BackfillQueueErrorBackoff)
|
|
return
|
|
} else if completed {
|
|
log.Info().
|
|
Int("batch_count", task.BatchCount).
|
|
Bool("is_done", task.IsDone).
|
|
Msg("Backfill task completed successfully")
|
|
} else {
|
|
log.Info().
|
|
Int("batch_count", task.BatchCount).
|
|
Bool("is_done", task.IsDone).
|
|
Msg("Backfill task canceled")
|
|
}
|
|
err = br.DB.BackfillTask.Update(ctx, task)
|
|
if err != nil {
|
|
log.Err(err).Msg("Failed to update backfill task")
|
|
time.Sleep(BackfillQueueErrorBackoff)
|
|
}
|
|
}
|
|
|
|
func (portal *Portal) deleteBackfillQueueTaskIfRoomDoesNotExist(ctx context.Context) bool {
|
|
// Acquire the room create lock to ensure that task deletion doesn't race with room creation
|
|
portal.roomCreateLock.Lock()
|
|
defer portal.roomCreateLock.Unlock()
|
|
if portal.MXID == "" {
|
|
zerolog.Ctx(ctx).Debug().Msg("Portal for backfill task doesn't exist, deleting entry")
|
|
err := portal.Bridge.DB.BackfillTask.Delete(ctx, portal.PortalKey)
|
|
if err != nil {
|
|
zerolog.Ctx(ctx).Err(err).Msg("Failed to delete backfill task after portal wasn't found")
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (br *Bridge) actuallyDoBackfillTask(ctx context.Context, task *database.BackfillTask) (bool, error) {
|
|
log := zerolog.Ctx(ctx)
|
|
portal, err := br.GetExistingPortalByKey(ctx, task.PortalKey)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to get portal for backfill task: %w", err)
|
|
} else if portal == nil {
|
|
log.Warn().Msg("Portal not found for backfill task")
|
|
err = br.DB.BackfillTask.Delete(ctx, task.PortalKey)
|
|
if err != nil {
|
|
log.Err(err).Msg("Failed to delete backfill task after portal wasn't found")
|
|
time.Sleep(BackfillQueueErrorBackoff)
|
|
}
|
|
return false, nil
|
|
} else if portal.MXID == "" {
|
|
portal.deleteBackfillQueueTaskIfRoomDoesNotExist(ctx)
|
|
return false, nil
|
|
}
|
|
login, err := br.GetExistingUserLoginByID(ctx, task.UserLoginID)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to get user login for backfill task: %w", err)
|
|
} else if login == nil || !login.Client.IsLoggedIn() {
|
|
if login == nil {
|
|
log.Warn().Msg("User login not found for backfill task")
|
|
} else {
|
|
log.Warn().Msg("User login not logged in for backfill task")
|
|
}
|
|
logins, err := br.GetUserLoginsInPortal(ctx, portal.PortalKey)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to get user portals for backfill task: %w", err)
|
|
} else if len(logins) == 0 {
|
|
log.Debug().Msg("No user logins found for backfill task")
|
|
task.NextDispatchMinTS = database.BackfillNextDispatchNever
|
|
if login == nil {
|
|
task.UserLoginID = ""
|
|
}
|
|
return false, nil
|
|
}
|
|
if login == nil {
|
|
task.UserLoginID = ""
|
|
}
|
|
foundLogin := false
|
|
for _, login = range logins {
|
|
if login.Client.IsLoggedIn() {
|
|
foundLogin = true
|
|
task.UserLoginID = login.ID
|
|
log.UpdateContext(func(c zerolog.Context) zerolog.Context {
|
|
return c.Str("overridden_login_id", string(login.ID))
|
|
})
|
|
log.Debug().Msg("Found user login for backfill task")
|
|
break
|
|
}
|
|
}
|
|
if !foundLogin {
|
|
log.Debug().Msg("No logged in user logins found for backfill task")
|
|
task.NextDispatchMinTS = database.BackfillNextDispatchNever
|
|
return false, nil
|
|
}
|
|
}
|
|
if task.BatchCount < 0 {
|
|
var msgCount int
|
|
msgCount, err = br.DB.Message.CountMessagesInPortal(ctx, task.PortalKey)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to count messages in portal: %w", err)
|
|
}
|
|
task.BatchCount = msgCount / br.Config.Backfill.Queue.BatchSize
|
|
log.Debug().
|
|
Int("message_count", msgCount).
|
|
Int("batch_count", task.BatchCount).
|
|
Msg("Calculated existing batch count")
|
|
}
|
|
maxBatches := br.Config.Backfill.Queue.MaxBatches
|
|
api, ok := login.Client.(BackfillingNetworkAPI)
|
|
if !ok {
|
|
return false, fmt.Errorf("network API does not support backfilling")
|
|
}
|
|
limiterAPI, ok := api.(BackfillingNetworkAPIWithLimits)
|
|
if ok {
|
|
maxBatches = limiterAPI.GetBackfillMaxBatchCount(ctx, portal, task)
|
|
}
|
|
if maxBatches < 0 || maxBatches > task.BatchCount {
|
|
err = portal.DoBackwardsBackfill(ctx, login, task)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to backfill: %w", err)
|
|
}
|
|
task.BatchCount++
|
|
} else {
|
|
log.Debug().
|
|
Int("max_batches", maxBatches).
|
|
Int("batch_count", task.BatchCount).
|
|
Msg("Not actually backfilling: max batches reached")
|
|
}
|
|
task.IsDone = task.IsDone || (maxBatches > 0 && task.BatchCount >= maxBatches)
|
|
batchDelay := time.Duration(br.Config.Backfill.Queue.BatchDelay) * time.Second
|
|
task.CompletedAt = time.Now()
|
|
task.NextDispatchMinTS = task.CompletedAt.Add(batchDelay)
|
|
return true, nil
|
|
}
|