Initial version of matrix logging bot.
This commit is contained in:
parent
b9016d8db5
commit
fd49577ef8
4 changed files with 351 additions and 0 deletions
281
2024/10/matrix_simpl_logging_bot/main.go
Normal file
281
2024/10/matrix_simpl_logging_bot/main.go
Normal file
|
@ -0,0 +1,281 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"go.mau.fi/util/exzerolog"
|
||||
"io"
|
||||
"maunium.net/go/mautrix"
|
||||
"maunium.net/go/mautrix/event"
|
||||
"maunium.net/go/mautrix/id"
|
||||
"os"
|
||||
"os/signal"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const StateDataDir = "data"
|
||||
const StateLogDir = "log"
|
||||
const StateFilePath = StateDataDir + "/state.json"
|
||||
const StateFiltersPath = StateDataDir + "/filters.json"
|
||||
|
||||
func syncFunc(client *mautrix.Client, syncCtx context.Context, syncStopWait *sync.WaitGroup) {
|
||||
log.Info().Msg(fmt.Sprintf("Syncing try"))
|
||||
err := client.SyncWithContext(syncCtx)
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
log.Error().Err(err).
|
||||
Msg("Failed to sync")
|
||||
}
|
||||
defer syncStopWait.Done()
|
||||
log.Info().Msg(fmt.Sprintf("Syncing try finished"))
|
||||
}
|
||||
|
||||
type FileStore struct {
|
||||
filterIds map[id.UserID]string
|
||||
loadedFilterIds bool
|
||||
nextBatchTokens map[id.UserID]string
|
||||
loadedBatchTokens bool
|
||||
}
|
||||
|
||||
func (f FileStore) SaveFilterID(_ context.Context, userID id.UserID, filterID string) error {
|
||||
f.filterIds[userID] = filterID
|
||||
err := saveValueToFile(StateFiltersPath, f.filterIds)
|
||||
if err == nil {
|
||||
f.loadedFilterIds = true
|
||||
} else {
|
||||
f.loadedFilterIds = false
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func saveValueToFile(fileName string, f map[id.UserID]string) error {
|
||||
file, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
writer := io.Writer(file)
|
||||
encoder := json.NewEncoder(writer)
|
||||
return encoder.Encode(f)
|
||||
}
|
||||
|
||||
func (f FileStore) LoadFilterID(_ context.Context, userID id.UserID) (string, error) {
|
||||
if !f.loadedFilterIds {
|
||||
_, err := os.Stat(StateFiltersPath)
|
||||
if err == nil {
|
||||
err = loadStateFromFile(StateFiltersPath, &f.filterIds)
|
||||
if err != nil {
|
||||
log.Error().
|
||||
Err(err).
|
||||
Msg("Got an eror while reading filters")
|
||||
}
|
||||
}
|
||||
f.loadedBatchTokens = true
|
||||
}
|
||||
|
||||
s := f.filterIds[userID]
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (f FileStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error {
|
||||
f.nextBatchTokens[userID] = nextBatchToken
|
||||
err := saveValueToFile(StateFilePath, f.nextBatchTokens)
|
||||
if err == nil {
|
||||
f.loadedBatchTokens = true
|
||||
} else {
|
||||
f.loadedBatchTokens = false
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (f FileStore) LoadNextBatch(ctx context.Context, userID id.UserID) (string, error) {
|
||||
if !f.loadedBatchTokens {
|
||||
_, err := os.Stat(StateFilePath)
|
||||
if err == nil {
|
||||
err := loadStateFromFile(StateFilePath, &f.nextBatchTokens)
|
||||
if err != nil {
|
||||
log.Error().
|
||||
Err(err).
|
||||
Msg("Got an eror while reading state")
|
||||
}
|
||||
}
|
||||
f.loadedBatchTokens = true
|
||||
}
|
||||
|
||||
s := f.nextBatchTokens[userID]
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func loadStateFromFile(path string, ref *map[id.UserID]string) error {
|
||||
file, err := os.OpenFile(path, os.O_RDONLY, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
decoder := json.NewDecoder(bufio.NewReader(file))
|
||||
err = decoder.Decode(ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func(f2 *os.File) {
|
||||
_ = f2.Close()
|
||||
}(file)
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
ce, cerr := regexp.Compile("[^a-zA-Z0-9 -_]+")
|
||||
if cerr != nil {
|
||||
panic("Error in compiling regexp " + cerr.Error())
|
||||
}
|
||||
|
||||
var homeserver = flag.String("homeserver", "", "homeserver")
|
||||
var userId = flag.String("userId", "", "userId")
|
||||
var token = flag.String("token", "", "token")
|
||||
var debug = flag.Bool("debug", false, "debug")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, os.Interrupt)
|
||||
signal.Notify(c, os.Kill)
|
||||
|
||||
var user = id.NewUserID(*userId, *homeserver)
|
||||
var client, err = mautrix.NewClient(*homeserver, user, *token)
|
||||
if err != nil {
|
||||
panic("Couldn't connect to homeserver " + err.Error())
|
||||
}
|
||||
|
||||
log := zerolog.New(zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) {
|
||||
w.Out = os.Stdout
|
||||
w.TimeFormat = time.Stamp
|
||||
})).With().Timestamp().Logger()
|
||||
if !*debug {
|
||||
log = log.Level(zerolog.DebugLevel)
|
||||
}
|
||||
exzerolog.SetupDefaults(&log)
|
||||
client.Log = log
|
||||
|
||||
// var lastRoomId id.RoomID
|
||||
var lastRoomName string
|
||||
|
||||
syncer := client.Syncer.(*mautrix.DefaultSyncer)
|
||||
client.Store = FileStore{
|
||||
loadedFilterIds: false,
|
||||
filterIds: make(map[id.UserID]string),
|
||||
loadedBatchTokens: false,
|
||||
nextBatchTokens: make(map[id.UserID]string)}
|
||||
|
||||
roomNames := make(map[string]string)
|
||||
filesNames := make(map[string]*os.File)
|
||||
syncer.OnEventType(event.StateRoomName, func(ctx context.Context, evt *event.Event) {
|
||||
roomNames[evt.RoomID.String()] = evt.Content.AsRoomName().Name
|
||||
log.Info().
|
||||
Str("sender", evt.Sender.String()).
|
||||
Str("type", evt.Type.String()).
|
||||
Str("id", evt.ID.String()).
|
||||
Msg(fmt.Sprintf("Received sync state room name mapping %s -> %s",
|
||||
evt.RoomID.String(),
|
||||
evt.Content.AsRoomName().Name))
|
||||
})
|
||||
|
||||
var lastRoomId id.RoomID
|
||||
_ = os.MkdirAll(StateLogDir, 0644)
|
||||
_ = os.MkdirAll(StateDataDir, 0644)
|
||||
syncer.OnEventType(
|
||||
event.EventMessage,
|
||||
func(ctx context.Context, evt *event.Event) {
|
||||
lastRoomId = evt.RoomID
|
||||
lastRoomName = roomNames[lastRoomId.String()]
|
||||
if lastRoomName == "" {
|
||||
lastRoomName = lastRoomId.String()
|
||||
}
|
||||
|
||||
var file *os.File
|
||||
file = filesNames[lastRoomName]
|
||||
if file == nil {
|
||||
lastRoomName = string(ce.ReplaceAll([]byte(lastRoomName), []byte("_")))
|
||||
channelLogPath := StateLogDir + "/" + lastRoomName + ".log"
|
||||
file, err := os.OpenFile(channelLogPath, os.O_RDWR|os.O_CREATE, 0644)
|
||||
if err != nil {
|
||||
panic("Failed to open file " + lastRoomName)
|
||||
}
|
||||
|
||||
_, err = file.Seek(0, io.SeekEnd)
|
||||
|
||||
if err != nil {
|
||||
panic("Failed to seek file " + lastRoomName)
|
||||
}
|
||||
filesNames[lastRoomName] = file
|
||||
if err != nil {
|
||||
panic("Failed to create file " + lastRoomName)
|
||||
}
|
||||
}
|
||||
|
||||
file = filesNames[lastRoomName]
|
||||
|
||||
timestamp := evt.Timestamp
|
||||
t := time.Unix(timestamp/1000, timestamp%1000*1000)
|
||||
msg := evt.Content.AsMessage()
|
||||
log.Info().
|
||||
Str("sender", evt.Sender.String()).
|
||||
Str("type", evt.Type.String()).
|
||||
Str("id", evt.ID.String()).
|
||||
Str("body", msg.Body).
|
||||
Msg(fmt.Sprintf("Received sync message on channel %s (%s)",
|
||||
lastRoomName, lastRoomId))
|
||||
|
||||
_, err := file.WriteString(t.String() + " " + "[" + evt.Sender.String() + "] " + msg.Body + "\n")
|
||||
if err != nil {
|
||||
panic("Failed to write to the file " + lastRoomName)
|
||||
}
|
||||
|
||||
err = file.Sync()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
syncer.OnEventType(event.StateMember, func(ctx context.Context, evt *event.Event) {
|
||||
if evt.GetStateKey() == client.UserID.String() && evt.Content.AsMember().Membership == event.MembershipInvite {
|
||||
_, err := client.JoinRoomByID(ctx, evt.RoomID)
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Str("room_id", evt.RoomID.String()).
|
||||
Str("inviter", evt.Sender.String()).
|
||||
Msg("Failed to join " + evt.RoomID.String() +
|
||||
" + room after invite by " + evt.Sender.String())
|
||||
} else {
|
||||
log.Info().
|
||||
Str("room_id", evt.RoomID.String()).
|
||||
Str("inviter", evt.Sender.String()).
|
||||
Msg("Succeeded to join " + evt.RoomID.String() +
|
||||
" + room after invite by " + evt.Sender.String())
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
log.Info().Msg("Now running")
|
||||
syncCtx, cancelSync := context.WithCancel(context.Background())
|
||||
var syncStopWait sync.WaitGroup
|
||||
syncStopWait.Add(1)
|
||||
|
||||
go syncFunc(client, syncCtx, &syncStopWait)
|
||||
|
||||
for sig := range c {
|
||||
if sig == os.Interrupt {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
cancelSync()
|
||||
syncStopWait.Wait()
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue