From 6c1ba74b180987f9e2eebee062fa05ce2c182e33 Mon Sep 17 00:00:00 2001 From: Ash Keel Date: Thu, 4 May 2023 15:14:21 +0200 Subject: [PATCH] refactor: split off some more functions --- docs/data.go | 37 +++--- twitch/bot.alerts.go | 252 ++++++++++++++++++++------------------ twitch/bot.go | 195 +++++++++++++++-------------- twitch/bot.timer.go | 88 ++++++------- twitch/client.eventsub.go | 115 +++++++++-------- 5 files changed, 361 insertions(+), 326 deletions(-) diff --git a/docs/data.go b/docs/data.go index edbee66..928f38b 100644 --- a/docs/data.go +++ b/docs/data.go @@ -86,7 +86,11 @@ func parseType(typ reflect.Type) (out DataObject) { } out.Kind = getKind(typ.Kind()) - if out.Kind == KindArray || out.Kind == KindDict { + + switch out.Kind { + case KindStruct: + out.Keys = parseStruct(typ) + case KindArray, KindDict: elem := parseType(typ.Elem()) out.Element = &elem if out.Kind == KindDict { @@ -94,19 +98,22 @@ func parseType(typ reflect.Type) (out DataObject) { out.Key = &key } } - if out.Kind == KindStruct { - for index := 0; index < typ.NumField(); index++ { - field := typ.Field(index) - obj := parseType(field.Type) - if jsonName, ok := field.Tag.Lookup("json"); ok { - parts := strings.SplitN(jsonName, ",", 2) - obj.Name = parts[0] - } else { - obj.Name = field.Name - } - obj.Description = field.Tag.Get("desc") - out.Keys = append(out.Keys, obj) - } - } + return } + +func parseStruct(typ reflect.Type) (out []DataObject) { + for index := 0; index < typ.NumField(); index++ { + field := typ.Field(index) + obj := parseType(field.Type) + if jsonName, ok := field.Tag.Lookup("json"); ok { + parts := strings.SplitN(jsonName, ",", 2) + obj.Name = parts[0] + } else { + obj.Name = field.Name + } + obj.Description = field.Tag.Get("desc") + out = append(out, obj) + } + return out +} diff --git a/twitch/bot.alerts.go b/twitch/bot.alerts.go index 0529ba6..0799a21 100644 --- a/twitch/bot.alerts.go +++ b/twitch/bot.alerts.go @@ -87,11 +87,16 @@ type BotAlertsModule struct { cancelAlertSub database.CancelFunc cancelTwitchEventSub database.CancelFunc + + pendingMux sync.Mutex + pendingSubs map[string]subMixedEvent } func SetupAlerts(bot *Bot) *BotAlertsModule { mod := &BotAlertsModule{ - bot: bot, + bot: bot, + pendingMux: sync.Mutex{}, + pendingSubs: make(map[string]subMixedEvent), } // Load config from database @@ -111,7 +116,7 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { err, mod.cancelAlertSub = bot.api.db.SubscribeKey(BotAlertsKey, func(value string) { err := json.UnmarshalFromString(value, &mod.Config) if err != nil { - bot.logger.Debug("Error reloading timer config", zap.Error(err)) + bot.logger.Warn("Error loading alert config", zap.Error(err)) } else { bot.logger.Info("Reloaded alert config") } @@ -121,122 +126,11 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { bot.logger.Error("Could not set-up bot alert reload subscription", zap.Error(err)) } - // Subscriptions are handled with a slight delay as info come from different events and must be aggregated - pendingSubs := make(map[string]subMixedEvent) - pendingMux := sync.Mutex{} - processPendingSub := func(user string) { - pendingMux.Lock() - defer pendingMux.Unlock() - sub, ok := pendingSubs[user] - defer delete(pendingSubs, user) - if !ok { - // Somehow it's gone? Return early - return - } - // One last check in case config changed - if !mod.Config.Subscription.Enabled { - return - } - // Assign random message - messageID := rand.Intn(len(mod.Config.Subscription.Messages)) - tpl, ok := mod.templates[templateTypeSubscription][mod.Config.Subscription.Messages[messageID]] - // If template is broken, write it as is (soft fail, plus we raise attention I guess?) - if !ok { - mod.bot.WriteMessage(mod.Config.Subscription.Messages[messageID]) - return - } - // Check for variations, either by streak or gifted - if sub.IsGift { - for _, variation := range mod.Config.Subscription.Variations { - if variation.IsGifted != nil && *variation.IsGifted { - // Get random template from variations - messageID = rand.Intn(len(variation.Messages)) - // Make sure template is valid - if temp, ok := mod.templates[templateTypeSubscription][variation.Messages[messageID]]; ok { - tpl = temp - break - } - } - } - } else if sub.DurationMonths > 0 { - minMonths := -1 - for _, variation := range mod.Config.Subscription.Variations { - if variation.MinStreak != nil && sub.DurationMonths >= *variation.MinStreak && sub.DurationMonths >= minMonths { - // Get random template from variations - messageID = rand.Intn(len(variation.Messages)) - // Make sure template is valid - if temp, ok := mod.templates[templateTypeSubscription][variation.Messages[messageID]]; ok { - tpl = temp - minMonths = *variation.MinStreak - } - } - } - } - writeTemplate(bot, tpl, sub) - } - addPendingSub := func(ev any) { - switch sub := ev.(type) { - case helix.EventSubChannelSubscribeEvent: - pendingMux.Lock() - defer pendingMux.Unlock() - if ev, ok := pendingSubs[sub.UserID]; ok { - // Already pending, add extra data - ev.IsGift = sub.IsGift - pendingSubs[sub.UserID] = ev - return - } - pendingSubs[sub.UserID] = subMixedEvent{ - UserID: sub.UserID, - UserLogin: sub.UserLogin, - UserName: sub.UserName, - BroadcasterUserID: sub.BroadcasterUserID, - BroadcasterUserLogin: sub.BroadcasterUserLogin, - BroadcasterUserName: sub.BroadcasterUserName, - Tier: sub.Tier, - IsGift: sub.IsGift, - } - go func() { - // Wait a bit to make sure we aggregate all events - time.Sleep(time.Second * 3) - processPendingSub(sub.UserID) - }() - case helix.EventSubChannelSubscriptionMessageEvent: - pendingMux.Lock() - defer pendingMux.Unlock() - if ev, ok := pendingSubs[sub.UserID]; ok { - // Already pending, add extra data - ev.StreakMonths = sub.StreakMonths - ev.DurationMonths = sub.DurationMonths - ev.CumulativeMonths = sub.CumulativeMonths - ev.Message = sub.Message - return - } - pendingSubs[sub.UserID] = subMixedEvent{ - UserID: sub.UserID, - UserLogin: sub.UserLogin, - UserName: sub.UserName, - BroadcasterUserID: sub.BroadcasterUserID, - BroadcasterUserLogin: sub.BroadcasterUserLogin, - BroadcasterUserName: sub.BroadcasterUserName, - Tier: sub.Tier, - StreakMonths: sub.StreakMonths, - DurationMonths: sub.DurationMonths, - CumulativeMonths: sub.CumulativeMonths, - Message: sub.Message, - } - go func() { - // Wait a bit to make sure we aggregate all events - time.Sleep(time.Second * 3) - processPendingSub(sub.UserID) - }() - } - } - err, mod.cancelTwitchEventSub = bot.api.db.SubscribeKey(EventSubEventKey, func(value string) { var ev eventSubNotification err := json.UnmarshalFromString(value, &ev) if err != nil { - bot.logger.Debug("Error parsing webhook payload", zap.Error(err)) + bot.logger.Warn("Error parsing webhook payload", zap.Error(err)) return } switch ev.Subscription.Type { @@ -249,7 +143,7 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { var followEv helix.EventSubChannelFollowEvent err := json.Unmarshal(ev.Event, &followEv) if err != nil { - bot.logger.Debug("Error parsing follow event", zap.Error(err)) + bot.logger.Warn("Error parsing follow event", zap.Error(err)) return } // Pick a random message @@ -270,7 +164,7 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { var raidEv helix.EventSubChannelRaidEvent err := json.Unmarshal(ev.Event, &raidEv) if err != nil { - bot.logger.Debug("Error parsing raid event", zap.Error(err)) + bot.logger.Warn("Error parsing raid event", zap.Error(err)) return } // Pick a random message from base set @@ -306,7 +200,7 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { var cheerEv helix.EventSubChannelCheerEvent err := json.Unmarshal(ev.Event, &cheerEv) if err != nil { - bot.logger.Debug("Error parsing cheer event", zap.Error(err)) + bot.logger.Warn("Error parsing cheer event", zap.Error(err)) return } // Pick a random message from base set @@ -342,10 +236,10 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { var subEv helix.EventSubChannelSubscribeEvent err := json.Unmarshal(ev.Event, &subEv) if err != nil { - bot.logger.Debug("Error parsing new subscription event", zap.Error(err)) + bot.logger.Warn("Error parsing new subscription event", zap.Error(err)) return } - addPendingSub(subEv) + mod.addMixedEvent(subEv) case helix.EventSubTypeChannelSubscriptionMessage: // Only process if we care about subscriptions if !mod.Config.Subscription.Enabled { @@ -355,10 +249,10 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { var subEv helix.EventSubChannelSubscriptionMessageEvent err := json.Unmarshal(ev.Event, &subEv) if err != nil { - bot.logger.Debug("Error parsing returning subscription event", zap.Error(err)) + bot.logger.Warn("Error parsing returning subscription event", zap.Error(err)) return } - addPendingSub(subEv) + mod.addMixedEvent(subEv) case helix.EventSubTypeChannelSubscriptionGift: // Only process if we care about gifted subs if !mod.Config.GiftSub.Enabled { @@ -368,7 +262,7 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { var giftEv helix.EventSubChannelSubscriptionGiftEvent err := json.Unmarshal(ev.Event, &giftEv) if err != nil { - bot.logger.Debug("Error parsing subscription gifted event", zap.Error(err)) + bot.logger.Warn("Error parsing subscription gifted event", zap.Error(err)) return } // Pick a random message from base set @@ -419,6 +313,120 @@ func SetupAlerts(bot *Bot) *BotAlertsModule { return mod } +// Subscriptions are handled with a slight delay as info come from different events and must be aggregated +func (m *BotAlertsModule) addMixedEvent(event any) { + switch sub := event.(type) { + case helix.EventSubChannelSubscribeEvent: + m.pendingMux.Lock() + defer m.pendingMux.Unlock() + if ev, ok := m.pendingSubs[sub.UserID]; ok { + // Already pending, add extra data + ev.IsGift = sub.IsGift + m.pendingSubs[sub.UserID] = ev + return + } + m.pendingSubs[sub.UserID] = subMixedEvent{ + UserID: sub.UserID, + UserLogin: sub.UserLogin, + UserName: sub.UserName, + BroadcasterUserID: sub.BroadcasterUserID, + BroadcasterUserLogin: sub.BroadcasterUserLogin, + BroadcasterUserName: sub.BroadcasterUserName, + Tier: sub.Tier, + IsGift: sub.IsGift, + } + go func() { + // Wait a bit to make sure we aggregate all events + time.Sleep(time.Second * 3) + m.processPendingSub(sub.UserID) + }() + case helix.EventSubChannelSubscriptionMessageEvent: + m.pendingMux.Lock() + defer m.pendingMux.Unlock() + if ev, ok := m.pendingSubs[sub.UserID]; ok { + // Already pending, add extra data + ev.StreakMonths = sub.StreakMonths + ev.DurationMonths = sub.DurationMonths + ev.CumulativeMonths = sub.CumulativeMonths + ev.Message = sub.Message + return + } + m.pendingSubs[sub.UserID] = subMixedEvent{ + UserID: sub.UserID, + UserLogin: sub.UserLogin, + UserName: sub.UserName, + BroadcasterUserID: sub.BroadcasterUserID, + BroadcasterUserLogin: sub.BroadcasterUserLogin, + BroadcasterUserName: sub.BroadcasterUserName, + Tier: sub.Tier, + StreakMonths: sub.StreakMonths, + DurationMonths: sub.DurationMonths, + CumulativeMonths: sub.CumulativeMonths, + Message: sub.Message, + } + go func() { + // Wait a bit to make sure we aggregate all events + time.Sleep(time.Second * 3) + m.processPendingSub(sub.UserID) + }() + } +} + +func (m *BotAlertsModule) processPendingSub(user string) { + m.pendingMux.Lock() + defer m.pendingMux.Unlock() + sub, ok := m.pendingSubs[user] + defer delete(m.pendingSubs, user) + if !ok { + // Somehow it's gone? Return early + return + } + + // One last check in case config changed + if !m.Config.Subscription.Enabled { + return + } + + // Assign random message + messageID := rand.Intn(len(m.Config.Subscription.Messages)) + tpl, ok := m.templates[templateTypeSubscription][m.Config.Subscription.Messages[messageID]] + + // If template is broken, write it as is (soft fail, plus we raise attention I guess?) + if !ok { + m.bot.WriteMessage(m.Config.Subscription.Messages[messageID]) + return + } + + // Check for variations, either by streak or gifted + if sub.IsGift { + for _, variation := range m.Config.Subscription.Variations { + if variation.IsGifted != nil && *variation.IsGifted { + // Get random template from variations + messageID = rand.Intn(len(variation.Messages)) + // Make sure template is valid + if temp, ok := m.templates[templateTypeSubscription][variation.Messages[messageID]]; ok { + tpl = temp + break + } + } + } + } else if sub.DurationMonths > 0 { + minMonths := -1 + for _, variation := range m.Config.Subscription.Variations { + if variation.MinStreak != nil && sub.DurationMonths >= *variation.MinStreak && sub.DurationMonths >= minMonths { + // Get random template from variations + messageID = rand.Intn(len(variation.Messages)) + // Make sure template is valid + if temp, ok := m.templates[templateTypeSubscription][variation.Messages[messageID]]; ok { + tpl = temp + minMonths = *variation.MinStreak + } + } + } + } + writeTemplate(m.bot, tpl, sub) +} + func (m *BotAlertsModule) compileTemplates() { m.templates = templateCacheMap{ templateTypeSubscription: make(templateCache), diff --git a/twitch/bot.go b/twitch/bot.go index 14cc39a..a8d1358 100644 --- a/twitch/bot.go +++ b/twitch/bot.go @@ -79,101 +79,10 @@ func newBot(api *Client, config BotConfig) *Bot { OnMessage: utils.NewPubSub[BotMessageHandler](), } - client.OnConnect(func() { - for _, handler := range bot.OnConnect.Subscribers() { - if handler != nil { - handler.HandleBotConnect() - } - } - }) - - client.OnPrivateMessage(func(message irc.PrivateMessage) { - for _, handler := range bot.OnMessage.Subscribers() { - if handler != nil { - handler.HandleBotMessage(message) - } - } - - // Ignore messages for a while or twitch will get mad! - if message.Time.Before(bot.lastMessage.Get().Add(time.Second * 2)) { - bot.logger.Debug("Message received too soon, ignoring") - return - } - - lowercaseMessage := strings.ToLower(message.Message) - - // Check if it's a command - if strings.HasPrefix(message.Message, "!") { - // Run through supported commands - for cmd, data := range bot.commands.Copy() { - if !data.Enabled { - continue - } - if !strings.HasPrefix(lowercaseMessage, cmd) { - continue - } - parts := strings.SplitN(lowercaseMessage, " ", 2) - if parts[0] != cmd { - continue - } - go data.Handler(bot, message) - bot.lastMessage.Set(time.Now()) - } - } - - // Run through custom commands - for cmd, data := range bot.customCommands.Copy() { - if !data.Enabled { - continue - } - lc := strings.ToLower(cmd) - if !strings.HasPrefix(lowercaseMessage, lc) { - continue - } - parts := strings.SplitN(lowercaseMessage, " ", 2) - if parts[0] != lc { - continue - } - go cmdCustom(bot, cmd, data, message) - bot.lastMessage.Set(time.Now()) - } - - err := bot.api.db.PutJSON(ChatEventKey, message) - if err != nil { - bot.logger.Warn("Could not save chat message to key", zap.String("key", ChatEventKey), zap.Error(err)) - } - if bot.Config.ChatHistory > 0 { - history := bot.chatHistory.Get() - if len(history) >= bot.Config.ChatHistory { - history = history[len(history)-bot.Config.ChatHistory+1:] - } - bot.chatHistory.Set(append(history, message)) - err = bot.api.db.PutJSON(ChatHistoryKey, bot.chatHistory.Get()) - if err != nil { - bot.logger.Warn("Could not save message to chat history", zap.Error(err)) - } - } - - if bot.Timers != nil { - go bot.Timers.OnMessage(message) - } - }) - - client.OnUserJoinMessage(func(message irc.UserJoinMessage) { - if strings.ToLower(message.User) == bot.username { - bot.logger.Info("Twitch bot joined channel", zap.String("channel", message.Channel)) - } else { - bot.logger.Debug("User joined channel", zap.String("channel", message.Channel), zap.String("username", message.User)) - } - }) - - client.OnUserPartMessage(func(message irc.UserPartMessage) { - if strings.ToLower(message.User) == bot.username { - bot.logger.Info("Twitch bot left channel", zap.String("channel", message.Channel)) - } else { - bot.logger.Debug("User left channel", zap.String("channel", message.Channel), zap.String("username", message.User)) - } - }) + client.OnConnect(bot.onConnectHandler) + client.OnPrivateMessage(bot.onMessageHandler) + client.OnUserJoinMessage(bot.onJoinHandler) + client.OnUserPartMessage(bot.onPartHandler) bot.Client.Join(config.Channel) bot.setupFunctions() @@ -210,6 +119,102 @@ func newBot(api *Client, config BotConfig) *Bot { return bot } +func (b *Bot) onJoinHandler(message irc.UserJoinMessage) { + if strings.ToLower(message.User) == b.username { + b.logger.Info("Twitch bot joined channel", zap.String("channel", message.Channel)) + } else { + b.logger.Debug("User joined channel", zap.String("channel", message.Channel), zap.String("username", message.User)) + } +} + +func (b *Bot) onPartHandler(message irc.UserPartMessage) { + if strings.ToLower(message.User) == b.username { + b.logger.Info("Twitch bot left channel", zap.String("channel", message.Channel)) + } else { + b.logger.Debug("User left channel", zap.String("channel", message.Channel), zap.String("username", message.User)) + } +} + +func (b *Bot) onMessageHandler(message irc.PrivateMessage) { + for _, handler := range b.OnMessage.Subscribers() { + if handler != nil { + handler.HandleBotMessage(message) + } + } + + // Ignore messages for a while or twitch will get mad! + if message.Time.Before(b.lastMessage.Get().Add(time.Second * 2)) { + b.logger.Debug("Message received too soon, ignoring") + return + } + + lowercaseMessage := strings.ToLower(message.Message) + + // Check if it's a command + if strings.HasPrefix(message.Message, "!") { + // Run through supported commands + for cmd, data := range b.commands.Copy() { + if !data.Enabled { + continue + } + if !strings.HasPrefix(lowercaseMessage, cmd) { + continue + } + parts := strings.SplitN(lowercaseMessage, " ", 2) + if parts[0] != cmd { + continue + } + go data.Handler(b, message) + b.lastMessage.Set(time.Now()) + } + } + + // Run through custom commands + for cmd, data := range b.customCommands.Copy() { + if !data.Enabled { + continue + } + lc := strings.ToLower(cmd) + if !strings.HasPrefix(lowercaseMessage, lc) { + continue + } + parts := strings.SplitN(lowercaseMessage, " ", 2) + if parts[0] != lc { + continue + } + go cmdCustom(b, cmd, data, message) + b.lastMessage.Set(time.Now()) + } + + err := b.api.db.PutJSON(ChatEventKey, message) + if err != nil { + b.logger.Warn("Could not save chat message to key", zap.String("key", ChatEventKey), zap.Error(err)) + } + if b.Config.ChatHistory > 0 { + history := b.chatHistory.Get() + if len(history) >= b.Config.ChatHistory { + history = history[len(history)-b.Config.ChatHistory+1:] + } + b.chatHistory.Set(append(history, message)) + err = b.api.db.PutJSON(ChatHistoryKey, b.chatHistory.Get()) + if err != nil { + b.logger.Warn("Could not save message to chat history", zap.Error(err)) + } + } + + if b.Timers != nil { + go b.Timers.OnMessage(message) + } +} + +func (b *Bot) onConnectHandler() { + for _, handler := range b.OnConnect.Subscribers() { + if handler != nil { + handler.HandleBotConnect() + } + } +} + func (b *Bot) Close() error { if b.cancelUpdateSub != nil { b.cancelUpdateSub() diff --git a/twitch/bot.timer.go b/twitch/bot.timer.go index a462735..410b043 100644 --- a/twitch/bot.timer.go +++ b/twitch/bot.timer.go @@ -110,53 +110,57 @@ func (m *BotTimerModule) runTimers() { activity := m.currentChatActivity() // Reset timer - func() { - index := time.Now().Minute() % AverageMessageWindow - messages := m.messages.Get() - messages[index] = 0 - m.messages.Set(messages) - }() + index := time.Now().Minute() % AverageMessageWindow + messages := m.messages.Get() + messages[index] = 0 + m.messages.Set(messages) // Run timers - func() { - now := time.Now() - for name, timer := range m.Config.Timers { - // Must be enabled - if !timer.Enabled { - continue - } - // Check if enough time has passed - lastTriggeredTime, ok := m.lastTrigger.GetKey(name) - if !ok { - // If it's the first time we're checking it, start the cooldown - lastTriggeredTime = time.Now() - m.lastTrigger.SetKey(name, lastTriggeredTime) - } - minDelay := timer.MinimumDelay - if minDelay < 60 { - minDelay = 60 - } - if now.Sub(lastTriggeredTime) < time.Duration(minDelay)*time.Second { - continue - } - // Make sure chat activity is high enough - if activity < timer.MinimumChatActivity { - continue - } - - // Pick a random message - message := timer.Messages[rand.Intn(len(timer.Messages))] - - // Write message to chat - m.bot.WriteMessage(message) - - // Update last trigger - m.lastTrigger.SetKey(name, now) - } - }() + for name, timer := range m.Config.Timers { + m.ProcessTimer(name, timer, activity) + } } } +func (m *BotTimerModule) ProcessTimer(name string, timer BotTimer, activity int) { + // Must be enabled + if !timer.Enabled { + return + } + + // Check if enough time has passed + lastTriggeredTime, ok := m.lastTrigger.GetKey(name) + if !ok { + // If it's the first time we're checking it, start the cooldown + lastTriggeredTime = time.Now() + m.lastTrigger.SetKey(name, lastTriggeredTime) + } + + minDelay := timer.MinimumDelay + if minDelay < 60 { + minDelay = 60 + } + + now := time.Now() + if now.Sub(lastTriggeredTime) < time.Duration(minDelay)*time.Second { + return + } + + // Make sure chat activity is high enough + if activity < timer.MinimumChatActivity { + return + } + + // Pick a random message + message := timer.Messages[rand.Intn(len(timer.Messages))] + + // Write message to chat + m.bot.WriteMessage(message) + + // Update last trigger + m.lastTrigger.SetKey(name, now) +} + func (m *BotTimerModule) Close() { if m.cancelTimerSub != nil { m.cancelTimerSub() diff --git a/twitch/client.eventsub.go b/twitch/client.eventsub.go index 96d2aec..fcb46b3 100644 --- a/twitch/client.eventsub.go +++ b/twitch/client.eventsub.go @@ -29,6 +29,23 @@ func (c *Client) eventSubLoop(userClient *helix.Client) { } } +func readLoop(connection *websocket.Conn, recv chan<- []byte, wsErr chan<- error) { + for { + messageType, messageData, err := connection.ReadMessage() + if err != nil { + wsErr <- err + close(recv) + close(wsErr) + return + } + if messageType != websocket.TextMessage { + continue + } + + recv <- messageData + } +} + func (c *Client) connectWebsocket(url string, oldConnection *websocket.Conn, userClient *helix.Client) (string, *websocket.Conn, error) { connection, _, err := websocket.DefaultDialer.Dial(url, nil) if err != nil { @@ -39,23 +56,7 @@ func (c *Client) connectWebsocket(url string, oldConnection *websocket.Conn, use received := make(chan []byte, 10) wsErr := make(chan error, 1) - readFromWS := func(connection *websocket.Conn, recv chan<- []byte, wsErr chan<- error) { - for { - messageType, messageData, err := connection.ReadMessage() - if err != nil { - wsErr <- err - close(recv) - close(wsErr) - return - } - if messageType != websocket.TextMessage { - continue - } - - recv <- messageData - } - } - go readFromWS(connection, received, wsErr) + go readLoop(connection, received, wsErr) for { // Wait for next message or closing/error @@ -75,45 +76,55 @@ func (c *Client) connectWebsocket(url string, oldConnection *websocket.Conn, use continue } - switch wsMessage.Metadata.MessageType { - case "session_keepalive": - // Nothing to do - case "session_welcome": - var welcomeData WelcomeMessagePayload - err = json.Unmarshal(wsMessage.Payload, &welcomeData) - if err != nil { - c.logger.Error("Error decoding EventSub message", zap.String("message-type", wsMessage.Metadata.MessageType), zap.Error(err)) - break - } - c.logger.Info("Connection to EventSub websocket established", zap.String("session-id", welcomeData.Session.Id)) - - if oldConnection != nil { - utils.Close(oldConnection, c.logger) - } - // Add subscription to websocket session - err = c.addSubscriptionsForSession(userClient, welcomeData.Session.Id) - if err != nil { - c.logger.Error("Could not add subscriptions", zap.Error(err)) - break - } - case "session_reconnect": - var reconnectData WelcomeMessagePayload - err = json.Unmarshal(wsMessage.Payload, &reconnectData) - if err != nil { - c.logger.Error("Error decoding EventSub message", zap.String("message-type", wsMessage.Metadata.MessageType), zap.Error(err)) - break - } - c.logger.Info("EventSub websocket requested a reconnection", zap.String("session-id", reconnectData.Session.Id), zap.String("reconnect-url", reconnectData.Session.ReconnectUrl)) - - return reconnectData.Session.ReconnectUrl, connection, nil - case "notification": - go c.processEvent(wsMessage) - case "revocation": - // TODO idk what to do here + reconnectURL, err, done := c.processMessage(wsMessage, oldConnection, userClient) + if done { + return reconnectURL, connection, err } } } +func (c *Client) processMessage(wsMessage EventSubWebsocketMessage, oldConnection *websocket.Conn, userClient *helix.Client) (string, error, bool) { + switch wsMessage.Metadata.MessageType { + case "session_keepalive": + // Nothing to do + case "session_welcome": + var welcomeData WelcomeMessagePayload + err := json.Unmarshal(wsMessage.Payload, &welcomeData) + if err != nil { + c.logger.Error("Error decoding EventSub message", zap.String("message-type", wsMessage.Metadata.MessageType), zap.Error(err)) + break + } + c.logger.Info("Connection to EventSub websocket established", zap.String("session-id", welcomeData.Session.Id)) + + // We can only close the old connection once the new one has been established + if oldConnection != nil { + utils.Close(oldConnection, c.logger) + } + + // Add subscription to websocket session + err = c.addSubscriptionsForSession(userClient, welcomeData.Session.Id) + if err != nil { + c.logger.Error("Could not add subscriptions", zap.Error(err)) + break + } + case "session_reconnect": + var reconnectData WelcomeMessagePayload + err := json.Unmarshal(wsMessage.Payload, &reconnectData) + if err != nil { + c.logger.Error("Error decoding EventSub message", zap.String("message-type", wsMessage.Metadata.MessageType), zap.Error(err)) + break + } + c.logger.Info("EventSub websocket requested a reconnection", zap.String("session-id", reconnectData.Session.Id), zap.String("reconnect-url", reconnectData.Session.ReconnectUrl)) + + return reconnectData.Session.ReconnectUrl, nil, true + case "notification": + go c.processEvent(wsMessage) + case "revocation": + // TODO idk what to do here + } + return "", nil, false +} + func (c *Client) processEvent(message EventSubWebsocketMessage) { // Check if we processed this already if message.Metadata.MessageId != "" {