mirror of
https://git.sr.ht/~ashkeel/strimertul
synced 2024-09-18 01:50:50 +00:00
Update to new kilovolt, move points to per-user
This commit is contained in:
parent
f9a222e8b8
commit
e3f4b49ba5
8 changed files with 143 additions and 62 deletions
|
@ -101,9 +101,9 @@ func (db *DB) GetJSON(key string, dst interface{}) error {
|
|||
func (db *DB) GetAll(prefix string) (map[string]string, error) {
|
||||
out := make(map[string]string)
|
||||
err := db.client.View(func(t *badger.Txn) error {
|
||||
it := t.NewIterator(badger.IteratorOptions{
|
||||
Prefix: []byte(prefix),
|
||||
})
|
||||
opt := badger.DefaultIteratorOptions
|
||||
opt.Prefix = []byte(prefix)
|
||||
it := t.NewIterator(opt)
|
||||
defer it.Close()
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
item := it.Item()
|
||||
|
@ -111,7 +111,7 @@ func (db *DB) GetAll(prefix string) (map[string]string, error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
out[string(item.Key())] = string(byt)
|
||||
out[string(item.Key()[len(prefix):])] = string(byt)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
@ -143,3 +143,9 @@ func (db *DB) PutJSONBulk(kvs map[string]interface{}) error {
|
|||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (db *DB) RemoveKey(key string) error {
|
||||
return db.client.Update(func(t *badger.Txn) error {
|
||||
return t.Delete([]byte(key))
|
||||
})
|
||||
}
|
||||
|
|
2
go.mod
2
go.mod
|
@ -10,6 +10,6 @@ require (
|
|||
github.com/nicklaw5/helix v1.15.0
|
||||
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
github.com/strimertul/kilovolt/v3 v3.0.3
|
||||
github.com/strimertul/kilovolt/v4 v4.0.1
|
||||
github.com/strimertul/stulbe-client-go v0.1.0
|
||||
)
|
||||
|
|
5
go.sum
5
go.sum
|
@ -105,9 +105,10 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
|
|||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/strimertul/kilovolt-client-go v1.1.1 h1:uy6y/WKJyubAPHb+wPJz5We+fVwzWIplHiclSAhEY2E=
|
||||
github.com/strimertul/kilovolt-client-go v1.1.1/go.mod h1:jXbd37kXDdDeKnOWao/JSNMdSYmuhBBHG+LWIBzuXr8=
|
||||
github.com/strimertul/kilovolt/v3 v3.0.0 h1:3gE0FdH3fL5UgMocR6Z7lI9hk3jD8ds8yL47D4Z7758=
|
||||
github.com/strimertul/kilovolt/v3 v3.0.0/go.mod h1:AgfPYRp+kffN64tcqCcQUZdpL/Dm5DGHIYRDm9t3E0Y=
|
||||
github.com/strimertul/kilovolt/v3 v3.0.3 h1:1+WmI8bi3Uwylr2l7+zkzr3kFHN73fm1Oala4aXxLQI=
|
||||
github.com/strimertul/kilovolt/v3 v3.0.3/go.mod h1:eweKrkaRD061PYcLS06L0FirEZ+uuQCWMcew7aZhXfk=
|
||||
github.com/strimertul/kilovolt/v4 v4.0.1 h1:81isohdSixVURO2+dZKKZBPw97HJmNN4/BXn6ADFoWM=
|
||||
github.com/strimertul/kilovolt/v4 v4.0.1/go.mod h1:AO2ZFQtSB+AcjCw0RTkXjbM6XBAjhsXsrRq10BX95kw=
|
||||
github.com/strimertul/stulbe v0.2.5 h1:qrJFwttrWfwSfHsvTgI3moZjhBwbYN8Xe8gicCeISpc=
|
||||
github.com/strimertul/stulbe v0.2.5/go.mod h1:0AsY4OVf1dNCwOn9s7KySuAxJ85w88pXeostu1n9E7w=
|
||||
github.com/strimertul/stulbe-client-go v0.1.0 h1:3lMPqrELDd4j5IBP0KDgdnuN2cEdr40Wore8DXioxFk=
|
||||
|
|
9
main.go
9
main.go
|
@ -9,7 +9,7 @@ import (
|
|||
"runtime"
|
||||
"time"
|
||||
|
||||
kv "github.com/strimertul/kilovolt/v3"
|
||||
kv "github.com/strimertul/kilovolt/v4"
|
||||
|
||||
"github.com/strimertul/strimertul/database"
|
||||
"github.com/strimertul/strimertul/modules"
|
||||
|
@ -111,7 +111,8 @@ func main() {
|
|||
}
|
||||
|
||||
// Initialize KV (required)
|
||||
hub := kv.NewHub(db.Client(), wrapLogger("kv"))
|
||||
hub, err := kv.NewHub(db.Client(), wrapLogger("kv"))
|
||||
failOnError(err, "Could not initialize kilovolt hub")
|
||||
go hub.Run()
|
||||
|
||||
// Get HTTP config
|
||||
|
@ -132,7 +133,7 @@ func main() {
|
|||
var loyaltyManager *loyalty.Manager
|
||||
loyaltyLogger := wrapLogger("loyalty")
|
||||
if moduleConfig.EnableLoyalty {
|
||||
loyaltyManager, err = loyalty.NewManager(db, hub, loyaltyLogger)
|
||||
loyaltyManager, err = loyalty.NewManager(db, loyaltyLogger)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Loyalty initialization failed! Module was temporarily disabled")
|
||||
moduleConfig.EnableLoyalty = false
|
||||
|
@ -142,7 +143,7 @@ func main() {
|
|||
go stulbeManager.ReplicateKey(loyalty.ConfigKey)
|
||||
go stulbeManager.ReplicateKey(loyalty.RewardsKey)
|
||||
go stulbeManager.ReplicateKey(loyalty.GoalsKey)
|
||||
go stulbeManager.ReplicateKey(loyalty.PointsKey)
|
||||
go stulbeManager.ReplicateKey(loyalty.PointsPrefix)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -43,9 +43,11 @@ type Goal struct {
|
|||
Contributors map[string]int64 `json:"contributors"`
|
||||
}
|
||||
|
||||
const PointsKey = "loyalty/users"
|
||||
const PointsPrefix = "loyalty/points/"
|
||||
|
||||
type PointStorage map[string]int64
|
||||
type PointsEntry struct {
|
||||
Points int64 `json:"points"`
|
||||
}
|
||||
|
||||
const QueueKey = "loyalty/redeem-queue"
|
||||
|
||||
|
|
|
@ -3,13 +3,13 @@ package loyalty
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
kv "github.com/strimertul/kilovolt/v3"
|
||||
"github.com/strimertul/strimertul/database"
|
||||
)
|
||||
|
||||
|
@ -18,24 +18,31 @@ var (
|
|||
)
|
||||
|
||||
type Manager struct {
|
||||
points PointStorage
|
||||
points map[string]PointsEntry
|
||||
config Config
|
||||
rewards RewardStorage
|
||||
goals GoalStorage
|
||||
queue RedeemQueueStorage
|
||||
mu sync.Mutex
|
||||
hub *kv.Hub
|
||||
db *database.DB
|
||||
logger logrus.FieldLogger
|
||||
}
|
||||
|
||||
func NewManager(db *database.DB, hub *kv.Hub, log logrus.FieldLogger) (*Manager, error) {
|
||||
func NewManager(db *database.DB, log logrus.FieldLogger) (*Manager, error) {
|
||||
if log == nil {
|
||||
log = logrus.New()
|
||||
}
|
||||
|
||||
// Check if we need to migrate
|
||||
// TODO Remove this in the future
|
||||
err := migratePoints(db, log)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
manager := &Manager{
|
||||
logger: log,
|
||||
hub: hub,
|
||||
db: db,
|
||||
mu: sync.Mutex{},
|
||||
}
|
||||
// Ger data from DB
|
||||
|
@ -46,13 +53,8 @@ func NewManager(db *database.DB, hub *kv.Hub, log logrus.FieldLogger) (*Manager,
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
if err := db.GetJSON(PointsKey, &manager.points); err != nil {
|
||||
if err == badger.ErrKeyNotFound {
|
||||
manager.points = make(PointStorage)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieve configs
|
||||
if err := db.GetJSON(RewardsKey, &manager.rewards); err != nil {
|
||||
if err != badger.ErrKeyNotFound {
|
||||
return nil, err
|
||||
|
@ -69,6 +71,24 @@ func NewManager(db *database.DB, hub *kv.Hub, log logrus.FieldLogger) (*Manager,
|
|||
}
|
||||
}
|
||||
|
||||
// Retrieve user points
|
||||
points, err := db.GetAll(PointsPrefix)
|
||||
if err != nil {
|
||||
if err != badger.ErrKeyNotFound {
|
||||
return nil, err
|
||||
}
|
||||
points = make(map[string]string)
|
||||
}
|
||||
manager.points = make(map[string]PointsEntry)
|
||||
for k, v := range points {
|
||||
var entry PointsEntry
|
||||
err := jsoniter.ConfigFastest.UnmarshalFromString(v, &entry)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
manager.points[k] = entry
|
||||
}
|
||||
|
||||
// Subscribe for changes
|
||||
go func() {
|
||||
db.Subscribe(context.Background(), manager.update, "loyalty/")
|
||||
|
@ -80,15 +100,14 @@ func NewManager(db *database.DB, hub *kv.Hub, log logrus.FieldLogger) (*Manager,
|
|||
func (m *Manager) update(kvs []database.ModifiedKV) error {
|
||||
for _, kv := range kvs {
|
||||
var err error
|
||||
switch string(kv.Key) {
|
||||
key := string(kv.Key)
|
||||
|
||||
// Check for config changes/RPC
|
||||
switch key {
|
||||
case ConfigKey:
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.config)
|
||||
case PointsKey:
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &m.points)
|
||||
case GoalsKey:
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
@ -113,6 +132,18 @@ func (m *Manager) update(kvs []database.ModifiedKV) error {
|
|||
if err == nil {
|
||||
err = m.RemoveRedeem(redeem)
|
||||
}
|
||||
default:
|
||||
// Check for prefix changes
|
||||
switch {
|
||||
// User point changed
|
||||
case strings.HasPrefix(kv.Key, PointsPrefix):
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
var entry PointsEntry
|
||||
err = jsoniter.ConfigFastest.Unmarshal(kv.Data, &entry)
|
||||
user := kv.Key[len(PointsPrefix):]
|
||||
m.points[user] = entry
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
m.logger.WithFields(logrus.Fields{
|
||||
|
@ -126,54 +157,49 @@ func (m *Manager) update(kvs []database.ModifiedKV) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) savePoints() error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
data, _ := jsoniter.ConfigFastest.Marshal(m.points)
|
||||
return m.hub.WriteKey(PointsKey, string(data))
|
||||
}
|
||||
|
||||
func (m *Manager) GetPoints(user string) int64 {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
points, ok := m.points[user]
|
||||
if ok {
|
||||
return points
|
||||
return points.Points
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *Manager) setPoints(user string, points int64) {
|
||||
func (m *Manager) setPoints(user string, points int64) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.points[user] = points
|
||||
m.points[user] = PointsEntry{
|
||||
Points: points,
|
||||
}
|
||||
return m.db.PutJSON(PointsPrefix+user, m.points[user])
|
||||
}
|
||||
|
||||
func (m *Manager) GivePoints(pointsToGive map[string]int64) error {
|
||||
// Add points to each user
|
||||
for user, points := range pointsToGive {
|
||||
balance := m.GetPoints(user)
|
||||
m.setPoints(user, balance+points)
|
||||
if err := m.setPoints(user, balance+points); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Save points
|
||||
return m.savePoints()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) TakePoints(pointsToTake map[string]int64) error {
|
||||
// Add points to each user
|
||||
for user, points := range pointsToTake {
|
||||
balance := m.GetPoints(user)
|
||||
m.setPoints(user, balance-points)
|
||||
if err := m.setPoints(user, balance-points); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Save points
|
||||
return m.savePoints()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) saveQueue() error {
|
||||
data, _ := jsoniter.ConfigFastest.Marshal(m.queue)
|
||||
return m.hub.WriteKey(QueueKey, string(data))
|
||||
return m.db.PutJSON(QueueKey, m.queue)
|
||||
}
|
||||
|
||||
func (m *Manager) AddRedeem(redeem Redeem) error {
|
||||
|
@ -184,8 +210,9 @@ func (m *Manager) AddRedeem(redeem Redeem) error {
|
|||
m.queue = append(m.queue, redeem)
|
||||
|
||||
// Send redeem event
|
||||
data, _ := jsoniter.ConfigFastest.Marshal(redeem)
|
||||
m.hub.WriteKey(RedeemEvent, string(data))
|
||||
if err := m.db.PutJSON(RedeemEvent, redeem); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Save points
|
||||
return m.saveQueue()
|
||||
|
@ -209,8 +236,7 @@ func (m *Manager) RemoveRedeem(redeem Redeem) error {
|
|||
}
|
||||
|
||||
func (m *Manager) SaveGoals() error {
|
||||
data, _ := jsoniter.ConfigFastest.Marshal(m.goals)
|
||||
return m.hub.WriteKey(GoalsKey, string(data))
|
||||
return m.db.PutJSON(GoalsKey, m.goals)
|
||||
}
|
||||
|
||||
func (m *Manager) Goals() []Goal {
|
||||
|
|
42
modules/loyalty/migration.go
Normal file
42
modules/loyalty/migration.go
Normal file
|
@ -0,0 +1,42 @@
|
|||
package loyalty
|
||||
|
||||
import (
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/strimertul/strimertul/database"
|
||||
)
|
||||
|
||||
const OldPointsKey = "loyalty/users"
|
||||
|
||||
type OldPointStorage map[string]int64
|
||||
|
||||
func migratePoints(db *database.DB, log logrus.FieldLogger) error {
|
||||
// Retrieve old storage
|
||||
var oldStorage OldPointStorage
|
||||
err := db.GetJSON(OldPointsKey, &oldStorage)
|
||||
if err == database.ErrKeyNotFound {
|
||||
// No migration needed, points are already kaput
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Move to new format
|
||||
newStorage := make(map[string]interface{})
|
||||
for user, points := range oldStorage {
|
||||
userkey := PointsPrefix + user
|
||||
newStorage[userkey] = PointsEntry{
|
||||
Points: points,
|
||||
}
|
||||
}
|
||||
|
||||
// Bulk add to database
|
||||
if err := db.PutJSONBulk(newStorage); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Migrated to new loyalty point format")
|
||||
|
||||
// Remove old key
|
||||
return db.RemoveKey(OldPointsKey)
|
||||
}
|
|
@ -44,20 +44,23 @@ func (m *Manager) Close() {
|
|||
m.Client.Close()
|
||||
}
|
||||
|
||||
func (m *Manager) ReplicateKey(key string) error {
|
||||
func (m *Manager) ReplicateKey(prefix string) error {
|
||||
// Set key to current value
|
||||
val, err := m.db.GetKey(key)
|
||||
vals, err := m.db.GetAll(prefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.Client.KV.SetKey(key, string(val))
|
||||
for k, v := range vals {
|
||||
err = m.Client.KV.SetKey(k, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
m.logger.WithFields(logrus.Fields{
|
||||
"key": key,
|
||||
}).Debug("set to remote")
|
||||
"prefix": prefix,
|
||||
}).Debug("synched to remote")
|
||||
|
||||
// Subscribe to local datastore and update remote on change
|
||||
return m.db.Subscribe(context.Background(), func(pairs []database.ModifiedKV) error {
|
||||
|
@ -72,5 +75,5 @@ func (m *Manager) ReplicateKey(key string) error {
|
|||
}
|
||||
|
||||
return nil
|
||||
}, key)
|
||||
}, prefix)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue