2021-05-07 16:36:23 +00:00
|
|
|
package stulbe
|
|
|
|
|
|
|
|
import (
|
2021-05-10 21:09:15 +00:00
|
|
|
"context"
|
2021-11-23 10:34:02 +00:00
|
|
|
"errors"
|
2021-05-07 16:36:23 +00:00
|
|
|
|
2021-11-23 10:34:02 +00:00
|
|
|
"github.com/strimertul/strimertul/modules"
|
|
|
|
"github.com/strimertul/strimertul/modules/database"
|
2021-05-07 16:36:23 +00:00
|
|
|
|
2021-11-23 10:34:02 +00:00
|
|
|
"github.com/sirupsen/logrus"
|
2021-05-14 11:15:38 +00:00
|
|
|
"github.com/strimertul/stulbe-client-go"
|
2021-05-11 11:12:00 +00:00
|
|
|
)
|
2021-05-07 16:36:23 +00:00
|
|
|
|
2021-05-11 11:12:00 +00:00
|
|
|
type Manager struct {
|
|
|
|
Client *stulbe.Client
|
2021-05-10 21:09:15 +00:00
|
|
|
db *database.DB
|
2021-05-12 17:19:09 +00:00
|
|
|
logger logrus.FieldLogger
|
2021-11-24 10:55:12 +00:00
|
|
|
|
|
|
|
restart chan bool
|
2021-05-07 16:36:23 +00:00
|
|
|
}
|
|
|
|
|
2021-11-23 10:34:02 +00:00
|
|
|
func Initialize(manager *modules.Manager) (*Manager, error) {
|
|
|
|
db, ok := manager.Modules["db"].(*database.DB)
|
|
|
|
if !ok {
|
|
|
|
return nil, errors.New("db module not found")
|
|
|
|
}
|
|
|
|
|
|
|
|
logger := manager.Logger(modules.ModuleStulbe)
|
|
|
|
|
2021-05-07 16:36:23 +00:00
|
|
|
var config Config
|
2021-05-10 21:09:15 +00:00
|
|
|
err := db.GetJSON(ConfigKey, &config)
|
2021-05-07 16:36:23 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2021-11-23 10:34:02 +00:00
|
|
|
// Create client
|
2021-05-11 11:12:00 +00:00
|
|
|
stulbeClient, err := stulbe.NewClient(stulbe.ClientOptions{
|
2021-05-07 16:36:23 +00:00
|
|
|
Endpoint: config.Endpoint,
|
2021-05-11 11:12:00 +00:00
|
|
|
Username: config.Username,
|
|
|
|
AuthKey: config.AuthKey,
|
|
|
|
Logger: logger,
|
|
|
|
})
|
2021-05-08 14:02:47 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2021-11-23 10:34:02 +00:00
|
|
|
// Create manager
|
|
|
|
stulbeManager := &Manager{
|
2021-11-24 10:55:12 +00:00
|
|
|
Client: stulbeClient,
|
|
|
|
db: db,
|
|
|
|
logger: logger,
|
|
|
|
restart: make(chan bool),
|
2021-11-23 10:34:02 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Register module
|
|
|
|
manager.Modules[modules.ModuleStulbe] = stulbeManager
|
|
|
|
|
2021-11-24 10:55:12 +00:00
|
|
|
// Receive key updates
|
2021-11-23 10:34:02 +00:00
|
|
|
go func() {
|
2021-11-24 10:55:12 +00:00
|
|
|
for {
|
|
|
|
err := stulbeManager.ReceiveEvents()
|
|
|
|
if err != nil {
|
|
|
|
logger.WithError(err).Error("Stulbe subscription died unexpectedly!")
|
|
|
|
// Wait for config change before retrying
|
|
|
|
<-stulbeManager.restart
|
|
|
|
}
|
|
|
|
}
|
2021-11-23 10:34:02 +00:00
|
|
|
}()
|
|
|
|
|
2021-11-24 10:55:12 +00:00
|
|
|
// Listen for config changes
|
|
|
|
go db.Subscribe(context.Background(), func(changed []database.ModifiedKV) error {
|
|
|
|
for _, kv := range changed {
|
|
|
|
if kv.Key == ConfigKey {
|
|
|
|
var config Config
|
|
|
|
err := db.GetJSON(ConfigKey, &config)
|
|
|
|
if err != nil {
|
|
|
|
logger.WithError(err).Warn("Failed to get config")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
client, err := stulbe.NewClient(stulbe.ClientOptions{
|
|
|
|
Endpoint: config.Endpoint,
|
|
|
|
Username: config.Username,
|
|
|
|
AuthKey: config.AuthKey,
|
|
|
|
Logger: logger,
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
logger.WithError(err).Warn("Failed to update stulbe client, keeping old settings")
|
|
|
|
} else {
|
|
|
|
stulbeManager.Client.Close()
|
|
|
|
stulbeManager.Client = client
|
|
|
|
stulbeManager.restart <- true
|
|
|
|
logger.Info("updated/restarted stulbe client")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}, ConfigKey)
|
|
|
|
|
2021-11-23 10:34:02 +00:00
|
|
|
return stulbeManager, nil
|
2021-05-07 16:36:23 +00:00
|
|
|
}
|
|
|
|
|
2021-05-18 11:30:27 +00:00
|
|
|
func (m *Manager) ReceiveEvents() error {
|
|
|
|
chn, err := m.Client.KV.SubscribePrefix("stulbe/")
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
for {
|
2021-11-24 10:55:12 +00:00
|
|
|
select {
|
|
|
|
case kv := <-chn:
|
|
|
|
err := m.db.PutKey(kv.Key, []byte(kv.Value))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
case <-m.restart:
|
|
|
|
return nil
|
2021-05-18 11:30:27 +00:00
|
|
|
}
|
2021-11-24 10:55:12 +00:00
|
|
|
|
2021-05-18 11:30:27 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-11-23 10:34:02 +00:00
|
|
|
func (m *Manager) Close() error {
|
2021-05-11 11:12:00 +00:00
|
|
|
m.Client.Close()
|
2021-11-23 10:34:02 +00:00
|
|
|
return nil
|
2021-05-07 16:36:23 +00:00
|
|
|
}
|
|
|
|
|
2021-05-16 15:55:40 +00:00
|
|
|
func (m *Manager) ReplicateKey(prefix string) error {
|
2021-05-11 11:12:00 +00:00
|
|
|
// Set key to current value
|
2021-05-16 15:55:40 +00:00
|
|
|
vals, err := m.db.GetAll(prefix)
|
2021-05-08 14:02:47 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2021-05-18 17:45:14 +00:00
|
|
|
// Add prefix to keys
|
|
|
|
newvals := make(map[string]string)
|
|
|
|
for k, v := range vals {
|
|
|
|
newvals[prefix+k] = v
|
|
|
|
}
|
|
|
|
|
|
|
|
err = m.Client.KV.SetKeys(newvals)
|
2021-05-18 11:30:27 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2021-05-08 14:02:47 +00:00
|
|
|
}
|
2021-05-16 15:55:40 +00:00
|
|
|
|
2021-05-12 17:19:09 +00:00
|
|
|
m.logger.WithFields(logrus.Fields{
|
2021-05-16 15:55:40 +00:00
|
|
|
"prefix": prefix,
|
2021-11-24 10:55:12 +00:00
|
|
|
}).Debug("synced to remote")
|
2021-05-08 14:02:47 +00:00
|
|
|
|
2021-05-10 21:09:15 +00:00
|
|
|
// Subscribe to local datastore and update remote on change
|
2021-05-11 11:12:00 +00:00
|
|
|
return m.db.Subscribe(context.Background(), func(pairs []database.ModifiedKV) error {
|
2021-05-10 21:09:15 +00:00
|
|
|
for _, changed := range pairs {
|
2021-05-11 11:12:00 +00:00
|
|
|
err := m.Client.KV.SetKey(changed.Key, string(changed.Data))
|
2021-05-10 21:09:15 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2021-05-12 17:19:09 +00:00
|
|
|
m.logger.WithFields(logrus.Fields{
|
|
|
|
"key": changed.Key,
|
|
|
|
}).Debug("replicated to remote")
|
2021-05-10 21:09:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
2021-05-16 15:55:40 +00:00
|
|
|
}, prefix)
|
2021-05-10 21:09:15 +00:00
|
|
|
}
|
2021-11-19 18:37:42 +00:00
|
|
|
|
|
|
|
func (m *Manager) ReplicateKeys(prefixes []string) error {
|
|
|
|
for _, prefix := range prefixes {
|
|
|
|
err := m.ReplicateKey(prefix)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|