package client

import (
	"context"
	"errors"
	"fmt"
	"time"

	"git.sr.ht/~ashkeel/strimertul/twitch"
	"git.sr.ht/~ashkeel/strimertul/twitch/eventsub"

	"git.sr.ht/~ashkeel/containers/sync"
	jsoniter "github.com/json-iterator/go"
	"github.com/nicklaw5/helix/v2"
	"go.uber.org/zap"

	"git.sr.ht/~ashkeel/strimertul/database"
	"git.sr.ht/~ashkeel/strimertul/webserver"
)

var json = jsoniter.ConfigFastest

// Manager owns the currently active Twitch Client and replaces it whenever the
// Twitch configuration key changes in the database.
type Manager struct {
	client     *Client
	cancelSubs func()
}

// NewManager reads the Twitch configuration from db, creates a Client from it
// and subscribes to configuration changes so the client is rebuilt on update.
//
// A missing configuration key (database.ErrEmptyKey) is not an error: the
// client is created with the integration disabled. Any other read error, or a
// failure to create the initial client, is returned. A failure to set up the
// configuration subscription is only logged; the manager is still returned.
func NewManager(db database.Database, server *webserver.WebServer, logger *zap.Logger) (*Manager, error) {
	// Get Twitch config
	var config twitch.Config
	if err := db.GetJSON(twitch.ConfigKey, &config); err != nil {
		if !errors.Is(err, database.ErrEmptyKey) {
			return nil, fmt.Errorf("failed to get twitch config: %w", err)
		}
		config.Enabled = false
	}

	// Create new client
	client, err := newClient(config, db, server, logger)
	if err != nil {
		return nil, fmt.Errorf("failed to create twitch client: %w", err)
	}

	manager := &Manager{
		client: client,
	}

	// Listen for client config changes
	cancelConfigSub, err := db.SubscribeKey(twitch.ConfigKey, func(value string) {
		var newConfig twitch.Config
		if err := json.UnmarshalFromString(value, &newConfig); err != nil {
			logger.Error("Failed to decode Twitch integration config", zap.Error(err))
			return
		}

		// Errors are kept local to the callback so that it never writes to
		// variables owned by the enclosing NewManager call.
		updatedClient, err := newClient(newConfig, db, server, logger)
		if err != nil {
			logger.Error("Could not create twitch client with new config, keeping old", zap.Error(err))
			return
		}

		if err := manager.client.Close(); err != nil {
			logger.Warn("Twitch client could not close cleanly", zap.Error(err))
		}

		// New client works, replace old
		// NOTE(review): manager.client is swapped without synchronization;
		// whether this callback can run concurrently with Manager.Client
		// depends on db.SubscribeKey — confirm.
		updatedClient.Merge(manager.client)
		manager.client = updatedClient

		logger.Info("Reloaded/updated Twitch integration")
	})
	if err != nil {
		logger.Error("Could not setup twitch config reload subscription", zap.Error(err))
	}

	manager.cancelSubs = func() {
		if cancelConfigSub != nil {
			cancelConfigSub()
		}
	}

	return manager, nil
}

// Client returns the Twitch client that is currently active.
func (m *Manager) Client() *Client {
	return m.client
}

// Close cancels the configuration subscription and closes the active client,
// returning the client's close error, if any.
func (m *Manager) Close() error {
	m.cancelSubs()
	return m.client.Close()
}

// Client bundles the Twitch configuration, API handle, authenticated user and
// the supporting services (database, web server, logger) used by the Twitch
// integration.
type Client struct {
	Config *sync.RWSync[twitch.Config]
	DB     database.Database
	API    *helix.Client
	User   helix.User
	Logger *zap.Logger

	eventSub *eventsub.Client
	server   *webserver.WebServer
	ctx      context.Context
	cancel   context.CancelFunc

	restart      chan bool
	streamOnline *sync.RWSync[bool]
}

// Merge carries state over from the client being replaced: it copies the
// stream-online flag from old and re-registers the callback route, which the
// old client's Close has just unregistered.
func (c *Client) Merge(old *Client) {
	// Copy bot instance and some params
	c.streamOnline.Set(old.streamOnline.Get())
	c.ensureRoute()
}

