mirror of https://github.com/mautrix/go.git
210 lines
5.0 KiB
Go
210 lines
5.0 KiB
Go
// Copyright (c) 2023 Tulir Asokan
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package appservice
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"runtime/debug"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog"
|
|
|
|
"maunium.net/go/mautrix"
|
|
"maunium.net/go/mautrix/event"
|
|
)
|
|
|
|
type ExecMode uint8
|
|
|
|
const (
|
|
AsyncHandlers ExecMode = iota
|
|
AsyncLoop
|
|
Sync
|
|
)
|
|
|
|
type EventHandler = func(ctx context.Context, evt *event.Event)
|
|
type OTKHandler = func(ctx context.Context, otk *mautrix.OTKCount)
|
|
type DeviceListHandler = func(ctx context.Context, lists *mautrix.DeviceLists, since string)
|
|
|
|
type EventProcessor struct {
|
|
ExecMode ExecMode
|
|
|
|
ExecSyncWarnTime time.Duration
|
|
ExecSyncTimeout time.Duration
|
|
|
|
as *AppService
|
|
stop chan struct{}
|
|
handlers map[event.Type][]EventHandler
|
|
|
|
otkHandlers []OTKHandler
|
|
deviceListHandlers []DeviceListHandler
|
|
}
|
|
|
|
func NewEventProcessor(as *AppService) *EventProcessor {
|
|
return &EventProcessor{
|
|
ExecMode: AsyncHandlers,
|
|
as: as,
|
|
stop: make(chan struct{}, 1),
|
|
handlers: make(map[event.Type][]EventHandler),
|
|
|
|
ExecSyncWarnTime: 30 * time.Second,
|
|
ExecSyncTimeout: 15 * time.Minute,
|
|
|
|
otkHandlers: make([]OTKHandler, 0),
|
|
deviceListHandlers: make([]DeviceListHandler, 0),
|
|
}
|
|
}
|
|
|
|
func (ep *EventProcessor) On(evtType event.Type, handler EventHandler) {
|
|
handlers, ok := ep.handlers[evtType]
|
|
if !ok {
|
|
handlers = []EventHandler{handler}
|
|
} else {
|
|
handlers = append(handlers, handler)
|
|
}
|
|
ep.handlers[evtType] = handlers
|
|
}
|
|
|
|
func (ep *EventProcessor) PrependHandler(evtType event.Type, handler EventHandler) {
|
|
handlers, ok := ep.handlers[evtType]
|
|
if !ok {
|
|
handlers = []EventHandler{handler}
|
|
} else {
|
|
handlers = append([]EventHandler{handler}, handlers...)
|
|
}
|
|
ep.handlers[evtType] = handlers
|
|
}
|
|
|
|
func (ep *EventProcessor) OnOTK(handler OTKHandler) {
|
|
ep.otkHandlers = append(ep.otkHandlers, handler)
|
|
}
|
|
|
|
func (ep *EventProcessor) OnDeviceList(handler DeviceListHandler) {
|
|
ep.deviceListHandlers = append(ep.deviceListHandlers, handler)
|
|
}
|
|
|
|
func (ep *EventProcessor) recoverFunc(data interface{}) {
|
|
if err := recover(); err != nil {
|
|
d, _ := json.Marshal(data)
|
|
ep.as.Log.Error().
|
|
Str(zerolog.ErrorStackFieldName, string(debug.Stack())).
|
|
Interface(zerolog.ErrorFieldName, err).
|
|
Str("event_content", string(d)).
|
|
Msg("Panic in Matrix event handler")
|
|
}
|
|
}
|
|
|
|
func (ep *EventProcessor) callHandler(ctx context.Context, handler EventHandler, evt *event.Event) {
|
|
defer ep.recoverFunc(evt)
|
|
handler(ctx, evt)
|
|
}
|
|
|
|
func (ep *EventProcessor) callOTKHandler(ctx context.Context, handler OTKHandler, otk *mautrix.OTKCount) {
|
|
defer ep.recoverFunc(otk)
|
|
handler(ctx, otk)
|
|
}
|
|
|
|
func (ep *EventProcessor) callDeviceListHandler(ctx context.Context, handler DeviceListHandler, dl *mautrix.DeviceLists) {
|
|
defer ep.recoverFunc(dl)
|
|
handler(ctx, dl, "")
|
|
}
|
|
|
|
func (ep *EventProcessor) DispatchOTK(ctx context.Context, otk *mautrix.OTKCount) {
|
|
for _, handler := range ep.otkHandlers {
|
|
go ep.callOTKHandler(ctx, handler, otk)
|
|
}
|
|
}
|
|
|
|
func (ep *EventProcessor) DispatchDeviceList(ctx context.Context, dl *mautrix.DeviceLists) {
|
|
for _, handler := range ep.deviceListHandlers {
|
|
go ep.callDeviceListHandler(ctx, handler, dl)
|
|
}
|
|
}
|
|
|
|
func (ep *EventProcessor) Dispatch(ctx context.Context, evt *event.Event) {
|
|
handlers, ok := ep.handlers[evt.Type]
|
|
if !ok {
|
|
return
|
|
}
|
|
switch ep.ExecMode {
|
|
case AsyncHandlers:
|
|
for _, handler := range handlers {
|
|
go ep.callHandler(ctx, handler, evt)
|
|
}
|
|
case AsyncLoop:
|
|
go func() {
|
|
for _, handler := range handlers {
|
|
ep.callHandler(ctx, handler, evt)
|
|
}
|
|
}()
|
|
case Sync:
|
|
if ep.ExecSyncWarnTime == 0 && ep.ExecSyncTimeout == 0 {
|
|
for _, handler := range handlers {
|
|
ep.callHandler(ctx, handler, evt)
|
|
}
|
|
return
|
|
}
|
|
doneChan := make(chan struct{})
|
|
go func() {
|
|
for _, handler := range handlers {
|
|
ep.callHandler(ctx, handler, evt)
|
|
}
|
|
close(doneChan)
|
|
}()
|
|
select {
|
|
case <-doneChan:
|
|
return
|
|
case <-time.After(ep.ExecSyncWarnTime):
|
|
log := ep.as.Log.With().
|
|
Str("event_id", evt.ID.String()).
|
|
Str("event_type", evt.Type.String()).
|
|
Logger()
|
|
log.Warn().Msg("Handling event in appservice transaction channel is taking long")
|
|
select {
|
|
case <-doneChan:
|
|
return
|
|
case <-time.After(ep.ExecSyncTimeout):
|
|
log.Error().Msg("Giving up waiting for event handler")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
func (ep *EventProcessor) startEvents(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case evt := <-ep.as.Events:
|
|
ep.Dispatch(ctx, evt)
|
|
case <-ep.stop:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ep *EventProcessor) startEncryption(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case evt := <-ep.as.ToDeviceEvents:
|
|
ep.Dispatch(ctx, evt)
|
|
case otk := <-ep.as.OTKCounts:
|
|
ep.DispatchOTK(ctx, otk)
|
|
case dl := <-ep.as.DeviceLists:
|
|
ep.DispatchDeviceList(ctx, dl)
|
|
case <-ep.stop:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ep *EventProcessor) Start(ctx context.Context) {
|
|
go ep.startEvents(ctx)
|
|
go ep.startEncryption(ctx)
|
|
}
|
|
|
|
func (ep *EventProcessor) Stop() {
|
|
close(ep.stop)
|
|
}
|