- Add parseRelativeTime function supporting "-3d", "-2h", "-30m" format - Update parseTimeRangeParams to handle Unix timestamps and relative times - Add unit tests with comprehensive coverage for all time formats - Document v2 API in API.md with examples and migration guide Enables intuitive time queries like from=-3d&to=-1h instead of Unix timestamps, improving developer experience for the enhanced v2 endpoint that supports 50k records vs legacy 10k limit.
590 lines
17 KiB
Go
590 lines
17 KiB
Go
package server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"regexp"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/labstack/echo/v4"
|
|
"go.ntppool.org/common/logger"
|
|
"go.ntppool.org/common/tracing"
|
|
"go.ntppool.org/data-api/logscores"
|
|
"go.ntppool.org/data-api/ntpdb"
|
|
)
|
|
|
|
// ColumnDef represents a Grafana table column definition
|
|
type ColumnDef struct {
|
|
Text string `json:"text"`
|
|
Type string `json:"type"`
|
|
Unit string `json:"unit,omitempty"`
|
|
}
|
|
|
|
// GrafanaTableSeries represents a single table series in Grafana format
|
|
type GrafanaTableSeries struct {
|
|
Target string `json:"target"`
|
|
Tags map[string]string `json:"tags"`
|
|
Columns []ColumnDef `json:"columns"`
|
|
Values [][]interface{} `json:"values"`
|
|
}
|
|
|
|
// GrafanaTimeSeriesResponse represents the complete Grafana table response
|
|
type GrafanaTimeSeriesResponse []GrafanaTableSeries
|
|
|
|
// timeRangeParams extends historyParameters with time range support
|
|
type timeRangeParams struct {
|
|
historyParameters // embed existing struct
|
|
from time.Time
|
|
to time.Time
|
|
maxDataPoints int
|
|
interval string // for future downsampling
|
|
}
|
|
|
|
// parseTimeRangeParams parses and validates time range parameters
|
|
// parseRelativeTime parses relative time expressions like "-3d", "-2h", "-30m"
|
|
// Returns the absolute time relative to the provided base time (usually time.Now())
|
|
func parseRelativeTime(relativeTimeStr string, baseTime time.Time) (time.Time, error) {
|
|
if relativeTimeStr == "" {
|
|
return time.Time{}, fmt.Errorf("empty time string")
|
|
}
|
|
|
|
// Check if it's a regular Unix timestamp first
|
|
if unixTime, err := strconv.ParseInt(relativeTimeStr, 10, 64); err == nil {
|
|
return time.Unix(unixTime, 0), nil
|
|
}
|
|
|
|
// Parse relative time format like "-3d", "-2h", "-30m", "-5s"
|
|
re := regexp.MustCompile(`^(-?)(\d+)([dhms])$`)
|
|
matches := re.FindStringSubmatch(relativeTimeStr)
|
|
if len(matches) != 4 {
|
|
return time.Time{}, fmt.Errorf("invalid time format, expected Unix timestamp or relative format like '-3d', '-2h', '-30m', '-5s'")
|
|
}
|
|
|
|
sign := matches[1]
|
|
valueStr := matches[2]
|
|
unit := matches[3]
|
|
|
|
value, err := strconv.Atoi(valueStr)
|
|
if err != nil {
|
|
return time.Time{}, fmt.Errorf("invalid numeric value: %s", valueStr)
|
|
}
|
|
|
|
var duration time.Duration
|
|
switch unit {
|
|
case "s":
|
|
duration = time.Duration(value) * time.Second
|
|
case "m":
|
|
duration = time.Duration(value) * time.Minute
|
|
case "h":
|
|
duration = time.Duration(value) * time.Hour
|
|
case "d":
|
|
duration = time.Duration(value) * 24 * time.Hour
|
|
default:
|
|
return time.Time{}, fmt.Errorf("invalid time unit: %s", unit)
|
|
}
|
|
|
|
// Apply sign (negative means go back in time)
|
|
if sign == "-" {
|
|
return baseTime.Add(-duration), nil
|
|
}
|
|
return baseTime.Add(duration), nil
|
|
}
|
|
|
|
func (srv *Server) parseTimeRangeParams(ctx context.Context, c echo.Context, server ntpdb.Server) (timeRangeParams, error) {
|
|
log := logger.FromContext(ctx)
|
|
|
|
// Start with existing parameter parsing logic
|
|
baseParams, err := srv.getHistoryParameters(ctx, c, server)
|
|
if err != nil {
|
|
return timeRangeParams{}, err
|
|
}
|
|
|
|
trParams := timeRangeParams{
|
|
historyParameters: baseParams,
|
|
maxDataPoints: 50000, // default
|
|
}
|
|
|
|
// Parse from timestamp (required) - supports Unix timestamps and relative time like "-3d"
|
|
fromParam := c.QueryParam("from")
|
|
if fromParam == "" {
|
|
return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "from parameter is required")
|
|
}
|
|
|
|
now := time.Now()
|
|
trParams.from, err = parseRelativeTime(fromParam, now)
|
|
if err != nil {
|
|
return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid from parameter: %v", err))
|
|
}
|
|
|
|
// Parse to timestamp (required) - supports Unix timestamps and relative time like "-1d"
|
|
toParam := c.QueryParam("to")
|
|
if toParam == "" {
|
|
return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "to parameter is required")
|
|
}
|
|
|
|
trParams.to, err = parseRelativeTime(toParam, now)
|
|
if err != nil {
|
|
return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("invalid to parameter: %v", err))
|
|
}
|
|
|
|
// Validate time range
|
|
if trParams.from.Equal(trParams.to) || trParams.from.After(trParams.to) {
|
|
return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "from must be before to")
|
|
}
|
|
|
|
// Check minimum range (1 second)
|
|
if trParams.to.Sub(trParams.from) < time.Second {
|
|
return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "time range must be at least 1 second")
|
|
}
|
|
|
|
// Check maximum range (90 days)
|
|
if trParams.to.Sub(trParams.from) > 90*24*time.Hour {
|
|
return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "time range cannot exceed 90 days")
|
|
}
|
|
|
|
// Parse maxDataPoints (optional)
|
|
if maxDataPointsParam := c.QueryParam("maxDataPoints"); maxDataPointsParam != "" {
|
|
maxDP, err := strconv.Atoi(maxDataPointsParam)
|
|
if err != nil {
|
|
return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "invalid maxDataPoints format")
|
|
}
|
|
if maxDP > 50000 {
|
|
return timeRangeParams{}, echo.NewHTTPError(http.StatusBadRequest, "maxDataPoints cannot exceed 50000")
|
|
}
|
|
if maxDP > 0 {
|
|
trParams.maxDataPoints = maxDP
|
|
}
|
|
}
|
|
|
|
// Parse interval (optional, for future downsampling)
|
|
trParams.interval = c.QueryParam("interval")
|
|
|
|
log.DebugContext(ctx, "parsed time range params",
|
|
"from", trParams.from,
|
|
"to", trParams.to,
|
|
"maxDataPoints", trParams.maxDataPoints,
|
|
"interval", trParams.interval,
|
|
"monitor", trParams.monitorID,
|
|
)
|
|
|
|
return trParams, nil
|
|
}
|
|
|
|
// sanitizeMonitorName sanitizes monitor names for Grafana target format
|
|
func sanitizeMonitorName(name string) string {
|
|
// Replace problematic characters for Grafana target format
|
|
result := strings.ReplaceAll(name, " ", "_")
|
|
result = strings.ReplaceAll(result, ".", "-")
|
|
result = strings.ReplaceAll(result, "/", "-")
|
|
return result
|
|
}
|
|
|
|
// transformToGrafanaTableFormat converts LogScoreHistory to Grafana table format
|
|
func transformToGrafanaTableFormat(history *logscores.LogScoreHistory, monitors []ntpdb.Monitor) GrafanaTimeSeriesResponse {
|
|
// Group data by monitor_id (one series per monitor)
|
|
monitorData := make(map[int][]ntpdb.LogScore)
|
|
monitorInfo := make(map[int]ntpdb.Monitor)
|
|
|
|
// Group log scores by monitor ID
|
|
skippedInvalidMonitors := 0
|
|
for _, ls := range history.LogScores {
|
|
if !ls.MonitorID.Valid {
|
|
skippedInvalidMonitors++
|
|
continue
|
|
}
|
|
monitorID := int(ls.MonitorID.Int32)
|
|
monitorData[monitorID] = append(monitorData[monitorID], ls)
|
|
}
|
|
|
|
// Debug logging for transformation
|
|
logger.Setup().Info("transformation grouping debug",
|
|
"total_log_scores", len(history.LogScores),
|
|
"skipped_invalid_monitors", skippedInvalidMonitors,
|
|
"grouped_monitor_ids", func() []int {
|
|
keys := make([]int, 0, len(monitorData))
|
|
for k := range monitorData {
|
|
keys = append(keys, k)
|
|
}
|
|
return keys
|
|
}(),
|
|
"monitor_data_counts", func() map[int]int {
|
|
counts := make(map[int]int)
|
|
for k, v := range monitorData {
|
|
counts[k] = len(v)
|
|
}
|
|
return counts
|
|
}(),
|
|
)
|
|
|
|
// Index monitors by ID for quick lookup
|
|
for _, monitor := range monitors {
|
|
monitorInfo[int(monitor.ID)] = monitor
|
|
}
|
|
|
|
var response GrafanaTimeSeriesResponse
|
|
|
|
// Create one table series per monitor
|
|
logger.Setup().Info("creating grafana series",
|
|
"monitor_data_entries", len(monitorData),
|
|
)
|
|
|
|
for monitorID, logScores := range monitorData {
|
|
if len(logScores) == 0 {
|
|
logger.Setup().Info("skipping monitor with no data", "monitor_id", monitorID)
|
|
continue
|
|
}
|
|
|
|
logger.Setup().Info("processing monitor series",
|
|
"monitor_id", monitorID,
|
|
"log_scores_count", len(logScores),
|
|
)
|
|
|
|
// Get monitor name from history.Monitors map or from monitor info
|
|
monitorName := "unknown"
|
|
if name, exists := history.Monitors[monitorID]; exists && name != "" {
|
|
monitorName = name
|
|
} else if monitor, exists := monitorInfo[monitorID]; exists {
|
|
monitorName = monitor.DisplayName()
|
|
}
|
|
|
|
// Build target name and tags
|
|
sanitizedName := sanitizeMonitorName(monitorName)
|
|
target := "monitor{name=" + sanitizedName + "}"
|
|
|
|
tags := map[string]string{
|
|
"monitor_id": strconv.Itoa(monitorID),
|
|
"monitor_name": monitorName,
|
|
"type": "monitor",
|
|
}
|
|
|
|
// Add status (we'll use active as default since we have data for this monitor)
|
|
tags["status"] = "active"
|
|
|
|
// Define table columns
|
|
columns := []ColumnDef{
|
|
{Text: "time", Type: "time"},
|
|
{Text: "score", Type: "number"},
|
|
{Text: "rtt", Type: "number", Unit: "ms"},
|
|
{Text: "offset", Type: "number", Unit: "s"},
|
|
}
|
|
|
|
// Build values array
|
|
var values [][]interface{}
|
|
for _, ls := range logScores {
|
|
// Convert timestamp to milliseconds
|
|
timestampMs := ls.Ts.Unix() * 1000
|
|
|
|
// Create row: [time, score, rtt, offset]
|
|
row := []interface{}{
|
|
timestampMs,
|
|
ls.Score,
|
|
}
|
|
|
|
// Add RTT (convert from microseconds to milliseconds, handle null)
|
|
if ls.Rtt.Valid {
|
|
rttMs := float64(ls.Rtt.Int32) / 1000.0
|
|
row = append(row, rttMs)
|
|
} else {
|
|
row = append(row, nil)
|
|
}
|
|
|
|
// Add offset (handle null)
|
|
if ls.Offset.Valid {
|
|
row = append(row, ls.Offset.Float64)
|
|
} else {
|
|
row = append(row, nil)
|
|
}
|
|
|
|
values = append(values, row)
|
|
}
|
|
|
|
// Create table series
|
|
series := GrafanaTableSeries{
|
|
Target: target,
|
|
Tags: tags,
|
|
Columns: columns,
|
|
Values: values,
|
|
}
|
|
|
|
response = append(response, series)
|
|
|
|
logger.Setup().Info("created series for monitor",
|
|
"monitor_id", monitorID,
|
|
"target", series.Target,
|
|
"values_count", len(series.Values),
|
|
)
|
|
}
|
|
|
|
logger.Setup().Info("transformation complete",
|
|
"final_response_count", len(response),
|
|
"response_is_nil", response == nil,
|
|
)
|
|
|
|
return response
|
|
}
|
|
|
|
// scoresTimeRange handles Grafana time range requests for NTP server scores
|
|
func (srv *Server) scoresTimeRange(c echo.Context) error {
|
|
log := logger.Setup()
|
|
ctx, span := tracing.Tracer().Start(c.Request().Context(), "scoresTimeRange")
|
|
defer span.End()
|
|
|
|
// Set reasonable default cache time; adjusted later based on data
|
|
c.Response().Header().Set("Cache-Control", "public,max-age=240")
|
|
|
|
// Validate mode parameter
|
|
mode := c.Param("mode")
|
|
if mode != "json" {
|
|
return echo.NewHTTPError(http.StatusNotFound, "invalid mode - only json supported")
|
|
}
|
|
|
|
// Find and validate server first
|
|
server, err := srv.FindServer(ctx, c.Param("server"))
|
|
if err != nil {
|
|
log.ErrorContext(ctx, "find server", "err", err)
|
|
if he, ok := err.(*echo.HTTPError); ok {
|
|
return he
|
|
}
|
|
span.RecordError(err)
|
|
return echo.NewHTTPError(http.StatusInternalServerError, "internal error")
|
|
}
|
|
if server.DeletionAge(30 * 24 * time.Hour) {
|
|
span.AddEvent("server deleted")
|
|
return echo.NewHTTPError(http.StatusNotFound, "server not found")
|
|
}
|
|
if server.ID == 0 {
|
|
span.AddEvent("server not found")
|
|
return echo.NewHTTPError(http.StatusNotFound, "server not found")
|
|
}
|
|
|
|
// Parse and validate time range parameters
|
|
params, err := srv.parseTimeRangeParams(ctx, c, server)
|
|
if err != nil {
|
|
if he, ok := err.(*echo.HTTPError); ok {
|
|
return he
|
|
}
|
|
log.ErrorContext(ctx, "parse time range parameters", "err", err)
|
|
span.RecordError(err)
|
|
return echo.NewHTTPError(http.StatusInternalServerError, "internal error")
|
|
}
|
|
|
|
// Query ClickHouse for time range data
|
|
log.InfoContext(ctx, "executing clickhouse time range query",
|
|
"server_id", server.ID,
|
|
"server_ip", server.Ip,
|
|
"monitor_id", params.monitorID,
|
|
"from", params.from,
|
|
"to", params.to,
|
|
"max_data_points", params.maxDataPoints,
|
|
"time_range_duration", params.to.Sub(params.from).String(),
|
|
)
|
|
|
|
logScores, err := srv.ch.LogscoresTimeRange(ctx, int(server.ID), params.monitorID, params.from, params.to, params.maxDataPoints)
|
|
if err != nil {
|
|
log.ErrorContext(ctx, "clickhouse time range query", "err", err,
|
|
"server_id", server.ID,
|
|
"monitor_id", params.monitorID,
|
|
"from", params.from,
|
|
"to", params.to,
|
|
)
|
|
span.RecordError(err)
|
|
return echo.NewHTTPError(http.StatusInternalServerError, "internal error")
|
|
}
|
|
|
|
log.InfoContext(ctx, "clickhouse query results",
|
|
"server_id", server.ID,
|
|
"rows_returned", len(logScores),
|
|
"first_few_ids", func() []uint64 {
|
|
ids := make([]uint64, 0, 3)
|
|
for i, ls := range logScores {
|
|
if i >= 3 {
|
|
break
|
|
}
|
|
ids = append(ids, ls.ID)
|
|
}
|
|
return ids
|
|
}(),
|
|
)
|
|
|
|
// Build LogScoreHistory structure for compatibility with existing functions
|
|
history := &logscores.LogScoreHistory{
|
|
LogScores: logScores,
|
|
Monitors: make(map[int]string),
|
|
}
|
|
|
|
// Get monitor names for the returned data
|
|
monitorIDs := []uint32{}
|
|
for _, ls := range logScores {
|
|
if ls.MonitorID.Valid {
|
|
monitorID := uint32(ls.MonitorID.Int32)
|
|
if _, exists := history.Monitors[int(monitorID)]; !exists {
|
|
history.Monitors[int(monitorID)] = ""
|
|
monitorIDs = append(monitorIDs, monitorID)
|
|
}
|
|
}
|
|
}
|
|
|
|
log.InfoContext(ctx, "monitor processing",
|
|
"unique_monitor_ids", monitorIDs,
|
|
"monitor_count", len(monitorIDs),
|
|
)
|
|
|
|
// Get monitor details from database for status and display names
|
|
var monitors []ntpdb.Monitor
|
|
if len(monitorIDs) > 0 {
|
|
q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db))
|
|
logScoreMonitors, err := q.GetServerScores(ctx, ntpdb.GetServerScoresParams{
|
|
MonitorIDs: monitorIDs,
|
|
ServerID: server.ID,
|
|
})
|
|
if err != nil {
|
|
log.ErrorContext(ctx, "get monitor details", "err", err)
|
|
// Don't fail the request, just use basic info
|
|
} else {
|
|
for _, lsm := range logScoreMonitors {
|
|
// Create monitor entry for transformation (we mainly need the display name)
|
|
tempMon := ntpdb.Monitor{
|
|
TlsName: lsm.TlsName,
|
|
Location: lsm.Location,
|
|
ID: lsm.ID,
|
|
}
|
|
monitors = append(monitors, tempMon)
|
|
|
|
// Update monitor name in history
|
|
history.Monitors[int(lsm.ID)] = tempMon.DisplayName()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Transform to Grafana table format
|
|
log.InfoContext(ctx, "starting grafana transformation",
|
|
"log_scores_count", len(logScores),
|
|
"monitors_count", len(monitors),
|
|
"history_monitors", history.Monitors,
|
|
)
|
|
|
|
grafanaResponse := transformToGrafanaTableFormat(history, monitors)
|
|
|
|
log.InfoContext(ctx, "grafana transformation complete",
|
|
"response_series_count", len(grafanaResponse),
|
|
"response_preview", func() interface{} {
|
|
if len(grafanaResponse) == 0 {
|
|
return "empty_response"
|
|
}
|
|
first := grafanaResponse[0]
|
|
return map[string]interface{}{
|
|
"target": first.Target,
|
|
"tags": first.Tags,
|
|
"columns_count": len(first.Columns),
|
|
"values_count": len(first.Values),
|
|
"first_few_values": func() [][]interface{} {
|
|
if len(first.Values) == 0 {
|
|
return [][]interface{}{}
|
|
}
|
|
count := 2
|
|
if len(first.Values) < count {
|
|
count = len(first.Values)
|
|
}
|
|
return first.Values[:count]
|
|
}(),
|
|
}
|
|
}(),
|
|
)
|
|
|
|
// Set cache control headers based on data characteristics
|
|
setHistoryCacheControl(c, history)
|
|
|
|
// Set CORS headers
|
|
c.Response().Header().Set("Access-Control-Allow-Origin", "*")
|
|
c.Response().Header().Set("Content-Type", "application/json")
|
|
|
|
log.InfoContext(ctx, "time range response final",
|
|
"server_id", server.ID,
|
|
"server_ip", server.Ip,
|
|
"monitor_id", params.monitorID,
|
|
"time_range", params.to.Sub(params.from).String(),
|
|
"raw_data_points", len(logScores),
|
|
"grafana_series_count", len(grafanaResponse),
|
|
"max_data_points", params.maxDataPoints,
|
|
"response_is_null", grafanaResponse == nil,
|
|
"response_is_empty", len(grafanaResponse) == 0,
|
|
)
|
|
|
|
return c.JSON(http.StatusOK, grafanaResponse)
|
|
}
|
|
|
|
// testGrafanaTable returns sample data in Grafana table format for validation
|
|
func (srv *Server) testGrafanaTable(c echo.Context) error {
|
|
log := logger.Setup()
|
|
ctx, span := tracing.Tracer().Start(c.Request().Context(), "testGrafanaTable")
|
|
defer span.End()
|
|
|
|
log.InfoContext(ctx, "serving test Grafana table format",
|
|
"remote_ip", c.RealIP(),
|
|
"user_agent", c.Request().UserAgent(),
|
|
)
|
|
|
|
// Generate sample data with realistic NTP Pool values
|
|
now := time.Now()
|
|
sampleData := GrafanaTimeSeriesResponse{
|
|
{
|
|
Target: "monitor{name=zakim1-yfhw4a}",
|
|
Tags: map[string]string{
|
|
"monitor_id": "126",
|
|
"monitor_name": "zakim1-yfhw4a",
|
|
"type": "monitor",
|
|
"status": "active",
|
|
},
|
|
Columns: []ColumnDef{
|
|
{Text: "time", Type: "time"},
|
|
{Text: "score", Type: "number"},
|
|
{Text: "rtt", Type: "number", Unit: "ms"},
|
|
{Text: "offset", Type: "number", Unit: "s"},
|
|
},
|
|
Values: [][]interface{}{
|
|
{now.Add(-10*time.Minute).Unix() * 1000, 20.0, 18.865, -0.000267},
|
|
{now.Add(-20*time.Minute).Unix() * 1000, 20.0, 18.96, -0.000390},
|
|
{now.Add(-30*time.Minute).Unix() * 1000, 20.0, 18.073, -0.000768},
|
|
{now.Add(-40*time.Minute).Unix() * 1000, 20.0, 18.209, nil}, // null offset example
|
|
},
|
|
},
|
|
{
|
|
Target: "monitor{name=nj2-mon01}",
|
|
Tags: map[string]string{
|
|
"monitor_id": "84",
|
|
"monitor_name": "nj2-mon01",
|
|
"type": "monitor",
|
|
"status": "active",
|
|
},
|
|
Columns: []ColumnDef{
|
|
{Text: "time", Type: "time"},
|
|
{Text: "score", Type: "number"},
|
|
{Text: "rtt", Type: "number", Unit: "ms"},
|
|
{Text: "offset", Type: "number", Unit: "s"},
|
|
},
|
|
Values: [][]interface{}{
|
|
{now.Add(-10*time.Minute).Unix() * 1000, 19.5, 22.145, 0.000123},
|
|
{now.Add(-20*time.Minute).Unix() * 1000, 19.8, 21.892, 0.000089},
|
|
{now.Add(-30*time.Minute).Unix() * 1000, 20.0, 22.034, 0.000156},
|
|
},
|
|
},
|
|
}
|
|
|
|
// Add CORS header for browser testing
|
|
c.Response().Header().Set("Access-Control-Allow-Origin", "*")
|
|
c.Response().Header().Set("Content-Type", "application/json")
|
|
|
|
// Set cache control similar to other endpoints
|
|
c.Response().Header().Set("Cache-Control", "public,max-age=60")
|
|
|
|
log.InfoContext(ctx, "test Grafana table response sent",
|
|
"series_count", len(sampleData),
|
|
"response_size_approx", "~1KB",
|
|
)
|
|
|
|
return c.JSON(http.StatusOK, sampleData)
|
|
}
|