diff --git a/modules/database/database.go b/modules/database/database.go index 608b642..b1924da 100644 --- a/modules/database/database.go +++ b/modules/database/database.go @@ -93,6 +93,20 @@ func (mod *DBModule) Subscribe(fn kv.SubscriptionCallback, prefixes ...string) e return nil } +func (mod *DBModule) SubscribeKey(fn func(string), key string) error { + _, err := mod.makeRequest(kv.CmdSubscribePrefix, map[string]interface{}{"prefix": key}) + if err != nil { + return err + } + go mod.client.SetPrefixSubCallback(key, func(changedKey string, value string) { + if key != changedKey { + return + } + fn(value) + }) + return nil +} + func (mod *DBModule) GetJSON(key string, dst interface{}) error { res, err := mod.GetKey(key) if err != nil { diff --git a/modules/http/server.go b/modules/http/server.go index 374822e..ea2bf95 100644 --- a/modules/http/server.go +++ b/modules/http/server.go @@ -134,31 +134,29 @@ func (s *Server) Listen() error { restart := containers.NewRWSync(false) exit := make(chan error) go func() { - err := s.db.Subscribe(func(key, value string) { - if key == ServerConfigKey { - oldBind := s.Config.Bind - oldPassword := s.Config.KVPassword - err := json.Unmarshal([]byte(value), &s.Config) + err := s.db.SubscribeKey(func(value string) { + oldBind := s.Config.Bind + oldPassword := s.Config.KVPassword + err := json.Unmarshal([]byte(value), &s.Config) + if err != nil { + s.logger.Error("Failed to unmarshal config", zap.Error(err)) + return + } + s.mux = s.makeMux() + // Restart hub if password changed + if oldPassword != s.Config.KVPassword { + s.hub.SetOptions(kv.HubOptions{ + Password: s.Config.KVPassword, + }) + } + // Restart server if bind changed + if oldBind != s.Config.Bind { + restart.Set(true) + err = s.server.Shutdown(context.Background()) if err != nil { - s.logger.Error("Failed to unmarshal config", zap.Error(err)) + s.logger.Error("Failed to shutdown server", zap.Error(err)) return } - s.mux = s.makeMux() - // Restart hub if password changed - if oldPassword != s.Config.KVPassword { - s.hub.SetOptions(kv.HubOptions{ - Password: s.Config.KVPassword, - }) - } - // Restart server if bind changed - if oldBind != s.Config.Bind { - restart.Set(true) - err = s.server.Shutdown(context.Background()) - if err != nil { - s.logger.Error("Failed to shutdown server", zap.Error(err)) - return - } - } } }, ServerConfigKey) if err != nil { diff --git a/modules/loyalty/manager.go b/modules/loyalty/manager.go index 99f3da2..346a1de 100644 --- a/modules/loyalty/manager.go +++ b/modules/loyalty/manager.go @@ -92,7 +92,10 @@ func Register(manager *modules.Manager) error { } // Subscribe for changes - go db.Subscribe(loyalty.update, "loyalty/") + err = db.Subscribe(loyalty.update, "loyalty/") + if err != nil { + logger.Error("could not setup loyalty reload subscription", zap.Error(err)) + } // Register module manager.Modules[modules.ModuleLoyalty] = loyalty diff --git a/modules/twitch/bot.go b/modules/twitch/bot.go index dae4981..b6d0515 100644 --- a/modules/twitch/bot.go +++ b/modules/twitch/bot.go @@ -154,37 +154,37 @@ func NewBot(api *Client, config BotConfig) *Bot { if err != nil { bot.logger.Error("failed to parse custom commands", zap.Error(err)) } - go api.db.Subscribe(bot.updateCommands, CustomCommandsKey) - go api.db.Subscribe(bot.handleWriteMessageRPC, WriteMessageRPC) + err = api.db.SubscribeKey(bot.updateCommands, CustomCommandsKey) + if err != nil { + bot.logger.Error("could not set-up bot command reload subscription", zap.Error(err)) + } + err = api.db.SubscribeKey(bot.handleWriteMessageRPC, WriteMessageRPC) + if err != nil { + bot.logger.Error("could not set-up bot command reload subscription", zap.Error(err)) + } return bot } -func (b *Bot) updateCommands(key, value string) { - switch key { - case CustomCommandsKey: - err := func() error { - b.mu.Lock() - defer b.mu.Unlock() - return json.UnmarshalFromString(value, &b.customCommands) - }() - if err != nil { - b.logger.Error("failed to decode new custom commands", zap.Error(err)) - return - } - // Recreate templates - if err := b.updateTemplates(); err != nil { - b.logger.Error("failed to update custom commands templates", zap.Error(err)) - return - } +func (b *Bot) updateCommands(value string) { + err := func() error { + b.mu.Lock() + defer b.mu.Unlock() + return json.UnmarshalFromString(value, &b.customCommands) + }() + if err != nil { + b.logger.Error("failed to decode new custom commands", zap.Error(err)) + return + } + // Recreate templates + if err := b.updateTemplates(); err != nil { + b.logger.Error("failed to update custom commands templates", zap.Error(err)) + return } } -func (b *Bot) handleWriteMessageRPC(key, value string) { - switch key { - case WriteMessageRPC: - b.Client.Say(b.config.Channel, value) - } +func (b *Bot) handleWriteMessageRPC(value string) { + b.Client.Say(b.config.Channel, value) } func (b *Bot) updateTemplates() error { diff --git a/modules/twitch/client.go b/modules/twitch/client.go index e62f017..8f5a8f4 100644 --- a/modules/twitch/client.go +++ b/modules/twitch/client.go @@ -66,44 +66,49 @@ func Register(manager *modules.Manager) error { } // Listen for config changes - go db.Subscribe(func(key, value string) { - switch key { - case ConfigKey: - err := json.UnmarshalFromString(value, &config) - if err != nil { - logger.Error("failed to unmarshal config", zap.Error(err)) - return - } - api, err := client.getHelixAPI() - if err != nil { - logger.Warn("failed to create new twitch client, keeping old credentials", zap.Error(err)) - return - } - client.API = api + err = db.SubscribeKey(func(value string) { + err := json.UnmarshalFromString(value, &config) + if err != nil { + logger.Error("failed to unmarshal config", zap.Error(err)) + return + } + api, err := client.getHelixAPI() + if err != nil { + logger.Warn("failed to create new twitch client, keeping old credentials", zap.Error(err)) + return + } + client.API = api - logger.Info("reloaded/updated Twitch API") - case BotConfigKey: - var twitchBotConfig BotConfig - err := json.UnmarshalFromString(value, &twitchBotConfig) - if err != nil { - logger.Error("failed to unmarshal config", zap.Error(err)) - return - } - err = client.Bot.Client.Disconnect() - if err != nil { - logger.Warn("failed to disconnect from Twitch IRC", zap.Error(err)) - } - if client.Config.EnableBot { - if err := client.startBot(manager); err != nil { - if !errors.Is(err, database.ErrEmptyKey) { - logger.Error("failed to re-create bot", zap.Error(err)) - } + logger.Info("reloaded/updated Twitch API") + }, ConfigKey) + if err != nil { + client.logger.Error("could not setup twitch config reload subscription", zap.Error(err)) + } + + err = db.SubscribeKey(func(value string) { + var twitchBotConfig BotConfig + err := json.UnmarshalFromString(value, &twitchBotConfig) + if err != nil { + logger.Error("failed to unmarshal config", zap.Error(err)) + return + } + err = client.Bot.Client.Disconnect() + if err != nil { + logger.Warn("failed to disconnect from Twitch IRC", zap.Error(err)) + } + if client.Config.EnableBot { + if err := client.startBot(manager); err != nil { + if !errors.Is(err, database.ErrEmptyKey) { + logger.Error("failed to re-create bot", zap.Error(err)) } } - client.restart <- true - logger.Info("reloaded/restarted Twitch bot") } - }, ConfigKey, BotConfigKey) + client.restart <- true + logger.Info("reloaded/restarted Twitch bot") + }, BotConfigKey) + if err != nil { + client.logger.Error("could not setup twitch bot config reload subscription", zap.Error(err)) + } if config.Enabled { client.API, err = client.getHelixAPI() diff --git a/modules/twitch/modules.alerts.go b/modules/twitch/modules.alerts.go index a75a7f7..89f2d31 100644 --- a/modules/twitch/modules.alerts.go +++ b/modules/twitch/modules.alerts.go @@ -111,17 +111,18 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { mod.compileTemplates() - go bot.api.db.Subscribe(func(key, value string) { - if key == BotAlertsKey { - err := json.UnmarshalFromString(value, &mod.Config) - if err != nil { - bot.logger.Debug("error reloading timer config", zap.Error(err)) - } else { - bot.logger.Info("reloaded alert config") - } - mod.compileTemplates() + err = bot.api.db.SubscribeKey(func(value string) { + err := json.UnmarshalFromString(value, &mod.Config) + if err != nil { + bot.logger.Debug("error reloading timer config", zap.Error(err)) + } else { + bot.logger.Info("reloaded alert config") } + mod.compileTemplates() }, BotAlertsKey) + if err != nil { + bot.logger.Error("could not set-up bot alert reload subscription", zap.Error(err)) + } // Subscriptions are handled with a slight delay as info come from different events and must be aggregated pendingSubs := make(map[string]subMixedEvent) @@ -234,190 +235,191 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { } } - go bot.api.db.Subscribe(func(key, value string) { - if key == EventSubEventKey { - var ev eventSubNotification - err := json.UnmarshalFromString(value, &ev) - if err != nil { - bot.logger.Debug("error parsing webhook payload", zap.Error(err)) + err = bot.api.db.SubscribeKey(func(value string) { + var ev eventSubNotification + err := json.UnmarshalFromString(value, &ev) + if err != nil { + bot.logger.Debug("error parsing webhook payload", zap.Error(err)) + return + } + switch ev.Subscription.Type { + case helix.EventSubTypeChannelFollow: + // Only process if we care about follows + if !mod.Config.Follow.Enabled { return } - switch ev.Subscription.Type { - case helix.EventSubTypeChannelFollow: - // Only process if we care about follows - if !mod.Config.Follow.Enabled { - return - } - // Parse as follow event - var followEv helix.EventSubChannelFollowEvent - err := json.Unmarshal(ev.Event, &followEv) - if err != nil { - bot.logger.Debug("error parsing follow event", zap.Error(err)) - return - } - // Pick a random message - messageID := rand.Intn(len(mod.Config.Follow.Messages)) - // Pick compiled template or fallback to plain text - if tpl, ok := mod.templates.follow.messages[messageID]; ok { - writeTemplate(bot, tpl, &followEv) - } else { - bot.WriteMessage(mod.Config.Follow.Messages[messageID]) - } - // Compile template and send - case helix.EventSubTypeChannelRaid: - // Only process if we care about raids - if !mod.Config.Raid.Enabled { - return - } - // Parse as raid event - var raidEv helix.EventSubChannelRaidEvent - err := json.Unmarshal(ev.Event, &raidEv) - if err != nil { - bot.logger.Debug("error parsing raid event", zap.Error(err)) - return - } - // Pick a random message from base set - messageID := rand.Intn(len(mod.Config.Raid.Messages)) - tpl, ok := mod.templates.raid.messages[messageID] - if !ok { - // Broken template! - mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID]) - return - } - // If we have variations, loop through all the available variations and pick the one with the highest minimum viewers that are met - if len(mod.Config.Raid.Variations) > 0 { - minViewers := -1 - for variationIndex, variation := range mod.Config.Raid.Variations { - if variation.MinViewers != nil && *variation.MinViewers > minViewers && raidEv.Viewers >= *variation.MinViewers { - // Make sure the template is valid - if varTemplates, ok := mod.templates.raid.variations[variationIndex]; ok { - if temp, ok := varTemplates[messageID]; ok { - tpl = temp - minViewers = *variation.MinViewers - } - } - } - } - } - // Compile template and send - writeTemplate(bot, tpl, &raidEv) - case helix.EventSubTypeChannelCheer: - // Only process if we care about bits - if !mod.Config.Cheer.Enabled { - return - } - // Parse as cheer event - var cheerEv helix.EventSubChannelCheerEvent - err := json.Unmarshal(ev.Event, &cheerEv) - if err != nil { - bot.logger.Debug("error parsing cheer event", zap.Error(err)) - return - } - // Pick a random message from base set - messageID := rand.Intn(len(mod.Config.Cheer.Messages)) - tpl, ok := mod.templates.cheer.messages[messageID] - if !ok { - // Broken template! - mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID]) - return - } - // If we have variations, loop through all the available variations and pick the one with the highest minimum amount that is met - if len(mod.Config.Cheer.Variations) > 0 { - minAmount := -1 - for variationIndex, variation := range mod.Config.Cheer.Variations { - if variation.MinAmount != nil && *variation.MinAmount > minAmount && cheerEv.Bits >= *variation.MinAmount { - // Make sure the template is valid - if varTemplates, ok := mod.templates.cheer.variations[variationIndex]; ok { - if temp, ok := varTemplates[messageID]; ok { - tpl = temp - minAmount = *variation.MinAmount - } - } - } - } - } - // Compile template and send - writeTemplate(bot, tpl, &cheerEv) - case helix.EventSubTypeChannelSubscription: - // Only process if we care about subscriptions - if !mod.Config.Subscription.Enabled { - return - } - // Parse as subscription event - var subEv helix.EventSubChannelSubscribeEvent - err := json.Unmarshal(ev.Event, &subEv) - if err != nil { - bot.logger.Debug("error parsing sub event", zap.Error(err)) - return - } - addPendingSub(subEv) - case helix.EventSubTypeChannelSubscriptionMessage: - // Only process if we care about subscriptions - if !mod.Config.Subscription.Enabled { - return - } - // Parse as subscription event - var subEv helix.EventSubChannelSubscriptionMessageEvent - err := json.Unmarshal(ev.Event, &subEv) - if err != nil { - bot.logger.Debug("error parsing sub event", zap.Error(err)) - return - } - addPendingSub(subEv) - case helix.EventSubTypeChannelSubscriptionGift: - // Only process if we care about gifted subs - if !mod.Config.GiftSub.Enabled { - return - } - // Parse as gift event - var giftEv helix.EventSubChannelSubscriptionGiftEvent - err := json.Unmarshal(ev.Event, &giftEv) - if err != nil { - bot.logger.Debug("error parsing raid event", zap.Error(err)) - return - } - // Pick a random message from base set - messageID := rand.Intn(len(mod.Config.GiftSub.Messages)) - tpl, ok := mod.templates.gift.messages[messageID] - if !ok { - // Broken template! - mod.bot.WriteMessage(mod.Config.GiftSub.Messages[messageID]) - return - } - // If we have variations, loop through all the available variations and pick the one with the highest minimum cumulative total that are met - if len(mod.Config.GiftSub.Variations) > 0 { - if giftEv.IsAnonymous { - for variationIndex, variation := range mod.Config.GiftSub.Variations { - if variation.IsAnonymous != nil && *variation.IsAnonymous { - // Make sure template is valid - if varTemplates, ok := mod.templates.gift.variations[variationIndex]; ok { - if temp, ok := varTemplates[messageID]; ok { - tpl = temp - break - } - } - } - } - } else if giftEv.CumulativeTotal > 0 { - minCumulative := -1 - for variationIndex, variation := range mod.Config.GiftSub.Variations { - if variation.MinCumulative != nil && *variation.MinCumulative > minCumulative && giftEv.CumulativeTotal >= *variation.MinCumulative { - // Make sure the template is valid - if varTemplates, ok := mod.templates.gift.variations[variationIndex]; ok { - if temp, ok := varTemplates[messageID]; ok { - tpl = temp - minCumulative = *variation.MinCumulative - } - } - } - } - } - } - // Compile template and send - writeTemplate(bot, tpl, &giftEv) + // Parse as follow event + var followEv helix.EventSubChannelFollowEvent + err := json.Unmarshal(ev.Event, &followEv) + if err != nil { + bot.logger.Debug("error parsing follow event", zap.Error(err)) + return } + // Pick a random message + messageID := rand.Intn(len(mod.Config.Follow.Messages)) + // Pick compiled template or fallback to plain text + if tpl, ok := mod.templates.follow.messages[messageID]; ok { + writeTemplate(bot, tpl, &followEv) + } else { + bot.WriteMessage(mod.Config.Follow.Messages[messageID]) + } + // Compile template and send + case helix.EventSubTypeChannelRaid: + // Only process if we care about raids + if !mod.Config.Raid.Enabled { + return + } + // Parse as raid event + var raidEv helix.EventSubChannelRaidEvent + err := json.Unmarshal(ev.Event, &raidEv) + if err != nil { + bot.logger.Debug("error parsing raid event", zap.Error(err)) + return + } + // Pick a random message from base set + messageID := rand.Intn(len(mod.Config.Raid.Messages)) + tpl, ok := mod.templates.raid.messages[messageID] + if !ok { + // Broken template! + mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID]) + return + } + // If we have variations, loop through all the available variations and pick the one with the highest minimum viewers that are met + if len(mod.Config.Raid.Variations) > 0 { + minViewers := -1 + for variationIndex, variation := range mod.Config.Raid.Variations { + if variation.MinViewers != nil && *variation.MinViewers > minViewers && raidEv.Viewers >= *variation.MinViewers { + // Make sure the template is valid + if varTemplates, ok := mod.templates.raid.variations[variationIndex]; ok { + if temp, ok := varTemplates[messageID]; ok { + tpl = temp + minViewers = *variation.MinViewers + } + } + } + } + } + // Compile template and send + writeTemplate(bot, tpl, &raidEv) + case helix.EventSubTypeChannelCheer: + // Only process if we care about bits + if !mod.Config.Cheer.Enabled { + return + } + // Parse as cheer event + var cheerEv helix.EventSubChannelCheerEvent + err := json.Unmarshal(ev.Event, &cheerEv) + if err != nil { + bot.logger.Debug("error parsing cheer event", zap.Error(err)) + return + } + // Pick a random message from base set + messageID := rand.Intn(len(mod.Config.Cheer.Messages)) + tpl, ok := mod.templates.cheer.messages[messageID] + if !ok { + // Broken template! + mod.bot.WriteMessage(mod.Config.Raid.Messages[messageID]) + return + } + // If we have variations, loop through all the available variations and pick the one with the highest minimum amount that is met + if len(mod.Config.Cheer.Variations) > 0 { + minAmount := -1 + for variationIndex, variation := range mod.Config.Cheer.Variations { + if variation.MinAmount != nil && *variation.MinAmount > minAmount && cheerEv.Bits >= *variation.MinAmount { + // Make sure the template is valid + if varTemplates, ok := mod.templates.cheer.variations[variationIndex]; ok { + if temp, ok := varTemplates[messageID]; ok { + tpl = temp + minAmount = *variation.MinAmount + } + } + } + } + } + // Compile template and send + writeTemplate(bot, tpl, &cheerEv) + case helix.EventSubTypeChannelSubscription: + // Only process if we care about subscriptions + if !mod.Config.Subscription.Enabled { + return + } + // Parse as subscription event + var subEv helix.EventSubChannelSubscribeEvent + err := json.Unmarshal(ev.Event, &subEv) + if err != nil { + bot.logger.Debug("error parsing sub event", zap.Error(err)) + return + } + addPendingSub(subEv) + case helix.EventSubTypeChannelSubscriptionMessage: + // Only process if we care about subscriptions + if !mod.Config.Subscription.Enabled { + return + } + // Parse as subscription event + var subEv helix.EventSubChannelSubscriptionMessageEvent + err := json.Unmarshal(ev.Event, &subEv) + if err != nil { + bot.logger.Debug("error parsing sub event", zap.Error(err)) + return + } + addPendingSub(subEv) + case helix.EventSubTypeChannelSubscriptionGift: + // Only process if we care about gifted subs + if !mod.Config.GiftSub.Enabled { + return + } + // Parse as gift event + var giftEv helix.EventSubChannelSubscriptionGiftEvent + err := json.Unmarshal(ev.Event, &giftEv) + if err != nil { + bot.logger.Debug("error parsing raid event", zap.Error(err)) + return + } + // Pick a random message from base set + messageID := rand.Intn(len(mod.Config.GiftSub.Messages)) + tpl, ok := mod.templates.gift.messages[messageID] + if !ok { + // Broken template! + mod.bot.WriteMessage(mod.Config.GiftSub.Messages[messageID]) + return + } + // If we have variations, loop through all the available variations and pick the one with the highest minimum cumulative total that are met + if len(mod.Config.GiftSub.Variations) > 0 { + if giftEv.IsAnonymous { + for variationIndex, variation := range mod.Config.GiftSub.Variations { + if variation.IsAnonymous != nil && *variation.IsAnonymous { + // Make sure template is valid + if varTemplates, ok := mod.templates.gift.variations[variationIndex]; ok { + if temp, ok := varTemplates[messageID]; ok { + tpl = temp + break + } + } + } + } + } else if giftEv.CumulativeTotal > 0 { + minCumulative := -1 + for variationIndex, variation := range mod.Config.GiftSub.Variations { + if variation.MinCumulative != nil && *variation.MinCumulative > minCumulative && giftEv.CumulativeTotal >= *variation.MinCumulative { + // Make sure the template is valid + if varTemplates, ok := mod.templates.gift.variations[variationIndex]; ok { + if temp, ok := varTemplates[messageID]; ok { + tpl = temp + minCumulative = *variation.MinCumulative + } + } + } + } + } + } + // Compile template and send + writeTemplate(bot, tpl, &giftEv) } }, EventSubEventKey) + if err != nil { + bot.logger.Error("could not setup twitch alert subscription", zap.Error(err)) + } bot.logger.Debug("loaded bot alerts") diff --git a/modules/twitch/modules.timer.go b/modules/twitch/modules.timer.go index c55bada..78273b9 100644 --- a/modules/twitch/modules.timer.go +++ b/modules/twitch/modules.timer.go @@ -55,16 +55,17 @@ func SetupTimers(bot *Bot) *BotTimerModule { bot.api.db.PutJSON(BotTimersKey, mod.Config) } - go bot.api.db.Subscribe(func(key, value string) { - if key == BotTimersKey { - err := json.UnmarshalFromString(value, &mod.Config) - if err != nil { - bot.logger.Debug("error reloading timer config", zap.Error(err)) - } else { - bot.logger.Info("reloaded timer config") - } + err = bot.api.db.SubscribeKey(func(value string) { + err := json.UnmarshalFromString(value, &mod.Config) + if err != nil { + bot.logger.Debug("error reloading timer config", zap.Error(err)) + } else { + bot.logger.Info("reloaded timer config") } }, BotTimersKey) + if err != nil { + bot.logger.Error("could not set-up timer reload subscription", zap.Error(err)) + } bot.logger.Debug("loaded timers", zap.Int("timers", len(mod.Config.Timers)))