Private
Public Access
1
0

scores: full_history option for internal clients
All checks were successful
continuous-integration/drone/push Build is passing

(somewhat inefficient, but for now rarely used ...)
This commit is contained in:
2024-01-20 23:21:21 -07:00
parent b786ed6986
commit e1398e7472
3 changed files with 59 additions and 34 deletions

View File

@@ -11,25 +11,27 @@ import (
"go.ntppool.org/data-api/ntpdb" "go.ntppool.org/data-api/ntpdb"
) )
func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, since time.Time, limit int) ([]ntpdb.LogScore, error) { func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, since time.Time, limit int, fullHistory bool) ([]ntpdb.LogScore, error) {
log := logger.Setup() log := logger.Setup()
ctx, span := tracing.Tracer().Start(ctx, "CH Logscores") ctx, span := tracing.Tracer().Start(ctx, "CH Logscores")
defer span.End() defer span.End()
if since.IsZero() { recentFirst := true
if since.IsZero() && !fullHistory {
log.WarnContext(ctx, "resetting since to 4 days ago")
since = time.Now().Add(4 * -24 * time.Hour) since = time.Now().Add(4 * -24 * time.Hour)
} else {
recentFirst = false
} }
args := []interface{}{serverID, since, limit} args := []interface{}{serverID}
query := `select id,monitor_id,server_id,ts, query := `select id,monitor_id,server_id,ts,
toFloat64(score),toFloat64(step),offset, toFloat64(score),toFloat64(step),offset,
rtt,leap,warning,error rtt,leap,warning,error
from log_scores from log_scores
where where
server_id = ? server_id = ?`
and ts > ?
order by ts desc
limit ?;`
if monitorID > 0 { if monitorID > 0 {
query = `select id,monitor_id,server_id,ts, query = `select id,monitor_id,server_id,ts,
@@ -38,13 +40,26 @@ func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, sin
from log_scores from log_scores
where where
server_id = ? server_id = ?
and monitor_id = ? and monitor_id = ?`
and ts > ? args = []interface{}{serverID, monitorID}
order by ts desc
limit ?;`
args = []interface{}{serverID, monitorID, since, limit}
} }
if fullHistory {
query += " order by ts"
if recentFirst {
query += " desc"
}
} else {
query += " and ts > ? order by ts "
if recentFirst {
query += "desc "
}
query += "limit ?"
args = append(args, since, limit)
}
log.DebugContext(ctx, "clickhouse query", "query", query, "args", args)
rows, err := d.Scores.Query(clickhouse.Context(ctx, clickhouse.WithSpan(span.SpanContext())), rows, err := d.Scores.Query(clickhouse.Context(ctx, clickhouse.WithSpan(span.SpanContext())),
query, args..., query, args...,
) )

View File

@@ -10,6 +10,7 @@ import (
"go.ntppool.org/data-api/chdb" "go.ntppool.org/data-api/chdb"
"go.ntppool.org/data-api/ntpdb" "go.ntppool.org/data-api/ntpdb"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
) )
type LogScoreHistory struct { type LogScoreHistory struct {
@@ -18,19 +19,20 @@ type LogScoreHistory struct {
// MonitorIDs []uint32 // MonitorIDs []uint32
} }
func GetHistoryClickHouse(ctx context.Context, ch *chdb.ClickHouse, db *sql.DB, serverID, monitorID uint32, since time.Time, count int) (*LogScoreHistory, error) { func GetHistoryClickHouse(ctx context.Context, ch *chdb.ClickHouse, db *sql.DB, serverID, monitorID uint32, since time.Time, count int, fullHistory bool) (*LogScoreHistory, error) {
log := logger.FromContext(ctx) log := logger.FromContext(ctx)
ctx, span := tracing.Tracer().Start(ctx, "logscores.GetHistoryClickHouse") ctx, span := tracing.Tracer().Start(ctx, "logscores.GetHistoryClickHouse",
trace.WithAttributes(
attribute.Int("server", int(serverID)),
attribute.Int("monitor", int(monitorID)),
attribute.Bool("full_history", fullHistory),
),
)
defer span.End() defer span.End()
span.SetAttributes( log.DebugContext(ctx, "GetHistoryCH", "server", serverID, "monitor", monitorID, "since", since, "count", count, "full_history", fullHistory)
attribute.Int("server", int(serverID)),
attribute.Int("monitor", int(monitorID)),
)
log.Debug("GetHistoryCH", "server", serverID, "monitor", monitorID, "since", since, "count", count) ls, err := ch.Logscores(ctx, int(serverID), int(monitorID), since, count, fullHistory)
ls, err := ch.Logscores(ctx, int(serverID), int(monitorID), since, count)
if err != nil { if err != nil {
log.ErrorContext(ctx, "clickhouse logscores", "err", err) log.ErrorContext(ctx, "clickhouse logscores", "err", err)
@@ -47,7 +49,6 @@ func GetHistoryClickHouse(ctx context.Context, ch *chdb.ClickHouse, db *sql.DB,
return &LogScoreHistory{ return &LogScoreHistory{
LogScores: ls, LogScores: ls,
Monitors: monitors, Monitors: monitors,
// MonitorIDs: monitorIDs,
}, nil }, nil
} }

View File

@@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"math" "math"
"net/http" "net/http"
"net/netip"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@@ -44,10 +45,11 @@ func paramHistoryMode(s string) historyMode {
} }
type historyParameters struct { type historyParameters struct {
limit int limit int
monitorID int monitorID int
server ntpdb.Server server ntpdb.Server
since time.Time since time.Time
fullHistory bool
} }
func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (historyParameters, error) { func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (historyParameters, error) {
@@ -65,7 +67,6 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (hi
if limit > 10000 { if limit > 10000 {
limit = 10000 limit = 10000
} }
p.limit = limit p.limit = limit
q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db)) q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db))
@@ -113,17 +114,25 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (hi
if since > 0 { if since > 0 {
p.since = time.Unix(since, 0) p.since = time.Unix(since, 0)
} }
if !p.since.IsZero() {
log.Warn("monitor data requested with since parameter", "since", p.since) clientIP, err := netip.ParseAddr(c.RealIP())
if err != nil {
return p, err
}
// log.DebugContext(ctx, "client ip", "client_ip", clientIP.String())
if clientIP.IsPrivate() || clientIP.IsLoopback() { // don't allow this through the ingress or CDN
if fullParam := c.QueryParam("full_history"); len(fullParam) > 0 {
if t, _ := strconv.ParseBool(fullParam); t {
p.fullHistory = true
}
}
} }
return p, nil return p, nil
} }
func (srv *Server) getHistoryCH(ctx context.Context, c echo.Context, p historyParameters) (*logscores.LogScoreHistory, error) {
return logscores.GetHistoryClickHouse(ctx, srv.ch, srv.db, p.server.ID, uint32(p.monitorID), p.since, p.limit)
}
func (srv *Server) getHistoryMySQL(ctx context.Context, c echo.Context, p historyParameters) (*logscores.LogScoreHistory, error) { func (srv *Server) getHistoryMySQL(ctx context.Context, c echo.Context, p historyParameters) (*logscores.LogScoreHistory, error) {
ls, err := logscores.GetHistoryMySQL(ctx, srv.db, p.server.ID, uint32(p.monitorID), p.since, p.limit) ls, err := logscores.GetHistoryMySQL(ctx, srv.db, p.server.ID, uint32(p.monitorID), p.since, p.limit)
return ls, err return ls, err
@@ -179,7 +188,7 @@ func (srv *Server) history(c echo.Context) error {
if sourceParam == "m" { if sourceParam == "m" {
history, err = srv.getHistoryMySQL(ctx, c, p) history, err = srv.getHistoryMySQL(ctx, c, p)
} else { } else {
history, err = srv.getHistoryCH(ctx, c, p) history, err = logscores.GetHistoryClickHouse(ctx, srv.ch, srv.db, p.server.ID, uint32(p.monitorID), p.since, p.limit, p.fullHistory)
} }
if err != nil { if err != nil {
var httpError *echo.HTTPError var httpError *echo.HTTPError