From 4a764c0450729b3acee5643e6fc96382056e6037 Mon Sep 17 00:00:00 2001 From: Ash Keel Date: Tue, 1 Feb 2022 12:35:34 +0100 Subject: [PATCH] Multiple databases! --- CHANGELOG.md | 7 +- driver.badgerdb.go | 288 +++++++++++++++++++++++++ go.mod | 10 +- go.sum | 51 +++-- main.go | 116 ++++------ migrations.go | 92 -------- modules/database/badger/test.go | 1 + modules/database/database.go | 163 +++++++++++++++ modules/database/db.go | 229 -------------------- modules/http/server.go | 70 +++---- modules/loyalty/manager.go | 239 ++++++++++----------- modules/loyalty/migration.go | 8 +- modules/stulbe/client.go | 70 +++---- modules/twitch/bot.go | 50 ++--- modules/twitch/client.go | 64 +++--- modules/twitch/commands.go | 2 +- modules/twitch/modules.alerts.go | 349 +++++++++++++++---------------- modules/twitch/modules.timer.go | 22 +- 18 files changed, 950 insertions(+), 881 deletions(-) create mode 100644 driver.badgerdb.go delete mode 100644 migrations.go create mode 100644 modules/database/badger/test.go create mode 100644 modules/database/database.go delete mode 100644 modules/database/db.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ae22ba2..cc119f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,16 +10,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - **New UI**: Strimertul now features a more slick and better organized UI. +- **Multiple database options**: Strimertul now supports multiple databases drivers. BadgerDB will remain as default (and currently the only option) for the time being but other databases are coming in the future. - **Database operations**: You can now export and import the entire database as JSON files - **Database backups**: The database will periodically save optimized copies of itself in a backup directory, directory and intervals are both configurable, though for the time being you might need to periodically clean it before it becomes too large. -- **Manual garbage collection**: You can now launch strimertul with `--run-gc` to manually trigger garbage collection for the database. This will launch strimertul, execute a round of garbage collection and exit. - **Exposed internal metrics via keys**: `twitch/chat-activity` and `twitch/stream-status` now expose previously internal-only info about the current stream. ### Changed +- The logging library has been changed to zap, the format of logs will therefore be wildly different. - A lot of the command line parameters have changed syntax (eg. from -noheader to -no-header), please check the new formats using `-h` if you rely on them. - Database schema has slightly changed, strimertul will auto-migrate to the new format if it detects old schema in use. + +### Removed + - Twitch chat history doesn't have an explicit toggle anymore, they are always enabled unless the `chat_history` setting is set to 0. +- Loyalty point migration from v1.2.0 and earlier has been removed. If you are somehow running such an old version of strimertul and using loyalty points, run any version of strimertul between v1.3.0 and v1.7.0 first to make sure all points are migrated to the new format. ## [1.7.0] - 2021-12-07 diff --git a/driver.badgerdb.go b/driver.badgerdb.go new file mode 100644 index 0000000..76270ad --- /dev/null +++ b/driver.badgerdb.go @@ -0,0 +1,288 @@ +package main + +import ( + "bufio" + "encoding/binary" + "errors" + "fmt" + "io" + "os" + "time" + + "github.com/dgraph-io/badger/v3" + "github.com/dgraph-io/badger/v3/pb" + "github.com/golang/protobuf/proto" + + badger_driver "github.com/strimertul/kv-badgerdb" + + kv "github.com/strimertul/kilovolt/v8" + "github.com/strimertul/strimertul/modules/loyalty" + "github.com/strimertul/strimertul/modules/stulbe" + "github.com/strimertul/strimertul/modules/twitch" + + jsoniter "github.com/json-iterator/go" + "go.uber.org/zap" +) + +func makeBadgerHub(options dbOptions) (*badger.DB, *kv.Hub, error) { + // Loading routine + db, err := badger.Open(badger.DefaultOptions(options.directory).WithSyncWrites(true)) + failOnError(err, "Could not open DB") + + // Run migrations + pre200MigrateModuleConfig(db) + + // Run garbage collection every once in a while + go func() { + ticker := time.NewTicker(15 * time.Minute) + defer ticker.Stop() + for range ticker.C { + // Run DB garbage collection until it's done + var err error + for err == nil { + err = db.RunValueLogGC(0.5) + } + } + }() + + if options.restore != "" { + file, err := os.Open(options.restore) + failOnError(err, "Could not open backup") + failOnError(badgerRestoreOverwrite(db, file), "Could not restore database") + _ = db.Sync() + logger.Info("Restored database from backup") + } + + // Backup database periodically + go func() { + if options.backupDir == "" { + logger.Warn("Backup directory not set, database backups are disabled (this is dangerous, power loss will result in your database being potentially wiped!)") + return + } + + err := os.MkdirAll(options.backupDir, 0755) + if err != nil { + logger.Error("Could not create backup directory, moving to a temporary folder", zap.Error(err)) + options.backupDir = os.TempDir() + logger.Info("Using temporary directory", zap.String("backup-dir", options.backupDir)) + return + } + + ticker := time.NewTicker(time.Duration(options.backupInterval) * time.Minute) + defer ticker.Stop() + for range ticker.C { + // Run backup procedure + file, err := os.Create(fmt.Sprintf("%s/%s.db", options.backupDir, time.Now().Format("20060102-150405"))) + if err != nil { + logger.Error("Could not create backup file", zap.Error(err)) + continue + } + _, err = db.Backup(file, 0) + if err != nil { + logger.Error("Could not backup database", zap.Error(err)) + } + _ = file.Close() + logger.Info("Database backed up", zap.String("backup-file", file.Name())) + } + }() + + hub, err := kv.NewHub(badger_driver.NewBadgerBackend(db), kv.HubOptions{}, logger) + return db, hub, err +} + +func badgerClose(db *badger.DB) error { + return db.Close() +} + +func badgerRestoreOverwrite(db *badger.DB, r io.Reader) error { + br := bufio.NewReaderSize(r, 16<<10) + unmarshalBuf := make([]byte, 1<<10) + + for { + var sz uint64 + err := binary.Read(br, binary.LittleEndian, &sz) + if err == io.EOF { + break + } else if err != nil { + return err + } + + if cap(unmarshalBuf) < int(sz) { + unmarshalBuf = make([]byte, sz) + } + + if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil { + return err + } + + list := &pb.KVList{} + if err := proto.Unmarshal(unmarshalBuf[:sz], list); err != nil { + return err + } + + err = db.Update(func(txn *badger.Txn) error { + for _, kvpair := range list.Kv { + err := txn.Set(kvpair.Key, kvpair.Value) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + } + + return nil +} + +// pre200MigrateModuleConfig migrates <2.0 module configs to 2.0+ +func pre200MigrateModuleConfig(db *badger.DB) { + const pre180ModuleConfigKey = "stul-meta/modules" + + type pre180ModuleConfig struct { + CompletedOnboarding bool `json:"configured"` + EnableTwitch bool `json:"twitch"` + EnableStulbe bool `json:"stulbe"` + EnableLoyalty bool `json:"loyalty"` + } + + // Check if onboarding was completed + var moduleConfig pre180ModuleConfig + err := db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(pre180ModuleConfigKey)) + if err != nil { + return err + } + err = item.Value(func(val []byte) error { + return jsoniter.Unmarshal(val, &moduleConfig) + }) + if err != nil { + return err + } + return nil + }) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + // Either first boot or migration already done + return + } else { + fatalError(err, "Could not read from DB") + } + } + + // ?? Should never happen, maybe we just have an empty key? + if !moduleConfig.CompletedOnboarding { + err = db.Update(func(txn *badger.Txn) error { + return txn.Delete([]byte(pre180ModuleConfigKey)) + }) + failOnError(err, "Failed to remove pre-1.8 module config") + return + } + + // Migrate to new config by updating every related module + var twitchConfig twitch.Config + err = db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(twitch.ConfigKey)) + if err != nil { + return err + } + err = item.Value(func(val []byte) error { + return jsoniter.Unmarshal(val, &twitchConfig) + }) + if err != nil { + return err + } + return nil + }) + if err != nil { + if !errors.Is(err, badger.ErrKeyNotFound) { + fatalError(err, "Could not read from DB") + } + } else { + twitchConfig.Enabled = moduleConfig.EnableTwitch + err = db.Update(func(txn *badger.Txn) error { + byt, err := jsoniter.ConfigFastest.Marshal(twitchConfig) + if err != nil { + return err + } + return txn.Set([]byte(twitch.ConfigKey), byt) + }) + if err != nil { + logger.Error("Failed to update twitch config during 1.8 migration", zap.Error(err)) + } + } + + var stulbeConfig stulbe.Config + err = db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(stulbe.ConfigKey)) + if err != nil { + return err + } + err = item.Value(func(val []byte) error { + return jsoniter.Unmarshal(val, &stulbeConfig) + }) + if err != nil { + return err + } + return nil + }) + if err != nil { + if !errors.Is(err, badger.ErrKeyNotFound) { + fatalError(err, "Could not read from DB") + } + } else { + stulbeConfig.Enabled = moduleConfig.EnableStulbe + err = db.Update(func(txn *badger.Txn) error { + byt, err := jsoniter.ConfigFastest.Marshal(stulbeConfig) + if err != nil { + return err + } + return txn.Set([]byte(stulbe.ConfigKey), byt) + }) + if err != nil { + logger.Error("Failed to update stulbe config during 1.8 migration", zap.Error(err)) + } + } + + var loyaltyConfig loyalty.Config + err = db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(loyalty.ConfigKey)) + if err != nil { + return err + } + err = item.Value(func(val []byte) error { + return jsoniter.Unmarshal(val, &loyaltyConfig) + }) + if err != nil { + return err + } + return nil + }) + if err != nil { + if !errors.Is(err, badger.ErrKeyNotFound) { + fatalError(err, "Could not read from DB") + } + } else { + loyaltyConfig.Enabled = moduleConfig.EnableLoyalty + err = db.Update(func(txn *badger.Txn) error { + byt, err := jsoniter.ConfigFastest.Marshal(loyaltyConfig) + if err != nil { + return err + } + return txn.Set([]byte(loyalty.ConfigKey), byt) + }) + if err != nil { + logger.Error("Failed to update loyalty config during 1.8 migration", zap.Error(err)) + } + } + + logger.Info("Migrated module config to 2.0+") + + // Remove old config key + err = db.Update(func(txn *badger.Txn) error { + return txn.Delete([]byte(pre180ModuleConfigKey)) + }) + failOnError(err, "Failed to remove pre-1.8 module config") +} diff --git a/go.mod b/go.mod index 33747ee..f07c072 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,17 @@ go 1.16 require ( github.com/Masterminds/sprig/v3 v3.2.2 + github.com/dgraph-io/badger v1.6.0 github.com/dgraph-io/badger/v3 v3.2103.2 github.com/gempir/go-twitch-irc/v2 v2.5.0 - github.com/golang/protobuf v1.5.2 + github.com/golang/protobuf v1.4.2 github.com/json-iterator/go v1.1.12 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/nicklaw5/helix/v2 v2.2.1 github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 - github.com/strimertul/kilovolt/v7 v7.0.1 - github.com/strimertul/stulbe-client-go v0.7.0 + github.com/strimertul/kilovolt/v8 v8.0.2 + github.com/strimertul/kv-badgerdb v1.2.1 + github.com/strimertul/stulbe-client-go v0.7.2 go.uber.org/zap v1.20.0 + golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3 // indirect ) diff --git a/go.sum b/go.sum index 89ad7af..74e96a8 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkByhMAwYaFAX9w2l7vxvBQ5NMoxDrkhqhtn4= +github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= @@ -23,6 +25,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo= +github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgraph-io/badger/v3 v3.2103.2 h1:dpyM5eCJAtQCBcMCZcT4UBZchuTJgCywerHHgmxfxM8= github.com/dgraph-io/badger/v3 v3.2103.2/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M= github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI= @@ -45,17 +49,22 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= -github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -84,8 +93,9 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= -github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= @@ -123,12 +133,15 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/strimertul/kilovolt-client-go/v7 v7.0.1 h1:M8KqpujDwu4Cg9PQJ2rUG95S+Fq5zYTeshLBJL2Cw0I= -github.com/strimertul/kilovolt-client-go/v7 v7.0.1/go.mod h1:8ZnvQqoYUZRTJEjhfo+zsLxP1FZ3UMQD5dVhH7pnHiA= -github.com/strimertul/kilovolt/v7 v7.0.1 h1:VUz5ECpjLMxSgijDF3Wa9EeCYzVD1FsCsJD2xf7hl0U= -github.com/strimertul/kilovolt/v7 v7.0.1/go.mod h1:mWpyHDmfKOFdwW1oNH67EoRR2uqIXw5ieQktNg1cDno= -github.com/strimertul/stulbe-client-go v0.7.0 h1:L9wgYRv2HJ+Kv84qjXYnLfBZgJcS3x0sVxKUf056ECA= -github.com/strimertul/stulbe-client-go v0.7.0/go.mod h1:4M+NubPW+NV4KrpzCgKAHs6tyKLLbS0c9XoHiQ6o/LA= +github.com/strimertul/kilovolt-client-go/v8 v8.0.0 h1:d3BAm5qavK9GPUpOtljpsyrjmSfR2AInGe1ypZP9apc= +github.com/strimertul/kilovolt-client-go/v8 v8.0.0/go.mod h1:PNEbu0zrdYD9B9UYUoLSpV+saRJlC0cr9OHdPALUb+o= +github.com/strimertul/kilovolt/v8 v8.0.0/go.mod h1:vW++ELCWnYzENIIP33p+zDGQjz/GpQ5z7YRCBrBtCzA= +github.com/strimertul/kilovolt/v8 v8.0.2 h1:hgobhb95b1cyD5Mpq3McR2AKxUhuoQc4tNTyQAwe0vg= +github.com/strimertul/kilovolt/v8 v8.0.2/go.mod h1:vW++ELCWnYzENIIP33p+zDGQjz/GpQ5z7YRCBrBtCzA= +github.com/strimertul/kv-badgerdb v1.2.1 h1:9zRW05/rkZ47UWYeAmbfTCozYn/nXysFzd0D1iC2gxM= +github.com/strimertul/kv-badgerdb v1.2.1/go.mod h1:aFlPPOSxUNgzMBRj3Rfo2vSc9YzWyq09d+qwjgEMW9I= +github.com/strimertul/stulbe-client-go v0.7.2 h1:mco2JjkYuahgq1p8nlH7TRWNgFKyQMPb83AnBNO6B6E= +github.com/strimertul/stulbe-client-go v0.7.2/go.mod h1:moBqGVP+6cDkJM760YUhSuLgrenHsRRgnC6s+91TzSs= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -183,12 +196,14 @@ golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3 h1:3Ad41xy2WCESpufXwgs7NpDSu+vjxqLt2UFqUV+20bI= +golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -211,9 +226,13 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/main.go b/main.go index 32c044c..4b52cb0 100644 --- a/main.go +++ b/main.go @@ -11,19 +11,22 @@ import ( "runtime" "time" + "github.com/dgraph-io/badger/v3" + "github.com/strimertul/strimertul/modules/database" + + kv "github.com/strimertul/kilovolt/v8" + "go.uber.org/zap/zapcore" jsoniter "github.com/json-iterator/go" "go.uber.org/zap" "github.com/strimertul/strimertul/modules" - "github.com/strimertul/strimertul/modules/database" "github.com/strimertul/strimertul/modules/http" "github.com/strimertul/strimertul/modules/loyalty" "github.com/strimertul/strimertul/modules/stulbe" "github.com/strimertul/strimertul/modules/twitch" - "github.com/dgraph-io/badger/v3" "github.com/pkg/browser" _ "net/http/pprof" @@ -50,18 +53,25 @@ var moduleList = map[modules.ModuleID]ModuleConstructor{ modules.ModuleTwitch: twitch.Register, } +type dbOptions struct { + directory string + restore string + backupDir string + backupInterval int +} + func main() { // Get cmd line parameters noHeader := flag.Bool("no-header", false, "Do not print the app header") dbDir := flag.String("database-dir", "data", "Path to strimertül database dir") debug := flag.Bool("debug", false, "Start in debug mode (more logging)") json := flag.Bool("json", false, "Print logging in JSON format") - cleanup := flag.Bool("run-gc", false, "Run garbage collection and exit immediately after") exportDB := flag.Bool("export", false, "Export database as JSON") importDB := flag.String("import", "", "Import database from JSON file") restoreDB := flag.String("restore", "", "Restore database from backup file") backupDir := flag.String("backup-dir", "backups", "Path to directory with database backups") backupInterval := flag.Int("backup-interval", 60, "Backup database every X minutes, 0 to disable") + driver := flag.String("driver", "badger", "Database driver to use (available: badger,pebble)") flag.Parse() rand.Seed(time.Now().UnixNano()) @@ -93,24 +103,28 @@ func main() { // Create module manager manager := modules.NewManager(logger) - // Loading routine - db, err := database.Open(badger.DefaultOptions(*dbDir), manager) - failOnError(err, "Could not open DB") - defer func() { - if err := db.Close(); err != nil { - logger.Error("Could not close DB", zap.Error(err)) - } - }() - - if *cleanup { - // Run DB garbage collection until it's done - var err error - for err == nil { - err = db.Client().RunValueLogGC(0.5) - } - return + // Make KV hub + var hub *kv.Hub + var err error + logger.Info("opening database", zap.String("driver", *driver)) + switch *driver { + case "badger": + var db *badger.DB + db, hub, err = makeBadgerHub(dbOptions{directory: *dbDir, backupDir: *backupDir, backupInterval: *backupInterval, restore: *restoreDB}) + defer func() { + if err := badgerClose(db); err != nil { + logger.Fatal("Failed to close database", zap.Error(err)) + } + }() + default: + logger.Fatal("Unknown database driver", zap.String("driver", *driver)) } + go hub.Run() + + db, err := database.NewDBModule(hub, manager) + failOnError(err, "Failed to initialize database module") + if *exportDB { // Export database to stdout data, err := db.GetAll("") @@ -128,7 +142,7 @@ func main() { errors := 0 imported := 0 for key, value := range entries { - err = db.PutKey(key, []byte(value)) + err = db.PutKey(key, value) if err != nil { logger.Error("Could not import entry", zap.String("key", key), zap.Error(err)) errors += 1 @@ -136,28 +150,16 @@ func main() { imported += 1 } } - _ = db.Client().Sync() logger.Info("Imported database from file", zap.Int("imported", imported), zap.Int("errors", errors)) } - if *restoreDB != "" { - file, err := os.Open(*restoreDB) - failOnError(err, "Could not open backup") - err = db.RestoreOverwrite(file) - failOnError(err, "Could not restore database") - _ = db.Client().Sync() - logger.Info("Restored database from backup") - } - // Set meta keys - _ = db.PutKey("stul-meta/version", []byte(appVersion)) - - runMigrations(db) + _ = db.PutKey("stul-meta/version", appVersion) for module, constructor := range moduleList { err := constructor(manager) if err != nil { - logger.Error("Could not register module", zap.String("module", string(module))) + logger.Error("Could not register module", zap.String("module", string(module)), zap.Error(err)) } else { //goland:noinspection GoDeferInLoop defer func() { @@ -189,52 +191,6 @@ func main() { } }() - // Run garbage collection every once in a while - go func() { - ticker := time.NewTicker(15 * time.Minute) - defer ticker.Stop() - for range ticker.C { - // Run DB garbage collection until it's done - var err error - for err == nil { - err = db.Client().RunValueLogGC(0.5) - } - } - }() - - // Backup database periodically - go func() { - if *backupDir == "" { - logger.Warn("Backup directory not set, database backups are disabled (this is dangerous, power loss will result in your database being potentially wiped!)") - return - } - - err := os.MkdirAll(*backupDir, 0755) - if err != nil { - logger.Error("Could not create backup directory, moving to a temporary folder", zap.Error(err)) - *backupDir = os.TempDir() - logger.Info("Using temporary directory", zap.String("backup-dir", *backupDir)) - return - } - - ticker := time.NewTicker(time.Duration(*backupInterval) * time.Minute) - defer ticker.Stop() - for range ticker.C { - // Run backup procedure - file, err := os.Create(fmt.Sprintf("%s/%s.db", *backupDir, time.Now().Format("20060102-150405"))) - if err != nil { - logger.Error("Could not create backup file", zap.Error(err)) - continue - } - _, err = db.Client().Backup(file, 0) - if err != nil { - logger.Error("Could not backup database", zap.Error(err)) - } - _ = file.Close() - logger.Info("Database backed up", zap.String("backup-file", file.Name())) - } - }() - // Start HTTP server failOnError(httpServer.Listen(), "HTTP server stopped") } diff --git a/migrations.go b/migrations.go deleted file mode 100644 index d4bc9a6..0000000 --- a/migrations.go +++ /dev/null @@ -1,92 +0,0 @@ -package main - -import ( - "errors" - - "github.com/strimertul/strimertul/modules/database" - "github.com/strimertul/strimertul/modules/loyalty" - "github.com/strimertul/strimertul/modules/stulbe" - "github.com/strimertul/strimertul/modules/twitch" - - "github.com/dgraph-io/badger/v3" - "go.uber.org/zap" -) - -func runMigrations(db *database.DB) { - pre180MigrateModuleConfig(db) -} - -// pre180MigrateModuleConfig migrates <1.8 module configs to 1.8+ -func pre180MigrateModuleConfig(db *database.DB) { - const pre180ModuleConfigKey = "stul-meta/modules" - - type pre180ModuleConfig struct { - CompletedOnboarding bool `json:"configured"` - EnableTwitch bool `json:"twitch"` - EnableStulbe bool `json:"stulbe"` - EnableLoyalty bool `json:"loyalty"` - } - - // Check if onboarding was completed - var moduleConfig pre180ModuleConfig - err := db.GetJSON(pre180ModuleConfigKey, &moduleConfig) - if err != nil { - if errors.Is(err, badger.ErrKeyNotFound) { - // Either first boot or migration already done - return - } else { - fatalError(err, "Could not read from DB") - } - } - - // ?? Should never happen, maybe we just have an empty key? - if !moduleConfig.CompletedOnboarding { - failOnError(db.RemoveKey(pre180ModuleConfigKey), "Failed to remove pre-1.8 module config") - return - } - - // Migrate to new config by updating every related module - var twitchConfig twitch.Config - err = db.GetJSON(twitch.ConfigKey, &twitchConfig) - if err != nil { - if !errors.Is(err, badger.ErrKeyNotFound) { - fatalError(err, "Could not read from DB") - } - } else { - twitchConfig.Enabled = moduleConfig.EnableTwitch - if err := db.PutJSON(twitch.ConfigKey, twitchConfig); err != nil { - logger.Error("Failed to update twitch config during 1.8 migration", zap.Error(err)) - } - } - - var stulbeConfig stulbe.Config - err = db.GetJSON(stulbe.ConfigKey, &stulbeConfig) - if err != nil { - if !errors.Is(err, badger.ErrKeyNotFound) { - fatalError(err, "Could not read from DB") - } - } else { - stulbeConfig.Enabled = moduleConfig.EnableStulbe - if err := db.PutJSON(stulbe.ConfigKey, stulbeConfig); err != nil { - logger.Error("Failed to update stulbe config during 1.8 migration", zap.Error(err)) - } - } - - var loyaltyConfig loyalty.Config - err = db.GetJSON(loyalty.ConfigKey, &loyaltyConfig) - if err != nil { - if !errors.Is(err, badger.ErrKeyNotFound) { - fatalError(err, "Could not read from DB") - } - } else { - loyaltyConfig.Enabled = moduleConfig.EnableLoyalty - if err := db.PutJSON(loyalty.ConfigKey, loyaltyConfig); err != nil { - logger.Error("Failed to update loyalty config during 1.8 migration", zap.Error(err)) - } - } - - logger.Info("Migrated module config to 1.8+") - - // Remove old config key - failOnError(db.RemoveKey(pre180ModuleConfigKey), "Failed to remove pre-1.8 module config") -} diff --git a/modules/database/badger/test.go b/modules/database/badger/test.go new file mode 100644 index 0000000..2b90f77 --- /dev/null +++ b/modules/database/badger/test.go @@ -0,0 +1 @@ +package badger diff --git a/modules/database/database.go b/modules/database/database.go new file mode 100644 index 0000000..4711ffe --- /dev/null +++ b/modules/database/database.go @@ -0,0 +1,163 @@ +package database + +import ( + "fmt" + + "github.com/strimertul/strimertul/modules" + + jsoniter "github.com/json-iterator/go" + kv "github.com/strimertul/kilovolt/v8" + "go.uber.org/zap" +) + +var json = jsoniter.ConfigFastest + +var ( + // ErrUnknown is returned when a response is received that doesn't match any expected outcome. + ErrUnknown = fmt.Errorf("unknown error") +) + +type DBModule struct { + client *kv.LocalClient + hub *kv.Hub + logger *zap.Logger +} + +type KvPair struct { + Key string + Data string +} + +func NewDBModule(hub *kv.Hub, manager *modules.Manager) (*DBModule, error) { + logger := manager.Logger(modules.ModuleDB) + localClient := kv.NewLocalClient(kv.ClientOptions{}, logger) + go localClient.Run() + hub.AddClient(localClient) + localClient.Wait() + err := hub.SetAuthenticated(localClient.UID(), true) + if err != nil { + return nil, err + } + module := &DBModule{ + client: localClient, + hub: hub, + logger: logger, + } + + manager.Modules[modules.ModuleDB] = module + return module, nil +} + +func (mod *DBModule) Hub() *kv.Hub { + return mod.hub +} + +func (mod *DBModule) Status() modules.ModuleStatus { + return modules.ModuleStatus{ + Enabled: mod.hub != nil, + Working: mod.client != nil, + StatusString: "ok", + } +} + +func (mod *DBModule) Close() error { + mod.hub.RemoveClient(mod.client) + return nil +} + +func (mod *DBModule) GetKey(key string) (string, error) { + res, err := mod.makeRequest(kv.CmdReadKey, map[string]interface{}{"key": key}) + if err != nil { + return "", err + } + return res.Data.(string), nil +} + +func (mod *DBModule) PutKey(key string, data string) error { + _, err := mod.makeRequest(kv.CmdWriteKey, map[string]interface{}{"key": key, "data": data}) + return err +} + +func (mod *DBModule) Subscribe(fn kv.SubscriptionCallback, prefixes ...string) error { + for _, prefix := range prefixes { + _, err := mod.makeRequest(kv.CmdSubscribePrefix, map[string]interface{}{"prefix": prefix}) + if err != nil { + return err + } + mod.client.SetPrefixSubCallback(prefix, fn) + } + return nil +} + +func (mod *DBModule) GetJSON(key string, dst interface{}) error { + res, err := mod.GetKey(key) + if err != nil { + return err + } + return json.Unmarshal([]byte(res), dst) +} + +func (mod *DBModule) GetAll(prefix string) (map[string]string, error) { + res, err := mod.makeRequest(kv.CmdReadPrefix, map[string]interface{}{"prefix": prefix}) + if err != nil { + return nil, err + } + + out := make(map[string]string) + for key, value := range res.Data.(map[string]interface{}) { + out[key] = value.(string) + } + return out, nil +} + +func (mod *DBModule) PutJSON(key string, data interface{}) error { + byt, err := json.Marshal(data) + if err != nil { + return err + } + + return mod.PutKey(key, string(byt)) +} + +func (mod *DBModule) PutJSONBulk(kvs map[string]interface{}) error { + encoded := make(map[string]interface{}) + for k, v := range kvs { + byt, err := json.Marshal(v) + if err != nil { + return err + } + encoded[k] = string(byt) + } + _, chn := mod.client.MakeRequest(kv.CmdWriteBulk, encoded) + _, err := getResponse(<-chn) + return err +} + +func (mod *DBModule) RemoveKey(key string) error { + // TODO + return mod.PutKey(key, "") +} + +func (mod *DBModule) makeRequest(cmd string, data map[string]interface{}) (kv.Response, error) { + req, chn := mod.client.MakeRequest(cmd, data) + mod.hub.SendMessage(req) + return getResponse(<-chn) +} + +func getResponse(response interface{}) (kv.Response, error) { + switch c := response.(type) { + case kv.Response: + return c, nil + case kv.Error: + return kv.Response{}, &KvError{c} + } + return kv.Response{}, ErrUnknown +} + +type KvError struct { + ErrorData kv.Error +} + +func (kv *KvError) Error() string { + return fmt.Sprintf("%s: %s", kv.ErrorData.Error, kv.ErrorData.Details) +} diff --git a/modules/database/db.go b/modules/database/db.go deleted file mode 100644 index e72e56a..0000000 --- a/modules/database/db.go +++ /dev/null @@ -1,229 +0,0 @@ -package database - -import ( - "bufio" - "context" - "encoding/binary" - "io" - - "go.uber.org/zap" - - "github.com/golang/protobuf/proto" - "github.com/strimertul/strimertul/modules" - - "github.com/dgraph-io/badger/v3" - "github.com/dgraph-io/badger/v3/pb" - jsoniter "github.com/json-iterator/go" -) - -var json = jsoniter.ConfigFastest - -var ( - ErrKeyNotFound = badger.ErrKeyNotFound -) - -type DB struct { - client *badger.DB - logger *zap.Logger -} - -type ModifiedKV struct { - Key string - Data []byte - Meta []byte - Version uint64 - Expires uint64 -} - -func Open(options badger.Options, manager *modules.Manager) (*DB, error) { - // Create logger - logger := manager.Logger(modules.ModuleDB) - - // Open database - client, err := badger.Open(options) - if err != nil { - return nil, err - } - - // Create DB instance - db := &DB{ - client: client, - logger: logger, - } - - // Register DB module - manager.Modules[modules.ModuleDB] = db - - return db, nil -} - -func (db *DB) Client() *badger.DB { - return db.client -} - -func (db *DB) Status() modules.ModuleStatus { - lsm, vlog := db.client.Size() - return modules.ModuleStatus{ - Enabled: true, - Working: !db.client.IsClosed(), - Data: struct { - LSMSize int64 - VlogSize int64 - }{ - lsm, - vlog, - }, - StatusString: db.client.LevelsToString(), - } -} - -func (db *DB) Close() error { - return db.client.Close() -} - -func (db *DB) GetKey(key string) ([]byte, error) { - var byt []byte - err := db.client.View(func(t *badger.Txn) error { - item, err := t.Get([]byte(key)) - if err != nil { - return err - } - byt, err = item.ValueCopy(nil) - return err - }) - return byt, err -} - -func (db *DB) PutKey(key string, data []byte) error { - return db.client.Update(func(t *badger.Txn) error { - return t.Set([]byte(key), data) - }) -} - -func (db *DB) Subscribe(ctx context.Context, fn func(changed []ModifiedKV) error, prefixes ...string) error { - prefixList := make([][]byte, len(prefixes)) - for index, prefix := range prefixes { - prefixList[index] = []byte(prefix) - } - var matches []pb.Match - for _, prefix := range prefixList { - matches = append(matches, pb.Match{ - Prefix: prefix, - }) - } - return db.client.Subscribe(ctx, func(kv *badger.KVList) error { - modified := make([]ModifiedKV, len(kv.Kv)) - for index, newKV := range kv.Kv { - modified[index] = ModifiedKV{ - Key: string(newKV.Key), - Data: newKV.Value, - Meta: newKV.UserMeta, - Version: newKV.Version, - Expires: newKV.ExpiresAt, - } - } - return fn(modified) - }, matches) -} - -func (db *DB) GetJSON(key string, dst interface{}) error { - return db.client.View(func(t *badger.Txn) error { - item, err := t.Get([]byte(key)) - if err != nil { - return err - } - byt, err := item.ValueCopy(nil) - if err != nil { - return err - } - return json.Unmarshal(byt, dst) - }) -} - -func (db *DB) GetAll(prefix string) (map[string]string, error) { - out := make(map[string]string) - err := db.client.View(func(t *badger.Txn) error { - opt := badger.DefaultIteratorOptions - opt.Prefix = []byte(prefix) - it := t.NewIterator(opt) - defer it.Close() - for it.Rewind(); it.Valid(); it.Next() { - item := it.Item() - byt, err := item.ValueCopy(nil) - if err != nil { - return err - } - out[string(item.Key()[len(prefix):])] = string(byt) - } - return nil - }) - return out, err -} - -func (db *DB) PutJSON(key string, data interface{}) error { - return db.client.Update(func(t *badger.Txn) error { - byt, err := json.Marshal(data) - if err != nil { - return err - } - return t.Set([]byte(key), byt) - }) -} - -func (db *DB) PutJSONBulk(kvs map[string]interface{}) error { - return db.client.Update(func(t *badger.Txn) error { - for k, v := range kvs { - byt, err := json.Marshal(v) - if err != nil { - return err - } - err = t.Set([]byte(k), byt) - if err != nil { - return err - } - } - return nil - }) -} - -func (db *DB) RemoveKey(key string) error { - return db.client.Update(func(t *badger.Txn) error { - return t.Delete([]byte(key)) - }) -} - -func (db *DB) RestoreOverwrite(r io.Reader) error { - br := bufio.NewReaderSize(r, 16<<10) - unmarshalBuf := make([]byte, 1<<10) - - for { - var sz uint64 - err := binary.Read(br, binary.LittleEndian, &sz) - if err == io.EOF { - break - } else if err != nil { - return err - } - - if cap(unmarshalBuf) < int(sz) { - unmarshalBuf = make([]byte, sz) - } - - if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil { - return err - } - - list := &pb.KVList{} - if err := proto.Unmarshal(unmarshalBuf[:sz], list); err != nil { - return err - } - - for _, kv := range list.Kv { - if err := db.PutKey(string(kv.Key), kv.Value); err != nil { - return err - } - } - } - - return nil -} diff --git a/modules/http/server.go b/modules/http/server.go index dac0f4a..0081901 100644 --- a/modules/http/server.go +++ b/modules/http/server.go @@ -7,19 +7,19 @@ import ( "io/fs" "net/http" - "github.com/strimertul/kilovolt/v7/drivers/badgerdb" + jsoniter "github.com/json-iterator/go" + + "github.com/strimertul/strimertul/modules/database" "go.uber.org/zap" + kv "github.com/strimertul/kilovolt/v8" "github.com/strimertul/strimertul/modules" - "github.com/strimertul/strimertul/modules/database" - - kv "github.com/strimertul/kilovolt/v7" ) type Server struct { Config ServerConfig - db *database.DB + db *database.DBModule logger *zap.Logger server *http.Server frontend fs.FS @@ -28,7 +28,7 @@ type Server struct { } func NewServer(manager *modules.Manager) (*Server, error) { - db, ok := manager.Modules["db"].(*database.DB) + db, ok := manager.Modules["db"].(*database.DBModule) if !ok { return nil, errors.New("db module not found") } @@ -55,13 +55,13 @@ func NewServer(manager *modules.Manager) (*Server, error) { } } - server.hub, err = kv.NewHub(badgerdb.NewBadgerBackend(db.Client()), kv.HubOptions{ + // Set hub + server.hub = db.Hub() + + // Set password + server.hub.SetOptions(kv.HubOptions{ Password: server.Config.KVPassword, - }, logger.With(zap.String("module", "kv"))) - if err != nil { - return nil, err - } - go server.hub.Run() + }) // Register module manager.Modules[modules.ModuleHTTP] = server @@ -113,34 +113,32 @@ func (s *Server) Listen() error { restart := newSafeBool(false) exit := make(chan error) go func() { - err := s.db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { - for _, pair := range changed { - if pair.Key == ServerConfigKey { - oldBind := s.Config.Bind - oldPassword := s.Config.KVPassword - err := s.db.GetJSON(ServerConfigKey, &s.Config) + err := s.db.Subscribe(func(key, value string) { + if key == ServerConfigKey { + oldBind := s.Config.Bind + oldPassword := s.Config.KVPassword + err := jsoniter.ConfigFastest.Unmarshal([]byte(value), &s.Config) + if err != nil { + s.logger.Error("Failed to unmarshal config", zap.Error(err)) + return + } + s.mux = s.makeMux() + // Restart hub if password changed + if oldPassword != s.Config.KVPassword { + s.hub.SetOptions(kv.HubOptions{ + Password: s.Config.KVPassword, + }) + } + // Restart server if bind changed + if oldBind != s.Config.Bind { + restart.Set(true) + err = s.server.Shutdown(context.Background()) if err != nil { - return err - } - s.mux = s.makeMux() - // Restart hub if password changed - if oldPassword != s.Config.KVPassword { - s.hub.SetOptions(kv.HubOptions{ - Password: s.Config.KVPassword, - }) - } - // Restart server if bind changed - if oldBind != s.Config.Bind { - restart.Set(true) - err = s.server.Shutdown(context.Background()) - if err != nil { - s.logger.Error("Failed to shutdown server", zap.Error(err)) - return err - } + s.logger.Error("Failed to shutdown server", zap.Error(err)) + return } } } - return nil }, ServerConfigKey) if err != nil { exit <- fmt.Errorf("error while handling subscription to HTTP config changes: %w", err) diff --git a/modules/loyalty/manager.go b/modules/loyalty/manager.go index 3b63fa7..d79acf7 100644 --- a/modules/loyalty/manager.go +++ b/modules/loyalty/manager.go @@ -1,20 +1,18 @@ package loyalty import ( - "context" "errors" "strings" "sync" "time" - "go.uber.org/zap" - "github.com/strimertul/strimertul/modules" "github.com/strimertul/strimertul/modules/database" "github.com/strimertul/strimertul/modules/stulbe" - "github.com/dgraph-io/badger/v3" jsoniter "github.com/json-iterator/go" + kv "github.com/strimertul/kilovolt/v8" + "go.uber.org/zap" ) var ( @@ -30,26 +28,19 @@ type Manager struct { goals GoalStorage queue RedeemQueueStorage mu sync.Mutex - db *database.DB + db *database.DBModule logger *zap.Logger cooldowns map[string]time.Time } func Register(manager *modules.Manager) error { - db, ok := manager.Modules["db"].(*database.DB) + db, ok := manager.Modules["db"].(*database.DBModule) if !ok { return errors.New("db module not found") } logger := manager.Logger(modules.ModuleLoyalty) - // Check if we need to migrate - // TODO Remove this in the future - err := migratePoints(db, logger) - if err != nil { - return err - } - loyalty := &Manager{ logger: logger, db: db, @@ -58,7 +49,7 @@ func Register(manager *modules.Manager) error { } // Ger data from DB if err := db.GetJSON(ConfigKey, &loyalty.config); err != nil { - if errors.Is(err, badger.ErrKeyNotFound) { + if errors.Is(err, kv.ErrorKeyNotFound) { logger.Warn("missing configuration for loyalty (but it's enabled). Please make sure to set it up properly!") } else { return err @@ -67,17 +58,17 @@ func Register(manager *modules.Manager) error { // Retrieve configs if err := db.GetJSON(RewardsKey, &loyalty.rewards); err != nil { - if !errors.Is(err, badger.ErrKeyNotFound) { + if !errors.Is(err, kv.ErrorKeyNotFound) { return err } } if err := db.GetJSON(GoalsKey, &loyalty.goals); err != nil { - if !errors.Is(err, badger.ErrKeyNotFound) { + if !errors.Is(err, kv.ErrorKeyNotFound) { return err } } if err := db.GetJSON(QueueKey, &loyalty.queue); err != nil { - if !errors.Is(err, badger.ErrKeyNotFound) { + if !errors.Is(err, kv.ErrorKeyNotFound) { return err } } @@ -85,7 +76,7 @@ func Register(manager *modules.Manager) error { // Retrieve user points points, err := db.GetAll(PointsPrefix) if err != nil { - if !errors.Is(err, badger.ErrKeyNotFound) { + if !errors.Is(err, kv.ErrorKeyNotFound) { return err } points = make(map[string]string) @@ -101,8 +92,8 @@ func Register(manager *modules.Manager) error { } // Subscribe for changes - go db.Subscribe(context.Background(), loyalty.update, "loyalty/") - go db.Subscribe(context.Background(), loyalty.handleRemote, "stulbe/loyalty/") + go db.Subscribe(loyalty.update, "loyalty/") + go db.Subscribe(loyalty.handleRemote, "stulbe/loyalty/") // Replicate keys on stulbe if available if stulbeManager, ok := manager.Modules["stulbe"].(*stulbe.Manager); ok { @@ -145,121 +136,115 @@ func (m *Manager) Close() error { return nil } -func (m *Manager) update(kvs []database.ModifiedKV) error { - for _, kv := range kvs { - var err error +func (m *Manager) update(key, value string) { + var err error - // Check for config changes/RPC - switch kv.Key { - case ConfigKey: - err = func() error { - m.mu.Lock() - defer m.mu.Unlock() - return jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.config) - }() - case GoalsKey: - err = func() error { - m.mu.Lock() - defer m.mu.Unlock() - return jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.goals) - }() - case RewardsKey: - err = func() error { - m.mu.Lock() - defer m.mu.Unlock() - return jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.rewards) - }() - case QueueKey: - err = func() error { - m.mu.Lock() - defer m.mu.Unlock() - return jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.queue) - }() - case CreateRedeemRPC: - var redeem Redeem - err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &redeem) - if err == nil { - err = m.AddRedeem(redeem) - } - case RemoveRedeemRPC: - var redeem Redeem - err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &redeem) - if err == nil { - err = m.RemoveRedeem(redeem) - } - default: - // Check for prefix changes - switch { - // User point changed - case strings.HasPrefix(kv.Key, PointsPrefix): - var entry PointsEntry - err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &entry) - user := kv.Key[len(PointsPrefix):] - func() { - m.mu.Lock() - defer m.mu.Unlock() - m.points[user] = entry - }() - } + // Check for config changes/RPC + switch key { + case ConfigKey: + err = func() error { + m.mu.Lock() + defer m.mu.Unlock() + return jsoniter.ConfigFastest.UnmarshalFromString(value, &m.config) + }() + case GoalsKey: + err = func() error { + m.mu.Lock() + defer m.mu.Unlock() + return jsoniter.ConfigFastest.UnmarshalFromString(value, &m.goals) + }() + case RewardsKey: + err = func() error { + m.mu.Lock() + defer m.mu.Unlock() + return jsoniter.ConfigFastest.UnmarshalFromString(value, &m.rewards) + }() + case QueueKey: + err = func() error { + m.mu.Lock() + defer m.mu.Unlock() + return jsoniter.ConfigFastest.UnmarshalFromString(value, &m.queue) + }() + case CreateRedeemRPC: + var redeem Redeem + err = jsoniter.ConfigFastest.UnmarshalFromString(value, &redeem) + if err == nil { + err = m.AddRedeem(redeem) } - if err != nil { - m.logger.Error("subscribe error: invalid JSON received on key", zap.Error(err), zap.String("key", kv.Key)) - } else { - m.logger.Debug("updated key", zap.String("key", kv.Key)) + case RemoveRedeemRPC: + var redeem Redeem + err = jsoniter.ConfigFastest.UnmarshalFromString(value, &redeem) + if err == nil { + err = m.RemoveRedeem(redeem) + } + default: + // Check for prefix changes + switch { + // User point changed + case strings.HasPrefix(key, PointsPrefix): + var entry PointsEntry + err = jsoniter.ConfigFastest.UnmarshalFromString(value, &entry) + user := key[len(PointsPrefix):] + func() { + m.mu.Lock() + defer m.mu.Unlock() + m.points[user] = entry + }() } } - return nil + if err != nil { + m.logger.Error("subscribe error: invalid JSON received on key", zap.Error(err), zap.String("key", key)) + } else { + m.logger.Debug("updated key", zap.String("key", key)) + } } -func (m *Manager) handleRemote(kvs []database.ModifiedKV) error { - for _, kv := range kvs { - m.logger.Debug("loyalty request from stulbe", zap.String("key", kv.Key)) - switch kv.Key { - case KVExLoyaltyRedeem: - // Parse request - var redeemRequest ExLoyaltyRedeem - err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &redeemRequest) - if err != nil { - m.logger.Warn("error decoding redeem request", zap.Error(err)) - break - } - // Find reward - reward := m.GetReward(redeemRequest.RewardID) - if reward.ID == "" { - m.logger.Warn("redeem request contains invalid reward id", zap.String("reward-id", redeemRequest.RewardID)) - break - } - err = m.PerformRedeem(Redeem{ - Username: redeemRequest.Username, - DisplayName: redeemRequest.DisplayName, - Reward: reward, - When: time.Now(), - RequestText: redeemRequest.RequestText, - }) - if err != nil { - m.logger.Warn("error performing redeem request", zap.Error(err)) - } - case KVExLoyaltyContribute: - // Parse request - var contributeRequest ExLoyaltyContribute - err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &contributeRequest) - if err != nil { - m.logger.Warn("error decoding contribution request", zap.Error(err)) - break - } - // Find goal - goal := m.GetGoal(contributeRequest.GoalID) - if goal.ID == "" { - m.logger.Warn("contribute request contains invalid goal id", zap.String("goal-id", contributeRequest.GoalID)) - break - } - err = m.PerformContribution(goal, contributeRequest.Username, contributeRequest.Amount) - if err != nil { - m.logger.Warn("error performing contribution request", zap.Error(err)) - } +func (m *Manager) handleRemote(key, value string) { + m.logger.Debug("loyalty request from stulbe", zap.String("key", key)) + switch key { + case KVExLoyaltyRedeem: + // Parse request + var redeemRequest ExLoyaltyRedeem + err := jsoniter.ConfigFastest.UnmarshalFromString(value, &redeemRequest) + if err != nil { + m.logger.Warn("error decoding redeem request", zap.Error(err)) + break + } + // Find reward + reward := m.GetReward(redeemRequest.RewardID) + if reward.ID == "" { + m.logger.Warn("redeem request contains invalid reward id", zap.String("reward-id", redeemRequest.RewardID)) + break + } + err = m.PerformRedeem(Redeem{ + Username: redeemRequest.Username, + DisplayName: redeemRequest.DisplayName, + Reward: reward, + When: time.Now(), + RequestText: redeemRequest.RequestText, + }) + if err != nil { + m.logger.Warn("error performing redeem request", zap.Error(err)) + } + case KVExLoyaltyContribute: + // Parse request + var contributeRequest ExLoyaltyContribute + err := jsoniter.ConfigFastest.UnmarshalFromString(value, &contributeRequest) + if err != nil { + m.logger.Warn("error decoding contribution request", zap.Error(err)) + break + } + // Find goal + goal := m.GetGoal(contributeRequest.GoalID) + if goal.ID == "" { + m.logger.Warn("contribute request contains invalid goal id", zap.String("goal-id", contributeRequest.GoalID)) + break + } + err = m.PerformContribution(goal, contributeRequest.Username, contributeRequest.Amount) + if err != nil { + m.logger.Warn("error performing contribution request", zap.Error(err)) } } - return nil } func (m *Manager) GetPoints(user string) int64 { diff --git a/modules/loyalty/migration.go b/modules/loyalty/migration.go index ca34bd0..5fb26a9 100644 --- a/modules/loyalty/migration.go +++ b/modules/loyalty/migration.go @@ -3,20 +3,22 @@ package loyalty import ( "errors" - "go.uber.org/zap" + kv "github.com/strimertul/kilovolt/v8" "github.com/strimertul/strimertul/modules/database" + + "go.uber.org/zap" ) const OldPointsKey = "loyalty/users" type OldPointStorage map[string]int64 -func migratePoints(db *database.DB, log *zap.Logger) error { +func migratePoints(db *database.DBModule, log *zap.Logger) error { // Retrieve old storage var oldStorage OldPointStorage err := db.GetJSON(OldPointsKey, &oldStorage) - if errors.Is(err, database.ErrKeyNotFound) { + if errors.Is(err, kv.ErrorKeyNotFound) { // No migration needed, points are already kaput return nil } diff --git a/modules/stulbe/client.go b/modules/stulbe/client.go index 04afc4a..ccf23eb 100644 --- a/modules/stulbe/client.go +++ b/modules/stulbe/client.go @@ -1,28 +1,28 @@ package stulbe import ( - "context" + "encoding/json" "errors" + "github.com/strimertul/strimertul/modules/database" + "go.uber.org/zap" "github.com/strimertul/strimertul/modules" - "github.com/strimertul/strimertul/modules/database" - "github.com/strimertul/stulbe-client-go" ) type Manager struct { Config Config Client *stulbe.Client - db *database.DB + db *database.DBModule logger *zap.Logger restart chan bool } func Register(manager *modules.Manager) error { - db, ok := manager.Modules["db"].(*database.DB) + db, ok := manager.Modules["db"].(*database.DBModule) if !ok { return errors.New("db module not found") } @@ -67,32 +67,29 @@ func Register(manager *modules.Manager) error { }() // Listen for config changes - go db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { - for _, kv := range changed { - if kv.Key == ConfigKey { - var config Config - err := db.GetJSON(ConfigKey, &config) - if err != nil { - logger.Warn("Failed to get config", zap.Error(err)) - continue - } + go db.Subscribe(func(key, value string) { + if key == ConfigKey { + var config Config + err := json.Unmarshal([]byte(value), &config) + if err != nil { + logger.Warn("Failed to get new config", zap.Error(err)) + return + } - client, err := stulbe.NewClient(stulbe.ClientOptions{ - Endpoint: config.Endpoint, - Username: config.Username, - AuthKey: config.AuthKey, - }) - if err != nil { - logger.Warn("Failed to update stulbe client, keeping old settings", zap.Error(err)) - } else { - stulbeManager.Client.Close() - stulbeManager.Client = client - stulbeManager.restart <- true - logger.Info("updated/restarted stulbe client") - } + client, err := stulbe.NewClient(stulbe.ClientOptions{ + Endpoint: config.Endpoint, + Username: config.Username, + AuthKey: config.AuthKey, + }) + if err != nil { + logger.Warn("Failed to update stulbe client, keeping old settings", zap.Error(err)) + } else { + stulbeManager.Client.Close() + stulbeManager.Client = client + stulbeManager.restart <- true + logger.Info("updated/restarted stulbe client") } } - return nil }, ConfigKey) // Register module @@ -109,7 +106,7 @@ func (m *Manager) ReceiveEvents() error { for { select { case kv := <-chn: - err := m.db.PutKey(kv.Key, []byte(kv.Value)) + err := m.db.PutKey(kv.Key, kv.Value) if err != nil { return err } @@ -160,16 +157,13 @@ func (m *Manager) ReplicateKey(prefix string) error { m.logger.Debug("synced to remote", zap.String("prefix", prefix)) // Subscribe to local datastore and update remote on change - return m.db.Subscribe(context.Background(), func(pairs []database.ModifiedKV) error { - for _, changed := range pairs { - err := m.Client.KV.SetKey(changed.Key, string(changed.Data)) - if err != nil { - return err - } - m.logger.Debug("replicated to remote", zap.String("key", changed.Key)) + return m.db.Subscribe(func(key, value string) { + err := m.Client.KV.SetKey(key, value) + if err != nil { + m.logger.Error("failed to replicate key", zap.String("key", key), zap.Error(err)) + } else { + m.logger.Debug("replicated to remote", zap.String("key", key)) } - - return nil }, prefix) } diff --git a/modules/twitch/bot.go b/modules/twitch/bot.go index 1f309b6..a90b41f 100644 --- a/modules/twitch/bot.go +++ b/modules/twitch/bot.go @@ -1,13 +1,11 @@ package twitch import ( - "context" "strings" "sync" "text/template" "time" - "github.com/strimertul/strimertul/modules/database" "github.com/strimertul/strimertul/modules/loyalty" "go.uber.org/zap" @@ -157,41 +155,37 @@ func NewBot(api *Client, config BotConfig) *Bot { if err != nil { bot.logger.Error("failed to parse custom commands", zap.Error(err)) } - go api.db.Subscribe(context.Background(), bot.updateCommands, CustomCommandsKey) - go api.db.Subscribe(context.Background(), bot.handleWriteMessageRPC, WriteMessageRPC) + go api.db.Subscribe(bot.updateCommands, CustomCommandsKey) + go api.db.Subscribe(bot.handleWriteMessageRPC, WriteMessageRPC) return bot } -func (b *Bot) updateCommands(kvs []database.ModifiedKV) error { - for _, kv := range kvs { - switch kv.Key { - case CustomCommandsKey: - err := func() error { - b.mu.Lock() - defer b.mu.Unlock() - return jsoniter.ConfigFastest.Unmarshal(kv.Data, &b.customCommands) - }() - if err != nil { - return err - } - // Recreate templates - if err := b.updateTemplates(); err != nil { - return err - } +func (b *Bot) updateCommands(key, value string) { + switch key { + case CustomCommandsKey: + err := func() error { + b.mu.Lock() + defer b.mu.Unlock() + return jsoniter.ConfigFastest.UnmarshalFromString(value, &b.customCommands) + }() + if err != nil { + b.logger.Error("failed to decode new custom commands", zap.Error(err)) + return + } + // Recreate templates + if err := b.updateTemplates(); err != nil { + b.logger.Error("failed to update custom commands templates", zap.Error(err)) + return } } - return nil } -func (b *Bot) handleWriteMessageRPC(kvs []database.ModifiedKV) error { - for _, kv := range kvs { - switch kv.Key { - case WriteMessageRPC: - b.Client.Say(b.config.Channel, string(kv.Data)) - } +func (b *Bot) handleWriteMessageRPC(key, value string) { + switch key { + case WriteMessageRPC: + b.Client.Say(b.config.Channel, value) } - return nil } func (b *Bot) updateTemplates() error { diff --git a/modules/twitch/client.go b/modules/twitch/client.go index 11992f4..34112b5 100644 --- a/modules/twitch/client.go +++ b/modules/twitch/client.go @@ -1,23 +1,22 @@ package twitch import ( - "context" "errors" "fmt" jsoniter "github.com/json-iterator/go" "github.com/nicklaw5/helix/v2" + "github.com/strimertul/strimertul/modules/database" "go.uber.org/zap" "github.com/strimertul/strimertul/modules" - "github.com/strimertul/strimertul/modules/database" "github.com/strimertul/strimertul/modules/loyalty" ) type Client struct { Config Config Bot *Bot - db *database.DB + db *database.DBModule API *helix.Client logger *zap.Logger @@ -25,7 +24,7 @@ type Client struct { } func Register(manager *modules.Manager) error { - db, ok := manager.Modules["db"].(*database.DB) + db, ok := manager.Modules["db"].(*database.DBModule) if !ok { return errors.New("db module not found") } @@ -79,38 +78,35 @@ func Register(manager *modules.Manager) error { } // Listen for config changes - go db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { - for _, kv := range changed { - switch kv.Key { - case ConfigKey: - err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &config) - if err != nil { - logger.Error("failed to unmarshal config", zap.Error(err)) - continue - } - api, err := getHelixAPI(config.APIClientID, config.APIClientSecret) - if err != nil { - logger.Warn("failed to create new twitch client, keeping old credentials", zap.Error(err)) - continue - } - client.API = api - logger.Info("reloaded/updated Twitch API") - case BotConfigKey: - err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &twitchBotConfig) - if err != nil { - logger.Error("failed to unmarshal config", zap.Error(err)) - continue - } - err = client.Bot.Client.Disconnect() - if err != nil { - logger.Warn("failed to disconnect from Twitch IRC", zap.Error(err)) - } - client.Bot = NewBot(client, twitchBotConfig) - client.restart <- true - logger.Info("reloaded/restarted Twitch bot") + go db.Subscribe(func(key, value string) { + switch key { + case ConfigKey: + err := jsoniter.ConfigFastest.UnmarshalFromString(value, &config) + if err != nil { + logger.Error("failed to unmarshal config", zap.Error(err)) + return } + api, err := getHelixAPI(config.APIClientID, config.APIClientSecret) + if err != nil { + logger.Warn("failed to create new twitch client, keeping old credentials", zap.Error(err)) + return + } + client.API = api + logger.Info("reloaded/updated Twitch API") + case BotConfigKey: + err := jsoniter.ConfigFastest.UnmarshalFromString(value, &twitchBotConfig) + if err != nil { + logger.Error("failed to unmarshal config", zap.Error(err)) + return + } + err = client.Bot.Client.Disconnect() + if err != nil { + logger.Warn("failed to disconnect from Twitch IRC", zap.Error(err)) + } + client.Bot = NewBot(client, twitchBotConfig) + client.restart <- true + logger.Info("reloaded/restarted Twitch bot") } - return nil }, ConfigKey, BotConfigKey) manager.Modules[modules.ModuleTwitch] = client diff --git a/modules/twitch/commands.go b/modules/twitch/commands.go index 1d11d85..aa8a8c3 100644 --- a/modules/twitch/commands.go +++ b/modules/twitch/commands.go @@ -89,7 +89,7 @@ func (b *Bot) setupFunctions() { counter, _ = strconv.Atoi(string(byt)) } counter += 1 - err := b.api.db.PutKey(counterKey, []byte(strconv.Itoa(counter))) + err := b.api.db.PutKey(counterKey, strconv.Itoa(counter)) if err != nil { b.logger.Error("error saving key", zap.Error(err), zap.String("key", counterKey)) } diff --git a/modules/twitch/modules.alerts.go b/modules/twitch/modules.alerts.go index 339afbf..7293d6d 100644 --- a/modules/twitch/modules.alerts.go +++ b/modules/twitch/modules.alerts.go @@ -2,7 +2,6 @@ package twitch import ( "bytes" - "context" "encoding/json" "math/rand" "sync" @@ -14,8 +13,6 @@ import ( "github.com/Masterminds/sprig/v3" jsoniter "github.com/json-iterator/go" "github.com/nicklaw5/helix/v2" - - "github.com/strimertul/strimertul/modules/database" ) const BotAlertsKey = "twitch/bot-modules/alerts/config" @@ -115,19 +112,16 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { mod.compileTemplates() - go bot.api.db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { - for _, kv := range changed { - if kv.Key == BotAlertsKey { - err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &mod.Config) - if err != nil { - bot.logger.Debug("error reloading timer config", zap.Error(err)) - } else { - bot.logger.Info("reloaded alert config") - } - mod.compileTemplates() + go bot.api.db.Subscribe(func(key, value string) { + if key == BotAlertsKey { + err := jsoniter.ConfigFastest.UnmarshalFromString(value, &mod.Config) + if err != nil { + bot.logger.Debug("error reloading timer config", zap.Error(err)) + } else { + bot.logger.Info("reloaded alert config") } + mod.compileTemplates() } - return nil }, BotAlertsKey) // Subscriptions are handled with a slight delay as info come from different events and must be aggregated @@ -241,192 +235,189 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { } } - go bot.api.db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { - for _, kv := range changed { - if kv.Key == "stulbe/ev/webhook" { - var ev eventSubNotification - err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &ev) + go bot.api.db.Subscribe(func(key, value string) { + if key == "stulbe/ev/webhook" { + var ev eventSubNotification + err := jsoniter.ConfigFastest.UnmarshalFromString(value, &ev) + if err != nil { + bot.logger.Debug("error parsing webhook payload", zap.Error(err)) + return + } + switch ev.Subscription.Type { + case helix.EventSubTypeChannelFollow: + // Only process if we care about follows + if !mod.Config.Follow.Enabled { + return + } + // Parse as follow event + var followEv helix.EventSubChannelFollowEvent + err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &followEv) if err != nil { - bot.logger.Debug("error parsing webhook payload", zap.Error(err)) - continue + bot.logger.Debug("error parsing follow event", zap.Error(err)) + return } - switch ev.Subscription.Type { - case helix.EventSubTypeChannelFollow: - // Only process if we care about follows - if !mod.Config.Follow.Enabled { - continue + // Pick a random message + messageID := rand.Intn(len(mod.Config.Follow.Messages)) + // Pick compiled template or fallback to plain text + if tpl, ok := mod.templates.follow.messages[messageID]; ok { + writeTemplate(bot, tpl, &followEv) + } else { + bot.WriteMessage(mod.Config.Follow.Messages[messageID]) + } + // Compile template and send + case helix.EventSubTypeChannelRaid: + // Only process if we care about raids + if !mod.Config.Raid.Enabled { + return + } + // Parse as raid event + var raidEv helix.EventSubChannelRaidEvent + err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &raidEv) + if err != nil { + bot.logger.Debug("error parsing raid event", zap.Error(err)) + return + } + // Pick a random message from base set + messageID := rand.Intn(len(mod.Config.Raid.Messages)) + tpl, ok := mod.templates.raid.messages[messageID] + if !ok { + // Broken template! + mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID]) + return + } + // If we have variations, loop through all the available variations and pick the one with the highest minimum viewers that are met + if len(mod.Config.Raid.Variations) > 0 { + minViewers := -1 + for variationIndex, variation := range mod.Config.Raid.Variations { + if variation.MinViewers != nil && *variation.MinViewers > minViewers && raidEv.Viewers >= *variation.MinViewers { + // Make sure the template is valid + if varTemplates, ok := mod.templates.raid.variations[variationIndex]; ok { + if temp, ok := varTemplates[messageID]; ok { + tpl = temp + minViewers = *variation.MinViewers + } + } + } } - // Parse as follow event - var followEv helix.EventSubChannelFollowEvent - err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &followEv) - if err != nil { - bot.logger.Debug("error parsing follow event", zap.Error(err)) - continue + } + // Compile template and send + writeTemplate(bot, tpl, &raidEv) + case helix.EventSubTypeChannelCheer: + // Only process if we care about bits + if !mod.Config.Cheer.Enabled { + return + } + // Parse as cheer event + var cheerEv helix.EventSubChannelCheerEvent + err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &cheerEv) + if err != nil { + bot.logger.Debug("error parsing cheer event", zap.Error(err)) + return + } + // Pick a random message from base set + messageID := rand.Intn(len(mod.Config.Cheer.Messages)) + tpl, ok := mod.templates.cheer.messages[messageID] + if !ok { + // Broken template! + mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID]) + return + } + // If we have variations, loop through all the available variations and pick the one with the highest minimum amount that is met + if len(mod.Config.Cheer.Variations) > 0 { + minAmount := -1 + for variationIndex, variation := range mod.Config.Cheer.Variations { + if variation.MinAmount != nil && *variation.MinAmount > minAmount && cheerEv.Bits >= *variation.MinAmount { + // Make sure the template is valid + if varTemplates, ok := mod.templates.cheer.variations[variationIndex]; ok { + if temp, ok := varTemplates[messageID]; ok { + tpl = temp + minAmount = *variation.MinAmount + } + } + } } - // Pick a random message - messageID := rand.Intn(len(mod.Config.Follow.Messages)) - // Pick compiled template or fallback to plain text - if tpl, ok := mod.templates.follow.messages[messageID]; ok { - writeTemplate(bot, tpl, &followEv) - } else { - bot.WriteMessage(mod.Config.Follow.Messages[messageID]) - } - // Compile template and send - case helix.EventSubTypeChannelRaid: - // Only process if we care about raids - if !mod.Config.Raid.Enabled { - continue - } - // Parse as raid event - var raidEv helix.EventSubChannelRaidEvent - err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &raidEv) - if err != nil { - bot.logger.Debug("error parsing raid event", zap.Error(err)) - continue - } - // Pick a random message from base set - messageID := rand.Intn(len(mod.Config.Raid.Messages)) - tpl, ok := mod.templates.raid.messages[messageID] - if !ok { - // Broken template! - mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID]) - continue - } - // If we have variations, loop through all the available variations and pick the one with the highest minimum viewers that are met - if len(mod.Config.Raid.Variations) > 0 { - minViewers := -1 - for variationIndex, variation := range mod.Config.Raid.Variations { - if variation.MinViewers != nil && *variation.MinViewers > minViewers && raidEv.Viewers >= *variation.MinViewers { - // Make sure the template is valid - if varTemplates, ok := mod.templates.raid.variations[variationIndex]; ok { + } + // Compile template and send + writeTemplate(bot, tpl, &cheerEv) + case helix.EventSubTypeChannelSubscription: + // Only process if we care about subscriptions + if !mod.Config.Subscription.Enabled { + return + } + // Parse as subscription event + var subEv helix.EventSubChannelSubscribeEvent + err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &subEv) + if err != nil { + bot.logger.Debug("error parsing sub event", zap.Error(err)) + return + } + addPendingSub(subEv) + case helix.EventSubTypeChannelSubscriptionMessage: + // Only process if we care about subscriptions + if !mod.Config.Subscription.Enabled { + return + } + // Parse as subscription event + var subEv helix.EventSubChannelSubscriptionMessageEvent + err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &subEv) + if err != nil { + bot.logger.Debug("error parsing sub event", zap.Error(err)) + return + } + addPendingSub(subEv) + case helix.EventSubTypeChannelSubscriptionGift: + // Only process if we care about gifted subs + if !mod.Config.GiftSub.Enabled { + return + } + // Parse as gift event + var giftEv helix.EventSubChannelSubscriptionGiftEvent + err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &giftEv) + if err != nil { + bot.logger.Debug("error parsing raid event", zap.Error(err)) + return + } + // Pick a random message from base set + messageID := rand.Intn(len(mod.Config.GiftSub.Messages)) + tpl, ok := mod.templates.gift.messages[messageID] + if !ok { + // Broken template! + mod.bot.WriteMessage(mod.Config.GiftSub.Messages[messageID]) + return + } + // If we have variations, loop through all the available variations and pick the one with the highest minimum cumulative total that are met + if len(mod.Config.GiftSub.Variations) > 0 { + if giftEv.IsAnonymous { + for variationIndex, variation := range mod.Config.GiftSub.Variations { + if variation.IsAnonymous != nil && *variation.IsAnonymous { + // Make sure template is valid + if varTemplates, ok := mod.templates.gift.variations[variationIndex]; ok { if temp, ok := varTemplates[messageID]; ok { tpl = temp - minViewers = *variation.MinViewers + break } } } } - } - // Compile template and send - writeTemplate(bot, tpl, &raidEv) - case helix.EventSubTypeChannelCheer: - // Only process if we care about bits - if !mod.Config.Cheer.Enabled { - continue - } - // Parse as cheer event - var cheerEv helix.EventSubChannelCheerEvent - err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &cheerEv) - if err != nil { - bot.logger.Debug("error parsing cheer event", zap.Error(err)) - continue - } - // Pick a random message from base set - messageID := rand.Intn(len(mod.Config.Cheer.Messages)) - tpl, ok := mod.templates.cheer.messages[messageID] - if !ok { - // Broken template! - mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID]) - continue - } - // If we have variations, loop through all the available variations and pick the one with the highest minimum amount that is met - if len(mod.Config.Cheer.Variations) > 0 { - minAmount := -1 - for variationIndex, variation := range mod.Config.Cheer.Variations { - if variation.MinAmount != nil && *variation.MinAmount > minAmount && cheerEv.Bits >= *variation.MinAmount { + } else if giftEv.CumulativeTotal > 0 { + minCumulative := -1 + for variationIndex, variation := range mod.Config.GiftSub.Variations { + if variation.MinCumulative != nil && *variation.MinCumulative > minCumulative && giftEv.CumulativeTotal >= *variation.MinCumulative { // Make sure the template is valid - if varTemplates, ok := mod.templates.cheer.variations[variationIndex]; ok { + if varTemplates, ok := mod.templates.gift.variations[variationIndex]; ok { if temp, ok := varTemplates[messageID]; ok { tpl = temp - minAmount = *variation.MinAmount + minCumulative = *variation.MinCumulative } } } } } - // Compile template and send - writeTemplate(bot, tpl, &cheerEv) - case helix.EventSubTypeChannelSubscription: - // Only process if we care about subscriptions - if !mod.Config.Subscription.Enabled { - continue - } - // Parse as subscription event - var subEv helix.EventSubChannelSubscribeEvent - err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &subEv) - if err != nil { - bot.logger.Debug("error parsing sub event", zap.Error(err)) - continue - } - addPendingSub(subEv) - case helix.EventSubTypeChannelSubscriptionMessage: - // Only process if we care about subscriptions - if !mod.Config.Subscription.Enabled { - continue - } - // Parse as subscription event - var subEv helix.EventSubChannelSubscriptionMessageEvent - err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &subEv) - if err != nil { - bot.logger.Debug("error parsing sub event", zap.Error(err)) - continue - } - addPendingSub(subEv) - case helix.EventSubTypeChannelSubscriptionGift: - // Only process if we care about gifted subs - if !mod.Config.GiftSub.Enabled { - continue - } - // Parse as gift event - var giftEv helix.EventSubChannelSubscriptionGiftEvent - err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &giftEv) - if err != nil { - bot.logger.Debug("error parsing raid event", zap.Error(err)) - continue - } - // Pick a random message from base set - messageID := rand.Intn(len(mod.Config.GiftSub.Messages)) - tpl, ok := mod.templates.gift.messages[messageID] - if !ok { - // Broken template! - mod.bot.WriteMessage(mod.Config.GiftSub.Messages[messageID]) - continue - } - // If we have variations, loop through all the available variations and pick the one with the highest minimum cumulative total that are met - if len(mod.Config.GiftSub.Variations) > 0 { - if giftEv.IsAnonymous { - for variationIndex, variation := range mod.Config.GiftSub.Variations { - if variation.IsAnonymous != nil && *variation.IsAnonymous { - // Make sure template is valid - if varTemplates, ok := mod.templates.gift.variations[variationIndex]; ok { - if temp, ok := varTemplates[messageID]; ok { - tpl = temp - break - } - } - } - } - } else if giftEv.CumulativeTotal > 0 { - minCumulative := -1 - for variationIndex, variation := range mod.Config.GiftSub.Variations { - if variation.MinCumulative != nil && *variation.MinCumulative > minCumulative && giftEv.CumulativeTotal >= *variation.MinCumulative { - // Make sure the template is valid - if varTemplates, ok := mod.templates.gift.variations[variationIndex]; ok { - if temp, ok := varTemplates[messageID]; ok { - tpl = temp - minCumulative = *variation.MinCumulative - } - } - } - } - } - } - // Compile template and send - writeTemplate(bot, tpl, &giftEv) } + // Compile template and send + writeTemplate(bot, tpl, &giftEv) } } - return nil }, "stulbe/ev/webhook") bot.logger.Debug("loaded bot alerts") diff --git a/modules/twitch/modules.timer.go b/modules/twitch/modules.timer.go index ddd7a1b..ad7cb5f 100644 --- a/modules/twitch/modules.timer.go +++ b/modules/twitch/modules.timer.go @@ -1,17 +1,14 @@ package twitch import ( - "context" "math/rand" "sync" "time" "go.uber.org/zap" - jsoniter "github.com/json-iterator/go" - irc "github.com/gempir/go-twitch-irc/v2" - "github.com/strimertul/strimertul/modules/database" + jsoniter "github.com/json-iterator/go" ) const BotTimersKey = "twitch/bot-modules/timers/config" @@ -59,18 +56,15 @@ func SetupTimers(bot *Bot) *BotTimerModule { bot.api.db.PutJSON(BotTimersKey, mod.Config) } - go bot.api.db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { - for _, kv := range changed { - if kv.Key == BotTimersKey { - err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &mod.Config) - if err != nil { - bot.logger.Debug("error reloading timer config", zap.Error(err)) - } else { - bot.logger.Info("reloaded timer config") - } + go bot.api.db.Subscribe(func(key, value string) { + if key == BotTimersKey { + err := jsoniter.ConfigFastest.UnmarshalFromString(value, &mod.Config) + if err != nil { + bot.logger.Debug("error reloading timer config", zap.Error(err)) + } else { + bot.logger.Info("reloaded timer config") } } - return nil }, BotTimersKey) bot.logger.Debug("loaded timers", zap.Int("timers", len(mod.Config.Timers)))