1
0
Fork 0
mirror of https://git.sr.ht/~ashkeel/strimertul synced 2024-09-20 02:00:49 +00:00

Multiple databases!

This commit is contained in:
Ash Keel 2022-02-01 12:35:34 +01:00
parent 2fae672449
commit 4a764c0450
No known key found for this signature in database
GPG key ID: BAD8D93E7314ED3E
18 changed files with 950 additions and 881 deletions

View file

@ -10,16 +10,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added ### Added
- **New UI**: Strimertul now features a more slick and better organized UI. - **New UI**: Strimertul now features a more slick and better organized UI.
- **Multiple database options**: Strimertul now supports multiple databases drivers. BadgerDB will remain as default (and currently the only option) for the time being but other databases are coming in the future.
- **Database operations**: You can now export and import the entire database as JSON files - **Database operations**: You can now export and import the entire database as JSON files
- **Database backups**: The database will periodically save optimized copies of itself in a backup directory, directory and intervals are both configurable, though for the time being you might need to periodically clean it before it becomes too large. - **Database backups**: The database will periodically save optimized copies of itself in a backup directory, directory and intervals are both configurable, though for the time being you might need to periodically clean it before it becomes too large.
- **Manual garbage collection**: You can now launch strimertul with `--run-gc` to manually trigger garbage collection for the database. This will launch strimertul, execute a round of garbage collection and exit.
- **Exposed internal metrics via keys**: `twitch/chat-activity` and `twitch/stream-status` now expose previously internal-only info about the current stream. - **Exposed internal metrics via keys**: `twitch/chat-activity` and `twitch/stream-status` now expose previously internal-only info about the current stream.
### Changed ### Changed
- The logging library has been changed to zap, the format of logs will therefore be wildly different.
- A lot of the command line parameters have changed syntax (eg. from -noheader to -no-header), please check the new formats using `-h` if you rely on them. - A lot of the command line parameters have changed syntax (eg. from -noheader to -no-header), please check the new formats using `-h` if you rely on them.
- Database schema has slightly changed, strimertul will auto-migrate to the new format if it detects old schema in use. - Database schema has slightly changed, strimertul will auto-migrate to the new format if it detects old schema in use.
### Removed
- Twitch chat history doesn't have an explicit toggle anymore, they are always enabled unless the `chat_history` setting is set to 0. - Twitch chat history doesn't have an explicit toggle anymore, they are always enabled unless the `chat_history` setting is set to 0.
- Loyalty point migration from v1.2.0 and earlier has been removed. If you are somehow running such an old version of strimertul and using loyalty points, run any version of strimertul between v1.3.0 and v1.7.0 first to make sure all points are migrated to the new format.
## [1.7.0] - 2021-12-07 ## [1.7.0] - 2021-12-07

288
driver.badgerdb.go Normal file
View file

@ -0,0 +1,288 @@
package main
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"time"
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/pb"
"github.com/golang/protobuf/proto"
badger_driver "github.com/strimertul/kv-badgerdb"
kv "github.com/strimertul/kilovolt/v8"
"github.com/strimertul/strimertul/modules/loyalty"
"github.com/strimertul/strimertul/modules/stulbe"
"github.com/strimertul/strimertul/modules/twitch"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
)
func makeBadgerHub(options dbOptions) (*badger.DB, *kv.Hub, error) {
// Loading routine
db, err := badger.Open(badger.DefaultOptions(options.directory).WithSyncWrites(true))
failOnError(err, "Could not open DB")
// Run migrations
pre200MigrateModuleConfig(db)
// Run garbage collection every once in a while
go func() {
ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()
for range ticker.C {
// Run DB garbage collection until it's done
var err error
for err == nil {
err = db.RunValueLogGC(0.5)
}
}
}()
if options.restore != "" {
file, err := os.Open(options.restore)
failOnError(err, "Could not open backup")
failOnError(badgerRestoreOverwrite(db, file), "Could not restore database")
_ = db.Sync()
logger.Info("Restored database from backup")
}
// Backup database periodically
go func() {
if options.backupDir == "" {
logger.Warn("Backup directory not set, database backups are disabled (this is dangerous, power loss will result in your database being potentially wiped!)")
return
}
err := os.MkdirAll(options.backupDir, 0755)
if err != nil {
logger.Error("Could not create backup directory, moving to a temporary folder", zap.Error(err))
options.backupDir = os.TempDir()
logger.Info("Using temporary directory", zap.String("backup-dir", options.backupDir))
return
}
ticker := time.NewTicker(time.Duration(options.backupInterval) * time.Minute)
defer ticker.Stop()
for range ticker.C {
// Run backup procedure
file, err := os.Create(fmt.Sprintf("%s/%s.db", options.backupDir, time.Now().Format("20060102-150405")))
if err != nil {
logger.Error("Could not create backup file", zap.Error(err))
continue
}
_, err = db.Backup(file, 0)
if err != nil {
logger.Error("Could not backup database", zap.Error(err))
}
_ = file.Close()
logger.Info("Database backed up", zap.String("backup-file", file.Name()))
}
}()
hub, err := kv.NewHub(badger_driver.NewBadgerBackend(db), kv.HubOptions{}, logger)
return db, hub, err
}
func badgerClose(db *badger.DB) error {
return db.Close()
}
func badgerRestoreOverwrite(db *badger.DB, r io.Reader) error {
br := bufio.NewReaderSize(r, 16<<10)
unmarshalBuf := make([]byte, 1<<10)
for {
var sz uint64
err := binary.Read(br, binary.LittleEndian, &sz)
if err == io.EOF {
break
} else if err != nil {
return err
}
if cap(unmarshalBuf) < int(sz) {
unmarshalBuf = make([]byte, sz)
}
if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
return err
}
list := &pb.KVList{}
if err := proto.Unmarshal(unmarshalBuf[:sz], list); err != nil {
return err
}
err = db.Update(func(txn *badger.Txn) error {
for _, kvpair := range list.Kv {
err := txn.Set(kvpair.Key, kvpair.Value)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
}
return nil
}
// pre200MigrateModuleConfig migrates <2.0 module configs to 2.0+
func pre200MigrateModuleConfig(db *badger.DB) {
const pre180ModuleConfigKey = "stul-meta/modules"
type pre180ModuleConfig struct {
CompletedOnboarding bool `json:"configured"`
EnableTwitch bool `json:"twitch"`
EnableStulbe bool `json:"stulbe"`
EnableLoyalty bool `json:"loyalty"`
}
// Check if onboarding was completed
var moduleConfig pre180ModuleConfig
err := db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(pre180ModuleConfigKey))
if err != nil {
return err
}
err = item.Value(func(val []byte) error {
return jsoniter.Unmarshal(val, &moduleConfig)
})
if err != nil {
return err
}
return nil
})
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
// Either first boot or migration already done
return
} else {
fatalError(err, "Could not read from DB")
}
}
// ?? Should never happen, maybe we just have an empty key?
if !moduleConfig.CompletedOnboarding {
err = db.Update(func(txn *badger.Txn) error {
return txn.Delete([]byte(pre180ModuleConfigKey))
})
failOnError(err, "Failed to remove pre-1.8 module config")
return
}
// Migrate to new config by updating every related module
var twitchConfig twitch.Config
err = db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(twitch.ConfigKey))
if err != nil {
return err
}
err = item.Value(func(val []byte) error {
return jsoniter.Unmarshal(val, &twitchConfig)
})
if err != nil {
return err
}
return nil
})
if err != nil {
if !errors.Is(err, badger.ErrKeyNotFound) {
fatalError(err, "Could not read from DB")
}
} else {
twitchConfig.Enabled = moduleConfig.EnableTwitch
err = db.Update(func(txn *badger.Txn) error {
byt, err := jsoniter.ConfigFastest.Marshal(twitchConfig)
if err != nil {
return err
}
return txn.Set([]byte(twitch.ConfigKey), byt)
})
if err != nil {
logger.Error("Failed to update twitch config during 1.8 migration", zap.Error(err))
}
}
var stulbeConfig stulbe.Config
err = db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(stulbe.ConfigKey))
if err != nil {
return err
}
err = item.Value(func(val []byte) error {
return jsoniter.Unmarshal(val, &stulbeConfig)
})
if err != nil {
return err
}
return nil
})
if err != nil {
if !errors.Is(err, badger.ErrKeyNotFound) {
fatalError(err, "Could not read from DB")
}
} else {
stulbeConfig.Enabled = moduleConfig.EnableStulbe
err = db.Update(func(txn *badger.Txn) error {
byt, err := jsoniter.ConfigFastest.Marshal(stulbeConfig)
if err != nil {
return err
}
return txn.Set([]byte(stulbe.ConfigKey), byt)
})
if err != nil {
logger.Error("Failed to update stulbe config during 1.8 migration", zap.Error(err))
}
}
var loyaltyConfig loyalty.Config
err = db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(loyalty.ConfigKey))
if err != nil {
return err
}
err = item.Value(func(val []byte) error {
return jsoniter.Unmarshal(val, &loyaltyConfig)
})
if err != nil {
return err
}
return nil
})
if err != nil {
if !errors.Is(err, badger.ErrKeyNotFound) {
fatalError(err, "Could not read from DB")
}
} else {
loyaltyConfig.Enabled = moduleConfig.EnableLoyalty
err = db.Update(func(txn *badger.Txn) error {
byt, err := jsoniter.ConfigFastest.Marshal(loyaltyConfig)
if err != nil {
return err
}
return txn.Set([]byte(loyalty.ConfigKey), byt)
})
if err != nil {
logger.Error("Failed to update loyalty config during 1.8 migration", zap.Error(err))
}
}
logger.Info("Migrated module config to 2.0+")
// Remove old config key
err = db.Update(func(txn *badger.Txn) error {
return txn.Delete([]byte(pre180ModuleConfigKey))
})
failOnError(err, "Failed to remove pre-1.8 module config")
}

