2022-11-23 15:34:49 +00:00
package twitch
2022-11-23 21:22:49 +00:00
import (
"fmt"
"time"
2023-11-10 20:36:15 +00:00
"git.sr.ht/~ashkeel/strimertul/utils"
2023-02-23 09:50:52 +00:00
2022-11-23 21:22:49 +00:00
"github.com/gorilla/websocket"
2022-12-04 13:45:34 +00:00
jsoniter "github.com/json-iterator/go"
2023-05-17 17:16:37 +00:00
"github.com/nicklaw5/helix/v2"
2022-12-04 13:45:34 +00:00
"go.uber.org/zap"
2022-11-23 21:22:49 +00:00
)
2023-05-17 16:44:59 +00:00
// websocketEndpoint is the URL eventSubLoop starts from when opening the
// EventSub websocket session.
const websocketEndpoint = "wss://eventsub.wss.twitch.tv/ws"
2022-11-23 21:22:49 +00:00
2023-02-15 23:21:34 +00:00
func ( c * Client ) eventSubLoop ( userClient * helix . Client ) {
endpoint := websocketEndpoint
var err error
2023-03-05 19:11:19 +00:00
var connection * websocket . Conn
2023-02-15 23:21:34 +00:00
for endpoint != "" {
2023-03-05 19:11:19 +00:00
endpoint , connection , err = c . connectWebsocket ( endpoint , connection , userClient )
2023-02-15 23:21:34 +00:00
if err != nil {
2023-04-19 13:27:13 +00:00
c . logger . Error ( "EventSub websocket read error" , zap . Error ( err ) )
2023-02-15 23:21:34 +00:00
}
}
2023-03-09 12:17:18 +00:00
if connection != nil {
utils . Close ( connection , c . logger )
}
2023-02-15 23:21:34 +00:00
}
2023-05-04 13:14:21 +00:00
// readLoop reads messages from connection until a read fails. Text messages
// are forwarded on recv; every other message type is discarded. On the first
// read error the error is sent on wsErr, then recv and wsErr are closed (in
// that order) and the function returns.
func readLoop(connection *websocket.Conn, recv chan<- []byte, wsErr chan<- error) {
	for {
		kind, payload, readErr := connection.ReadMessage()
		switch {
		case readErr != nil:
			// Report the failure first, then signal end-of-stream on both channels.
			wsErr <- readErr
			close(recv)
			close(wsErr)
			return
		case kind == websocket.TextMessage:
			recv <- payload
		}
	}
}
2023-03-05 19:11:19 +00:00
func ( c * Client ) connectWebsocket ( url string , oldConnection * websocket . Conn , userClient * helix . Client ) ( string , * websocket . Conn , error ) {
2023-02-15 23:21:34 +00:00
connection , _ , err := websocket . DefaultDialer . Dial ( url , nil )
2022-11-23 21:22:49 +00:00
if err != nil {
2023-04-19 13:27:13 +00:00
c . logger . Error ( "Could not establish a connection to the EventSub websocket" , zap . Error ( err ) )
2023-03-05 19:11:19 +00:00
return "" , nil , err
2022-11-23 21:22:49 +00:00
}
2023-02-15 23:21:34 +00:00
received := make ( chan [ ] byte , 10 )
wsErr := make ( chan error , 1 )
2023-05-04 13:14:21 +00:00
go readLoop ( connection , received , wsErr )
2022-12-03 15:16:59 +00:00
for {
// Wait for next message or closing/error
var messageData [ ] byte
select {
case <- c . ctx . Done ( ) :
2023-03-05 19:11:19 +00:00
return "" , nil , nil
2023-02-15 23:21:34 +00:00
case err = <- wsErr :
2023-04-19 13:27:13 +00:00
return url , nil , err // Return the endpoint so we can reconnect
2022-12-03 15:16:59 +00:00
case messageData = <- received :
2022-11-23 21:22:49 +00:00
}
var wsMessage EventSubWebsocketMessage
err = json . Unmarshal ( messageData , & wsMessage )
if err != nil {
2023-04-19 13:27:13 +00:00
c . logger . Error ( "Error decoding EventSub message" , zap . Error ( err ) )
2022-11-23 21:22:49 +00:00
continue
}
2024-02-25 13:58:35 +00:00
reconnectURL , done , err := c . processMessage ( wsMessage , oldConnection , userClient )
2023-05-04 13:14:21 +00:00
if done {
return reconnectURL , connection , err
2022-11-23 21:22:49 +00:00
}
}
}
2024-02-25 13:58:35 +00:00
func ( c * Client ) processMessage ( wsMessage EventSubWebsocketMessage , oldConnection * websocket . Conn , userClient * helix . Client ) ( string , bool , error ) {
2023-05-04 13:14:21 +00:00
switch wsMessage . Metadata . MessageType {
case "session_keepalive" :
// Nothing to do
case "session_welcome" :
var welcomeData WelcomeMessagePayload
err := json . Unmarshal ( wsMessage . Payload , & welcomeData )
if err != nil {
2023-05-05 13:02:23 +00:00
c . logger . Error ( "Error decoding EventSub welcome message" , zap . String ( "message-type" , wsMessage . Metadata . MessageType ) , zap . Error ( err ) )
2023-05-04 13:14:21 +00:00
break
}
2024-02-25 13:58:35 +00:00
c . logger . Info ( "Connection to EventSub websocket established" , zap . String ( "session-id" , welcomeData . Session . ID ) )
2023-05-04 13:14:21 +00:00
// We can only close the old connection once the new one has been established
if oldConnection != nil {
utils . Close ( oldConnection , c . logger )
}
// Add subscription to websocket session
2024-02-25 13:58:35 +00:00
err = c . addSubscriptionsForSession ( userClient , welcomeData . Session . ID )
2023-05-04 13:14:21 +00:00
if err != nil {
c . logger . Error ( "Could not add subscriptions" , zap . Error ( err ) )
break
}
case "session_reconnect" :
var reconnectData WelcomeMessagePayload
err := json . Unmarshal ( wsMessage . Payload , & reconnectData )
if err != nil {
2023-05-05 13:02:23 +00:00
c . logger . Error ( "Error decoding EventSub session reconnect parameters" , zap . String ( "message-type" , wsMessage . Metadata . MessageType ) , zap . Error ( err ) )
2023-05-04 13:14:21 +00:00
break
}
2024-02-25 13:58:35 +00:00
c . logger . Info ( "EventSub websocket requested a reconnection" , zap . String ( "session-id" , reconnectData . Session . ID ) , zap . String ( "reconnect-url" , reconnectData . Session . ReconnectURL ) )
2023-05-04 13:14:21 +00:00
2024-02-25 13:58:35 +00:00
return reconnectData . Session . ReconnectURL , true , nil
2023-05-04 13:14:21 +00:00
case "notification" :
go c . processEvent ( wsMessage )
case "revocation" :
// TODO idk what to do here
}
2024-02-25 13:58:35 +00:00
return "" , false , nil
2023-05-04 13:14:21 +00:00
}
2022-11-23 21:22:49 +00:00
func ( c * Client ) processEvent ( message EventSubWebsocketMessage ) {
// Check if we processed this already
2024-02-25 13:58:35 +00:00
if message . Metadata . MessageID != "" {
if c . eventCache . Contains ( message . Metadata . MessageID ) {
c . logger . Debug ( "Received duplicate event, ignoring" , zap . String ( "message-id" , message . Metadata . MessageID ) )
2022-11-23 21:22:49 +00:00
return
}
}
2024-02-25 13:58:35 +00:00
defer c . eventCache . Add ( message . Metadata . MessageID , message . Metadata . MessageTimestamp )
2022-11-23 21:22:49 +00:00
// Decode data
var notificationData NotificationMessagePayload
err := json . Unmarshal ( message . Payload , & notificationData )
if err != nil {
2023-05-05 13:02:23 +00:00
c . logger . Error ( "Error decoding EventSub notification payload" , zap . String ( "message-type" , message . Metadata . MessageType ) , zap . Error ( err ) )
2022-11-23 21:22:49 +00:00
}
2023-01-26 15:37:30 +00:00
notificationData . Date = time . Now ( )
2022-11-23 21:22:49 +00:00
2024-02-29 20:44:17 +00:00
eventKey := fmt . Sprintf ( "%s%s" , EventSubEventKeyPrefix , notificationData . Subscription . Type )
historyKey := fmt . Sprintf ( "%s%s" , EventSubHistoryKeyPrefix , notificationData . Subscription . Type )
err = c . db . PutJSON ( eventKey , notificationData )
c . logger . Info ( "Stored event" , zap . String ( "key" , eventKey ) , zap . String ( "notification-type" , notificationData . Subscription . Type ) )
2022-11-23 21:22:49 +00:00
if err != nil {
2024-02-29 20:44:17 +00:00
c . logger . Error ( "Error storing event to database" , zap . String ( "key" , eventKey ) , zap . Error ( err ) )
2022-11-23 21:22:49 +00:00
}
var archive [ ] NotificationMessagePayload
2024-02-29 20:44:17 +00:00
err = c . db . GetJSON ( historyKey , & archive )
2022-11-23 21:22:49 +00:00
if err != nil {
archive = [ ] NotificationMessagePayload { }
}
archive = append ( archive , notificationData )
if len ( archive ) > EventSubHistorySize {
archive = archive [ len ( archive ) - EventSubHistorySize : ]
}
2024-02-29 20:44:17 +00:00
err = c . db . PutJSON ( historyKey , archive )
2022-11-23 21:22:49 +00:00
if err != nil {
2024-02-29 20:44:17 +00:00
c . logger . Error ( "Error storing event to database" , zap . String ( "key" , historyKey ) , zap . Error ( err ) )
2022-11-23 21:22:49 +00:00
}
}
2023-02-02 20:24:14 +00:00
func ( c * Client ) addSubscriptionsForSession ( userClient * helix . Client , session string ) error {
2022-11-30 23:29:57 +00:00
if c . savedSubscriptions [ session ] {
// Already subscribed
return nil
}
2022-11-23 21:22:49 +00:00
transport := helix . EventSubTransport {
Method : "websocket" ,
SessionID : session ,
}
for topic , version := range subscriptionVersions {
2023-02-02 20:24:14 +00:00
sub , err := userClient . CreateEventSubSubscription ( & helix . EventSubSubscription {
2022-11-23 21:22:49 +00:00
Type : topic ,
Version : version ,
Status : "enabled" ,
Transport : transport ,
2023-02-02 20:24:14 +00:00
Condition : topicCondition ( topic , c . User . ID ) ,
2022-11-23 21:22:49 +00:00
} )
if sub . Error != "" || sub . ErrorMessage != "" {
2023-04-19 13:27:13 +00:00
c . logger . Error ( "EventSub Subscription error" , zap . String ( "topic" , topic ) , zap . String ( "topic-version" , version ) , zap . String ( "err" , sub . Error ) , zap . String ( "message" , sub . ErrorMessage ) )
2022-11-23 21:22:49 +00:00
return fmt . Errorf ( "%s: %s" , sub . Error , sub . ErrorMessage )
}
if err != nil {
return fmt . Errorf ( "error subscribing to %s: %w" , topic , err )
}
}
2022-11-30 23:29:57 +00:00
c . savedSubscriptions [ session ] = true
2022-11-23 21:22:49 +00:00
return nil
}
func topicCondition ( topic string , id string ) helix . EventSubCondition {
switch topic {
2024-02-29 20:44:17 +00:00
case helix . EventSubTypeChannelRaid :
2022-11-23 21:22:49 +00:00
return helix . EventSubCondition {
ToBroadcasterUserID : id ,
}
2024-02-29 20:44:17 +00:00
case helix . EventSubTypeChannelFollow :
2023-02-23 09:50:52 +00:00
return helix . EventSubCondition {
BroadcasterUserID : id ,
ModeratorUserID : id ,
}
2024-02-29 20:44:17 +00:00
case
helix . EventSubTypeChannelChatMessage ,
helix . EventSubTypeChannelChatNotification :
{
return helix . EventSubCondition {
BroadcasterUserID : id ,
UserID : id ,
}
}
2022-11-23 21:22:49 +00:00
default :
return helix . EventSubCondition {
BroadcasterUserID : id ,
}
}
}
// EventSubWebsocketMessage is the envelope every EventSub websocket message is
// decoded into: the metadata used to dispatch on the message type, plus the
// payload kept as raw JSON so it can be decoded later into the type matching
// that message type.
type EventSubWebsocketMessage struct {
// Metadata identifies the message (ID, type, timestamp, subscription info).
Metadata EventSubMetadata ` json:"metadata" `
// Payload is the undecoded message body.
Payload jsoniter . RawMessage ` json:"payload" `
}
// WelcomeMessagePayload is the payload carrying websocket session details. In
// this file it is decoded from both "session_welcome" and "session_reconnect"
// messages.
type WelcomeMessagePayload struct {
	// Session describes the websocket session the message refers to.
	Session struct {
		// ID is the session identifier used when registering subscriptions.
		ID                      string    `json:"id"`
		Status                  string    `json:"status"`
		ConnectedAt             time.Time `json:"connected_at"`
		KeepaliveTimeoutSeconds int       `json:"keepalive_timeout_seconds"`
		// ReconnectURL is the endpoint to connect to when a reconnection is requested.
		ReconnectURL string `json:"reconnect_url,omitempty"`
	} `json:"session"`
}
type NotificationMessagePayload struct {
2022-11-23 21:45:26 +00:00
Subscription helix . EventSubSubscription ` json:"subscription" `
Event jsoniter . RawMessage ` json:"event" `
2023-01-26 15:37:30 +00:00
Date time . Time ` json:"date,omitempty" `
2022-11-23 21:22:49 +00:00
}
// EventSubMetadata is the metadata section of an EventSub websocket message.
type EventSubMetadata struct {
	// MessageID identifies the message; it is used to detect duplicate
	// notifications.
	MessageID string `json:"message_id"`
	// MessageType selects how the message is handled (for example
	// "session_welcome", "session_reconnect" or "notification").
	MessageType         string    `json:"message_type"`
	MessageTimestamp    time.Time `json:"message_timestamp"`
	SubscriptionType    string    `json:"subscription_type"`
	SubscriptionVersion string    `json:"subscription_version"`
}
var subscriptionVersions = map [ string ] string {
helix . EventSubTypeChannelUpdate : "1" ,
2023-02-23 09:50:52 +00:00
helix . EventSubTypeChannelFollow : "2" ,
2022-11-23 21:22:49 +00:00
helix . EventSubTypeChannelSubscription : "1" ,
helix . EventSubTypeChannelSubscriptionGift : "1" ,
helix . EventSubTypeChannelSubscriptionMessage : "1" ,
helix . EventSubTypeChannelCheer : "1" ,
helix . EventSubTypeChannelRaid : "1" ,
2024-02-29 20:44:17 +00:00
helix . EventSubTypeChannelChatMessage : "1" ,
helix . EventSubTypeChannelChatNotification : "1" ,
2022-11-23 21:22:49 +00:00
helix . EventSubTypeChannelPollBegin : "1" ,
helix . EventSubTypeChannelPollProgress : "1" ,
helix . EventSubTypeChannelPollEnd : "1" ,
helix . EventSubTypeChannelPredictionBegin : "1" ,
helix . EventSubTypeChannelPredictionProgress : "1" ,
helix . EventSubTypeChannelPredictionLock : "1" ,
helix . EventSubTypeChannelPredictionEnd : "1" ,
helix . EventSubTypeHypeTrainBegin : "1" ,
helix . EventSubTypeHypeTrainProgress : "1" ,
helix . EventSubTypeHypeTrainEnd : "1" ,
helix . EventSubTypeChannelPointsCustomRewardAdd : "1" ,
helix . EventSubTypeChannelPointsCustomRewardUpdate : "1" ,
helix . EventSubTypeChannelPointsCustomRewardRemove : "1" ,
helix . EventSubTypeChannelPointsCustomRewardRedemptionAdd : "1" ,
helix . EventSubTypeChannelPointsCustomRewardRedemptionUpdate : "1" ,
helix . EventSubTypeStreamOnline : "1" ,
helix . EventSubTypeStreamOffline : "1" ,
}