diff --git a/app.go b/app.go index fc7b9f5..bd411b5 100644 --- a/app.go +++ b/app.go @@ -4,25 +4,29 @@ import ( "context" "strconv" - "github.com/strimertul/strimertul/modules" - "github.com/strimertul/strimertul/modules/database" - "github.com/strimertul/strimertul/modules/http" - "github.com/strimertul/strimertul/modules/twitch" + "github.com/strimertul/strimertul/twitch" + + "github.com/strimertul/strimertul/loyalty" "git.sr.ht/~hamcha/containers" "github.com/nicklaw5/helix/v2" + "github.com/strimertul/strimertul/database" + "github.com/strimertul/strimertul/http" "github.com/urfave/cli/v2" "github.com/wailsapp/wails/v2/pkg/runtime" - "go.uber.org/zap" ) // App struct type App struct { ctx context.Context cliParams *cli.Context - driver DatabaseDriver - manager *modules.Manager + driver database.DatabaseDriver ready *containers.RWSync[bool] + + db *database.LocalDBClient + twitchClient *twitch.Client + httpServer *http.Server + loyaltyManager *loyalty.Manager } // NewApp creates a new App application struct @@ -37,16 +41,13 @@ func NewApp(cliParams *cli.Context) *App { func (a *App) startup(ctx context.Context) { a.ctx = ctx - // Create module manager - a.manager = modules.NewManager(logger) - // Make KV hub var err error - a.driver, err = getDatabaseDriver(a.cliParams) + a.driver, err = database.GetDatabaseDriver(a.cliParams) failOnError(err, "error opening database") // Start database backup task - backupOpts := BackupOptions{ + backupOpts := database.BackupOptions{ BackupDir: a.cliParams.String("backup-dir"), BackupInterval: a.cliParams.Int("backup-interval"), MaxBackups: a.cliParams.Int("max-backups"), @@ -58,28 +59,22 @@ func (a *App) startup(ctx context.Context) { hub := a.driver.Hub() go hub.Run() - db, err := database.NewDBModule(hub, a.manager) + a.db, err = database.NewLocalClient(hub, logger) failOnError(err, "failed to initialize database module") // Set meta keys - _ = db.PutKey("stul-meta/version", appVersion) - - for module, constructor := range moduleList { - err := constructor(a.manager) - if err != nil { - logger.Error("could not register module", zap.String("module", string(module)), zap.Error(err)) - } else { - } - } + _ = a.db.PutKey("stul-meta/version", appVersion) // Create logger and endpoints - httpServer, err := http.NewServer(a.manager) + a.httpServer, err = http.NewServer(a.db, logger) failOnError(err, "could not initialize http server") - defer func() { - if err := httpServer.Close(); err != nil { - logger.Error("could not close DB", zap.Error(err)) - } - }() + + // Create twitch client + a.twitchClient, err = twitch.NewClient(a.db, a.httpServer, logger) + failOnError(err, "could not initialize twitch client") + + // Initialize loyalty system + a.loyaltyManager, err = loyalty.NewManager(a.db, a.twitchClient, logger) a.ready.Set(true) runtime.EventsEmit(ctx, "ready", true) @@ -93,15 +88,20 @@ func (a *App) startup(ctx context.Context) { }() // Run HTTP server - failOnError(httpServer.Listen(), "HTTP server stopped") + failOnError(a.httpServer.Listen(), "HTTP server stopped") } func (a *App) stop(context.Context) { - for module := range a.manager.Modules { - if err := a.manager.Modules[module].Close(); err != nil { - logger.Error("could not close module", zap.String("module", string(module)), zap.Error(err)) - } + if a.loyaltyManager != nil { + a.loyaltyManager.Close() } + if a.twitchClient != nil { + a.twitchClient.Close() + } + if a.httpServer != nil { + a.httpServer.Close() + } + a.db.Close() failOnError(a.driver.Close(), "could not close driver") } @@ -115,28 +115,22 @@ func (a *App) AuthenticateKVClient(id string) { } func (a *App) IsServerReady() bool { - if !a.ready.Get() { - return false - } - return a.manager.Modules[modules.ModuleHTTP].Status().Working + return a.ready.Get() } func (a *App) GetKilovoltBind() string { - if a.manager == nil { + if a.httpServer == nil { return "" } - if httpModule, ok := a.manager.Modules[modules.ModuleHTTP]; ok { - return httpModule.Status().Data.(http.StatusData).Bind - } - return "" + return a.httpServer.Config.Bind } func (a *App) GetTwitchAuthURL() string { - return a.manager.Modules[modules.ModuleTwitch].(*twitch.Client).GetAuthorizationURL() + return a.twitchClient.GetAuthorizationURL() } func (a *App) GetTwitchLoggedUser() (helix.User, error) { - return a.manager.Modules[modules.ModuleTwitch].(*twitch.Client).GetLoggedUser() + return a.twitchClient.GetLoggedUser() } func (a *App) GetLastLogs() []LogEntry { diff --git a/driver.interface.go b/backup.go similarity index 58% rename from driver.interface.go rename to backup.go index a939e92..2217fea 100644 --- a/driver.interface.go +++ b/backup.go @@ -2,29 +2,16 @@ package main import ( "fmt" - "io" "os" - "path/filepath" "sort" "time" - kv "github.com/strimertul/kilovolt/v9" + "github.com/strimertul/strimertul/database" "github.com/strimertul/strimertul/utils" - "github.com/urfave/cli/v2" "go.uber.org/zap" ) -// DatabaseDriver is a driver wrapping a supported database -type DatabaseDriver interface { - Hub() *kv.Hub - Close() error - Import(map[string]string) error - Export(io.Writer) error - Restore(io.Reader) error - Backup(io.Writer) error -} - -func BackupTask(driver DatabaseDriver, options BackupOptions) { +func BackupTask(driver database.DatabaseDriver, options database.BackupOptions) { if options.BackupDir == "" { logger.Warn("backup directory not set, database backups are disabled") return @@ -74,38 +61,3 @@ func BackupTask(driver DatabaseDriver, options BackupOptions) { } } } - -type BackupOptions struct { - BackupDir string - BackupInterval int - MaxBackups int -} - -func getDatabaseDriverName(ctx *cli.Context) string { - driver := ctx.String("driver") - if driver != "auto" { - return driver - } - - dbdir := ctx.String("database-dir") - file, err := os.ReadFile(filepath.Join(dbdir, "stul-driver")) - if err != nil { - // No driver file found (or file corrupted), use default driver - return databaseDefaultDriver - } - return string(file) -} - -func getDatabaseDriver(ctx *cli.Context) (DatabaseDriver, error) { - name := getDatabaseDriverName(ctx) - dbdir := ctx.String("database-dir") - - switch name { - case "badger": - return nil, cli.Exit("Badger is not supported anymore as a database driver", 64) - case "pebble": - return NewPebble(dbdir) - default: - return nil, cli.Exit(fmt.Sprintf("Unknown database driver: %s", name), 64) - } -} diff --git a/cli.database.go b/cli.database.go index 09f77b7..264a7d5 100644 --- a/cli.database.go +++ b/cli.database.go @@ -3,6 +3,8 @@ package main import ( "os" + "github.com/strimertul/strimertul/database" + "github.com/urfave/cli/v2" ) @@ -23,7 +25,7 @@ func cliImport(ctx *cli.Context) error { return fatalError(err, "could not decode import file") } - driver, err := getDatabaseDriver(ctx) + driver, err := database.GetDatabaseDriver(ctx) if err != nil { return fatalError(err, "could not open database") } @@ -49,7 +51,7 @@ func cliRestore(ctx *cli.Context) error { inStream = file } - driver, err := getDatabaseDriver(ctx) + driver, err := database.GetDatabaseDriver(ctx) if err != nil { return fatalError(err, "could not open database") } @@ -75,7 +77,7 @@ func cliExport(ctx *cli.Context) error { outStream = file } - driver, err := getDatabaseDriver(ctx) + driver, err := database.GetDatabaseDriver(ctx) if err != nil { return fatalError(err, "could not open database") } diff --git a/modules/database/database.go b/database/database.go similarity index 71% rename from modules/database/database.go rename to database/database.go index 82c2542..73e4504 100644 --- a/modules/database/database.go +++ b/database/database.go @@ -4,8 +4,6 @@ import ( "errors" "fmt" - "github.com/strimertul/strimertul/modules" - jsoniter "github.com/json-iterator/go" kv "github.com/strimertul/kilovolt/v9" "go.uber.org/zap" @@ -21,7 +19,7 @@ var ( ErrEmptyKey = errors.New("empty key") ) -type DBModule struct { +type LocalDBClient struct { client *kv.LocalClient hub *kv.Hub logger *zap.Logger @@ -32,44 +30,38 @@ type KvPair struct { Data string } -func NewDBModule(hub *kv.Hub, manager *modules.Manager) (*DBModule, error) { - logger := manager.Logger(modules.ModuleDB) +func NewLocalClient(hub *kv.Hub, logger *zap.Logger) (*LocalDBClient, error) { + // Create local client localClient := kv.NewLocalClient(kv.ClientOptions{}, logger) + + // Run client and add it to the hub go localClient.Run() hub.AddClient(localClient) localClient.Wait() + + // Bypass authentication err := hub.SetAuthenticated(localClient.UID(), true) if err != nil { return nil, err } - module := &DBModule{ + + return &LocalDBClient{ client: localClient, hub: hub, logger: logger, - } - - manager.Modules[modules.ModuleDB] = module - return module, nil + }, nil } -func (mod *DBModule) Hub() *kv.Hub { +func (mod *LocalDBClient) Hub() *kv.Hub { return mod.hub } -func (mod *DBModule) Status() modules.ModuleStatus { - return modules.ModuleStatus{ - Enabled: mod.hub != nil, - Working: mod.client != nil, - StatusString: "ok", - } -} - -func (mod *DBModule) Close() error { +func (mod *LocalDBClient) Close() error { mod.hub.RemoveClient(mod.client) return nil } -func (mod *DBModule) GetKey(key string) (string, error) { +func (mod *LocalDBClient) GetKey(key string) (string, error) { res, err := mod.makeRequest(kv.CmdReadKey, map[string]interface{}{"key": key}) if err != nil { return "", err @@ -77,12 +69,12 @@ func (mod *DBModule) GetKey(key string) (string, error) { return res.Data.(string), nil } -func (mod *DBModule) PutKey(key string, data string) error { +func (mod *LocalDBClient) PutKey(key string, data string) error { _, err := mod.makeRequest(kv.CmdWriteKey, map[string]interface{}{"key": key, "data": data}) return err } -func (mod *DBModule) SubscribePrefix(fn kv.SubscriptionCallback, prefixes ...string) error { +func (mod *LocalDBClient) SubscribePrefix(fn kv.SubscriptionCallback, prefixes ...string) error { for _, prefix := range prefixes { _, err := mod.makeRequest(kv.CmdSubscribePrefix, map[string]interface{}{"prefix": prefix}) if err != nil { @@ -93,7 +85,7 @@ func (mod *DBModule) SubscribePrefix(fn kv.SubscriptionCallback, prefixes ...str return nil } -func (mod *DBModule) SubscribeKey(key string, fn func(string)) error { +func (mod *LocalDBClient) SubscribeKey(key string, fn func(string)) error { _, err := mod.makeRequest(kv.CmdSubscribePrefix, map[string]interface{}{"prefix": key}) if err != nil { return err @@ -107,7 +99,7 @@ func (mod *DBModule) SubscribeKey(key string, fn func(string)) error { return nil } -func (mod *DBModule) GetJSON(key string, dst interface{}) error { +func (mod *LocalDBClient) GetJSON(key string, dst interface{}) error { res, err := mod.GetKey(key) if err != nil { return err @@ -118,7 +110,7 @@ func (mod *DBModule) GetJSON(key string, dst interface{}) error { return json.Unmarshal([]byte(res), dst) } -func (mod *DBModule) GetAll(prefix string) (map[string]string, error) { +func (mod *LocalDBClient) GetAll(prefix string) (map[string]string, error) { res, err := mod.makeRequest(kv.CmdReadPrefix, map[string]interface{}{"prefix": prefix}) if err != nil { return nil, err @@ -131,7 +123,7 @@ func (mod *DBModule) GetAll(prefix string) (map[string]string, error) { return out, nil } -func (mod *DBModule) PutJSON(key string, data interface{}) error { +func (mod *LocalDBClient) PutJSON(key string, data interface{}) error { byt, err := json.Marshal(data) if err != nil { return err @@ -140,7 +132,7 @@ func (mod *DBModule) PutJSON(key string, data interface{}) error { return mod.PutKey(key, string(byt)) } -func (mod *DBModule) PutJSONBulk(kvs map[string]interface{}) error { +func (mod *LocalDBClient) PutJSONBulk(kvs map[string]interface{}) error { encoded := make(map[string]interface{}) for k, v := range kvs { byt, err := json.Marshal(v) @@ -154,12 +146,12 @@ func (mod *DBModule) PutJSONBulk(kvs map[string]interface{}) error { return err } -func (mod *DBModule) RemoveKey(key string) error { +func (mod *LocalDBClient) RemoveKey(key string) error { // TODO return mod.PutKey(key, "") } -func (mod *DBModule) makeRequest(cmd string, data map[string]interface{}) (kv.Response, error) { +func (mod *LocalDBClient) makeRequest(cmd string, data map[string]interface{}) (kv.Response, error) { req, chn := mod.client.MakeRequest(cmd, data) mod.hub.SendMessage(req) return getResponse(<-chn) diff --git a/database/driver.interface.go b/database/driver.interface.go new file mode 100644 index 0000000..9e6e2f2 --- /dev/null +++ b/database/driver.interface.go @@ -0,0 +1,61 @@ +package database + +import ( + "fmt" + "io" + "os" + "path/filepath" + + "go.uber.org/zap" + + kv "github.com/strimertul/kilovolt/v9" + "github.com/urfave/cli/v2" +) + +// DatabaseDriver is a driver wrapping a supported database +type DatabaseDriver interface { + Hub() *kv.Hub + Close() error + Import(map[string]string) error + Export(io.Writer) error + Restore(io.Reader) error + Backup(io.Writer) error +} + +type BackupOptions struct { + BackupDir string + BackupInterval int + MaxBackups int +} + +const databaseDefaultDriver = "pebble" + +func getDatabaseDriverName(ctx *cli.Context) string { + driver := ctx.String("driver") + if driver != "auto" { + return driver + } + + dbDirectory := ctx.String("database-dir") + file, err := os.ReadFile(filepath.Join(dbDirectory, "stul-driver")) + if err != nil { + // No driver file found (or file corrupted), use default driver + return databaseDefaultDriver + } + return string(file) +} + +func GetDatabaseDriver(ctx *cli.Context) (DatabaseDriver, error) { + name := getDatabaseDriverName(ctx) + dbDirectory := ctx.String("database-dir") + logger := ctx.Context.Value("logger").(*zap.Logger) + + switch name { + case "badger": + return nil, cli.Exit("Badger is not supported anymore as a database driver", 64) + case "pebble": + return NewPebble(dbDirectory, logger) + default: + return nil, cli.Exit(fmt.Sprintf("Unknown database driver: %s", name), 64) + } +} diff --git a/driver.pebble.go b/database/driver.pebble.go similarity index 89% rename from driver.pebble.go rename to database/driver.pebble.go index 387ac6b..06b9da1 100644 --- a/driver.pebble.go +++ b/database/driver.pebble.go @@ -1,24 +1,25 @@ -package main +package database import ( "fmt" + "go.uber.org/zap" "io" "os" "path/filepath" "github.com/cockroachdb/pebble" - "github.com/labstack/gommon/log" kv "github.com/strimertul/kilovolt/v9" pebble_driver "github.com/strimertul/kv-pebble" ) type PebbleDatabase struct { - db *pebble.DB - hub *kv.Hub + db *pebble.DB + hub *kv.Hub + logger *zap.Logger } // NewPebble creates a new database driver instance with an underlying Pebble database -func NewPebble(directory string) (*PebbleDatabase, error) { +func NewPebble(directory string, logger *zap.Logger) (*PebbleDatabase, error) { db, err := pebble.Open(directory, &pebble.Options{}) if err != nil { return nil, fmt.Errorf("could not open DB: %w", err) @@ -31,8 +32,9 @@ func NewPebble(directory string) (*PebbleDatabase, error) { } p := &PebbleDatabase{ - db: db, - hub: nil, + db: db, + hub: nil, + logger: logger, } return p, nil @@ -40,14 +42,13 @@ func NewPebble(directory string) (*PebbleDatabase, error) { func (p *PebbleDatabase) Hub() *kv.Hub { if p.hub == nil { - p.hub, _ = kv.NewHub(pebble_driver.NewPebbleBackend(p.db, true), kv.HubOptions{}, logger) + p.hub, _ = kv.NewHub(pebble_driver.NewPebbleBackend(p.db, true), kv.HubOptions{}, p.logger) } return p.hub } func (p *PebbleDatabase) Close() error { err := p.db.Close() - log.Info("database was closed") if err != nil { return fmt.Errorf("Could not close database: %w", err) } diff --git a/go.mod b/go.mod index 22aeb12..d1bd13b 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/strimertul/strimertul go 1.19 require ( - git.sr.ht/~hamcha/containers v0.0.3 + git.sr.ht/~hamcha/containers v0.2.0 github.com/Masterminds/sprig/v3 v3.2.2 github.com/apenwarr/fixconsole v0.0.0-20191012055117-5a9f6489cc29 github.com/cockroachdb/pebble v0.0.0-20221116223310-87eccabb90a3 @@ -11,7 +11,6 @@ require ( github.com/gorilla/websocket v1.5.0 github.com/hashicorp/golang-lru v0.5.1 github.com/json-iterator/go v1.1.12 - github.com/labstack/gommon v0.3.1 github.com/nicklaw5/helix/v2 v2.11.0 github.com/strimertul/kilovolt/v9 v9.0.1 github.com/strimertul/kv-pebble v1.2.0 @@ -49,6 +48,7 @@ require ( github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/labstack/echo/v4 v4.9.0 // indirect + github.com/labstack/gommon v0.3.1 // indirect github.com/leaanthony/go-ansi-parser v1.0.1 // indirect github.com/leaanthony/gosod v1.0.3 // indirect github.com/leaanthony/slicer v1.5.0 // indirect diff --git a/go.sum b/go.sum index ecf02ec..d47ec98 100644 --- a/go.sum +++ b/go.sum @@ -31,8 +31,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -git.sr.ht/~hamcha/containers v0.0.3 h1:obG9X8s5iOIahVe+EGpkBDYmUAO78oTi9Y9gRurt334= -git.sr.ht/~hamcha/containers v0.0.3/go.mod h1:RiZphUpy9t6EnL4Gf6uzByM9QrBoqRCEPo7kz2wzbhE= +git.sr.ht/~hamcha/containers v0.2.0 h1:fv8HQ6fsJUa1w46sH9KluW6dfJEh3uZN3QNLJvuCIm4= +git.sr.ht/~hamcha/containers v0.2.0/go.mod h1:RiZphUpy9t6EnL4Gf6uzByM9QrBoqRCEPo7kz2wzbhE= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak= diff --git a/modules/http/data.go b/http/data.go similarity index 100% rename from modules/http/data.go rename to http/data.go diff --git a/modules/http/server.go b/http/server.go similarity index 77% rename from modules/http/server.go rename to http/server.go index 4e85e47..0619750 100644 --- a/modules/http/server.go +++ b/http/server.go @@ -8,46 +8,36 @@ import ( "net/http" "net/http/pprof" - "github.com/strimertul/strimertul/modules/twitch" - "git.sr.ht/~hamcha/containers" jsoniter "github.com/json-iterator/go" - - "github.com/strimertul/strimertul/modules/database" + "github.com/strimertul/strimertul/database" "go.uber.org/zap" kv "github.com/strimertul/kilovolt/v9" - "github.com/strimertul/strimertul/modules" ) var json = jsoniter.ConfigFastest type Server struct { - Config ServerConfig - db *database.DBModule - logger *zap.Logger - server *http.Server - frontend fs.FS - hub *kv.Hub - mux *http.ServeMux - manager *modules.Manager + Config ServerConfig + db *database.LocalDBClient + logger *zap.Logger + server *http.Server + frontend fs.FS + hub *kv.Hub + mux *http.ServeMux + requestedRoutes map[string]http.HandlerFunc } -func NewServer(manager *modules.Manager) (*Server, error) { - db, ok := manager.Modules["db"].(*database.DBModule) - if !ok { - return nil, errors.New("db module not found") - } - - logger := manager.Logger(modules.ModuleHTTP) - +func NewServer(db *database.LocalDBClient, logger *zap.Logger) (*Server, error) { server := &Server{ - logger: logger, - db: db, - server: &http.Server{}, - manager: manager, + logger: logger, + db: db, + server: &http.Server{}, + requestedRoutes: make(map[string]http.HandlerFunc), } + err := db.GetJSON(ServerConfigKey, &server.Config) if err != nil { // Initialize with default config @@ -71,9 +61,6 @@ func NewServer(manager *modules.Manager) (*Server, error) { Password: server.Config.KVPassword, }) - // Register module - manager.Modules[modules.ModuleHTTP] = server - return server, nil } @@ -82,17 +69,6 @@ type StatusData struct { Bind string } -func (s *Server) Status() modules.ModuleStatus { - return modules.ModuleStatus{ - Enabled: true, - Working: s.server != nil, - Data: StatusData{ - s.server.Addr, - }, - StatusString: s.server.Addr, - } -} - func (s *Server) Close() error { return s.server.Close() } @@ -122,13 +98,20 @@ func (s *Server) makeMux() *http.ServeMux { if s.Config.EnableStaticServer { mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir(s.Config.Path)))) } - if s.manager.Modules[modules.ModuleTwitch].Status().Enabled { - mux.HandleFunc("/twitch/callback", s.manager.Modules[modules.ModuleTwitch].(*twitch.Client).AuthorizeCallback) + for route, handler := range s.requestedRoutes { + mux.HandleFunc(route, handler) } return mux } +func (s *Server) SetRoute(route string, handler http.HandlerFunc) { + s.requestedRoutes[route] = handler + if s.mux != nil { + s.mux.HandleFunc(route, handler) + } +} + func (s *Server) Listen() error { // Start HTTP server restart := containers.NewRWSync(false) diff --git a/modules/http/static-ex.go b/http/static-ex.go similarity index 100% rename from modules/http/static-ex.go rename to http/static-ex.go diff --git a/logging.go b/logging.go index 58c2ff5..1ca464e 100644 --- a/logging.go +++ b/logging.go @@ -9,11 +9,10 @@ import ( "gopkg.in/natefinch/lumberjack.v2" ) -var logger *zap.Logger - const LogHistory = 50 var ( + logger *zap.Logger lastLogs []LogEntry incomingLogs chan LogEntry ) diff --git a/modules/loyalty/data.go b/loyalty/data.go similarity index 100% rename from modules/loyalty/data.go rename to loyalty/data.go diff --git a/modules/loyalty/manager.go b/loyalty/manager.go similarity index 61% rename from modules/loyalty/manager.go rename to loyalty/manager.go index a9ca389..21718ce 100644 --- a/modules/loyalty/manager.go +++ b/loyalty/manager.go @@ -1,14 +1,19 @@ package loyalty import ( + "context" "errors" "fmt" "strings" - "sync" "time" - "github.com/strimertul/strimertul/modules" - "github.com/strimertul/strimertul/modules/database" + "github.com/strimertul/strimertul/utils" + + "git.sr.ht/~hamcha/containers" + + "github.com/strimertul/strimertul/twitch" + + "github.com/strimertul/strimertul/database" jsoniter "github.com/json-iterator/go" "go.uber.org/zap" @@ -23,53 +28,74 @@ var ( ) type Manager struct { - points map[string]PointsEntry - config Config - rewards RewardStorage - goals GoalStorage - queue RedeemQueueStorage - mu sync.Mutex - db *database.DBModule - logger *zap.Logger - cooldowns map[string]time.Time + points *containers.SyncMap[string, PointsEntry] + Config *containers.RWSync[Config] + Rewards *containers.Sync[RewardStorage] + Goals *containers.Sync[GoalStorage] + Queue *containers.Sync[RedeemQueueStorage] + db *database.LocalDBClient + logger *zap.Logger + cooldowns map[string]time.Time + banlist map[string]bool + activeUsers *containers.SyncMap[string, bool] + twitchClient *twitch.Client + ctx context.Context + cancelFn context.CancelFunc } -func Register(manager *modules.Manager) error { - db, ok := manager.Modules["db"].(*database.DBModule) - if !ok { - return errors.New("db module not found") - } - - logger := manager.Logger(modules.ModuleLoyalty) - +func NewManager(db *database.LocalDBClient, twitchClient *twitch.Client, logger *zap.Logger) (*Manager, error) { + ctx, cancelFn := context.WithCancel(context.Background()) loyalty := &Manager{ - logger: logger, - db: db, - mu: sync.Mutex{}, - cooldowns: make(map[string]time.Time), + Config: containers.NewRWSync(Config{Enabled: false}), + Rewards: containers.NewSync(RewardStorage{}), + Goals: containers.NewSync(GoalStorage{}), + Queue: containers.NewSync(RedeemQueueStorage{}), + + logger: logger, + db: db, + points: containers.NewSyncMap[string, PointsEntry](), + cooldowns: make(map[string]time.Time), + banlist: make(map[string]bool), + activeUsers: containers.NewSyncMap[string, bool](), + twitchClient: twitchClient, + ctx: ctx, + cancelFn: cancelFn, } // Get data from DB - if err := db.GetJSON(ConfigKey, &loyalty.config); err != nil { + var config Config + if err := db.GetJSON(ConfigKey, config); err == nil { + loyalty.Config.Set(config) + } else { if !errors.Is(err, database.ErrEmptyKey) { - return fmt.Errorf("could not retrieve loyalty config: %w", err) + return nil, fmt.Errorf("could not retrieve loyalty config: %w", err) } - loyalty.config.Enabled = false } // Retrieve configs - if err := db.GetJSON(RewardsKey, &loyalty.rewards); err != nil { + var rewards RewardStorage + if err := db.GetJSON(RewardsKey, &rewards); err != nil { + loyalty.Rewards.Set(rewards) + } else { if !errors.Is(err, database.ErrEmptyKey) { - return err + return nil, err } } - if err := db.GetJSON(GoalsKey, &loyalty.goals); err != nil { + + var goals GoalStorage + if err := db.GetJSON(GoalsKey, &goals); err != nil { + loyalty.Goals.Set(goals) + } else { if !errors.Is(err, database.ErrEmptyKey) { - return err + return nil, err } } - if err := db.GetJSON(QueueKey, &loyalty.queue); err != nil { + + var queue RedeemQueueStorage + if err := db.GetJSON(QueueKey, &queue); err != nil { + loyalty.Queue.Set(queue) + } else { if !errors.Is(err, database.ErrEmptyKey) { - return err + return nil, err } } @@ -77,18 +103,18 @@ func Register(manager *modules.Manager) error { points, err := db.GetAll(PointsPrefix) if err != nil { if !errors.Is(err, database.ErrEmptyKey) { - return err + return nil, err } points = make(map[string]string) } - loyalty.points = make(map[string]PointsEntry) + for k, v := range points { var entry PointsEntry err := json.UnmarshalFromString(v, &entry) if err != nil { - return err + return nil, err } - loyalty.points[k] = entry + loyalty.points.SetKey(k, entry) } // SubscribePrefix for changes @@ -97,62 +123,39 @@ func Register(manager *modules.Manager) error { logger.Error("could not setup loyalty reload subscription", zap.Error(err)) } - // Register module - manager.Modules[modules.ModuleLoyalty] = loyalty + loyalty.SetBanList(config.BanList) - return nil -} + // Setup twitch integration + loyalty.SetupTwitch() -func (m *Manager) Status() modules.ModuleStatus { - config := m.Config() - if !config.Enabled { - return modules.ModuleStatus{ - Enabled: false, - } - } - - return modules.ModuleStatus{ - Enabled: true, - Working: true, - Data: struct{}{}, - StatusString: "", - } + return loyalty, nil } func (m *Manager) Close() error { - // TODO Stop subscriptions? + // Send cancellation + m.cancelFn() + + // Teardown twitch integration + m.StopTwitch() + return nil } func (m *Manager) update(key, value string) { var err error - // Check for config changes/RPC switch key { case ConfigKey: - err = func() error { - m.mu.Lock() - defer m.mu.Unlock() - return json.UnmarshalFromString(value, &m.config) - }() + err = utils.LoadJSONToWrapped[Config](value, m.Config) + if err == nil { + m.SetBanList(m.Config.Get().BanList) + } case GoalsKey: - err = func() error { - m.mu.Lock() - defer m.mu.Unlock() - return json.UnmarshalFromString(value, &m.goals) - }() + err = utils.LoadJSONToWrapped[GoalStorage](value, m.Goals) case RewardsKey: - err = func() error { - m.mu.Lock() - defer m.mu.Unlock() - return json.UnmarshalFromString(value, &m.rewards) - }() + err = utils.LoadJSONToWrapped[RewardStorage](value, m.Rewards) case QueueKey: - err = func() error { - m.mu.Lock() - defer m.mu.Unlock() - return json.UnmarshalFromString(value, &m.queue) - }() + err = utils.LoadJSONToWrapped[RedeemQueueStorage](value, m.Queue) case CreateRedeemRPC: var redeem Redeem err = json.UnmarshalFromString(value, &redeem) @@ -173,11 +176,7 @@ func (m *Manager) update(key, value string) { var entry PointsEntry err = json.UnmarshalFromString(value, &entry) user := key[len(PointsPrefix):] - func() { - m.mu.Lock() - defer m.mu.Unlock() - m.points[user] = entry - }() + m.points.SetKey(user, entry) } } if err != nil { @@ -188,9 +187,7 @@ func (m *Manager) update(key, value string) { } func (m *Manager) GetPoints(user string) int64 { - m.mu.Lock() - defer m.mu.Unlock() - points, ok := m.points[user] + points, ok := m.points.GetKey(user) if ok { return points.Points } @@ -198,12 +195,11 @@ func (m *Manager) GetPoints(user string) int64 { } func (m *Manager) setPoints(user string, points int64) error { - m.mu.Lock() - defer m.mu.Unlock() - m.points[user] = PointsEntry{ + entry := PointsEntry{ Points: points, } - return m.db.PutJSON(PointsPrefix+user, m.points[user]) + m.points.SetKey(user, entry) + return m.db.PutJSON(PointsPrefix+user, entry) } func (m *Manager) GivePoints(pointsToGive map[string]int64) error { @@ -229,13 +225,10 @@ func (m *Manager) TakePoints(pointsToTake map[string]int64) error { } func (m *Manager) saveQueue() error { - return m.db.PutJSON(QueueKey, m.queue) + return m.db.PutJSON(QueueKey, m.Queue.Get()) } func (m *Manager) GetRewardCooldown(rewardID string) time.Time { - m.mu.Lock() - defer m.mu.Unlock() - cooldown, ok := m.cooldowns[rewardID] if !ok { // Return zero time for a reward with no cooldown @@ -246,11 +239,8 @@ func (m *Manager) GetRewardCooldown(rewardID string) time.Time { } func (m *Manager) AddRedeem(redeem Redeem) error { - m.mu.Lock() - defer m.mu.Unlock() - // Add to local list - m.queue = append(m.queue, redeem) + m.Queue.Set(append(m.Queue.Get(), redeem)) // Send redeem event if err := m.db.PutJSON(RedeemEvent, redeem); err != nil { @@ -283,13 +273,11 @@ func (m *Manager) PerformRedeem(redeem Redeem) error { } func (m *Manager) RemoveRedeem(redeem Redeem) error { - m.mu.Lock() - defer m.mu.Unlock() - - for index, queued := range m.queue { + queue := m.Queue.Get() + for index, queued := range queue { if queued.When == redeem.When && queued.Username == redeem.Username && queued.Reward.ID == redeem.Reward.ID { // Remove redemption from list - m.queue = append(m.queue[:index], m.queue[index+1:]...) + m.Queue.Set(append(queue[:index], queue[index+1:]...)) // Save points return m.saveQueue() @@ -300,25 +288,18 @@ func (m *Manager) RemoveRedeem(redeem Redeem) error { } func (m *Manager) SaveGoals() error { - return m.db.PutJSON(GoalsKey, m.goals) -} - -func (m *Manager) Goals() []Goal { - m.mu.Lock() - defer m.mu.Unlock() - return m.goals[:] + return m.db.PutJSON(GoalsKey, m.Goals.Get()) } func (m *Manager) ContributeGoal(goal Goal, user string, points int64) error { - m.mu.Lock() - defer m.mu.Unlock() - - for i, savedGoal := range m.goals { + goals := m.Goals.Get() + for i, savedGoal := range goals { if savedGoal.ID != goal.ID { continue } - m.goals[i].Contributed += points - m.goals[i].Contributors[user] += points + goals[i].Contributed += points + goals[i].Contributors[user] += points + m.Goals.Set(goals) return m.SaveGoals() } return ErrGoalNotFound @@ -353,16 +334,8 @@ func (m *Manager) PerformContribution(goal Goal, user string, points int64) erro return m.ContributeGoal(goal, user, points) } -func (m *Manager) Rewards() []Reward { - m.mu.Lock() - defer m.mu.Unlock() - return m.rewards[:] -} - func (m *Manager) GetReward(id string) Reward { - m.mu.Lock() - defer m.mu.Unlock() - for _, reward := range m.rewards { + for _, reward := range m.Rewards.Get() { if reward.ID == id { return reward } @@ -371,9 +344,7 @@ func (m *Manager) GetReward(id string) Reward { } func (m *Manager) GetGoal(id string) Goal { - m.mu.Lock() - defer m.mu.Unlock() - for _, goal := range m.goals { + for _, goal := range m.Goals.Get() { if goal.ID == id { return goal } @@ -381,8 +352,9 @@ func (m *Manager) GetGoal(id string) Goal { return Goal{} } -func (m *Manager) Config() Config { - m.mu.Lock() - defer m.mu.Unlock() - return m.config +func (m *Manager) Equals(c utils.Comparable) bool { + if manager, ok := c.(*Manager); ok { + return m == manager + } + return false } diff --git a/modules/twitch/modules.loyalty.go b/loyalty/twitch-bot.go similarity index 55% rename from modules/twitch/modules.loyalty.go rename to loyalty/twitch-bot.go index 17954b9..48dfaf3 100644 --- a/modules/twitch/modules.loyalty.go +++ b/loyalty/twitch-bot.go @@ -1,4 +1,4 @@ -package twitch +package loyalty import ( "fmt" @@ -6,139 +6,160 @@ import ( "strings" "time" + "git.sr.ht/~hamcha/containers" + + "github.com/strimertul/strimertul/twitch" + "go.uber.org/zap" irc "github.com/gempir/go-twitch-irc/v3" - "github.com/strimertul/strimertul/modules/loyalty" ) -func (b *Bot) SetupLoyalty(loyalty *loyalty.Manager) { - b.Loyalty = loyalty - config := loyalty.Config() - b.SetBanList(config.BanList) +func (m *Manager) SetupTwitch() { + bot := m.twitchClient.Bot + if bot == nil { + return + } // Add loyalty-based commands - b.commands["!redeem"] = BotCommand{ + bot.RegisterCommand("!redeem", twitch.BotCommand{ Description: "Redeem a reward with loyalty points", Usage: "!redeem [request text]", - AccessLevel: ALTEveryone, - Handler: cmdRedeemReward, + AccessLevel: twitch.ALTEveryone, + Handler: m.cmdRedeemReward, Enabled: true, - } - b.commands["!balance"] = BotCommand{ + }) + bot.RegisterCommand("!balance", twitch.BotCommand{ Description: "See your current point balance", Usage: "!balance", - AccessLevel: ALTEveryone, - Handler: cmdBalance, + AccessLevel: twitch.ALTEveryone, + Handler: m.cmdBalance, Enabled: true, - } - b.commands["!goals"] = BotCommand{ + }) + bot.RegisterCommand("!goals", twitch.BotCommand{ Description: "Check currently active community goals", Usage: "!goals", - AccessLevel: ALTEveryone, - Handler: cmdGoalList, + AccessLevel: twitch.ALTEveryone, + Handler: m.cmdGoalList, Enabled: true, - } - b.commands["!contribute"] = BotCommand{ + }) + bot.RegisterCommand("!contribute", twitch.BotCommand{ Description: "Contribute points to a community goal", Usage: "!contribute []", - AccessLevel: ALTEveryone, - Handler: cmdContributeGoal, + AccessLevel: twitch.ALTEveryone, + Handler: m.cmdContributeGoal, Enabled: true, - } + }) + + // Setup message handler for tracking user activity + bot.OnMessage.Subscribe(m) // Setup handler for adding points over time - b.Client.OnConnect(func() { - go func() { + go func() { + config := m.Config.Get() + if config.Enabled && bot != nil { for { - status := loyalty.Status() - if status.Enabled { - config := loyalty.Config() - if config.Points.Interval > 0 { - // Wait for next poll - time.Sleep(time.Duration(config.Points.Interval) * time.Second) + if config.Points.Interval > 0 { + // Wait for next poll + select { + case <-m.ctx.Done(): + return + case <-time.After(time.Duration(config.Points.Interval) * time.Second): + } - // If stream is confirmed offline, don't give points away! - isOnline := b.api.streamOnline.Get() - if !isOnline { + // If stream is confirmed offline, don't give points away! + isOnline := m.twitchClient.IsLive() + if !isOnline { + continue + } + + m.logger.Debug("awarding points") + + // Get user list + users, err := bot.Client.Userlist(bot.Config.Channel) + if err != nil { + m.logger.Error("error listing users", zap.Error(err)) + continue + } + + // Iterate for each user in the list + pointsToGive := make(map[string]int64) + for _, user := range users { + // Check if user is blocked + if m.IsBanned(user) { continue } - b.logger.Debug("awarding points") + // Check if user was active (chatting) for the bonus dingus + award := config.Points.Amount + if m.IsActive(user) { + award += config.Points.ActivityBonus + } - // Get user list - users, err := b.Client.Userlist(b.config.Channel) + // Add to point pool if already on it, otherwise initialize + pointsToGive[user] = award + } + + m.ResetActivity() + + // If changes were made, save the pool! + if len(users) > 0 { + err := m.GivePoints(pointsToGive) if err != nil { - b.logger.Error("error listing users", zap.Error(err)) - continue - } - - // Iterate for each user in the list - pointsToGive := make(map[string]int64) - for _, user := range users { - // Check if user is blocked - if b.IsBanned(user) { - continue - } - - // Check if user was active (chatting) for the bonus dingus - award := config.Points.Amount - if b.IsActive(user) { - award += config.Points.ActivityBonus - } - - // Add to point pool if already on it, otherwise initialize - pointsToGive[user] = award - } - - b.ResetActivity() - - // If changes were made, save the pool! - if len(users) > 0 { - err := b.Loyalty.GivePoints(pointsToGive) - if err != nil { - b.logger.Error("error giving points to user", zap.Error(err)) - } + m.logger.Error("error giving points to user", zap.Error(err)) } } } } - }() - }) + } + }() } -func (b *Bot) SetBanList(banned []string) { - b.banlist = make(map[string]bool) - for _, usr := range banned { - b.banlist[usr] = true +func (m *Manager) StopTwitch() { + bot := m.twitchClient.Bot + if bot != nil { + bot.RemoveCommand("!redeem") + bot.RemoveCommand("!balance") + bot.RemoveCommand("!goals") + bot.RemoveCommand("!contribute") + + // Remove message handler + bot.OnMessage.Unsubscribe(m) } } -func (b *Bot) IsBanned(user string) bool { - banned, ok := b.banlist[user] +func (m *Manager) HandleBotMessage(message irc.PrivateMessage) { + m.activeUsers.SetKey(message.User.Name, true) +} + +func (m *Manager) SetBanList(banned []string) { + m.banlist = make(map[string]bool) + for _, usr := range banned { + m.banlist[usr] = true + } +} + +func (m *Manager) IsBanned(user string) bool { + banned, ok := m.banlist[user] return ok && banned } -func (b *Bot) IsActive(user string) bool { - b.mu.Lock() - active, ok := b.activeUsers[user] - b.mu.Unlock() +func (m *Manager) IsActive(user string) bool { + active, ok := m.activeUsers.GetKey(user) return ok && active } -func (b *Bot) ResetActivity() { - b.mu.Lock() - b.activeUsers = make(map[string]bool) - b.mu.Unlock() +func (m *Manager) ResetActivity() { + m.activeUsers = containers.NewSyncMap[string, bool]() } -func cmdBalance(bot *Bot, message irc.PrivateMessage) { +func (m *Manager) cmdBalance(bot *twitch.Bot, message irc.PrivateMessage) { // Get user balance - balance := bot.Loyalty.GetPoints(message.User.Name) - bot.Client.Say(message.Channel, fmt.Sprintf("%s: You have %d %s!", message.User.DisplayName, balance, bot.Loyalty.Config().Currency)) + balance := m.GetPoints(message.User.Name) + bot.Client.Say(message.Channel, fmt.Sprintf("%s: You have %d %s!", message.User.DisplayName, balance, m.Config.Get().Currency)) } -func cmdRedeemReward(bot *Bot, message irc.PrivateMessage) { +func (m *Manager) cmdRedeemReward(bot *twitch.Bot, message irc.PrivateMessage) { parts := strings.Fields(message.Message) if len(parts) < 2 { return @@ -146,7 +167,7 @@ func cmdRedeemReward(bot *Bot, message irc.PrivateMessage) { redeemID := parts[1] // Find reward - reward := bot.Loyalty.GetReward(redeemID) + reward := m.GetReward(redeemID) if reward.ID == "" { return } @@ -157,8 +178,8 @@ func cmdRedeemReward(bot *Bot, message irc.PrivateMessage) { } // Get user balance - balance := bot.Loyalty.GetPoints(message.User.Name) - config := bot.Loyalty.Config() + balance := m.GetPoints(message.User.Name) + config := m.Config.Get() // Check if user can afford the reward if balance-reward.Price < 0 { @@ -172,7 +193,7 @@ func cmdRedeemReward(bot *Bot, message irc.PrivateMessage) { } // Perform redeem - if err := bot.Loyalty.PerformRedeem(loyalty.Redeem{ + if err := m.PerformRedeem(Redeem{ Username: message.User.Name, DisplayName: message.User.DisplayName, When: time.Now(), @@ -180,21 +201,21 @@ func cmdRedeemReward(bot *Bot, message irc.PrivateMessage) { RequestText: text, }); err != nil { switch err { - case loyalty.ErrRedeemInCooldown: - nextAvailable := bot.Loyalty.GetRewardCooldown(reward.ID) + case ErrRedeemInCooldown: + nextAvailable := m.GetRewardCooldown(reward.ID) bot.Client.Say(message.Channel, fmt.Sprintf("%s: That reward is in cooldown (available in %s)", message.User.DisplayName, time.Until(nextAvailable).Truncate(time.Second))) default: - bot.logger.Error("error while performing redeem", zap.Error(err)) + m.logger.Error("error while performing redeem", zap.Error(err)) } return } - bot.Client.Say(message.Channel, fmt.Sprintf("HolidayPresent %s has redeemed %s! (new balance: %d %s)", message.User.DisplayName, reward.Name, bot.Loyalty.GetPoints(message.User.Name), config.Currency)) + bot.Client.Say(message.Channel, fmt.Sprintf("HolidayPresent %s has redeemed %s! (new balance: %d %s)", message.User.DisplayName, reward.Name, m.GetPoints(message.User.Name), config.Currency)) } -func cmdGoalList(bot *Bot, message irc.PrivateMessage) { - goals := bot.Loyalty.Goals() +func (m *Manager) cmdGoalList(bot *twitch.Bot, message irc.PrivateMessage) { + goals := m.Goals.Get() if len(goals) < 1 { bot.Client.Say(message.Channel, fmt.Sprintf("%s: There are no active community goals right now :(!", message.User.DisplayName)) return @@ -204,14 +225,14 @@ func cmdGoalList(bot *Bot, message irc.PrivateMessage) { if !goal.Enabled { continue } - msg += fmt.Sprintf("%s (%d/%d %s) [id: %s] | ", goal.Name, goal.Contributed, goal.TotalGoal, bot.Loyalty.Config().Currency, goal.ID) + msg += fmt.Sprintf("%s (%d/%d %s) [id: %s] | ", goal.Name, goal.Contributed, goal.TotalGoal, m.Config.Get().Currency, goal.ID) } msg += " Contribute with " bot.Client.Say(message.Channel, msg) } -func cmdContributeGoal(bot *Bot, message irc.PrivateMessage) { - goals := bot.Loyalty.Goals() +func (m *Manager) cmdContributeGoal(bot *twitch.Bot, message irc.PrivateMessage) { + goals := m.Goals.Get() // Set defaults if user doesn't provide them points := int64(100) @@ -283,12 +304,12 @@ func cmdContributeGoal(bot *Bot, message irc.PrivateMessage) { } // Add points to goal - if err := bot.Loyalty.PerformContribution(selectedGoal, message.User.Name, points); err != nil { - bot.logger.Error("error while contributing to goal", zap.Error(err)) + if err := m.PerformContribution(selectedGoal, message.User.Name, points); err != nil { + m.logger.Error("error while contributing to goal", zap.Error(err)) return } - config := bot.Loyalty.Config() + config := m.Config.Get() newRemaining := selectedGoal.TotalGoal - selectedGoal.Contributed bot.Client.Say(message.Channel, fmt.Sprintf("ShowOfHands %s contributed %d %s to \"%s\"!! Only %d %s left!", message.User.DisplayName, points, config.Currency, selectedGoal.Name, newRemaining, config.Currency)) diff --git a/main.go b/main.go index 87bd6b0..cff7f9c 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "embed" "fmt" "log" @@ -12,10 +13,6 @@ import ( "go.uber.org/zap/zapcore" - "github.com/strimertul/strimertul/modules" - "github.com/strimertul/strimertul/modules/loyalty" - "github.com/strimertul/strimertul/modules/twitch" - jsoniter "github.com/json-iterator/go" "github.com/urfave/cli/v2" "github.com/wailsapp/wails/v2" @@ -27,20 +24,11 @@ import ( var json = jsoniter.ConfigFastest -const databaseDefaultDriver = "pebble" - var appVersion = "v0.0.0-UNKNOWN" //go:embed frontend/dist var frontend embed.FS -type ModuleConstructor = func(manager *modules.Manager) error - -var moduleList = map[modules.ModuleID]ModuleConstructor{ - modules.ModuleLoyalty: loyalty.Register, - modules.ModuleTwitch: twitch.Register, -} - func main() { err := fixconsole.FixConsoleIfNeeded() if err != nil { @@ -99,6 +87,7 @@ func main() { level = zapcore.InfoLevel } initLogger(level) + ctx.Context = context.WithValue(ctx.Context, "logger", logger) return nil }, After: func(ctx *cli.Context) error { diff --git a/modules/manager.go b/modules/manager.go deleted file mode 100644 index 0ebe28e..0000000 --- a/modules/manager.go +++ /dev/null @@ -1,53 +0,0 @@ -package modules - -import ( - "go.uber.org/zap" -) - -type ModuleStatus struct { - Enabled bool - Working bool - Data interface{} - StatusString string -} - -func (m ModuleStatus) String() string { - return m.StatusString -} - -type Module interface { - Status() ModuleStatus - Close() error -} - -type ModuleID string - -const ( - - // Required - ModuleDB ModuleID = "db" - ModuleHTTP ModuleID = "http" - - // Feature modules - ModuleLoyalty ModuleID = "loyalty" - - // Streaming providers - ModuleTwitch ModuleID = "twitch" -) - -type Manager struct { - Modules map[ModuleID]Module - - logger *zap.Logger -} - -func NewManager(log *zap.Logger) *Manager { - return &Manager{ - Modules: make(map[ModuleID]Module), - logger: log, - } -} - -func (m *Manager) Logger(module ModuleID) *zap.Logger { - return m.logger.With(zap.String("module", string(module))) -} diff --git a/modules/twitch/bot.go b/twitch/bot.go similarity index 76% rename from modules/twitch/bot.go rename to twitch/bot.go index c13a166..8bb9082 100644 --- a/modules/twitch/bot.go +++ b/twitch/bot.go @@ -6,7 +6,8 @@ import ( "text/template" "time" - "github.com/strimertul/strimertul/modules/loyalty" + "github.com/strimertul/strimertul/utils" + "go.uber.org/zap" "github.com/Masterminds/sprig/v3" @@ -15,14 +16,12 @@ import ( type Bot struct { Client *irc.Client + Config BotConfig api *Client username string - config BotConfig logger *zap.Logger lastMessage time.Time - activeUsers map[string]bool - banlist map[string]bool chatHistory []irc.PrivateMessage commands map[string]BotCommand @@ -30,12 +29,24 @@ type Bot struct { customTemplates map[string]*template.Template customFunctions template.FuncMap + OnConnect *utils.PubSub[BotConnectHandler] + OnMessage *utils.PubSub[BotMessageHandler] + mu sync.Mutex // Module specific vars - Loyalty *loyalty.Manager - Timers *BotTimerModule - Alerts *BotAlertsModule + Timers *BotTimerModule + Alerts *BotAlertsModule +} + +type BotConnectHandler interface { + utils.Comparable + HandleBotConnect() +} + +type BotMessageHandler interface { + utils.Comparable + HandleBotMessage(message irc.PrivateMessage) } func NewBot(api *Client, config BotConfig) *Bot { @@ -43,30 +54,45 @@ func NewBot(api *Client, config BotConfig) *Bot { client := irc.NewClient(config.Username, config.Token) bot := &Bot{ - Client: client, + Client: client, + Config: config, + username: strings.ToLower(config.Username), // Normalize username - config: config, logger: api.logger, api: api, lastMessage: time.Now(), - activeUsers: make(map[string]bool), - banlist: make(map[string]bool), mu: sync.Mutex{}, commands: make(map[string]BotCommand), customCommands: make(map[string]BotCustomCommand), customTemplates: make(map[string]*template.Template), + + OnConnect: utils.NewPubSub[BotConnectHandler](), + OnMessage: utils.NewPubSub[BotMessageHandler](), } + client.OnConnect(func() { + for _, handler := range bot.OnConnect.Subscribers() { + if handler != nil { + handler.HandleBotConnect() + } + } + }) + client.OnPrivateMessage(func(message irc.PrivateMessage) { + for _, handler := range bot.OnMessage.Subscribers() { + if handler != nil { + handler.HandleBotMessage(message) + } + } + // Ignore messages for a while or twitch will get mad! if message.Time.Before(bot.lastMessage.Add(time.Second * 2)) { bot.logger.Debug("message received too soon, ignoring") return } bot.mu.Lock() - bot.activeUsers[message.User.Name] = true - lcmessage := strings.ToLower(message.Message) + lowercaseMessage := strings.ToLower(message.Message) // Check if it's a command if strings.HasPrefix(message.Message, "!") { @@ -75,10 +101,10 @@ func NewBot(api *Client, config BotConfig) *Bot { if !data.Enabled { continue } - if !strings.HasPrefix(lcmessage, cmd) { + if !strings.HasPrefix(lowercaseMessage, cmd) { continue } - parts := strings.SplitN(lcmessage, " ", 2) + parts := strings.SplitN(lowercaseMessage, " ", 2) if parts[0] != cmd { continue } @@ -93,10 +119,10 @@ func NewBot(api *Client, config BotConfig) *Bot { continue } lc := strings.ToLower(cmd) - if !strings.HasPrefix(lcmessage, lc) { + if !strings.HasPrefix(lowercaseMessage, lc) { continue } - parts := strings.SplitN(lcmessage, " ", 2) + parts := strings.SplitN(lowercaseMessage, " ", 2) if parts[0] != lc { continue } @@ -106,9 +132,9 @@ func NewBot(api *Client, config BotConfig) *Bot { bot.mu.Unlock() bot.api.db.PutJSON(ChatEventKey, message) - if bot.config.ChatHistory > 0 { - if len(bot.chatHistory) >= bot.config.ChatHistory { - bot.chatHistory = bot.chatHistory[len(bot.chatHistory)-bot.config.ChatHistory+1:] + if bot.Config.ChatHistory > 0 { + if len(bot.chatHistory) >= bot.Config.ChatHistory { + bot.chatHistory = bot.chatHistory[len(bot.chatHistory)-bot.Config.ChatHistory+1:] } bot.chatHistory = append(bot.chatHistory, message) bot.api.db.PutJSON(ChatHistoryKey, bot.chatHistory) @@ -184,7 +210,7 @@ func (b *Bot) updateCommands(value string) { } func (b *Bot) handleWriteMessageRPC(value string) { - b.Client.Say(b.config.Channel, value) + b.Client.Say(b.Config.Channel, value) } func (b *Bot) updateTemplates() error { @@ -203,7 +229,16 @@ func (b *Bot) Connect() error { } func (b *Bot) WriteMessage(message string) { - b.Client.Say(b.config.Channel, message) + b.Client.Say(b.Config.Channel, message) +} + +func (b *Bot) RegisterCommand(trigger string, command BotCommand) { + // TODO make it goroutine safe? + b.commands[trigger] = command +} + +func (b *Bot) RemoveCommand(trigger string) { + delete(b.commands, trigger) } func getUserAccessLevel(user irc.User) AccessLevelType { diff --git a/modules/twitch/client.auth.go b/twitch/client.auth.go similarity index 99% rename from modules/twitch/client.auth.go rename to twitch/client.auth.go index 4e6bcc6..c671582 100644 --- a/modules/twitch/client.auth.go +++ b/twitch/client.auth.go @@ -71,7 +71,7 @@ func (c *Client) GetLoggedUser() (helix.User, error) { if len(users.Data.Users) < 1 { return helix.User{}, fmt.Errorf("no users found") } - + return users.Data.Users[0], nil } diff --git a/modules/twitch/client.eventsub.go b/twitch/client.eventsub.go similarity index 100% rename from modules/twitch/client.eventsub.go rename to twitch/client.eventsub.go diff --git a/modules/twitch/client.go b/twitch/client.go similarity index 56% rename from modules/twitch/client.go rename to twitch/client.go index 00818db..854c0d6 100644 --- a/modules/twitch/client.go +++ b/twitch/client.go @@ -9,11 +9,9 @@ import ( lru "github.com/hashicorp/golang-lru" jsoniter "github.com/json-iterator/go" "github.com/nicklaw5/helix/v2" - "github.com/strimertul/strimertul/modules/database" + "github.com/strimertul/strimertul/database" + "github.com/strimertul/strimertul/http" "go.uber.org/zap" - - "github.com/strimertul/strimertul/modules" - "github.com/strimertul/strimertul/modules/loyalty" ) var json = jsoniter.ConfigFastest @@ -21,35 +19,27 @@ var json = jsoniter.ConfigFastest type Client struct { Config Config Bot *Bot - db *database.DBModule + db *database.LocalDBClient API *helix.Client logger *zap.Logger - manager *modules.Manager eventCache *lru.Cache restart chan bool streamOnline *containers.RWSync[bool] } -func Register(manager *modules.Manager) error { - db, ok := manager.Modules["db"].(*database.DBModule) - if !ok { - return errors.New("db module not found") - } - - logger := manager.Logger(modules.ModuleTwitch) - +func NewClient(db *database.LocalDBClient, server *http.Server, logger *zap.Logger) (*Client, error) { eventCache, err := lru.New(128) if err != nil { - return fmt.Errorf("could not create LRU cache for events: %w", err) + return nil, fmt.Errorf("could not create LRU cache for events: %w", err) } - // Get Twitch config + // Get Twitch Config var config Config err = db.GetJSON(ConfigKey, &config) if err != nil { if !errors.Is(err, database.ErrEmptyKey) { - return fmt.Errorf("failed to get twitch config: %w", err) + return nil, fmt.Errorf("failed to get twitch Config: %w", err) } config.Enabled = false } @@ -58,83 +48,85 @@ func Register(manager *modules.Manager) error { client := &Client{ Config: config, db: db, - logger: logger, + logger: logger.With(zap.String("service", "twitch")), restart: make(chan bool, 128), streamOnline: containers.NewRWSync(false), - manager: manager, eventCache: eventCache, } - // Listen for config changes + // Listen for Config changes err = db.SubscribeKey(ConfigKey, func(value string) { err := json.UnmarshalFromString(value, &config) if err != nil { - logger.Error("failed to unmarshal config", zap.Error(err)) + client.logger.Error("failed to unmarshal Config", zap.Error(err)) return } - api, err := client.getHelixAPI() + api, err := client.getHelixAPI(config) if err != nil { - logger.Warn("failed to create new twitch client, keeping old credentials", zap.Error(err)) + client.logger.Warn("failed to create new twitch client, keeping old credentials", zap.Error(err)) return } client.API = api + client.Config = config - logger.Info("reloaded/updated Twitch API") + client.logger.Info("reloaded/updated Twitch API") }) if err != nil { - client.logger.Error("could not setup twitch config reload subscription", zap.Error(err)) + client.logger.Error("could not setup twitch Config reload subscription", zap.Error(err)) } err = db.SubscribeKey(BotConfigKey, func(value string) { var twitchBotConfig BotConfig err := json.UnmarshalFromString(value, &twitchBotConfig) if err != nil { - logger.Error("failed to unmarshal config", zap.Error(err)) + client.logger.Error("failed to unmarshal Config", zap.Error(err)) return } err = client.Bot.Client.Disconnect() if err != nil { - logger.Warn("failed to disconnect from Twitch IRC", zap.Error(err)) + client.logger.Warn("failed to disconnect from Twitch IRC", zap.Error(err)) } if client.Config.EnableBot { - if err := client.startBot(manager); err != nil { + if err := client.startBot(); err != nil { if !errors.Is(err, database.ErrEmptyKey) { - logger.Error("failed to re-create bot", zap.Error(err)) + client.logger.Error("failed to re-create bot", zap.Error(err)) } } } client.restart <- true - logger.Info("reloaded/restarted Twitch bot") + client.logger.Info("reloaded/restarted Twitch bot") }) if err != nil { - client.logger.Error("could not setup twitch bot config reload subscription", zap.Error(err)) + client.logger.Error("could not setup twitch bot Config reload subscription", zap.Error(err)) } if config.Enabled { - client.API, err = client.getHelixAPI() + client.API, err = client.getHelixAPI(config) if err != nil { client.logger.Error("failed to create twitch client", zap.Error(err)) + } else { + server.SetRoute("/twitch/callback", client.AuthorizeCallback) + + go client.runStatusPoll() + go client.connectWebsocket() } } if client.Config.EnableBot { - if err := client.startBot(manager); err != nil { + if err := client.startBot(); err != nil { if !errors.Is(err, database.ErrEmptyKey) { - return err + return nil, err } } } - go client.runStatusPoll() - go client.connectWebsocket() - go func() { for { if client.Config.EnableBot && client.Bot != nil { err := client.RunBot() if err != nil { - logger.Error("failed to connect to Twitch IRC", zap.Error(err)) - // Wait for config change before retrying + client.logger.Error("failed to connect to Twitch IRC", zap.Error(err)) + // Wait for Config change before retrying <-client.restart } } else { @@ -143,14 +135,7 @@ func Register(manager *modules.Manager) error { } }() - manager.Modules[modules.ModuleTwitch] = client - - // If loyalty module is enabled, set-up loyalty commands - if loyaltyManager, ok := client.manager.Modules[modules.ModuleLoyalty].(*loyalty.Manager); ok && client.Bot != nil { - client.Bot.SetupLoyalty(loyaltyManager) - } - - return nil + return client, nil } func (c *Client) runStatusPoll() { @@ -160,14 +145,14 @@ func (c *Client) runStatusPoll() { time.Sleep(60 * time.Second) // Make sure we're configured and connected properly first - if !c.Config.Enabled || c.Bot == nil || c.Bot.config.Channel == "" { + if !c.Config.Enabled || c.Bot == nil || c.Bot.Config.Channel == "" { continue } // Check if streamer is online, if possible func() { status, err := c.API.GetStreams(&helix.StreamsParams{ - UserLogins: []string{c.Bot.config.Channel}, // TODO Replace with something non bot dependant + UserLogins: []string{c.Bot.Config.Channel}, // TODO Replace with something non bot dependant }) if err != nil { c.logger.Error("Error checking stream status", zap.Error(err)) @@ -183,13 +168,13 @@ func (c *Client) runStatusPoll() { } } -func (c *Client) startBot(manager *modules.Manager) error { - // Get Twitch bot config +func (c *Client) startBot() error { + // Get Twitch bot Config var twitchBotConfig BotConfig err := c.db.GetJSON(BotConfigKey, &twitchBotConfig) if err != nil { if !errors.Is(err, database.ErrEmptyKey) { - return fmt.Errorf("failed to get bot config: %w", err) + return fmt.Errorf("failed to get bot Config: %w", err) } c.Config.EnableBot = false } @@ -197,15 +182,10 @@ func (c *Client) startBot(manager *modules.Manager) error { // Create and run IRC bot c.Bot = NewBot(c, twitchBotConfig) - // If loyalty module is enabled, set-up loyalty commands - if loyaltyManager, ok := manager.Modules[modules.ModuleLoyalty].(*loyalty.Manager); ok && c.Bot != nil { - c.Bot.SetupLoyalty(loyaltyManager) - } - return nil } -func (c *Client) getHelixAPI() (*helix.Client, error) { +func (c *Client) getHelixAPI(config Config) (*helix.Client, error) { redirectURI, err := c.getRedirectURI() if err != nil { return nil, err @@ -213,8 +193,8 @@ func (c *Client) getHelixAPI() (*helix.Client, error) { // Create Twitch client api, err := helix.NewClient(&helix.Options{ - ClientID: c.Config.APIClientID, - ClientSecret: c.Config.APIClientSecret, + ClientID: config.APIClientID, + ClientSecret: config.APIClientSecret, RedirectURI: redirectURI, }) if err != nil { @@ -245,19 +225,8 @@ func (c *Client) RunBot() error { } } -func (c *Client) Status() modules.ModuleStatus { - if !c.Config.Enabled { - return modules.ModuleStatus{ - Enabled: false, - } - } - - return modules.ModuleStatus{ - Enabled: true, - Working: c.Bot != nil && c.Bot.Client != nil, - Data: struct{}{}, - StatusString: "", - } +func (c *Client) IsLive() bool { + return c.streamOnline.Get() } func (c *Client) Close() error { diff --git a/modules/twitch/commands.go b/twitch/commands.go similarity index 100% rename from modules/twitch/commands.go rename to twitch/commands.go diff --git a/modules/twitch/data.go b/twitch/data.go similarity index 100% rename from modules/twitch/data.go rename to twitch/data.go diff --git a/modules/twitch/modules.alerts.go b/twitch/modules.alerts.go similarity index 100% rename from modules/twitch/modules.alerts.go rename to twitch/modules.alerts.go diff --git a/modules/twitch/modules.go b/twitch/modules.go similarity index 100% rename from modules/twitch/modules.go rename to twitch/modules.go diff --git a/modules/twitch/modules.timer.go b/twitch/modules.timer.go similarity index 100% rename from modules/twitch/modules.timer.go rename to twitch/modules.timer.go diff --git a/utils/equal.go b/utils/equal.go new file mode 100644 index 0000000..8f53a8a --- /dev/null +++ b/utils/equal.go @@ -0,0 +1,7 @@ +package utils + +// Comparable is a workaround for Go incomplete implementation of generics +// See https://github.com/golang/go/issues/56548 +type Comparable interface { + Equals(Comparable) bool +} diff --git a/utils/json.go b/utils/json.go new file mode 100644 index 0000000..70f9cd7 --- /dev/null +++ b/utils/json.go @@ -0,0 +1,18 @@ +package utils + +import ( + "git.sr.ht/~hamcha/containers" + jsoniter "github.com/json-iterator/go" +) + +var json = jsoniter.ConfigFastest + +func LoadJSONToWrapped[T any](data string, sync containers.Wrapped[T]) error { + var result T + err := json.UnmarshalFromString(data, &result) + if err != nil { + return err + } + sync.Set(result) + return nil +} diff --git a/utils/pubsub.go b/utils/pubsub.go new file mode 100644 index 0000000..c3bde17 --- /dev/null +++ b/utils/pubsub.go @@ -0,0 +1,33 @@ +package utils + +import "git.sr.ht/~hamcha/containers" + +type PubSub[T Comparable] struct { + subscribers *containers.RWSync[[]T] +} + +func NewPubSub[T Comparable]() *PubSub[T] { + return &PubSub[T]{ + subscribers: containers.NewRWSync([]T{}), + } +} + +func (p *PubSub[T]) Subscribe(handler T) { + p.subscribers.Set(append(p.subscribers.Get(), handler)) +} + +func (p *PubSub[T]) Unsubscribe(handler T) { + arr := p.subscribers.Get() + // Use slice trick to in-place remove entry if found + for index := range arr { + if arr[index].Equals(handler) { + arr[index] = arr[len(arr)-1] + p.subscribers.Set(arr[:len(arr)-1]) + return + } + } +} + +func (p *PubSub[T]) Subscribers() []T { + return p.subscribers.Get() +}