10
go.mod
View file

@ -4,13 +4,17 @@ go 1.16
require ( require (
github.com/Masterminds/sprig/v3 v3.2.2 github.com/Masterminds/sprig/v3 v3.2.2
github.com/dgraph-io/badger v1.6.0
github.com/dgraph-io/badger/v3 v3.2103.2 github.com/dgraph-io/badger/v3 v3.2103.2
github.com/gempir/go-twitch-irc/v2 v2.5.0 github.com/gempir/go-twitch-irc/v2 v2.5.0
github.com/golang/protobuf v1.5.2 github.com/golang/protobuf v1.4.2
github.com/json-iterator/go v1.1.12 github.com/json-iterator/go v1.1.12
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/nicklaw5/helix/v2 v2.2.1 github.com/nicklaw5/helix/v2 v2.2.1
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4
github.com/strimertul/kilovolt/v7 v7.0.1 github.com/strimertul/kilovolt/v8 v8.0.2
github.com/strimertul/stulbe-client-go v0.7.0 github.com/strimertul/kv-badgerdb v1.2.1
github.com/strimertul/stulbe-client-go v0.7.2
go.uber.org/zap v1.20.0 go.uber.org/zap v1.20.0
golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3 // indirect
) )

51
go.sum
View file

@ -1,4 +1,6 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkByhMAwYaFAX9w2l7vxvBQ5NMoxDrkhqhtn4=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI=
github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU=
@ -23,6 +25,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger/v3 v3.2103.2 h1:dpyM5eCJAtQCBcMCZcT4UBZchuTJgCywerHHgmxfxM8= github.com/dgraph-io/badger/v3 v3.2103.2 h1:dpyM5eCJAtQCBcMCZcT4UBZchuTJgCywerHHgmxfxM8=
github.com/dgraph-io/badger/v3 v3.2103.2/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M= github.com/dgraph-io/badger/v3 v3.2103.2/go.mod h1:RHo4/GmYcKKh5Lxu63wLEMHJ70Pac2JqZRYGhlyAo2M=
github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI= github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI=
@ -45,17 +49,22 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw=
github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@ -84,8 +93,9 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY= github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
@ -123,12 +133,15 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/strimertul/kilovolt-client-go/v7 v7.0.1 h1:M8KqpujDwu4Cg9PQJ2rUG95S+Fq5zYTeshLBJL2Cw0I= github.com/strimertul/kilovolt-client-go/v8 v8.0.0 h1:d3BAm5qavK9GPUpOtljpsyrjmSfR2AInGe1ypZP9apc=
github.com/strimertul/kilovolt-client-go/v7 v7.0.1/go.mod h1:8ZnvQqoYUZRTJEjhfo+zsLxP1FZ3UMQD5dVhH7pnHiA= github.com/strimertul/kilovolt-client-go/v8 v8.0.0/go.mod h1:PNEbu0zrdYD9B9UYUoLSpV+saRJlC0cr9OHdPALUb+o=
github.com/strimertul/kilovolt/v7 v7.0.1 h1:VUz5ECpjLMxSgijDF3Wa9EeCYzVD1FsCsJD2xf7hl0U= github.com/strimertul/kilovolt/v8 v8.0.0/go.mod h1:vW++ELCWnYzENIIP33p+zDGQjz/GpQ5z7YRCBrBtCzA=
github.com/strimertul/kilovolt/v7 v7.0.1/go.mod h1:mWpyHDmfKOFdwW1oNH67EoRR2uqIXw5ieQktNg1cDno= github.com/strimertul/kilovolt/v8 v8.0.2 h1:hgobhb95b1cyD5Mpq3McR2AKxUhuoQc4tNTyQAwe0vg=
github.com/strimertul/stulbe-client-go v0.7.0 h1:L9wgYRv2HJ+Kv84qjXYnLfBZgJcS3x0sVxKUf056ECA= github.com/strimertul/kilovolt/v8 v8.0.2/go.mod h1:vW++ELCWnYzENIIP33p+zDGQjz/GpQ5z7YRCBrBtCzA=
github.com/strimertul/stulbe-client-go v0.7.0/go.mod h1:4M+NubPW+NV4KrpzCgKAHs6tyKLLbS0c9XoHiQ6o/LA= github.com/strimertul/kv-badgerdb v1.2.1 h1:9zRW05/rkZ47UWYeAmbfTCozYn/nXysFzd0D1iC2gxM=
github.com/strimertul/kv-badgerdb v1.2.1/go.mod h1:aFlPPOSxUNgzMBRj3Rfo2vSc9YzWyq09d+qwjgEMW9I=
github.com/strimertul/stulbe-client-go v0.7.2 h1:mco2JjkYuahgq1p8nlH7TRWNgFKyQMPb83AnBNO6B6E=
github.com/strimertul/stulbe-client-go v0.7.2/go.mod h1:moBqGVP+6cDkJM760YUhSuLgrenHsRRgnC6s+91TzSs=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@ -183,12 +196,14 @@ golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3 h1:3Ad41xy2WCESpufXwgs7NpDSu+vjxqLt2UFqUV+20bI=
golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@ -211,9 +226,13 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

110
main.go
View file

