diff --git a/chdb/chdnsanswers.go b/chdb/chdnsanswers.go index 030b5a9..c5572b7 100644 --- a/chdb/chdnsanswers.go +++ b/chdb/chdnsanswers.go @@ -5,14 +5,14 @@ import ( "fmt" "sort" - "github.com/ClickHouse/clickhouse-go/v2" "go.ntppool.org/common/logger" ) type ccCount struct { - CC string - Count uint64 - Points float64 + CC string + Count uint64 + Points float64 + Netspeed float64 } type ServerQueries []*ccCount @@ -29,7 +29,9 @@ func (s ServerQueries) Less(i, j int) bool { return s[i].Count > s[j].Count } -func (d *ClickHouse) ServerAnswerCounts(ctx context.Context, conn clickhouse.Conn, serverIP string, days int) (ServerQueries, error) { +func (d *ClickHouse) ServerAnswerCounts(ctx context.Context, serverIP string, days int) (ServerQueries, error) { + + conn := d.conn log := logger.Setup().With("server", serverIP) @@ -85,12 +87,12 @@ func (d *ClickHouse) ServerAnswerCounts(ctx context.Context, conn clickhouse.Con return rv, nil } -func (d *ClickHouse) AnswerTotals(ctx context.Context, conn clickhouse.Conn, qtype string, days int) (ServerTotals, error) { +func (d *ClickHouse) AnswerTotals(ctx context.Context, qtype string, days int) (ServerTotals, error) { log := logger.Setup() // queries by UserCC / Qtype for the ServerIP - rows, err := conn.Query(ctx, ` + rows, err := d.conn.Query(ctx, ` select UserCC,Qtype,sum(queries) as queries from by_server_ip_1d where diff --git a/chdb/db.go b/chdb/db.go index 2c82e8a..52a6e9e 100644 --- a/chdb/db.go +++ b/chdb/db.go @@ -1,8 +1,78 @@ package chdb +import ( + "context" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "go.ntppool.org/common/version" + "golang.org/x/exp/slog" +) + type ClickHouse struct { + conn clickhouse.Conn } -func New(dbConfigPath string) (*ClickHouse, error) { - return &ClickHouse{}, nil +func New(ctx context.Context, dbConfigPath string) (*ClickHouse, error) { + conn, err := setupClickhouse(ctx) + if err != nil { + return nil, err + } + return &ClickHouse{conn: conn}, nil +} + +func setupClickhouse(ctx context.Context) (driver.Conn, error) { + + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: []string{"10.43.207.123:9000"}, + Auth: clickhouse.Auth{ + Database: "geodns3", + Username: "default", + Password: "", + }, + // Debug: true, + // Debugf: func(format string, v ...interface{}) { + // slog.Info("debug format", "format", format) + // fmt.Printf(format+"\n", v) + // }, + Settings: clickhouse.Settings{ + "max_execution_time": 60, + }, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + DialTimeout: time.Second * 5, + MaxOpenConns: 5, + MaxIdleConns: 5, + ConnMaxLifetime: time.Duration(10) * time.Minute, + ConnOpenStrategy: clickhouse.ConnOpenInOrder, + BlockBufferSize: 10, + MaxCompressionBuffer: 10240, + ClientInfo: clickhouse.ClientInfo{ + Products: []struct { + Name string + Version string + }{ + {Name: "data-api", Version: version.Version()}, + }, + }, + }) + if err != nil { + return nil, err + } + + v, err := conn.ServerVersion() + if err != nil { + return nil, err + } + slog.Info("clickhouse connection", "version", v) + + err = conn.Ping(ctx) + if err != nil { + slog.Error("clickhouse ping", "err", err) + return nil, err + } + + return conn, nil } diff --git a/chdb/geoqueries.go b/chdb/geoqueries.go index 1eeed9f..1365870 100644 --- a/chdb/geoqueries.go +++ b/chdb/geoqueries.go @@ -9,7 +9,6 @@ import ( "sort" "time" - "github.com/ClickHouse/clickhouse-go/v2" "golang.org/x/exp/slog" ) @@ -31,10 +30,10 @@ func (s UserCountry) Less(i, j int) bool { return s[i].IPv4 > s[j].IPv4 } -func (d *ClickHouse) UserCountryData(ctx context.Context, conn clickhouse.Conn) (*UserCountry, error) { +func (d *ClickHouse) UserCountryData(ctx context.Context) (*UserCountry, error) { // rows, err := conn.Query(ctx, "select dt,UserCC,Qtype,sum(queries) as queries from by_usercc_1d group by rollup(dt,Qtype,UserCC) order by dt,UserCC,Qtype;") - rows, err := conn.Query(ctx, "select max(dt) as d,UserCC,Qtype,sum(queries) as queries from by_usercc_1d where dt > now() - INTERVAL 4 DAY group by rollup(Qtype,UserCC) order by UserCC,Qtype;") + rows, err := d.conn.Query(ctx, "select max(dt) as d,UserCC,Qtype,sum(queries) as queries from by_usercc_1d where dt > now() - INTERVAL 4 DAY group by rollup(Qtype,UserCC) order by UserCC,Qtype;") if err != nil { slog.Error("query error", "err", err) return nil, fmt.Errorf("database error") diff --git a/ntpdb/db.go b/ntpdb/db.go index 4f2f177..c616b55 100644 --- a/ntpdb/db.go +++ b/ntpdb/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.19.0 +// sqlc v1.19.1 package ntpdb diff --git a/ntpdb/models.go b/ntpdb/models.go index 8cd22cf..31b6ab1 100644 --- a/ntpdb/models.go +++ b/ntpdb/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.19.0 +// sqlc v1.19.1 package ntpdb diff --git a/ntpdb/query.sql.go b/ntpdb/query.sql.go index 4f1d0a0..03233ce 100644 --- a/ntpdb/query.sql.go +++ b/ntpdb/query.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.19.0 +// sqlc v1.19.1 // source: query.sql package ntpdb @@ -10,6 +10,17 @@ import ( "time" ) +const getServerNetspeed = `-- name: GetServerNetspeed :one +select netspeed from servers where ip = ? +` + +func (q *Queries) GetServerNetspeed(ctx context.Context, ip string) (uint32, error) { + row := q.db.QueryRowContext(ctx, getServerNetspeed, ip) + var netspeed uint32 + err := row.Scan(&netspeed) + return netspeed, err +} + const getZoneStatsData = `-- name: GetZoneStatsData :many SELECT zc.date, z.name, zc.ip_version, count_active, count_registered, netspeed_active FROM zone_server_counts zc USE INDEX (date_idx) @@ -20,12 +31,12 @@ ORDER BY name ` type GetZoneStatsDataRow struct { - Date time.Time `json:"date"` - Name string `json:"name"` - IpVersion ZoneServerCountsIpVersion `json:"ip_version"` - CountActive uint32 `json:"count_active"` - CountRegistered uint32 `json:"count_registered"` - NetspeedActive uint32 `json:"netspeed_active"` + Date time.Time `db:"date" json:"date"` + Name string `db:"name" json:"name"` + IpVersion ZoneServerCountsIpVersion `db:"ip_version" json:"ip_version"` + CountActive uint32 `db:"count_active" json:"count_active"` + CountRegistered uint32 `db:"count_registered" json:"count_registered"` + NetspeedActive uint32 `db:"netspeed_active" json:"netspeed_active"` } func (q *Queries) GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow, error) { @@ -57,3 +68,59 @@ func (q *Queries) GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow, } return items, nil } + +const getZoneStatsV2 = `-- name: GetZoneStatsV2 :many +select zone_name, netspeed_active+0 as netspeed_active FROM ( +SELECT + z.name as zone_name, + SUM( + IF (deletion_on IS NULL AND score_raw > 10, + netspeed, + 0 + ) + ) AS netspeed_active +FROM + servers s + INNER JOIN server_zones sz ON (sz.server_id = s.id) + INNER JOIN zones z ON (z.id = sz.zone_id) + INNER JOIN ( + select zone_id, s.ip_version + from server_zones sz + inner join servers s on (s.id=sz.server_id) + where s.ip=? + ) as srvz on (srvz.zone_id=z.id AND srvz.ip_version=s.ip_version) +WHERE + (deletion_on IS NULL OR deletion_on > NOW()) + AND in_pool = 1 + AND netspeed > 0 +GROUP BY z.name) +AS server_netspeed +` + +type GetZoneStatsV2Row struct { + ZoneName string `db:"zone_name" json:"zone_name"` + NetspeedActive int32 `db:"netspeed_active" json:"netspeed_active"` +} + +func (q *Queries) GetZoneStatsV2(ctx context.Context, ip string) ([]GetZoneStatsV2Row, error) { + rows, err := q.db.QueryContext(ctx, getZoneStatsV2, ip) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetZoneStatsV2Row + for rows.Next() { + var i GetZoneStatsV2Row + if err := rows.Scan(&i.ZoneName, &i.NetspeedActive); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/query.sql b/query.sql index 8409e80..9fdb641 100644 --- a/query.sql +++ b/query.sql @@ -5,3 +5,34 @@ FROM zone_server_counts zc USE INDEX (date_idx) ON(zc.zone_id=z.id) WHERE date IN (SELECT max(date) from zone_server_counts) ORDER BY name; + + +-- name: GetServerNetspeed :one +select netspeed from servers where ip = ?; + +-- name: GetZoneStatsV2 :many +select zone_name, netspeed_active+0 as netspeed_active FROM ( +SELECT + z.name as zone_name, + SUM( + IF (deletion_on IS NULL AND score_raw > 10, + netspeed, + 0 + ) + ) AS netspeed_active +FROM + servers s + INNER JOIN server_zones sz ON (sz.server_id = s.id) + INNER JOIN zones z ON (z.id = sz.zone_id) + INNER JOIN ( + select zone_id, s.ip_version + from server_zones sz + inner join servers s on (s.id=sz.server_id) + where s.ip=? + ) as srvz on (srvz.zone_id=z.id AND srvz.ip_version=s.ip_version) +WHERE + (deletion_on IS NULL OR deletion_on > NOW()) + AND in_pool = 1 + AND netspeed > 0 +GROUP BY z.name) +AS server_netspeed \ No newline at end of file diff --git a/server/clickhouse.go b/server/clickhouse.go deleted file mode 100644 index 2343a3a..0000000 --- a/server/clickhouse.go +++ /dev/null @@ -1,66 +0,0 @@ -package server - -import ( - "context" - "time" - - "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" - "go.ntppool.org/common/version" - "golang.org/x/exp/slog" -) - -func (srv *Server) chConn(ctx context.Context) (driver.Conn, error) { - - conn, err := clickhouse.Open(&clickhouse.Options{ - Addr: []string{"10.43.207.123:9000"}, - Auth: clickhouse.Auth{ - Database: "geodns3", - Username: "default", - Password: "", - }, - // Debug: true, - // Debugf: func(format string, v ...interface{}) { - // slog.Info("debug format", "format", format) - // fmt.Printf(format+"\n", v) - // }, - Settings: clickhouse.Settings{ - "max_execution_time": 60, - }, - Compression: &clickhouse.Compression{ - Method: clickhouse.CompressionLZ4, - }, - DialTimeout: time.Second * 5, - MaxOpenConns: 5, - MaxIdleConns: 5, - ConnMaxLifetime: time.Duration(10) * time.Minute, - ConnOpenStrategy: clickhouse.ConnOpenInOrder, - BlockBufferSize: 10, - MaxCompressionBuffer: 10240, - ClientInfo: clickhouse.ClientInfo{ - Products: []struct { - Name string - Version string - }{ - {Name: "data-api", Version: version.Version()}, - }, - }, - }) - if err != nil { - return nil, err - } - - v, err := conn.ServerVersion() - if err != nil { - return nil, err - } - slog.Info("clickhouse connection", "version", v) - - err = conn.Ping(ctx) - if err != nil { - slog.Error("clickhouse ping", "err", err) - return nil, err - } - - return conn, nil -} diff --git a/server/dnsanswers.go b/server/dnsanswers.go index 28254c5..d36db44 100644 --- a/server/dnsanswers.go +++ b/server/dnsanswers.go @@ -6,7 +6,10 @@ import ( "github.com/labstack/echo/v4" "go.ntppool.org/common/logger" + chdb "go.ntppool.org/data-api/chdb" + "go.ntppool.org/data-api/ntpdb" "golang.org/x/exp/slog" + "golang.org/x/sync/errgroup" ) const pointBasis float64 = 10000 @@ -23,11 +26,11 @@ func (srv *Server) dnsAnswers(c echo.Context) error { c.Response().Header().Set("Cache-Control", "max-age=20") - conn, err := srv.chConn(ctx) - if err != nil { - slog.Error("could not connect to clickhouse", "err", err) - return c.String(http.StatusInternalServerError, "clickhouse error") - } + // conn, err := srv.chConn(ctx) + // if err != nil { + // slog.Error("could not connect to clickhouse", "err", err) + // return c.String(http.StatusInternalServerError, "clickhouse error") + // } ip, err := netip.ParseAddr(c.Param("server")) if err != nil { @@ -35,37 +38,93 @@ func (srv *Server) dnsAnswers(c echo.Context) error { return c.NoContent(http.StatusNotFound) } - // q := ntpdb.New(srv.db) - // zoneStats, err := q.GetZoneStats(ctx) - // if err != nil { - // slog.Error("GetZoneStats", "err", err) - // return c.String(http.StatusInternalServerError, err.Error()) - // } - // if zoneStats == nil { - // slog.Info("didn't get zoneStats") - // } + if ip.String() != c.Param("server") || len(c.QueryString()) > 0 { + return c.Redirect(http.StatusPermanentRedirect, "https://www.ntppool.org/api/data/server/dns/answers/"+ip.String()) + } - days := 4 + queryGroup, ctx := errgroup.WithContext(ctx) - serverData, err := srv.ch.ServerAnswerCounts(c.Request().Context(), conn, ip.String(), days) + var zoneStats []ntpdb.GetZoneStatsV2Row + var serverNetspeed uint32 + + queryGroup.Go(func() error { + var err error + q := ntpdb.New(srv.db) + zoneStats, err = q.GetZoneStatsV2(ctx, ip.String()) + if err != nil { + slog.Error("GetZoneStatsV2", "err", err) + return err + } + if zoneStats == nil { + slog.Info("didn't get zoneStats") + } + + serverNetspeed, err = q.GetServerNetspeed(ctx, ip.String()) + if err != nil { + slog.Error("GetServerNetspeed", "err", err) + return err + } + + return nil + }) + + days := 3 + + var serverData chdb.ServerQueries + + queryGroup.Go(func() error { + var err error + serverData, err = srv.ch.ServerAnswerCounts(c.Request().Context(), ip.String(), days) + if err != nil { + slog.Error("ServerUserCCData", "err", err) + return err + } + return nil + }) + + var totalData chdb.ServerTotals + + queryGroup.Go(func() error { + var err error + + qtype := "A" + if ip.Is6() { + qtype = "AAAA" + } + + totalData, err = srv.ch.AnswerTotals(c.Request().Context(), qtype, days) + if err != nil { + slog.Error("AnswerTotals", "err", err) + } + return err + }) + + err = queryGroup.Wait() if err != nil { - slog.Error("ServerUserCCData", "err", err) + slog.Error("query error", "err", err) return c.String(http.StatusInternalServerError, err.Error()) } - qtype := "A" - if ip.Is6() { - qtype = "AAAA" - } + zoneTotals := map[string]int32{} - totalData, err := srv.ch.AnswerTotals(c.Request().Context(), conn, qtype, days) - if err != nil { - slog.Error("AnswerTotals", "err", err) - return c.String(http.StatusInternalServerError, err.Error()) + for _, z := range zoneStats { + zn := z.ZoneName + if zn == "@" { + zn = "" + } + zoneTotals[zn] = z.NetspeedActive // binary.BigEndian.Uint64(...) + // slog.Info("zone netspeed", "cc", z.ZoneName, "speed", z.NetspeedActive) } for _, cc := range serverData { cc.Points = (pointBasis / float64(totalData[cc.CC])) * float64(cc.Count) + totalName := cc.CC + if totalName == "gb" { + totalName = "uk" + } + if zt, ok := zoneTotals[totalName]; ok { + cc.Netspeed = (pointBasis / float64(zt)) * float64(serverNetspeed) + } // log.Info("points", "cc", cc.CC, "points", cc.Points) } diff --git a/server/server.go b/server/server.go index 3bc4be3..0205079 100644 --- a/server/server.go +++ b/server/server.go @@ -32,7 +32,7 @@ type Server struct { } func NewServer(ctx context.Context, configFile string) (*Server, error) { - ch, err := chdb.New(configFile) + ch, err := chdb.New(ctx, configFile) if err != nil { return nil, fmt.Errorf("clickhouse open: %w", err) } @@ -107,12 +107,6 @@ func (srv *Server) userCountryData(c echo.Context) error { ctx := c.Request().Context() - conn, err := srv.chConn(ctx) - if err != nil { - slog.Error("could not connect to clickhouse", "err", err) - return c.String(http.StatusInternalServerError, "clickhouse error") - } - q := ntpdb.New(srv.db) zoneStats, err := q.GetZoneStats(ctx) if err != nil { @@ -123,7 +117,7 @@ func (srv *Server) userCountryData(c echo.Context) error { slog.Info("didn't get zoneStats") } - data, err := srv.ch.UserCountryData(c.Request().Context(), conn) + data, err := srv.ch.UserCountryData(c.Request().Context()) if err != nil { slog.Error("UserCountryData", "err", err) return c.String(http.StatusInternalServerError, err.Error()) diff --git a/sqlc.yaml b/sqlc.yaml index b2f73c4..03c806e 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -8,8 +8,10 @@ sql: package: "ntpdb" out: "ntpdb" emit_json_tags: true + emit_db_tags: true omit_unused_structs: true + emit_interface: true # emit_all_enum_values: true - # overrides: - # - column: "x.avg_rtt" - # go_type: "database/sql.NullFloat64" + overrides: + - column: "server_netspeed.netspeed_active" + go_type: "uint64"