From 61245cc77c726f3a6a98b926823b4dc592ccce4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ask=20Bj=C3=B8rn=20Hansen?= Date: Sun, 10 Dec 2023 21:02:04 -0800 Subject: [PATCH] scores: csv handler --- go.mod | 2 +- go.sum | 2 + logscores/history.go | 77 ++++++++++++++++++ ntpdb/models.go | 162 ++++++++++++++++++++++++++++++++++++++ ntpdb/monitor.go | 23 ++++++ ntpdb/otel.go | 93 ++++++++++++++++++++++ ntpdb/querier.go | 5 ++ ntpdb/query.sql.go | 179 ++++++++++++++++++++++++++++++++++++++++++ query.sql | 23 +++++- server/history.go | 183 +++++++++++++++++++++++++++++++++++++++++++ server/server.go | 1 + sqlc.yaml | 2 + 12 files changed, 750 insertions(+), 2 deletions(-) create mode 100644 logscores/history.go create mode 100644 ntpdb/monitor.go create mode 100644 server/history.go diff --git a/go.mod b/go.mod index 0c4bbbc..c4e1ac6 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/spf13/cobra v1.8.0 github.com/stretchr/testify v1.8.4 go.ntppool.org/api v0.1.8-0.20231210025001-f2c143296511 - go.ntppool.org/common v0.2.5 + go.ntppool.org/common v0.2.6-0.20231211031613-608f05d39551 go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.46.1 go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.1 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 diff --git a/go.sum b/go.sum index 909f2a0..8cbcc93 100644 --- a/go.sum +++ b/go.sum @@ -142,6 +142,8 @@ go.ntppool.org/api v0.1.8-0.20231210025001-f2c143296511 h1:ilTOX5NQcdDiNohhDexUe go.ntppool.org/api v0.1.8-0.20231210025001-f2c143296511/go.mod h1:9FKbwWfF7eRU7GZVEI3wUYv71ZKt16cLCGfxLayzS2Q= go.ntppool.org/common v0.2.5 h1:fvuFrCCbmaRzZOSwv71+yhqVLOTDB/fD7YlscdGa6qs= go.ntppool.org/common v0.2.5/go.mod h1:Cw8mq8jd2sLCxbTNzYXKXn3qKo2ZLERZ6V/eLcSgDHw= +go.ntppool.org/common v0.2.6-0.20231211031613-608f05d39551 h1:kJdF3U4KBuJJtbF04d5OA/QttxDY/fOgHe1oUDEgLGA= +go.ntppool.org/common v0.2.6-0.20231211031613-608f05d39551/go.mod h1:Cw8mq8jd2sLCxbTNzYXKXn3qKo2ZLERZ6V/eLcSgDHw= go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.46.1 h1:yJWyqeE+8jdOJpt+ZFn7sX05EJAK/9C4jjNZyb61xZg= go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.46.1/go.mod h1:tlgpIvi6LCv4QIZQyBc8Gkr6HDxbJLTh9eQPNZAaljE= go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.1 h1:gbhw/u49SS3gkPWiYweQNJGm/uJN5GkI/FrosxSHT7A= diff --git a/logscores/history.go b/logscores/history.go new file mode 100644 index 0000000..39a7c7c --- /dev/null +++ b/logscores/history.go @@ -0,0 +1,77 @@ +package logscores + +import ( + "context" + "database/sql" + "time" + + "go.ntppool.org/common/logger" + "go.ntppool.org/data-api/ntpdb" +) + +type LogScoreHistory struct { + LogScores []ntpdb.LogScore + Monitors map[int]string +} + +func GetHistory(ctx context.Context, db *sql.DB, serverID, monitorID uint32, since time.Time, count int) (*LogScoreHistory, error) { + log := logger.Setup() + + if count == 0 { + count = 200 + } + + log.Debug("GetHistory", "server", serverID, "monitor", monitorID, "since", since, "count", count) + + q := ntpdb.NewWrappedQuerier(ntpdb.New(db)) + + var ls []ntpdb.LogScore + var err error + if monitorID > 0 { + ls, err = q.GetServerLogScoresByMonitorID(ctx, ntpdb.GetServerLogScoresByMonitorIDParams{ + ServerID: serverID, + MonitorID: sql.NullInt32{Int32: int32(monitorID), Valid: true}, + Limit: int32(count), + }) + } else { + ls, err = q.GetServerLogScores(ctx, ntpdb.GetServerLogScoresParams{ + ServerID: serverID, + Limit: int32(count), + }) + } + if err != nil { + return nil, err + } + + monitors := map[int]string{} + monitorIDs := []uint32{} + for _, l := range ls { + if !l.MonitorID.Valid { + continue + } + mID := uint32(l.MonitorID.Int32) + if _, ok := monitors[int(mID)]; !ok { + monitors[int(mID)] = "" + monitorIDs = append(monitorIDs, mID) + } + } + + dbmons, err := q.GetMonitorsByID(ctx, monitorIDs) + if err != nil { + return nil, err + } + for _, m := range dbmons { + monitors[int(m.ID)] = m.DisplayName() + } + + return &LogScoreHistory{ + LogScores: ls, + Monitors: monitors, + }, nil +} + +/* + + + + */ diff --git a/ntpdb/models.go b/ntpdb/models.go index 7bf3e26..d292248 100644 --- a/ntpdb/models.go +++ b/ntpdb/models.go @@ -9,8 +9,139 @@ import ( "database/sql/driver" "fmt" "time" + + "go.ntppool.org/common/types" ) +type MonitorsIpVersion string + +const ( + MonitorsIpVersionV4 MonitorsIpVersion = "v4" + MonitorsIpVersionV6 MonitorsIpVersion = "v6" +) + +func (e *MonitorsIpVersion) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = MonitorsIpVersion(s) + case string: + *e = MonitorsIpVersion(s) + default: + return fmt.Errorf("unsupported scan type for MonitorsIpVersion: %T", src) + } + return nil +} + +type NullMonitorsIpVersion struct { + MonitorsIpVersion MonitorsIpVersion `json:"monitors_ip_version"` + Valid bool `json:"valid"` // Valid is true if MonitorsIpVersion is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullMonitorsIpVersion) Scan(value interface{}) error { + if value == nil { + ns.MonitorsIpVersion, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.MonitorsIpVersion.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullMonitorsIpVersion) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.MonitorsIpVersion), nil +} + +type MonitorsStatus string + +const ( + MonitorsStatusPending MonitorsStatus = "pending" + MonitorsStatusTesting MonitorsStatus = "testing" + MonitorsStatusActive MonitorsStatus = "active" + MonitorsStatusPaused MonitorsStatus = "paused" + MonitorsStatusDeleted MonitorsStatus = "deleted" +) + +func (e *MonitorsStatus) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = MonitorsStatus(s) + case string: + *e = MonitorsStatus(s) + default: + return fmt.Errorf("unsupported scan type for MonitorsStatus: %T", src) + } + return nil +} + +type NullMonitorsStatus struct { + MonitorsStatus MonitorsStatus `json:"monitors_status"` + Valid bool `json:"valid"` // Valid is true if MonitorsStatus is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullMonitorsStatus) Scan(value interface{}) error { + if value == nil { + ns.MonitorsStatus, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.MonitorsStatus.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullMonitorsStatus) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.MonitorsStatus), nil +} + +type MonitorsType string + +const ( + MonitorsTypeMonitor MonitorsType = "monitor" + MonitorsTypeScore MonitorsType = "score" +) + +func (e *MonitorsType) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = MonitorsType(s) + case string: + *e = MonitorsType(s) + default: + return fmt.Errorf("unsupported scan type for MonitorsType: %T", src) + } + return nil +} + +type NullMonitorsType struct { + MonitorsType MonitorsType `json:"monitors_type"` + Valid bool `json:"valid"` // Valid is true if MonitorsType is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullMonitorsType) Scan(value interface{}) error { + if value == nil { + ns.MonitorsType, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.MonitorsType.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullMonitorsType) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.MonitorsType), nil +} + type ServersIpVersion string const ( @@ -95,6 +226,37 @@ func (ns NullZoneServerCountsIpVersion) Value() (driver.Value, error) { return string(ns.ZoneServerCountsIpVersion), nil } +type LogScore struct { + ID uint64 `db:"id" json:"id"` + MonitorID sql.NullInt32 `db:"monitor_id" json:"monitor_id"` + ServerID uint32 `db:"server_id" json:"server_id"` + Ts time.Time `db:"ts" json:"ts"` + Score float64 `db:"score" json:"score"` + Step float64 `db:"step" json:"step"` + Offset sql.NullFloat64 `db:"offset" json:"offset"` + Rtt sql.NullInt32 `db:"rtt" json:"rtt"` + Attributes types.LogScoreAttributes `db:"attributes" json:"attributes"` +} + +type Monitor struct { + ID uint32 `db:"id" json:"id"` + Type MonitorsType `db:"type" json:"type"` + UserID sql.NullInt32 `db:"user_id" json:"user_id"` + AccountID sql.NullInt32 `db:"account_id" json:"account_id"` + Name string `db:"name" json:"name"` + Location string `db:"location" json:"location"` + Ip sql.NullString `db:"ip" json:"ip"` + IpVersion NullMonitorsIpVersion `db:"ip_version" json:"ip_version"` + TlsName sql.NullString `db:"tls_name" json:"tls_name"` + ApiKey sql.NullString `db:"api_key" json:"api_key"` + Status MonitorsStatus `db:"status" json:"status"` + Config string `db:"config" json:"config"` + ClientVersion string `db:"client_version" json:"client_version"` + LastSeen sql.NullTime `db:"last_seen" json:"last_seen"` + LastSubmit sql.NullTime `db:"last_submit" json:"last_submit"` + CreatedOn time.Time `db:"created_on" json:"created_on"` +} + type Server struct { ID uint32 `db:"id" json:"id"` Ip string `db:"ip" json:"ip"` diff --git a/ntpdb/monitor.go b/ntpdb/monitor.go new file mode 100644 index 0000000..bbeec7e --- /dev/null +++ b/ntpdb/monitor.go @@ -0,0 +1,23 @@ +package ntpdb + +import ( + "strconv" + "strings" +) + +func (m *Monitor) DisplayName() string { + switch { + case len(m.Name) > 0: + return m.Name + case m.TlsName.Valid && len(m.TlsName.String) > 0: + name := m.TlsName.String + if idx := strings.Index(name, "."); idx > 0 { + name = name[0:idx] + } + return name + case len(m.Location) > 0: + return m.Location + " (" + strconv.Itoa(int(m.ID)) + ")" // todo: IDToken instead of ID + default: + return strconv.Itoa(int(m.ID)) // todo: IDToken + } +} diff --git a/ntpdb/otel.go b/ntpdb/otel.go index ff80576..dda5c2e 100644 --- a/ntpdb/otel.go +++ b/ntpdb/otel.go @@ -8,6 +8,7 @@ package ntpdb import ( "context" + "database/sql" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -78,6 +79,52 @@ func (_d QuerierTxWithTracing) Commit(ctx context.Context) (err error) { return _d.QuerierTx.Commit(ctx) } +// GetMonitorByName implements QuerierTx +func (_d QuerierTxWithTracing) GetMonitorByName(ctx context.Context, tlsName sql.NullString) (m1 Monitor, err error) { + ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetMonitorByName") + defer func() { + if _d._spanDecorator != nil { + _d._spanDecorator(_span, map[string]interface{}{ + "ctx": ctx, + "tlsName": tlsName}, map[string]interface{}{ + "m1": m1, + "err": err}) + } else if err != nil { + _span.RecordError(err) + _span.SetAttributes( + attribute.String("event", "error"), + attribute.String("message", err.Error()), + ) + } + + _span.End() + }() + return _d.QuerierTx.GetMonitorByName(ctx, tlsName) +} + +// GetMonitorsByID implements QuerierTx +func (_d QuerierTxWithTracing) GetMonitorsByID(ctx context.Context, ids []uint32) (ma1 []Monitor, err error) { + ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetMonitorsByID") + defer func() { + if _d._spanDecorator != nil { + _d._spanDecorator(_span, map[string]interface{}{ + "ctx": ctx, + "ids": ids}, map[string]interface{}{ + "ma1": ma1, + "err": err}) + } else if err != nil { + _span.RecordError(err) + _span.SetAttributes( + attribute.String("event", "error"), + attribute.String("message", err.Error()), + ) + } + + _span.End() + }() + return _d.QuerierTx.GetMonitorsByID(ctx, ids) +} + // GetServerByID implements QuerierTx func (_d QuerierTxWithTracing) GetServerByID(ctx context.Context, id uint32) (s1 Server, err error) { ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetServerByID") @@ -124,6 +171,52 @@ func (_d QuerierTxWithTracing) GetServerByIP(ctx context.Context, ip string) (s1 return _d.QuerierTx.GetServerByIP(ctx, ip) } +// GetServerLogScores implements QuerierTx +func (_d QuerierTxWithTracing) GetServerLogScores(ctx context.Context, arg GetServerLogScoresParams) (la1 []LogScore, err error) { + ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetServerLogScores") + defer func() { + if _d._spanDecorator != nil { + _d._spanDecorator(_span, map[string]interface{}{ + "ctx": ctx, + "arg": arg}, map[string]interface{}{ + "la1": la1, + "err": err}) + } else if err != nil { + _span.RecordError(err) + _span.SetAttributes( + attribute.String("event", "error"), + attribute.String("message", err.Error()), + ) + } + + _span.End() + }() + return _d.QuerierTx.GetServerLogScores(ctx, arg) +} + +// GetServerLogScoresByMonitorID implements QuerierTx +func (_d QuerierTxWithTracing) GetServerLogScoresByMonitorID(ctx context.Context, arg GetServerLogScoresByMonitorIDParams) (la1 []LogScore, err error) { + ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetServerLogScoresByMonitorID") + defer func() { + if _d._spanDecorator != nil { + _d._spanDecorator(_span, map[string]interface{}{ + "ctx": ctx, + "arg": arg}, map[string]interface{}{ + "la1": la1, + "err": err}) + } else if err != nil { + _span.RecordError(err) + _span.SetAttributes( + attribute.String("event", "error"), + attribute.String("message", err.Error()), + ) + } + + _span.End() + }() + return _d.QuerierTx.GetServerLogScoresByMonitorID(ctx, arg) +} + // GetServerNetspeed implements QuerierTx func (_d QuerierTxWithTracing) GetServerNetspeed(ctx context.Context, ip string) (u1 uint32, err error) { ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetServerNetspeed") diff --git a/ntpdb/querier.go b/ntpdb/querier.go index d679684..2e14f77 100644 --- a/ntpdb/querier.go +++ b/ntpdb/querier.go @@ -6,11 +6,16 @@ package ntpdb import ( "context" + "database/sql" ) type Querier interface { + GetMonitorByName(ctx context.Context, tlsName sql.NullString) (Monitor, error) + GetMonitorsByID(ctx context.Context, ids []uint32) ([]Monitor, error) GetServerByID(ctx context.Context, id uint32) (Server, error) GetServerByIP(ctx context.Context, ip string) (Server, error) + GetServerLogScores(ctx context.Context, arg GetServerLogScoresParams) ([]LogScore, error) + GetServerLogScoresByMonitorID(ctx context.Context, arg GetServerLogScoresByMonitorIDParams) ([]LogScore, error) GetServerNetspeed(ctx context.Context, ip string) (uint32, error) GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow, error) GetZoneStatsV2(ctx context.Context, ip string) ([]GetZoneStatsV2Row, error) diff --git a/ntpdb/query.sql.go b/ntpdb/query.sql.go index b1bb7b0..da85835 100644 --- a/ntpdb/query.sql.go +++ b/ntpdb/query.sql.go @@ -7,9 +7,94 @@ package ntpdb import ( "context" + "database/sql" + "strings" "time" ) +const getMonitorByName = `-- name: GetMonitorByName :one +select id, type, user_id, account_id, name, location, ip, ip_version, tls_name, api_key, status, config, client_version, last_seen, last_submit, created_on from monitors where tls_name = ? +` + +func (q *Queries) GetMonitorByName(ctx context.Context, tlsName sql.NullString) (Monitor, error) { + row := q.db.QueryRowContext(ctx, getMonitorByName, tlsName) + var i Monitor + err := row.Scan( + &i.ID, + &i.Type, + &i.UserID, + &i.AccountID, + &i.Name, + &i.Location, + &i.Ip, + &i.IpVersion, + &i.TlsName, + &i.ApiKey, + &i.Status, + &i.Config, + &i.ClientVersion, + &i.LastSeen, + &i.LastSubmit, + &i.CreatedOn, + ) + return i, err +} + +const getMonitorsByID = `-- name: GetMonitorsByID :many +select id, type, user_id, account_id, name, location, ip, ip_version, tls_name, api_key, status, config, client_version, last_seen, last_submit, created_on from monitors +where id in (/*SLICE:ids*/?) +` + +func (q *Queries) GetMonitorsByID(ctx context.Context, ids []uint32) ([]Monitor, error) { + query := getMonitorsByID + var queryParams []interface{} + if len(ids) > 0 { + for _, v := range ids { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:ids*/?", strings.Repeat(",?", len(ids))[1:], 1) + } else { + query = strings.Replace(query, "/*SLICE:ids*/?", "NULL", 1) + } + rows, err := q.db.QueryContext(ctx, query, queryParams...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Monitor + for rows.Next() { + var i Monitor + if err := rows.Scan( + &i.ID, + &i.Type, + &i.UserID, + &i.AccountID, + &i.Name, + &i.Location, + &i.Ip, + &i.IpVersion, + &i.TlsName, + &i.ApiKey, + &i.Status, + &i.Config, + &i.ClientVersion, + &i.LastSeen, + &i.LastSubmit, + &i.CreatedOn, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getServerByID = `-- name: GetServerByID :one select id, ip, ip_version, user_id, account_id, hostname, stratum, in_pool, in_server_list, netspeed, created_on, updated_on, score_ts, score_raw, deletion_on from servers where @@ -68,6 +153,100 @@ func (q *Queries) GetServerByIP(ctx context.Context, ip string) (Server, error) return i, err } +const getServerLogScores = `-- name: GetServerLogScores :many +select id, monitor_id, server_id, ts, score, step, offset, rtt, attributes from log_scores +where + server_id = ? + order by ts desc + limit ? +` + +type GetServerLogScoresParams struct { + ServerID uint32 `db:"server_id" json:"server_id"` + Limit int32 `db:"limit" json:"limit"` +} + +func (q *Queries) GetServerLogScores(ctx context.Context, arg GetServerLogScoresParams) ([]LogScore, error) { + rows, err := q.db.QueryContext(ctx, getServerLogScores, arg.ServerID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []LogScore + for rows.Next() { + var i LogScore + if err := rows.Scan( + &i.ID, + &i.MonitorID, + &i.ServerID, + &i.Ts, + &i.Score, + &i.Step, + &i.Offset, + &i.Rtt, + &i.Attributes, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getServerLogScoresByMonitorID = `-- name: GetServerLogScoresByMonitorID :many +select id, monitor_id, server_id, ts, score, step, offset, rtt, attributes from log_scores +where + server_id = ? AND + monitor_id = ? + order by ts desc + limit ? +` + +type GetServerLogScoresByMonitorIDParams struct { + ServerID uint32 `db:"server_id" json:"server_id"` + MonitorID sql.NullInt32 `db:"monitor_id" json:"monitor_id"` + Limit int32 `db:"limit" json:"limit"` +} + +func (q *Queries) GetServerLogScoresByMonitorID(ctx context.Context, arg GetServerLogScoresByMonitorIDParams) ([]LogScore, error) { + rows, err := q.db.QueryContext(ctx, getServerLogScoresByMonitorID, arg.ServerID, arg.MonitorID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []LogScore + for rows.Next() { + var i LogScore + if err := rows.Scan( + &i.ID, + &i.MonitorID, + &i.ServerID, + &i.Ts, + &i.Score, + &i.Step, + &i.Offset, + &i.Rtt, + &i.Attributes, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getServerNetspeed = `-- name: GetServerNetspeed :one select netspeed from servers where ip = ? ` diff --git a/query.sql b/query.sql index e264513..2b9add3 100644 --- a/query.sql +++ b/query.sql @@ -42,8 +42,29 @@ select * from servers where id = ?; - -- name: GetServerByIP :one select * from servers where ip = sqlc.arg(ip); + +-- name: GetMonitorByName :one +select * from monitors where tls_name = ?; + +-- name: GetMonitorsByID :many +select * from monitors +where id in (sqlc.slice('ids')); + +-- name: GetServerLogScores :many +select * from log_scores +where + server_id = ? + order by ts desc + limit ?; + +-- name: GetServerLogScoresByMonitorID :many +select * from log_scores +where + server_id = ? AND + monitor_id = ? + order by ts desc + limit ?; diff --git a/server/history.go b/server/history.go new file mode 100644 index 0000000..8fabeea --- /dev/null +++ b/server/history.go @@ -0,0 +1,183 @@ +package server + +import ( + "bytes" + "context" + "database/sql" + "encoding/csv" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/labstack/echo/v4" + "go.ntppool.org/common/logger" + "go.ntppool.org/common/tracing" + "go.ntppool.org/data-api/logscores" + "go.ntppool.org/data-api/ntpdb" +) + +type historyMode uint8 + +const ( + historyModeUnknown historyMode = iota + historyModeLog + historyModeJSON + historyModeMonitor +) + +func paramHistoryMode(s string) historyMode { + switch s { + case "log": + return historyModeLog + case "json": + return historyModeJSON + case "monitor": + return historyModeMonitor + default: + return historyModeUnknown + } +} + +func (srv *Server) getHistory(ctx context.Context, c echo.Context, server ntpdb.Server) (*logscores.LogScoreHistory, error) { + log := logger.Setup() + + limit := 0 + if limitParam, err := strconv.Atoi(c.QueryParam("limit")); err == nil { + limit = limitParam + } else { + limit = 50 + } + if limit > 4000 { + limit = 4000 + } + + since, _ := strconv.ParseInt(c.QueryParam("since"), 10, 64) // defaults to 0 so don't care if it parses + + monitorParam := c.QueryParam("monitor") + + if since > 0 { + c.Request().Header.Set("Cache-Control", "s-maxage=300") + } + + q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db)) + + var monitorID uint32 = 0 + switch monitorParam { + case "": + name := "recentmedian.scores.ntp.dev" + monitor, err := q.GetMonitorByName(ctx, sql.NullString{Valid: true, String: name}) + if err != nil { + log.Warn("could not find monitor", "name", name, "err", err) + } + monitorID = monitor.ID + case "*": + monitorID = 0 // don't filter on monitor ID + } + + log.Info("monitor param", "monitor", monitorID) + + sinceTime := time.Unix(since, 0) + if since > 0 { + log.Warn("monitor data requested with since parameter, not supported", "since", sinceTime) + } + + ls, err := logscores.GetHistory(ctx, srv.db, server.ID, monitorID, sinceTime, limit) + + return ls, err +} + +func (srv *Server) history(c echo.Context) error { + log := logger.Setup() + ctx, span := tracing.Tracer().Start(c.Request().Context(), "history") + defer span.End() + + // for errors and 404s, a shorter cache time + c.Response().Header().Set("Cache-Control", "public,max-age=300") + + mode := paramHistoryMode(c.Param("mode")) + if mode == historyModeUnknown { + return c.String(http.StatusNotFound, "invalid mode") + } + + server, err := srv.FindServer(ctx, c.Param("server")) + if err != nil { + log.Error("find server", "err", err) + return c.String(http.StatusInternalServerError, "internal error") + } + if server.ID == 0 { + return c.String(http.StatusNotFound, "server not found") + } + + history, err := srv.getHistory(ctx, c, server) + if err != nil { + log.Error("get history", "err", err) + return c.String(http.StatusInternalServerError, "internal error") + } + + if mode == historyModeLog { + + ctx, span := tracing.Tracer().Start(ctx, "history.csv") + b := bytes.NewBuffer([]byte{}) + w := csv.NewWriter(b) + + ff := func(f float64) string { + s := fmt.Sprintf("%.9f", f) + s = strings.TrimRight(s, "0") + s = strings.TrimRight(s, ".") + return s + } + + w.Write([]string{"ts_epoch", "ts", "offset", "step", "score", "monitor_id", "monitor_name", "leap", "error"}) + for _, l := range history.LogScores { + // log.Debug("csv line", "id", l.ID, "n", i) + + var offset string + if l.Offset.Valid { + offset = ff(l.Offset.Float64) + } + + step := ff(l.Step) + score := ff(l.Score) + var monName string + if l.MonitorID.Valid { + monName = history.Monitors[int(l.MonitorID.Int32)] + } + var leap string + if l.Attributes.Leap != 0 { + leap = fmt.Sprintf("%d", l.Attributes.Leap) + } + + err := w.Write([]string{ + strconv.Itoa(int(l.Ts.Unix())), + // l.Ts.Format(time.RFC3339), + l.Ts.Format("2006-01-02 15:04:05"), + offset, + step, + score, + fmt.Sprintf("%d", l.MonitorID.Int32), + monName, + leap, + l.Attributes.Error, + }) + if err != nil { + log.Warn("csv encoding error", "ls_id", l.ID, "err", err) + } + } + w.Flush() + if err := w.Error(); err != nil { + log.ErrorContext(ctx, "could not flush csv", "err", err) + span.End() + return c.String(http.StatusInternalServerError, "csv error") + } + + log.Info("entries", "count", len(history.LogScores), "out_bytes", b.Len()) + + span.End() + return c.Blob(http.StatusOK, "text/csv", b.Bytes()) + + } + + return c.JSON(http.StatusOK, history) +} diff --git a/server/server.go b/server/server.go index d84a684..d5f86e0 100644 --- a/server/server.go +++ b/server/server.go @@ -188,6 +188,7 @@ func (srv *Server) Run() error { e.GET("/api/usercc", srv.userCountryData) e.GET("/api/server/dns/answers/:server", srv.dnsAnswers) + e.GET("/api/server/scores/:server/:mode", srv.history) e.GET("/graph/:server/:type", srv.graphImage) // e.GET("/api/server/scores/:server/:type", srv.logScores) diff --git a/sqlc.yaml b/sqlc.yaml index 7765887..4994cbf 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -15,5 +15,7 @@ sql: rename: servers.Ip: IP overrides: + - column: log_scores.attributes + go_type: go.ntppool.org/common/types.LogScoreAttributes - column: "server_netspeed.netspeed_active" go_type: "uint64"