2021-05-07 16:36:23 +00:00
|
|
|
package stulbe
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
2021-05-10 21:09:15 +00:00
|
|
|
"context"
|
2021-05-07 16:36:23 +00:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"net/http"
|
2021-05-08 14:02:47 +00:00
|
|
|
"net/url"
|
2021-05-10 21:09:15 +00:00
|
|
|
"sync"
|
2021-05-07 16:36:23 +00:00
|
|
|
|
2021-05-08 14:02:47 +00:00
|
|
|
"github.com/gorilla/websocket"
|
2021-05-07 16:36:23 +00:00
|
|
|
jsoniter "github.com/json-iterator/go"
|
|
|
|
"github.com/nicklaw5/helix"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
|
|
|
2021-05-08 14:02:47 +00:00
|
|
|
kv "github.com/strimertul/kilovolt/v2"
|
2021-05-07 16:36:23 +00:00
|
|
|
"github.com/strimertul/stulbe/api"
|
|
|
|
|
2021-05-10 21:09:15 +00:00
|
|
|
"github.com/strimertul/strimertul/database"
|
2021-05-07 16:36:23 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type Client struct {
|
|
|
|
Endpoint string
|
|
|
|
|
|
|
|
client *http.Client
|
|
|
|
token string
|
|
|
|
logger logrus.FieldLogger
|
2021-05-08 14:02:47 +00:00
|
|
|
ws *websocket.Conn
|
2021-05-10 21:09:15 +00:00
|
|
|
db *database.DB
|
|
|
|
mu sync.Mutex // Used to avoid concurrent writes to socket
|
2021-05-07 16:36:23 +00:00
|
|
|
}
|
|
|
|
|
2021-05-10 21:09:15 +00:00
|
|
|
func NewClient(db *database.DB, logger logrus.FieldLogger) (*Client, error) {
|
2021-05-07 16:36:23 +00:00
|
|
|
var config Config
|
2021-05-10 21:09:15 +00:00
|
|
|
err := db.GetJSON(ConfigKey, &config)
|
2021-05-07 16:36:23 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
client := &Client{
|
|
|
|
Endpoint: config.Endpoint,
|
|
|
|
token: "",
|
|
|
|
logger: logger,
|
|
|
|
client: &http.Client{},
|
2021-05-08 14:02:47 +00:00
|
|
|
ws: nil,
|
2021-05-10 21:09:15 +00:00
|
|
|
db: db,
|
|
|
|
mu: sync.Mutex{},
|
2021-05-07 16:36:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
err = client.Authenticate(config.Username, config.AuthKey)
|
2021-05-08 14:02:47 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = client.ConnectToWebsocket()
|
2021-05-07 16:36:23 +00:00
|
|
|
|
|
|
|
return client, err
|
|
|
|
}
|
|
|
|
|
2021-05-08 14:02:47 +00:00
|
|
|
func (s *Client) Close() {
|
|
|
|
if s.ws != nil {
|
|
|
|
s.ws.Close()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-07 16:36:23 +00:00
|
|
|
func (s *Client) Authenticate(user string, authKey string) error {
|
|
|
|
body := new(bytes.Buffer)
|
|
|
|
err := jsoniter.ConfigFastest.NewEncoder(body).Encode(api.AuthRequest{User: user, AuthKey: authKey})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
resp, err := s.client.Post(fmt.Sprintf("%s/api/auth", s.Endpoint), "application/json", body)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if resp.StatusCode >= 400 {
|
|
|
|
return getAPIError(resp)
|
|
|
|
}
|
|
|
|
|
|
|
|
var reply api.AuthResponse
|
|
|
|
err = jsoniter.ConfigFastest.NewDecoder(resp.Body).Decode(&reply)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
s.token = reply.Token
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Client) authenticated() bool {
|
|
|
|
return s.token != ""
|
|
|
|
}
|
|
|
|
|
2021-05-08 14:02:47 +00:00
|
|
|
func (s *Client) ConnectToWebsocket() error {
|
|
|
|
if !s.authenticated() {
|
|
|
|
return ErrNotAuthenticated
|
|
|
|
}
|
|
|
|
|
|
|
|
uri, err := url.Parse(fmt.Sprintf("%s/ws", s.Endpoint))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if uri.Scheme == "https" {
|
|
|
|
uri.Scheme = "wss"
|
|
|
|
} else {
|
|
|
|
uri.Scheme = "ws"
|
|
|
|
}
|
|
|
|
|
|
|
|
s.ws, _, err = websocket.DefaultDialer.Dial(uri.String(), http.Header{
|
|
|
|
"Authorization": []string{"Bearer " + s.token},
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
s.logger.Info("connected to ws, reading")
|
|
|
|
for {
|
|
|
|
_, message, err := s.ws.ReadMessage()
|
|
|
|
if err != nil {
|
|
|
|
s.logger.WithError(err).Error("websocket read error")
|
|
|
|
return
|
|
|
|
}
|
2021-05-10 21:09:15 +00:00
|
|
|
s.logger.WithField("message", string(message)).Debug("recv ws")
|
2021-05-08 14:02:47 +00:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2021-05-10 21:09:15 +00:00
|
|
|
func (s *Client) ReplicateKey(key string) error {
|
|
|
|
// Set key to current value
|
|
|
|
val, err := s.db.GetKey(key)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
s.send(kv.Request{
|
|
|
|
CmdName: kv.CmdWriteKey,
|
|
|
|
Data: map[string]interface{}{
|
|
|
|
"key": key,
|
|
|
|
"data": string(val),
|
|
|
|
},
|
|
|
|
})
|
|
|
|
|
|
|
|
// Subscribe to local datastore and update remote on change
|
|
|
|
return s.db.Subscribe(context.Background(), func(pairs []database.ModifiedKV) error {
|
|
|
|
for _, changed := range pairs {
|
|
|
|
err := s.send(kv.Request{
|
|
|
|
CmdName: kv.CmdWriteKey,
|
|
|
|
Data: map[string]interface{}{
|
|
|
|
"key": changed.Key,
|
|
|
|
"data": string(changed.Data),
|
|
|
|
},
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}, key)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Client) send(v interface{}) error {
|
|
|
|
s.mu.Lock()
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
return s.ws.WriteJSON(v)
|
|
|
|
}
|
|
|
|
|
2021-05-07 16:36:23 +00:00
|
|
|
func (s *Client) newAuthRequest(method string, url string, body io.Reader) (*http.Request, error) {
|
|
|
|
if !s.authenticated() {
|
|
|
|
return nil, ErrNotAuthenticated
|
|
|
|
}
|
|
|
|
|
|
|
|
req, err := http.NewRequest(method, url, body)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
req.Header.Set("Authorization", "Bearer "+s.token)
|
|
|
|
|
|
|
|
return req, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Client) StreamStatus(streamer string) (*helix.Stream, error) {
|
|
|
|
uri := fmt.Sprintf("%s/api/stream/%s/status", s.Endpoint, streamer)
|
|
|
|
req, err := s.newAuthRequest("GET", uri, nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
resp, err := s.client.Do(req)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if resp.StatusCode >= 400 {
|
|
|
|
return nil, getAPIError(resp)
|
|
|
|
}
|
|
|
|
|
|
|
|
var streams []helix.Stream
|
|
|
|
err = jsoniter.ConfigFastest.NewDecoder(resp.Body).Decode(&streams)
|
|
|
|
if len(streams) < 1 {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return &streams[0], err
|
|
|
|
}
|
|
|
|
|
|
|
|
func getAPIError(r *http.Response) error {
|
|
|
|
var apiError api.ResponseError
|
|
|
|
err := jsoniter.ConfigFastest.NewDecoder(r.Body).Decode(&apiError)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return errors.New(apiError.Error)
|
|
|
|
}
|