308 lines
7.6 KiB
Go
308 lines
7.6 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"encoding/csv"
|
|
"fmt"
|
|
"math"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/labstack/echo/v4"
|
|
"go.ntppool.org/common/logger"
|
|
"go.ntppool.org/common/tracing"
|
|
"go.ntppool.org/data-api/logscores"
|
|
"go.ntppool.org/data-api/ntpdb"
|
|
)
|
|
|
|
type historyMode uint8
|
|
|
|
const (
|
|
historyModeUnknown historyMode = iota
|
|
historyModeLog
|
|
historyModeJSON
|
|
historyModeMonitor
|
|
)
|
|
|
|
func paramHistoryMode(s string) historyMode {
|
|
switch s {
|
|
case "log":
|
|
return historyModeLog
|
|
case "json":
|
|
return historyModeJSON
|
|
case "monitor":
|
|
return historyModeMonitor
|
|
default:
|
|
return historyModeUnknown
|
|
}
|
|
}
|
|
|
|
func (srv *Server) getHistory(ctx context.Context, c echo.Context, server ntpdb.Server) (*logscores.LogScoreHistory, error) {
|
|
log := logger.Setup()
|
|
|
|
limit := 0
|
|
if limitParam, err := strconv.Atoi(c.QueryParam("limit")); err == nil {
|
|
limit = limitParam
|
|
} else {
|
|
limit = 50
|
|
}
|
|
if limit > 4000 {
|
|
limit = 4000
|
|
}
|
|
|
|
since, _ := strconv.ParseInt(c.QueryParam("since"), 10, 64) // defaults to 0 so don't care if it parses
|
|
|
|
monitorParam := c.QueryParam("monitor")
|
|
|
|
if since > 0 {
|
|
c.Request().Header.Set("Cache-Control", "s-maxage=300")
|
|
}
|
|
|
|
q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db))
|
|
|
|
var monitorID uint32 = 0
|
|
switch monitorParam {
|
|
case "":
|
|
name := "recentmedian.scores.ntp.dev"
|
|
monitor, err := q.GetMonitorByName(ctx, sql.NullString{Valid: true, String: name})
|
|
if err != nil {
|
|
log.Warn("could not find monitor", "name", name, "err", err)
|
|
}
|
|
monitorID = monitor.ID
|
|
case "*":
|
|
monitorID = 0 // don't filter on monitor ID
|
|
default:
|
|
mID, err := strconv.ParseUint(monitorParam, 10, 32)
|
|
if err != nil {
|
|
log.InfoContext(ctx, "invalid monitor parameter", "monitor", monitorParam)
|
|
return nil, nil
|
|
}
|
|
monitorID = uint32(mID)
|
|
}
|
|
|
|
log.Info("monitor param", "monitor", monitorID)
|
|
|
|
sinceTime := time.Unix(since, 0)
|
|
if since > 0 {
|
|
log.Warn("monitor data requested with since parameter, not supported", "since", sinceTime)
|
|
}
|
|
|
|
ls, err := logscores.GetHistory(ctx, srv.db, server.ID, monitorID, sinceTime, limit)
|
|
|
|
return ls, err
|
|
}
|
|
|
|
func (srv *Server) history(c echo.Context) error {
|
|
log := logger.Setup()
|
|
ctx, span := tracing.Tracer().Start(c.Request().Context(), "history")
|
|
defer span.End()
|
|
|
|
// just cache for a short time by default
|
|
c.Response().Header().Set("Cache-Control", "public,max-age=240")
|
|
|
|
mode := paramHistoryMode(c.Param("mode"))
|
|
if mode == historyModeUnknown {
|
|
return c.String(http.StatusNotFound, "invalid mode")
|
|
}
|
|
|
|
server, err := srv.FindServer(ctx, c.Param("server"))
|
|
if err != nil {
|
|
log.Error("find server", "err", err)
|
|
span.RecordError(err)
|
|
return c.String(http.StatusInternalServerError, "internal error")
|
|
}
|
|
if server.DeletionAge(30 * 24 * time.Hour) {
|
|
span.AddEvent("server deleted")
|
|
return c.String(http.StatusNotFound, "server not found")
|
|
}
|
|
if server.ID == 0 {
|
|
span.AddEvent("server not found")
|
|
return c.String(http.StatusNotFound, "server not found")
|
|
}
|
|
|
|
history, err := srv.getHistory(ctx, c, server)
|
|
if err != nil {
|
|
log.Error("get history", "err", err)
|
|
span.RecordError(err)
|
|
return c.String(http.StatusInternalServerError, "internal error")
|
|
}
|
|
|
|
c.Response().Header().Set("Access-Control-Allow-Origin", "*")
|
|
|
|
switch mode {
|
|
case historyModeLog:
|
|
return srv.historyCSV(ctx, c, history)
|
|
case historyModeJSON:
|
|
return srv.historyJSON(ctx, c, server, history)
|
|
default:
|
|
return c.String(http.StatusNotFound, "not implemented")
|
|
}
|
|
|
|
}
|
|
|
|
func (srv *Server) historyJSON(ctx context.Context, c echo.Context, server ntpdb.Server, history *logscores.LogScoreHistory) error {
|
|
log := logger.Setup()
|
|
ctx, span := tracing.Tracer().Start(ctx, "history.json")
|
|
defer span.End()
|
|
|
|
type ScoresEntry struct {
|
|
TS int64 `json:"ts"`
|
|
Offset *float64 `json:"offset,omitempty"`
|
|
Step float64 `json:"step"`
|
|
Score float64 `json:"score"`
|
|
MonitorID int `json:"monitor_id"`
|
|
}
|
|
|
|
type MonitorEntry struct {
|
|
ID uint32 `json:"id"`
|
|
Name string `json:"name"`
|
|
Type string `json:"type"`
|
|
Ts string `json:"ts"`
|
|
Score float64 `json:"score"`
|
|
Status string `json:"status"`
|
|
}
|
|
res := struct {
|
|
History []ScoresEntry `json:"history"`
|
|
Monitors []MonitorEntry `json:"monitors"`
|
|
Server struct {
|
|
IP string `json:"ip"`
|
|
} `json:"server"`
|
|
}{
|
|
History: make([]ScoresEntry, len(history.LogScores)),
|
|
}
|
|
res.Server.IP = server.Ip
|
|
|
|
// log.InfoContext(ctx, "monitor id list", "ids", history.MonitorIDs)
|
|
|
|
q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db))
|
|
logScoreMonitors, err := q.GetServerScores(ctx,
|
|
ntpdb.GetServerScoresParams{
|
|
MonitorIDs: history.MonitorIDs,
|
|
ServerID: server.ID,
|
|
},
|
|
)
|
|
if err != nil {
|
|
span.RecordError(err)
|
|
log.ErrorContext(ctx, "GetServerScores", "err", err)
|
|
return c.String(http.StatusInternalServerError, "err")
|
|
}
|
|
|
|
// log.InfoContext(ctx, "got logScoreMonitors", "count", len(logScoreMonitors))
|
|
|
|
for _, lsm := range logScoreMonitors {
|
|
score := math.Round(lsm.ScoreRaw*10) / 10 // round to one decimal
|
|
|
|
tempMon := ntpdb.Monitor{
|
|
Name: lsm.Name,
|
|
TlsName: lsm.TlsName,
|
|
Location: lsm.Location,
|
|
ID: lsm.ID,
|
|
}
|
|
name := tempMon.DisplayName()
|
|
|
|
me := MonitorEntry{
|
|
ID: lsm.ID,
|
|
Name: name,
|
|
Type: string(lsm.Type),
|
|
Ts: lsm.ScoreTs.Time.Format(time.RFC3339),
|
|
Score: score,
|
|
Status: string(lsm.Status),
|
|
}
|
|
res.Monitors = append(res.Monitors, me)
|
|
}
|
|
|
|
for i, ls := range history.LogScores {
|
|
x := float64(1000000000000)
|
|
score := math.Round(ls.Score*x) / x
|
|
res.History[i] = ScoresEntry{
|
|
TS: ls.Ts.Unix(),
|
|
MonitorID: int(ls.MonitorID.Int32),
|
|
Step: ls.Step,
|
|
Score: score,
|
|
}
|
|
if ls.Offset.Valid {
|
|
offset := ls.Offset.Float64
|
|
res.History[i].Offset = &offset
|
|
}
|
|
}
|
|
|
|
if len(history.LogScores) == 0 ||
|
|
history.LogScores[len(history.LogScores)-1].Ts.After(time.Now().Add(-8*time.Hour)) {
|
|
// cache for longer if data hasn't updated for a while
|
|
c.Request().Header.Set("Cache-Control", "s-maxage=3600,max-age=1800")
|
|
} else {
|
|
c.Request().Header.Set("Cache-Control", "s-maxage=300,max-age=240")
|
|
}
|
|
|
|
return c.JSON(http.StatusOK, res)
|
|
|
|
}
|
|
|
|
func (srv *Server) historyCSV(ctx context.Context, c echo.Context, history *logscores.LogScoreHistory) error {
|
|
log := logger.Setup()
|
|
ctx, span := tracing.Tracer().Start(ctx, "history.csv")
|
|
b := bytes.NewBuffer([]byte{})
|
|
w := csv.NewWriter(b)
|
|
|
|
ff := func(f float64) string {
|
|
s := fmt.Sprintf("%.9f", f)
|
|
s = strings.TrimRight(s, "0")
|
|
s = strings.TrimRight(s, ".")
|
|
return s
|
|
}
|
|
|
|
w.Write([]string{"ts_epoch", "ts", "offset", "step", "score", "monitor_id", "monitor_name", "leap", "error"})
|
|
for _, l := range history.LogScores {
|
|
// log.Debug("csv line", "id", l.ID, "n", i)
|
|
|
|
var offset string
|
|
if l.Offset.Valid {
|
|
offset = ff(l.Offset.Float64)
|
|
}
|
|
|
|
step := ff(l.Step)
|
|
score := ff(l.Score)
|
|
var monName string
|
|
if l.MonitorID.Valid {
|
|
monName = history.Monitors[int(l.MonitorID.Int32)]
|
|
}
|
|
var leap string
|
|
if l.Attributes.Leap != 0 {
|
|
leap = fmt.Sprintf("%d", l.Attributes.Leap)
|
|
}
|
|
|
|
err := w.Write([]string{
|
|
strconv.Itoa(int(l.Ts.Unix())),
|
|
// l.Ts.Format(time.RFC3339),
|
|
l.Ts.Format("2006-01-02 15:04:05"),
|
|
offset,
|
|
step,
|
|
score,
|
|
fmt.Sprintf("%d", l.MonitorID.Int32),
|
|
monName,
|
|
leap,
|
|
l.Attributes.Error,
|
|
})
|
|
if err != nil {
|
|
log.Warn("csv encoding error", "ls_id", l.ID, "err", err)
|
|
}
|
|
}
|
|
w.Flush()
|
|
if err := w.Error(); err != nil {
|
|
log.ErrorContext(ctx, "could not flush csv", "err", err)
|
|
span.End()
|
|
return c.String(http.StatusInternalServerError, "csv error")
|
|
}
|
|
|
|
// log.Info("entries", "count", len(history.LogScores), "out_bytes", b.Len())
|
|
|
|
c.Request().Header.Set("Cache-Control", "s-maxage=120,max-age=120")
|
|
|
|
return c.Blob(http.StatusOK, "text/csv", b.Bytes())
|
|
|
|
}
|