// ensureRoute registers the Twitch callback route when the integration is
// enabled. Hacky function to deal with sync issues when restarting client.
func (c *Client) ensureRoute() {
	if c.Config.Get().Enabled {
		c.server.RegisterRoute(twitch.CallbackRoute, c)
	}
}

// newClient builds a Client for the given configuration.
//
// When the configuration is disabled, only the bare client is returned. When
// enabled, it additionally obtains the Helix API handle (the only step whose
// failure is returned as an error), registers the callback route, tries to
// identify the authenticated user and set up EventSub (failures there are only
// logged), and starts the stream status polling goroutine.
func newClient(config twitch.Config, db database.Database, server *webserver.WebServer, logger *zap.Logger) (*Client, error) {
	// Create Twitch client
	ctx, cancel := context.WithCancel(context.Background())
	client := &Client{
		Config:       sync.NewRWSync(config),
		DB:           db,
		Logger:       logger.With(zap.String("service", "twitch")),
		restart:      make(chan bool, 128),
		streamOnline: sync.NewRWSync(false),
		ctx:          ctx,
		cancel:       cancel,
		server:       server,
	}

	if !config.Enabled {
		return client, nil
	}

	var err error
	client.API, err = twitch.GetHelixAPI(db)
	if err != nil {
		// The client is discarded, so release its context instead of leaking it.
		cancel()
		return nil, fmt.Errorf("failed to create twitch client: %w", err)
	}

	server.RegisterRoute(twitch.CallbackRoute, client)

	if userClient, err := twitch.GetUserClient(db, true); err == nil {
		users, err := userClient.GetUsers(&helix.UsersParams{})
		switch {
		case err != nil:
			client.Logger.Error("Failed looking up user", zap.Error(err))
		case len(users.Data.Users) < 1:
			client.Logger.Error("No users found, please authenticate in Twitch configuration -> Events")
		default:
			client.User = users.Data.Users[0]
			client.eventSub, err = eventsub.Setup(ctx, userClient, client.User, db, logger)
			if err != nil {
				client.Logger.Error("Failed to setup EventSub", zap.Error(err))
			}
		}
	} else {
		client.Logger.Warn("Twitch user not identified, this will break most features")
	}

	go client.runStatusPoll()

	return client, nil
}

// runStatusPoll checks the stream status once immediately and then every 60
// seconds until the client's context is cancelled. Iterations where the
// integration is disabled skip the check but still wait, so the loop never
// spins and always notices cancellation.
func (c *Client) runStatusPoll() {
	c.Logger.Info("Started polling for stream status")
	for {
		// Make sure we're configured and connected properly first
		if c.Config.Get().Enabled {
			c.pollStreamStatusOnce()
		}

		// Wait for next poll (or cancellation)
		select {
		case <-c.ctx.Done():
			return
		case <-time.After(60 * time.Second):
		}
	}
}

// pollStreamStatusOnce queries the streams of the configured channel, updates
// the stream-online flag and stores the returned stream list under
// twitch.StreamInfoKey. Errors are logged, not returned.
func (c *Client) pollStreamStatusOnce() {
	// Check if streamer is online, if possible
	status, err := c.API.GetStreams(&helix.StreamsParams{
		UserLogins: []string{c.Config.Get().Channel}, // TODO Replace with something non bot dependant
	})
	if err != nil {
		c.Logger.Error("Error checking stream status", zap.Error(err))
		return
	}

	c.streamOnline.Set(len(status.Data.Streams) > 0)

	if err := c.DB.PutJSON(twitch.StreamInfoKey, status.Data.Streams); err != nil {
		c.Logger.Warn("Error saving stream info", zap.Error(err))
	}
}

// baseURL returns the "bind" value stored under the "http/config" database
// key, together with any error from reading that key.
func (c *Client) baseURL() (string, error) {
	var serverConfig struct {
		Bind string `json:"bind"`
	}
	err := c.DB.GetJSON("http/config", &serverConfig)
	return serverConfig.Bind, err
}

// Close unregisters the Twitch callback route and cancels the client's
// context, which stops the status polling goroutine. It always returns nil.
func (c *Client) Close() error {
	c.server.UnregisterRoute(twitch.CallbackRoute)
	c.cancel()
	return nil
}