Apply consistent formatting to Go source files using gofumpt as required by pre-commit guidelines.
235 lines
5.4 KiB
Go
235 lines
5.4 KiB
Go
package chdb
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2"
|
|
"go.ntppool.org/common/logger"
|
|
"go.ntppool.org/common/tracing"
|
|
"go.ntppool.org/data-api/ntpdb"
|
|
)
|
|
|
|
func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, since time.Time, limit int, fullHistory bool) ([]ntpdb.LogScore, error) {
|
|
log := logger.Setup()
|
|
ctx, span := tracing.Tracer().Start(ctx, "CH Logscores")
|
|
defer span.End()
|
|
|
|
recentFirst := true
|
|
|
|
if since.IsZero() && !fullHistory {
|
|
since = time.Now().Add(4 * -24 * time.Hour)
|
|
} else {
|
|
recentFirst = false
|
|
}
|
|
|
|
args := []interface{}{serverID}
|
|
query := `select id,monitor_id,server_id,ts,
|
|
toFloat64(score),toFloat64(step),offset,
|
|
rtt,leap,warning,error
|
|
from log_scores
|
|
where
|
|
server_id = ?`
|
|
|
|
if monitorID > 0 {
|
|
query = `select id,monitor_id,server_id,ts,
|
|
toFloat64(score),toFloat64(step),offset,
|
|
rtt,leap,warning,error
|
|
from log_scores
|
|
where
|
|
server_id = ?
|
|
and monitor_id = ?`
|
|
args = []interface{}{serverID, monitorID}
|
|
}
|
|
|
|
if fullHistory {
|
|
query += " order by ts"
|
|
if recentFirst {
|
|
query += " desc"
|
|
}
|
|
} else {
|
|
query += " and ts > ? order by ts "
|
|
if recentFirst {
|
|
query += "desc "
|
|
}
|
|
query += "limit ?"
|
|
args = append(args, since, limit)
|
|
}
|
|
|
|
log.DebugContext(ctx, "clickhouse query", "query", query, "args", args)
|
|
|
|
rows, err := d.Scores.Query(
|
|
clickhouse.Context(
|
|
ctx, clickhouse.WithSpan(span.SpanContext()),
|
|
),
|
|
query, args...,
|
|
)
|
|
if err != nil {
|
|
log.ErrorContext(ctx, "query error", "err", err)
|
|
return nil, fmt.Errorf("database error")
|
|
}
|
|
|
|
rv := []ntpdb.LogScore{}
|
|
|
|
for rows.Next() {
|
|
|
|
row := ntpdb.LogScore{}
|
|
|
|
var leap uint8
|
|
|
|
if err := rows.Scan(
|
|
&row.ID,
|
|
&row.MonitorID,
|
|
&row.ServerID,
|
|
&row.Ts,
|
|
&row.Score,
|
|
&row.Step,
|
|
&row.Offset,
|
|
&row.Rtt,
|
|
&leap,
|
|
&row.Attributes.Warning,
|
|
&row.Attributes.Error,
|
|
); err != nil {
|
|
log.Error("could not parse row", "err", err)
|
|
continue
|
|
}
|
|
|
|
row.Attributes.Leap = int8(leap)
|
|
|
|
rv = append(rv, row)
|
|
|
|
}
|
|
|
|
// log.InfoContext(ctx, "returning data", "rv", rv)
|
|
|
|
return rv, nil
|
|
}
|
|
|
|
// LogscoresTimeRange queries log scores within a specific time range for Grafana integration
|
|
func (d *ClickHouse) LogscoresTimeRange(ctx context.Context, serverID, monitorID int, from, to time.Time, limit int) ([]ntpdb.LogScore, error) {
|
|
log := logger.Setup()
|
|
ctx, span := tracing.Tracer().Start(ctx, "CH LogscoresTimeRange")
|
|
defer span.End()
|
|
|
|
args := []interface{}{serverID, from, to}
|
|
|
|
query := `select id,monitor_id,server_id,ts,
|
|
toFloat64(score),toFloat64(step),offset,
|
|
rtt,leap,warning,error
|
|
from log_scores
|
|
where
|
|
server_id = ?
|
|
and ts >= ?
|
|
and ts <= ?`
|
|
|
|
if monitorID > 0 {
|
|
query += " and monitor_id = ?"
|
|
args = append(args, monitorID)
|
|
}
|
|
|
|
// Always order by timestamp ASC for Grafana convention
|
|
query += " order by ts ASC"
|
|
|
|
// Apply limit to prevent memory issues
|
|
if limit > 0 {
|
|
query += " limit ?"
|
|
args = append(args, limit)
|
|
}
|
|
|
|
log.DebugContext(ctx, "clickhouse time range query",
|
|
"query", query,
|
|
"args", args,
|
|
"server_id", serverID,
|
|
"monitor_id", monitorID,
|
|
"from", from.Format(time.RFC3339),
|
|
"to", to.Format(time.RFC3339),
|
|
"limit", limit,
|
|
"full_sql_with_params", func() string {
|
|
// Build a readable SQL query with parameters substituted for debugging
|
|
sqlDebug := query
|
|
paramIndex := 0
|
|
for strings.Contains(sqlDebug, "?") && paramIndex < len(args) {
|
|
var replacement string
|
|
switch v := args[paramIndex].(type) {
|
|
case int:
|
|
replacement = fmt.Sprintf("%d", v)
|
|
case time.Time:
|
|
replacement = fmt.Sprintf("'%s'", v.Format("2006-01-02 15:04:05"))
|
|
default:
|
|
replacement = fmt.Sprintf("'%v'", v)
|
|
}
|
|
sqlDebug = strings.Replace(sqlDebug, "?", replacement, 1)
|
|
paramIndex++
|
|
}
|
|
return sqlDebug
|
|
}(),
|
|
)
|
|
|
|
rows, err := d.Scores.Query(
|
|
clickhouse.Context(
|
|
ctx, clickhouse.WithSpan(span.SpanContext()),
|
|
),
|
|
query, args...,
|
|
)
|
|
if err != nil {
|
|
log.ErrorContext(ctx, "time range query error", "err", err)
|
|
return nil, fmt.Errorf("database error")
|
|
}
|
|
|
|
rv := []ntpdb.LogScore{}
|
|
|
|
for rows.Next() {
|
|
row := ntpdb.LogScore{}
|
|
var leap uint8
|
|
|
|
if err := rows.Scan(
|
|
&row.ID,
|
|
&row.MonitorID,
|
|
&row.ServerID,
|
|
&row.Ts,
|
|
&row.Score,
|
|
&row.Step,
|
|
&row.Offset,
|
|
&row.Rtt,
|
|
&leap,
|
|
&row.Attributes.Warning,
|
|
&row.Attributes.Error,
|
|
); err != nil {
|
|
log.Error("could not parse row", "err", err)
|
|
continue
|
|
}
|
|
|
|
row.Attributes.Leap = int8(leap)
|
|
rv = append(rv, row)
|
|
}
|
|
|
|
log.InfoContext(ctx, "time range query results",
|
|
"rows_returned", len(rv),
|
|
"server_id", serverID,
|
|
"monitor_id", monitorID,
|
|
"time_range", fmt.Sprintf("%s to %s", from.Format(time.RFC3339), to.Format(time.RFC3339)),
|
|
"limit", limit,
|
|
"sample_rows", func() []map[string]interface{} {
|
|
samples := make([]map[string]interface{}, 0, 3)
|
|
for i, row := range rv {
|
|
if i >= 3 {
|
|
break
|
|
}
|
|
samples = append(samples, map[string]interface{}{
|
|
"id": row.ID,
|
|
"monitor_id": row.MonitorID,
|
|
"ts": row.Ts.Format(time.RFC3339),
|
|
"score": row.Score,
|
|
"rtt_valid": row.Rtt.Valid,
|
|
"offset_valid": row.Offset.Valid,
|
|
})
|
|
}
|
|
return samples
|
|
}(),
|
|
)
|
|
|
|
return rv, nil
|
|
}
|