1
0
Fork 0
mirror of https://git.sr.ht/~ashkeel/strimertul synced 2024-09-18 01:50:50 +00:00

fix: don't block local client on Wait call

This commit is contained in:
Ash Keel 2023-02-16 00:21:34 +01:00
parent 0f93ac1c17
commit 3c107b9a32
No known key found for this signature in database
GPG key ID: BAD8D93E7314ED3E
2 changed files with 28 additions and 22 deletions

View file

@ -5,6 +5,7 @@ import (
"time"
"github.com/gorilla/websocket"
jsoniter "github.com/json-iterator/go"
"github.com/nicklaw5/helix/v2"
"go.uber.org/zap"
@ -12,22 +13,36 @@ import (
const websocketEndpoint = "wss://eventsub-beta.wss.twitch.tv/ws"
func (c *Client) connectWebsocket(userClient *helix.Client) {
connection, _, err := websocket.DefaultDialer.Dial(websocketEndpoint, nil)
func (c *Client) eventSubLoop(userClient *helix.Client) {
endpoint := websocketEndpoint
var err error
for endpoint != "" {
endpoint, err = c.connectWebsocket(endpoint, userClient)
if err != nil {
c.logger.Error("eventsub ws read error", zap.Error(err))
break
}
}
}
func (c *Client) connectWebsocket(url string, userClient *helix.Client) (string, error) {
connection, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
c.logger.Error("could not connect to eventsub ws", zap.Error(err))
return
return "", err
}
defer connection.Close()
received := make(chan []byte)
wsErr := make(chan error)
readFromWS := func(connection *websocket.Conn, recv chan<- []byte) {
received := make(chan []byte, 10)
wsErr := make(chan error, 1)
readFromWS := func(connection *websocket.Conn, recv chan<- []byte, wsErr chan<- error) {
for {
messageType, messageData, err := connection.ReadMessage()
if err != nil {
c.logger.Error("eventsub ws read error", zap.Error(err))
wsErr <- err
close(recv)
close(wsErr)
return
}
if messageType != websocket.TextMessage {
@ -37,16 +52,16 @@ func (c *Client) connectWebsocket(userClient *helix.Client) {
recv <- messageData
}
}
go readFromWS(connection, received)
go readFromWS(connection, received, wsErr)
for {
// Wait for next message or closing/error
var messageData []byte
select {
case <-c.ctx.Done():
return
case <-wsErr:
return
return "", nil
case err = <-wsErr:
return "", err
case messageData = <-received:
}
@ -83,16 +98,7 @@ func (c *Client) connectWebsocket(userClient *helix.Client) {
}
c.logger.Info("eventsub ws connection reset requested", zap.String("session-id", reconnectData.Session.Id), zap.String("reconnect-url", reconnectData.Session.ReconnectUrl))
// Try reconnecting to the new URL
newConnection, _, err := websocket.DefaultDialer.Dial(reconnectData.Session.ReconnectUrl, nil)
if err != nil {
c.logger.Error("eventsub ws reconnect error", zap.Error(err))
break
} else {
_ = connection.Close()
go readFromWS(newConnection, received)
connection = newConnection
}
return reconnectData.Session.ReconnectUrl, nil
case "notification":
go c.processEvent(wsMessage)
case "revocation":

View file

@ -216,7 +216,7 @@ func newClient(config Config, db *database.LocalDBClient, server *http.Server, l
client.logger.Error("no users found, please authenticate in Twitch configuration -> Events")
} else {
client.User = users.Data.Users[0]
go client.connectWebsocket(userClient)
go client.eventSubLoop(userClient)
}
} else {
client.logger.Warn("twitch user not identified, this will break most features")