diff --git a/twitch/client.eventsub.go b/twitch/client.eventsub.go index 85aef38..57f1595 100644 --- a/twitch/client.eventsub.go +++ b/twitch/client.eventsub.go @@ -5,6 +5,7 @@ import ( "time" "github.com/gorilla/websocket" + jsoniter "github.com/json-iterator/go" "github.com/nicklaw5/helix/v2" "go.uber.org/zap" @@ -12,22 +13,36 @@ import ( const websocketEndpoint = "wss://eventsub-beta.wss.twitch.tv/ws" -func (c *Client) connectWebsocket(userClient *helix.Client) { - connection, _, err := websocket.DefaultDialer.Dial(websocketEndpoint, nil) +func (c *Client) eventSubLoop(userClient *helix.Client) { + endpoint := websocketEndpoint + var err error + for endpoint != "" { + endpoint, err = c.connectWebsocket(endpoint, userClient) + if err != nil { + c.logger.Error("eventsub ws read error", zap.Error(err)) + break + } + } +} + +func (c *Client) connectWebsocket(url string, userClient *helix.Client) (string, error) { + connection, _, err := websocket.DefaultDialer.Dial(url, nil) if err != nil { c.logger.Error("could not connect to eventsub ws", zap.Error(err)) - return + return "", err } defer connection.Close() - received := make(chan []byte) - wsErr := make(chan error) - readFromWS := func(connection *websocket.Conn, recv chan<- []byte) { + received := make(chan []byte, 10) + wsErr := make(chan error, 1) + + readFromWS := func(connection *websocket.Conn, recv chan<- []byte, wsErr chan<- error) { for { messageType, messageData, err := connection.ReadMessage() if err != nil { - c.logger.Error("eventsub ws read error", zap.Error(err)) wsErr <- err + close(recv) + close(wsErr) return } if messageType != websocket.TextMessage { @@ -37,16 +52,16 @@ func (c *Client) connectWebsocket(userClient *helix.Client) { recv <- messageData } } - go readFromWS(connection, received) + go readFromWS(connection, received, wsErr) for { // Wait for next message or closing/error var messageData []byte select { case <-c.ctx.Done(): - return - case <-wsErr: - return + return "", nil + case err = <-wsErr: + return "", err case messageData = <-received: } @@ -83,16 +98,7 @@ func (c *Client) connectWebsocket(userClient *helix.Client) { } c.logger.Info("eventsub ws connection reset requested", zap.String("session-id", reconnectData.Session.Id), zap.String("reconnect-url", reconnectData.Session.ReconnectUrl)) - // Try reconnecting to the new URL - newConnection, _, err := websocket.DefaultDialer.Dial(reconnectData.Session.ReconnectUrl, nil) - if err != nil { - c.logger.Error("eventsub ws reconnect error", zap.Error(err)) - break - } else { - _ = connection.Close() - go readFromWS(newConnection, received) - connection = newConnection - } + return reconnectData.Session.ReconnectUrl, nil case "notification": go c.processEvent(wsMessage) case "revocation": diff --git a/twitch/client.go b/twitch/client.go index 768fe47..1fffffc 100644 --- a/twitch/client.go +++ b/twitch/client.go @@ -216,7 +216,7 @@ func newClient(config Config, db *database.LocalDBClient, server *http.Server, l client.logger.Error("no users found, please authenticate in Twitch configuration -> Events") } else { client.User = users.Data.Users[0] - go client.connectWebsocket(userClient) + go client.eventSubLoop(userClient) } } else { client.logger.Warn("twitch user not identified, this will break most features")