zones: per zone server counts API migrated
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
@@ -317,3 +317,21 @@ type Server struct {
|
||||
ScoreRaw float64 `db:"score_raw" json:"score_raw"`
|
||||
DeletionOn sql.NullTime `db:"deletion_on" json:"deletion_on"`
|
||||
}
|
||||
|
||||
type Zone struct {
|
||||
ID uint32 `db:"id" json:"id"`
|
||||
Name string `db:"name" json:"name"`
|
||||
Description sql.NullString `db:"description" json:"description"`
|
||||
ParentID sql.NullInt32 `db:"parent_id" json:"parent_id"`
|
||||
Dns bool `db:"dns" json:"dns"`
|
||||
}
|
||||
|
||||
type ZoneServerCount struct {
|
||||
ID uint32 `db:"id" json:"id"`
|
||||
ZoneID uint32 `db:"zone_id" json:"zone_id"`
|
||||
IpVersion ZoneServerCountsIpVersion `db:"ip_version" json:"ip_version"`
|
||||
Date time.Time `db:"date" json:"date"`
|
||||
CountActive uint32 `db:"count_active" json:"count_active"`
|
||||
CountRegistered uint32 `db:"count_registered" json:"count_registered"`
|
||||
NetspeedActive uint32 `db:"netspeed_active" json:"netspeed_active"`
|
||||
}
|
||||
|
||||
@@ -263,6 +263,52 @@ func (_d QuerierTxWithTracing) GetServerScores(ctx context.Context, arg GetServe
|
||||
return _d.QuerierTx.GetServerScores(ctx, arg)
|
||||
}
|
||||
|
||||
// GetZoneByName implements QuerierTx
|
||||
func (_d QuerierTxWithTracing) GetZoneByName(ctx context.Context, name string) (z1 Zone, err error) {
|
||||
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetZoneByName")
|
||||
defer func() {
|
||||
if _d._spanDecorator != nil {
|
||||
_d._spanDecorator(_span, map[string]interface{}{
|
||||
"ctx": ctx,
|
||||
"name": name}, map[string]interface{}{
|
||||
"z1": z1,
|
||||
"err": err})
|
||||
} else if err != nil {
|
||||
_span.RecordError(err)
|
||||
_span.SetAttributes(
|
||||
attribute.String("event", "error"),
|
||||
attribute.String("message", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
_span.End()
|
||||
}()
|
||||
return _d.QuerierTx.GetZoneByName(ctx, name)
|
||||
}
|
||||
|
||||
// GetZoneCounts implements QuerierTx
|
||||
func (_d QuerierTxWithTracing) GetZoneCounts(ctx context.Context, zoneID uint32) (za1 []ZoneServerCount, err error) {
|
||||
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetZoneCounts")
|
||||
defer func() {
|
||||
if _d._spanDecorator != nil {
|
||||
_d._spanDecorator(_span, map[string]interface{}{
|
||||
"ctx": ctx,
|
||||
"zoneID": zoneID}, map[string]interface{}{
|
||||
"za1": za1,
|
||||
"err": err})
|
||||
} else if err != nil {
|
||||
_span.RecordError(err)
|
||||
_span.SetAttributes(
|
||||
attribute.String("event", "error"),
|
||||
attribute.String("message", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
_span.End()
|
||||
}()
|
||||
return _d.QuerierTx.GetZoneCounts(ctx, zoneID)
|
||||
}
|
||||
|
||||
// GetZoneStatsData implements QuerierTx
|
||||
func (_d QuerierTxWithTracing) GetZoneStatsData(ctx context.Context) (ga1 []GetZoneStatsDataRow, err error) {
|
||||
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetZoneStatsData")
|
||||
|
||||
@@ -18,6 +18,8 @@ type Querier interface {
|
||||
GetServerLogScoresByMonitorID(ctx context.Context, arg GetServerLogScoresByMonitorIDParams) ([]LogScore, error)
|
||||
GetServerNetspeed(ctx context.Context, ip string) (uint32, error)
|
||||
GetServerScores(ctx context.Context, arg GetServerScoresParams) ([]GetServerScoresRow, error)
|
||||
GetZoneByName(ctx context.Context, name string) (Zone, error)
|
||||
GetZoneCounts(ctx context.Context, zoneID uint32) ([]ZoneServerCount, error)
|
||||
GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow, error)
|
||||
GetZoneStatsV2(ctx context.Context, ip string) ([]GetZoneStatsV2Row, error)
|
||||
}
|
||||
|
||||
@@ -333,6 +333,62 @@ func (q *Queries) GetServerScores(ctx context.Context, arg GetServerScoresParams
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getZoneByName = `-- name: GetZoneByName :one
|
||||
select id, name, description, parent_id, dns from zones
|
||||
where
|
||||
name = ?
|
||||
`
|
||||
|
||||
func (q *Queries) GetZoneByName(ctx context.Context, name string) (Zone, error) {
|
||||
row := q.db.QueryRowContext(ctx, getZoneByName, name)
|
||||
var i Zone
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.Name,
|
||||
&i.Description,
|
||||
&i.ParentID,
|
||||
&i.Dns,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getZoneCounts = `-- name: GetZoneCounts :many
|
||||
select id, zone_id, ip_version, date, count_active, count_registered, netspeed_active from zone_server_counts
|
||||
where zone_id = ?
|
||||
order by date
|
||||
`
|
||||
|
||||
func (q *Queries) GetZoneCounts(ctx context.Context, zoneID uint32) ([]ZoneServerCount, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getZoneCounts, zoneID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []ZoneServerCount
|
||||
for rows.Next() {
|
||||
var i ZoneServerCount
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.ZoneID,
|
||||
&i.IpVersion,
|
||||
&i.Date,
|
||||
&i.CountActive,
|
||||
&i.CountRegistered,
|
||||
&i.NetspeedActive,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getZoneStatsData = `-- name: GetZoneStatsData :many
|
||||
SELECT zc.date, z.name, zc.ip_version, count_active, count_registered, netspeed_active
|
||||
FROM zone_server_counts zc USE INDEX (date_idx)
|
||||
|
||||
10
query.sql
10
query.sql
@@ -83,3 +83,13 @@ where
|
||||
monitor_id = ?
|
||||
order by ts desc
|
||||
limit ?;
|
||||
|
||||
-- name: GetZoneByName :one
|
||||
select * from zones
|
||||
where
|
||||
name = sqlc.arg(name);
|
||||
|
||||
-- name: GetZoneCounts :many
|
||||
select * from zone_server_counts
|
||||
where zone_id = ?
|
||||
order by date;
|
||||
|
||||
@@ -195,7 +195,7 @@ func (srv *Server) Run() error {
|
||||
|
||||
if len(ntpconf.WebHostname()) > 0 {
|
||||
e.POST("/api/server/scores/:server/:mode", func(c echo.Context) error {
|
||||
// POST requests used to work
|
||||
// POST requests used to work, so make them not error out
|
||||
mode := c.Param("mode")
|
||||
server := c.Param("server")
|
||||
query := c.Request().URL.Query()
|
||||
@@ -210,7 +210,7 @@ func (srv *Server) Run() error {
|
||||
}
|
||||
e.GET("/graph/:server/:type", srv.graphImage)
|
||||
|
||||
// e.GET("/api/server/scores/:server/:type", srv.logScores)
|
||||
e.GET("/api/zone/counts/:zone_name", srv.zoneCounts)
|
||||
|
||||
g.Go(func() error {
|
||||
return e.Start(":8030")
|
||||
|
||||
148
server/zones.go
Normal file
148
server/zones.go
Normal file
@@ -0,0 +1,148 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/labstack/echo/v4"
|
||||
"go.ntppool.org/common/logger"
|
||||
"go.ntppool.org/common/tracing"
|
||||
"go.ntppool.org/data-api/ntpdb"
|
||||
)
|
||||
|
||||
func (srv *Server) zoneCounts(c echo.Context) error {
|
||||
log := logger.Setup()
|
||||
ctx, span := tracing.Tracer().Start(c.Request().Context(), "zoneCounts")
|
||||
defer span.End()
|
||||
|
||||
// just cache for a short time by default
|
||||
c.Response().Header().Set("Cache-Control", "public,max-age=240")
|
||||
c.Response().Header().Set("Access-Control-Allow-Origin", "*")
|
||||
c.Response().Header().Del("Vary")
|
||||
|
||||
q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db))
|
||||
|
||||
zone, err := q.GetZoneByName(ctx, c.Param("zone_name"))
|
||||
if err != nil || zone.ID == 0 {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return c.String(http.StatusNotFound, "Not found")
|
||||
}
|
||||
log.ErrorContext(ctx, "could not query for zone", "err", err)
|
||||
span.RecordError(err)
|
||||
return echo.NewHTTPError(http.StatusInternalServerError, "internal error")
|
||||
}
|
||||
|
||||
counts, err := q.GetZoneCounts(ctx, zone.ID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
log.ErrorContext(ctx, "get counts", "err", err)
|
||||
span.RecordError(err)
|
||||
return c.String(http.StatusInternalServerError, "internal error")
|
||||
}
|
||||
}
|
||||
|
||||
type historyEntry struct {
|
||||
D string `json:"d"` // date
|
||||
Ts int `json:"ts"` // epoch timestamp
|
||||
Rc int `json:"rc"` // count registered
|
||||
Ac int `json:"ac"` // count active
|
||||
W int `json:"w"` // netspeed active
|
||||
Iv string `json:"iv"` // ip version
|
||||
}
|
||||
|
||||
rv := struct {
|
||||
History []historyEntry `json:"history"`
|
||||
}{}
|
||||
|
||||
skipCount := 0.0
|
||||
limit := 0
|
||||
|
||||
if limitParam := c.QueryParam("limit"); len(limitParam) > 0 {
|
||||
if limitInt, err := strconv.Atoi(limitParam); err == nil && limitInt > 0 {
|
||||
limit = limitInt
|
||||
}
|
||||
}
|
||||
|
||||
var mostRecentDate int64 = -1
|
||||
if limit > 0 {
|
||||
count := 0
|
||||
dates := map[int64]bool{}
|
||||
for _, c := range counts {
|
||||
ep := c.Date.Unix()
|
||||
if _, ok := dates[ep]; !ok {
|
||||
count++
|
||||
dates[ep] = true
|
||||
mostRecentDate = ep
|
||||
}
|
||||
}
|
||||
if limit < count {
|
||||
if limit > 1 {
|
||||
skipCount = float64(count) / float64(limit-1)
|
||||
} else {
|
||||
// skip everything and use the special logic that we always include the most recent date
|
||||
skipCount = float64(count) + 1
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
log.DebugContext(ctx, "mod", "count", count, "limit", limit, "mod", count%limit, "skipCount", skipCount)
|
||||
// log.Info("limit plan", "date count", count, "limit", limit, "skipCount", skipCount)
|
||||
}
|
||||
|
||||
toSkip := 0.0
|
||||
if limit == 1 {
|
||||
toSkip = skipCount // we just want to look for the last entry
|
||||
}
|
||||
lastDate := int64(0)
|
||||
lastSkip := int64(0)
|
||||
skipThreshold := 0.5
|
||||
for _, c := range counts {
|
||||
cDate := c.Date.Unix()
|
||||
if (toSkip <= skipThreshold && cDate != lastSkip) ||
|
||||
lastDate == cDate ||
|
||||
mostRecentDate == cDate {
|
||||
// log.Info("adding date", "date", c.Date.Format(time.DateOnly))
|
||||
rv.History = append(rv.History, historyEntry{
|
||||
D: c.Date.Format(time.DateOnly),
|
||||
Ts: int(cDate),
|
||||
Ac: int(c.CountActive),
|
||||
Rc: int(c.CountRegistered),
|
||||
W: int(c.NetspeedActive),
|
||||
Iv: string(c.IpVersion),
|
||||
})
|
||||
lastDate = cDate
|
||||
} else {
|
||||
// log.Info("skipping date", "date", c.Date.Format(time.DateOnly))
|
||||
if lastSkip == cDate {
|
||||
continue
|
||||
}
|
||||
toSkip--
|
||||
lastSkip = cDate
|
||||
continue
|
||||
}
|
||||
if toSkip <= skipThreshold && skipCount > 0 {
|
||||
toSkip += skipCount
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if limit > 0 {
|
||||
count := 0
|
||||
dates := map[int]bool{}
|
||||
for _, c := range rv.History {
|
||||
ep := c.Ts
|
||||
if _, ok := dates[ep]; !ok {
|
||||
count++
|
||||
dates[ep] = true
|
||||
}
|
||||
}
|
||||
log.DebugContext(ctx, "result counts", "skipCount", skipCount, "limit", limit, "got", count)
|
||||
}
|
||||
|
||||
c.Response().Header().Set("Cache-Control", "s-maxage=28800, max-age=7200")
|
||||
return c.JSON(http.StatusOK, rv)
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user