193 lines
6.5 KiB
Go
193 lines
6.5 KiB
Go
// mautrix-signal - A Matrix-Signal puppeting bridge.
|
|
// Copyright (C) 2025 Tulir Asokan
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
package connector
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/rs/zerolog"
|
|
"go.mau.fi/util/ptr"
|
|
"maunium.net/go/mautrix/bridgev2"
|
|
"maunium.net/go/mautrix/bridgev2/networkid"
|
|
|
|
"go.mau.fi/mautrix-signal/pkg/msgconv"
|
|
"go.mau.fi/mautrix-signal/pkg/signalid"
|
|
"go.mau.fi/mautrix-signal/pkg/signalmeow/protobuf/backuppb"
|
|
"go.mau.fi/mautrix-signal/pkg/signalmeow/store"
|
|
)
|
|
|
|
var _ bridgev2.BackfillingNetworkAPI = (*SignalClient)(nil)
|
|
|
|
func tryCastUUID(b []byte) uuid.UUID {
|
|
if len(b) == 16 {
|
|
return uuid.UUID(b)
|
|
}
|
|
return uuid.Nil
|
|
}
|
|
|
|
func (s *SignalClient) FetchMessages(ctx context.Context, params bridgev2.FetchMessagesParams) (*bridgev2.FetchMessagesResponse, error) {
|
|
if !s.IsLoggedIn() {
|
|
return nil, bridgev2.ErrNotLoggedIn
|
|
}
|
|
userID, groupID, err := signalid.ParsePortalID(params.Portal.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse portal ID: %w", err)
|
|
}
|
|
var chat *store.BackupChat
|
|
if groupID != "" {
|
|
chat, err = s.Client.Store.BackupStore.GetBackupChatByGroupID(ctx, groupID)
|
|
} else {
|
|
chat, err = s.Client.Store.BackupStore.GetBackupChatByUserID(ctx, userID)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get chat: %w", err)
|
|
} else if chat == nil {
|
|
zerolog.Ctx(ctx).Debug().Msg("Chat not found, returning nil response for backfill")
|
|
return nil, nil
|
|
}
|
|
var anchorTS time.Time
|
|
if params.AnchorMessage != nil {
|
|
anchorTS = params.AnchorMessage.Timestamp
|
|
}
|
|
minTS := anchorTS
|
|
items, err := s.Client.Store.BackupStore.GetBackupChatItems(ctx, chat.Id, anchorTS, params.Forward, params.Count)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get chat items: %w", err)
|
|
}
|
|
if len(items) > 0 {
|
|
minTS = time.UnixMilli(int64(items[0].DateSent))
|
|
}
|
|
// GetBackupChatItems returns in reverse chronological order, so flip the list
|
|
slices.Reverse(items)
|
|
var firstDirectionfulProcessed bool
|
|
var isRead bool
|
|
convertedMessages := make([]*bridgev2.BackfillMessage, 0, len(items))
|
|
attMap := make(msgconv.AttachmentMap)
|
|
recipientMap := make(map[uint64]*backuppb.Recipient)
|
|
getRecipientACI := func(id uint64) (uuid.UUID, error) {
|
|
recipient, ok := recipientMap[id]
|
|
if !ok {
|
|
recipient, err = s.Client.Store.BackupStore.GetBackupRecipient(ctx, id)
|
|
if err != nil {
|
|
return uuid.Nil, fmt.Errorf("failed to get recipient %d: %w", id, err)
|
|
} else if len(recipient.GetContact().GetAci()) != 16 && recipient.GetSelf() == nil {
|
|
zerolog.Ctx(ctx).Warn().
|
|
Uint64("recipient_id", id).
|
|
Type("recipient_type", recipient.GetDestination()).
|
|
Msg("ACI not found for recipient")
|
|
}
|
|
recipientMap[id] = recipient
|
|
}
|
|
|
|
switch dest := recipient.Destination.(type) {
|
|
case *backuppb.Recipient_Self:
|
|
return s.Client.Store.ACI, nil
|
|
case *backuppb.Recipient_Contact:
|
|
if len(dest.Contact.GetAci()) == 16 {
|
|
return uuid.UUID(dest.Contact.GetAci()), nil
|
|
}
|
|
}
|
|
return uuid.Nil, nil
|
|
}
|
|
for _, item := range items {
|
|
var streamOrder int64
|
|
switch dt := item.DirectionalDetails.(type) {
|
|
case *backuppb.ChatItem_Incoming:
|
|
streamOrder = int64(dt.Incoming.GetDateServerSent())
|
|
if !firstDirectionfulProcessed {
|
|
firstDirectionfulProcessed = true
|
|
isRead = dt.Incoming.Read
|
|
}
|
|
case *backuppb.ChatItem_Outgoing:
|
|
// TODO stream order?
|
|
if !firstDirectionfulProcessed {
|
|
firstDirectionfulProcessed = true
|
|
isRead = true
|
|
}
|
|
}
|
|
if len(attMap) > 0 {
|
|
clear(attMap)
|
|
}
|
|
senderACI, err := getRecipientACI(item.AuthorId)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if senderACI == uuid.Nil {
|
|
continue
|
|
}
|
|
dm, reactions := msgconv.BackupToDataMessage(item, attMap)
|
|
if dm == nil {
|
|
continue
|
|
}
|
|
cm := s.Main.MsgConv.ToMatrix(ctx, s.Client, params.Portal, s.Main.Bridge.Bot, dm, attMap)
|
|
convertedReactions := make([]*bridgev2.BackfillReaction, 0, len(reactions))
|
|
for _, reaction := range reactions {
|
|
reactionSenderACI, err := getRecipientACI(reaction.AuthorId)
|
|
if err != nil {
|
|
return nil, err
|
|
} else if reactionSenderACI == uuid.Nil {
|
|
continue
|
|
}
|
|
convertedReactions = append(convertedReactions, &bridgev2.BackfillReaction{
|
|
TargetPart: ptr.Ptr(networkid.PartID("")),
|
|
Timestamp: time.UnixMilli(int64(reaction.SentTimestamp)),
|
|
Sender: s.makeEventSender(reactionSenderACI),
|
|
Emoji: reaction.GetEmoji(),
|
|
})
|
|
}
|
|
msgID := signalid.MakeMessageID(senderACI, item.DateSent)
|
|
convertedMessages = append(convertedMessages, &bridgev2.BackfillMessage{
|
|
ConvertedMessage: cm,
|
|
Sender: s.makeEventSender(senderACI),
|
|
ID: msgID,
|
|
TxnID: networkid.TransactionID(msgID),
|
|
Timestamp: time.UnixMilli(int64(item.DateSent)),
|
|
StreamOrder: streamOrder,
|
|
Reactions: convertedReactions,
|
|
})
|
|
}
|
|
return &bridgev2.FetchMessagesResponse{
|
|
Messages: convertedMessages,
|
|
HasMore: len(items) >= params.Count,
|
|
Forward: params.Forward,
|
|
MarkRead: isRead,
|
|
ApproxTotalCount: chat.TotalMessages,
|
|
CompleteCallback: func() {
|
|
// When reaching the last backwards backfill batch, delete the chat from the backup store.
|
|
// If backwards backfilling isn't enabled, delete immediately after the first backfill request.
|
|
if (!params.Forward && len(items) < params.Count) || !s.Main.Bridge.Config.Backfill.Queue.Enabled {
|
|
err := s.Client.Store.BackupStore.DeleteBackupChat(ctx, chat.Id)
|
|
if err != nil {
|
|
zerolog.Ctx(ctx).Err(err).Msg("Failed to delete chat from backup store")
|
|
} else {
|
|
zerolog.Ctx(ctx).Debug().Msg("Deleted chat from backup store as backfill seems finished")
|
|
}
|
|
} else {
|
|
err := s.Client.Store.BackupStore.DeleteBackupChatItems(ctx, chat.Id, minTS)
|
|
if err != nil {
|
|
zerolog.Ctx(ctx).Err(err).Time("min_ts", minTS).Msg("Failed to delete messages from backup store")
|
|
} else {
|
|
zerolog.Ctx(ctx).Debug().Time("min_ts", minTS).Msg("Deleted messages from backup store")
|
|
}
|
|
}
|
|
},
|
|
}, nil
|
|
}
|