From f6b0f96a340fcdceda74f06c3134589edc3fcf75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ask=20Bj=C3=B8rn=20Hansen?= Date: Sun, 10 Dec 2023 21:42:15 -0800 Subject: [PATCH] scores: json handler --- logscores/history.go | 25 +++-- ntpdb/models.go | 43 ++++++++ ntpdb/otel.go | 31 +++++- ntpdb/querier.go | 3 +- ntpdb/query.sql.go | 83 +++++++++++++-- query.sql | 13 ++- server/history.go | 245 ++++++++++++++++++++++++++++++++----------- 7 files changed, 358 insertions(+), 85 deletions(-) diff --git a/logscores/history.go b/logscores/history.go index 39a7c7c..b34662b 100644 --- a/logscores/history.go +++ b/logscores/history.go @@ -6,21 +6,31 @@ import ( "time" "go.ntppool.org/common/logger" + "go.ntppool.org/common/tracing" "go.ntppool.org/data-api/ntpdb" + "go.opentelemetry.io/otel/attribute" ) type LogScoreHistory struct { - LogScores []ntpdb.LogScore - Monitors map[int]string + LogScores []ntpdb.LogScore + Monitors map[int]string + MonitorIDs []uint32 } func GetHistory(ctx context.Context, db *sql.DB, serverID, monitorID uint32, since time.Time, count int) (*LogScoreHistory, error) { log := logger.Setup() + ctx, span := tracing.Tracer().Start(ctx, "logscores.GetHistory") + defer span.End() if count == 0 { count = 200 } + span.SetAttributes( + attribute.Int("server", int(serverID)), + attribute.Int("monitor", int(monitorID)), + ) + log.Debug("GetHistory", "server", serverID, "monitor", monitorID, "since", since, "count", count) q := ntpdb.NewWrappedQuerier(ntpdb.New(db)) @@ -65,13 +75,8 @@ func GetHistory(ctx context.Context, db *sql.DB, serverID, monitorID uint32, sin } return &LogScoreHistory{ - LogScores: ls, - Monitors: monitors, + LogScores: ls, + Monitors: monitors, + MonitorIDs: monitorIDs, }, nil } - -/* - - - - */ diff --git a/ntpdb/models.go b/ntpdb/models.go index d292248..cfe3bfa 100644 --- a/ntpdb/models.go +++ b/ntpdb/models.go @@ -142,6 +142,49 @@ func (ns NullMonitorsType) Value() (driver.Value, error) { return string(ns.MonitorsType), nil } +type ServerScoresStatus string + +const ( + ServerScoresStatusNew ServerScoresStatus = "new" + ServerScoresStatusTesting ServerScoresStatus = "testing" + ServerScoresStatusActive ServerScoresStatus = "active" +) + +func (e *ServerScoresStatus) Scan(src interface{}) error { + switch s := src.(type) { + case []byte: + *e = ServerScoresStatus(s) + case string: + *e = ServerScoresStatus(s) + default: + return fmt.Errorf("unsupported scan type for ServerScoresStatus: %T", src) + } + return nil +} + +type NullServerScoresStatus struct { + ServerScoresStatus ServerScoresStatus `json:"server_scores_status"` + Valid bool `json:"valid"` // Valid is true if ServerScoresStatus is not NULL +} + +// Scan implements the Scanner interface. +func (ns *NullServerScoresStatus) Scan(value interface{}) error { + if value == nil { + ns.ServerScoresStatus, ns.Valid = "", false + return nil + } + ns.Valid = true + return ns.ServerScoresStatus.Scan(value) +} + +// Value implements the driver Valuer interface. +func (ns NullServerScoresStatus) Value() (driver.Value, error) { + if !ns.Valid { + return nil, nil + } + return string(ns.ServerScoresStatus), nil +} + type ServersIpVersion string const ( diff --git a/ntpdb/otel.go b/ntpdb/otel.go index dda5c2e..b19f5eb 100644 --- a/ntpdb/otel.go +++ b/ntpdb/otel.go @@ -103,13 +103,13 @@ func (_d QuerierTxWithTracing) GetMonitorByName(ctx context.Context, tlsName sql } // GetMonitorsByID implements QuerierTx -func (_d QuerierTxWithTracing) GetMonitorsByID(ctx context.Context, ids []uint32) (ma1 []Monitor, err error) { +func (_d QuerierTxWithTracing) GetMonitorsByID(ctx context.Context, monitorids []uint32) (ma1 []Monitor, err error) { ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetMonitorsByID") defer func() { if _d._spanDecorator != nil { _d._spanDecorator(_span, map[string]interface{}{ - "ctx": ctx, - "ids": ids}, map[string]interface{}{ + "ctx": ctx, + "monitorids": monitorids}, map[string]interface{}{ "ma1": ma1, "err": err}) } else if err != nil { @@ -122,7 +122,7 @@ func (_d QuerierTxWithTracing) GetMonitorsByID(ctx context.Context, ids []uint32 _span.End() }() - return _d.QuerierTx.GetMonitorsByID(ctx, ids) + return _d.QuerierTx.GetMonitorsByID(ctx, monitorids) } // GetServerByID implements QuerierTx @@ -240,6 +240,29 @@ func (_d QuerierTxWithTracing) GetServerNetspeed(ctx context.Context, ip string) return _d.QuerierTx.GetServerNetspeed(ctx, ip) } +// GetServerScores implements QuerierTx +func (_d QuerierTxWithTracing) GetServerScores(ctx context.Context, arg GetServerScoresParams) (ga1 []GetServerScoresRow, err error) { + ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetServerScores") + defer func() { + if _d._spanDecorator != nil { + _d._spanDecorator(_span, map[string]interface{}{ + "ctx": ctx, + "arg": arg}, map[string]interface{}{ + "ga1": ga1, + "err": err}) + } else if err != nil { + _span.RecordError(err) + _span.SetAttributes( + attribute.String("event", "error"), + attribute.String("message", err.Error()), + ) + } + + _span.End() + }() + return _d.QuerierTx.GetServerScores(ctx, arg) +} + // GetZoneStatsData implements QuerierTx func (_d QuerierTxWithTracing) GetZoneStatsData(ctx context.Context) (ga1 []GetZoneStatsDataRow, err error) { ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetZoneStatsData") diff --git a/ntpdb/querier.go b/ntpdb/querier.go index 2e14f77..eef4c94 100644 --- a/ntpdb/querier.go +++ b/ntpdb/querier.go @@ -11,12 +11,13 @@ import ( type Querier interface { GetMonitorByName(ctx context.Context, tlsName sql.NullString) (Monitor, error) - GetMonitorsByID(ctx context.Context, ids []uint32) ([]Monitor, error) + GetMonitorsByID(ctx context.Context, monitorids []uint32) ([]Monitor, error) GetServerByID(ctx context.Context, id uint32) (Server, error) GetServerByIP(ctx context.Context, ip string) (Server, error) GetServerLogScores(ctx context.Context, arg GetServerLogScoresParams) ([]LogScore, error) GetServerLogScoresByMonitorID(ctx context.Context, arg GetServerLogScoresByMonitorIDParams) ([]LogScore, error) GetServerNetspeed(ctx context.Context, ip string) (uint32, error) + GetServerScores(ctx context.Context, arg GetServerScoresParams) ([]GetServerScoresRow, error) GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow, error) GetZoneStatsV2(ctx context.Context, ip string) ([]GetZoneStatsV2Row, error) } diff --git a/ntpdb/query.sql.go b/ntpdb/query.sql.go index da85835..f5407eb 100644 --- a/ntpdb/query.sql.go +++ b/ntpdb/query.sql.go @@ -42,19 +42,19 @@ func (q *Queries) GetMonitorByName(ctx context.Context, tlsName sql.NullString) const getMonitorsByID = `-- name: GetMonitorsByID :many select id, type, user_id, account_id, name, location, ip, ip_version, tls_name, api_key, status, config, client_version, last_seen, last_submit, created_on from monitors -where id in (/*SLICE:ids*/?) +where id in (/*SLICE:MonitorIDs*/?) ` -func (q *Queries) GetMonitorsByID(ctx context.Context, ids []uint32) ([]Monitor, error) { +func (q *Queries) GetMonitorsByID(ctx context.Context, monitorids []uint32) ([]Monitor, error) { query := getMonitorsByID var queryParams []interface{} - if len(ids) > 0 { - for _, v := range ids { + if len(monitorids) > 0 { + for _, v := range monitorids { queryParams = append(queryParams, v) } - query = strings.Replace(query, "/*SLICE:ids*/?", strings.Repeat(",?", len(ids))[1:], 1) + query = strings.Replace(query, "/*SLICE:MonitorIDs*/?", strings.Repeat(",?", len(monitorids))[1:], 1) } else { - query = strings.Replace(query, "/*SLICE:ids*/?", "NULL", 1) + query = strings.Replace(query, "/*SLICE:MonitorIDs*/?", "NULL", 1) } rows, err := q.db.QueryContext(ctx, query, queryParams...) if err != nil { @@ -258,6 +258,77 @@ func (q *Queries) GetServerNetspeed(ctx context.Context, ip string) (uint32, err return netspeed, err } +const getServerScores = `-- name: GetServerScores :many +select + m.id, m.name, m.tls_name, m.location, m.type, + ss.score_raw, ss.score_ts, ss.status + from server_scores ss + inner join monitors m + on (m.id=ss.monitor_id) +where + server_id = ? AND + monitor_id in (/*SLICE:MonitorIDs*/?) +` + +type GetServerScoresParams struct { + ServerID uint32 `db:"server_id" json:"server_id"` + MonitorIDs []uint32 `db:"MonitorIDs" json:"MonitorIDs"` +} + +type GetServerScoresRow struct { + ID uint32 `db:"id" json:"id"` + Name string `db:"name" json:"name"` + TlsName sql.NullString `db:"tls_name" json:"tls_name"` + Location string `db:"location" json:"location"` + Type MonitorsType `db:"type" json:"type"` + ScoreRaw float64 `db:"score_raw" json:"score_raw"` + ScoreTs sql.NullTime `db:"score_ts" json:"score_ts"` + Status ServerScoresStatus `db:"status" json:"status"` +} + +func (q *Queries) GetServerScores(ctx context.Context, arg GetServerScoresParams) ([]GetServerScoresRow, error) { + query := getServerScores + var queryParams []interface{} + queryParams = append(queryParams, arg.ServerID) + if len(arg.MonitorIDs) > 0 { + for _, v := range arg.MonitorIDs { + queryParams = append(queryParams, v) + } + query = strings.Replace(query, "/*SLICE:MonitorIDs*/?", strings.Repeat(",?", len(arg.MonitorIDs))[1:], 1) + } else { + query = strings.Replace(query, "/*SLICE:MonitorIDs*/?", "NULL", 1) + } + rows, err := q.db.QueryContext(ctx, query, queryParams...) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetServerScoresRow + for rows.Next() { + var i GetServerScoresRow + if err := rows.Scan( + &i.ID, + &i.Name, + &i.TlsName, + &i.Location, + &i.Type, + &i.ScoreRaw, + &i.ScoreTs, + &i.Status, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getZoneStatsData = `-- name: GetZoneStatsData :many SELECT zc.date, z.name, zc.ip_version, count_active, count_registered, netspeed_active FROM zone_server_counts zc USE INDEX (date_idx) diff --git a/query.sql b/query.sql index 2b9add3..6da64e5 100644 --- a/query.sql +++ b/query.sql @@ -52,7 +52,18 @@ select * from monitors where tls_name = ?; -- name: GetMonitorsByID :many select * from monitors -where id in (sqlc.slice('ids')); +where id in (sqlc.slice('MonitorIDs')); + +-- name: GetServerScores :many +select + m.id, m.name, m.tls_name, m.location, m.type, + ss.score_raw, ss.score_ts, ss.status + from server_scores ss + inner join monitors m + on (m.id=ss.monitor_id) +where + server_id = ? AND + monitor_id in (sqlc.slice('MonitorIDs')); -- name: GetServerLogScores :many select * from log_scores diff --git a/server/history.go b/server/history.go index 8fabeea..4a28fe9 100644 --- a/server/history.go +++ b/server/history.go @@ -6,6 +6,7 @@ import ( "database/sql" "encoding/csv" "fmt" + "math" "net/http" "strconv" "strings" @@ -74,6 +75,13 @@ func (srv *Server) getHistory(ctx context.Context, c echo.Context, server ntpdb. monitorID = monitor.ID case "*": monitorID = 0 // don't filter on monitor ID + default: + mID, err := strconv.ParseUint(monitorParam, 10, 32) + if err != nil { + log.InfoContext(ctx, "invalid monitor parameter", "monitor", monitorParam) + return nil, nil + } + monitorID = uint32(mID) } log.Info("monitor param", "monitor", monitorID) @@ -93,8 +101,8 @@ func (srv *Server) history(c echo.Context) error { ctx, span := tracing.Tracer().Start(c.Request().Context(), "history") defer span.End() - // for errors and 404s, a shorter cache time - c.Response().Header().Set("Cache-Control", "public,max-age=300") + // just cache for a short time by default + c.Response().Header().Set("Cache-Control", "public,max-age=240") mode := paramHistoryMode(c.Param("mode")) if mode == historyModeUnknown { @@ -104,80 +112,191 @@ func (srv *Server) history(c echo.Context) error { server, err := srv.FindServer(ctx, c.Param("server")) if err != nil { log.Error("find server", "err", err) + span.RecordError(err) return c.String(http.StatusInternalServerError, "internal error") } if server.ID == 0 { + span.AddEvent("server not found") return c.String(http.StatusNotFound, "server not found") } history, err := srv.getHistory(ctx, c, server) if err != nil { log.Error("get history", "err", err) + span.RecordError(err) return c.String(http.StatusInternalServerError, "internal error") } - if mode == historyModeLog { - - ctx, span := tracing.Tracer().Start(ctx, "history.csv") - b := bytes.NewBuffer([]byte{}) - w := csv.NewWriter(b) - - ff := func(f float64) string { - s := fmt.Sprintf("%.9f", f) - s = strings.TrimRight(s, "0") - s = strings.TrimRight(s, ".") - return s - } - - w.Write([]string{"ts_epoch", "ts", "offset", "step", "score", "monitor_id", "monitor_name", "leap", "error"}) - for _, l := range history.LogScores { - // log.Debug("csv line", "id", l.ID, "n", i) - - var offset string - if l.Offset.Valid { - offset = ff(l.Offset.Float64) - } - - step := ff(l.Step) - score := ff(l.Score) - var monName string - if l.MonitorID.Valid { - monName = history.Monitors[int(l.MonitorID.Int32)] - } - var leap string - if l.Attributes.Leap != 0 { - leap = fmt.Sprintf("%d", l.Attributes.Leap) - } - - err := w.Write([]string{ - strconv.Itoa(int(l.Ts.Unix())), - // l.Ts.Format(time.RFC3339), - l.Ts.Format("2006-01-02 15:04:05"), - offset, - step, - score, - fmt.Sprintf("%d", l.MonitorID.Int32), - monName, - leap, - l.Attributes.Error, - }) - if err != nil { - log.Warn("csv encoding error", "ls_id", l.ID, "err", err) - } - } - w.Flush() - if err := w.Error(); err != nil { - log.ErrorContext(ctx, "could not flush csv", "err", err) - span.End() - return c.String(http.StatusInternalServerError, "csv error") - } - - log.Info("entries", "count", len(history.LogScores), "out_bytes", b.Len()) - - span.End() - return c.Blob(http.StatusOK, "text/csv", b.Bytes()) + c.Response().Header().Set("Access-Control-Allow-Origin", "*") + switch mode { + case historyModeLog: + return srv.historyCSV(ctx, c, history) + case historyModeJSON: + return srv.historyJSON(ctx, c, server, history) + default: + return c.String(http.StatusNotFound, "not implemented") } - return c.JSON(http.StatusOK, history) +} + +func (srv *Server) historyJSON(ctx context.Context, c echo.Context, server ntpdb.Server, history *logscores.LogScoreHistory) error { + log := logger.Setup() + ctx, span := tracing.Tracer().Start(ctx, "history.json") + defer span.End() + + type ScoresEntry struct { + TS int64 `json:"ts"` + Offset *float64 `json:"offset,omitempty"` + Step float64 `json:"step"` + Score float64 `json:"score"` + MonitorID int `json:"monitor_id"` + } + + type MonitorEntry struct { + ID uint32 `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + Ts string `json:"ts"` + Score float64 `json:"score"` + Status string `json:"status"` + } + res := struct { + History []ScoresEntry `json:"history"` + Monitors []MonitorEntry `json:"monitors"` + Server struct { + IP string `json:"ip"` + } `json:"server"` + }{ + History: make([]ScoresEntry, len(history.LogScores)), + } + res.Server.IP = server.Ip + + // log.InfoContext(ctx, "monitor id list", "ids", history.MonitorIDs) + + q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db)) + logScoreMonitors, err := q.GetServerScores(ctx, + ntpdb.GetServerScoresParams{ + MonitorIDs: history.MonitorIDs, + ServerID: server.ID, + }, + ) + if err != nil { + span.RecordError(err) + log.ErrorContext(ctx, "GetServerScores", "err", err) + return c.String(http.StatusInternalServerError, "err") + } + + // log.InfoContext(ctx, "got logScoreMonitors", "count", len(logScoreMonitors)) + + for _, lsm := range logScoreMonitors { + score := math.Round(lsm.ScoreRaw*10) / 10 // round to one decimal + + tempMon := ntpdb.Monitor{ + Name: lsm.Name, + TlsName: lsm.TlsName, + Location: lsm.Location, + ID: lsm.ID, + } + name := tempMon.DisplayName() + + me := MonitorEntry{ + ID: lsm.ID, + Name: name, + Type: string(lsm.Type), + Ts: lsm.ScoreTs.Time.Format(time.RFC3339), + Score: score, + Status: string(lsm.Status), + } + res.Monitors = append(res.Monitors, me) + } + + for i, ls := range history.LogScores { + x := float64(1000000000000) + score := math.Round(ls.Score*x) / x + res.History[i] = ScoresEntry{ + TS: ls.Ts.Unix(), + MonitorID: int(ls.MonitorID.Int32), + Step: ls.Step, + Score: score, + } + if ls.Offset.Valid { + offset := ls.Offset.Float64 + res.History[i].Offset = &offset + } + } + + if history.LogScores[len(history.LogScores)-1].Ts.After(time.Now().Add(-8 * time.Hour)) { + // cache for longer if data hasn't updated for a while + c.Request().Header.Set("Cache-Control", "s-maxage=3600,max-age=1800") + } else { + c.Request().Header.Set("Cache-Control", "s-maxage=300,max-age=240") + } + + return c.JSON(http.StatusOK, res) + +} + +func (srv *Server) historyCSV(ctx context.Context, c echo.Context, history *logscores.LogScoreHistory) error { + log := logger.Setup() + ctx, span := tracing.Tracer().Start(ctx, "history.csv") + b := bytes.NewBuffer([]byte{}) + w := csv.NewWriter(b) + + ff := func(f float64) string { + s := fmt.Sprintf("%.9f", f) + s = strings.TrimRight(s, "0") + s = strings.TrimRight(s, ".") + return s + } + + w.Write([]string{"ts_epoch", "ts", "offset", "step", "score", "monitor_id", "monitor_name", "leap", "error"}) + for _, l := range history.LogScores { + // log.Debug("csv line", "id", l.ID, "n", i) + + var offset string + if l.Offset.Valid { + offset = ff(l.Offset.Float64) + } + + step := ff(l.Step) + score := ff(l.Score) + var monName string + if l.MonitorID.Valid { + monName = history.Monitors[int(l.MonitorID.Int32)] + } + var leap string + if l.Attributes.Leap != 0 { + leap = fmt.Sprintf("%d", l.Attributes.Leap) + } + + err := w.Write([]string{ + strconv.Itoa(int(l.Ts.Unix())), + // l.Ts.Format(time.RFC3339), + l.Ts.Format("2006-01-02 15:04:05"), + offset, + step, + score, + fmt.Sprintf("%d", l.MonitorID.Int32), + monName, + leap, + l.Attributes.Error, + }) + if err != nil { + log.Warn("csv encoding error", "ls_id", l.ID, "err", err) + } + } + w.Flush() + if err := w.Error(); err != nil { + log.ErrorContext(ctx, "could not flush csv", "err", err) + span.End() + return c.String(http.StatusInternalServerError, "csv error") + } + + // log.Info("entries", "count", len(history.LogScores), "out_bytes", b.Len()) + + c.Request().Header.Set("Cache-Control", "s-maxage=120,max-age=120") + + return c.Blob(http.StatusOK, "text/csv", b.Bytes()) + }