@ -11,19 +11,22 @@ import (
"runtime" "runtime"
"time" "time"
"github.com/dgraph-io/badger/v3"
"github.com/strimertul/strimertul/modules/database"
kv "github.com/strimertul/kilovolt/v8"
"go.uber.org/zap/zapcore" "go.uber.org/zap/zapcore"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/strimertul/strimertul/modules" "github.com/strimertul/strimertul/modules"
"github.com/strimertul/strimertul/modules/database"
"github.com/strimertul/strimertul/modules/http" "github.com/strimertul/strimertul/modules/http"
"github.com/strimertul/strimertul/modules/loyalty" "github.com/strimertul/strimertul/modules/loyalty"
"github.com/strimertul/strimertul/modules/stulbe" "github.com/strimertul/strimertul/modules/stulbe"
"github.com/strimertul/strimertul/modules/twitch" "github.com/strimertul/strimertul/modules/twitch"
"github.com/dgraph-io/badger/v3"
"github.com/pkg/browser" "github.com/pkg/browser"
_ "net/http/pprof" _ "net/http/pprof"
@ -50,18 +53,25 @@ var moduleList = map[modules.ModuleID]ModuleConstructor{
modules.ModuleTwitch: twitch.Register, modules.ModuleTwitch: twitch.Register,
} }
type dbOptions struct {
directory string
restore string
backupDir string
backupInterval int
}
func main() { func main() {
// Get cmd line parameters // Get cmd line parameters
noHeader := flag.Bool("no-header", false, "Do not print the app header") noHeader := flag.Bool("no-header", false, "Do not print the app header")
dbDir := flag.String("database-dir", "data", "Path to strimertül database dir") dbDir := flag.String("database-dir", "data", "Path to strimertül database dir")
debug := flag.Bool("debug", false, "Start in debug mode (more logging)") debug := flag.Bool("debug", false, "Start in debug mode (more logging)")
json := flag.Bool("json", false, "Print logging in JSON format") json := flag.Bool("json", false, "Print logging in JSON format")
cleanup := flag.Bool("run-gc", false, "Run garbage collection and exit immediately after")
exportDB := flag.Bool("export", false, "Export database as JSON") exportDB := flag.Bool("export", false, "Export database as JSON")
importDB := flag.String("import", "", "Import database from JSON file") importDB := flag.String("import", "", "Import database from JSON file")
restoreDB := flag.String("restore", "", "Restore database from backup file") restoreDB := flag.String("restore", "", "Restore database from backup file")
backupDir := flag.String("backup-dir", "backups", "Path to directory with database backups") backupDir := flag.String("backup-dir", "backups", "Path to directory with database backups")
backupInterval := flag.Int("backup-interval", 60, "Backup database every X minutes, 0 to disable") backupInterval := flag.Int("backup-interval", 60, "Backup database every X minutes, 0 to disable")
driver := flag.String("driver", "badger", "Database driver to use (available: badger,pebble)")
flag.Parse() flag.Parse()
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
@ -93,23 +103,27 @@ func main() {
// Create module manager // Create module manager
manager := modules.NewManager(logger) manager := modules.NewManager(logger)
// Loading routine // Make KV hub
db, err := database.Open(badger.DefaultOptions(*dbDir), manager) var hub *kv.Hub
failOnError(err, "Could not open DB") var err error
logger.Info("opening database", zap.String("driver", *driver))
switch *driver {
case "badger":
var db *badger.DB
db, hub, err = makeBadgerHub(dbOptions{directory: *dbDir, backupDir: *backupDir, backupInterval: *backupInterval, restore: *restoreDB})
defer func() { defer func() {
if err := db.Close(); err != nil { if err := badgerClose(db); err != nil {
logger.Error("Could not close DB", zap.Error(err)) logger.Fatal("Failed to close database", zap.Error(err))
} }
}() }()
default:
logger.Fatal("Unknown database driver", zap.String("driver", *driver))
}
if *cleanup { go hub.Run()
// Run DB garbage collection until it's done
var err error db, err := database.NewDBModule(hub, manager)
for err == nil { failOnError(err, "Failed to initialize database module")
err = db.Client().RunValueLogGC(0.5)
}
return
}
if *exportDB { if *exportDB {
// Export database to stdout // Export database to stdout
@ -128,7 +142,7 @@ func main() {
errors := 0 errors := 0
imported := 0 imported := 0
for key, value := range entries { for key, value := range entries {
err = db.PutKey(key, []byte(value)) err = db.PutKey(key, value)
if err != nil { if err != nil {
logger.Error("Could not import entry", zap.String("key", key), zap.Error(err)) logger.Error("Could not import entry", zap.String("key", key), zap.Error(err))
errors += 1 errors += 1
@ -136,28 +150,16 @@ func main() {
imported += 1 imported += 1
} }
} }
_ = db.Client().Sync()
logger.Info("Imported database from file", zap.Int("imported", imported), zap.Int("errors", errors)) logger.Info("Imported database from file", zap.Int("imported", imported), zap.Int("errors", errors))
} }
if *restoreDB != "" {
file, err := os.Open(*restoreDB)
failOnError(err, "Could not open backup")
err = db.RestoreOverwrite(file)
failOnError(err, "Could not restore database")
_ = db.Client().Sync()
logger.Info("Restored database from backup")
}
// Set meta keys // Set meta keys
_ = db.PutKey("stul-meta/version", []byte(appVersion)) _ = db.PutKey("stul-meta/version", appVersion)
runMigrations(db)
for module, constructor := range moduleList { for module, constructor := range moduleList {
err := constructor(manager) err := constructor(manager)
if err != nil { if err != nil {
logger.Error("Could not register module", zap.String("module", string(module))) logger.Error("Could not register module", zap.String("module", string(module)), zap.Error(err))
} else { } else {
//goland:noinspection GoDeferInLoop //goland:noinspection GoDeferInLoop
defer func() { defer func() {
@ -189,52 +191,6 @@ func main() {
} }
}() }()
// Run garbage collection every once in a while
go func() {
ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()
for range ticker.C {
// Run DB garbage collection until it's done
var err error
for err == nil {
err = db.Client().RunValueLogGC(0.5)
}
}
}()
// Backup database periodically
go func() {
if *backupDir == "" {
logger.Warn("Backup directory not set, database backups are disabled (this is dangerous, power loss will result in your database being potentially wiped!)")
return
}
err := os.MkdirAll(*backupDir, 0755)
if err != nil {
logger.Error("Could not create backup directory, moving to a temporary folder", zap.Error(err))
*backupDir = os.TempDir()
logger.Info("Using temporary directory", zap.String("backup-dir", *backupDir))
return
}
ticker := time.NewTicker(time.Duration(*backupInterval) * time.Minute)
defer ticker.Stop()
for range ticker.C {
// Run backup procedure
file, err := os.Create(fmt.Sprintf("%s/%s.db", *backupDir, time.Now().Format("20060102-150405")))
if err != nil {
logger.Error("Could not create backup file", zap.Error(err))
continue
}
_, err = db.Client().Backup(file, 0)
if err != nil {
logger.Error("Could not backup database", zap.Error(err))
}
_ = file.Close()
logger.Info("Database backed up", zap.String("backup-file", file.Name()))
}
}()
// Start HTTP server // Start HTTP server
failOnError(httpServer.Listen(), "HTTP server stopped") failOnError(httpServer.Listen(), "HTTP server stopped")
} }

View file

@ -1,92 +0,0 @@
package main
import (
"errors"
"github.com/strimertul/strimertul/modules/database"
"github.com/strimertul/strimertul/modules/loyalty"
"github.com/strimertul/strimertul/modules/stulbe"
"github.com/strimertul/strimertul/modules/twitch"
"github.com/dgraph-io/badger/v3"
"go.uber.org/zap"
)
func runMigrations(db *database.DB) {
pre180MigrateModuleConfig(db)
}
// pre180MigrateModuleConfig migrates <1.8 module configs to 1.8+
func pre180MigrateModuleConfig(db *database.DB) {
const pre180ModuleConfigKey = "stul-meta/modules"
type pre180ModuleConfig struct {
CompletedOnboarding bool `json:"configured"`
EnableTwitch bool `json:"twitch"`
EnableStulbe bool `json:"stulbe"`
EnableLoyalty bool `json:"loyalty"`
}
// Check if onboarding was completed
var moduleConfig pre180ModuleConfig
err := db.GetJSON(pre180ModuleConfigKey, &moduleConfig)
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
// Either first boot or migration already done
return
} else {
fatalError(err, "Could not read from DB")
}
}
// ?? Should never happen, maybe we just have an empty key?
if !moduleConfig.CompletedOnboarding {
failOnError(db.RemoveKey(pre180ModuleConfigKey), "Failed to remove pre-1.8 module config")
return
}
// Migrate to new config by updating every related module
var twitchConfig twitch.Config
err = db.GetJSON(twitch.ConfigKey, &twitchConfig)
if err != nil {
if !errors.Is(err, badger.ErrKeyNotFound) {
fatalError(err, "Could not read from DB")
}
} else {
twitchConfig.Enabled = moduleConfig.EnableTwitch
if err := db.PutJSON(twitch.ConfigKey, twitchConfig); err != nil {
logger.Error("Failed to update twitch config during 1.8 migration", zap.Error(err))
}
}
var stulbeConfig stulbe.Config
err = db.GetJSON(stulbe.ConfigKey, &stulbeConfig)
if err != nil {
if !errors.Is(err, badger.ErrKeyNotFound) {
fatalError(err, "Could not read from DB")
}
} else {
stulbeConfig.Enabled = moduleConfig.EnableStulbe
if err := db.PutJSON(stulbe.ConfigKey, stulbeConfig); err != nil {
logger.Error("Failed to update stulbe config during 1.8 migration", zap.Error(err))
}
}
var loyaltyConfig loyalty.Config
err = db.GetJSON(loyalty.ConfigKey, &loyaltyConfig)
if err != nil {
if !errors.Is(err, badger.ErrKeyNotFound) {
fatalError(err, "Could not read from DB")
}
} else {
loyaltyConfig.Enabled = moduleConfig.EnableLoyalty
if err := db.PutJSON(loyalty.ConfigKey, loyaltyConfig); err != nil {
logger.Error("Failed to update loyalty config during 1.8 migration", zap.Error(err))
}
}
logger.Info("Migrated module config to 1.8+")
// Remove old config key
failOnError(db.RemoveKey(pre180ModuleConfigKey), "Failed to remove pre-1.8 module config")
}

View file

@ -0,0 +1 @@
package badger

View file

@ -0,0 +1,163 @@
package database
import (
"fmt"
"github.com/strimertul/strimertul/modules"
jsoniter "github.com/json-iterator/go"
kv "github.com/strimertul/kilovolt/v8"
"go.uber.org/zap"
)
var json = jsoniter.ConfigFastest
var (
// ErrUnknown is returned when a response is received that doesn't match any expected outcome.
ErrUnknown = fmt.Errorf("unknown error")
)
type DBModule struct {
client *kv.LocalClient
hub *kv.Hub
logger *zap.Logger
}
type KvPair struct {
Key string
Data string
}
func NewDBModule(hub *kv.Hub, manager *modules.Manager) (*DBModule, error) {
logger := manager.Logger(modules.ModuleDB)
localClient := kv.NewLocalClient(kv.ClientOptions{}, logger)
go localClient.Run()
hub.AddClient(localClient)
localClient.Wait()
err := hub.SetAuthenticated(localClient.UID(), true)
if err != nil {
return nil, err
}
module := &DBModule{
client: localClient,
hub: hub,
logger: logger,
}
manager.Modules[modules.ModuleDB] = module
return module, nil
}
func (mod *DBModule) Hub() *kv.Hub {
return mod.hub
}
func (mod *DBModule) Status() modules.ModuleStatus {
return modules.ModuleStatus{
Enabled: mod.hub != nil,
Working: mod.client != nil,
StatusString: "ok",
}
}
func (mod *DBModule) Close() error {
mod.hub.RemoveClient(mod.client)
return nil
}
func (mod *DBModule) GetKey(key string) (string, error) {
res, err := mod.makeRequest(kv.CmdReadKey, map[string]interface{}{"key": key})
if err != nil {
return "", err
}
return res.Data.(string), nil
}
func (mod *DBModule) PutKey(key string, data string) error {
_, err := mod.makeRequest(kv.CmdWriteKey, map[string]interface{}{"key": key, "data": data})
return err
}
func (mod *DBModule) Subscribe(fn kv.SubscriptionCallback, prefixes ...string) error {
for _, prefix := range prefixes {
_, err := mod.makeRequest(kv.CmdSubscribePrefix, map[string]interface{}{"prefix": prefix})
if err != nil {
return err
}
mod.client.SetPrefixSubCallback(prefix, fn)
}
return nil
}
func (mod *DBModule) GetJSON(key string, dst interface{}) error {
res, err := mod.GetKey(key)
if err != nil {
return err
}
return json.Unmarshal([]byte(res), dst)
}
func (mod *DBModule) GetAll(prefix string) (map[string]string, error) {
res, err := mod.makeRequest(kv.CmdReadPrefix, map[string]interface{}{"prefix": prefix})
if err != nil {
return nil, err
}
out := make(map[string]string)
for key, value := range res.Data.(map[string]interface{}) {
out[key] = value.(string)
}
return out, nil
}
func (mod *DBModule) PutJSON(key string, data interface{}) error {
byt, err := json.Marshal(data)
if err != nil {
return err
}
return mod.PutKey(key, string(byt))
}
func (mod *DBModule) PutJSONBulk(kvs map[string]interface{}) error {
encoded := make(map[string]interface{})
for k, v := range kvs {
byt, err := json.Marshal(v)
if err != nil {
return err
}
encoded[k] = string(byt)
}
_, chn := mod.client.MakeRequest(kv.CmdWriteBulk, encoded)
_, err := getResponse(<-chn)
return err
}
func (mod *DBModule) RemoveKey(key string) error {
// TODO
return mod.PutKey(key, "")
}
func (mod *DBModule) makeRequest(cmd string, data map[string]interface{}) (kv.Response, error) {
req, chn := mod.client.MakeRequest(cmd, data)
mod.hub.SendMessage(req)
return getResponse(<-chn)
}
func getResponse(response interface{}) (kv.Response, error) {
switch c := response.(type) {
case kv.Response:
return c, nil
case kv.Error:
return kv.Response{}, &KvError{c}
}
return kv.Response{}, ErrUnknown
}
type KvError struct {
ErrorData kv.Error
}
func (kv *KvError) Error() string {
return fmt.Sprintf("%s: %s", kv.ErrorData.Error, kv.ErrorData.Details)
}

View file

@ -1,229 +0,0 @@
package database
import (
"bufio"
"context"
"encoding/binary"
"io"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/strimertul/strimertul/modules"
"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/pb"
jsoniter "github.com/json-iterator/go"
)
var json = jsoniter.ConfigFastest
var (
ErrKeyNotFound = badger.ErrKeyNotFound
)
type DB struct {
client *badger.DB
logger *zap.Logger
}
type ModifiedKV struct {
Key string
Data []byte
Meta []byte
Version uint64
Expires uint64
}
func Open(options badger.Options, manager *modules.Manager) (*DB, error) {
// Create logger
logger := manager.Logger(modules.ModuleDB)
// Open database
client, err := badger.Open(options)
if err != nil {
return nil, err
}
// Create DB instance
db := &DB{
client: client,
logger: logger,
}
// Register DB module
manager.Modules[modules.ModuleDB] = db
return db, nil
}
func (db *DB) Client() *badger.DB {
return db.client
}
func (db *DB) Status() modules.ModuleStatus {
lsm, vlog := db.client.Size()
return modules.ModuleStatus{
Enabled: true,
Working: !db.client.IsClosed(),
Data: struct {
LSMSize int64
VlogSize int64
}{
lsm,
vlog,
},
StatusString: db.client.LevelsToString(),
}
}
func (db *DB) Close() error {
return db.client.Close()
}
func (db *DB) GetKey(key string) ([]byte, error) {
var byt []byte
err := db.client.View(func(t *badger.Txn) error {
item, err := t.Get([]byte(key))
if err != nil {
return err
}
byt, err = item.ValueCopy(nil)
return err
})
return byt, err
}
func (db *DB) PutKey(key string, data []byte) error {
return db.client.Update(func(t *badger.Txn) error {
return t.Set([]byte(key), data)
})
}
func (db *DB) Subscribe(ctx context.Context, fn func(changed []ModifiedKV) error, prefixes ...string) error {
prefixList := make([][]byte, len(prefixes))
for index, prefix := range prefixes {
prefixList[index] = []byte(prefix)
}
var matches []pb.Match
for _, prefix := range prefixList {
matches = append(matches, pb.Match{
Prefix: prefix,
})
}
return db.client.Subscribe(ctx, func(kv *badger.KVList) error {
modified := make([]ModifiedKV, len(kv.Kv))
for index, newKV := range kv.Kv {
modified[index] = ModifiedKV{
Key: string(newKV.Key),
Data: newKV.Value,
Meta: newKV.UserMeta,
Version: newKV.Version,
Expires: newKV.ExpiresAt,
}
}
return fn(modified)
}, matches)
}
func (db *DB) GetJSON(key string, dst interface{}) error {
return db.client.View(func(t *badger.Txn) error {
item, err := t.Get([]byte(key))
if err != nil {
return err
}
byt, err := item.ValueCopy(nil)
if err != nil {
return err
}
return json.Unmarshal(byt, dst)
})
}
func (db *DB) GetAll(prefix string) (map[string]string, error) {
out := make(map[string]string)
err := db.client.View(func(t *badger.Txn) error {
opt := badger.DefaultIteratorOptions
opt.Prefix = []byte(prefix)
it := t.NewIterator(opt)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
byt, err := item.ValueCopy(nil)
if err != nil {
return err
}
out[string(item.Key()[len(prefix):])] = string(byt)
}
return nil
})
return out, err
}
func (db *DB) PutJSON(key string, data interface{}) error {
return db.client.Update(func(t *badger.Txn) error {
byt, err := json.Marshal(data)
if err != nil {
return err
}
return t.Set([]byte(key), byt)
})
}
func (db *DB) PutJSONBulk(kvs map[string]interface{}) error {
return db.client.Update(func(t *badger.Txn) error {
for k, v := range kvs {
byt, err := json.Marshal(v)
if err != nil {
return err
}
err = t.Set([]byte(k), byt)
if err != nil {
return err
}
}
return nil
})
}
func (db *DB) RemoveKey(key string) error {
return db.client.Update(func(t *badger.Txn) error {
return t.Delete([]byte(key))
})
}
func (db *DB) RestoreOverwrite(r io.Reader) error {
br := bufio.NewReaderSize(r, 16<<10)
unmarshalBuf := make([]byte, 1<<10)
for {
var sz uint64
err := binary.Read(br, binary.LittleEndian, &sz)
if err == io.EOF {
break
} else if err != nil {
return err
}
if cap(unmarshalBuf) < int(sz) {
unmarshalBuf = make([]byte, sz)
}
if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil {
return err
}
list := &pb.KVList{}
if err := proto.Unmarshal(unmarshalBuf[:sz], list); err != nil {
return err
}
for _, kv := range list.Kv {
if err := db.PutKey(string(kv.Key), kv.Value); err != nil {
return err
}
}
}
return nil
}

View file

@ -7,19 +7,19 @@ import (
"io/fs" "io/fs"
"net/http" "net/http"
"github.com/strimertul/kilovolt/v7/drivers/badgerdb" jsoniter "github.com/json-iterator/go"
"github.com/strimertul/strimertul/modules/database"
"go.uber.org/zap" "go.uber.org/zap"
kv "github.com/strimertul/kilovolt/v8"
"github.com/strimertul/strimertul/modules" "github.com/strimertul/strimertul/modules"
"github.com/strimertul/strimertul/modules/database"
kv "github.com/strimertul/kilovolt/v7"
) )
type Server struct { type Server struct {
Config ServerConfig Config ServerConfig
db *database.DB db *database.DBModule
logger *zap.Logger logger *zap.Logger
server *http.Server server *http.Server
frontend fs.FS frontend fs.FS
@ -28,7 +28,7 @@ type Server struct {
} }
func NewServer(manager *modules.Manager) (*Server, error) { func NewServer(manager *modules.Manager) (*Server, error) {
db, ok := manager.Modules["db"].(*database.DB) db, ok := manager.Modules["db"].(*database.DBModule)
if !ok { if !ok {
return nil, errors.New("db module not found") return nil, errors.New("db module not found")
} }
@ -55,13 +55,13 @@ func NewServer(manager *modules.Manager) (*Server, error) {
} }
} }
server.hub, err = kv.NewHub(badgerdb.NewBadgerBackend(db.Client()), kv.HubOptions{ // Set hub
server.hub = db.Hub()
// Set password
server.hub.SetOptions(kv.HubOptions{
Password: server.Config.KVPassword, Password: server.Config.KVPassword,
}, logger.With(zap.String("module", "kv"))) })
if err != nil {
return nil, err
}
go server.hub.Run()
// Register module // Register module
manager.Modules[modules.ModuleHTTP] = server manager.Modules[modules.ModuleHTTP] = server
@ -113,14 +113,14 @@ func (s *Server) Listen() error {
restart := newSafeBool(false) restart := newSafeBool(false)
exit := make(chan error) exit := make(chan error)
go func() { go func() {
err := s.db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { err := s.db.Subscribe(func(key, value string) {
for _, pair := range changed { if key == ServerConfigKey {
if pair.Key == ServerConfigKey {
oldBind := s.Config.Bind oldBind := s.Config.Bind
oldPassword := s.Config.KVPassword oldPassword := s.Config.KVPassword
err := s.db.GetJSON(ServerConfigKey, &s.Config) err := jsoniter.ConfigFastest.Unmarshal([]byte(value), &s.Config)
if err != nil { if err != nil {
return err s.logger.Error("Failed to unmarshal config", zap.Error(err))
return
} }
s.mux = s.makeMux() s.mux = s.makeMux()
// Restart hub if password changed // Restart hub if password changed
@ -135,12 +135,10 @@ func (s *Server) Listen() error {
err = s.server.Shutdown(context.Background()) err = s.server.Shutdown(context.Background())
if err != nil { if err != nil {
s.logger.Error("Failed to shutdown server", zap.Error(err)) s.logger.Error("Failed to shutdown server", zap.Error(err))
return err return
} }
} }
} }
}
return nil
}, ServerConfigKey) }, ServerConfigKey)
if err != nil { if err != nil {
exit <- fmt.Errorf("error while handling subscription to HTTP config changes: %w", err) exit <- fmt.Errorf("error while handling subscription to HTTP config changes: %w", err)

View file

@ -1,20 +1,18 @@
package loyalty package loyalty
import ( import (
"context"
"errors" "errors"
"strings" "strings"
"sync" "sync"
"time" "time"
"go.uber.org/zap"
"github.com/strimertul/strimertul/modules" "github.com/strimertul/strimertul/modules"
"github.com/strimertul/strimertul/modules/database" "github.com/strimertul/strimertul/modules/database"
"github.com/strimertul/strimertul/modules/stulbe" "github.com/strimertul/strimertul/modules/stulbe"
"github.com/dgraph-io/badger/v3"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
kv "github.com/strimertul/kilovolt/v8"
"go.uber.org/zap"
) )
var ( var (
@ -30,26 +28,19 @@ type Manager struct {
goals GoalStorage goals GoalStorage
queue RedeemQueueStorage queue RedeemQueueStorage
mu sync.Mutex mu sync.Mutex
db *database.DB db *database.DBModule
logger *zap.Logger logger *zap.Logger
cooldowns map[string]time.Time cooldowns map[string]time.Time
} }
func Register(manager *modules.Manager) error { func Register(manager *modules.Manager) error {
db, ok := manager.Modules["db"].(*database.DB) db, ok := manager.Modules["db"].(*database.DBModule)
if !ok { if !ok {
return errors.New("db module not found") return errors.New("db module not found")
} }
logger := manager.Logger(modules.ModuleLoyalty) logger := manager.Logger(modules.ModuleLoyalty)
// Check if we need to migrate
// TODO Remove this in the future
err := migratePoints(db, logger)
if err != nil {
return err
}
loyalty := &Manager{ loyalty := &Manager{
logger: logger, logger: logger,
db: db, db: db,
@ -58,7 +49,7 @@ func Register(manager *modules.Manager) error {
} }
// Ger data from DB // Ger data from DB
if err := db.GetJSON(ConfigKey, &loyalty.config); err != nil { if err := db.GetJSON(ConfigKey, &loyalty.config); err != nil {
if errors.Is(err, badger.ErrKeyNotFound) { if errors.Is(err, kv.ErrorKeyNotFound) {
logger.Warn("missing configuration for loyalty (but it's enabled). Please make sure to set it up properly!") logger.Warn("missing configuration for loyalty (but it's enabled). Please make sure to set it up properly!")
} else { } else {
return err return err
@ -67,17 +58,17 @@ func Register(manager *modules.Manager) error {
// Retrieve configs // Retrieve configs
if err := db.GetJSON(RewardsKey, &loyalty.rewards); err != nil { if err := db.GetJSON(RewardsKey, &loyalty.rewards); err != nil {
if !errors.Is(err, badger.ErrKeyNotFound) { if !errors.Is(err, kv.ErrorKeyNotFound) {
return err return err
} }
} }
if err := db.GetJSON(GoalsKey, &loyalty.goals); err != nil { if err := db.GetJSON(GoalsKey, &loyalty.goals); err != nil {
if !errors.Is(err, badger.ErrKeyNotFound) { if !errors.Is(err, kv.ErrorKeyNotFound) {
return err return err
} }
} }
if err := db.GetJSON(QueueKey, &loyalty.queue); err != nil { if err := db.GetJSON(QueueKey, &loyalty.queue); err != nil {
if !errors.Is(err, badger.ErrKeyNotFound) { if !errors.Is(err, kv.ErrorKeyNotFound) {
return err return err
} }
} }
@ -85,7 +76,7 @@ func Register(manager *modules.Manager) error {
// Retrieve user points // Retrieve user points
points, err := db.GetAll(PointsPrefix) points, err := db.GetAll(PointsPrefix)
if err != nil { if err != nil {
if !errors.Is(err, badger.ErrKeyNotFound) { if !errors.Is(err, kv.ErrorKeyNotFound) {
return err return err
} }
points = make(map[string]string) points = make(map[string]string)
@ -101,8 +92,8 @@ func Register(manager *modules.Manager) error {
} }
// Subscribe for changes // Subscribe for changes
go db.Subscribe(context.Background(), loyalty.update, "loyalty/") go db.Subscribe(loyalty.update, "loyalty/")
go db.Subscribe(context.Background(), loyalty.handleRemote, "stulbe/loyalty/") go db.Subscribe(loyalty.handleRemote, "stulbe/loyalty/")
// Replicate keys on stulbe if available // Replicate keys on stulbe if available
if stulbeManager, ok := manager.Modules["stulbe"].(*stulbe.Manager); ok { if stulbeManager, ok := manager.Modules["stulbe"].(*stulbe.Manager); ok {
@ -145,45 +136,44 @@ func (m *Manager) Close() error {
return nil return nil
} }
func (m *Manager) update(kvs []database.ModifiedKV) error { func (m *Manager) update(key, value string) {
for _, kv := range kvs {
var err error var err error
// Check for config changes/RPC // Check for config changes/RPC
switch kv.Key { switch key {
case ConfigKey: case ConfigKey:
err = func() error { err = func() error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
return jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.config) return jsoniter.ConfigFastest.UnmarshalFromString(value, &m.config)
}() }()
case GoalsKey: case GoalsKey:
err = func() error { err = func() error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
return jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.goals) return jsoniter.ConfigFastest.UnmarshalFromString(value, &m.goals)
}() }()
case RewardsKey: case RewardsKey:
err = func() error { err = func() error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
return jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.rewards) return jsoniter.ConfigFastest.UnmarshalFromString(value, &m.rewards)
}() }()
case QueueKey: case QueueKey:
err = func() error { err = func() error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
return jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.queue) return jsoniter.ConfigFastest.UnmarshalFromString(value, &m.queue)
}() }()
case CreateRedeemRPC: case CreateRedeemRPC:
var redeem Redeem var redeem Redeem
err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &redeem) err = jsoniter.ConfigFastest.UnmarshalFromString(value, &redeem)
if err == nil { if err == nil {
err = m.AddRedeem(redeem) err = m.AddRedeem(redeem)
} }
case RemoveRedeemRPC: case RemoveRedeemRPC:
var redeem Redeem var redeem Redeem
err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &redeem) err = jsoniter.ConfigFastest.UnmarshalFromString(value, &redeem)
if err == nil { if err == nil {
err = m.RemoveRedeem(redeem) err = m.RemoveRedeem(redeem)
} }
@ -191,10 +181,10 @@ func (m *Manager) update(kvs []database.ModifiedKV) error {
// Check for prefix changes // Check for prefix changes
switch { switch {
// User point changed // User point changed
case strings.HasPrefix(kv.Key, PointsPrefix): case strings.HasPrefix(key, PointsPrefix):
var entry PointsEntry var entry PointsEntry
err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &entry) err = jsoniter.ConfigFastest.UnmarshalFromString(value, &entry)
user := kv.Key[len(PointsPrefix):] user := key[len(PointsPrefix):]
func() { func() {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@ -203,22 +193,19 @@ func (m *Manager) update(kvs []database.ModifiedKV) error {
} }
} }
if err != nil { if err != nil {
m.logger.Error("subscribe error: invalid JSON received on key", zap.Error(err), zap.String("key", kv.Key)) m.logger.Error("subscribe error: invalid JSON received on key", zap.Error(err), zap.String("key", key))
} else { } else {
m.logger.Debug("updated key", zap.String("key", kv.Key)) m.logger.Debug("updated key", zap.String("key", key))
} }
}
return nil
} }
func (m *Manager) handleRemote(kvs []database.ModifiedKV) error { func (m *Manager) handleRemote(key, value string) {
for _, kv := range kvs { m.logger.Debug("loyalty request from stulbe", zap.String("key", key))
m.logger.Debug("loyalty request from stulbe", zap.String("key", kv.Key)) switch key {
switch kv.Key {
case KVExLoyaltyRedeem: case KVExLoyaltyRedeem:
// Parse request // Parse request
var redeemRequest ExLoyaltyRedeem var redeemRequest ExLoyaltyRedeem
err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &redeemRequest) err := jsoniter.ConfigFastest.UnmarshalFromString(value, &redeemRequest)
if err != nil { if err != nil {
m.logger.Warn("error decoding redeem request", zap.Error(err)) m.logger.Warn("error decoding redeem request", zap.Error(err))
break break
@ -242,7 +229,7 @@ func (m *Manager) handleRemote(kvs []database.ModifiedKV) error {
case KVExLoyaltyContribute: case KVExLoyaltyContribute:
// Parse request // Parse request
var contributeRequest ExLoyaltyContribute var contributeRequest ExLoyaltyContribute
err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &contributeRequest) err := jsoniter.ConfigFastest.UnmarshalFromString(value, &contributeRequest)
if err != nil { if err != nil {
m.logger.Warn("error decoding contribution request", zap.Error(err)) m.logger.Warn("error decoding contribution request", zap.Error(err))
break break
@ -258,8 +245,6 @@ func (m *Manager) handleRemote(kvs []database.ModifiedKV) error {
m.logger.Warn("error performing contribution request", zap.Error(err)) m.logger.Warn("error performing contribution request", zap.Error(err))
} }
} }
}
return nil
} }
func (m *Manager) GetPoints(user string) int64 { func (m *Manager) GetPoints(user string) int64 {

View file

@ -3,20 +3,22 @@ package loyalty
import ( import (
"errors" "errors"
"go.uber.org/zap" kv "github.com/strimertul/kilovolt/v8"
"github.com/strimertul/strimertul/modules/database" "github.com/strimertul/strimertul/modules/database"
"go.uber.org/zap"
) )
const OldPointsKey = "loyalty/users" const OldPointsKey = "loyalty/users"
type OldPointStorage map[string]int64 type OldPointStorage map[string]int64
func migratePoints(db *database.DB, log *zap.Logger) error { func migratePoints(db *database.DBModule, log *zap.Logger) error {
// Retrieve old storage // Retrieve old storage
var oldStorage OldPointStorage var oldStorage OldPointStorage
err := db.GetJSON(OldPointsKey, &oldStorage) err := db.GetJSON(OldPointsKey, &oldStorage)
if errors.Is(err, database.ErrKeyNotFound) { if errors.Is(err, kv.ErrorKeyNotFound) {
// No migration needed, points are already kaput // No migration needed, points are already kaput
return nil return nil
} }

View file

@ -1,28 +1,28 @@
package stulbe package stulbe
import ( import (
"context" "encoding/json"
"errors" "errors"
"github.com/strimertul/strimertul/modules/database"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/strimertul/strimertul/modules" "github.com/strimertul/strimertul/modules"
"github.com/strimertul/strimertul/modules/database"
"github.com/strimertul/stulbe-client-go" "github.com/strimertul/stulbe-client-go"
) )
type Manager struct { type Manager struct {
Config Config Config Config
Client *stulbe.Client Client *stulbe.Client
db *database.DB db *database.DBModule
logger *zap.Logger logger *zap.Logger
restart chan bool restart chan bool
} }
func Register(manager *modules.Manager) error { func Register(manager *modules.Manager) error {
db, ok := manager.Modules["db"].(*database.DB) db, ok := manager.Modules["db"].(*database.DBModule)
if !ok { if !ok {
return errors.New("db module not found") return errors.New("db module not found")
} }
@ -67,14 +67,13 @@ func Register(manager *modules.Manager) error {
}() }()
// Listen for config changes // Listen for config changes
go db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { go db.Subscribe(func(key, value string) {
for _, kv := range changed { if key == ConfigKey {
if kv.Key == ConfigKey {
var config Config var config Config
err := db.GetJSON(ConfigKey, &config) err := json.Unmarshal([]byte(value), &config)
if err != nil { if err != nil {
logger.Warn("Failed to get config", zap.Error(err)) logger.Warn("Failed to get new config", zap.Error(err))
continue return
} }
client, err := stulbe.NewClient(stulbe.ClientOptions{ client, err := stulbe.NewClient(stulbe.ClientOptions{
@ -91,8 +90,6 @@ func Register(manager *modules.Manager) error {
logger.Info("updated/restarted stulbe client") logger.Info("updated/restarted stulbe client")
} }
} }
}
return nil
}, ConfigKey) }, ConfigKey)
// Register module // Register module
@ -109,7 +106,7 @@ func (m *Manager) ReceiveEvents() error {
for { for {
select { select {
case kv := <-chn: case kv := <-chn:
err := m.db.PutKey(kv.Key, []byte(kv.Value)) err := m.db.PutKey(kv.Key, kv.Value)
if err != nil { if err != nil {
return err return err
} }
@ -160,16 +157,13 @@ func (m *Manager) ReplicateKey(prefix string) error {
m.logger.Debug("synced to remote", zap.String("prefix", prefix)) m.logger.Debug("synced to remote", zap.String("prefix", prefix))
// Subscribe to local datastore and update remote on change // Subscribe to local datastore and update remote on change
return m.db.Subscribe(context.Background(), func(pairs []database.ModifiedKV) error { return m.db.Subscribe(func(key, value string) {
for _, changed := range pairs { err := m.Client.KV.SetKey(key, value)
err := m.Client.KV.SetKey(changed.Key, string(changed.Data))
if err != nil { if err != nil {
return err m.logger.Error("failed to replicate key", zap.String("key", key), zap.Error(err))
} else {
m.logger.Debug("replicated to remote", zap.String("key", key))
} }
m.logger.Debug("replicated to remote", zap.String("key", changed.Key))
}
return nil
}, prefix) }, prefix)
} }

View file

@ -1,13 +1,11 @@
package twitch package twitch
import ( import (
"context"
"strings" "strings"
"sync" "sync"
"text/template" "text/template"
"time" "time"
"github.com/strimertul/strimertul/modules/database"
"github.com/strimertul/strimertul/modules/loyalty" "github.com/strimertul/strimertul/modules/loyalty"
"go.uber.org/zap" "go.uber.org/zap"
@ -157,41 +155,37 @@ func NewBot(api *Client, config BotConfig) *Bot {
if err != nil { if err != nil {
bot.logger.Error("failed to parse custom commands", zap.Error(err)) bot.logger.Error("failed to parse custom commands", zap.Error(err))
} }
go api.db.Subscribe(context.Background(), bot.updateCommands, CustomCommandsKey) go api.db.Subscribe(bot.updateCommands, CustomCommandsKey)
go api.db.Subscribe(context.Background(), bot.handleWriteMessageRPC, WriteMessageRPC) go api.db.Subscribe(bot.handleWriteMessageRPC, WriteMessageRPC)
return bot return bot
} }
func (b *Bot) updateCommands(kvs []database.ModifiedKV) error { func (b *Bot) updateCommands(key, value string) {
for _, kv := range kvs { switch key {
switch kv.Key {
case CustomCommandsKey: case CustomCommandsKey:
err := func() error { err := func() error {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
return jsoniter.ConfigFastest.Unmarshal(kv.Data, &b.customCommands) return jsoniter.ConfigFastest.UnmarshalFromString(value, &b.customCommands)
}() }()
if err != nil { if err != nil {
return err b.logger.Error("failed to decode new custom commands", zap.Error(err))
return
} }
// Recreate templates // Recreate templates
if err := b.updateTemplates(); err != nil { if err := b.updateTemplates(); err != nil {
return err b.logger.Error("failed to update custom commands templates", zap.Error(err))
return
} }
} }
}
return nil
} }
func (b *Bot) handleWriteMessageRPC(kvs []database.ModifiedKV) error { func (b *Bot) handleWriteMessageRPC(key, value string) {
for _, kv := range kvs { switch key {
switch kv.Key {
case WriteMessageRPC: case WriteMessageRPC:
b.Client.Say(b.config.Channel, string(kv.Data)) b.Client.Say(b.config.Channel, value)
} }
}
return nil
} }
func (b *Bot) updateTemplates() error { func (b *Bot) updateTemplates() error {

View file

@ -1,23 +1,22 @@
package twitch package twitch
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/nicklaw5/helix/v2" "github.com/nicklaw5/helix/v2"
"github.com/strimertul/strimertul/modules/database"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/strimertul/strimertul/modules" "github.com/strimertul/strimertul/modules"
"github.com/strimertul/strimertul/modules/database"
"github.com/strimertul/strimertul/modules/loyalty" "github.com/strimertul/strimertul/modules/loyalty"
) )
type Client struct { type Client struct {
Config Config Config Config
Bot *Bot Bot *Bot
db *database.DB db *database.DBModule
API *helix.Client API *helix.Client
logger *zap.Logger logger *zap.Logger
@ -25,7 +24,7 @@ type Client struct {
} }
func Register(manager *modules.Manager) error { func Register(manager *modules.Manager) error {
db, ok := manager.Modules["db"].(*database.DB) db, ok := manager.Modules["db"].(*database.DBModule)
if !ok { if !ok {
return errors.New("db module not found") return errors.New("db module not found")
} }
@ -79,27 +78,26 @@ func Register(manager *modules.Manager) error {
} }
// Listen for config changes // Listen for config changes
go db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { go db.Subscribe(func(key, value string) {
for _, kv := range changed { switch key {
switch kv.Key {
case ConfigKey: case ConfigKey:
err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &config) err := jsoniter.ConfigFastest.UnmarshalFromString(value, &config)
if err != nil { if err != nil {
logger.Error("failed to unmarshal config", zap.Error(err)) logger.Error("failed to unmarshal config", zap.Error(err))
continue return
} }
api, err := getHelixAPI(config.APIClientID, config.APIClientSecret) api, err := getHelixAPI(config.APIClientID, config.APIClientSecret)
if err != nil { if err != nil {
logger.Warn("failed to create new twitch client, keeping old credentials", zap.Error(err)) logger.Warn("failed to create new twitch client, keeping old credentials", zap.Error(err))
continue return
} }
client.API = api client.API = api
logger.Info("reloaded/updated Twitch API") logger.Info("reloaded/updated Twitch API")
case BotConfigKey: case BotConfigKey:
err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &twitchBotConfig) err := jsoniter.ConfigFastest.UnmarshalFromString(value, &twitchBotConfig)
if err != nil { if err != nil {
logger.Error("failed to unmarshal config", zap.Error(err)) logger.Error("failed to unmarshal config", zap.Error(err))
continue return
} }
err = client.Bot.Client.Disconnect() err = client.Bot.Client.Disconnect()
if err != nil { if err != nil {
@ -109,8 +107,6 @@ func Register(manager *modules.Manager) error {
client.restart <- true client.restart <- true
logger.Info("reloaded/restarted Twitch bot") logger.Info("reloaded/restarted Twitch bot")
} }
}
return nil
}, ConfigKey, BotConfigKey) }, ConfigKey, BotConfigKey)
manager.Modules[modules.ModuleTwitch] = client manager.Modules[modules.ModuleTwitch] = client

View file

@ -89,7 +89,7 @@ func (b *Bot) setupFunctions() {
counter, _ = strconv.Atoi(string(byt)) counter, _ = strconv.Atoi(string(byt))
} }
counter += 1 counter += 1
err := b.api.db.PutKey(counterKey, []byte(strconv.Itoa(counter))) err := b.api.db.PutKey(counterKey, strconv.Itoa(counter))
if err != nil { if err != nil {
b.logger.Error("error saving key", zap.Error(err), zap.String("key", counterKey)) b.logger.Error("error saving key", zap.Error(err), zap.String("key", counterKey))
} }

View file

@ -2,7 +2,6 @@ package twitch
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"math/rand" "math/rand"
"sync" "sync"
@ -14,8 +13,6 @@ import (
"github.com/Masterminds/sprig/v3" "github.com/Masterminds/sprig/v3"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/nicklaw5/helix/v2" "github.com/nicklaw5/helix/v2"
"github.com/strimertul/strimertul/modules/database"
) )
const BotAlertsKey = "twitch/bot-modules/alerts/config" const BotAlertsKey = "twitch/bot-modules/alerts/config"
@ -115,10 +112,9 @@ func SetupAlerts(bot *Bot) *BotAlertsModule {
mod.compileTemplates() mod.compileTemplates()
go bot.api.db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { go bot.api.db.Subscribe(func(key, value string) {
for _, kv := range changed { if key == BotAlertsKey {
if kv.Key == BotAlertsKey { err := jsoniter.ConfigFastest.UnmarshalFromString(value, &mod.Config)
err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &mod.Config)
if err != nil { if err != nil {
bot.logger.Debug("error reloading timer config", zap.Error(err)) bot.logger.Debug("error reloading timer config", zap.Error(err))
} else { } else {
@ -126,8 +122,6 @@ func SetupAlerts(bot *Bot) *BotAlertsModule {
} }
mod.compileTemplates() mod.compileTemplates()
} }
}
return nil
}, BotAlertsKey) }, BotAlertsKey)
// Subscriptions are handled with a slight delay as info come from different events and must be aggregated // Subscriptions are handled with a slight delay as info come from different events and must be aggregated
@ -241,27 +235,26 @@ func SetupAlerts(bot *Bot) *BotAlertsModule {
} }
} }
go bot.api.db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { go bot.api.db.Subscribe(func(key, value string) {
for _, kv := range changed { if key == "stulbe/ev/webhook" {
if kv.Key == "stulbe/ev/webhook" {
var ev eventSubNotification var ev eventSubNotification
err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &ev) err := jsoniter.ConfigFastest.UnmarshalFromString(value, &ev)
if err != nil { if err != nil {
bot.logger.Debug("error parsing webhook payload", zap.Error(err)) bot.logger.Debug("error parsing webhook payload", zap.Error(err))
continue return
} }
switch ev.Subscription.Type { switch ev.Subscription.Type {
case helix.EventSubTypeChannelFollow: case helix.EventSubTypeChannelFollow:
// Only process if we care about follows // Only process if we care about follows
if !mod.Config.Follow.Enabled { if !mod.Config.Follow.Enabled {
continue return
} }
// Parse as follow event // Parse as follow event
var followEv helix.EventSubChannelFollowEvent var followEv helix.EventSubChannelFollowEvent
err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &followEv) err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &followEv)
if err != nil { if err != nil {
bot.logger.Debug("error parsing follow event", zap.Error(err)) bot.logger.Debug("error parsing follow event", zap.Error(err))
continue return
} }
// Pick a random message // Pick a random message
messageID := rand.Intn(len(mod.Config.Follow.Messages)) messageID := rand.Intn(len(mod.Config.Follow.Messages))
@ -275,14 +268,14 @@ func SetupAlerts(bot *Bot) *BotAlertsModule {
case helix.EventSubTypeChannelRaid: case helix.EventSubTypeChannelRaid:
// Only process if we care about raids // Only process if we care about raids
if !mod.Config.Raid.Enabled { if !mod.Config.Raid.Enabled {
continue return
} }
// Parse as raid event // Parse as raid event
var raidEv helix.EventSubChannelRaidEvent var raidEv helix.EventSubChannelRaidEvent
err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &raidEv) err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &raidEv)
if err != nil { if err != nil {
bot.logger.Debug("error parsing raid event", zap.Error(err)) bot.logger.Debug("error parsing raid event", zap.Error(err))
continue return
} }
// Pick a random message from base set // Pick a random message from base set
messageID := rand.Intn(len(mod.Config.Raid.Messages)) messageID := rand.Intn(len(mod.Config.Raid.Messages))
@ -290,7 +283,7 @@ func SetupAlerts(bot *Bot) *BotAlertsModule {
if !ok { if !ok {
// Broken template! // Broken template!
mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID]) mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID])
continue return
} }
// If we have variations, loop through all the available variations and pick the one with the highest minimum viewers that are met // If we have variations, loop through all the available variations and pick the one with the highest minimum viewers that are met
if len(mod.Config.Raid.Variations) > 0 { if len(mod.Config.Raid.Variations) > 0 {
@ -312,14 +305,14 @@ func SetupAlerts(bot *Bot) *BotAlertsModule {
case helix.EventSubTypeChannelCheer: case helix.EventSubTypeChannelCheer:
// Only process if we care about bits // Only process if we care about bits
if !mod.Config.Cheer.Enabled { if !mod.Config.Cheer.Enabled {
continue return
} }
// Parse as cheer event // Parse as cheer event
var cheerEv helix.EventSubChannelCheerEvent var cheerEv helix.EventSubChannelCheerEvent
err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &cheerEv) err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &cheerEv)
if err != nil { if err != nil {
bot.logger.Debug("error parsing cheer event", zap.Error(err)) bot.logger.Debug("error parsing cheer event", zap.Error(err))
continue return
} }
// Pick a random message from base set // Pick a random message from base set
messageID := rand.Intn(len(mod.Config.Cheer.Messages)) messageID := rand.Intn(len(mod.Config.Cheer.Messages))
@ -327,7 +320,7 @@ func SetupAlerts(bot *Bot) *BotAlertsModule {
if !ok { if !ok {
// Broken template! // Broken template!
mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID]) mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID])
continue return
} }
// If we have variations, loop through all the available variations and pick the one with the highest minimum amount that is met // If we have variations, loop through all the available variations and pick the one with the highest minimum amount that is met
if len(mod.Config.Cheer.Variations) > 0 { if len(mod.Config.Cheer.Variations) > 0 {
@ -349,40 +342,40 @@ func SetupAlerts(bot *Bot) *BotAlertsModule {
case helix.EventSubTypeChannelSubscription: case helix.EventSubTypeChannelSubscription:
// Only process if we care about subscriptions // Only process if we care about subscriptions
if !mod.Config.Subscription.Enabled { if !mod.Config.Subscription.Enabled {
continue return
} }
// Parse as subscription event // Parse as subscription event
var subEv helix.EventSubChannelSubscribeEvent var subEv helix.EventSubChannelSubscribeEvent
err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &subEv) err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &subEv)
if err != nil { if err != nil {
bot.logger.Debug("error parsing sub event", zap.Error(err)) bot.logger.Debug("error parsing sub event", zap.Error(err))
continue return
} }
addPendingSub(subEv) addPendingSub(subEv)
case helix.EventSubTypeChannelSubscriptionMessage: case helix.EventSubTypeChannelSubscriptionMessage:
// Only process if we care about subscriptions // Only process if we care about subscriptions
if !mod.Config.Subscription.Enabled { if !mod.Config.Subscription.Enabled {
continue return
} }
// Parse as subscription event // Parse as subscription event
var subEv helix.EventSubChannelSubscriptionMessageEvent var subEv helix.EventSubChannelSubscriptionMessageEvent
err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &subEv) err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &subEv)
if err != nil { if err != nil {
bot.logger.Debug("error parsing sub event", zap.Error(err)) bot.logger.Debug("error parsing sub event", zap.Error(err))
continue return
} }
addPendingSub(subEv) addPendingSub(subEv)
case helix.EventSubTypeChannelSubscriptionGift: case helix.EventSubTypeChannelSubscriptionGift:
// Only process if we care about gifted subs // Only process if we care about gifted subs
if !mod.Config.GiftSub.Enabled { if !mod.Config.GiftSub.Enabled {
continue return
} }
// Parse as gift event // Parse as gift event
var giftEv helix.EventSubChannelSubscriptionGiftEvent var giftEv helix.EventSubChannelSubscriptionGiftEvent
err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &giftEv) err := jsoniter.ConfigFastest.Unmarshal(ev.Event, &giftEv)
if err != nil { if err != nil {
bot.logger.Debug("error parsing raid event", zap.Error(err)) bot.logger.Debug("error parsing raid event", zap.Error(err))
continue return
} }
// Pick a random message from base set // Pick a random message from base set
messageID := rand.Intn(len(mod.Config.GiftSub.Messages)) messageID := rand.Intn(len(mod.Config.GiftSub.Messages))
@ -390,7 +383,7 @@ func SetupAlerts(bot *Bot) *BotAlertsModule {
if !ok { if !ok {
// Broken template! // Broken template!
mod.bot.WriteMessage(mod.Config.GiftSub.Messages[messageID]) mod.bot.WriteMessage(mod.Config.GiftSub.Messages[messageID])
continue return
} }
// If we have variations, loop through all the available variations and pick the one with the highest minimum cumulative total that are met // If we have variations, loop through all the available variations and pick the one with the highest minimum cumulative total that are met
if len(mod.Config.GiftSub.Variations) > 0 { if len(mod.Config.GiftSub.Variations) > 0 {
@ -425,8 +418,6 @@ func SetupAlerts(bot *Bot) *BotAlertsModule {
writeTemplate(bot, tpl, &giftEv) writeTemplate(bot, tpl, &giftEv)
} }
} }
}
return nil
}, "stulbe/ev/webhook") }, "stulbe/ev/webhook")
bot.logger.Debug("loaded bot alerts") bot.logger.Debug("loaded bot alerts")

View file

@ -1,17 +1,14 @@
package twitch package twitch
import ( import (
"context"
"math/rand" "math/rand"
"sync" "sync"
"time" "time"
"go.uber.org/zap" "go.uber.org/zap"
jsoniter "github.com/json-iterator/go"
irc "github.com/gempir/go-twitch-irc/v2" irc "github.com/gempir/go-twitch-irc/v2"
"github.com/strimertul/strimertul/modules/database" jsoniter "github.com/json-iterator/go"
) )
const BotTimersKey = "twitch/bot-modules/timers/config" const BotTimersKey = "twitch/bot-modules/timers/config"
@ -59,18 +56,15 @@ func SetupTimers(bot *Bot) *BotTimerModule {
bot.api.db.PutJSON(BotTimersKey, mod.Config) bot.api.db.PutJSON(BotTimersKey, mod.Config)
} }
go bot.api.db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error { go bot.api.db.Subscribe(func(key, value string) {
for _, kv := range changed { if key == BotTimersKey {
if kv.Key == BotTimersKey { err := jsoniter.ConfigFastest.UnmarshalFromString(value, &mod.Config)
err := jsoniter.ConfigFastest.Unmarshal(kv.Data, &mod.Config)
if err != nil { if err != nil {
bot.logger.Debug("error reloading timer config", zap.Error(err)) bot.logger.Debug("error reloading timer config", zap.Error(err))
} else { } else {
bot.logger.Info("reloaded timer config") bot.logger.Info("reloaded timer config")
} }
} }
}
return nil
}, BotTimersKey) }, BotTimersKey)
bot.logger.Debug("loaded timers", zap.Int("timers", len(mod.Config.Timers))) bot.logger.Debug("loaded timers", zap.Int("timers", len(mod.Config.Timers)))