From e3f4b49ba5bdab49fafae64ca52854aef899dd26 Mon Sep 17 00:00:00 2001 From: Ash Keel Date: Sun, 16 May 2021 17:55:40 +0200 Subject: [PATCH] Update to new kilovolt, move points to per-user --- database/db.go | 14 +++-- go.mod | 2 +- go.sum | 5 +- main.go | 9 +-- modules/loyalty/data.go | 6 +- modules/loyalty/manager.go | 108 ++++++++++++++++++++++------------- modules/loyalty/migration.go | 42 ++++++++++++++ modules/stulbe/client.go | 19 +++--- 8 files changed, 143 insertions(+), 62 deletions(-) create mode 100644 modules/loyalty/migration.go diff --git a/database/db.go b/database/db.go index e6ced23..fde7062 100644 --- a/database/db.go +++ b/database/db.go @@ -101,9 +101,9 @@ func (db *DB) GetJSON(key string, dst interface{}) error { func (db *DB) GetAll(prefix string) (map[string]string, error) { out := make(map[string]string) err := db.client.View(func(t *badger.Txn) error { - it := t.NewIterator(badger.IteratorOptions{ - Prefix: []byte(prefix), - }) + opt := badger.DefaultIteratorOptions + opt.Prefix = []byte(prefix) + it := t.NewIterator(opt) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { item := it.Item() @@ -111,7 +111,7 @@ func (db *DB) GetAll(prefix string) (map[string]string, error) { if err != nil { return err } - out[string(item.Key())] = string(byt) + out[string(item.Key()[len(prefix):])] = string(byt) } return nil }) @@ -143,3 +143,9 @@ func (db *DB) PutJSONBulk(kvs map[string]interface{}) error { return nil }) } + +func (db *DB) RemoveKey(key string) error { + return db.client.Update(func(t *badger.Txn) error { + return t.Delete([]byte(key)) + }) +} diff --git a/go.mod b/go.mod index 4920827..a46301e 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,6 @@ require ( github.com/nicklaw5/helix v1.15.0 github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 github.com/sirupsen/logrus v1.8.1 - github.com/strimertul/kilovolt/v3 v3.0.3 + github.com/strimertul/kilovolt/v4 v4.0.1 github.com/strimertul/stulbe-client-go v0.1.0 ) diff --git a/go.sum b/go.sum index e29cf8d..340fc95 100644 --- a/go.sum +++ b/go.sum @@ -105,9 +105,10 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/strimertul/kilovolt-client-go v1.1.1 h1:uy6y/WKJyubAPHb+wPJz5We+fVwzWIplHiclSAhEY2E= github.com/strimertul/kilovolt-client-go v1.1.1/go.mod h1:jXbd37kXDdDeKnOWao/JSNMdSYmuhBBHG+LWIBzuXr8= +github.com/strimertul/kilovolt/v3 v3.0.0 h1:3gE0FdH3fL5UgMocR6Z7lI9hk3jD8ds8yL47D4Z7758= github.com/strimertul/kilovolt/v3 v3.0.0/go.mod h1:AgfPYRp+kffN64tcqCcQUZdpL/Dm5DGHIYRDm9t3E0Y= -github.com/strimertul/kilovolt/v3 v3.0.3 h1:1+WmI8bi3Uwylr2l7+zkzr3kFHN73fm1Oala4aXxLQI= -github.com/strimertul/kilovolt/v3 v3.0.3/go.mod h1:eweKrkaRD061PYcLS06L0FirEZ+uuQCWMcew7aZhXfk= +github.com/strimertul/kilovolt/v4 v4.0.1 h1:81isohdSixVURO2+dZKKZBPw97HJmNN4/BXn6ADFoWM= +github.com/strimertul/kilovolt/v4 v4.0.1/go.mod h1:AO2ZFQtSB+AcjCw0RTkXjbM6XBAjhsXsrRq10BX95kw= github.com/strimertul/stulbe v0.2.5 h1:qrJFwttrWfwSfHsvTgI3moZjhBwbYN8Xe8gicCeISpc= github.com/strimertul/stulbe v0.2.5/go.mod h1:0AsY4OVf1dNCwOn9s7KySuAxJ85w88pXeostu1n9E7w= github.com/strimertul/stulbe-client-go v0.1.0 h1:3lMPqrELDd4j5IBP0KDgdnuN2cEdr40Wore8DXioxFk= diff --git a/main.go b/main.go index 64215e1..1dc2c14 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,7 @@ import ( "runtime" "time" - kv "github.com/strimertul/kilovolt/v3" + kv "github.com/strimertul/kilovolt/v4" "github.com/strimertul/strimertul/database" "github.com/strimertul/strimertul/modules" @@ -111,7 +111,8 @@ func main() { } // Initialize KV (required) - hub := kv.NewHub(db.Client(), wrapLogger("kv")) + hub, err := kv.NewHub(db.Client(), wrapLogger("kv")) + failOnError(err, "Could not initialize kilovolt hub") go hub.Run() // Get HTTP config @@ -132,7 +133,7 @@ func main() { var loyaltyManager *loyalty.Manager loyaltyLogger := wrapLogger("loyalty") if moduleConfig.EnableLoyalty { - loyaltyManager, err = loyalty.NewManager(db, hub, loyaltyLogger) + loyaltyManager, err = loyalty.NewManager(db, loyaltyLogger) if err != nil { log.WithError(err).Error("Loyalty initialization failed! Module was temporarily disabled") moduleConfig.EnableLoyalty = false @@ -142,7 +143,7 @@ func main() { go stulbeManager.ReplicateKey(loyalty.ConfigKey) go stulbeManager.ReplicateKey(loyalty.RewardsKey) go stulbeManager.ReplicateKey(loyalty.GoalsKey) - go stulbeManager.ReplicateKey(loyalty.PointsKey) + go stulbeManager.ReplicateKey(loyalty.PointsPrefix) } } diff --git a/modules/loyalty/data.go b/modules/loyalty/data.go index b26d076..a4a56f0 100644 --- a/modules/loyalty/data.go +++ b/modules/loyalty/data.go @@ -43,9 +43,11 @@ type Goal struct { Contributors map[string]int64 `json:"contributors"` } -const PointsKey = "loyalty/users" +const PointsPrefix = "loyalty/points/" -type PointStorage map[string]int64 +type PointsEntry struct { + Points int64 `json:"points"` +} const QueueKey = "loyalty/redeem-queue" diff --git a/modules/loyalty/manager.go b/modules/loyalty/manager.go index 26c133b..9370c65 100644 --- a/modules/loyalty/manager.go +++ b/modules/loyalty/manager.go @@ -3,13 +3,13 @@ package loyalty import ( "context" "errors" + "strings" "sync" "github.com/dgraph-io/badger/v3" jsoniter "github.com/json-iterator/go" "github.com/sirupsen/logrus" - kv "github.com/strimertul/kilovolt/v3" "github.com/strimertul/strimertul/database" ) @@ -18,24 +18,31 @@ var ( ) type Manager struct { - points PointStorage + points map[string]PointsEntry config Config rewards RewardStorage goals GoalStorage queue RedeemQueueStorage mu sync.Mutex - hub *kv.Hub + db *database.DB logger logrus.FieldLogger } -func NewManager(db *database.DB, hub *kv.Hub, log logrus.FieldLogger) (*Manager, error) { +func NewManager(db *database.DB, log logrus.FieldLogger) (*Manager, error) { if log == nil { log = logrus.New() } + // Check if we need to migrate + // TODO Remove this in the future + err := migratePoints(db, log) + if err != nil { + return nil, err + } + manager := &Manager{ logger: log, - hub: hub, + db: db, mu: sync.Mutex{}, } // Ger data from DB @@ -46,13 +53,8 @@ func NewManager(db *database.DB, hub *kv.Hub, log logrus.FieldLogger) (*Manager, return nil, err } } - if err := db.GetJSON(PointsKey, &manager.points); err != nil { - if err == badger.ErrKeyNotFound { - manager.points = make(PointStorage) - } else { - return nil, err - } - } + + // Retrieve configs if err := db.GetJSON(RewardsKey, &manager.rewards); err != nil { if err != badger.ErrKeyNotFound { return nil, err @@ -69,6 +71,24 @@ func NewManager(db *database.DB, hub *kv.Hub, log logrus.FieldLogger) (*Manager, } } + // Retrieve user points + points, err := db.GetAll(PointsPrefix) + if err != nil { + if err != badger.ErrKeyNotFound { + return nil, err + } + points = make(map[string]string) + } + manager.points = make(map[string]PointsEntry) + for k, v := range points { + var entry PointsEntry + err := jsoniter.ConfigFastest.UnmarshalFromString(v, &entry) + if err != nil { + return nil, err + } + manager.points[k] = entry + } + // Subscribe for changes go func() { db.Subscribe(context.Background(), manager.update, "loyalty/") @@ -80,15 +100,14 @@ func NewManager(db *database.DB, hub *kv.Hub, log logrus.FieldLogger) (*Manager, func (m *Manager) update(kvs []database.ModifiedKV) error { for _, kv := range kvs { var err error - switch string(kv.Key) { + key := string(kv.Key) + + // Check for config changes/RPC + switch key { case ConfigKey: m.mu.Lock() defer m.mu.Unlock() err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.config) - case PointsKey: - m.mu.Lock() - defer m.mu.Unlock() - err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.points) case GoalsKey: m.mu.Lock() defer m.mu.Unlock() @@ -113,6 +132,18 @@ func (m *Manager) update(kvs []database.ModifiedKV) error { if err == nil { err = m.RemoveRedeem(redeem) } + default: + // Check for prefix changes + switch { + // User point changed + case strings.HasPrefix(kv.Key, PointsPrefix): + m.mu.Lock() + defer m.mu.Unlock() + var entry PointsEntry + err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &entry) + user := kv.Key[len(PointsPrefix):] + m.points[user] = entry + } } if err != nil { m.logger.WithFields(logrus.Fields{ @@ -126,54 +157,49 @@ func (m *Manager) update(kvs []database.ModifiedKV) error { return nil } -func (m *Manager) savePoints() error { - m.mu.Lock() - defer m.mu.Unlock() - data, _ := jsoniter.ConfigFastest.Marshal(m.points) - return m.hub.WriteKey(PointsKey, string(data)) -} - func (m *Manager) GetPoints(user string) int64 { m.mu.Lock() defer m.mu.Unlock() points, ok := m.points[user] if ok { - return points + return points.Points } return 0 } -func (m *Manager) setPoints(user string, points int64) { +func (m *Manager) setPoints(user string, points int64) error { m.mu.Lock() defer m.mu.Unlock() - m.points[user] = points + m.points[user] = PointsEntry{ + Points: points, + } + return m.db.PutJSON(PointsPrefix+user, m.points[user]) } func (m *Manager) GivePoints(pointsToGive map[string]int64) error { // Add points to each user for user, points := range pointsToGive { balance := m.GetPoints(user) - m.setPoints(user, balance+points) + if err := m.setPoints(user, balance+points); err != nil { + return err + } } - - // Save points - return m.savePoints() + return nil } func (m *Manager) TakePoints(pointsToTake map[string]int64) error { // Add points to each user for user, points := range pointsToTake { balance := m.GetPoints(user) - m.setPoints(user, balance-points) + if err := m.setPoints(user, balance-points); err != nil { + return err + } } - - // Save points - return m.savePoints() + return nil } func (m *Manager) saveQueue() error { - data, _ := jsoniter.ConfigFastest.Marshal(m.queue) - return m.hub.WriteKey(QueueKey, string(data)) + return m.db.PutJSON(QueueKey, m.queue) } func (m *Manager) AddRedeem(redeem Redeem) error { @@ -184,8 +210,9 @@ func (m *Manager) AddRedeem(redeem Redeem) error { m.queue = append(m.queue, redeem) // Send redeem event - data, _ := jsoniter.ConfigFastest.Marshal(redeem) - m.hub.WriteKey(RedeemEvent, string(data)) + if err := m.db.PutJSON(RedeemEvent, redeem); err != nil { + return err + } // Save points return m.saveQueue() @@ -209,8 +236,7 @@ func (m *Manager) RemoveRedeem(redeem Redeem) error { } func (m *Manager) SaveGoals() error { - data, _ := jsoniter.ConfigFastest.Marshal(m.goals) - return m.hub.WriteKey(GoalsKey, string(data)) + return m.db.PutJSON(GoalsKey, m.goals) } func (m *Manager) Goals() []Goal { diff --git a/modules/loyalty/migration.go b/modules/loyalty/migration.go new file mode 100644 index 0000000..77498ab --- /dev/null +++ b/modules/loyalty/migration.go @@ -0,0 +1,42 @@ +package loyalty + +import ( + "github.com/sirupsen/logrus" + "github.com/strimertul/strimertul/database" +) + +const OldPointsKey = "loyalty/users" + +type OldPointStorage map[string]int64 + +func migratePoints(db *database.DB, log logrus.FieldLogger) error { + // Retrieve old storage + var oldStorage OldPointStorage + err := db.GetJSON(OldPointsKey, &oldStorage) + if err == database.ErrKeyNotFound { + // No migration needed, points are already kaput + return nil + } + if err != nil { + return err + } + + // Move to new format + newStorage := make(map[string]interface{}) + for user, points := range oldStorage { + userkey := PointsPrefix + user + newStorage[userkey] = PointsEntry{ + Points: points, + } + } + + // Bulk add to database + if err := db.PutJSONBulk(newStorage); err != nil { + return err + } + + log.Info("Migrated to new loyalty point format") + + // Remove old key + return db.RemoveKey(OldPointsKey) +} diff --git a/modules/stulbe/client.go b/modules/stulbe/client.go index da3a57d..4bd80ce 100644 --- a/modules/stulbe/client.go +++ b/modules/stulbe/client.go @@ -44,20 +44,23 @@ func (m *Manager) Close() { m.Client.Close() } -func (m *Manager) ReplicateKey(key string) error { +func (m *Manager) ReplicateKey(prefix string) error { // Set key to current value - val, err := m.db.GetKey(key) + vals, err := m.db.GetAll(prefix) if err != nil { return err } - err = m.Client.KV.SetKey(key, string(val)) - if err != nil { - return err + for k, v := range vals { + err = m.Client.KV.SetKey(k, v) + if err != nil { + return err + } } + m.logger.WithFields(logrus.Fields{ - "key": key, - }).Debug("set to remote") + "prefix": prefix, + }).Debug("synched to remote") // Subscribe to local datastore and update remote on change return m.db.Subscribe(context.Background(), func(pairs []database.ModifiedKV) error { @@ -72,5 +75,5 @@ func (m *Manager) ReplicateKey(key string) error { } return nil - }, key) + }, prefix) }