Add stat importer from Tg-cli dumps

This commit is contained in:
Hamcha 2016-02-12 16:11:33 +00:00
parent 193f755c57
commit e907f31cda
4 changed files with 247 additions and 100 deletions

1
.gitignore vendored
View file

@ -3,3 +3,4 @@ stats.db
clessy-broker clessy-broker
clessy-mods clessy-mods
clessy-stats clessy-stats
clessy-stats-import

View file

@ -15,5 +15,8 @@ clessy-mods: install-tg
clessy-stats: install-tg clessy-stats: install-tg
go build -o clessy-stats github.com/hamcha/clessy/stats go build -o clessy-stats github.com/hamcha/clessy/stats
clessy-stats-import: install-tg
go build -o clessy-stats-import github.com/hamcha/clessy/stats-import
clean: clean:
rm -f clessy-broker clessy-mods clessy-stats rm -f clessy-broker clessy-mods clessy-stats

225
stats-import/main.go Normal file
View file

@ -0,0 +1,225 @@
package main
import (
"bufio"
"encoding/binary"
"encoding/json"
"flag"
"log"
"os"
"strconv"
"time"
"github.com/boltdb/bolt"
)
// User is the part of a sender object that the importer decodes from the
// dump. Only the "username" key is mapped; any other keys in the JSON object
// are ignored by encoding/json.
type User struct {
// Username is decoded from the "username" key. processMessage uses it as
// the per-user counter key, so a record without a username is counted under
// the empty string.
Username string `json:"username"`
}
// Message is one JSON record of the dump; main decodes exactly one Message
// from every input line.
type Message struct {
// Event is decoded from "event". It is not read by the code visible in this
// file.
Event string `json:"event"`
// ReplyID is decoded from "reply_id". It is a pointer so that an absent key
// stays nil; processMessage counts the message as a reply when it is non-nil.
ReplyID *int `json:"reply_id,omitempty"`
// FwdUser is decoded from "fwd_from". processMessage counts the message as
// a forward when it is non-nil.
FwdUser *User `json:"fwd_from"`
// Text is decoded from "text". It is not read by the code visible in this
// file.
Text string `json:"text"`
// From is decoded from "from"; its Username keys the per-user counter.
From User `json:"from"`
// Date is decoded from "date" and handed to time.Unix as a number of
// seconds by processMessage.
Date int `json:"date"`
}
// Stats holds the counters accumulated in memory by processMessage while the
// dump is read; update later adds them to the values stored in the database.
type Stats struct {
// Total is incremented once per processed message.
Total uint64
// ByUser counts messages per sender username. Must be initialised with make
// before processMessage is called (main does this).
ByUser map[string]uint64
// ByHour is indexed by time.Time.Hour() of the message date (0-23).
ByHour [24]uint64
// ByWeekday is indexed by time.Time.Weekday() of the message date
// (time.Sunday is 0).
ByWeekday [7]uint64
// ByDate counts messages per day, keyed by the date formatted with the
// layout "2006-1-2". Must be initialised with make before use.
ByDate map[string]uint64
// Replies counts messages whose ReplyID is set.
Replies uint64
// Forward counts messages whose FwdUser is set.
Forward uint64
}
// assert aborts the program by panicking with err when err is non-nil and is
// a no-op otherwise. The panic value is err itself, unchanged.
func assert(err error) {
	if err == nil {
		return
	}
	panic(err)
}
// main reads a Telegram CLI dump (one JSON message per line), accumulates
// statistics over all messages in memory and then adds them to the counters
// stored in a BoltDB database.
//
// Flags:
//
//	-logfile  path of the dump to read (default "tl.log")
//	-boltdb   path of the database to update (default "stats.db")
//
// Any failure (unreadable input, malformed JSON line, database error) aborts
// the program through assert. Nothing is written to the database until the
// whole dump has been read successfully.
func main() {
	logFile := flag.String("logfile", "tl.log", "Telegram CLI dump")
	boltdbFile := flag.String("boltdb", "stats.db", "BoltDB database file")
	flag.Parse()

	file, err := os.Open(*logFile)
	assert(err)
	defer file.Close()

	var data Stats
	data.ByUser = make(map[string]uint64)
	data.ByDate = make(map[string]uint64)

	scanner := bufio.NewScanner(file)
	lines := 0

	log.Printf("Started processing %s...\n\n", *logFile)
	for scanner.Scan() {
		var msg Message
		// Unmarshal copies what it needs, so the scanner's internal buffer
		// can be decoded directly without converting through a string.
		assert(json.Unmarshal(scanner.Bytes(), &msg))

		processMessage(msg, &data)

		lines++
		if lines%10000 == 0 {
			log.Printf("Processed %d lines...\n", lines)
		}
	}
	// Scan returns false on a read error (or an over-long line) as well as at
	// EOF. Without this check a truncated import would be reported as a
	// success and partial counters would be written.
	assert(scanner.Err())
	log.Printf("\nFinished with success, processed %d lines.\n", lines)

	log.Printf("Opening database %s for writing...\n", *boltdbFile)
	db, err := bolt.Open(*boltdbFile, 0600, nil)
	// The open error must be checked before db is used.
	assert(err)
	defer db.Close()

	assert(update(db, data))

	log.Printf("All done! Bye!\n")
}
// processMessage folds one message into the running counters in data.
//
// Every message bumps Total, the hour and weekday slots of its date, the
// counter of its sender's username and the counter of its calendar day
// (keyed with the layout "2006-1-2"). Replies and Forward are bumped when the
// message carries a reply ID or a forward origin respectively.
//
// The date is interpreted with time.Unix, which yields local time, so the
// hour/weekday/day attribution depends on the time zone of the machine
// running the import. data.ByUser and data.ByDate must be non-nil maps.
func processMessage(msg Message, data *Stats) {
	when := time.Unix(int64(msg.Date), 0)
	sender := msg.From.Username

	data.Total++
	if msg.ReplyID != nil {
		data.Replies++
	}
	if msg.FwdUser != nil {
		data.Forward++
	}

	data.ByHour[when.Hour()]++
	data.ByWeekday[when.Weekday()]++

	// A missing map key reads as zero, so a plain increment covers both the
	// first and every later occurrence.
	data.ByUser[sender]++
	data.ByDate[when.Format("2006-1-2")]++
}
// MakeUint decodes a counter stored as an unsigned varint.
//
// bval is the raw stored value; bucketName and key are used only in the log
// messages. It returns 0 and logs a message when bval is nil (key absent) or
// when bval does not start with a complete, non-overflowing uvarint. Bytes
// after the first varint are ignored.
func MakeUint(bval []byte, bucketName string, key string) uint64 {
	if bval == nil {
		log.Printf("[%s] Key \"%s\" does not exist, set to 0\n", bucketName, key)
		return 0
	}

	decoded, size := binary.Uvarint(bval)
	if size <= 0 {
		// size == 0: buffer too short; size < 0: value overflows 64 bits.
		log.Printf("[%s] Value of key \"%s\" is NaN: %v\r\n", bucketName, key, bval)
		return 0
	}
	return decoded
}
// PutUint encodes value as an unsigned varint and returns only the bytes that
// were written (between 1 and binary.MaxVarintLen64). It is the inverse of
// MakeUint.
func PutUint(value uint64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	return buf[:binary.PutUvarint(buf, value)]
}
// addToCounter adds delta to the uvarint counter stored under key in b and
// writes the sum back. bucketName and label are only forwarded to MakeUint
// for its log messages; a missing or undecodable value counts as 0.
func addToCounter(b *bolt.Bucket, bucketName string, key []byte, label string, delta uint64) error {
	current := MakeUint(b.Get(key), bucketName, label)
	return b.Put(key, PutUint(current+delta))
}

