Private
Public Access
1
0
Files
data-api/chdb/logscores.go
Ask Bjørn Hansen 2dfc355f7c
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/tag Build is passing
style: format Go code with gofumpt
Apply consistent formatting to Go source files using gofumpt
as required by pre-commit guidelines.
2025-08-03 16:06:59 -07:00

235 lines
5.4 KiB
Go

package chdb
import (
"context"
"fmt"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"go.ntppool.org/common/logger"
"go.ntppool.org/common/tracing"
"go.ntppool.org/data-api/ntpdb"
)
// Logscores returns log score rows for serverID from the ClickHouse
// log_scores table, optionally restricted to a single monitor.
//
// Query shape:
//   - monitorID > 0 adds a "monitor_id = ?" filter.
//   - fullHistory returns every matching row ordered by ts ascending; since
//     and limit are not applied in that mode.
//   - Otherwise only rows with ts > since are returned, capped at limit. When
//     since is the zero time it defaults to four days before now and rows are
//     returned newest first; with an explicit since they are ascending.
//
// Rows that fail to scan are logged and skipped. A failure to run the query,
// or an error reported by the row iterator, is logged and returned to the
// caller as a generic "database error".
func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, since time.Time, limit int, fullHistory bool) ([]ntpdb.LogScore, error) {
	log := logger.Setup()
	ctx, span := tracing.Tracer().Start(ctx, "CH Logscores")
	defer span.End()

	// Newest-first ordering is only used for the default "recent" window,
	// i.e. when the caller gave no start time and did not ask for the full
	// history. Every other combination is returned in ascending ts order.
	recentFirst := since.IsZero() && !fullHistory
	if recentFirst {
		since = time.Now().Add(4 * -24 * time.Hour)
	}

	query := `select id,monitor_id,server_id,ts,
		toFloat64(score),toFloat64(step),offset,
		rtt,leap,warning,error
		from log_scores
		where
		server_id = ?`
	args := []any{serverID}

	if monitorID > 0 {
		query += " and monitor_id = ?"
		args = append(args, monitorID)
	}

	if fullHistory {
		query += " order by ts"
	} else {
		query += " and ts > ? order by ts "
		if recentFirst {
			query += "desc "
		}
		query += "limit ?"
		args = append(args, since, limit)
	}

	log.DebugContext(ctx, "clickhouse query", "query", query, "args", args)

	rows, err := d.Scores.Query(
		clickhouse.Context(
			ctx, clickhouse.WithSpan(span.SpanContext()),
		),
		query, args...,
	)
	if err != nil {
		log.ErrorContext(ctx, "query error", "err", err)
		return nil, fmt.Errorf("database error")
	}
	// Always release the result set, including on early return.
	defer func() {
		if cerr := rows.Close(); cerr != nil {
			log.WarnContext(ctx, "could not close rows", "err", cerr)
		}
	}()

	// Kept as an empty (non-nil) slice so the "no rows" result has the same
	// shape callers received before.
	rv := []ntpdb.LogScore{}

	for rows.Next() {
		row := ntpdb.LogScore{}

		// leap is scanned as uint8 and converted to the int8 attribute below.
		var leap uint8

		if err := rows.Scan(
			&row.ID,
			&row.MonitorID,
			&row.ServerID,
			&row.Ts,
			&row.Score,
			&row.Step,
			&row.Offset,
			&row.Rtt,
			&leap,
			&row.Attributes.Warning,
			&row.Attributes.Error,
		); err != nil {
			// Best effort: a single unparsable row does not fail the request.
			log.ErrorContext(ctx, "could not parse row", "err", err)
			continue
		}

		row.Attributes.Leap = int8(leap)
		rv = append(rv, row)
	}

	// Next() returning false can mean "done" or "failed"; only Err() tells
	// them apart, so check it before reporting success.
	if err := rows.Err(); err != nil {
		log.ErrorContext(ctx, "row iteration error", "err", err)
		return nil, fmt.Errorf("database error")
	}

	return rv, nil
}
// LogscoresTimeRange queries log scores within a specific time range for
// Grafana integration.
//
// It selects rows from log_scores for serverID with from <= ts <= to (both
// bounds inclusive), optionally filtered to monitorID when monitorID > 0.
// Results are always ordered by ts ascending. A limit > 0 caps the number of
// rows; a limit <= 0 applies no cap.
//
// Rows that fail to scan are logged and skipped. A failure to run the query,
// or an error reported by the row iterator, is logged and returned to the
// caller as a generic "database error".
func (d *ClickHouse) LogscoresTimeRange(ctx context.Context, serverID, monitorID int, from, to time.Time, limit int) ([]ntpdb.LogScore, error) {
	log := logger.Setup()
	ctx, span := tracing.Tracer().Start(ctx, "CH LogscoresTimeRange")
	defer span.End()

	args := []any{serverID, from, to}

	query := `select id,monitor_id,server_id,ts,
		toFloat64(score),toFloat64(step),offset,
		rtt,leap,warning,error
		from log_scores
		where
		server_id = ?
		and ts >= ?
		and ts <= ?`

	if monitorID > 0 {
		query += " and monitor_id = ?"
		args = append(args, monitorID)
	}

	// Always order by timestamp ASC for Grafana convention
	query += " order by ts ASC"

	// Apply limit to prevent memory issues
	if limit > 0 {
		query += " limit ?"
		args = append(args, limit)
	}

	// NOTE: the closure below is invoked immediately, so the substituted SQL
	// string is built on every call before the logger sees it.
	log.DebugContext(ctx, "clickhouse time range query",
		"query", query,
		"args", args,
		"server_id", serverID,
		"monitor_id", monitorID,
		"from", from.Format(time.RFC3339),
		"to", to.Format(time.RFC3339),
		"limit", limit,
		"full_sql_with_params", func() string {
			// Build a readable SQL query with parameters substituted for
			// debugging. This text is only logged; the real query is sent
			// with bound parameters.
			sqlDebug := query
			paramIndex := 0
			for strings.Contains(sqlDebug, "?") && paramIndex < len(args) {
				var replacement string
				switch v := args[paramIndex].(type) {
				case int:
					replacement = fmt.Sprintf("%d", v)
				case time.Time:
					replacement = fmt.Sprintf("'%s'", v.Format("2006-01-02 15:04:05"))
				default:
					replacement = fmt.Sprintf("'%v'", v)
				}
				sqlDebug = strings.Replace(sqlDebug, "?", replacement, 1)
				paramIndex++
			}
			return sqlDebug
		}(),
	)

	rows, err := d.Scores.Query(
		clickhouse.Context(
			ctx, clickhouse.WithSpan(span.SpanContext()),
		),
		query, args...,
	)
	if err != nil {
		log.ErrorContext(ctx, "time range query error", "err", err)
		return nil, fmt.Errorf("database error")
	}
	// Always release the result set, including on early return.
	defer func() {
		if cerr := rows.Close(); cerr != nil {
			log.WarnContext(ctx, "could not close rows", "err", cerr)
		}
	}()

	// Kept as an empty (non-nil) slice so the "no rows" result has the same
	// shape callers received before.
	rv := []ntpdb.LogScore{}

	for rows.Next() {
		row := ntpdb.LogScore{}

		// leap is scanned as uint8 and converted to the int8 attribute below.
		var leap uint8

		if err := rows.Scan(
			&row.ID,
			&row.MonitorID,
			&row.ServerID,
			&row.Ts,
			&row.Score,
			&row.Step,
			&row.Offset,
			&row.Rtt,
			&leap,
			&row.Attributes.Warning,
			&row.Attributes.Error,
		); err != nil {
			// Best effort: a single unparsable row does not fail the request.
			log.ErrorContext(ctx, "could not parse row", "err", err)
			continue
		}

		row.Attributes.Leap = int8(leap)
		rv = append(rv, row)
	}

	// Next() returning false can mean "done" or "failed"; only Err() tells
	// them apart, so check it before reporting success.
	if err := rows.Err(); err != nil {
		log.ErrorContext(ctx, "time range row iteration error", "err", err)
		return nil, fmt.Errorf("database error")
	}

	log.InfoContext(ctx, "time range query results",
		"rows_returned", len(rv),
		"server_id", serverID,
		"monitor_id", monitorID,
		"time_range", fmt.Sprintf("%s to %s", from.Format(time.RFC3339), to.Format(time.RFC3339)),
		"limit", limit,
		"sample_rows", func() []map[string]any {
			// Include at most the first three rows as a sample in the log.
			samples := make([]map[string]any, 0, 3)
			for i, row := range rv {
				if i >= 3 {
					break
				}
				samples = append(samples, map[string]any{
					"id":           row.ID,
					"monitor_id":   row.MonitorID,
					"ts":           row.Ts.Format(time.RFC3339),
					"score":        row.Score,
					"rtt_valid":    row.Rtt.Valid,
					"offset_valid": row.Offset.Valid,
				})
			}
			return samples
		}(),
	)

	return rv, nil
}