From eb5459abf3ff22ec10df53c1108957d85283fa95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ask=20Bj=C3=B8rn=20Hansen?= Date: Sun, 27 Jul 2025 00:37:49 -0700 Subject: [PATCH] fix(api): protocol-aware monitor filtering for multi-protocol monitors Servers with monitor filtering returned incorrect results when monitors have same names but different protocols (v4/v6). Monitor lookup now considers both name and IP version to match the correct protocol. - Add GetMonitorByNameAndIPVersion SQL query with protocol matching - Update history parameter parsing to use server IP version context - Fix both /scores/{ip}/log and Grafana endpoints - Remove unused GetMonitorByName query Fixes abh/ntppool#264 Reported-by: Anssi Johansson --- ntpdb/otel.go | 13 ++-- ntpdb/querier.go | 3 +- ntpdb/query.sql.go | 16 +++-- query.sql | 7 ++- server/grafana.go | 148 +++++++++++++++++++++++---------------------- server/history.go | 51 ++++++++++------ 6 files changed, 135 insertions(+), 103 deletions(-) diff --git a/ntpdb/otel.go b/ntpdb/otel.go index dc79bd7..732a1c8 100644 --- a/ntpdb/otel.go +++ b/ntpdb/otel.go @@ -8,7 +8,6 @@ package ntpdb import ( "context" - "database/sql" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -82,14 +81,14 @@ func (_d QuerierTxWithTracing) Commit(ctx context.Context) (err error) { return _d.QuerierTx.Commit(ctx) } -// GetMonitorByName implements QuerierTx -func (_d QuerierTxWithTracing) GetMonitorByName(ctx context.Context, tlsName sql.NullString) (m1 Monitor, err error) { - ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetMonitorByName") +// GetMonitorByNameAndIPVersion implements QuerierTx +func (_d QuerierTxWithTracing) GetMonitorByNameAndIPVersion(ctx context.Context, arg GetMonitorByNameAndIPVersionParams) (m1 Monitor, err error) { + ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetMonitorByNameAndIPVersion") defer func() { if _d._spanDecorator != nil { _d._spanDecorator(_span, map[string]interface{}{ - "ctx": ctx, - "tlsName": tlsName}, map[string]interface{}{ + "ctx": ctx, + "arg": arg}, map[string]interface{}{ "m1": m1, "err": err}) } else if err != nil { @@ -103,7 +102,7 @@ func (_d QuerierTxWithTracing) GetMonitorByName(ctx context.Context, tlsName sql _span.End() }() - return _d.QuerierTx.GetMonitorByName(ctx, tlsName) + return _d.QuerierTx.GetMonitorByNameAndIPVersion(ctx, arg) } // GetMonitorsByID implements QuerierTx diff --git a/ntpdb/querier.go b/ntpdb/querier.go index 7853c4f..b4ea234 100644 --- a/ntpdb/querier.go +++ b/ntpdb/querier.go @@ -6,11 +6,10 @@ package ntpdb import ( "context" - "database/sql" ) type Querier interface { - GetMonitorByName(ctx context.Context, tlsName sql.NullString) (Monitor, error) + GetMonitorByNameAndIPVersion(ctx context.Context, arg GetMonitorByNameAndIPVersionParams) (Monitor, error) GetMonitorsByID(ctx context.Context, monitorids []uint32) ([]Monitor, error) GetServerByID(ctx context.Context, id uint32) (Server, error) GetServerByIP(ctx context.Context, ip string) (Server, error) diff --git a/ntpdb/query.sql.go b/ntpdb/query.sql.go index 573aa3b..fe70f75 100644 --- a/ntpdb/query.sql.go +++ b/ntpdb/query.sql.go @@ -12,16 +12,24 @@ import ( "time" ) -const getMonitorByName = `-- name: GetMonitorByName :one +const getMonitorByNameAndIPVersion = `-- name: GetMonitorByNameAndIPVersion :one select id, id_token, type, user_id, account_id, hostname, location, ip, ip_version, tls_name, api_key, status, config, client_version, last_seen, last_submit, created_on, deleted_on, is_current from monitors where - tls_name like ? + tls_name like ? AND + ip_version = ? AND + is_current = 1 AND + status != 'deleted' order by id limit 1 ` -func (q *Queries) GetMonitorByName(ctx context.Context, tlsName sql.NullString) (Monitor, error) { - row := q.db.QueryRowContext(ctx, getMonitorByName, tlsName) +type GetMonitorByNameAndIPVersionParams struct { + TlsName sql.NullString `db:"tls_name" json:"tls_name"` + IpVersion NullMonitorsIpVersion `db:"ip_version" json:"ip_version"` +} + +func (q *Queries) GetMonitorByNameAndIPVersion(ctx context.Context, arg GetMonitorByNameAndIPVersionParams) (Monitor, error) { + row := q.db.QueryRowContext(ctx, getMonitorByNameAndIPVersion, arg.TlsName, arg.IpVersion) var i Monitor err := row.Scan( &i.ID, diff --git a/query.sql b/query.sql index 7813704..c9e2156 100644 --- a/query.sql +++ b/query.sql @@ -47,10 +47,13 @@ select * from servers where ip = sqlc.arg(ip); --- name: GetMonitorByName :one +-- name: GetMonitorByNameAndIPVersion :one select * from monitors where - tls_name like sqlc.arg('tls_name') + tls_name like sqlc.arg('tls_name') AND + ip_version = sqlc.arg('ip_version') AND + is_current = 1 AND + status != 'deleted' order by id limit 1; diff --git a/server/grafana.go b/server/grafana.go index 00d56ec..0a81d9d 100644 --- a/server/grafana.go +++ b/server/grafana.go @@ -42,59 +42,59 @@ type timeRangeParams struct { } // parseTimeRangeParams parses and validates time range parameters -func (srv *Server) parseTimeRangeParams(ctx context.Context, c echo.Context) (timeRangeParams, error) { +func (srv *Server) parseTimeRangeParams(ctx context.Context, c echo.Context, server ntpdb.Server) (timeRangeParams, error) { log := logger.FromContext(ctx) - + // Start with existing parameter parsing logic - baseParams, err := srv.getHistoryParameters(ctx, c) + baseParams, err := srv.getHistoryParameters(ctx, c, server) if err != nil { return timeRangeParams{}, err } - + trParams := timeRangeParams{ historyParameters: baseParams, maxDataPoints: 50000, // default } - + // Parse from timestamp (required) fromParam := c.QueryParam("from") if fromParam == "" { return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "from parameter is required") } - + fromSec, err := strconv.ParseInt(fromParam, 10, 64) if err != nil { return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "invalid from timestamp format") } trParams.from = time.Unix(fromSec, 0) - + // Parse to timestamp (required) toParam := c.QueryParam("to") if toParam == "" { return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "to parameter is required") } - + toSec, err := strconv.ParseInt(toParam, 10, 64) if err != nil { return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "invalid to timestamp format") } trParams.to = time.Unix(toSec, 0) - + // Validate time range if trParams.from.Equal(trParams.to) || trParams.from.After(trParams.to) { return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "from must be before to") } - + // Check minimum range (1 second) if trParams.to.Sub(trParams.from) < time.Second { return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "time range must be at least 1 second") } - + // Check maximum range (90 days) if trParams.to.Sub(trParams.from) > 90*24*time.Hour { return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "time range cannot exceed 90 days") } - + // Parse maxDataPoints (optional) if maxDataPointsParam := c.QueryParam("maxDataPoints"); maxDataPointsParam != "" { maxDP, err := strconv.Atoi(maxDataPointsParam) @@ -108,10 +108,10 @@ func (srv *Server) parseTimeRangeParams(ctx context.Context, c echo.Context) (ti trParams.maxDataPoints = maxDP } } - + // Parse interval (optional, for future downsampling) trParams.interval = c.QueryParam("interval") - + log.DebugContext(ctx, "parsed time range params", "from", trParams.from, "to", trParams.to, @@ -119,7 +119,7 @@ func (srv *Server) parseTimeRangeParams(ctx context.Context, c echo.Context) (ti "interval", trParams.interval, "monitor", trParams.monitorID, ) - + return trParams, nil } @@ -137,7 +137,7 @@ func transformToGrafanaTableFormat(history *logscores.LogScoreHistory, monitors // Group data by monitor_id (one series per monitor) monitorData := make(map[int][]ntpdb.LogScore) monitorInfo := make(map[int]ntpdb.Monitor) - + // Group log scores by monitor ID skippedInvalidMonitors := 0 for _, ls := range history.LogScores { @@ -148,7 +148,7 @@ func transformToGrafanaTableFormat(history *logscores.LogScoreHistory, monitors monitorID := int(ls.MonitorID.Int32) monitorData[monitorID] = append(monitorData[monitorID], ls) } - + // Debug logging for transformation logger.Setup().Info("transformation grouping debug", "total_log_scores", len(history.LogScores), @@ -168,30 +168,30 @@ func transformToGrafanaTableFormat(history *logscores.LogScoreHistory, monitors return counts }(), ) - + // Index monitors by ID for quick lookup for _, monitor := range monitors { monitorInfo[int(monitor.ID)] = monitor } - + var response GrafanaTimeSeriesResponse - + // Create one table series per monitor - logger.Setup().Info("creating grafana series", + logger.Setup().Info("creating grafana series", "monitor_data_entries", len(monitorData), ) - + for monitorID, logScores := range monitorData { if len(logScores) == 0 { logger.Setup().Info("skipping monitor with no data", "monitor_id", monitorID) continue } - - logger.Setup().Info("processing monitor series", + + logger.Setup().Info("processing monitor series", "monitor_id", monitorID, "log_scores_count", len(logScores), ) - + // Get monitor name from history.Monitors map or from monitor info monitorName := "unknown" if name, exists := history.Monitors[monitorID]; exists && name != "" { @@ -199,20 +199,20 @@ func transformToGrafanaTableFormat(history *logscores.LogScoreHistory, monitors } else if monitor, exists := monitorInfo[monitorID]; exists { monitorName = monitor.DisplayName() } - + // Build target name and tags sanitizedName := sanitizeMonitorName(monitorName) target := "monitor{name=" + sanitizedName + "}" - + tags := map[string]string{ "monitor_id": strconv.Itoa(monitorID), "monitor_name": monitorName, "type": "monitor", } - + // Add status (we'll use active as default since we have data for this monitor) tags["status"] = "active" - + // Define table columns columns := []ColumnDef{ {Text: "time", Type: "time"}, @@ -220,19 +220,19 @@ func transformToGrafanaTableFormat(history *logscores.LogScoreHistory, monitors {Text: "rtt", Type: "number", Unit: "ms"}, {Text: "offset", Type: "number", Unit: "s"}, } - + // Build values array var values [][]interface{} for _, ls := range logScores { // Convert timestamp to milliseconds timestampMs := ls.Ts.Unix() * 1000 - + // Create row: [time, score, rtt, offset] row := []interface{}{ timestampMs, ls.Score, } - + // Add RTT (convert from microseconds to milliseconds, handle null) if ls.Rtt.Valid { rttMs := float64(ls.Rtt.Int32) / 1000.0 @@ -240,17 +240,17 @@ func transformToGrafanaTableFormat(history *logscores.LogScoreHistory, monitors } else { row = append(row, nil) } - + // Add offset (handle null) if ls.Offset.Valid { row = append(row, ls.Offset.Float64) } else { row = append(row, nil) } - + values = append(values, row) } - + // Create table series series := GrafanaTableSeries{ Target: target, @@ -258,21 +258,21 @@ func transformToGrafanaTableFormat(history *logscores.LogScoreHistory, monitors Columns: columns, Values: values, } - + response = append(response, series) - - logger.Setup().Info("created series for monitor", + + logger.Setup().Info("created series for monitor", "monitor_id", monitorID, "target", series.Target, "values_count", len(series.Values), ) } - - logger.Setup().Info("transformation complete", + + logger.Setup().Info("transformation complete", "final_response_count", len(response), "response_is_nil", response == nil, ) - + return response } @@ -291,18 +291,7 @@ func (srv *Server) scoresTimeRange(c echo.Context) error { return echo.NewHTTPError(http.StatusNotFound, "invalid mode - only json supported") } - // Parse and validate time range parameters - params, err := srv.parseTimeRangeParams(ctx, c) - if err != nil { - if he, ok := err.(*echo.HTTPError); ok { - return he - } - log.ErrorContext(ctx, "parse time range parameters", "err", err) - span.RecordError(err) - return echo.NewHTTPError(http.StatusInternalServerError, "internal error") - } - - // Find and validate server + // Find and validate server first server, err := srv.FindServer(ctx, c.Param("server")) if err != nil { log.ErrorContext(ctx, "find server", "err", err) @@ -321,6 +310,17 @@ func (srv *Server) scoresTimeRange(c echo.Context) error { return echo.NewHTTPError(http.StatusNotFound, "server not found") } + // Parse and validate time range parameters + params, err := srv.parseTimeRangeParams(ctx, c, server) + if err != nil { + if he, ok := err.(*echo.HTTPError); ok { + return he + } + log.ErrorContext(ctx, "parse time range parameters", "err", err) + span.RecordError(err) + return echo.NewHTTPError(http.StatusInternalServerError, "internal error") + } + // Query ClickHouse for time range data log.InfoContext(ctx, "executing clickhouse time range query", "server_id", server.ID, @@ -331,10 +331,10 @@ func (srv *Server) scoresTimeRange(c echo.Context) error { "max_data_points", params.maxDataPoints, "time_range_duration", params.to.Sub(params.from).String(), ) - + logScores, err := srv.ch.LogscoresTimeRange(ctx, int(server.ID), params.monitorID, params.from, params.to, params.maxDataPoints) if err != nil { - log.ErrorContext(ctx, "clickhouse time range query", "err", err, + log.ErrorContext(ctx, "clickhouse time range query", "err", err, "server_id", server.ID, "monitor_id", params.monitorID, "from", params.from, @@ -343,14 +343,16 @@ func (srv *Server) scoresTimeRange(c echo.Context) error { span.RecordError(err) return echo.NewHTTPError(http.StatusInternalServerError, "internal error") } - + log.InfoContext(ctx, "clickhouse query results", "server_id", server.ID, "rows_returned", len(logScores), "first_few_ids", func() []uint64 { ids := make([]uint64, 0, 3) for i, ls := range logScores { - if i >= 3 { break } + if i >= 3 { + break + } ids = append(ids, ls.ID) } return ids @@ -374,7 +376,7 @@ func (srv *Server) scoresTimeRange(c echo.Context) error { } } } - + log.InfoContext(ctx, "monitor processing", "unique_monitor_ids", monitorIDs, "monitor_count", len(monitorIDs), @@ -400,7 +402,7 @@ func (srv *Server) scoresTimeRange(c echo.Context) error { ID: lsm.ID, } monitors = append(monitors, tempMon) - + // Update monitor name in history history.Monitors[int(lsm.ID)] = tempMon.DisplayName() } @@ -413,9 +415,9 @@ func (srv *Server) scoresTimeRange(c echo.Context) error { "monitors_count", len(monitors), "history_monitors", history.Monitors, ) - + grafanaResponse := transformToGrafanaTableFormat(history, monitors) - + log.InfoContext(ctx, "grafana transformation complete", "response_series_count", len(grafanaResponse), "response_preview", func() interface{} { @@ -424,14 +426,18 @@ func (srv *Server) scoresTimeRange(c echo.Context) error { } first := grafanaResponse[0] return map[string]interface{}{ - "target": first.Target, - "tags": first.Tags, + "target": first.Target, + "tags": first.Tags, "columns_count": len(first.Columns), - "values_count": len(first.Values), + "values_count": len(first.Values), "first_few_values": func() [][]interface{} { - if len(first.Values) == 0 { return [][]interface{}{} } + if len(first.Values) == 0 { + return [][]interface{}{} + } count := 2 - if len(first.Values) < count { count = len(first.Values) } + if len(first.Values) < count { + count = len(first.Values) + } return first.Values[:count] }(), } @@ -445,7 +451,7 @@ func (srv *Server) scoresTimeRange(c echo.Context) error { c.Response().Header().Set("Access-Control-Allow-Origin", "*") c.Response().Header().Set("Content-Type", "application/json") - log.InfoContext(ctx, "time range response final", + log.InfoContext(ctx, "time range response final", "server_id", server.ID, "server_ip", server.Ip, "monitor_id", params.monitorID, @@ -466,7 +472,7 @@ func (srv *Server) testGrafanaTable(c echo.Context) error { ctx, span := tracing.Tracer().Start(c.Request().Context(), "testGrafanaTable") defer span.End() - log.InfoContext(ctx, "serving test Grafana table format", + log.InfoContext(ctx, "serving test Grafana table format", "remote_ip", c.RealIP(), "user_agent", c.Request().UserAgent(), ) @@ -520,14 +526,14 @@ func (srv *Server) testGrafanaTable(c echo.Context) error { // Add CORS header for browser testing c.Response().Header().Set("Access-Control-Allow-Origin", "*") c.Response().Header().Set("Content-Type", "application/json") - + // Set cache control similar to other endpoints c.Response().Header().Set("Cache-Control", "public,max-age=60") - log.InfoContext(ctx, "test Grafana table response sent", + log.InfoContext(ctx, "test Grafana table response sent", "series_count", len(sampleData), "response_size_approx", "~1KB", ) return c.JSON(http.StatusOK, sampleData) -} \ No newline at end of file +} diff --git a/server/history.go b/server/history.go index b56958f..baa3c04 100644 --- a/server/history.go +++ b/server/history.go @@ -69,7 +69,7 @@ type historyParameters struct { fullHistory bool } -func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (historyParameters, error) { +func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context, server ntpdb.Server) (historyParameters, error) { log := logger.FromContext(ctx) p := historyParameters{} @@ -94,9 +94,18 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (hi switch monitorParam { case "": name := "recentmedian.scores.ntp.dev" - monitor, err := q.GetMonitorByName(ctx, sql.NullString{Valid: true, String: name}) + var ipVersion ntpdb.NullMonitorsIpVersion + if server.IpVersion == ntpdb.ServersIpVersionV4 { + ipVersion = ntpdb.NullMonitorsIpVersion{MonitorsIpVersion: ntpdb.MonitorsIpVersionV4, Valid: true} + } else { + ipVersion = ntpdb.NullMonitorsIpVersion{MonitorsIpVersion: ntpdb.MonitorsIpVersionV6, Valid: true} + } + monitor, err := q.GetMonitorByNameAndIPVersion(ctx, ntpdb.GetMonitorByNameAndIPVersionParams{ + TlsName: sql.NullString{Valid: true, String: name}, + IpVersion: ipVersion, + }) if err != nil { - log.Warn("could not find monitor", "name", name, "err", err) + log.Warn("could not find monitor", "name", name, "ip_version", server.IpVersion, "err", err) } monitorID = monitor.ID case "*": @@ -113,12 +122,21 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (hi } monitorParam = monitorParam + ".%" - monitor, err := q.GetMonitorByName(ctx, sql.NullString{Valid: true, String: monitorParam}) + var ipVersion ntpdb.NullMonitorsIpVersion + if server.IpVersion == ntpdb.ServersIpVersionV4 { + ipVersion = ntpdb.NullMonitorsIpVersion{MonitorsIpVersion: ntpdb.MonitorsIpVersionV4, Valid: true} + } else { + ipVersion = ntpdb.NullMonitorsIpVersion{MonitorsIpVersion: ntpdb.MonitorsIpVersionV6, Valid: true} + } + monitor, err := q.GetMonitorByNameAndIPVersion(ctx, ntpdb.GetMonitorByNameAndIPVersionParams{ + TlsName: sql.NullString{Valid: true, String: monitorParam}, + IpVersion: ipVersion, + }) if err != nil { if err == sql.ErrNoRows { return p, echo.NewHTTPError(http.StatusNotFound, "monitor not found").WithInternal(err) } - log.WarnContext(ctx, "could not find monitor", "name", monitorParam, "err", err) + log.WarnContext(ctx, "could not find monitor", "name", monitorParam, "ip_version", server.IpVersion, "err", err) return p, echo.NewHTTPError(http.StatusNotFound, "monitor not found (sql)") } monitorID = monitor.ID @@ -127,7 +145,7 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context) (hi } p.monitorID = int(monitorID) - log.DebugContext(ctx, "monitor param", "monitor", monitorID) + log.DebugContext(ctx, "monitor param", "monitor", monitorID, "ip_version", server.IpVersion) since, _ := strconv.ParseInt(c.QueryParam("since"), 10, 64) // defaults to 0 so don't care if it parses if since > 0 { @@ -171,16 +189,6 @@ func (srv *Server) history(c echo.Context) error { return echo.NewHTTPError(http.StatusNotFound, "invalid mode") } - p, err := srv.getHistoryParameters(ctx, c) - if err != nil { - if he, ok := err.(*echo.HTTPError); ok { - return he - } - log.ErrorContext(ctx, "get history parameters", "err", err) - span.RecordError(err) - return echo.NewHTTPError(http.StatusInternalServerError, "internal error") - } - server, err := srv.FindServer(ctx, c.Param("server")) if err != nil { log.ErrorContext(ctx, "find server", "err", err) @@ -199,6 +207,16 @@ func (srv *Server) history(c echo.Context) error { return echo.NewHTTPError(http.StatusNotFound, "server not found") } + p, err := srv.getHistoryParameters(ctx, c, server) + if err != nil { + if he, ok := err.(*echo.HTTPError); ok { + return he + } + log.ErrorContext(ctx, "get history parameters", "err", err) + span.RecordError(err) + return echo.NewHTTPError(http.StatusInternalServerError, "internal error") + } + p.server = server var history *logscores.LogScoreHistory @@ -456,4 +474,3 @@ func setHistoryCacheControl(c echo.Context, history *logscores.LogScoreHistory) } } } -