Private
Public Access
1
0

scores: clickhouse support
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2024-01-20 19:41:02 -07:00
parent 5682c86837
commit 6df51fc19f
9 changed files with 309 additions and 58 deletions

View File

@@ -36,7 +36,7 @@ func (d *ClickHouse) ServerAnswerCounts(ctx context.Context, serverIP string, da
ctx, span := tracing.Tracer().Start(ctx, "ServerAnswerCounts")
defer span.End()
conn := d.conn
conn := d.Logs
log := logger.Setup().With("server", serverIP)
@@ -100,7 +100,7 @@ func (d *ClickHouse) AnswerTotals(ctx context.Context, qtype string, days int) (
defer span.End()
// queries by UserCC / Qtype for the ServerIP
rows, err := d.conn.Query(clickhouse.Context(ctx,
rows, err := d.Logs.Query(clickhouse.Context(ctx,
clickhouse.WithSpan(span.SpanContext()),
), `
select UserCC,Qtype,sum(queries) as queries

View File

@@ -2,35 +2,82 @@ package chdb
import (
"context"
"os"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"gopkg.in/yaml.v3"
"go.ntppool.org/common/logger"
"go.ntppool.org/common/version"
)
type Config struct {
ClickHouse struct {
Scores DBConfig `yaml:"scores"`
Logs DBConfig `yaml:"logs"`
} `yaml:"clickhouse"`
}
type DBConfig struct {
Host string
Database string
}
type ClickHouse struct {
conn clickhouse.Conn
Logs clickhouse.Conn
Scores clickhouse.Conn
}
func New(ctx context.Context, dbConfigPath string) (*ClickHouse, error) {
conn, err := setupClickhouse(ctx)
ch, err := setupClickhouse(ctx, dbConfigPath)
if err != nil {
return nil, err
}
return &ClickHouse{conn: conn}, nil
return ch, nil
}
func setupClickhouse(ctx context.Context) (driver.Conn, error) {
func setupClickhouse(ctx context.Context, configFile string) (*ClickHouse, error) {
log := logger.FromContext(ctx)
log.InfoContext(ctx, "opening config", "file", configFile)
dbFile, err := os.Open(configFile)
if err != nil {
return nil, err
}
dec := yaml.NewDecoder(dbFile)
cfg := Config{}
err = dec.Decode(&cfg)
if err != nil {
return nil, err
}
ch := &ClickHouse{}
ch.Logs, err = open(ctx, cfg.ClickHouse.Logs)
if err != nil {
return nil, err
}
ch.Scores, err = open(ctx, cfg.ClickHouse.Scores)
if err != nil {
return nil, err
}
return ch, nil
}
func open(ctx context.Context, cfg DBConfig) (clickhouse.Conn, error) {
log := logger.Setup()
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"10.43.207.123:9000"},
Addr: []string{cfg.Host + ":9000"},
Auth: clickhouse.Auth{
Database: "geodns3",
Database: cfg.Database,
Username: "default",
Password: "",
},

View File

@@ -36,7 +36,7 @@ func (d *ClickHouse) UserCountryData(ctx context.Context) (*UserCountry, error)
ctx, span := tracing.Tracer().Start(ctx, "UserCountryData")
defer span.End()
rows, err := d.conn.Query(clickhouse.Context(ctx, clickhouse.WithSpan(span.SpanContext())),
rows, err := d.Logs.Query(clickhouse.Context(ctx, clickhouse.WithSpan(span.SpanContext())),
"select max(dt) as d,UserCC,Qtype,sum(queries) as queries from by_usercc_1d where dt > now() - INTERVAL 4 DAY group by rollup(Qtype,UserCC) order by UserCC,Qtype;")
if err != nil {
log.ErrorContext(ctx, "query error", "err", err)

86
chdb/logscores.go Normal file
View File

@@ -0,0 +1,86 @@
package chdb
import (
"context"
"fmt"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"go.ntppool.org/common/logger"
"go.ntppool.org/common/tracing"
"go.ntppool.org/data-api/ntpdb"
)
func (d *ClickHouse) Logscores(ctx context.Context, serverID, monitorID int, since time.Time, limit int) ([]ntpdb.LogScore, error) {
log := logger.Setup()
ctx, span := tracing.Tracer().Start(ctx, "CH Logscores")
defer span.End()
if since.IsZero() {
since = time.Now().Add(4 * -24 * time.Hour)
}
args := []interface{}{serverID, since, limit}
query := `select id,monitor_id,server_id,ts,
toFloat64(score),toFloat64(step),offset,
rtt,leap,warning,error
from log_scores
where
server_id = ?
and ts > ?
order by ts desc
limit ?;`
if monitorID > 0 {
query = `select id,monitor_id,server_id,ts,
toFloat64(score),toFloat64(step),offset,
rtt,leap,warning,error
from log_scores
where
server_id = ?
and monitor_id = ?
and ts > ?
order by ts desc
limit ?;`
args = []interface{}{serverID, monitorID, since, limit}
}
rows, err := d.Scores.Query(clickhouse.Context(ctx, clickhouse.WithSpan(span.SpanContext())),
query, args...,
)
if err != nil {
log.ErrorContext(ctx, "query error", "err", err)
return nil, fmt.Errorf("database error")
}
rv := []ntpdb.LogScore{}
for rows.Next() {
row := ntpdb.LogScore{}
if err := rows.Scan(
&row.ID,
&row.MonitorID,
&row.ServerID,
&row.Ts,
&row.Score,
&row.Step,
&row.Offset,
&row.Rtt,
&row.Attributes.Leap,
&row.Attributes.Warning,
&row.Attributes.Error,
); err != nil {
log.Error("could not parse row", "err", err)
continue
}
rv = append(rv, row)
}
// log.InfoContext(ctx, "returning data", "rv", rv)
return rv, nil
}