strimertul/database/database.go

230 lines
5.2 KiB
Go
Raw Normal View History

2022-02-01 11:35:34 +00:00
package database
import (
2024-03-12 23:50:59 +00:00
"context"
"encoding/json"
2022-06-16 22:51:27 +00:00
"errors"
2022-02-01 11:35:34 +00:00
"fmt"
kv "git.sr.ht/~ashkeel/kilovolt/v12"
2022-02-01 11:35:34 +00:00
)
// CancelFunc is the function type returned by SubscribePrefix and SubscribeKey.
// Calling it unsets the callbacks that the subscription call registered.
type CancelFunc func()
2022-02-01 11:35:34 +00:00
// Sentinel errors returned by this package.
var (
	// ErrUnknown reports that a response was received whose type is neither
	// a regular response nor an error response.
	ErrUnknown = errors.New("unknown error")

	// ErrEmptyKey reports that a key requested as a JSON object holds an
	// empty string (or is not set at all).
	ErrEmptyKey = errors.New("empty key")
)
2024-03-10 16:38:18 +00:00
type Database interface {
GetKey(key string) (string, error)
PutKey(key string, data string) error
2024-03-12 23:50:59 +00:00
RemoveKey(key string) error
2024-03-10 16:38:18 +00:00
SubscribePrefix(fn kv.SubscriptionCallback, prefixes ...string) (cancelFn CancelFunc, err error)
SubscribeKey(key string, fn func(string)) (cancelFn CancelFunc, err error)
2024-03-12 23:50:59 +00:00
SubscribePrefixContext(ctx context.Context, fn kv.SubscriptionCallback, prefixes ...string) error
SubscribeKeyContext(ctx context.Context, key string, fn func(string)) error
2024-03-10 16:38:18 +00:00
GetAll(prefix string) (map[string]string, error)
2024-03-12 23:50:59 +00:00
GetJSON(key string, dst any) error
2024-03-10 16:38:18 +00:00
PutJSON(key string, data any) error
PutJSONBulk(kvs map[string]any) error
2024-03-12 23:50:59 +00:00
2024-03-10 16:38:18 +00:00
Hub() *kv.Hub
}
2022-11-30 18:15:47 +00:00
type LocalDBClient struct {
2022-02-01 11:35:34 +00:00
client *kv.LocalClient
hub *kv.Hub
}
// KvPair couples a key with its string data.
type KvPair struct {
	// Key is the name of the entry.
	Key string
	// Data is the string content associated with Key.
	Data string
}
func NewLocalClient(hub *kv.Hub) (*LocalDBClient, error) {
2022-11-30 18:15:47 +00:00
// Create local client
localClient := kv.NewLocalClient(kv.ClientOptions{})
2022-11-30 18:15:47 +00:00
// Run client and add it to the hub
2022-02-01 11:35:34 +00:00
go localClient.Run()
hub.AddClient(localClient)
localClient.Wait()
2022-11-30 18:15:47 +00:00
// Bypass authentication
2022-02-01 11:35:34 +00:00
err := hub.SetAuthenticated(localClient.UID(), true)
if err != nil {
return nil, err
}
2022-11-30 18:15:47 +00:00
return &LocalDBClient{
2022-02-01 11:35:34 +00:00
client: localClient,
hub: hub,
2022-11-30 18:15:47 +00:00
}, nil
2022-02-01 11:35:34 +00:00
}
2022-11-30 18:15:47 +00:00
func (mod *LocalDBClient) Hub() *kv.Hub {
2022-02-01 11:35:34 +00:00
return mod.hub
}
2022-11-30 18:15:47 +00:00
func (mod *LocalDBClient) Close() error {
2022-02-01 11:35:34 +00:00
mod.hub.RemoveClient(mod.client)
return nil
}
2022-11-30 18:15:47 +00:00
func (mod *LocalDBClient) GetKey(key string) (string, error) {
res, err := mod.makeRequest(kv.CmdReadKey, map[string]any{"key": key})
2022-02-01 11:35:34 +00:00
if err != nil {
return "", err
}
return res.Data.(string), nil
}
2022-11-30 18:15:47 +00:00
func (mod *LocalDBClient) PutKey(key string, data string) error {
_, err := mod.makeRequest(kv.CmdWriteKey, map[string]any{"key": key, "data": data})
2022-02-01 11:35:34 +00:00
return err
}
2024-03-12 23:50:59 +00:00
// SubscribePrefixContext subscribes fn to the given prefixes via
// SubscribePrefix and cancels the subscription once ctx is done.
//
// On success a goroutine is started that waits for ctx.Done() and then calls
// the cancel function; it lives until ctx is done.
func (mod *LocalDBClient) SubscribePrefixContext(ctx context.Context, fn kv.SubscriptionCallback, prefixes ...string) error {
	unsubscribe, err := mod.SubscribePrefix(fn, prefixes...)
	if err == nil {
		go func() {
			<-ctx.Done()
			unsubscribe()
		}()
	}
	return err
}
// SubscribeKeyContext subscribes fn to key via SubscribeKey and cancels the
// subscription once ctx is done.
//
// On success a goroutine is started that waits for ctx.Done() and then calls
// the cancel function; it lives until ctx is done.
func (mod *LocalDBClient) SubscribeKeyContext(ctx context.Context, key string, fn func(string)) error {
	unsubscribe, err := mod.SubscribeKey(key, fn)
	if err == nil {
		go func() {
			<-ctx.Done()
			unsubscribe()
		}()
	}
	return err
}
2024-02-25 13:46:59 +00:00
func (mod *LocalDBClient) SubscribePrefix(fn kv.SubscriptionCallback, prefixes ...string) (cancelFn CancelFunc, err error) {
var ids []int64
2022-02-01 11:35:34 +00:00
for _, prefix := range prefixes {
_, err = mod.makeRequest(kv.CmdSubscribePrefix, map[string]any{"prefix": prefix})
2022-02-01 11:35:34 +00:00
if err != nil {
2024-02-25 13:46:59 +00:00
return nil, err
}
ids = append(ids, mod.client.SetPrefixSubCallback(prefix, fn))
}
2024-02-25 13:46:59 +00:00
return func() {
for _, id := range ids {
mod.client.UnsetCallback(id)
2022-02-01 11:35:34 +00:00
}
2024-02-25 13:46:59 +00:00
}, nil
2022-02-01 11:35:34 +00:00
}
2024-02-25 13:46:59 +00:00
func (mod *LocalDBClient) SubscribeKey(key string, fn func(string)) (cancelFn CancelFunc, err error) {
_, err = mod.makeRequest(kv.CmdSubscribeKey, map[string]any{"key": key})
if err != nil {
2024-02-25 13:46:59 +00:00
return nil, err
}
id := mod.client.SetKeySubCallback(key, func(changedKey string, value string) {
if key != changedKey {
return
}
fn(value)
})
2024-02-25 13:46:59 +00:00
return func() {
mod.client.UnsetCallback(id)
2024-02-25 13:46:59 +00:00
}, nil
}
func (mod *LocalDBClient) GetJSON(key string, dst any) error {
2022-02-01 11:35:34 +00:00
res, err := mod.GetKey(key)
if err != nil {
if errors.Is(err, kv.ErrorKeyNotFound) {
return ErrEmptyKey
}
2022-02-01 11:35:34 +00:00
return err
}
2022-06-16 22:51:27 +00:00
if res == "" {
return ErrEmptyKey
}
2022-02-01 11:35:34 +00:00
return json.Unmarshal([]byte(res), dst)
}
2022-11-30 18:15:47 +00:00
func (mod *LocalDBClient) GetAll(prefix string) (map[string]string, error) {
res, err := mod.makeRequest(kv.CmdReadPrefix, map[string]any{"prefix": prefix})
2022-02-01 11:35:34 +00:00
if err != nil {
return nil, err
}
out := make(map[string]string)
for key, value := range res.Data.(map[string]any) {
2022-02-01 11:35:34 +00:00
out[key] = value.(string)
}
return out, nil
}
func (mod *LocalDBClient) PutJSON(key string, data any) error {
2022-02-01 11:35:34 +00:00
byt, err := json.Marshal(data)
if err != nil {
return err
}
return mod.PutKey(key, string(byt))
}
func (mod *LocalDBClient) PutJSONBulk(kvs map[string]any) error {
encoded := make(map[string]any)
2022-02-01 11:35:34 +00:00
for k, v := range kvs {
byt, err := json.Marshal(v)
if err != nil {
return err
}
encoded[k] = string(byt)
}
2023-05-21 19:15:28 +00:00
_, err := mod.makeRequest(kv.CmdWriteBulk, encoded)
2022-02-01 11:35:34 +00:00
return err
}
2022-11-30 18:15:47 +00:00
func (mod *LocalDBClient) RemoveKey(key string) error {
_, err := mod.makeRequest(kv.CmdRemoveKey, map[string]any{"key": key})
return err
2022-02-01 11:35:34 +00:00
}
func (mod *LocalDBClient) makeRequest(cmd string, data map[string]any) (kv.Response, error) {
2022-02-01 11:35:34 +00:00
req, chn := mod.client.MakeRequest(cmd, data)
mod.hub.SendMessage(req)
return getResponse(<-chn)
}
func getResponse(response any) (kv.Response, error) {
2022-02-01 11:35:34 +00:00
switch c := response.(type) {
case kv.Response:
return c, nil
case kv.Error:
return kv.Response{}, &KvError{c}
}
return kv.Response{}, ErrUnknown
}
// KvError is the error type used to report an error reply received from
// kilovolt. Its pointer type implements the error interface.
type KvError struct {
	// ErrorData is the error reply exactly as it was received.
	ErrorData kv.Error
}
// Error implements the error interface. The message has the form
// "<error>: <details>", built from the Error and Details fields of the wrapped
// kilovolt error.
//
// The receiver is deliberately not named kv so that it does not shadow the
// imported kv package inside the method.
func (e *KvError) Error() string {
	return fmt.Sprintf("%s: %s", e.ErrorData.Error, e.ErrorData.Details)
}