// update adds the counters collected in data to the ones already stored in
// db, creating any missing bucket on the way.
//
// Buckets and keys written:
//
//	global       "count", "replies", "forward"
//	hour         single byte 0-23
//	weekday      single byte 0-6
//	date         day string as produced by processMessage ("2006-1-2" layout)
//	users-count  username
//
// Everything happens inside one db.Update transaction, so the first error is
// returned and none of the changes are committed.
func update(db *bolt.DB, data Stats) error {
	return db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("global"))
		if err != nil {
			return err
		}

		// Each global counter is written back with its own sum. Storing the
		// total under "replies" and "forward" would overwrite both with the
		// overall message count.
		globals := []struct {
			key   string
			delta uint64
		}{
			{"count", data.Total},
			{"replies", data.Replies},
			{"forward", data.Forward},
		}
		for _, g := range globals {
			if err = addToCounter(b, "global", []byte(g.key), g.key, g.delta); err != nil {
				return err
			}
		}

		// Update hour counters
		b, err = tx.CreateBucketIfNotExists([]byte("hour"))
		if err != nil {
			return err
		}
		for i := 0; i < 24; i++ {
			if err = addToCounter(b, "hour", []byte{byte(i)}, strconv.Itoa(i), data.ByHour[i]); err != nil {
				return err
			}
		}

		// Update weekday counters
		b, err = tx.CreateBucketIfNotExists([]byte("weekday"))
		if err != nil {
			return err
		}
		for i := 0; i < 7; i++ {
			if err = addToCounter(b, "weekday", []byte{byte(i)}, strconv.Itoa(i), data.ByWeekday[i]); err != nil {
				return err
			}
		}

		// Update date counters
		b, err = tx.CreateBucketIfNotExists([]byte("date"))
		if err != nil {
			return err
		}
		for day, count := range data.ByDate {
			if err = addToCounter(b, "date", []byte(day), day, count); err != nil {
				return err
			}
		}

		// Update user counters
		b, err = tx.CreateBucketIfNotExists([]byte("users-count"))
		if err != nil {
			return err
		}
		for user, count := range data.ByUser {
			if err = addToCounter(b, "users-count", []byte(user), user, count); err != nil {
				return err
			}
		}

		return nil
	})
}

