refactor: ctx over cancel

This commit is contained in:
Ash Keel 2024-03-13 00:50:59 +01:00
parent 31d44b950e
commit a06b9457ea
No known key found for this signature in database
GPG Key ID: 53A9E9A6035DD109
10 changed files with 119 additions and 170 deletions

View File

@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- The required set of permissions has changed. Existing users must re-authenticate their users to the app connected to strimertül.
- The `twitch/ev/eventsub-event` and `twitch/eventsub-history` keys have been replaced by a set of keys in the format `twitch/ev/eventsub-event/<event-id>` and `twitch/eventsub-history/<event-id>`. Users of the old system will have to adjust their logic. A simple trick is to change from get/subscribing from a single key to the entire prefix. The data structure is the same.
- A lot of keys for internal use have been changed, make sure to check the new reference for fixing up any integrations you might have. A migration process will convert v3 keys to v4 keys.
## 3.3.1 - 2023-11-12

19
app.go
View File

@ -12,17 +12,11 @@ import (
"runtime/debug"
"strconv"
"git.sr.ht/~ashkeel/strimertul/migrations"
"git.sr.ht/~ashkeel/strimertul/twitch/client"
"github.com/wailsapp/wails/v2/pkg/options"
kv "github.com/strimertul/kilovolt/v11"
"git.sr.ht/~ashkeel/containers/sync"
"github.com/nicklaw5/helix/v2"
kv "github.com/strimertul/kilovolt/v11"
"github.com/urfave/cli/v2"
"github.com/wailsapp/wails/v2/pkg/options"
"github.com/wailsapp/wails/v2/pkg/runtime"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -30,7 +24,9 @@ import (
"git.sr.ht/~ashkeel/strimertul/database"
"git.sr.ht/~ashkeel/strimertul/docs"
"git.sr.ht/~ashkeel/strimertul/loyalty"
"git.sr.ht/~ashkeel/strimertul/migrations"
"git.sr.ht/~ashkeel/strimertul/twitch"
"git.sr.ht/~ashkeel/strimertul/twitch/client"
"git.sr.ht/~ashkeel/strimertul/webserver"
)
@ -91,7 +87,7 @@ func (a *App) startup(ctx context.Context) {
}
// Check for migrations
if err := migrations.Run(a.driver, a.db, logger); err != nil {
if err := migrations.Run(a.driver, a.db, logger.With(zap.String("operation", "migration"))); err != nil {
a.showFatalError(err, "Failed to migrate database to latest version")
return
}
@ -156,7 +152,7 @@ func (a *App) initializeComponents() error {
}
// Create twitch client
a.twitchManager, err = client.NewManager(a.db, a.httpServer, logger)
a.twitchManager, err = client.NewManager(a.ctx, a.db, a.httpServer, logger)
if err != nil {
return fmt.Errorf("could not initialize twitch client: %w", err)
}
@ -200,9 +196,6 @@ func (a *App) stop(_ context.Context) {
if a.loyaltyManager != nil {
warnOnError(a.loyaltyManager.Close(), "Could not cleanly close loyalty manager")
}
if a.twitchManager != nil {
warnOnError(a.twitchManager.Close(), "Could not cleanly close twitch client")
}
if a.httpServer != nil {
warnOnError(a.httpServer.Close(), "Could not cleanly close HTTP server")
}

View File

@ -1,6 +1,7 @@
package database
import (
"context"
"errors"
"fmt"
@ -24,13 +25,20 @@ var (
type Database interface {
GetKey(key string) (string, error)
PutKey(key string, data string) error
RemoveKey(key string) error
SubscribePrefix(fn kv.SubscriptionCallback, prefixes ...string) (cancelFn CancelFunc, err error)
SubscribeKey(key string, fn func(string)) (cancelFn CancelFunc, err error)
GetJSON(key string, dst any) error
SubscribePrefixContext(ctx context.Context, fn kv.SubscriptionCallback, prefixes ...string) error
SubscribeKeyContext(ctx context.Context, key string, fn func(string)) error
GetAll(prefix string) (map[string]string, error)
GetJSON(key string, dst any) error
PutJSON(key string, data any) error
PutJSONBulk(kvs map[string]any) error
RemoveKey(key string) error
Hub() *kv.Hub
}
@ -87,6 +95,32 @@ func (mod *LocalDBClient) PutKey(key string, data string) error {
return err
}
func (mod *LocalDBClient) SubscribePrefixContext(ctx context.Context, fn kv.SubscriptionCallback, prefixes ...string) error {
cancel, err := mod.SubscribePrefix(fn, prefixes...)
if err != nil {
return err
}
go func() {
<-ctx.Done()
cancel()
}()
return nil
}
func (mod *LocalDBClient) SubscribeKeyContext(ctx context.Context, key string, fn func(string)) error {
cancel, err := mod.SubscribeKey(key, fn)
if err != nil {
return err
}
go func() {
<-ctx.Done()
cancel()
}()
return nil
}
func (mod *LocalDBClient) SubscribePrefix(fn kv.SubscriptionCallback, prefixes ...string) (cancelFn CancelFunc, err error) {
var ids []int64
for _, prefix := range prefixes {

View File

@ -158,7 +158,9 @@ function TwitchEvent({ data }: { data: EventSubNotification }) {
let content: JSX.Element | string;
const message = unwrapEvent(data);
let date = data.date ? new Date(data.date) : null;
let date = data.date
? new Date(data.date)
: new Date(data.subscription.created_at);
switch (message.type) {
case EventSubNotificationType.Followed: {
content = (
@ -382,10 +384,16 @@ function TwitchEventLog({ events }: { events: EventSubNotification[] }) {
{events
.filter((ev) => supportedMessages.includes(ev.subscription.type))
.sort((a, b) =>
a.date && b.date ? Date.parse(b.date) - Date.parse(a.date) : 0,
a.date && b.date
? Date.parse(b.date) - Date.parse(a.date)
: Date.parse(b.subscription.created_at) -
Date.parse(a.subscription.created_at),
)
.map((ev) => (
<TwitchEvent key={`${ev.subscription.id}-${ev.date}`} data={ev} />
<TwitchEvent
key={`${ev.subscription.id}-${ev.subscription.created_at}`}
data={ev}
/>
))}
</EventListContainer>
</Scrollbar>
@ -433,6 +441,19 @@ function TwitchSection() {
const kv = useAppSelector((state) => state.api.client);
const [twitchEvents, setTwitchEvents] = useState<EventSubNotification[]>([]);
const keyfn = (ev: EventSubNotification) =>
ev.subscription.id + ev.subscription.created_at;
const setCleanTwitchEvents = (events: EventSubNotification[]) => {
const eventKeys = events.map(keyfn);
// Clean up duplicates before setting to state
const uniqueEvents = events.filter(
(ev, pos) => eventKeys.indexOf(keyfn(ev)) === pos,
);
setTwitchEvents(uniqueEvents);
};
const loadRecentEvents = async () => {
const keymap = await kv.getKeysByPrefix('twitch/eventsub-history/');
const events = Object.values(keymap)
@ -440,7 +461,7 @@ function TwitchSection() {
.flat()
.sort((a, b) => Date.parse(b.date) - Date.parse(a.date));
setTwitchEvents(events);
setCleanTwitchEvents(events);
};
useEffect(() => {
@ -448,7 +469,7 @@ function TwitchSection() {
const onKeyChange = (value: string) => {
const event = JSON.parse(value) as EventSubNotification;
void setTwitchEvents((prev) => [event, ...prev]);
void setCleanTwitchEvents((prev) => [event, ...prev]);
};
void kv.subscribePrefix('twitch/ev/eventsub-event/', onKeyChange);

View File

@ -14,6 +14,8 @@ import (
)
func migrateToV4(db database.Database, logger *zap.Logger) error {
logger.Info("Migrating database from v3 to v4")
// Rename keys that have no schema changes
for oldKey, newKey := range map[string]string{
"twitch/bot-modules/timers/config": timers.ConfigKey,

View File

@ -5,14 +5,12 @@ import (
"sync"
"text/template"
"git.sr.ht/~ashkeel/strimertul/twitch/eventsub"
template2 "git.sr.ht/~ashkeel/strimertul/twitch/template"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
"git.sr.ht/~ashkeel/strimertul/database"
"git.sr.ht/~ashkeel/strimertul/twitch/eventsub"
template2 "git.sr.ht/~ashkeel/strimertul/twitch/template"
)
var json = jsoniter.ConfigFastest
@ -41,9 +39,6 @@ type Module struct {
templater template2.Engine
templates templateCacheMap
cancelAlertSub database.CancelFunc
cancelTwitchEventSub database.CancelFunc
pendingMux sync.Mutex
pendingSubs map[string]subMixedEvent
}
@ -59,8 +54,7 @@ func Setup(ctx context.Context, db database.Database, logger *zap.Logger, templa
}
// Load config from database
err := db.GetJSON(ConfigKey, &mod.Config)
if err != nil {
if err := db.GetJSON(ConfigKey, &mod.Config); err != nil {
logger.Debug("Config load error", zap.Error(err))
mod.Config = Config{}
// Save empty config
@ -72,7 +66,7 @@ func Setup(ctx context.Context, db database.Database, logger *zap.Logger, templa
mod.compileTemplates()
mod.cancelAlertSub, err = db.SubscribeKey(ConfigKey, func(value string) {
if err := db.SubscribeKeyContext(ctx, ConfigKey, func(value string) {
err := json.UnmarshalFromString(value, &mod.Config)
if err != nil {
logger.Warn("Error loading alert config", zap.Error(err))
@ -80,13 +74,11 @@ func Setup(ctx context.Context, db database.Database, logger *zap.Logger, templa
logger.Info("Reloaded alert config")
}
mod.compileTemplates()
})
if err != nil {
}); err != nil {
logger.Error("Could not set-up bot alert reload subscription", zap.Error(err))
}
mod.cancelTwitchEventSub, err = db.SubscribePrefix(mod.onEventSubEvent, eventsub.EventKeyPrefix)
if err != nil {
if err := db.SubscribePrefixContext(ctx, mod.onEventSubEvent, eventsub.EventKeyPrefix); err != nil {
logger.Error("Could not setup twitch alert subscription", zap.Error(err))
}
@ -94,12 +86,3 @@ func Setup(ctx context.Context, db database.Database, logger *zap.Logger, templa
return mod
}
func (m *Module) Close() {
if m.cancelAlertSub != nil {
m.cancelAlertSub()
}
if m.cancelTwitchEventSub != nil {
m.cancelTwitchEventSub()
}
}

View File

@ -36,17 +36,12 @@ type Module struct {
customTemplates *sync.Map[string, *textTemplate.Template]
customFunctions textTemplate.FuncMap
cancelContext context.CancelFunc
cancelUpdateSub database.CancelFunc
cancelWriteRPCSub database.CancelFunc
cancelChatMessageSub database.CancelFunc
cancelContext context.CancelFunc
}
func Setup(ctx context.Context, db database.Database, api *helix.Client, user helix.User, logger *zap.Logger, templater template.Engine) *Module {
newContext, cancel := context.WithCancel(ctx)
mod := &Module{
ctx: newContext,
ctx: ctx,
db: db,
api: api,
user: user,
@ -58,8 +53,6 @@ func Setup(ctx context.Context, db database.Database, api *helix.Client, user he
customCommands: sync.NewMap[string, CustomCommand](),
customTemplates: sync.NewMap[string, *textTemplate.Template](),
customFunctions: make(textTemplate.FuncMap),
cancelContext: cancel,
}
// Get config
@ -73,16 +66,13 @@ func Setup(ctx context.Context, db database.Database, api *helix.Client, user he
}
}
var err error
mod.cancelChatMessageSub, err = db.SubscribeKey(eventsub.EventKeyPrefix+helix.EventSubTypeChannelChatMessage, mod.onChatMessage)
if err != nil {
if err := db.SubscribeKeyContext(ctx, eventsub.EventKeyPrefix+helix.EventSubTypeChannelChatMessage, mod.onChatMessage); err != nil {
logger.Error("Could not subscribe to chat messages", zap.Error(err))
}
// Load custom commands
var customCommands map[string]CustomCommand
err = db.GetJSON(CustomCommandsKey, &customCommands)
if err != nil {
if err := db.GetJSON(CustomCommandsKey, &customCommands); err != nil {
if errors.Is(err, database.ErrEmptyKey) {
customCommands = make(map[string]CustomCommand)
} else {
@ -91,38 +81,21 @@ func Setup(ctx context.Context, db database.Database, api *helix.Client, user he
}
mod.customCommands.Set(customCommands)
err = mod.updateTemplates()
if err != nil {
if err := mod.updateTemplates(); err != nil {
logger.Error("Failed to parse custom commands", zap.Error(err))
}
mod.cancelUpdateSub, err = db.SubscribeKey(CustomCommandsKey, mod.updateCommands)
if err != nil {
if err := db.SubscribeKeyContext(ctx, CustomCommandsKey, mod.updateCommands); err != nil {
logger.Error("Could not set-up chat command reload subscription", zap.Error(err))
}
mod.cancelWriteRPCSub, err = db.SubscribeKey(WriteMessageRPC, mod.handleWriteMessageRPC)
if err != nil {
if err := db.SubscribeKeyContext(ctx, WriteMessageRPC, mod.handleWriteMessageRPC); err != nil {
logger.Error("Could not set-up chat command reload subscription", zap.Error(err))
}
return mod
}
func (mod *Module) Close() {
if mod.cancelChatMessageSub != nil {
mod.cancelChatMessageSub()
}
if mod.cancelUpdateSub != nil {
mod.cancelUpdateSub()
}
if mod.cancelWriteRPCSub != nil {
mod.cancelWriteRPCSub()
}
mod.cancelContext()
}
func (mod *Module) onChatMessage(newValue string) {
var chatMessage struct {
Event helix.EventSubChannelChatMessageEvent `json:"event"`

View File

@ -6,33 +6,27 @@ import (
"fmt"
"time"
"git.sr.ht/~ashkeel/strimertul/twitch/timers"
"git.sr.ht/~ashkeel/strimertul/twitch/alerts"
"git.sr.ht/~ashkeel/strimertul/twitch/chat"
"git.sr.ht/~ashkeel/strimertul/twitch"
"git.sr.ht/~ashkeel/strimertul/twitch/eventsub"
"git.sr.ht/~ashkeel/containers/sync"
jsoniter "github.com/json-iterator/go"
"github.com/nicklaw5/helix/v2"
"go.uber.org/zap"
"git.sr.ht/~ashkeel/strimertul/database"
"git.sr.ht/~ashkeel/strimertul/twitch"
"git.sr.ht/~ashkeel/strimertul/twitch/alerts"
"git.sr.ht/~ashkeel/strimertul/twitch/chat"
"git.sr.ht/~ashkeel/strimertul/twitch/eventsub"
"git.sr.ht/~ashkeel/strimertul/twitch/timers"
"git.sr.ht/~ashkeel/strimertul/webserver"
)
var json = jsoniter.ConfigFastest
type Manager struct {
client *Client
cancelSubs func()
client *Client
}
func NewManager(db database.Database, server *webserver.WebServer, logger *zap.Logger) (*Manager, error) {
func NewManager(ctx context.Context, db database.Database, server *webserver.WebServer, logger *zap.Logger) (*Manager, error) {
// Get Twitch config
var config twitch.Config
if err := db.GetJSON(twitch.ConfigKey, &config); err != nil {
@ -43,8 +37,10 @@ func NewManager(db database.Database, server *webserver.WebServer, logger *zap.L
}
// Create new client
client, err := newClient(config, db, server, logger)
clientContext, cancel := context.WithCancel(ctx)
client, err := newClient(clientContext, config, db, server, logger)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create twitch client: %w", err)
}
@ -53,41 +49,32 @@ func NewManager(db database.Database, server *webserver.WebServer, logger *zap.L
}
// Listen for client config changes
cancelConfigSub, err := db.SubscribeKey(twitch.ConfigKey, func(value string) {
if err = db.SubscribeKeyContext(ctx, twitch.ConfigKey, func(value string) {
var newConfig twitch.Config
if err := json.UnmarshalFromString(value, &newConfig); err != nil {
logger.Error("Failed to decode Twitch integration config", zap.Error(err))
return
}
cancel()
var updatedClient *Client
updatedClient, err = newClient(newConfig, db, server, logger)
clientContext, cancel = context.WithCancel(ctx)
updatedClient, err = newClient(clientContext, newConfig, db, server, logger)
if err != nil {
logger.Error("Could not create twitch client with new config, keeping old", zap.Error(err))
return
}
err = manager.client.Close()
if err != nil {
logger.Warn("Twitch client could not close cleanly", zap.Error(err))
}
// New client works, replace old
updatedClient.Merge(manager.client)
manager.client = updatedClient
logger.Info("Reloaded/updated Twitch integration")
})
if err != nil {
}); err != nil {
logger.Error("Could not setup twitch config reload subscription", zap.Error(err))
}
manager.cancelSubs = func() {
if cancelConfigSub != nil {
cancelConfigSub()
}
}
return manager, nil
}
@ -95,16 +82,6 @@ func (m *Manager) Client() *Client {
return m.client
}
func (m *Manager) Close() error {
m.cancelSubs()
if err := m.client.Close(); err != nil {
return err
}
return nil
}
type Client struct {
Config *sync.RWSync[twitch.Config]
DB database.Database
@ -138,9 +115,8 @@ func (c *Client) ensureRoute() {
}
}
func newClient(config twitch.Config, db database.Database, server *webserver.WebServer, logger *zap.Logger) (*Client, error) {
func newClient(ctx context.Context, config twitch.Config, db database.Database, server *webserver.WebServer, logger *zap.Logger) (*Client, error) {
// Create Twitch client
ctx, cancel := context.WithCancel(context.Background())
client := &Client{
Config: sync.NewRWSync(config),
DB: db,
@ -148,7 +124,6 @@ func newClient(config twitch.Config, db database.Database, server *webserver.Web
restart: make(chan bool, 128),
streamOnline: sync.NewRWSync(false),
ctx: ctx,
cancel: cancel,
server: server,
}
@ -189,6 +164,11 @@ func newClient(config twitch.Config, db database.Database, server *webserver.Web
go client.runStatusPoll()
go func() {
<-ctx.Done()
server.UnregisterRoute(twitch.CallbackRoute)
}()
return client, nil
}
@ -203,7 +183,7 @@ func (c *Client) runStatusPoll() {
// Check if streamer is online, if possible
func() {
status, err := c.API.GetStreams(&helix.StreamsParams{
UserLogins: []string{c.Config.Get().Channel}, // TODO Replace with something non bot dependant
UserIDs: []string{c.User.ID},
})
if err != nil {
c.Logger.Error("Error checking stream status", zap.Error(err))
@ -225,28 +205,3 @@ func (c *Client) runStatusPoll() {
}
}
}
func (c *Client) baseURL() (string, error) {
var severConfig struct {
Bind string `json:"bind"`
}
err := c.DB.GetJSON("http/config", &severConfig)
return severConfig.Bind, err
}
func (c *Client) Close() error {
c.server.UnregisterRoute(twitch.CallbackRoute)
defer c.cancel()
if c.Chat != nil {
c.Chat.Close()
}
if c.Alerts != nil {
c.Alerts.Close()
}
if c.Timers != nil {
c.Timers.Close()
}
return nil
}

View File

@ -12,7 +12,4 @@ type Config struct {
// Twitch API App Client Secret
APIClientSecret string `json:"api_client_secret" desc:"Twitch API App Client Secret"`
// Twitch channel to use
Channel string `json:"channel" desc:"Twitch channel to join and use"`
}

View File

@ -23,10 +23,9 @@ type Module struct {
lastTrigger *sync.Map[string, time.Time]
messages *sync.Slice[int]
logger *zap.Logger
db database.Database
ctx context.Context
cancelTimerSub database.CancelFunc
logger *zap.Logger
db database.Database
ctx context.Context
}
func Setup(ctx context.Context, db database.Database, logger *zap.Logger) *Module {
@ -45,8 +44,7 @@ func Setup(ctx context.Context, db database.Database, logger *zap.Logger) *Modul
}
// Load config from database
err := db.GetJSON(ConfigKey, &mod.Config)
if err != nil {
if err := db.GetJSON(ConfigKey, &mod.Config); err != nil {
logger.Debug("Config load error", zap.Error(err))
mod.Config = Config{
Timers: make(map[string]ChatTimer),
@ -58,15 +56,13 @@ func Setup(ctx context.Context, db database.Database, logger *zap.Logger) *Modul
}
}
mod.cancelTimerSub, err = db.SubscribeKey(ConfigKey, func(value string) {
err := json.UnmarshalFromString(value, &mod.Config)
if err != nil {
logger.Debug("Error reloading timer config", zap.Error(err))
} else {
logger.Info("Reloaded timer config")
if err := db.SubscribeKeyContext(ctx, ConfigKey, func(value string) {
if err := json.UnmarshalFromString(value, &mod.Config); err != nil {
logger.Warn("Error reloading timer config", zap.Error(err))
return
}
})
if err != nil {
logger.Info("Reloaded timer config")
}); err != nil {
logger.Error("Could not set-up timer reload subscription", zap.Error(err))
}
@ -149,12 +145,6 @@ func (m *Module) ProcessTimer(name string, timer ChatTimer, activity int) {
m.lastTrigger.SetKey(name, now)
}
func (m *Module) Close() {
if m.cancelTimerSub != nil {
m.cancelTimerSub()
}
}
func (m *Module) currentChatActivity() int {
total := 0
for _, v := range m.messages.Get() {