Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 675e993353 | |||
| e1398e7472 | |||
| b786ed6986 | |||
| 2f2a407409 |
@@ -11,25 +11,27 @@ import (
|
|||||||
"go.ntppool.org/data-api/ntpdb"
|
"go.ntppool.org/data-api/ntpdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, since time.Time, limit int) ([]ntpdb.LogScore, error) {
|
func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, since time.Time, limit int, fullHistory bool) ([]ntpdb.LogScore, error) {
|
||||||
log := logger.Setup()
|
log := logger.Setup()
|
||||||
ctx, span := tracing.Tracer().Start(ctx, "CH Logscores")
|
ctx, span := tracing.Tracer().Start(ctx, "CH Logscores")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
if since.IsZero() {
|
recentFirst := true
|
||||||
|
|
||||||
|
if since.IsZero() && !fullHistory {
|
||||||
|
log.WarnContext(ctx, "resetting since to 4 days ago")
|
||||||
since = time.Now().Add(4 * -24 * time.Hour)
|
since = time.Now().Add(4 * -24 * time.Hour)
|
||||||
|
} else {
|
||||||
|
recentFirst = false
|
||||||
}
|
}
|
||||||
|
|
||||||
args := []interface{}{serverID, since, limit}
|
args := []interface{}{serverID}
|
||||||
query := `select id,monitor_id,server_id,ts,
|
query := `select id,monitor_id,server_id,ts,
|
||||||
toFloat64(score),toFloat64(step),offset,
|
toFloat64(score),toFloat64(step),offset,
|
||||||
rtt,leap,warning,error
|
rtt,leap,warning,error
|
||||||
from log_scores
|
from log_scores
|
||||||
where
|
where
|
||||||
server_id = ?
|
server_id = ?`
|
||||||
and ts > ?
|
|
||||||
order by ts desc
|
|
||||||
limit ?;`
|
|
||||||
|
|
||||||
if monitorID > 0 {
|
if monitorID > 0 {
|
||||||
query = `select id,monitor_id,server_id,ts,
|
query = `select id,monitor_id,server_id,ts,
|
||||||
@@ -38,14 +40,30 @@ func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, sin
|
|||||||
from log_scores
|
from log_scores
|
||||||
where
|
where
|
||||||
server_id = ?
|
server_id = ?
|
||||||
and monitor_id = ?
|
and monitor_id = ?`
|
||||||
and ts > ?
|
args = []interface{}{serverID, monitorID}
|
||||||
order by ts desc
|
|
||||||
limit ?;`
|
|
||||||
args = []interface{}{serverID, monitorID, since, limit}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := d.Scores.Query(clickhouse.Context(ctx, clickhouse.WithSpan(span.SpanContext())),
|
if fullHistory {
|
||||||
|
query += " order by ts"
|
||||||
|
if recentFirst {
|
||||||
|
query += " desc"
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
query += " and ts > ? order by ts "
|
||||||
|
if recentFirst {
|
||||||
|
query += "desc "
|
||||||
|
}
|
||||||
|
query += "limit ?"
|
||||||
|
args = append(args, since, limit)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.DebugContext(ctx, "clickhouse query", "query", query, "args", args)
|
||||||
|
|
||||||
|
rows, err := d.Scores.Query(
|
||||||
|
clickhouse.Context(
|
||||||
|
ctx, clickhouse.WithSpan(span.SpanContext()),
|
||||||
|
),
|
||||||
query, args...,
|
query, args...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -59,6 +77,8 @@ func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, sin
|
|||||||
|
|
||||||
row := ntpdb.LogScore{}
|
row := ntpdb.LogScore{}
|
||||||
|
|
||||||
|
var leap uint8
|
||||||
|
|
||||||
if err := rows.Scan(
|
if err := rows.Scan(
|
||||||
&row.ID,
|
&row.ID,
|
||||||
&row.MonitorID,
|
&row.MonitorID,
|
||||||
@@ -68,7 +88,7 @@ func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, sin
|
|||||||
&row.Step,
|
&row.Step,
|
||||||
&row.Offset,
|
&row.Offset,
|
||||||
&row.Rtt,
|
&row.Rtt,
|
||||||
&row.Attributes.Leap,
|
&leap,
|
||||||
&row.Attributes.Warning,
|
&row.Attributes.Warning,
|
||||||
&row.Attributes.Error,
|
&row.Attributes.Error,
|
||||||
); err != nil {
|
); err != nil {
|
||||||
@@ -76,6 +96,8 @@ func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, sin
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
row.Attributes.Leap = int8(leap)
|
||||||
|
|
||||||
rv = append(rv, row)
|
rv = append(rv, row)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
6
go.mod
6
go.mod
@@ -8,6 +8,7 @@ require (
|
|||||||
github.com/ClickHouse/clickhouse-go/v2 v2.17.1
|
github.com/ClickHouse/clickhouse-go/v2 v2.17.1
|
||||||
github.com/go-sql-driver/mysql v1.7.1
|
github.com/go-sql-driver/mysql v1.7.1
|
||||||
github.com/hashicorp/go-retryablehttp v0.7.5
|
github.com/hashicorp/go-retryablehttp v0.7.5
|
||||||
|
github.com/labstack/echo-contrib v0.15.0
|
||||||
github.com/labstack/echo/v4 v4.11.4
|
github.com/labstack/echo/v4 v4.11.4
|
||||||
github.com/samber/slog-echo v1.12.1
|
github.com/samber/slog-echo v1.12.1
|
||||||
github.com/spf13/cobra v1.8.0
|
github.com/spf13/cobra v1.8.0
|
||||||
@@ -40,12 +41,11 @@ require (
|
|||||||
github.com/go-logr/stdr v1.2.2 // indirect
|
github.com/go-logr/stdr v1.2.2 // indirect
|
||||||
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
|
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
|
||||||
github.com/golang/protobuf v1.5.3 // indirect
|
github.com/golang/protobuf v1.5.3 // indirect
|
||||||
github.com/google/uuid v1.5.0 // indirect
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
|
||||||
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
|
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
|
||||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||||
github.com/klauspost/compress v1.17.4 // indirect
|
github.com/klauspost/compress v1.17.4 // indirect
|
||||||
github.com/labstack/echo-contrib v0.15.0 // indirect
|
|
||||||
github.com/labstack/gommon v0.4.2 // indirect
|
github.com/labstack/gommon v0.4.2 // indirect
|
||||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
@@ -80,6 +80,6 @@ require (
|
|||||||
golang.org/x/time v0.5.0 // indirect
|
golang.org/x/time v0.5.0 // indirect
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
|
||||||
google.golang.org/grpc v1.60.1 // indirect
|
google.golang.org/grpc v1.61.0 // indirect
|
||||||
google.golang.org/protobuf v1.32.0 // indirect
|
google.golang.org/protobuf v1.32.0 // indirect
|
||||||
)
|
)
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -63,6 +63,8 @@ github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
|
|||||||
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
|
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
|
||||||
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U=
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U=
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y=
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y=
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No=
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No=
|
||||||
@@ -338,6 +340,8 @@ google.golang.org/grpc v1.60.0 h1:6FQAR0kM31P6MRdeluor2w2gPaS4SVNrD/DNTxrQ15k=
|
|||||||
google.golang.org/grpc v1.60.0/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
|
google.golang.org/grpc v1.60.0/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
|
||||||
google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU=
|
google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU=
|
||||||
google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
|
google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM=
|
||||||
|
google.golang.org/grpc v1.61.0 h1:TOvOcuXn30kRao+gfcvsebNEa5iZIiLkisYEkf7R7o0=
|
||||||
|
google.golang.org/grpc v1.61.0/go.mod h1:VUbo7IFqmF1QtCAstipjG0GIoq49KvMe9+h1jFLBNJs=
|
||||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"go.ntppool.org/data-api/chdb"
|
"go.ntppool.org/data-api/chdb"
|
||||||
"go.ntppool.org/data-api/ntpdb"
|
"go.ntppool.org/data-api/ntpdb"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
type LogScoreHistory struct {
|
type LogScoreHistory struct {
|
||||||
@@ -18,19 +19,20 @@ type LogScoreHistory struct {
|
|||||||
// MonitorIDs []uint32
|
// MonitorIDs []uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetHistoryClickHouse(ctx context.Context, ch *chdb.ClickHouse, db *sql.DB, serverID, monitorID uint32, since time.Time, count int) (*LogScoreHistory, error) {
|
func GetHistoryClickHouse(ctx context.Context, ch *chdb.ClickHouse, db *sql.DB, serverID, monitorID uint32, since time.Time, count int, fullHistory bool) (*LogScoreHistory, error) {
|
||||||
log := logger.FromContext(ctx)
|
log := logger.FromContext(ctx)
|
||||||
ctx, span := tracing.Tracer().Start(ctx, "logscores.GetHistoryClickHouse")
|
ctx, span := tracing.Tracer().Start(ctx, "logscores.GetHistoryClickHouse",
|
||||||
defer span.End()
|
trace.WithAttributes(
|
||||||
|
|
||||||
span.SetAttributes(
|
|
||||||
attribute.Int("server", int(serverID)),
|
attribute.Int("server", int(serverID)),
|
||||||
attribute.Int("monitor", int(monitorID)),
|
attribute.Int("monitor", int(monitorID)),
|
||||||
|
attribute.Bool("full_history", fullHistory),
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
log.Debug("GetHistoryCH", "server", serverID, "monitor", monitorID, "since", since, "count", count)
|
log.DebugContext(ctx, "GetHistoryCH", "server", serverID, "monitor", monitorID, "since", since, "count", count, "full_history", fullHistory)
|
||||||
|
|
||||||
ls, err := ch.Logscores(ctx, int(serverID), int(monitorID), since, count)
|
ls, err := ch.Logscores(ctx, int(serverID), int(monitorID), since, count, fullHistory)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ErrorContext(ctx, "clickhouse logscores", "err", err)
|
log.ErrorContext(ctx, "clickhouse logscores", "err", err)
|
||||||
@@ -47,7 +49,6 @@ func GetHistoryClickHouse(ctx context.Context, ch *chdb.ClickHouse, db *sql.DB,
|
|||||||
return &LogScoreHistory{
|
return &LogScoreHistory{
|
||||||
LogScores: ls,
|
LogScores: ls,
|
||||||
Monitors: monitors,
|
Monitors: monitors,
|
||||||
// MonitorIDs: monitorIDs,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/netip"
|
||||||
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@@ -47,6 +49,7 @@ type historyParameters struct {
|
|||||||
monitorID int
|
monitorID int
|
||||||
server ntpdb.Server
|
server ntpdb.Server
|
||||||
since time.Time
|
since time.Time
|
||||||
|
fullHistory bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (historyParameters, error) {
|
func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (historyParameters, error) {
|
||||||
@@ -64,7 +67,6 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (hi
|
|||||||
if limit > 10000 {
|
if limit > 10000 {
|
||||||
limit = 10000
|
limit = 10000
|
||||||
}
|
}
|
||||||
|
|
||||||
p.limit = limit
|
p.limit = limit
|
||||||
|
|
||||||
q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db))
|
q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db))
|
||||||
@@ -112,17 +114,25 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (hi
|
|||||||
if since > 0 {
|
if since > 0 {
|
||||||
p.since = time.Unix(since, 0)
|
p.since = time.Unix(since, 0)
|
||||||
}
|
}
|
||||||
if !p.since.IsZero() {
|
|
||||||
log.Warn("monitor data requested with since parameter", "since", p.since)
|
clientIP, err := netip.ParseAddr(c.RealIP())
|
||||||
|
if err != nil {
|
||||||
|
return p, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// log.DebugContext(ctx, "client ip", "client_ip", clientIP.String())
|
||||||
|
|
||||||
|
if clientIP.IsPrivate() || clientIP.IsLoopback() { // don't allow this through the ingress or CDN
|
||||||
|
if fullParam := c.QueryParam("full_history"); len(fullParam) > 0 {
|
||||||
|
if t, _ := strconv.ParseBool(fullParam); t {
|
||||||
|
p.fullHistory = true
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srv *Server) getHistoryCH(ctx context.Context, c echo.Context, p historyParameters) (*logscores.LogScoreHistory, error) {
|
|
||||||
return logscores.GetHistoryClickHouse(ctx, srv.ch, srv.db, p.server.ID, uint32(p.monitorID), p.since, p.limit)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (srv *Server) getHistoryMySQL(ctx context.Context, c echo.Context, p historyParameters) (*logscores.LogScoreHistory, error) {
|
func (srv *Server) getHistoryMySQL(ctx context.Context, c echo.Context, p historyParameters) (*logscores.LogScoreHistory, error) {
|
||||||
ls, err := logscores.GetHistoryMySQL(ctx, srv.db, p.server.ID, uint32(p.monitorID), p.since, p.limit)
|
ls, err := logscores.GetHistoryMySQL(ctx, srv.db, p.server.ID, uint32(p.monitorID), p.since, p.limit)
|
||||||
return ls, err
|
return ls, err
|
||||||
@@ -167,10 +177,18 @@ func (srv *Server) history(c echo.Context) error {
|
|||||||
|
|
||||||
var history *logscores.LogScoreHistory
|
var history *logscores.LogScoreHistory
|
||||||
|
|
||||||
if c.QueryParam("source") == "c" {
|
sourceParam := c.QueryParam("source")
|
||||||
history, err = srv.getHistoryCH(ctx, c, p)
|
switch sourceParam {
|
||||||
} else {
|
case "m":
|
||||||
|
case "c":
|
||||||
|
default:
|
||||||
|
sourceParam = os.Getenv("default_source")
|
||||||
|
}
|
||||||
|
|
||||||
|
if sourceParam == "m" {
|
||||||
history, err = srv.getHistoryMySQL(ctx, c, p)
|
history, err = srv.getHistoryMySQL(ctx, c, p)
|
||||||
|
} else {
|
||||||
|
history, err = logscores.GetHistoryClickHouse(ctx, srv.ch, srv.db, p.server.ID, uint32(p.monitorID), p.since, p.limit, p.fullHistory)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var httpError *echo.HTTPError
|
var httpError *echo.HTTPError
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
|
"github.com/labstack/echo-contrib/echoprometheus"
|
||||||
"github.com/labstack/echo/v4"
|
"github.com/labstack/echo/v4"
|
||||||
"github.com/labstack/echo/v4/middleware"
|
"github.com/labstack/echo/v4/middleware"
|
||||||
slogecho "github.com/samber/slog-echo"
|
slogecho "github.com/samber/slog-echo"
|
||||||
@@ -123,6 +124,9 @@ func (srv *Server) Run() error {
|
|||||||
|
|
||||||
e.IPExtractor = echo.ExtractIPFromXFFHeader(trustOptions...)
|
e.IPExtractor = echo.ExtractIPFromXFFHeader(trustOptions...)
|
||||||
|
|
||||||
|
e.Use(echoprometheus.NewMiddlewareWithConfig(echoprometheus.MiddlewareConfig{
|
||||||
|
Registerer: srv.metrics.Registry(),
|
||||||
|
}))
|
||||||
e.Use(otelecho.Middleware("data-api"))
|
e.Use(otelecho.Middleware("data-api"))
|
||||||
e.Use(slogecho.NewWithConfig(log,
|
e.Use(slogecho.NewWithConfig(log,
|
||||||
slogecho.Config{
|
slogecho.Config{
|
||||||
|
|||||||
Reference in New Issue
Block a user