From 47b96cd598ef55f14d988b2b2461bdf27d93e174 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ask=20Bj=C3=B8rn=20Hansen?= Date: Sat, 23 Dec 2023 01:32:52 -0800 Subject: [PATCH] zones: per zone server counts API migrated --- ntpdb/models.go | 18 ++++++ ntpdb/otel.go | 46 ++++++++++++++ ntpdb/querier.go | 2 + ntpdb/query.sql.go | 56 +++++++++++++++++ query.sql | 10 +++ server/server.go | 4 +- server/zones.go | 148 +++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 282 insertions(+), 2 deletions(-) create mode 100644 server/zones.go diff --git a/ntpdb/models.go b/ntpdb/models.go index cfe3bfa..74fd73f 100644 --- a/ntpdb/models.go +++ b/ntpdb/models.go @@ -317,3 +317,21 @@ type Server struct { ScoreRaw float64 `db:"score_raw" json:"score_raw"` DeletionOn sql.NullTime `db:"deletion_on" json:"deletion_on"` } + +type Zone struct { + ID uint32 `db:"id" json:"id"` + Name string `db:"name" json:"name"` + Description sql.NullString `db:"description" json:"description"` + ParentID sql.NullInt32 `db:"parent_id" json:"parent_id"` + Dns bool `db:"dns" json:"dns"` +} + +type ZoneServerCount struct { + ID uint32 `db:"id" json:"id"` + ZoneID uint32 `db:"zone_id" json:"zone_id"` + IpVersion ZoneServerCountsIpVersion `db:"ip_version" json:"ip_version"` + Date time.Time `db:"date" json:"date"` + CountActive uint32 `db:"count_active" json:"count_active"` + CountRegistered uint32 `db:"count_registered" json:"count_registered"` + NetspeedActive uint32 `db:"netspeed_active" json:"netspeed_active"` +} diff --git a/ntpdb/otel.go b/ntpdb/otel.go index b19f5eb..70cbea2 100644 --- a/ntpdb/otel.go +++ b/ntpdb/otel.go @@ -263,6 +263,52 @@ func (_d QuerierTxWithTracing) GetServerScores(ctx context.Context, arg GetServe return _d.QuerierTx.GetServerScores(ctx, arg) } +// GetZoneByName implements QuerierTx +func (_d QuerierTxWithTracing) GetZoneByName(ctx context.Context, name string) (z1 Zone, err error) { + ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetZoneByName") + defer func() { + if _d._spanDecorator != nil { + _d._spanDecorator(_span, map[string]interface{}{ + "ctx": ctx, + "name": name}, map[string]interface{}{ + "z1": z1, + "err": err}) + } else if err != nil { + _span.RecordError(err) + _span.SetAttributes( + attribute.String("event", "error"), + attribute.String("message", err.Error()), + ) + } + + _span.End() + }() + return _d.QuerierTx.GetZoneByName(ctx, name) +} + +// GetZoneCounts implements QuerierTx +func (_d QuerierTxWithTracing) GetZoneCounts(ctx context.Context, zoneID uint32) (za1 []ZoneServerCount, err error) { + ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetZoneCounts") + defer func() { + if _d._spanDecorator != nil { + _d._spanDecorator(_span, map[string]interface{}{ + "ctx": ctx, + "zoneID": zoneID}, map[string]interface{}{ + "za1": za1, + "err": err}) + } else if err != nil { + _span.RecordError(err) + _span.SetAttributes( + attribute.String("event", "error"), + attribute.String("message", err.Error()), + ) + } + + _span.End() + }() + return _d.QuerierTx.GetZoneCounts(ctx, zoneID) +} + // GetZoneStatsData implements QuerierTx func (_d QuerierTxWithTracing) GetZoneStatsData(ctx context.Context) (ga1 []GetZoneStatsDataRow, err error) { ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetZoneStatsData") diff --git a/ntpdb/querier.go b/ntpdb/querier.go index eef4c94..85e7f29 100644 --- a/ntpdb/querier.go +++ b/ntpdb/querier.go @@ -18,6 +18,8 @@ type Querier interface { GetServerLogScoresByMonitorID(ctx context.Context, arg GetServerLogScoresByMonitorIDParams) ([]LogScore, error) GetServerNetspeed(ctx context.Context, ip string) (uint32, error) GetServerScores(ctx context.Context, arg GetServerScoresParams) ([]GetServerScoresRow, error) + GetZoneByName(ctx context.Context, name string) (Zone, error) + GetZoneCounts(ctx context.Context, zoneID uint32) ([]ZoneServerCount, error) GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow, error) GetZoneStatsV2(ctx context.Context, ip string) ([]GetZoneStatsV2Row, error) } diff --git a/ntpdb/query.sql.go b/ntpdb/query.sql.go index 4e39afd..b2f8228 100644 --- a/ntpdb/query.sql.go +++ b/ntpdb/query.sql.go @@ -333,6 +333,62 @@ func (q *Queries) GetServerScores(ctx context.Context, arg GetServerScoresParams return items, nil } +const getZoneByName = `-- name: GetZoneByName :one +select id, name, description, parent_id, dns from zones +where + name = ? +` + +func (q *Queries) GetZoneByName(ctx context.Context, name string) (Zone, error) { + row := q.db.QueryRowContext(ctx, getZoneByName, name) + var i Zone + err := row.Scan( + &i.ID, + &i.Name, + &i.Description, + &i.ParentID, + &i.Dns, + ) + return i, err +} + +const getZoneCounts = `-- name: GetZoneCounts :many +select id, zone_id, ip_version, date, count_active, count_registered, netspeed_active from zone_server_counts + where zone_id = ? + order by date +` + +func (q *Queries) GetZoneCounts(ctx context.Context, zoneID uint32) ([]ZoneServerCount, error) { + rows, err := q.db.QueryContext(ctx, getZoneCounts, zoneID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ZoneServerCount + for rows.Next() { + var i ZoneServerCount + if err := rows.Scan( + &i.ID, + &i.ZoneID, + &i.IpVersion, + &i.Date, + &i.CountActive, + &i.CountRegistered, + &i.NetspeedActive, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getZoneStatsData = `-- name: GetZoneStatsData :many SELECT zc.date, z.name, zc.ip_version, count_active, count_registered, netspeed_active FROM zone_server_counts zc USE INDEX (date_idx) diff --git a/query.sql b/query.sql index d5405c1..92d4a81 100644 --- a/query.sql +++ b/query.sql @@ -83,3 +83,13 @@ where monitor_id = ? order by ts desc limit ?; + +-- name: GetZoneByName :one +select * from zones +where + name = sqlc.arg(name); + +-- name: GetZoneCounts :many +select * from zone_server_counts + where zone_id = ? + order by date; diff --git a/server/server.go b/server/server.go index 6a41ab2..e203327 100644 --- a/server/server.go +++ b/server/server.go @@ -195,7 +195,7 @@ func (srv *Server) Run() error { if len(ntpconf.WebHostname()) > 0 { e.POST("/api/server/scores/:server/:mode", func(c echo.Context) error { - // POST requests used to work + // POST requests used to work, so make them not error out mode := c.Param("mode") server := c.Param("server") query := c.Request().URL.Query() @@ -210,7 +210,7 @@ func (srv *Server) Run() error { } e.GET("/graph/:server/:type", srv.graphImage) - // e.GET("/api/server/scores/:server/:type", srv.logScores) + e.GET("/api/zone/counts/:zone_name", srv.zoneCounts) g.Go(func() error { return e.Start(":8030") diff --git a/server/zones.go b/server/zones.go new file mode 100644 index 0000000..bcce3e8 --- /dev/null +++ b/server/zones.go @@ -0,0 +1,148 @@ +package server + +import ( + "database/sql" + "errors" + "net/http" + "strconv" + "time" + + "github.com/labstack/echo/v4" + "go.ntppool.org/common/logger" + "go.ntppool.org/common/tracing" + "go.ntppool.org/data-api/ntpdb" +) + +func (srv *Server) zoneCounts(c echo.Context) error { + log := logger.Setup() + ctx, span := tracing.Tracer().Start(c.Request().Context(), "zoneCounts") + defer span.End() + + // just cache for a short time by default + c.Response().Header().Set("Cache-Control", "public,max-age=240") + c.Response().Header().Set("Access-Control-Allow-Origin", "*") + c.Response().Header().Del("Vary") + + q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db)) + + zone, err := q.GetZoneByName(ctx, c.Param("zone_name")) + if err != nil || zone.ID == 0 { + if errors.Is(err, sql.ErrNoRows) { + return c.String(http.StatusNotFound, "Not found") + } + log.ErrorContext(ctx, "could not query for zone", "err", err) + span.RecordError(err) + return echo.NewHTTPError(http.StatusInternalServerError, "internal error") + } + + counts, err := q.GetZoneCounts(ctx, zone.ID) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + log.ErrorContext(ctx, "get counts", "err", err) + span.RecordError(err) + return c.String(http.StatusInternalServerError, "internal error") + } + } + + type historyEntry struct { + D string `json:"d"` // date + Ts int `json:"ts"` // epoch timestamp + Rc int `json:"rc"` // count registered + Ac int `json:"ac"` // count active + W int `json:"w"` // netspeed active + Iv string `json:"iv"` // ip version + } + + rv := struct { + History []historyEntry `json:"history"` + }{} + + skipCount := 0.0 + limit := 0 + + if limitParam := c.QueryParam("limit"); len(limitParam) > 0 { + if limitInt, err := strconv.Atoi(limitParam); err == nil && limitInt > 0 { + limit = limitInt + } + } + + var mostRecentDate int64 = -1 + if limit > 0 { + count := 0 + dates := map[int64]bool{} + for _, c := range counts { + ep := c.Date.Unix() + if _, ok := dates[ep]; !ok { + count++ + dates[ep] = true + mostRecentDate = ep + } + } + if limit < count { + if limit > 1 { + skipCount = float64(count) / float64(limit-1) + } else { + // skip everything and use the special logic that we always include the most recent date + skipCount = float64(count) + 1 + + } + } + + log.DebugContext(ctx, "mod", "count", count, "limit", limit, "mod", count%limit, "skipCount", skipCount) + // log.Info("limit plan", "date count", count, "limit", limit, "skipCount", skipCount) + } + + toSkip := 0.0 + if limit == 1 { + toSkip = skipCount // we just want to look for the last entry + } + lastDate := int64(0) + lastSkip := int64(0) + skipThreshold := 0.5 + for _, c := range counts { + cDate := c.Date.Unix() + if (toSkip <= skipThreshold && cDate != lastSkip) || + lastDate == cDate || + mostRecentDate == cDate { + // log.Info("adding date", "date", c.Date.Format(time.DateOnly)) + rv.History = append(rv.History, historyEntry{ + D: c.Date.Format(time.DateOnly), + Ts: int(cDate), + Ac: int(c.CountActive), + Rc: int(c.CountRegistered), + W: int(c.NetspeedActive), + Iv: string(c.IpVersion), + }) + lastDate = cDate + } else { + // log.Info("skipping date", "date", c.Date.Format(time.DateOnly)) + if lastSkip == cDate { + continue + } + toSkip-- + lastSkip = cDate + continue + } + if toSkip <= skipThreshold && skipCount > 0 { + toSkip += skipCount + } + + } + + if limit > 0 { + count := 0 + dates := map[int]bool{} + for _, c := range rv.History { + ep := c.Ts + if _, ok := dates[ep]; !ok { + count++ + dates[ep] = true + } + } + log.DebugContext(ctx, "result counts", "skipCount", skipCount, "limit", limit, "got", count) + } + + c.Response().Header().Set("Cache-Control", "s-maxage=28800, max-age=7200") + return c.JSON(http.StatusOK, rv) + +}