mirror of https://github.com/mautrix/go.git
147 lines
3.3 KiB
Go
147 lines
3.3 KiB
Go
// Copyright (c) 2024 Tulir Asokan
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package bridgev2
|
|
|
|
import (
|
|
"context"
|
|
"runtime/debug"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog"
|
|
|
|
"maunium.net/go/mautrix/bridge/status"
|
|
)
|
|
|
|
type BridgeStateQueue struct {
|
|
prevUnsent *status.BridgeState
|
|
prevSent *status.BridgeState
|
|
ch chan status.BridgeState
|
|
bridge *Bridge
|
|
user status.StandaloneCustomBridgeStateFiller
|
|
}
|
|
|
|
func (br *Bridge) SendGlobalBridgeState(state status.BridgeState) {
|
|
state = state.Fill(nil)
|
|
for {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
if err := br.Matrix.SendBridgeStatus(ctx, &state); err != nil {
|
|
br.Log.Warn().Err(err).Msg("Failed to update global bridge state")
|
|
cancel()
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
} else {
|
|
br.Log.Debug().Any("bridge_state", state).Msg("Sent new global bridge state")
|
|
cancel()
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (br *Bridge) NewBridgeStateQueue(user status.StandaloneCustomBridgeStateFiller) *BridgeStateQueue {
|
|
bsq := &BridgeStateQueue{
|
|
ch: make(chan status.BridgeState, 10),
|
|
bridge: br,
|
|
user: user,
|
|
}
|
|
go bsq.loop()
|
|
return bsq
|
|
}
|
|
|
|
func (bsq *BridgeStateQueue) Destroy() {
|
|
close(bsq.ch)
|
|
}
|
|
|
|
func (bsq *BridgeStateQueue) loop() {
|
|
defer func() {
|
|
err := recover()
|
|
if err != nil {
|
|
bsq.bridge.Log.Error().
|
|
Bytes(zerolog.ErrorStackFieldName, debug.Stack()).
|
|
Any(zerolog.ErrorFieldName, err).
|
|
Msg("Panic in bridge state loop")
|
|
}
|
|
}()
|
|
for state := range bsq.ch {
|
|
bsq.immediateSendBridgeState(state)
|
|
}
|
|
}
|
|
|
|
func (bsq *BridgeStateQueue) immediateSendBridgeState(state status.BridgeState) {
|
|
retryIn := 2
|
|
for {
|
|
if bsq.prevSent != nil && bsq.prevSent.ShouldDeduplicate(&state) {
|
|
bsq.bridge.Log.Debug().
|
|
Str("state_event", string(state.StateEvent)).
|
|
Msg("Not sending bridge state as it's a duplicate")
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
err := bsq.bridge.Matrix.SendBridgeStatus(ctx, &state)
|
|
cancel()
|
|
|
|
if err != nil {
|
|
bsq.bridge.Log.Warn().Err(err).
|
|
Int("retry_in_seconds", retryIn).
|
|
Msg("Failed to update bridge state")
|
|
time.Sleep(time.Duration(retryIn) * time.Second)
|
|
retryIn *= 2
|
|
if retryIn > 64 {
|
|
retryIn = 64
|
|
}
|
|
} else {
|
|
bsq.prevSent = &state
|
|
bsq.bridge.Log.Debug().
|
|
Any("bridge_state", state).
|
|
Msg("Sent new bridge state")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (bsq *BridgeStateQueue) Send(state status.BridgeState) {
|
|
if bsq == nil {
|
|
return
|
|
}
|
|
|
|
state = state.Fill(bsq.user)
|
|
bsq.prevUnsent = &state
|
|
|
|
if len(bsq.ch) >= 8 {
|
|
bsq.bridge.Log.Warn().Msg("Bridge state queue is nearly full, discarding an item")
|
|
select {
|
|
case <-bsq.ch:
|
|
default:
|
|
}
|
|
}
|
|
select {
|
|
case bsq.ch <- state:
|
|
default:
|
|
bsq.bridge.Log.Error().Msg("Bridge state queue is full, dropped new state")
|
|
}
|
|
}
|
|
|
|
func (bsq *BridgeStateQueue) GetPrev() status.BridgeState {
|
|
if bsq != nil && bsq.prevSent != nil {
|
|
return *bsq.prevSent
|
|
}
|
|
return status.BridgeState{}
|
|
}
|
|
|
|
func (bsq *BridgeStateQueue) GetPrevUnsent() status.BridgeState {
|
|
if bsq != nil && bsq.prevSent != nil {
|
|
return *bsq.prevUnsent
|
|
}
|
|
return status.BridgeState{}
|
|
}
|
|
|
|
func (bsq *BridgeStateQueue) SetPrev(prev status.BridgeState) {
|
|
if bsq != nil {
|
|
bsq.prevSent = &prev
|
|
}
|
|
}
|