package main import ( "bufio" "context" "encoding/json" "errors" "flag" "fmt" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "go.mau.fi/util/exzerolog" "io" "maunium.net/go/mautrix" "maunium.net/go/mautrix/event" "maunium.net/go/mautrix/id" "os" "os/signal" "regexp" "sync" "time" ) const StateDataDir = "data" const StateLogDir = "log" const StateFilePath = StateDataDir + "/state.json" const StateFiltersPath = StateDataDir + "/filters.json" func syncFunc(client *mautrix.Client, syncCtx context.Context, syncStopWait *sync.WaitGroup) { log.Info().Msg(fmt.Sprintf("Syncing try")) err := client.SyncWithContext(syncCtx) if err != nil && !errors.Is(err, context.Canceled) { log.Error().Err(err). Msg("Failed to sync") } defer syncStopWait.Done() log.Info().Msg(fmt.Sprintf("Syncing try finished")) } type FileStore struct { filterIds map[id.UserID]string loadedFilterIds bool nextBatchTokens map[id.UserID]string loadedBatchTokens bool } func (f FileStore) SaveFilterID(_ context.Context, userID id.UserID, filterID string) error { f.filterIds[userID] = filterID err := saveValueToFile(StateFiltersPath, f.filterIds) if err == nil { f.loadedFilterIds = true } else { f.loadedFilterIds = false } return err } func saveValueToFile(fileName string, f map[id.UserID]string) error { file, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644) if err != nil { return err } writer := io.Writer(file) encoder := json.NewEncoder(writer) return encoder.Encode(f) } func (f FileStore) LoadFilterID(_ context.Context, userID id.UserID) (string, error) { if !f.loadedFilterIds { _, err := os.Stat(StateFiltersPath) if err == nil { err = loadStateFromFile(StateFiltersPath, &f.filterIds) if err != nil { log.Error(). Err(err). Msg("Got an eror while reading filters") } } f.loadedBatchTokens = true } s := f.filterIds[userID] return s, nil } func (f FileStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error { f.nextBatchTokens[userID] = nextBatchToken err := saveValueToFile(StateFilePath, f.nextBatchTokens) if err == nil { f.loadedBatchTokens = true } else { f.loadedBatchTokens = false } return err } func (f FileStore) LoadNextBatch(ctx context.Context, userID id.UserID) (string, error) { if !f.loadedBatchTokens { _, err := os.Stat(StateFilePath) if err == nil { err := loadStateFromFile(StateFilePath, &f.nextBatchTokens) if err != nil { log.Error(). Err(err). Msg("Got an eror while reading state") } } f.loadedBatchTokens = true } s := f.nextBatchTokens[userID] return s, nil } func loadStateFromFile(path string, ref *map[id.UserID]string) error { file, err := os.OpenFile(path, os.O_RDONLY, 0644) if err != nil { return err } decoder := json.NewDecoder(bufio.NewReader(file)) err = decoder.Decode(ref) if err != nil { return err } defer func(f2 *os.File) { _ = f2.Close() }(file) return nil } func main() { ce, cerr := regexp.Compile("[^a-zA-Z0-9 -_]+") if cerr != nil { panic("Error in compiling regexp " + cerr.Error()) } var homeserver = flag.String("homeserver", "", "homeserver") var userId = flag.String("userId", "", "userId") var token = flag.String("token", "", "token") var debug = flag.Bool("debug", false, "debug") flag.Parse() c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) signal.Notify(c, os.Kill) var user = id.NewUserID(*userId, *homeserver) var client, err = mautrix.NewClient(*homeserver, user, *token) if err != nil { panic("Couldn't connect to homeserver " + err.Error()) } log := zerolog.New(zerolog.NewConsoleWriter(func(w *zerolog.ConsoleWriter) { w.Out = os.Stdout w.TimeFormat = time.Stamp })).With().Timestamp().Logger() if !*debug { log = log.Level(zerolog.DebugLevel) } exzerolog.SetupDefaults(&log) client.Log = log // var lastRoomId id.RoomID var lastRoomName string syncer := client.Syncer.(*mautrix.DefaultSyncer) client.Store = FileStore{ loadedFilterIds: false, filterIds: make(map[id.UserID]string), loadedBatchTokens: false, nextBatchTokens: make(map[id.UserID]string)} roomNames := make(map[string]string) filesNames := make(map[string]*os.File) syncer.OnEventType(event.StateRoomName, func(ctx context.Context, evt *event.Event) { roomNames[evt.RoomID.String()] = evt.Content.AsRoomName().Name log.Info(). Str("sender", evt.Sender.String()). Str("type", evt.Type.String()). Str("id", evt.ID.String()). Msg(fmt.Sprintf("Received sync state room name mapping %s -> %s", evt.RoomID.String(), evt.Content.AsRoomName().Name)) }) var lastRoomId id.RoomID _ = os.MkdirAll(StateLogDir, 0644) _ = os.MkdirAll(StateDataDir, 0644) syncer.OnEventType( event.EventMessage, func(ctx context.Context, evt *event.Event) { lastRoomId = evt.RoomID lastRoomName = roomNames[lastRoomId.String()] if lastRoomName == "" { lastRoomName = lastRoomId.String() } var file *os.File file = filesNames[lastRoomName] if file == nil { lastRoomName = string(ce.ReplaceAll([]byte(lastRoomName), []byte("_"))) channelLogPath := StateLogDir + "/" + lastRoomName + ".log" file, err := os.OpenFile(channelLogPath, os.O_RDWR|os.O_CREATE, 0644) if err != nil { panic("Failed to open file " + lastRoomName) } _, err = file.Seek(0, io.SeekEnd) if err != nil { panic("Failed to seek file " + lastRoomName) } filesNames[lastRoomName] = file if err != nil { panic("Failed to create file " + lastRoomName) } } file = filesNames[lastRoomName] timestamp := evt.Timestamp t := time.Unix(timestamp/1000, timestamp%1000*1000) msg := evt.Content.AsMessage() log.Info(). Str("sender", evt.Sender.String()). Str("type", evt.Type.String()). Str("id", evt.ID.String()). Str("body", msg.Body). Msg(fmt.Sprintf("Received sync message on channel %s (%s)", lastRoomName, lastRoomId)) _, err := file.WriteString(t.String() + " " + "[" + evt.Sender.String() + "] " + msg.Body + "\n") if err != nil { panic("Failed to write to the file " + lastRoomName) } err = file.Sync() if err != nil { return } }, ) syncer.OnEventType(event.StateMember, func(ctx context.Context, evt *event.Event) { if evt.GetStateKey() == client.UserID.String() && evt.Content.AsMember().Membership == event.MembershipInvite { _, err := client.JoinRoomByID(ctx, evt.RoomID) if err != nil { log.Error().Err(err). Str("room_id", evt.RoomID.String()). Str("inviter", evt.Sender.String()). Msg("Failed to join " + evt.RoomID.String() + " + room after invite by " + evt.Sender.String()) } else { log.Info(). Str("room_id", evt.RoomID.String()). Str("inviter", evt.Sender.String()). Msg("Succeeded to join " + evt.RoomID.String() + " + room after invite by " + evt.Sender.String()) } } }) log.Info().Msg("Now running") syncCtx, cancelSync := context.WithCancel(context.Background()) var syncStopWait sync.WaitGroup syncStopWait.Add(1) go syncFunc(client, syncCtx, &syncStopWait) for sig := range c { if sig == os.Interrupt { break } } cancelSync() syncStopWait.Wait() }