View file

@ -25,16 +25,12 @@ const (
type Stats struct { type Stats struct {
ByUserCount map[string]uint64 ByUserCount map[string]uint64
ByUserAvgLen map[string]uint64
ByUserAvgCount map[string]uint64
ByWeekday [7]uint64 ByWeekday [7]uint64
ByHour [24]uint64 ByHour [24]uint64
ByType [MessageTypeMax]uint64 ByType [MessageTypeMax]uint64
TodayDate time.Time TodayDate time.Time
Today uint64 Today uint64
TotalCount uint64 TotalCount uint64
TotalTxtCount uint64
TotalAvgLength uint64
Replies uint64 Replies uint64
Forward uint64 Forward uint64
} }
@ -73,22 +69,13 @@ func loadStats() {
} }
// Load total messages counter // Load total messages counter
bval := b.Get([]byte("count")) stats.TotalCount = MakeUint(b.Get([]byte("count")), "global", "count")
stats.TotalCount = MakeUint(bval, "global", "count")
bval = b.Get([]byte("avg"))
stats.TotalAvgLength = MakeUint(bval, "global", "avg")
bval = b.Get([]byte("avgcount"))
stats.TotalTxtCount = MakeUint(bval, "global", "avgcount")
// Load total replies counter // Load total replies counter
bval = b.Get([]byte("replies")) stats.Replies = MakeUint(b.Get([]byte("replies")), "global", "replies")
stats.Replies = MakeUint(bval, "global", "replies")
// Load total replies counter // Load total replies counter
bval = b.Get([]byte("forward")) stats.Forward = MakeUint(b.Get([]byte("forward")), "global", "forward")
stats.Forward = MakeUint(bval, "global", "forward")
// Load hour counters // Load hour counters
b, err = tx.CreateBucketIfNotExists([]byte("hour")) b, err = tx.CreateBucketIfNotExists([]byte("hour"))
@ -97,8 +84,7 @@ func loadStats() {
} }
for i := 0; i < 24; i++ { for i := 0; i < 24; i++ {
bval = b.Get([]byte{byte(i)}) stats.ByHour[i] = MakeUint(b.Get([]byte{byte(i)}), "hour", strconv.Itoa(i))
stats.ByHour[i] = MakeUint(bval, "hour", strconv.Itoa(i))
} }
// Load weekday counters // Load weekday counters
@ -108,8 +94,7 @@ func loadStats() {
} }
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
bval = b.Get([]byte{byte(i)}) stats.ByWeekday[i] = MakeUint(b.Get([]byte{byte(i)}), "weekday", strconv.Itoa(i))
stats.ByWeekday[i] = MakeUint(bval, "weekday", strconv.Itoa(i))
} }
// Load today's message counter, if possible // Load today's message counter, if possible
@ -119,8 +104,7 @@ func loadStats() {
} }
todayKey := stats.TodayDate.Format("2006-1-2") todayKey := stats.TodayDate.Format("2006-1-2")
bval = b.Get([]byte(todayKey)) stats.Today = MakeUint(b.Get([]byte(todayKey)), "date", todayKey)
stats.Today = MakeUint(bval, "date", todayKey)
// Load user counters // Load user counters
stats.ByUserCount = make(map[string]uint64) stats.ByUserCount = make(map[string]uint64)
@ -133,34 +117,13 @@ func loadStats() {
return nil return nil
}) })
stats.ByUserAvgLen = make(map[string]uint64)
b, err = tx.CreateBucketIfNotExists([]byte("users-avg"))
if err != nil {
return err
}
b.ForEach(func(user, messages []byte) error {
stats.ByUserAvgLen[string(user)] = MakeUint(messages, "users-avg", string(user))
return nil
})
stats.ByUserAvgCount = make(map[string]uint64)
b, err = tx.CreateBucketIfNotExists([]byte("users-avgcount"))
if err != nil {
return err
}
b.ForEach(func(user, messages []byte) error {
stats.ByUserAvgCount[string(user)] = MakeUint(messages, "users-avgcount", string(user))
return nil
})
// Load type counters // Load type counters
b, err = tx.CreateBucketIfNotExists([]byte("types")) b, err = tx.CreateBucketIfNotExists([]byte("types"))
if err != nil { if err != nil {
return err return err
} }
for i := 0; i < MessageTypeMax; i++ { for i := 0; i < MessageTypeMax; i++ {
bval = b.Get([]byte{byte(i)}) stats.ByType[i] = MakeUint(b.Get([]byte{byte(i)}), "types", strconv.Itoa(i))
stats.ByType[i] = MakeUint(bval, "types", strconv.Itoa(i))
} }
return nil return nil
@ -171,8 +134,7 @@ func loadStats() {
func updateDate() { func updateDate() {
err := db.Update(func(tx *bolt.Tx) error { err := db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte("date")) b := tx.Bucket([]byte("date"))
todayKey := stats.TodayDate.Format("2006-1-2") err := b.Put([]byte(stats.TodayDate.Format("2006-1-2")), PutUint(stats.Today))
err := b.Put([]byte(todayKey), PutUint(stats.Today))
if err != nil { if err != nil {
return err return err
} }
@ -195,7 +157,6 @@ func updateStats(message tg.APIMessage) {
// //
// DB Update flags // DB Update flags
updatemean := false
updatetype := 0 updatetype := 0
updatereplies := false updatereplies := false
updateforward := false updateforward := false
@ -227,25 +188,6 @@ func updateStats(message tg.APIMessage) {
// Text message // Text message
if message.Text != nil { if message.Text != nil {
stats.ByType[MessageTypeText]++ stats.ByType[MessageTypeText]++
// Update total and individual average
msglen := uint64(len(*(message.Text)))
if stats.TotalTxtCount > 0 {
stats.TotalAvgLength = updateMean(stats.TotalAvgLength, stats.TotalTxtCount, msglen)
stats.TotalTxtCount++
} else {
stats.TotalAvgLength = msglen
stats.TotalTxtCount = 1
}
val, exists = stats.ByUserAvgCount[username]
if exists {
stats.ByUserAvgLen[username] = updateMean(stats.ByUserAvgLen[username], val, msglen)
stats.ByUserAvgCount[username]++
} else {
stats.ByUserAvgLen[username] = msglen
stats.ByUserAvgCount[username] = 1
}
updatemean = true
updatetype = MessageTypeText updatetype = MessageTypeText
} }
// Audio message // Audio message
@ -312,17 +254,6 @@ func updateStats(message tg.APIMessage) {
return err return err
} }
if updatemean {
err = b.Put([]byte("avg"), PutUint(stats.TotalAvgLength))
if err != nil {
return err
}
err = b.Put([]byte("avgcount"), PutUint(stats.TotalTxtCount))
if err != nil {
return err
}
}
if updatereplies { if updatereplies {
err = b.Put([]byte("replies"), PutUint(stats.Replies)) err = b.Put([]byte("replies"), PutUint(stats.Replies))
if err != nil { if err != nil {
@ -363,19 +294,6 @@ func updateStats(message tg.APIMessage) {
return err return err
} }
if updatemean {
b = tx.Bucket([]byte("users-avg"))
err = b.Put([]byte(username), PutUint(stats.ByUserAvgLen[username]))
if err != nil {
return err
}
b = tx.Bucket([]byte("users-avgcount"))
err = b.Put([]byte(username), PutUint(stats.ByUserAvgCount[username]))
if err != nil {
return err
}
}
// Update type counter // Update type counter
b = tx.Bucket([]byte("types")) b = tx.Bucket([]byte("types"))
err = b.Put([]byte{byte(updatetype)}, PutUint(stats.ByType[updatetype])) err = b.Put([]byte{byte(updatetype)}, PutUint(stats.ByType[updatetype]))