Backport fixes from glimesh branch

This commit is contained in:
Ash Keel 2022-06-17 00:51:27 +02:00
parent d352e0ebfc
commit acaca8a400
No known key found for this signature in database
GPG Key ID: BAD8D93E7314ED3E
9 changed files with 127 additions and 73 deletions

1
go.mod
View File

@ -19,6 +19,7 @@ require (
)
require (
git.sr.ht/~hamcha/containers v0.0.3 // indirect
github.com/DataDog/zstd v1.4.5 // indirect
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver/v3 v3.1.1 // indirect

2
go.sum
View File

@ -1,5 +1,7 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
git.sr.ht/~hamcha/containers v0.0.3 h1:obG9X8s5iOIahVe+EGpkBDYmUAO78oTi9Y9gRurt334=
git.sr.ht/~hamcha/containers v0.0.3/go.mod h1:RiZphUpy9t6EnL4Gf6uzByM9QrBoqRCEPo7kz2wzbhE=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=

View File

@ -2,6 +2,7 @@ package main
import (
"embed"
"errors"
"flag"
"fmt"
"io/fs"
@ -126,7 +127,7 @@ func main() {
if *driver == "auto" {
file, err := ioutil.ReadFile(filepath.Join(options.directory, "stul-driver"))
if err != nil {
if err == os.ErrNotExist {
if errors.Is(err, os.ErrNotExist) {
*driver = "badger"
} else {
failOnError(err, "failed to open database driver file")

View File

@ -1,6 +1,7 @@
package database
import (
"errors"
"fmt"
"github.com/strimertul/strimertul/modules"
@ -14,7 +15,10 @@ var json = jsoniter.ConfigFastest
var (
// ErrUnknown is returned when a response is received that doesn't match any expected outcome.
ErrUnknown = fmt.Errorf("unknown error")
ErrUnknown = errors.New("unknown error")
// ErrEmptyKey is when a key is requested as JSON object but is an empty string (or unset)
ErrEmptyKey = errors.New("empty key")
)
type DBModule struct {
@ -84,7 +88,7 @@ func (mod *DBModule) Subscribe(fn kv.SubscriptionCallback, prefixes ...string) e
if err != nil {
return err
}
mod.client.SetPrefixSubCallback(prefix, fn)
go mod.client.SetPrefixSubCallback(prefix, fn)
}
return nil
}
@ -94,6 +98,9 @@ func (mod *DBModule) GetJSON(key string, dst interface{}) error {
if err != nil {
return err
}
if res == "" {
return ErrEmptyKey
}
return json.Unmarshal([]byte(res), dst)
}

View File

@ -8,6 +8,7 @@ import (
"net/http"
"net/http/pprof"
"git.sr.ht/~hamcha/containers"
jsoniter "github.com/json-iterator/go"
"github.com/strimertul/strimertul/modules/database"
@ -118,7 +119,7 @@ func (s *Server) makeMux() *http.ServeMux {
func (s *Server) Listen() error {
// Start HTTP server
restart := newSafeBool(false)
restart := containers.NewRWSync(false)
exit := make(chan error)
go func() {
err := s.db.Subscribe(func(key, value string) {

View File

@ -2,6 +2,7 @@ package loyalty
import (
"errors"
"fmt"
"strings"
"sync"
"time"
@ -11,7 +12,6 @@ import (
"github.com/strimertul/strimertul/modules/stulbe"
jsoniter "github.com/json-iterator/go"
kv "github.com/strimertul/kilovolt/v8"
"go.uber.org/zap"
)
@ -47,28 +47,27 @@ func Register(manager *modules.Manager) error {
mu: sync.Mutex{},
cooldowns: make(map[string]time.Time),
}
// Ger data from DB
// Get data from DB
if err := db.GetJSON(ConfigKey, &loyalty.config); err != nil {
if errors.Is(err, kv.ErrorKeyNotFound) {
logger.Warn("missing configuration for loyalty (but it's enabled). Please make sure to set it up properly!")
} else {
return err
if !errors.Is(err, database.ErrEmptyKey) {
return fmt.Errorf("could not retrieve loyalty config: %w", err)
}
loyalty.config.Enabled = false
}
// Retrieve configs
if err := db.GetJSON(RewardsKey, &loyalty.rewards); err != nil {
if !errors.Is(err, kv.ErrorKeyNotFound) {
if !errors.Is(err, database.ErrEmptyKey) {
return err
}
}
if err := db.GetJSON(GoalsKey, &loyalty.goals); err != nil {
if !errors.Is(err, kv.ErrorKeyNotFound) {
if !errors.Is(err, database.ErrEmptyKey) {
return err
}
}
if err := db.GetJSON(QueueKey, &loyalty.queue); err != nil {
if !errors.Is(err, kv.ErrorKeyNotFound) {
if !errors.Is(err, database.ErrEmptyKey) {
return err
}
}
@ -76,7 +75,7 @@ func Register(manager *modules.Manager) error {
// Retrieve user points
points, err := db.GetAll(PointsPrefix)
if err != nil {
if !errors.Is(err, kv.ErrorKeyNotFound) {
if !errors.Is(err, database.ErrEmptyKey) {
return err
}
points = make(map[string]string)

View File

@ -23,11 +23,17 @@ type Module interface {
type ModuleID string
const (
// Required
ModuleDB ModuleID = "db"
ModuleHTTP ModuleID = "http"
// Feature modules
ModuleLoyalty ModuleID = "loyalty"
ModuleTwitch ModuleID = "twitch"
ModuleStulbe ModuleID = "stulbe"
ModuleDB ModuleID = "db"
ModuleHTTP ModuleID = "http"
// Streaming providers
ModuleTwitch ModuleID = "twitch"
)
type Manager struct {

View File

@ -3,7 +3,9 @@ package twitch
import (
"errors"
"fmt"
"time"
"git.sr.ht/~hamcha/containers"
jsoniter "github.com/json-iterator/go"
"github.com/nicklaw5/helix/v2"
"github.com/strimertul/strimertul/modules/database"
@ -20,7 +22,8 @@ type Client struct {
API *helix.Client
logger *zap.Logger
restart chan bool
restart chan bool
streamOnline *containers.RWSync[bool]
}
func Register(manager *modules.Manager) error {
@ -35,38 +38,51 @@ func Register(manager *modules.Manager) error {
var config Config
err := db.GetJSON(ConfigKey, &config)
if err != nil {
return fmt.Errorf("failed to get twitch config: %w", err)
if !errors.Is(err, database.ErrEmptyKey) {
return fmt.Errorf("failed to get twitch config: %w", err)
}
config.Enabled = false
}
// Create Twitch client
api, err := getHelixAPI(config.APIClientID, config.APIClientSecret)
if err != nil {
return fmt.Errorf("failed to create twitch client: %w", err)
var api *helix.Client
if config.Enabled {
api, err = getHelixAPI(config.APIClientID, config.APIClientSecret)
if err != nil {
return fmt.Errorf("failed to create twitch client: %w", err)
}
}
client := &Client{
Config: config,
db: db,
API: api,
logger: logger,
restart: make(chan bool),
Config: config,
db: db,
API: api,
logger: logger,
restart: make(chan bool, 128),
streamOnline: containers.NewRWSync(false),
}
// Get Twitch bot config
var twitchBotConfig BotConfig
err = db.GetJSON(BotConfigKey, &twitchBotConfig)
if err != nil {
return fmt.Errorf("failed to get bot config: %w", err)
if client.Config.EnableBot {
if err := client.startBot(manager); err != nil {
if !errors.Is(err, database.ErrEmptyKey) {
return err
}
}
}
// Create and run IRC bot
client.Bot = NewBot(client, twitchBotConfig)
go client.runStatusPoll()
go func() {
for {
err := client.RunBot()
if err != nil {
logger.Error("failed to connect to Twitch IRC", zap.Error(err))
// Wait for config change before retrying
if client.Config.EnableBot && client.Bot != nil {
err := client.RunBot()
if err != nil {
logger.Error("failed to connect to Twitch IRC", zap.Error(err))
// Wait for config change before retrying
<-client.restart
}
} else {
<-client.restart
}
}
@ -94,6 +110,7 @@ func Register(manager *modules.Manager) error {
client.API = api
logger.Info("reloaded/updated Twitch API")
case BotConfigKey:
var twitchBotConfig BotConfig
err := jsoniter.ConfigFastest.UnmarshalFromString(value, &twitchBotConfig)
if err != nil {
logger.Error("failed to unmarshal config", zap.Error(err))
@ -103,7 +120,13 @@ func Register(manager *modules.Manager) error {
if err != nil {
logger.Warn("failed to disconnect from Twitch IRC", zap.Error(err))
}
client.Bot = NewBot(client, twitchBotConfig)
if client.Config.EnableBot {
if err := client.startBot(manager); err != nil {
if !errors.Is(err, database.ErrEmptyKey) {
logger.Error("failed to re-create bot", zap.Error(err))
}
}
}
client.restart <- true
logger.Info("reloaded/restarted Twitch bot")
}
@ -114,6 +137,53 @@ func Register(manager *modules.Manager) error {
return nil
}
func (c *Client) runStatusPoll() {
c.logger.Info("status poll started")
for {
// Wait for next poll
time.Sleep(60 * time.Second)
// Check if streamer is online, if possible
func() {
status, err := c.API.GetStreams(&helix.StreamsParams{
UserLogins: []string{c.Bot.config.Channel}, //TODO Replace with something non bot dependant
})
if err != nil {
c.logger.Error("Error checking stream status", zap.Error(err))
} else {
c.streamOnline.Set(len(status.Data.Streams) > 0)
}
err = c.db.PutJSON(StreamInfoKey, status.Data.Streams)
if err != nil {
c.logger.Warn("Error saving stream info", zap.Error(err))
}
}()
}
}
func (c *Client) startBot(manager *modules.Manager) error {
// Get Twitch bot config
var twitchBotConfig BotConfig
err := c.db.GetJSON(BotConfigKey, &twitchBotConfig)
if err != nil {
if !errors.Is(err, database.ErrEmptyKey) {
return fmt.Errorf("failed to get bot config: %w", err)
}
c.Config.EnableBot = false
}
// Create and run IRC bot
c.Bot = NewBot(c, twitchBotConfig)
// If loyalty module is enabled, set-up loyalty commands
if loyaltyManager, ok := manager.Modules[modules.ModuleLoyalty].(*loyalty.Manager); ok && c.Bot != nil {
c.Bot.SetupLoyalty(loyaltyManager)
}
return nil
}
func getHelixAPI(clientID string, clientSecret string) (*helix.Client, error) {
// Create Twitch client
api, err := helix.NewClient(&helix.Options{

View File

@ -4,13 +4,11 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"
"go.uber.org/zap"
irc "github.com/gempir/go-twitch-irc/v3"
"github.com/nicklaw5/helix/v2"
"github.com/strimertul/strimertul/modules/loyalty"
)
@ -51,35 +49,6 @@ func (b *Bot) SetupLoyalty(loyalty *loyalty.Manager) {
// Setup handler for adding points over time
b.Client.OnConnect(func() {
b.logger.Info("status poll started")
var statusMux sync.RWMutex
streamOnline := true
go func() {
for {
// Wait for next poll
time.Sleep(60 * time.Second)
// Check if streamer is online, if possible
func() {
statusMux.Lock()
defer statusMux.Unlock()
streamOnline = true
status, err := b.api.API.GetStreams(&helix.StreamsParams{
UserLogins: []string{b.config.Channel},
})
if err != nil {
b.logger.Error("Error checking stream status", zap.Error(err))
} else {
streamOnline = len(status.Data.Streams) > 0
}
err = b.api.db.PutJSON(StreamInfoKey, status.Data.Streams)
if err != nil {
b.logger.Warn("Error saving stream info", zap.Error(err))
}
}()
}
}()
go func() {
for {
status := loyalty.Status()
@ -90,9 +59,7 @@ func (b *Bot) SetupLoyalty(loyalty *loyalty.Manager) {
time.Sleep(time.Duration(config.Points.Interval) * time.Second)
// If stream is confirmed offline, don't give points away!
statusMux.RLock()
isOnline := streamOnline
statusMux.RUnlock()
isOnline := b.api.streamOnline.Get()
if !isOnline {
continue
}