mirror of https://github.com/mautrix/go.git
163 lines
5.7 KiB
Go
163 lines
5.7 KiB
Go
// Copyright (c) 2024 Tulir Asokan
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package database
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"time"
|
|
|
|
"go.mau.fi/util/dbutil"
|
|
|
|
"maunium.net/go/mautrix/bridgev2/networkid"
|
|
)
|
|
|
|
type BackfillTaskQuery struct {
|
|
BridgeID networkid.BridgeID
|
|
*dbutil.QueryHelper[*BackfillTask]
|
|
}
|
|
|
|
type BackfillTask struct {
|
|
BridgeID networkid.BridgeID
|
|
PortalKey networkid.PortalKey
|
|
UserLoginID networkid.UserLoginID
|
|
|
|
BatchCount int
|
|
IsDone bool
|
|
Cursor networkid.PaginationCursor
|
|
OldestMessageID networkid.MessageID
|
|
DispatchedAt time.Time
|
|
CompletedAt time.Time
|
|
NextDispatchMinTS time.Time
|
|
}
|
|
|
|
var BackfillNextDispatchNever = time.Unix(0, (1<<63)-1)
|
|
|
|
const (
|
|
ensureBackfillExistsQuery = `
|
|
INSERT INTO backfill_task (bridge_id, portal_id, portal_receiver, user_login_id, batch_count, is_done, next_dispatch_min_ts)
|
|
VALUES ($1, $2, $3, $4, -1, false, $5)
|
|
ON CONFLICT (bridge_id, portal_id, portal_receiver) DO UPDATE
|
|
SET user_login_id=CASE
|
|
WHEN backfill_task.user_login_id=''
|
|
THEN excluded.user_login_id
|
|
ELSE backfill_task.user_login_id
|
|
END,
|
|
next_dispatch_min_ts=CASE
|
|
WHEN backfill_task.next_dispatch_min_ts=9223372036854775807
|
|
THEN excluded.next_dispatch_min_ts
|
|
ELSE backfill_task.next_dispatch_min_ts
|
|
END
|
|
`
|
|
upsertBackfillQueueQuery = `
|
|
INSERT INTO backfill_task (
|
|
bridge_id, portal_id, portal_receiver, user_login_id, batch_count, is_done, cursor,
|
|
oldest_message_id, dispatched_at, completed_at, next_dispatch_min_ts
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
|
|
ON CONFLICT (bridge_id, portal_id, portal_receiver) DO UPDATE
|
|
SET user_login_id=excluded.user_login_id,
|
|
batch_count=excluded.batch_count,
|
|
is_done=excluded.is_done,
|
|
cursor=excluded.cursor,
|
|
oldest_message_id=excluded.oldest_message_id,
|
|
dispatched_at=excluded.dispatched_at,
|
|
completed_at=excluded.completed_at,
|
|
next_dispatch_min_ts=excluded.next_dispatch_min_ts
|
|
`
|
|
markBackfillDispatchedQuery = `
|
|
UPDATE backfill_task SET dispatched_at=$4, completed_at=NULL, next_dispatch_min_ts=$5
|
|
WHERE bridge_id = $1 AND portal_id = $2 AND portal_receiver = $3
|
|
`
|
|
updateBackfillQueueQuery = `
|
|
UPDATE backfill_task
|
|
SET user_login_id=$4, batch_count=$5, is_done=$6, cursor=$7, oldest_message_id=$8,
|
|
dispatched_at=$9, completed_at=$10, next_dispatch_min_ts=$11
|
|
WHERE bridge_id = $1 AND portal_id = $2 AND portal_receiver = $3
|
|
`
|
|
getNextBackfillQuery = `
|
|
SELECT
|
|
bridge_id, portal_id, portal_receiver, user_login_id, batch_count, is_done,
|
|
cursor, oldest_message_id, dispatched_at, completed_at, next_dispatch_min_ts
|
|
FROM backfill_task
|
|
WHERE bridge_id = $1 AND next_dispatch_min_ts < $2 AND is_done = false AND user_login_id <> ''
|
|
ORDER BY next_dispatch_min_ts LIMIT 1
|
|
`
|
|
deleteBackfillQueueQuery = `
|
|
DELETE FROM backfill_task
|
|
WHERE bridge_id = $1 AND portal_id = $2 AND portal_receiver = $3
|
|
`
|
|
)
|
|
|
|
func (btq *BackfillTaskQuery) EnsureExists(ctx context.Context, portal networkid.PortalKey, loginID networkid.UserLoginID) error {
|
|
return btq.Exec(ctx, ensureBackfillExistsQuery, btq.BridgeID, portal.ID, portal.Receiver, loginID, time.Now().UnixNano())
|
|
}
|
|
|
|
func (btq *BackfillTaskQuery) Upsert(ctx context.Context, bq *BackfillTask) error {
|
|
ensureBridgeIDMatches(&bq.BridgeID, btq.BridgeID)
|
|
return btq.Exec(ctx, upsertBackfillQueueQuery, bq.sqlVariables()...)
|
|
}
|
|
|
|
const UnfinishedBackfillBackoff = 1 * time.Hour
|
|
|
|
func (btq *BackfillTaskQuery) MarkDispatched(ctx context.Context, bq *BackfillTask) error {
|
|
ensureBridgeIDMatches(&bq.BridgeID, btq.BridgeID)
|
|
bq.DispatchedAt = time.Now()
|
|
bq.CompletedAt = time.Time{}
|
|
bq.NextDispatchMinTS = bq.DispatchedAt.Add(UnfinishedBackfillBackoff)
|
|
return btq.Exec(
|
|
ctx, markBackfillDispatchedQuery,
|
|
bq.BridgeID, bq.PortalKey.ID, bq.PortalKey.Receiver,
|
|
bq.DispatchedAt.UnixNano(), bq.NextDispatchMinTS.UnixNano(),
|
|
)
|
|
}
|
|
|
|
func (btq *BackfillTaskQuery) Update(ctx context.Context, bq *BackfillTask) error {
|
|
ensureBridgeIDMatches(&bq.BridgeID, btq.BridgeID)
|
|
return btq.Exec(ctx, updateBackfillQueueQuery, bq.sqlVariables()...)
|
|
}
|
|
|
|
func (btq *BackfillTaskQuery) GetNext(ctx context.Context) (*BackfillTask, error) {
|
|
return btq.QueryOne(ctx, getNextBackfillQuery, btq.BridgeID, time.Now().UnixNano())
|
|
}
|
|
|
|
func (btq *BackfillTaskQuery) Delete(ctx context.Context, portalKey networkid.PortalKey) error {
|
|
return btq.Exec(ctx, deleteBackfillQueueQuery, btq.BridgeID, portalKey.ID, portalKey.Receiver)
|
|
}
|
|
|
|
func (bt *BackfillTask) Scan(row dbutil.Scannable) (*BackfillTask, error) {
|
|
var cursor, oldestMessageID sql.NullString
|
|
var dispatchedAt, completedAt, nextDispatchMinTS sql.NullInt64
|
|
err := row.Scan(
|
|
&bt.BridgeID, &bt.PortalKey.ID, &bt.PortalKey.Receiver, &bt.UserLoginID, &bt.BatchCount, &bt.IsDone,
|
|
&cursor, &oldestMessageID, &dispatchedAt, &completedAt, &nextDispatchMinTS)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
bt.Cursor = networkid.PaginationCursor(cursor.String)
|
|
bt.OldestMessageID = networkid.MessageID(oldestMessageID.String)
|
|
if dispatchedAt.Valid {
|
|
bt.DispatchedAt = time.Unix(0, dispatchedAt.Int64)
|
|
}
|
|
if completedAt.Valid {
|
|
bt.CompletedAt = time.Unix(0, completedAt.Int64)
|
|
}
|
|
if nextDispatchMinTS.Valid {
|
|
bt.NextDispatchMinTS = time.Unix(0, nextDispatchMinTS.Int64)
|
|
}
|
|
return bt, nil
|
|
}
|
|
|
|
func (bt *BackfillTask) sqlVariables() []any {
|
|
return []any{
|
|
bt.BridgeID, bt.PortalKey.ID, bt.PortalKey.Receiver, bt.UserLoginID, bt.BatchCount, bt.IsDone,
|
|
dbutil.StrPtr(bt.Cursor), dbutil.StrPtr(bt.OldestMessageID),
|
|
dbutil.ConvertedPtr(bt.DispatchedAt, time.Time.UnixNano),
|
|
dbutil.ConvertedPtr(bt.CompletedAt, time.Time.UnixNano),
|
|
bt.NextDispatchMinTS.UnixNano(),
|
|
}
|
|
}
|