refactor: begone modules

This commit is contained in:
Ash Keel 2022-11-30 19:15:47 +01:00
parent 89dd50996b
commit 4c0708138a
No known key found for this signature in database
GPG Key ID: BAD8D93E7314ED3E
29 changed files with 553 additions and 578 deletions

82
app.go
View File

@ -4,25 +4,29 @@ import (
"context"
"strconv"
"github.com/strimertul/strimertul/modules"
"github.com/strimertul/strimertul/modules/database"
"github.com/strimertul/strimertul/modules/http"
"github.com/strimertul/strimertul/modules/twitch"
"github.com/strimertul/strimertul/twitch"
"github.com/strimertul/strimertul/loyalty"
"git.sr.ht/~hamcha/containers"
"github.com/nicklaw5/helix/v2"
"github.com/strimertul/strimertul/database"
"github.com/strimertul/strimertul/http"
"github.com/urfave/cli/v2"
"github.com/wailsapp/wails/v2/pkg/runtime"
"go.uber.org/zap"
)
// App struct
type App struct {
ctx context.Context
cliParams *cli.Context
driver DatabaseDriver
manager *modules.Manager
driver database.DatabaseDriver
ready *containers.RWSync[bool]
db *database.LocalDBClient
twitchClient *twitch.Client
httpServer *http.Server
loyaltyManager *loyalty.Manager
}
// NewApp creates a new App application struct
@ -37,16 +41,13 @@ func NewApp(cliParams *cli.Context) *App {
func (a *App) startup(ctx context.Context) {
a.ctx = ctx
// Create module manager
a.manager = modules.NewManager(logger)
// Make KV hub
var err error
a.driver, err = getDatabaseDriver(a.cliParams)
a.driver, err = database.GetDatabaseDriver(a.cliParams)
failOnError(err, "error opening database")
// Start database backup task
backupOpts := BackupOptions{
backupOpts := database.BackupOptions{
BackupDir: a.cliParams.String("backup-dir"),
BackupInterval: a.cliParams.Int("backup-interval"),
MaxBackups: a.cliParams.Int("max-backups"),
@ -58,28 +59,22 @@ func (a *App) startup(ctx context.Context) {
hub := a.driver.Hub()
go hub.Run()
db, err := database.NewDBModule(hub, a.manager)
a.db, err = database.NewLocalClient(hub, logger)
failOnError(err, "failed to initialize database module")
// Set meta keys
_ = db.PutKey("stul-meta/version", appVersion)
for module, constructor := range moduleList {
err := constructor(a.manager)
if err != nil {
logger.Error("could not register module", zap.String("module", string(module)), zap.Error(err))
} else {
}
}
_ = a.db.PutKey("stul-meta/version", appVersion)
// Create logger and endpoints
httpServer, err := http.NewServer(a.manager)
a.httpServer, err = http.NewServer(a.db, logger)
failOnError(err, "could not initialize http server")
defer func() {
if err := httpServer.Close(); err != nil {
logger.Error("could not close DB", zap.Error(err))
}
}()
// Create twitch client
a.twitchClient, err = twitch.NewClient(a.db, a.httpServer, logger)
failOnError(err, "could not initialize twitch client")
// Initialize loyalty system
a.loyaltyManager, err = loyalty.NewManager(a.db, a.twitchClient, logger)
a.ready.Set(true)
runtime.EventsEmit(ctx, "ready", true)
@ -93,15 +88,20 @@ func (a *App) startup(ctx context.Context) {
}()
// Run HTTP server
failOnError(httpServer.Listen(), "HTTP server stopped")
failOnError(a.httpServer.Listen(), "HTTP server stopped")
}
func (a *App) stop(context.Context) {
for module := range a.manager.Modules {
if err := a.manager.Modules[module].Close(); err != nil {
logger.Error("could not close module", zap.String("module", string(module)), zap.Error(err))
}
if a.loyaltyManager != nil {
a.loyaltyManager.Close()
}
if a.twitchClient != nil {
a.twitchClient.Close()
}
if a.httpServer != nil {
a.httpServer.Close()
}
a.db.Close()
failOnError(a.driver.Close(), "could not close driver")
}
@ -115,28 +115,22 @@ func (a *App) AuthenticateKVClient(id string) {
}
func (a *App) IsServerReady() bool {
if !a.ready.Get() {
return false
}
return a.manager.Modules[modules.ModuleHTTP].Status().Working
return a.ready.Get()
}
func (a *App) GetKilovoltBind() string {
if a.manager == nil {
if a.httpServer == nil {
return ""
}
if httpModule, ok := a.manager.Modules[modules.ModuleHTTP]; ok {
return httpModule.Status().Data.(http.StatusData).Bind
}
return ""
return a.httpServer.Config.Bind
}
func (a *App) GetTwitchAuthURL() string {
return a.manager.Modules[modules.ModuleTwitch].(*twitch.Client).GetAuthorizationURL()
return a.twitchClient.GetAuthorizationURL()
}
func (a *App) GetTwitchLoggedUser() (helix.User, error) {
return a.manager.Modules[modules.ModuleTwitch].(*twitch.Client).GetLoggedUser()
return a.twitchClient.GetLoggedUser()
}
func (a *App) GetLastLogs() []LogEntry {

View File

@ -2,29 +2,16 @@ package main
import (
"fmt"
"io"
"os"
"path/filepath"
"sort"
"time"
kv "github.com/strimertul/kilovolt/v9"
"github.com/strimertul/strimertul/database"
"github.com/strimertul/strimertul/utils"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
)
// DatabaseDriver is a driver wrapping a supported database
type DatabaseDriver interface {
Hub() *kv.Hub
Close() error
Import(map[string]string) error
Export(io.Writer) error
Restore(io.Reader) error
Backup(io.Writer) error
}
func BackupTask(driver DatabaseDriver, options BackupOptions) {
func BackupTask(driver database.DatabaseDriver, options database.BackupOptions) {
if options.BackupDir == "" {
logger.Warn("backup directory not set, database backups are disabled")
return
@ -74,38 +61,3 @@ func BackupTask(driver DatabaseDriver, options BackupOptions) {
}
}
}
type BackupOptions struct {
BackupDir string
BackupInterval int
MaxBackups int
}
func getDatabaseDriverName(ctx *cli.Context) string {
driver := ctx.String("driver")
if driver != "auto" {
return driver
}
dbdir := ctx.String("database-dir")
file, err := os.ReadFile(filepath.Join(dbdir, "stul-driver"))
if err != nil {
// No driver file found (or file corrupted), use default driver
return databaseDefaultDriver
}
return string(file)
}
func getDatabaseDriver(ctx *cli.Context) (DatabaseDriver, error) {
name := getDatabaseDriverName(ctx)
dbdir := ctx.String("database-dir")
switch name {
case "badger":
return nil, cli.Exit("Badger is not supported anymore as a database driver", 64)
case "pebble":
return NewPebble(dbdir)
default:
return nil, cli.Exit(fmt.Sprintf("Unknown database driver: %s", name), 64)
}
}

View File

@ -3,6 +3,8 @@ package main
import (
"os"
"github.com/strimertul/strimertul/database"
"github.com/urfave/cli/v2"
)
@ -23,7 +25,7 @@ func cliImport(ctx *cli.Context) error {
return fatalError(err, "could not decode import file")
}
driver, err := getDatabaseDriver(ctx)
driver, err := database.GetDatabaseDriver(ctx)
if err != nil {
return fatalError(err, "could not open database")
}
@ -49,7 +51,7 @@ func cliRestore(ctx *cli.Context) error {
inStream = file
}
driver, err := getDatabaseDriver(ctx)
driver, err := database.GetDatabaseDriver(ctx)
if err != nil {
return fatalError(err, "could not open database")
}
@ -75,7 +77,7 @@ func cliExport(ctx *cli.Context) error {
outStream = file
}
driver, err := getDatabaseDriver(ctx)
driver, err := database.GetDatabaseDriver(ctx)
if err != nil {
return fatalError(err, "could not open database")
}

View File

@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"github.com/strimertul/strimertul/modules"
jsoniter "github.com/json-iterator/go"
kv "github.com/strimertul/kilovolt/v9"
"go.uber.org/zap"
@ -21,7 +19,7 @@ var (
ErrEmptyKey = errors.New("empty key")
)
type DBModule struct {
type LocalDBClient struct {
client *kv.LocalClient
hub *kv.Hub
logger *zap.Logger
@ -32,44 +30,38 @@ type KvPair struct {
Data string
}
func NewDBModule(hub *kv.Hub, manager *modules.Manager) (*DBModule, error) {
logger := manager.Logger(modules.ModuleDB)
func NewLocalClient(hub *kv.Hub, logger *zap.Logger) (*LocalDBClient, error) {
// Create local client
localClient := kv.NewLocalClient(kv.ClientOptions{}, logger)
// Run client and add it to the hub
go localClient.Run()
hub.AddClient(localClient)
localClient.Wait()
// Bypass authentication
err := hub.SetAuthenticated(localClient.UID(), true)
if err != nil {
return nil, err
}
module := &DBModule{
return &LocalDBClient{
client: localClient,
hub: hub,
logger: logger,
}
manager.Modules[modules.ModuleDB] = module
return module, nil
}, nil
}
func (mod *DBModule) Hub() *kv.Hub {
func (mod *LocalDBClient) Hub() *kv.Hub {
return mod.hub
}
func (mod *DBModule) Status() modules.ModuleStatus {
return modules.ModuleStatus{
Enabled: mod.hub != nil,
Working: mod.client != nil,
StatusString: "ok",
}
}
func (mod *DBModule) Close() error {
func (mod *LocalDBClient) Close() error {
mod.hub.RemoveClient(mod.client)
return nil
}
func (mod *DBModule) GetKey(key string) (string, error) {
func (mod *LocalDBClient) GetKey(key string) (string, error) {
res, err := mod.makeRequest(kv.CmdReadKey, map[string]interface{}{"key": key})
if err != nil {
return "", err
@ -77,12 +69,12 @@ func (mod *DBModule) GetKey(key string) (string, error) {
return res.Data.(string), nil
}
func (mod *DBModule) PutKey(key string, data string) error {
func (mod *LocalDBClient) PutKey(key string, data string) error {
_, err := mod.makeRequest(kv.CmdWriteKey, map[string]interface{}{"key": key, "data": data})
return err
}
func (mod *DBModule) SubscribePrefix(fn kv.SubscriptionCallback, prefixes ...string) error {
func (mod *LocalDBClient) SubscribePrefix(fn kv.SubscriptionCallback, prefixes ...string) error {
for _, prefix := range prefixes {
_, err := mod.makeRequest(kv.CmdSubscribePrefix, map[string]interface{}{"prefix": prefix})
if err != nil {
@ -93,7 +85,7 @@ func (mod *DBModule) SubscribePrefix(fn kv.SubscriptionCallback, prefixes ...str
return nil
}
func (mod *DBModule) SubscribeKey(key string, fn func(string)) error {
func (mod *LocalDBClient) SubscribeKey(key string, fn func(string)) error {
_, err := mod.makeRequest(kv.CmdSubscribePrefix, map[string]interface{}{"prefix": key})
if err != nil {
return err
@ -107,7 +99,7 @@ func (mod *DBModule) SubscribeKey(key string, fn func(string)) error {
return nil
}
func (mod *DBModule) GetJSON(key string, dst interface{}) error {
func (mod *LocalDBClient) GetJSON(key string, dst interface{}) error {
res, err := mod.GetKey(key)
if err != nil {
return err
@ -118,7 +110,7 @@ func (mod *DBModule) GetJSON(key string, dst interface{}) error {
return json.Unmarshal([]byte(res), dst)
}
func (mod *DBModule) GetAll(prefix string) (map[string]string, error) {
func (mod *LocalDBClient) GetAll(prefix string) (map[string]string, error) {
res, err := mod.makeRequest(kv.CmdReadPrefix, map[string]interface{}{"prefix": prefix})
if err != nil {
return nil, err
@ -131,7 +123,7 @@ func (mod *DBModule) GetAll(prefix string) (map[string]string, error) {
return out, nil
}
func (mod *DBModule) PutJSON(key string, data interface{}) error {
func (mod *LocalDBClient) PutJSON(key string, data interface{}) error {
byt, err := json.Marshal(data)
if err != nil {
return err
@ -140,7 +132,7 @@ func (mod *DBModule) PutJSON(key string, data interface{}) error {
return mod.PutKey(key, string(byt))
}
func (mod *DBModule) PutJSONBulk(kvs map[string]interface{}) error {
func (mod *LocalDBClient) PutJSONBulk(kvs map[string]interface{}) error {
encoded := make(map[string]interface{})
for k, v := range kvs {
byt, err := json.Marshal(v)
@ -154,12 +146,12 @@ func (mod *DBModule) PutJSONBulk(kvs map[string]interface{}) error {
return err
}
func (mod *DBModule) RemoveKey(key string) error {
func (mod *LocalDBClient) RemoveKey(key string) error {
// TODO
return mod.PutKey(key, "")
}
func (mod *DBModule) makeRequest(cmd string, data map[string]interface{}) (kv.Response, error) {
func (mod *LocalDBClient) makeRequest(cmd string, data map[string]interface{}) (kv.Response, error) {
req, chn := mod.client.MakeRequest(cmd, data)
mod.hub.SendMessage(req)
return getResponse(<-chn)

View File

@ -0,0 +1,61 @@
package database
import (
"fmt"
"io"
"os"
"path/filepath"
"go.uber.org/zap"
kv "github.com/strimertul/kilovolt/v9"
"github.com/urfave/cli/v2"
)
// DatabaseDriver is a driver wrapping a supported database
type DatabaseDriver interface {
Hub() *kv.Hub
Close() error
Import(map[string]string) error
Export(io.Writer) error
Restore(io.Reader) error
Backup(io.Writer) error
}
type BackupOptions struct {
BackupDir string
BackupInterval int
MaxBackups int
}
const databaseDefaultDriver = "pebble"
func getDatabaseDriverName(ctx *cli.Context) string {
driver := ctx.String("driver")
if driver != "auto" {
return driver
}
dbDirectory := ctx.String("database-dir")
file, err := os.ReadFile(filepath.Join(dbDirectory, "stul-driver"))
if err != nil {
// No driver file found (or file corrupted), use default driver
return databaseDefaultDriver
}
return string(file)
}
func GetDatabaseDriver(ctx *cli.Context) (DatabaseDriver, error) {
name := getDatabaseDriverName(ctx)
dbDirectory := ctx.String("database-dir")
logger := ctx.Context.Value("logger").(*zap.Logger)
switch name {
case "badger":
return nil, cli.Exit("Badger is not supported anymore as a database driver", 64)
case "pebble":
return NewPebble(dbDirectory, logger)
default:
return nil, cli.Exit(fmt.Sprintf("Unknown database driver: %s", name), 64)
}
}

View File

@ -1,24 +1,25 @@
package main
package database
import (
"fmt"
"go.uber.org/zap"
"io"
"os"
"path/filepath"
"github.com/cockroachdb/pebble"
"github.com/labstack/gommon/log"
kv "github.com/strimertul/kilovolt/v9"
pebble_driver "github.com/strimertul/kv-pebble"
)
type PebbleDatabase struct {
db *pebble.DB
hub *kv.Hub
db *pebble.DB
hub *kv.Hub
logger *zap.Logger
}
// NewPebble creates a new database driver instance with an underlying Pebble database
func NewPebble(directory string) (*PebbleDatabase, error) {
func NewPebble(directory string, logger *zap.Logger) (*PebbleDatabase, error) {
db, err := pebble.Open(directory, &pebble.Options{})
if err != nil {
return nil, fmt.Errorf("could not open DB: %w", err)
@ -31,8 +32,9 @@ func NewPebble(directory string) (*PebbleDatabase, error) {
}
p := &PebbleDatabase{
db: db,
hub: nil,
db: db,
hub: nil,
logger: logger,
}
return p, nil
@ -40,14 +42,13 @@ func NewPebble(directory string) (*PebbleDatabase, error) {
func (p *PebbleDatabase) Hub() *kv.Hub {
if p.hub == nil {
p.hub, _ = kv.NewHub(pebble_driver.NewPebbleBackend(p.db, true), kv.HubOptions{}, logger)
p.hub, _ = kv.NewHub(pebble_driver.NewPebbleBackend(p.db, true), kv.HubOptions{}, p.logger)
}
return p.hub
}
func (p *PebbleDatabase) Close() error {
err := p.db.Close()
log.Info("database was closed")
if err != nil {
return fmt.Errorf("Could not close database: %w", err)
}

4
go.mod
View File

@ -3,7 +3,7 @@ module github.com/strimertul/strimertul
go 1.19
require (
git.sr.ht/~hamcha/containers v0.0.3
git.sr.ht/~hamcha/containers v0.2.0
github.com/Masterminds/sprig/v3 v3.2.2
github.com/apenwarr/fixconsole v0.0.0-20191012055117-5a9f6489cc29
github.com/cockroachdb/pebble v0.0.0-20221116223310-87eccabb90a3
@ -11,7 +11,6 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v0.5.1
github.com/json-iterator/go v1.1.12
github.com/labstack/gommon v0.3.1
github.com/nicklaw5/helix/v2 v2.11.0
github.com/strimertul/kilovolt/v9 v9.0.1
github.com/strimertul/kv-pebble v1.2.0
@ -49,6 +48,7 @@ require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/labstack/echo/v4 v4.9.0 // indirect
github.com/labstack/gommon v0.3.1 // indirect
github.com/leaanthony/go-ansi-parser v1.0.1 // indirect
github.com/leaanthony/gosod v1.0.3 // indirect
github.com/leaanthony/slicer v1.5.0 // indirect

4
go.sum
View File

@ -31,8 +31,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
git.sr.ht/~hamcha/containers v0.0.3 h1:obG9X8s5iOIahVe+EGpkBDYmUAO78oTi9Y9gRurt334=
git.sr.ht/~hamcha/containers v0.0.3/go.mod h1:RiZphUpy9t6EnL4Gf6uzByM9QrBoqRCEPo7kz2wzbhE=
git.sr.ht/~hamcha/containers v0.2.0 h1:fv8HQ6fsJUa1w46sH9KluW6dfJEh3uZN3QNLJvuCIm4=
git.sr.ht/~hamcha/containers v0.2.0/go.mod h1:RiZphUpy9t6EnL4Gf6uzByM9QrBoqRCEPo7kz2wzbhE=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=

View File

@ -8,46 +8,36 @@ import (
"net/http"
"net/http/pprof"
"github.com/strimertul/strimertul/modules/twitch"
"git.sr.ht/~hamcha/containers"
jsoniter "github.com/json-iterator/go"
"github.com/strimertul/strimertul/modules/database"
"github.com/strimertul/strimertul/database"
"go.uber.org/zap"
kv "github.com/strimertul/kilovolt/v9"
"github.com/strimertul/strimertul/modules"
)
var json = jsoniter.ConfigFastest
type Server struct {
Config ServerConfig
db *database.DBModule
logger *zap.Logger
server *http.Server
frontend fs.FS
hub *kv.Hub
mux *http.ServeMux
manager *modules.Manager
Config ServerConfig
db *database.LocalDBClient
logger *zap.Logger
server *http.Server
frontend fs.FS
hub *kv.Hub
mux *http.ServeMux
requestedRoutes map[string]http.HandlerFunc
}
func NewServer(manager *modules.Manager) (*Server, error) {
db, ok := manager.Modules["db"].(*database.DBModule)
if !ok {
return nil, errors.New("db module not found")
}
logger := manager.Logger(modules.ModuleHTTP)
func NewServer(db *database.LocalDBClient, logger *zap.Logger) (*Server, error) {
server := &Server{
logger: logger,
db: db,
server: &http.Server{},
manager: manager,
logger: logger,
db: db,
server: &http.Server{},
requestedRoutes: make(map[string]http.HandlerFunc),
}
err := db.GetJSON(ServerConfigKey, &server.Config)
if err != nil {
// Initialize with default config
@ -71,9 +61,6 @@ func NewServer(manager *modules.Manager) (*Server, error) {
Password: server.Config.KVPassword,
})
// Register module
manager.Modules[modules.ModuleHTTP] = server
return server, nil
}
@ -82,17 +69,6 @@ type StatusData struct {
Bind string
}
func (s *Server) Status() modules.ModuleStatus {
return modules.ModuleStatus{
Enabled: true,
Working: s.server != nil,
Data: StatusData{
s.server.Addr,
},
StatusString: s.server.Addr,
}
}
func (s *Server) Close() error {
return s.server.Close()
}
@ -122,13 +98,20 @@ func (s *Server) makeMux() *http.ServeMux {
if s.Config.EnableStaticServer {
mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir(s.Config.Path))))
}
if s.manager.Modules[modules.ModuleTwitch].Status().Enabled {
mux.HandleFunc("/twitch/callback", s.manager.Modules[modules.ModuleTwitch].(*twitch.Client).AuthorizeCallback)
for route, handler := range s.requestedRoutes {
mux.HandleFunc(route, handler)
}
return mux
}
func (s *Server) SetRoute(route string, handler http.HandlerFunc) {
s.requestedRoutes[route] = handler
if s.mux != nil {
s.mux.HandleFunc(route, handler)
}
}
func (s *Server) Listen() error {
// Start HTTP server
restart := containers.NewRWSync(false)

View File

@ -9,11 +9,10 @@ import (
"gopkg.in/natefinch/lumberjack.v2"
)
var logger *zap.Logger
const LogHistory = 50
var (
logger *zap.Logger
lastLogs []LogEntry
incomingLogs chan LogEntry
)

View File

@ -1,14 +1,19 @@
package loyalty
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/strimertul/strimertul/modules"
"github.com/strimertul/strimertul/modules/database"
"github.com/strimertul/strimertul/utils"
"git.sr.ht/~hamcha/containers"
"github.com/strimertul/strimertul/twitch"
"github.com/strimertul/strimertul/database"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
@ -23,53 +28,74 @@ var (
)
type Manager struct {
points map[string]PointsEntry
config Config
rewards RewardStorage
goals GoalStorage
queue RedeemQueueStorage
mu sync.Mutex
db *database.DBModule
logger *zap.Logger
cooldowns map[string]time.Time
points *containers.SyncMap[string, PointsEntry]
Config *containers.RWSync[Config]
Rewards *containers.Sync[RewardStorage]
Goals *containers.Sync[GoalStorage]
Queue *containers.Sync[RedeemQueueStorage]
db *database.LocalDBClient
logger *zap.Logger
cooldowns map[string]time.Time
banlist map[string]bool
activeUsers *containers.SyncMap[string, bool]
twitchClient *twitch.Client
ctx context.Context
cancelFn context.CancelFunc
}
func Register(manager *modules.Manager) error {
db, ok := manager.Modules["db"].(*database.DBModule)
if !ok {
return errors.New("db module not found")
}
logger := manager.Logger(modules.ModuleLoyalty)
func NewManager(db *database.LocalDBClient, twitchClient *twitch.Client, logger *zap.Logger) (*Manager, error) {
ctx, cancelFn := context.WithCancel(context.Background())
loyalty := &Manager{
logger: logger,
db: db,
mu: sync.Mutex{},
cooldowns: make(map[string]time.Time),
Config: containers.NewRWSync(Config{Enabled: false}),
Rewards: containers.NewSync(RewardStorage{}),
Goals: containers.NewSync(GoalStorage{}),
Queue: containers.NewSync(RedeemQueueStorage{}),
logger: logger,
db: db,
points: containers.NewSyncMap[string, PointsEntry](),
cooldowns: make(map[string]time.Time),
banlist: make(map[string]bool),
activeUsers: containers.NewSyncMap[string, bool](),
twitchClient: twitchClient,
ctx: ctx,
cancelFn: cancelFn,
}
// Get data from DB
if err := db.GetJSON(ConfigKey, &loyalty.config); err != nil {
var config Config
if err := db.GetJSON(ConfigKey, config); err == nil {
loyalty.Config.Set(config)
} else {
if !errors.Is(err, database.ErrEmptyKey) {
return fmt.Errorf("could not retrieve loyalty config: %w", err)
return nil, fmt.Errorf("could not retrieve loyalty config: %w", err)
}
loyalty.config.Enabled = false
}
// Retrieve configs
if err := db.GetJSON(RewardsKey, &loyalty.rewards); err != nil {
var rewards RewardStorage
if err := db.GetJSON(RewardsKey, &rewards); err != nil {
loyalty.Rewards.Set(rewards)
} else {
if !errors.Is(err, database.ErrEmptyKey) {
return err
return nil, err
}
}
if err := db.GetJSON(GoalsKey, &loyalty.goals); err != nil {
var goals GoalStorage
if err := db.GetJSON(GoalsKey, &goals); err != nil {
loyalty.Goals.Set(goals)
} else {
if !errors.Is(err, database.ErrEmptyKey) {
return err
return nil, err
}
}
if err := db.GetJSON(QueueKey, &loyalty.queue); err != nil {
var queue RedeemQueueStorage
if err := db.GetJSON(QueueKey, &queue); err != nil {
loyalty.Queue.Set(queue)
} else {
if !errors.Is(err, database.ErrEmptyKey) {
return err
return nil, err
}
}
@ -77,18 +103,18 @@ func Register(manager *modules.Manager) error {
points, err := db.GetAll(PointsPrefix)
if err != nil {
if !errors.Is(err, database.ErrEmptyKey) {
return err
return nil, err
}
points = make(map[string]string)
}
loyalty.points = make(map[string]PointsEntry)
for k, v := range points {
var entry PointsEntry
err := json.UnmarshalFromString(v, &entry)
if err != nil {
return err
return nil, err
}
loyalty.points[k] = entry
loyalty.points.SetKey(k, entry)
}
// SubscribePrefix for changes
@ -97,62 +123,39 @@ func Register(manager *modules.Manager) error {
logger.Error("could not setup loyalty reload subscription", zap.Error(err))
}
// Register module
manager.Modules[modules.ModuleLoyalty] = loyalty
loyalty.SetBanList(config.BanList)
return nil
}
// Setup twitch integration
loyalty.SetupTwitch()
func (m *Manager) Status() modules.ModuleStatus {
config := m.Config()
if !config.Enabled {
return modules.ModuleStatus{
Enabled: false,
}
}
return modules.ModuleStatus{
Enabled: true,
Working: true,
Data: struct{}{},
StatusString: "",
}
return loyalty, nil
}
func (m *Manager) Close() error {
// TODO Stop subscriptions?
// Send cancellation
m.cancelFn()
// Teardown twitch integration
m.StopTwitch()
return nil
}
func (m *Manager) update(key, value string) {
var err error
// Check for config changes/RPC
switch key {
case ConfigKey:
err = func() error {
m.mu.Lock()
defer m.mu.Unlock()
return json.UnmarshalFromString(value, &m.config)
}()
err = utils.LoadJSONToWrapped[Config](value, m.Config)
if err == nil {
m.SetBanList(m.Config.Get().BanList)
}
case GoalsKey:
err = func() error {
m.mu.Lock()
defer m.mu.Unlock()
return json.UnmarshalFromString(value, &m.goals)
}()
err = utils.LoadJSONToWrapped[GoalStorage](value, m.Goals)
case RewardsKey:
err = func() error {
m.mu.Lock()
defer m.mu.Unlock()
return json.UnmarshalFromString(value, &m.rewards)
}()
err = utils.LoadJSONToWrapped[RewardStorage](value, m.Rewards)
case QueueKey:
err = func() error {
m.mu.Lock()
defer m.mu.Unlock()
return json.UnmarshalFromString(value, &m.queue)
}()
err = utils.LoadJSONToWrapped[RedeemQueueStorage](value, m.Queue)
case CreateRedeemRPC:
var redeem Redeem
err = json.UnmarshalFromString(value, &redeem)
@ -173,11 +176,7 @@ func (m *Manager) update(key, value string) {
var entry PointsEntry
err = json.UnmarshalFromString(value, &entry)
user := key[len(PointsPrefix):]
func() {
m.mu.Lock()
defer m.mu.Unlock()
m.points[user] = entry
}()
m.points.SetKey(user, entry)
}
}
if err != nil {
@ -188,9 +187,7 @@ func (m *Manager) update(key, value string) {
}
func (m *Manager) GetPoints(user string) int64 {
m.mu.Lock()
defer m.mu.Unlock()
points, ok := m.points[user]
points, ok := m.points.GetKey(user)
if ok {
return points.Points
}
@ -198,12 +195,11 @@ func (m *Manager) GetPoints(user string) int64 {
}
func (m *Manager) setPoints(user string, points int64) error {
m.mu.Lock()
defer m.mu.Unlock()
m.points[user] = PointsEntry{
entry := PointsEntry{
Points: points,
}
return m.db.PutJSON(PointsPrefix+user, m.points[user])
m.points.SetKey(user, entry)
return m.db.PutJSON(PointsPrefix+user, entry)
}
func (m *Manager) GivePoints(pointsToGive map[string]int64) error {
@ -229,13 +225,10 @@ func (m *Manager) TakePoints(pointsToTake map[string]int64) error {
}
func (m *Manager) saveQueue() error {
return m.db.PutJSON(QueueKey, m.queue)
return m.db.PutJSON(QueueKey, m.Queue.Get())
}
func (m *Manager) GetRewardCooldown(rewardID string) time.Time {
m.mu.Lock()
defer m.mu.Unlock()
cooldown, ok := m.cooldowns[rewardID]
if !ok {
// Return zero time for a reward with no cooldown
@ -246,11 +239,8 @@ func (m *Manager) GetRewardCooldown(rewardID string) time.Time {
}
func (m *Manager) AddRedeem(redeem Redeem) error {
m.mu.Lock()
defer m.mu.Unlock()
// Add to local list
m.queue = append(m.queue, redeem)
m.Queue.Set(append(m.Queue.Get(), redeem))
// Send redeem event
if err := m.db.PutJSON(RedeemEvent, redeem); err != nil {
@ -283,13 +273,11 @@ func (m *Manager) PerformRedeem(redeem Redeem) error {
}
func (m *Manager) RemoveRedeem(redeem Redeem) error {
m.mu.Lock()
defer m.mu.Unlock()
for index, queued := range m.queue {
queue := m.Queue.Get()
for index, queued := range queue {
if queued.When == redeem.When && queued.Username == redeem.Username && queued.Reward.ID == redeem.Reward.ID {
// Remove redemption from list
m.queue = append(m.queue[:index], m.queue[index+1:]...)
m.Queue.Set(append(queue[:index], queue[index+1:]...))
// Save points
return m.saveQueue()
@ -300,25 +288,18 @@ func (m *Manager) RemoveRedeem(redeem Redeem) error {
}
func (m *Manager) SaveGoals() error {
return m.db.PutJSON(GoalsKey, m.goals)
}
func (m *Manager) Goals() []Goal {
m.mu.Lock()
defer m.mu.Unlock()
return m.goals[:]
return m.db.PutJSON(GoalsKey, m.Goals.Get())
}
func (m *Manager) ContributeGoal(goal Goal, user string, points int64) error {
m.mu.Lock()
defer m.mu.Unlock()
for i, savedGoal := range m.goals {
goals := m.Goals.Get()
for i, savedGoal := range goals {
if savedGoal.ID != goal.ID {
continue
}
m.goals[i].Contributed += points
m.goals[i].Contributors[user] += points
goals[i].Contributed += points
goals[i].Contributors[user] += points
m.Goals.Set(goals)
return m.SaveGoals()
}
return ErrGoalNotFound
@ -353,16 +334,8 @@ func (m *Manager) PerformContribution(goal Goal, user string, points int64) erro
return m.ContributeGoal(goal, user, points)
}
func (m *Manager) Rewards() []Reward {
m.mu.Lock()
defer m.mu.Unlock()
return m.rewards[:]
}
func (m *Manager) GetReward(id string) Reward {
m.mu.Lock()
defer m.mu.Unlock()
for _, reward := range m.rewards {
for _, reward := range m.Rewards.Get() {
if reward.ID == id {
return reward
}
@ -371,9 +344,7 @@ func (m *Manager) GetReward(id string) Reward {
}
func (m *Manager) GetGoal(id string) Goal {
m.mu.Lock()
defer m.mu.Unlock()
for _, goal := range m.goals {
for _, goal := range m.Goals.Get() {
if goal.ID == id {
return goal
}
@ -381,8 +352,9 @@ func (m *Manager) GetGoal(id string) Goal {
return Goal{}
}
func (m *Manager) Config() Config {
m.mu.Lock()
defer m.mu.Unlock()
return m.config
func (m *Manager) Equals(c utils.Comparable) bool {
if manager, ok := c.(*Manager); ok {
return m == manager
}
return false
}

View File

@ -1,4 +1,4 @@
package twitch
package loyalty
import (
"fmt"
@ -6,139 +6,160 @@ import (
"strings"
"time"
"git.sr.ht/~hamcha/containers"
"github.com/strimertul/strimertul/twitch"
"go.uber.org/zap"
irc "github.com/gempir/go-twitch-irc/v3"
"github.com/strimertul/strimertul/modules/loyalty"
)
func (b *Bot) SetupLoyalty(loyalty *loyalty.Manager) {
b.Loyalty = loyalty
config := loyalty.Config()
b.SetBanList(config.BanList)
func (m *Manager) SetupTwitch() {
bot := m.twitchClient.Bot
if bot == nil {
return
}
// Add loyalty-based commands
b.commands["!redeem"] = BotCommand{
bot.RegisterCommand("!redeem", twitch.BotCommand{
Description: "Redeem a reward with loyalty points",
Usage: "!redeem <reward-id> [request text]",
AccessLevel: ALTEveryone,
Handler: cmdRedeemReward,
AccessLevel: twitch.ALTEveryone,
Handler: m.cmdRedeemReward,
Enabled: true,
}
b.commands["!balance"] = BotCommand{
})
bot.RegisterCommand("!balance", twitch.BotCommand{
Description: "See your current point balance",
Usage: "!balance",
AccessLevel: ALTEveryone,
Handler: cmdBalance,
AccessLevel: twitch.ALTEveryone,
Handler: m.cmdBalance,
Enabled: true,
}
b.commands["!goals"] = BotCommand{
})
bot.RegisterCommand("!goals", twitch.BotCommand{
Description: "Check currently active community goals",
Usage: "!goals",
AccessLevel: ALTEveryone,
Handler: cmdGoalList,
AccessLevel: twitch.ALTEveryone,
Handler: m.cmdGoalList,
Enabled: true,
}
b.commands["!contribute"] = BotCommand{
})
bot.RegisterCommand("!contribute", twitch.BotCommand{
Description: "Contribute points to a community goal",
Usage: "!contribute <points> [<goal-id>]",
AccessLevel: ALTEveryone,
Handler: cmdContributeGoal,
AccessLevel: twitch.ALTEveryone,
Handler: m.cmdContributeGoal,
Enabled: true,
}
})
// Setup message handler for tracking user activity
bot.OnMessage.Subscribe(m)
// Setup handler for adding points over time
b.Client.OnConnect(func() {
go func() {
go func() {
config := m.Config.Get()
if config.Enabled && bot != nil {
for {
status := loyalty.Status()
if status.Enabled {
config := loyalty.Config()
if config.Points.Interval > 0 {
// Wait for next poll
time.Sleep(time.Duration(config.Points.Interval) * time.Second)
if config.Points.Interval > 0 {
// Wait for next poll
select {
case <-m.ctx.Done():
return
case <-time.After(time.Duration(config.Points.Interval) * time.Second):
}
// If stream is confirmed offline, don't give points away!
isOnline := b.api.streamOnline.Get()
if !isOnline {
// If stream is confirmed offline, don't give points away!
isOnline := m.twitchClient.IsLive()
if !isOnline {
continue
}
m.logger.Debug("awarding points")
// Get user list
users, err := bot.Client.Userlist(bot.Config.Channel)
if err != nil {
m.logger.Error("error listing users", zap.Error(err))
continue
}
// Iterate for each user in the list
pointsToGive := make(map[string]int64)
for _, user := range users {
// Check if user is blocked
if m.IsBanned(user) {
continue
}
b.logger.Debug("awarding points")
// Check if user was active (chatting) for the bonus dingus
award := config.Points.Amount
if m.IsActive(user) {
award += config.Points.ActivityBonus
}
// Get user list
users, err := b.Client.Userlist(b.config.Channel)
// Add to point pool if already on it, otherwise initialize
pointsToGive[user] = award
}
m.ResetActivity()
// If changes were made, save the pool!
if len(users) > 0 {
err := m.GivePoints(pointsToGive)
if err != nil {
b.logger.Error("error listing users", zap.Error(err))
continue
}
// Iterate for each user in the list
pointsToGive := make(map[string]int64)
for _, user := range users {
// Check if user is blocked
if b.IsBanned(user) {
continue
}
// Check if user was active (chatting) for the bonus dingus
award := config.Points.Amount
if b.IsActive(user) {
award += config.Points.ActivityBonus
}
// Add to point pool if already on it, otherwise initialize
pointsToGive[user] = award
}
b.ResetActivity()
// If changes were made, save the pool!
if len(users) > 0 {
err := b.Loyalty.GivePoints(pointsToGive)
if err != nil {
b.logger.Error("error giving points to user", zap.Error(err))
}
m.logger.Error("error giving points to user", zap.Error(err))
}
}
}
}
}()
})
}
}()
}
func (b *Bot) SetBanList(banned []string) {
b.banlist = make(map[string]bool)
for _, usr := range banned {
b.banlist[usr] = true
func (m *Manager) StopTwitch() {
bot := m.twitchClient.Bot
if bot != nil {
bot.RemoveCommand("!redeem")
bot.RemoveCommand("!balance")
bot.RemoveCommand("!goals")
bot.RemoveCommand("!contribute")
// Remove message handler
bot.OnMessage.Unsubscribe(m)
}
}
func (b *Bot) IsBanned(user string) bool {
banned, ok := b.banlist[user]
func (m *Manager) HandleBotMessage(message irc.PrivateMessage) {
m.activeUsers.SetKey(message.User.Name, true)
}
func (m *Manager) SetBanList(banned []string) {
m.banlist = make(map[string]bool)
for _, usr := range banned {
m.banlist[usr] = true
}
}
func (m *Manager) IsBanned(user string) bool {
banned, ok := m.banlist[user]
return ok && banned
}
func (b *Bot) IsActive(user string) bool {
b.mu.Lock()
active, ok := b.activeUsers[user]
b.mu.Unlock()
func (m *Manager) IsActive(user string) bool {
active, ok := m.activeUsers.GetKey(user)
return ok && active
}
func (b *Bot) ResetActivity() {
b.mu.Lock()
b.activeUsers = make(map[string]bool)
b.mu.Unlock()
func (m *Manager) ResetActivity() {
m.activeUsers = containers.NewSyncMap[string, bool]()
}
func cmdBalance(bot *Bot, message irc.PrivateMessage) {
func (m *Manager) cmdBalance(bot *twitch.Bot, message irc.PrivateMessage) {
// Get user balance
balance := bot.Loyalty.GetPoints(message.User.Name)
bot.Client.Say(message.Channel, fmt.Sprintf("%s: You have %d %s!", message.User.DisplayName, balance, bot.Loyalty.Config().Currency))
balance := m.GetPoints(message.User.Name)
bot.Client.Say(message.Channel, fmt.Sprintf("%s: You have %d %s!", message.User.DisplayName, balance, m.Config.Get().Currency))
}
func cmdRedeemReward(bot *Bot, message irc.PrivateMessage) {
func (m *Manager) cmdRedeemReward(bot *twitch.Bot, message irc.PrivateMessage) {
parts := strings.Fields(message.Message)
if len(parts) < 2 {
return
@ -146,7 +167,7 @@ func cmdRedeemReward(bot *Bot, message irc.PrivateMessage) {
redeemID := parts[1]
// Find reward
reward := bot.Loyalty.GetReward(redeemID)
reward := m.GetReward(redeemID)
if reward.ID == "" {
return
}
@ -157,8 +178,8 @@ func cmdRedeemReward(bot *Bot, message irc.PrivateMessage) {
}
// Get user balance
balance := bot.Loyalty.GetPoints(message.User.Name)
config := bot.Loyalty.Config()
balance := m.GetPoints(message.User.Name)
config := m.Config.Get()
// Check if user can afford the reward
if balance-reward.Price < 0 {
@ -172,7 +193,7 @@ func cmdRedeemReward(bot *Bot, message irc.PrivateMessage) {
}
// Perform redeem
if err := bot.Loyalty.PerformRedeem(loyalty.Redeem{
if err := m.PerformRedeem(Redeem{
Username: message.User.Name,
DisplayName: message.User.DisplayName,
When: time.Now(),
@ -180,21 +201,21 @@ func cmdRedeemReward(bot *Bot, message irc.PrivateMessage) {
RequestText: text,
}); err != nil {
switch err {
case loyalty.ErrRedeemInCooldown:
nextAvailable := bot.Loyalty.GetRewardCooldown(reward.ID)
case ErrRedeemInCooldown:
nextAvailable := m.GetRewardCooldown(reward.ID)
bot.Client.Say(message.Channel, fmt.Sprintf("%s: That reward is in cooldown (available in %s)", message.User.DisplayName,
time.Until(nextAvailable).Truncate(time.Second)))
default:
bot.logger.Error("error while performing redeem", zap.Error(err))
m.logger.Error("error while performing redeem", zap.Error(err))
}
return
}
bot.Client.Say(message.Channel, fmt.Sprintf("HolidayPresent %s has redeemed %s! (new balance: %d %s)", message.User.DisplayName, reward.Name, bot.Loyalty.GetPoints(message.User.Name), config.Currency))
bot.Client.Say(message.Channel, fmt.Sprintf("HolidayPresent %s has redeemed %s! (new balance: %d %s)", message.User.DisplayName, reward.Name, m.GetPoints(message.User.Name), config.Currency))
}
func cmdGoalList(bot *Bot, message irc.PrivateMessage) {
goals := bot.Loyalty.Goals()
func (m *Manager) cmdGoalList(bot *twitch.Bot, message irc.PrivateMessage) {
goals := m.Goals.Get()
if len(goals) < 1 {
bot.Client.Say(message.Channel, fmt.Sprintf("%s: There are no active community goals right now :(!", message.User.DisplayName))
return
@ -204,14 +225,14 @@ func cmdGoalList(bot *Bot, message irc.PrivateMessage) {
if !goal.Enabled {
continue
}
msg += fmt.Sprintf("%s (%d/%d %s) [id: %s] | ", goal.Name, goal.Contributed, goal.TotalGoal, bot.Loyalty.Config().Currency, goal.ID)
msg += fmt.Sprintf("%s (%d/%d %s) [id: %s] | ", goal.Name, goal.Contributed, goal.TotalGoal, m.Config.Get().Currency, goal.ID)
}
msg += " Contribute with <!contribute POINTS GOALID>"
bot.Client.Say(message.Channel, msg)
}
func cmdContributeGoal(bot *Bot, message irc.PrivateMessage) {
goals := bot.Loyalty.Goals()
func (m *Manager) cmdContributeGoal(bot *twitch.Bot, message irc.PrivateMessage) {
goals := m.Goals.Get()
// Set defaults if user doesn't provide them
points := int64(100)
@ -283,12 +304,12 @@ func cmdContributeGoal(bot *Bot, message irc.PrivateMessage) {
}
// Add points to goal
if err := bot.Loyalty.PerformContribution(selectedGoal, message.User.Name, points); err != nil {
bot.logger.Error("error while contributing to goal", zap.Error(err))
if err := m.PerformContribution(selectedGoal, message.User.Name, points); err != nil {
m.logger.Error("error while contributing to goal", zap.Error(err))
return
}
config := bot.Loyalty.Config()
config := m.Config.Get()
newRemaining := selectedGoal.TotalGoal - selectedGoal.Contributed
bot.Client.Say(message.Channel, fmt.Sprintf("ShowOfHands %s contributed %d %s to \"%s\"!! Only %d %s left!", message.User.DisplayName, points, config.Currency, selectedGoal.Name, newRemaining, config.Currency))

15
main.go
View File

@ -1,6 +1,7 @@
package main
import (
"context"
"embed"
"fmt"
"log"
@ -12,10 +13,6 @@ import (
"go.uber.org/zap/zapcore"
"github.com/strimertul/strimertul/modules"
"github.com/strimertul/strimertul/modules/loyalty"
"github.com/strimertul/strimertul/modules/twitch"
jsoniter "github.com/json-iterator/go"
"github.com/urfave/cli/v2"
"github.com/wailsapp/wails/v2"
@ -27,20 +24,11 @@ import (
var json = jsoniter.ConfigFastest
const databaseDefaultDriver = "pebble"
var appVersion = "v0.0.0-UNKNOWN"
//go:embed frontend/dist
var frontend embed.FS
type ModuleConstructor = func(manager *modules.Manager) error
var moduleList = map[modules.ModuleID]ModuleConstructor{
modules.ModuleLoyalty: loyalty.Register,
modules.ModuleTwitch: twitch.Register,
}
func main() {
err := fixconsole.FixConsoleIfNeeded()
if err != nil {
@ -99,6 +87,7 @@ func main() {
level = zapcore.InfoLevel
}
initLogger(level)
ctx.Context = context.WithValue(ctx.Context, "logger", logger)
return nil
},
After: func(ctx *cli.Context) error {

View File

@ -1,53 +0,0 @@
package modules
import (
"go.uber.org/zap"
)
type ModuleStatus struct {
Enabled bool
Working bool
Data interface{}
StatusString string
}
func (m ModuleStatus) String() string {
return m.StatusString
}
type Module interface {
Status() ModuleStatus
Close() error
}
type ModuleID string
const (
// Required
ModuleDB ModuleID = "db"
ModuleHTTP ModuleID = "http"
// Feature modules
ModuleLoyalty ModuleID = "loyalty"
// Streaming providers
ModuleTwitch ModuleID = "twitch"
)
type Manager struct {
Modules map[ModuleID]Module
logger *zap.Logger
}
func NewManager(log *zap.Logger) *Manager {
return &Manager{
Modules: make(map[ModuleID]Module),
logger: log,
}
}
func (m *Manager) Logger(module ModuleID) *zap.Logger {
return m.logger.With(zap.String("module", string(module)))
}

View File

@ -6,7 +6,8 @@ import (
"text/template"
"time"
"github.com/strimertul/strimertul/modules/loyalty"
"github.com/strimertul/strimertul/utils"
"go.uber.org/zap"
"github.com/Masterminds/sprig/v3"
@ -15,14 +16,12 @@ import (
type Bot struct {
Client *irc.Client
Config BotConfig
api *Client
username string
config BotConfig
logger *zap.Logger
lastMessage time.Time
activeUsers map[string]bool
banlist map[string]bool
chatHistory []irc.PrivateMessage
commands map[string]BotCommand
@ -30,12 +29,24 @@ type Bot struct {
customTemplates map[string]*template.Template
customFunctions template.FuncMap
OnConnect *utils.PubSub[BotConnectHandler]
OnMessage *utils.PubSub[BotMessageHandler]
mu sync.Mutex
// Module specific vars
Loyalty *loyalty.Manager
Timers *BotTimerModule
Alerts *BotAlertsModule
Timers *BotTimerModule
Alerts *BotAlertsModule
}
type BotConnectHandler interface {
utils.Comparable
HandleBotConnect()
}
type BotMessageHandler interface {
utils.Comparable
HandleBotMessage(message irc.PrivateMessage)
}
func NewBot(api *Client, config BotConfig) *Bot {
@ -43,30 +54,45 @@ func NewBot(api *Client, config BotConfig) *Bot {
client := irc.NewClient(config.Username, config.Token)
bot := &Bot{
Client: client,
Client: client,
Config: config,
username: strings.ToLower(config.Username), // Normalize username
config: config,
logger: api.logger,
api: api,
lastMessage: time.Now(),
activeUsers: make(map[string]bool),
banlist: make(map[string]bool),
mu: sync.Mutex{},
commands: make(map[string]BotCommand),
customCommands: make(map[string]BotCustomCommand),
customTemplates: make(map[string]*template.Template),
OnConnect: utils.NewPubSub[BotConnectHandler](),
OnMessage: utils.NewPubSub[BotMessageHandler](),
}
client.OnConnect(func() {
for _, handler := range bot.OnConnect.Subscribers() {
if handler != nil {
handler.HandleBotConnect()
}
}
})
client.OnPrivateMessage(func(message irc.PrivateMessage) {
for _, handler := range bot.OnMessage.Subscribers() {
if handler != nil {
handler.HandleBotMessage(message)
}
}
// Ignore messages for a while or twitch will get mad!
if message.Time.Before(bot.lastMessage.Add(time.Second * 2)) {
bot.logger.Debug("message received too soon, ignoring")
return
}
bot.mu.Lock()
bot.activeUsers[message.User.Name] = true
lcmessage := strings.ToLower(message.Message)
lowercaseMessage := strings.ToLower(message.Message)
// Check if it's a command
if strings.HasPrefix(message.Message, "!") {
@ -75,10 +101,10 @@ func NewBot(api *Client, config BotConfig) *Bot {
if !data.Enabled {
continue
}
if !strings.HasPrefix(lcmessage, cmd) {
if !strings.HasPrefix(lowercaseMessage, cmd) {
continue
}
parts := strings.SplitN(lcmessage, " ", 2)
parts := strings.SplitN(lowercaseMessage, " ", 2)
if parts[0] != cmd {
continue
}
@ -93,10 +119,10 @@ func NewBot(api *Client, config BotConfig) *Bot {
continue
}
lc := strings.ToLower(cmd)
if !strings.HasPrefix(lcmessage, lc) {
if !strings.HasPrefix(lowercaseMessage, lc) {
continue
}
parts := strings.SplitN(lcmessage, " ", 2)
parts := strings.SplitN(lowercaseMessage, " ", 2)
if parts[0] != lc {
continue
}
@ -106,9 +132,9 @@ func NewBot(api *Client, config BotConfig) *Bot {
bot.mu.Unlock()
bot.api.db.PutJSON(ChatEventKey, message)
if bot.config.ChatHistory > 0 {
if len(bot.chatHistory) >= bot.config.ChatHistory {
bot.chatHistory = bot.chatHistory[len(bot.chatHistory)-bot.config.ChatHistory+1:]
if bot.Config.ChatHistory > 0 {
if len(bot.chatHistory) >= bot.Config.ChatHistory {
bot.chatHistory = bot.chatHistory[len(bot.chatHistory)-bot.Config.ChatHistory+1:]
}
bot.chatHistory = append(bot.chatHistory, message)
bot.api.db.PutJSON(ChatHistoryKey, bot.chatHistory)
@ -184,7 +210,7 @@ func (b *Bot) updateCommands(value string) {
}
func (b *Bot) handleWriteMessageRPC(value string) {
b.Client.Say(b.config.Channel, value)
b.Client.Say(b.Config.Channel, value)
}
func (b *Bot) updateTemplates() error {
@ -203,7 +229,16 @@ func (b *Bot) Connect() error {
}
func (b *Bot) WriteMessage(message string) {
b.Client.Say(b.config.Channel, message)
b.Client.Say(b.Config.Channel, message)
}
func (b *Bot) RegisterCommand(trigger string, command BotCommand) {
// TODO make it goroutine safe?
b.commands[trigger] = command
}
func (b *Bot) RemoveCommand(trigger string) {
delete(b.commands, trigger)
}
func getUserAccessLevel(user irc.User) AccessLevelType {

View File

@ -71,7 +71,7 @@ func (c *Client) GetLoggedUser() (helix.User, error) {
if len(users.Data.Users) < 1 {
return helix.User{}, fmt.Errorf("no users found")
}
return users.Data.Users[0], nil
}

View File

@ -9,11 +9,9 @@ import (
lru "github.com/hashicorp/golang-lru"
jsoniter "github.com/json-iterator/go"
"github.com/nicklaw5/helix/v2"
"github.com/strimertul/strimertul/modules/database"
"github.com/strimertul/strimertul/database"
"github.com/strimertul/strimertul/http"
"go.uber.org/zap"
"github.com/strimertul/strimertul/modules"
"github.com/strimertul/strimertul/modules/loyalty"
)
var json = jsoniter.ConfigFastest
@ -21,35 +19,27 @@ var json = jsoniter.ConfigFastest
type Client struct {
Config Config
Bot *Bot
db *database.DBModule
db *database.LocalDBClient
API *helix.Client
logger *zap.Logger
manager *modules.Manager
eventCache *lru.Cache
restart chan bool
streamOnline *containers.RWSync[bool]
}
func Register(manager *modules.Manager) error {
db, ok := manager.Modules["db"].(*database.DBModule)
if !ok {
return errors.New("db module not found")
}
logger := manager.Logger(modules.ModuleTwitch)
func NewClient(db *database.LocalDBClient, server *http.Server, logger *zap.Logger) (*Client, error) {
eventCache, err := lru.New(128)
if err != nil {
return fmt.Errorf("could not create LRU cache for events: %w", err)
return nil, fmt.Errorf("could not create LRU cache for events: %w", err)
}
// Get Twitch config
// Get Twitch Config
var config Config
err = db.GetJSON(ConfigKey, &config)
if err != nil {
if !errors.Is(err, database.ErrEmptyKey) {
return fmt.Errorf("failed to get twitch config: %w", err)
return nil, fmt.Errorf("failed to get twitch Config: %w", err)
}
config.Enabled = false
}
@ -58,83 +48,85 @@ func Register(manager *modules.Manager) error {
client := &Client{
Config: config,
db: db,
logger: logger,
logger: logger.With(zap.String("service", "twitch")),
restart: make(chan bool, 128),
streamOnline: containers.NewRWSync(false),
manager: manager,
eventCache: eventCache,
}
// Listen for config changes
// Listen for Config changes
err = db.SubscribeKey(ConfigKey, func(value string) {
err := json.UnmarshalFromString(value, &config)
if err != nil {
logger.Error("failed to unmarshal config", zap.Error(err))
client.logger.Error("failed to unmarshal Config", zap.Error(err))
return
}
api, err := client.getHelixAPI()
api, err := client.getHelixAPI(config)
if err != nil {
logger.Warn("failed to create new twitch client, keeping old credentials", zap.Error(err))
client.logger.Warn("failed to create new twitch client, keeping old credentials", zap.Error(err))
return
}
client.API = api
client.Config = config
logger.Info("reloaded/updated Twitch API")
client.logger.Info("reloaded/updated Twitch API")
})
if err != nil {
client.logger.Error("could not setup twitch config reload subscription", zap.Error(err))
client.logger.Error("could not setup twitch Config reload subscription", zap.Error(err))
}
err = db.SubscribeKey(BotConfigKey, func(value string) {
var twitchBotConfig BotConfig
err := json.UnmarshalFromString(value, &twitchBotConfig)
if err != nil {
logger.Error("failed to unmarshal config", zap.Error(err))
client.logger.Error("failed to unmarshal Config", zap.Error(err))
return
}
err = client.Bot.Client.Disconnect()
if err != nil {
logger.Warn("failed to disconnect from Twitch IRC", zap.Error(err))
client.logger.Warn("failed to disconnect from Twitch IRC", zap.Error(err))
}
if client.Config.EnableBot {
if err := client.startBot(manager); err != nil {
if err := client.startBot(); err != nil {
if !errors.Is(err, database.ErrEmptyKey) {
logger.Error("failed to re-create bot", zap.Error(err))
client.logger.Error("failed to re-create bot", zap.Error(err))
}
}
}
client.restart <- true
logger.Info("reloaded/restarted Twitch bot")
client.logger.Info("reloaded/restarted Twitch bot")
})
if err != nil {
client.logger.Error("could not setup twitch bot config reload subscription", zap.Error(err))
client.logger.Error("could not setup twitch bot Config reload subscription", zap.Error(err))
}
if config.Enabled {
client.API, err = client.getHelixAPI()
client.API, err = client.getHelixAPI(config)
if err != nil {
client.logger.Error("failed to create twitch client", zap.Error(err))
} else {
server.SetRoute("/twitch/callback", client.AuthorizeCallback)
go client.runStatusPoll()
go client.connectWebsocket()
}
}
if client.Config.EnableBot {
if err := client.startBot(manager); err != nil {
if err := client.startBot(); err != nil {
if !errors.Is(err, database.ErrEmptyKey) {
return err
return nil, err
}
}
}
go client.runStatusPoll()
go client.connectWebsocket()
go func() {
for {
if client.Config.EnableBot && client.Bot != nil {
err := client.RunBot()
if err != nil {
logger.Error("failed to connect to Twitch IRC", zap.Error(err))
// Wait for config change before retrying
client.logger.Error("failed to connect to Twitch IRC", zap.Error(err))
// Wait for Config change before retrying
<-client.restart
}
} else {
@ -143,14 +135,7 @@ func Register(manager *modules.Manager) error {
}
}()
manager.Modules[modules.ModuleTwitch] = client
// If loyalty module is enabled, set-up loyalty commands
if loyaltyManager, ok := client.manager.Modules[modules.ModuleLoyalty].(*loyalty.Manager); ok && client.Bot != nil {
client.Bot.SetupLoyalty(loyaltyManager)
}
return nil
return client, nil
}
func (c *Client) runStatusPoll() {
@ -160,14 +145,14 @@ func (c *Client) runStatusPoll() {
time.Sleep(60 * time.Second)
// Make sure we're configured and connected properly first
if !c.Config.Enabled || c.Bot == nil || c.Bot.config.Channel == "" {
if !c.Config.Enabled || c.Bot == nil || c.Bot.Config.Channel == "" {
continue
}
// Check if streamer is online, if possible
func() {
status, err := c.API.GetStreams(&helix.StreamsParams{
UserLogins: []string{c.Bot.config.Channel}, // TODO Replace with something non bot dependant
UserLogins: []string{c.Bot.Config.Channel}, // TODO Replace with something non bot dependant
})
if err != nil {
c.logger.Error("Error checking stream status", zap.Error(err))
@ -183,13 +168,13 @@ func (c *Client) runStatusPoll() {
}
}
func (c *Client) startBot(manager *modules.Manager) error {
// Get Twitch bot config
func (c *Client) startBot() error {
// Get Twitch bot Config
var twitchBotConfig BotConfig
err := c.db.GetJSON(BotConfigKey, &twitchBotConfig)
if err != nil {
if !errors.Is(err, database.ErrEmptyKey) {
return fmt.Errorf("failed to get bot config: %w", err)
return fmt.Errorf("failed to get bot Config: %w", err)
}
c.Config.EnableBot = false
}
@ -197,15 +182,10 @@ func (c *Client) startBot(manager *modules.Manager) error {
// Create and run IRC bot
c.Bot = NewBot(c, twitchBotConfig)
// If loyalty module is enabled, set-up loyalty commands
if loyaltyManager, ok := manager.Modules[modules.ModuleLoyalty].(*loyalty.Manager); ok && c.Bot != nil {
c.Bot.SetupLoyalty(loyaltyManager)
}
return nil
}
func (c *Client) getHelixAPI() (*helix.Client, error) {
func (c *Client) getHelixAPI(config Config) (*helix.Client, error) {
redirectURI, err := c.getRedirectURI()
if err != nil {
return nil, err
@ -213,8 +193,8 @@ func (c *Client) getHelixAPI() (*helix.Client, error) {
// Create Twitch client
api, err := helix.NewClient(&helix.Options{
ClientID: c.Config.APIClientID,
ClientSecret: c.Config.APIClientSecret,
ClientID: config.APIClientID,
ClientSecret: config.APIClientSecret,
RedirectURI: redirectURI,
})
if err != nil {
@ -245,19 +225,8 @@ func (c *Client) RunBot() error {
}
}
func (c *Client) Status() modules.ModuleStatus {
if !c.Config.Enabled {
return modules.ModuleStatus{
Enabled: false,
}
}
return modules.ModuleStatus{
Enabled: true,
Working: c.Bot != nil && c.Bot.Client != nil,
Data: struct{}{},
StatusString: "",
}
func (c *Client) IsLive() bool {
return c.streamOnline.Get()
}
func (c *Client) Close() error {

7
utils/equal.go Normal file
View File

@ -0,0 +1,7 @@
package utils
// Comparable is a workaround for Go incomplete implementation of generics
// See https://github.com/golang/go/issues/56548
type Comparable interface {
Equals(Comparable) bool
}

18
utils/json.go Normal file
View File

@ -0,0 +1,18 @@
package utils
import (
"git.sr.ht/~hamcha/containers"
jsoniter "github.com/json-iterator/go"
)
var json = jsoniter.ConfigFastest
func LoadJSONToWrapped[T any](data string, sync containers.Wrapped[T]) error {
var result T
err := json.UnmarshalFromString(data, &result)
if err != nil {
return err
}
sync.Set(result)
return nil
}

33
utils/pubsub.go Normal file
View File

@ -0,0 +1,33 @@
package utils
import "git.sr.ht/~hamcha/containers"
type PubSub[T Comparable] struct {
subscribers *containers.RWSync[[]T]
}
func NewPubSub[T Comparable]() *PubSub[T] {
return &PubSub[T]{
subscribers: containers.NewRWSync([]T{}),
}
}
func (p *PubSub[T]) Subscribe(handler T) {
p.subscribers.Set(append(p.subscribers.Get(), handler))
}
func (p *PubSub[T]) Unsubscribe(handler T) {
arr := p.subscribers.Get()
// Use slice trick to in-place remove entry if found
for index := range arr {
if arr[index].Equals(handler) {
arr[index] = arr[len(arr)-1]
p.subscribers.Set(arr[:len(arr)-1])
return
}
}
}
func (p *PubSub[T]) Subscribers() []T {
return p.subscribers.Get()
}