Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0dfa41da8e | |||
| e4f6d8cafb | |||
| 1b1413a632 |
88
.drone.yml
88
.drone.yml
@@ -1,88 +0,0 @@
|
|||||||
---
|
|
||||||
kind: pipeline
|
|
||||||
type: kubernetes
|
|
||||||
name: default
|
|
||||||
|
|
||||||
environment:
|
|
||||||
GOCACHE: /cache/pkg/cache
|
|
||||||
GOMODCACHE: /cache/pkg/mod
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- name: fetch-tags
|
|
||||||
image: alpine/git
|
|
||||||
commands:
|
|
||||||
- git fetch --tags
|
|
||||||
resources:
|
|
||||||
requests:
|
|
||||||
cpu: 250
|
|
||||||
memory: 50MiB
|
|
||||||
limits:
|
|
||||||
cpu: 250
|
|
||||||
memory: 100MiB
|
|
||||||
|
|
||||||
- name: test
|
|
||||||
image: golang:1.25
|
|
||||||
pull: always
|
|
||||||
volumes:
|
|
||||||
- name: go
|
|
||||||
path: /go
|
|
||||||
- name: gopkg
|
|
||||||
path: /cache
|
|
||||||
commands:
|
|
||||||
- go test -v ./...
|
|
||||||
- go build ./...
|
|
||||||
|
|
||||||
- name: goreleaser
|
|
||||||
image: golang:1.25
|
|
||||||
pull: always
|
|
||||||
resources:
|
|
||||||
requests:
|
|
||||||
cpu: 6000
|
|
||||||
memory: 1024MiB
|
|
||||||
limits:
|
|
||||||
cpu: 10000
|
|
||||||
memory: 4096MiB
|
|
||||||
volumes:
|
|
||||||
- name: go
|
|
||||||
path: /go
|
|
||||||
- name: gopkg
|
|
||||||
path: /cache
|
|
||||||
environment:
|
|
||||||
# GITHUB_TOKEN:
|
|
||||||
# from_secret: GITHUB_TOKEN
|
|
||||||
commands:
|
|
||||||
- ./scripts/run-goreleaser
|
|
||||||
depends_on: [test]
|
|
||||||
|
|
||||||
- name: docker
|
|
||||||
image: harbor.ntppool.org/ntppool/drone-kaniko:main
|
|
||||||
pull: always
|
|
||||||
volumes:
|
|
||||||
- name: go
|
|
||||||
path: /go
|
|
||||||
- name: gopkg
|
|
||||||
path: /cache
|
|
||||||
settings:
|
|
||||||
repo: ntppool/data-api
|
|
||||||
registry: harbor.ntppool.org
|
|
||||||
auto_tag: true
|
|
||||||
tags: SHA7,${DRONE_SOURCE_BRANCH}
|
|
||||||
cache: true
|
|
||||||
username:
|
|
||||||
from_secret: harbor_username
|
|
||||||
password:
|
|
||||||
from_secret: harbor_password
|
|
||||||
depends_on: [goreleaser]
|
|
||||||
|
|
||||||
volumes:
|
|
||||||
- name: go
|
|
||||||
temp: {}
|
|
||||||
- name: gopkg
|
|
||||||
claim:
|
|
||||||
name: go-pkg
|
|
||||||
|
|
||||||
---
|
|
||||||
kind: signature
|
|
||||||
hmac: 7f4f57140394a1c3a34e4d23188edda3cd95359dacf6d0abfa45bda3afff692f
|
|
||||||
|
|
||||||
...
|
|
||||||
69
.woodpecker.yaml
Normal file
69
.woodpecker.yaml
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
when:
|
||||||
|
- event: [push, pull_request, tag, manual]
|
||||||
|
|
||||||
|
clone:
|
||||||
|
git:
|
||||||
|
image: woodpeckerci/plugin-git
|
||||||
|
settings:
|
||||||
|
tags: true
|
||||||
|
|
||||||
|
variables:
|
||||||
|
- &go_env
|
||||||
|
GOMODCACHE: /go/pkg/mod
|
||||||
|
GOCACHE: /go/pkg/cache
|
||||||
|
- &go_volumes
|
||||||
|
- go-pkg:/go/pkg
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: test
|
||||||
|
image: golang:1.26
|
||||||
|
pull: true
|
||||||
|
environment: *go_env
|
||||||
|
volumes: *go_volumes
|
||||||
|
commands:
|
||||||
|
- go test -v ./...
|
||||||
|
- go build ./...
|
||||||
|
|
||||||
|
- name: goreleaser
|
||||||
|
image: golang:1.26
|
||||||
|
pull: true
|
||||||
|
environment: *go_env
|
||||||
|
volumes: *go_volumes
|
||||||
|
commands:
|
||||||
|
- ./scripts/run-goreleaser
|
||||||
|
backend_options:
|
||||||
|
kubernetes:
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
cpu: 6000
|
||||||
|
memory: 1024Mi
|
||||||
|
limits:
|
||||||
|
cpu: 10000
|
||||||
|
memory: 4096Mi
|
||||||
|
depends_on: [test]
|
||||||
|
|
||||||
|
- name: generate-tags
|
||||||
|
image: ghcr.io/abh/woodpecker-docker-tags-plugin:sha-8a3bd7c
|
||||||
|
settings:
|
||||||
|
tags: |
|
||||||
|
branch
|
||||||
|
sha
|
||||||
|
semver --auto
|
||||||
|
edge -v latest
|
||||||
|
when:
|
||||||
|
- event: [push, tag, manual]
|
||||||
|
depends_on: [goreleaser]
|
||||||
|
|
||||||
|
- name: docker
|
||||||
|
image: woodpeckerci/plugin-kaniko
|
||||||
|
settings:
|
||||||
|
registry: harbor.ntppool.org
|
||||||
|
repo: ntppool/data-api
|
||||||
|
cache: true
|
||||||
|
username:
|
||||||
|
from_secret: harbor_username
|
||||||
|
password:
|
||||||
|
from_secret: harbor_password
|
||||||
|
when:
|
||||||
|
- event: [push, tag, manual]
|
||||||
|
depends_on: [goreleaser, generate-tags]
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
package chdb
|
package chdb
|
||||||
|
|
||||||
// queries to the GeoDNS database
|
// queries to the DNS database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -208,7 +208,7 @@ func (d *ClickHouse) DNSQueries(ctx context.Context) ([]DNSQueryCounts, error) {
|
|||||||
sum(q)/300 as avg, max(q) as max
|
sum(q)/300 as avg, max(q) as max
|
||||||
from (
|
from (
|
||||||
select window as t, sumSimpleState(queries) as q
|
select window as t, sumSimpleState(queries) as q
|
||||||
from geodns.by_origin_1s
|
from dns.by_origin_1s
|
||||||
where
|
where
|
||||||
window > FROM_UNIXTIME(?)
|
window > FROM_UNIXTIME(?)
|
||||||
and Origin IN ('pool.ntp.org', 'g.ntpns.org')
|
and Origin IN ('pool.ntp.org', 'g.ntpns.org')
|
||||||
|
|||||||
@@ -220,7 +220,7 @@ func (d *ClickHouse) LogscoresTimeRange(ctx context.Context, serverID, monitorID
|
|||||||
samples = append(samples, map[string]interface{}{
|
samples = append(samples, map[string]interface{}{
|
||||||
"id": row.ID,
|
"id": row.ID,
|
||||||
"monitor_id": row.MonitorID,
|
"monitor_id": row.MonitorID,
|
||||||
"ts": row.Ts.Time.Format(time.RFC3339),
|
"ts": row.Ts.Format(time.RFC3339),
|
||||||
"score": row.Score,
|
"score": row.Score,
|
||||||
"rtt_valid": row.Rtt.Valid,
|
"rtt_valid": row.Rtt.Valid,
|
||||||
"offset_valid": row.Offset.Valid,
|
"offset_valid": row.Offset.Valid,
|
||||||
|
|||||||
8
go.mod
8
go.mod
@@ -15,14 +15,14 @@ tool (
|
|||||||
require (
|
require (
|
||||||
dario.cat/mergo v1.0.2
|
dario.cat/mergo v1.0.2
|
||||||
github.com/ClickHouse/clickhouse-go/v2 v2.40.3
|
github.com/ClickHouse/clickhouse-go/v2 v2.40.3
|
||||||
|
github.com/go-sql-driver/mysql v1.9.3
|
||||||
github.com/hashicorp/go-retryablehttp v0.7.8
|
github.com/hashicorp/go-retryablehttp v0.7.8
|
||||||
github.com/jackc/pgx/v5 v5.7.6
|
|
||||||
github.com/labstack/echo-contrib v0.17.4
|
github.com/labstack/echo-contrib v0.17.4
|
||||||
github.com/labstack/echo/v4 v4.13.4
|
github.com/labstack/echo/v4 v4.13.4
|
||||||
github.com/samber/slog-echo v1.17.2
|
github.com/samber/slog-echo v1.17.2
|
||||||
github.com/spf13/cobra v1.10.1
|
github.com/spf13/cobra v1.10.1
|
||||||
go.ntppool.org/api v0.3.4
|
go.ntppool.org/api v0.3.4
|
||||||
go.ntppool.org/common v0.6.3-0.20251129195245-283d3936f6d0
|
go.ntppool.org/common v0.5.2
|
||||||
go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.63.0
|
go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho v0.63.0
|
||||||
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.63.0
|
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.63.0
|
||||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0
|
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0
|
||||||
@@ -53,7 +53,6 @@ require (
|
|||||||
github.com/go-faster/errors v0.7.1 // indirect
|
github.com/go-faster/errors v0.7.1 // indirect
|
||||||
github.com/go-logr/logr v1.4.3 // indirect
|
github.com/go-logr/logr v1.4.3 // indirect
|
||||||
github.com/go-logr/stdr v1.2.2 // indirect
|
github.com/go-logr/stdr v1.2.2 // indirect
|
||||||
github.com/go-sql-driver/mysql v1.9.3 // indirect
|
|
||||||
github.com/google/cel-go v0.24.1 // indirect
|
github.com/google/cel-go v0.24.1 // indirect
|
||||||
github.com/google/uuid v1.6.0 // indirect
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
|
||||||
@@ -64,6 +63,7 @@ require (
|
|||||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||||
|
github.com/jackc/pgx/v5 v5.7.4 // indirect
|
||||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||||
github.com/klauspost/compress v1.18.0 // indirect
|
github.com/klauspost/compress v1.18.0 // indirect
|
||||||
@@ -138,7 +138,7 @@ require (
|
|||||||
golang.org/x/sys v0.36.0 // indirect
|
golang.org/x/sys v0.36.0 // indirect
|
||||||
golang.org/x/text v0.29.0 // indirect
|
golang.org/x/text v0.29.0 // indirect
|
||||||
golang.org/x/time v0.13.0 // indirect
|
golang.org/x/time v0.13.0 // indirect
|
||||||
golang.org/x/tools v0.37.0 // indirect
|
golang.org/x/tools v0.36.0 // indirect
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9 // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 // indirect
|
||||||
google.golang.org/grpc v1.75.1 // indirect
|
google.golang.org/grpc v1.75.1 // indirect
|
||||||
|
|||||||
14
go.sum
14
go.sum
@@ -92,8 +92,8 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
|
|||||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
||||||
github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk=
|
github.com/jackc/pgx/v5 v5.7.4 h1:9wKznZrhWa2QiHL+NjTSPP6yjl3451BX3imWDnokYlg=
|
||||||
github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M=
|
github.com/jackc/pgx/v5 v5.7.4/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
|
||||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||||
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
||||||
@@ -230,10 +230,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
|
|||||||
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
|
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
|
||||||
go.ntppool.org/api v0.3.4 h1:KeRyFhIRkjJwZif7hkpqEDEBmukyYGiOi2Fd6j3UzQ0=
|
go.ntppool.org/api v0.3.4 h1:KeRyFhIRkjJwZif7hkpqEDEBmukyYGiOi2Fd6j3UzQ0=
|
||||||
go.ntppool.org/api v0.3.4/go.mod h1:LFLAwnrc/JyjzKnjgf8tCOJhps6oFIjuledS3PCx7xc=
|
go.ntppool.org/api v0.3.4/go.mod h1:LFLAwnrc/JyjzKnjgf8tCOJhps6oFIjuledS3PCx7xc=
|
||||||
go.ntppool.org/common v0.6.2 h1:TvxrpaBQpSYuvuRT24M/I1ZqFjh4woHJTqayCOxe+o8=
|
go.ntppool.org/common v0.5.2 h1:Ijlezhiqqs7TJYZTWwEwultLFxhNaXsh6DkaO53m/F4=
|
||||||
go.ntppool.org/common v0.6.2/go.mod h1:Dkc2P5+aaCseC/cs0uD9elh4yTllqvyeZ1NNT/G/414=
|
go.ntppool.org/common v0.5.2/go.mod h1:e5ohROK9LdZZTI1neNiSlmgmWC23F779qzLvSi4JzyI=
|
||||||
go.ntppool.org/common v0.6.3-0.20251129195245-283d3936f6d0 h1:Vbs/RgrwfdA9ZzGAkhFRaU7ZSEl8D28pk95iYhjzvyA=
|
|
||||||
go.ntppool.org/common v0.6.3-0.20251129195245-283d3936f6d0/go.mod h1:Dkc2P5+aaCseC/cs0uD9elh4yTllqvyeZ1NNT/G/414=
|
|
||||||
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
|
||||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||||
go.opentelemetry.io/contrib/bridges/otelslog v0.13.0 h1:bwnLpizECbPr1RrQ27waeY2SPIPeccCx/xLuoYADZ9s=
|
go.opentelemetry.io/contrib/bridges/otelslog v0.13.0 h1:bwnLpizECbPr1RrQ27waeY2SPIPeccCx/xLuoYADZ9s=
|
||||||
@@ -362,8 +360,8 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn
|
|||||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||||
golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE=
|
golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg=
|
||||||
golang.org/x/tools v0.37.0/go.mod h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w=
|
golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s=
|
||||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
|||||||
@@ -2,10 +2,9 @@ package logscores
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5/pgtype"
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
|
||||||
"go.ntppool.org/common/logger"
|
"go.ntppool.org/common/logger"
|
||||||
"go.ntppool.org/common/tracing"
|
"go.ntppool.org/common/tracing"
|
||||||
"go.ntppool.org/data-api/chdb"
|
"go.ntppool.org/data-api/chdb"
|
||||||
@@ -20,12 +19,12 @@ type LogScoreHistory struct {
|
|||||||
// MonitorIDs []uint32
|
// MonitorIDs []uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetHistoryClickHouse(ctx context.Context, ch *chdb.ClickHouse, db *pgxpool.Pool, serverID, monitorID int64, since time.Time, count int, fullHistory bool) (*LogScoreHistory, error) {
|
func GetHistoryClickHouse(ctx context.Context, ch *chdb.ClickHouse, db *sql.DB, serverID, monitorID uint32, since time.Time, count int, fullHistory bool) (*LogScoreHistory, error) {
|
||||||
log := logger.FromContext(ctx)
|
log := logger.FromContext(ctx)
|
||||||
ctx, span := tracing.Tracer().Start(ctx, "logscores.GetHistoryClickHouse",
|
ctx, span := tracing.Tracer().Start(ctx, "logscores.GetHistoryClickHouse",
|
||||||
trace.WithAttributes(
|
trace.WithAttributes(
|
||||||
attribute.Int64("server", serverID),
|
attribute.Int("server", int(serverID)),
|
||||||
attribute.Int64("monitor", monitorID),
|
attribute.Int("monitor", int(monitorID)),
|
||||||
attribute.Bool("full_history", fullHistory),
|
attribute.Bool("full_history", fullHistory),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
@@ -52,17 +51,17 @@ func GetHistoryClickHouse(ctx context.Context, ch *chdb.ClickHouse, db *pgxpool.
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetHistoryPostgres(ctx context.Context, db *pgxpool.Pool, serverID, monitorID int64, since time.Time, count int) (*LogScoreHistory, error) {
|
func GetHistoryMySQL(ctx context.Context, db *sql.DB, serverID, monitorID uint32, since time.Time, count int) (*LogScoreHistory, error) {
|
||||||
log := logger.FromContext(ctx)
|
log := logger.FromContext(ctx)
|
||||||
ctx, span := tracing.Tracer().Start(ctx, "logscores.GetHistoryPostgres")
|
ctx, span := tracing.Tracer().Start(ctx, "logscores.GetHistoryMySQL")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
span.SetAttributes(
|
span.SetAttributes(
|
||||||
attribute.Int64("server", serverID),
|
attribute.Int("server", int(serverID)),
|
||||||
attribute.Int64("monitor", monitorID),
|
attribute.Int("monitor", int(monitorID)),
|
||||||
)
|
)
|
||||||
|
|
||||||
log.Debug("GetHistoryPostgres", "server", serverID, "monitor", monitorID, "since", since, "count", count)
|
log.Debug("GetHistoryMySQL", "server", serverID, "monitor", monitorID, "since", since, "count", count)
|
||||||
|
|
||||||
q := ntpdb.NewWrappedQuerier(ntpdb.New(db))
|
q := ntpdb.NewWrappedQuerier(ntpdb.New(db))
|
||||||
|
|
||||||
@@ -70,13 +69,13 @@ func GetHistoryPostgres(ctx context.Context, db *pgxpool.Pool, serverID, monitor
|
|||||||
var err error
|
var err error
|
||||||
if monitorID > 0 {
|
if monitorID > 0 {
|
||||||
ls, err = q.GetServerLogScoresByMonitorID(ctx, ntpdb.GetServerLogScoresByMonitorIDParams{
|
ls, err = q.GetServerLogScoresByMonitorID(ctx, ntpdb.GetServerLogScoresByMonitorIDParams{
|
||||||
ServerID: int64(serverID),
|
ServerID: serverID,
|
||||||
MonitorID: pgtype.Int8{Int64: int64(monitorID), Valid: true},
|
MonitorID: sql.NullInt32{Int32: int32(monitorID), Valid: true},
|
||||||
Limit: int32(count),
|
Limit: int32(count),
|
||||||
})
|
})
|
||||||
} else {
|
} else {
|
||||||
ls, err = q.GetServerLogScores(ctx, ntpdb.GetServerLogScoresParams{
|
ls, err = q.GetServerLogScores(ctx, ntpdb.GetServerLogScoresParams{
|
||||||
ServerID: int64(serverID),
|
ServerID: serverID,
|
||||||
Limit: int32(count),
|
Limit: int32(count),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -98,12 +97,12 @@ func GetHistoryPostgres(ctx context.Context, db *pgxpool.Pool, serverID, monitor
|
|||||||
|
|
||||||
func getMonitorNames(ctx context.Context, ls []ntpdb.LogScore, q ntpdb.QuerierTx) (map[int]string, error) {
|
func getMonitorNames(ctx context.Context, ls []ntpdb.LogScore, q ntpdb.QuerierTx) (map[int]string, error) {
|
||||||
monitors := map[int]string{}
|
monitors := map[int]string{}
|
||||||
monitorIDs := []int64{}
|
monitorIDs := []uint32{}
|
||||||
for _, l := range ls {
|
for _, l := range ls {
|
||||||
if !l.MonitorID.Valid {
|
if !l.MonitorID.Valid {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
mID := l.MonitorID.Int64
|
mID := uint32(l.MonitorID.Int32)
|
||||||
if _, ok := monitors[int(mID)]; !ok {
|
if _, ok := monitors[int(mID)]; !ok {
|
||||||
monitors[int(mID)] = ""
|
monitors[int(mID)] = ""
|
||||||
monitorIDs = append(monitorIDs, mID)
|
monitorIDs = append(monitorIDs, mID)
|
||||||
|
|||||||
13
ntpdb/db.go
13
ntpdb/db.go
@@ -6,15 +6,14 @@ package ntpdb
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"github.com/jackc/pgx/v5"
|
|
||||||
"github.com/jackc/pgx/v5/pgconn"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type DBTX interface {
|
type DBTX interface {
|
||||||
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
|
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
|
||||||
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
|
PrepareContext(context.Context, string) (*sql.Stmt, error)
|
||||||
QueryRow(context.Context, string, ...interface{}) pgx.Row
|
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
|
||||||
|
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(db DBTX) *Queries {
|
func New(db DBTX) *Queries {
|
||||||
@@ -25,7 +24,7 @@ type Queries struct {
|
|||||||
db DBTX
|
db DBTX
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queries) WithTx(tx pgx.Tx) *Queries {
|
func (q *Queries) WithTx(tx *sql.Tx) *Queries {
|
||||||
return &Queries{
|
return &Queries{
|
||||||
db: tx,
|
db: tx,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,15 +1,85 @@
|
|||||||
package ntpdb
|
package ntpdb
|
||||||
|
|
||||||
//go:generate go tool github.com/hexdigest/gowrap/cmd/gowrap gen -t ./opentelemetry.gowrap -g -i QuerierTx -p . -o otel.go
|
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"database/sql/driver"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/go-sql-driver/mysql"
|
||||||
"go.ntppool.org/common/database/pgdb"
|
"go.ntppool.org/common/logger"
|
||||||
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OpenDB opens a PostgreSQL connection pool using the specified config file
|
type Config struct {
|
||||||
func OpenDB(ctx context.Context, configFile string) (*pgxpool.Pool, error) {
|
MySQL DBConfig `yaml:"mysql"`
|
||||||
return pgdb.OpenPoolWithConfigFile(ctx, configFile)
|
}
|
||||||
|
|
||||||
|
type DBConfig struct {
|
||||||
|
DSN string `default:"" flag:"dsn" usage:"Database DSN"`
|
||||||
|
User string `default:"" flag:"user"`
|
||||||
|
Pass string `default:"" flag:"pass"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func OpenDB(ctx context.Context, configFile string) (*sql.DB, error) {
|
||||||
|
log := logger.FromContext(ctx)
|
||||||
|
|
||||||
|
dbconn := sql.OpenDB(Driver{CreateConnectorFunc: createConnector(ctx, configFile)})
|
||||||
|
|
||||||
|
dbconn.SetConnMaxLifetime(time.Minute * 3)
|
||||||
|
dbconn.SetMaxOpenConns(8)
|
||||||
|
dbconn.SetMaxIdleConns(3)
|
||||||
|
|
||||||
|
err := dbconn.Ping()
|
||||||
|
if err != nil {
|
||||||
|
log.DebugContext(ctx, "could not connect to database: %s", "err", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return dbconn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func createConnector(ctx context.Context, configFile string) CreateConnectorFunc {
|
||||||
|
log := logger.FromContext(ctx)
|
||||||
|
return func() (driver.Connector, error) {
|
||||||
|
log.DebugContext(ctx, "opening db config file", "filename", configFile)
|
||||||
|
|
||||||
|
dbFile, err := os.Open(configFile)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
dec := yaml.NewDecoder(dbFile)
|
||||||
|
|
||||||
|
cfg := Config{}
|
||||||
|
|
||||||
|
err = dec.Decode(&cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// log.Printf("db cfg: %+v", cfg)
|
||||||
|
|
||||||
|
dsn := cfg.MySQL.DSN
|
||||||
|
if len(dsn) == 0 {
|
||||||
|
return nil, fmt.Errorf("--database.dsn flag or DATABASE_DSN environment variable required")
|
||||||
|
}
|
||||||
|
|
||||||
|
dbcfg, err := mysql.ParseDSN(dsn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if user := cfg.MySQL.User; len(user) > 0 {
|
||||||
|
dbcfg.User = user
|
||||||
|
}
|
||||||
|
|
||||||
|
if pass := cfg.MySQL.Pass; len(pass) > 0 {
|
||||||
|
dbcfg.Passwd = pass
|
||||||
|
}
|
||||||
|
|
||||||
|
return mysql.NewConnector(dbcfg)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
33
ntpdb/dynamic_connect.go
Normal file
33
ntpdb/dynamic_connect.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package ntpdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql/driver"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// from https://github.com/Boostport/dynamic-database-config
|
||||||
|
|
||||||
|
type CreateConnectorFunc func() (driver.Connector, error)
|
||||||
|
|
||||||
|
type Driver struct {
|
||||||
|
CreateConnectorFunc CreateConnectorFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d Driver) Driver() driver.Driver {
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d Driver) Connect(ctx context.Context) (driver.Conn, error) {
|
||||||
|
connector, err := d.CreateConnectorFunc()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error creating connector from function: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return connector.Connect(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d Driver) Open(name string) (driver.Conn, error) {
|
||||||
|
return nil, errors.New("open is not supported")
|
||||||
|
}
|
||||||
@@ -5,10 +5,11 @@
|
|||||||
package ntpdb
|
package ntpdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
"database/sql/driver"
|
"database/sql/driver"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5/pgtype"
|
|
||||||
"go.ntppool.org/common/types"
|
"go.ntppool.org/common/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -270,73 +271,73 @@ func (ns NullZoneServerCountsIpVersion) Value() (driver.Value, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type LogScore struct {
|
type LogScore struct {
|
||||||
ID int64 `db:"id" json:"id"`
|
ID uint64 `db:"id" json:"id"`
|
||||||
MonitorID pgtype.Int8 `db:"monitor_id" json:"monitor_id"`
|
MonitorID sql.NullInt32 `db:"monitor_id" json:"monitor_id"`
|
||||||
ServerID int64 `db:"server_id" json:"server_id"`
|
ServerID uint32 `db:"server_id" json:"server_id"`
|
||||||
Ts pgtype.Timestamptz `db:"ts" json:"ts"`
|
Ts time.Time `db:"ts" json:"ts"`
|
||||||
Score float64 `db:"score" json:"score"`
|
Score float64 `db:"score" json:"score"`
|
||||||
Step float64 `db:"step" json:"step"`
|
Step float64 `db:"step" json:"step"`
|
||||||
Offset pgtype.Float8 `db:"offset" json:"offset"`
|
Offset sql.NullFloat64 `db:"offset" json:"offset"`
|
||||||
Rtt pgtype.Int4 `db:"rtt" json:"rtt"`
|
Rtt sql.NullInt32 `db:"rtt" json:"rtt"`
|
||||||
Attributes types.LogScoreAttributes `db:"attributes" json:"attributes"`
|
Attributes types.LogScoreAttributes `db:"attributes" json:"attributes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Monitor struct {
|
type Monitor struct {
|
||||||
ID int64 `db:"id" json:"id"`
|
ID uint32 `db:"id" json:"id"`
|
||||||
IDToken pgtype.Text `db:"id_token" json:"id_token"`
|
IDToken sql.NullString `db:"id_token" json:"id_token"`
|
||||||
Type MonitorsType `db:"type" json:"type"`
|
Type MonitorsType `db:"type" json:"type"`
|
||||||
UserID pgtype.Int8 `db:"user_id" json:"user_id"`
|
UserID sql.NullInt32 `db:"user_id" json:"user_id"`
|
||||||
AccountID pgtype.Int8 `db:"account_id" json:"account_id"`
|
AccountID sql.NullInt32 `db:"account_id" json:"account_id"`
|
||||||
Hostname string `db:"hostname" json:"hostname"`
|
Hostname string `db:"hostname" json:"hostname"`
|
||||||
Location string `db:"location" json:"location"`
|
Location string `db:"location" json:"location"`
|
||||||
Ip pgtype.Text `db:"ip" json:"ip"`
|
Ip sql.NullString `db:"ip" json:"ip"`
|
||||||
IpVersion NullMonitorsIpVersion `db:"ip_version" json:"ip_version"`
|
IpVersion NullMonitorsIpVersion `db:"ip_version" json:"ip_version"`
|
||||||
TlsName pgtype.Text `db:"tls_name" json:"tls_name"`
|
TlsName sql.NullString `db:"tls_name" json:"tls_name"`
|
||||||
ApiKey pgtype.Text `db:"api_key" json:"api_key"`
|
ApiKey sql.NullString `db:"api_key" json:"api_key"`
|
||||||
Status MonitorsStatus `db:"status" json:"status"`
|
Status MonitorsStatus `db:"status" json:"status"`
|
||||||
Config string `db:"config" json:"config"`
|
Config string `db:"config" json:"config"`
|
||||||
ClientVersion string `db:"client_version" json:"client_version"`
|
ClientVersion string `db:"client_version" json:"client_version"`
|
||||||
LastSeen pgtype.Timestamptz `db:"last_seen" json:"last_seen"`
|
LastSeen sql.NullTime `db:"last_seen" json:"last_seen"`
|
||||||
LastSubmit pgtype.Timestamptz `db:"last_submit" json:"last_submit"`
|
LastSubmit sql.NullTime `db:"last_submit" json:"last_submit"`
|
||||||
CreatedOn pgtype.Timestamptz `db:"created_on" json:"created_on"`
|
CreatedOn time.Time `db:"created_on" json:"created_on"`
|
||||||
DeletedOn pgtype.Timestamptz `db:"deleted_on" json:"deleted_on"`
|
DeletedOn sql.NullTime `db:"deleted_on" json:"deleted_on"`
|
||||||
IsCurrent pgtype.Bool `db:"is_current" json:"is_current"`
|
IsCurrent sql.NullBool `db:"is_current" json:"is_current"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
ID int64 `db:"id" json:"id"`
|
ID uint32 `db:"id" json:"id"`
|
||||||
Ip string `db:"ip" json:"ip"`
|
Ip string `db:"ip" json:"ip"`
|
||||||
IpVersion ServersIpVersion `db:"ip_version" json:"ip_version"`
|
IpVersion ServersIpVersion `db:"ip_version" json:"ip_version"`
|
||||||
UserID pgtype.Int8 `db:"user_id" json:"user_id"`
|
UserID sql.NullInt32 `db:"user_id" json:"user_id"`
|
||||||
AccountID pgtype.Int8 `db:"account_id" json:"account_id"`
|
AccountID sql.NullInt32 `db:"account_id" json:"account_id"`
|
||||||
Hostname pgtype.Text `db:"hostname" json:"hostname"`
|
Hostname sql.NullString `db:"hostname" json:"hostname"`
|
||||||
Stratum pgtype.Int2 `db:"stratum" json:"stratum"`
|
Stratum sql.NullInt16 `db:"stratum" json:"stratum"`
|
||||||
InPool int16 `db:"in_pool" json:"in_pool"`
|
InPool uint8 `db:"in_pool" json:"in_pool"`
|
||||||
InServerList int16 `db:"in_server_list" json:"in_server_list"`
|
InServerList uint8 `db:"in_server_list" json:"in_server_list"`
|
||||||
Netspeed int64 `db:"netspeed" json:"netspeed"`
|
Netspeed uint32 `db:"netspeed" json:"netspeed"`
|
||||||
NetspeedTarget int64 `db:"netspeed_target" json:"netspeed_target"`
|
NetspeedTarget uint32 `db:"netspeed_target" json:"netspeed_target"`
|
||||||
CreatedOn pgtype.Timestamptz `db:"created_on" json:"created_on"`
|
CreatedOn time.Time `db:"created_on" json:"created_on"`
|
||||||
UpdatedOn pgtype.Timestamptz `db:"updated_on" json:"updated_on"`
|
UpdatedOn time.Time `db:"updated_on" json:"updated_on"`
|
||||||
ScoreTs pgtype.Timestamptz `db:"score_ts" json:"score_ts"`
|
ScoreTs sql.NullTime `db:"score_ts" json:"score_ts"`
|
||||||
ScoreRaw float64 `db:"score_raw" json:"score_raw"`
|
ScoreRaw float64 `db:"score_raw" json:"score_raw"`
|
||||||
DeletionOn pgtype.Date `db:"deletion_on" json:"deletion_on"`
|
DeletionOn sql.NullTime `db:"deletion_on" json:"deletion_on"`
|
||||||
Flags string `db:"flags" json:"flags"`
|
Flags string `db:"flags" json:"flags"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Zone struct {
|
type Zone struct {
|
||||||
ID int64 `db:"id" json:"id"`
|
ID uint32 `db:"id" json:"id"`
|
||||||
Name string `db:"name" json:"name"`
|
Name string `db:"name" json:"name"`
|
||||||
Description pgtype.Text `db:"description" json:"description"`
|
Description sql.NullString `db:"description" json:"description"`
|
||||||
ParentID pgtype.Int8 `db:"parent_id" json:"parent_id"`
|
ParentID sql.NullInt32 `db:"parent_id" json:"parent_id"`
|
||||||
Dns bool `db:"dns" json:"dns"`
|
Dns bool `db:"dns" json:"dns"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ZoneServerCount struct {
|
type ZoneServerCount struct {
|
||||||
ID int64 `db:"id" json:"id"`
|
ID uint32 `db:"id" json:"id"`
|
||||||
ZoneID int64 `db:"zone_id" json:"zone_id"`
|
ZoneID uint32 `db:"zone_id" json:"zone_id"`
|
||||||
IpVersion ZoneServerCountsIpVersion `db:"ip_version" json:"ip_version"`
|
IpVersion ZoneServerCountsIpVersion `db:"ip_version" json:"ip_version"`
|
||||||
Date pgtype.Date `db:"date" json:"date"`
|
Date time.Time `db:"date" json:"date"`
|
||||||
CountActive int32 `db:"count_active" json:"count_active"`
|
CountActive uint32 `db:"count_active" json:"count_active"`
|
||||||
CountRegistered int32 `db:"count_registered" json:"count_registered"`
|
CountRegistered uint32 `db:"count_registered" json:"count_registered"`
|
||||||
NetspeedActive int `db:"netspeed_active" json:"netspeed_active"`
|
NetspeedActive int `db:"netspeed_active" json:"netspeed_active"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,55 +0,0 @@
|
|||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
_codes "go.opentelemetry.io/otel/codes"
|
|
||||||
"go.opentelemetry.io/otel"
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
|
||||||
)
|
|
||||||
|
|
||||||
{{ $decorator := (or .Vars.DecoratorName (printf "%sWithTracing" .Interface.Name)) }}
|
|
||||||
{{ $spanNameType := (or .Vars.SpanNamePrefix .Interface.Name) }}
|
|
||||||
|
|
||||||
// {{$decorator}} implements {{.Interface.Name}} interface instrumented with open telemetry spans
|
|
||||||
type {{$decorator}} struct {
|
|
||||||
{{.Interface.Type}}
|
|
||||||
_instance string
|
|
||||||
_spanDecorator func(span trace.Span, params, results map[string]interface{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// New{{$decorator}} returns {{$decorator}}
|
|
||||||
func New{{$decorator}} (base {{.Interface.Type}}, instance string, spanDecorator ...func(span trace.Span, params, results map[string]interface{})) {{$decorator}} {
|
|
||||||
d := {{$decorator}} {
|
|
||||||
{{.Interface.Name}}: base,
|
|
||||||
_instance: instance,
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(spanDecorator) > 0 && spanDecorator[0] != nil {
|
|
||||||
d._spanDecorator = spanDecorator[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
return d
|
|
||||||
}
|
|
||||||
|
|
||||||
{{range $method := .Interface.Methods}}
|
|
||||||
{{if $method.AcceptsContext}}
|
|
||||||
// {{$method.Name}} implements {{$.Interface.Name}}
|
|
||||||
func (_d {{$decorator}}) {{$method.Declaration}} {
|
|
||||||
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "{{$spanNameType}}.{{$method.Name}}")
|
|
||||||
defer func() {
|
|
||||||
if _d._spanDecorator != nil {
|
|
||||||
_d._spanDecorator(_span, {{$method.ParamsMap}}, {{$method.ResultsMap}})
|
|
||||||
}{{- if $method.ReturnsError}} else if err != nil {
|
|
||||||
_span.RecordError(err)
|
|
||||||
_span.SetStatus(_codes.Error, err.Error())
|
|
||||||
_span.SetAttributes(
|
|
||||||
attribute.String("event", "error"),
|
|
||||||
attribute.String("message", err.Error()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
{{end}}
|
|
||||||
_span.End()
|
|
||||||
}()
|
|
||||||
{{$method.Pass (printf "_d.%s." $.Interface.Name) }}
|
|
||||||
}
|
|
||||||
{{end}}
|
|
||||||
{{end}}
|
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
// Code generated by gowrap. DO NOT EDIT.
|
// Code generated by gowrap. DO NOT EDIT.
|
||||||
// template: opentelemetry.gowrap
|
// template: https://raw.githubusercontent.com/hexdigest/gowrap/6bd1bc023b4d2a619f30020924f258b8ff665a7a/templates/opentelemetry
|
||||||
// gowrap: http://github.com/hexdigest/gowrap
|
// gowrap: http://github.com/hexdigest/gowrap
|
||||||
|
|
||||||
package ntpdb
|
package ntpdb
|
||||||
@@ -7,11 +7,10 @@ package ntpdb
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
|
||||||
|
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
_codes "go.opentelemetry.io/otel/codes"
|
_codes "go.opentelemetry.io/otel/codes"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
// QuerierTxWithTracing implements QuerierTx interface instrumented with open telemetry spans
|
// QuerierTxWithTracing implements QuerierTx interface instrumented with open telemetry spans
|
||||||
@@ -105,7 +104,7 @@ func (_d QuerierTxWithTracing) GetMonitorByNameAndIPVersion(ctx context.Context,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetMonitorsByID implements QuerierTx
|
// GetMonitorsByID implements QuerierTx
|
||||||
func (_d QuerierTxWithTracing) GetMonitorsByID(ctx context.Context, monitorids []int64) (ma1 []Monitor, err error) {
|
func (_d QuerierTxWithTracing) GetMonitorsByID(ctx context.Context, monitorids []uint32) (ma1 []Monitor, err error) {
|
||||||
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetMonitorsByID")
|
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetMonitorsByID")
|
||||||
defer func() {
|
defer func() {
|
||||||
if _d._spanDecorator != nil {
|
if _d._spanDecorator != nil {
|
||||||
@@ -129,7 +128,7 @@ func (_d QuerierTxWithTracing) GetMonitorsByID(ctx context.Context, monitorids [
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetServerByID implements QuerierTx
|
// GetServerByID implements QuerierTx
|
||||||
func (_d QuerierTxWithTracing) GetServerByID(ctx context.Context, id int64) (s1 Server, err error) {
|
func (_d QuerierTxWithTracing) GetServerByID(ctx context.Context, id uint32) (s1 Server, err error) {
|
||||||
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetServerByID")
|
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetServerByID")
|
||||||
defer func() {
|
defer func() {
|
||||||
if _d._spanDecorator != nil {
|
if _d._spanDecorator != nil {
|
||||||
@@ -225,14 +224,14 @@ func (_d QuerierTxWithTracing) GetServerLogScoresByMonitorID(ctx context.Context
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetServerNetspeed implements QuerierTx
|
// GetServerNetspeed implements QuerierTx
|
||||||
func (_d QuerierTxWithTracing) GetServerNetspeed(ctx context.Context, ip string) (i1 int64, err error) {
|
func (_d QuerierTxWithTracing) GetServerNetspeed(ctx context.Context, ip string) (u1 uint32, err error) {
|
||||||
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetServerNetspeed")
|
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetServerNetspeed")
|
||||||
defer func() {
|
defer func() {
|
||||||
if _d._spanDecorator != nil {
|
if _d._spanDecorator != nil {
|
||||||
_d._spanDecorator(_span, map[string]interface{}{
|
_d._spanDecorator(_span, map[string]interface{}{
|
||||||
"ctx": ctx,
|
"ctx": ctx,
|
||||||
"ip": ip}, map[string]interface{}{
|
"ip": ip}, map[string]interface{}{
|
||||||
"i1": i1,
|
"u1": u1,
|
||||||
"err": err})
|
"err": err})
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
_span.RecordError(err)
|
_span.RecordError(err)
|
||||||
@@ -297,7 +296,7 @@ func (_d QuerierTxWithTracing) GetZoneByName(ctx context.Context, name string) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetZoneCounts implements QuerierTx
|
// GetZoneCounts implements QuerierTx
|
||||||
func (_d QuerierTxWithTracing) GetZoneCounts(ctx context.Context, zoneID int64) (za1 []ZoneServerCount, err error) {
|
func (_d QuerierTxWithTracing) GetZoneCounts(ctx context.Context, zoneID uint32) (za1 []ZoneServerCount, err error) {
|
||||||
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetZoneCounts")
|
ctx, _span := otel.Tracer(_d._instance).Start(ctx, "QuerierTx.GetZoneCounts")
|
||||||
defer func() {
|
defer func() {
|
||||||
if _d._spanDecorator != nil {
|
if _d._spanDecorator != nil {
|
||||||
|
|||||||
@@ -10,15 +10,15 @@ import (
|
|||||||
|
|
||||||
type Querier interface {
|
type Querier interface {
|
||||||
GetMonitorByNameAndIPVersion(ctx context.Context, arg GetMonitorByNameAndIPVersionParams) (Monitor, error)
|
GetMonitorByNameAndIPVersion(ctx context.Context, arg GetMonitorByNameAndIPVersionParams) (Monitor, error)
|
||||||
GetMonitorsByID(ctx context.Context, monitorids []int64) ([]Monitor, error)
|
GetMonitorsByID(ctx context.Context, monitorids []uint32) ([]Monitor, error)
|
||||||
GetServerByID(ctx context.Context, id int64) (Server, error)
|
GetServerByID(ctx context.Context, id uint32) (Server, error)
|
||||||
GetServerByIP(ctx context.Context, ip string) (Server, error)
|
GetServerByIP(ctx context.Context, ip string) (Server, error)
|
||||||
GetServerLogScores(ctx context.Context, arg GetServerLogScoresParams) ([]LogScore, error)
|
GetServerLogScores(ctx context.Context, arg GetServerLogScoresParams) ([]LogScore, error)
|
||||||
GetServerLogScoresByMonitorID(ctx context.Context, arg GetServerLogScoresByMonitorIDParams) ([]LogScore, error)
|
GetServerLogScoresByMonitorID(ctx context.Context, arg GetServerLogScoresByMonitorIDParams) ([]LogScore, error)
|
||||||
GetServerNetspeed(ctx context.Context, ip string) (int64, error)
|
GetServerNetspeed(ctx context.Context, ip string) (uint32, error)
|
||||||
GetServerScores(ctx context.Context, arg GetServerScoresParams) ([]GetServerScoresRow, error)
|
GetServerScores(ctx context.Context, arg GetServerScoresParams) ([]GetServerScoresRow, error)
|
||||||
GetZoneByName(ctx context.Context, name string) (Zone, error)
|
GetZoneByName(ctx context.Context, name string) (Zone, error)
|
||||||
GetZoneCounts(ctx context.Context, zoneID int64) ([]ZoneServerCount, error)
|
GetZoneCounts(ctx context.Context, zoneID uint32) ([]ZoneServerCount, error)
|
||||||
GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow, error)
|
GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow, error)
|
||||||
GetZoneStatsV2(ctx context.Context, ip string) ([]GetZoneStatsV2Row, error)
|
GetZoneStatsV2(ctx context.Context, ip string) ([]GetZoneStatsV2Row, error)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,27 +7,28 @@ package ntpdb
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"github.com/jackc/pgx/v5/pgtype"
|
"strings"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const getMonitorByNameAndIPVersion = `-- name: GetMonitorByNameAndIPVersion :one
|
const getMonitorByNameAndIPVersion = `-- name: GetMonitorByNameAndIPVersion :one
|
||||||
select id, id_token, type, user_id, account_id, hostname, location, ip, ip_version, tls_name, api_key, status, config, client_version, last_seen, last_submit, created_on, deleted_on, is_current from monitors
|
select id, id_token, type, user_id, account_id, hostname, location, ip, ip_version, tls_name, api_key, status, config, client_version, last_seen, last_submit, created_on, deleted_on, is_current from monitors
|
||||||
where
|
where
|
||||||
tls_name like $1 AND
|
tls_name like ? AND
|
||||||
(ip_version = $2 OR (type = 'score' AND ip_version IS NULL)) AND
|
(ip_version = ? OR (type = 'score' AND ip_version IS NULL)) AND
|
||||||
is_current = true
|
is_current = 1
|
||||||
order by id
|
order by id
|
||||||
limit 1
|
limit 1
|
||||||
`
|
`
|
||||||
|
|
||||||
type GetMonitorByNameAndIPVersionParams struct {
|
type GetMonitorByNameAndIPVersionParams struct {
|
||||||
TlsName pgtype.Text `db:"tls_name" json:"tls_name"`
|
TlsName sql.NullString `db:"tls_name" json:"tls_name"`
|
||||||
IpVersion NullMonitorsIpVersion `db:"ip_version" json:"ip_version"`
|
IpVersion NullMonitorsIpVersion `db:"ip_version" json:"ip_version"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queries) GetMonitorByNameAndIPVersion(ctx context.Context, arg GetMonitorByNameAndIPVersionParams) (Monitor, error) {
|
func (q *Queries) GetMonitorByNameAndIPVersion(ctx context.Context, arg GetMonitorByNameAndIPVersionParams) (Monitor, error) {
|
||||||
row := q.db.QueryRow(ctx, getMonitorByNameAndIPVersion, arg.TlsName, arg.IpVersion)
|
row := q.db.QueryRowContext(ctx, getMonitorByNameAndIPVersion, arg.TlsName, arg.IpVersion)
|
||||||
var i Monitor
|
var i Monitor
|
||||||
err := row.Scan(
|
err := row.Scan(
|
||||||
&i.ID,
|
&i.ID,
|
||||||
@@ -55,11 +56,21 @@ func (q *Queries) GetMonitorByNameAndIPVersion(ctx context.Context, arg GetMonit
|
|||||||
|
|
||||||
const getMonitorsByID = `-- name: GetMonitorsByID :many
|
const getMonitorsByID = `-- name: GetMonitorsByID :many
|
||||||
select id, id_token, type, user_id, account_id, hostname, location, ip, ip_version, tls_name, api_key, status, config, client_version, last_seen, last_submit, created_on, deleted_on, is_current from monitors
|
select id, id_token, type, user_id, account_id, hostname, location, ip, ip_version, tls_name, api_key, status, config, client_version, last_seen, last_submit, created_on, deleted_on, is_current from monitors
|
||||||
where id = ANY($1::bigint[])
|
where id in (/*SLICE:MonitorIDs*/?)
|
||||||
`
|
`
|
||||||
|
|
||||||
func (q *Queries) GetMonitorsByID(ctx context.Context, monitorids []int64) ([]Monitor, error) {
|
func (q *Queries) GetMonitorsByID(ctx context.Context, monitorids []uint32) ([]Monitor, error) {
|
||||||
rows, err := q.db.Query(ctx, getMonitorsByID, monitorids)
|
query := getMonitorsByID
|
||||||
|
var queryParams []interface{}
|
||||||
|
if len(monitorids) > 0 {
|
||||||
|
for _, v := range monitorids {
|
||||||
|
queryParams = append(queryParams, v)
|
||||||
|
}
|
||||||
|
query = strings.Replace(query, "/*SLICE:MonitorIDs*/?", strings.Repeat(",?", len(monitorids))[1:], 1)
|
||||||
|
} else {
|
||||||
|
query = strings.Replace(query, "/*SLICE:MonitorIDs*/?", "NULL", 1)
|
||||||
|
}
|
||||||
|
rows, err := q.db.QueryContext(ctx, query, queryParams...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -92,6 +103,9 @@ func (q *Queries) GetMonitorsByID(ctx context.Context, monitorids []int64) ([]Mo
|
|||||||
}
|
}
|
||||||
items = append(items, i)
|
items = append(items, i)
|
||||||
}
|
}
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -101,11 +115,11 @@ func (q *Queries) GetMonitorsByID(ctx context.Context, monitorids []int64) ([]Mo
|
|||||||
const getServerByID = `-- name: GetServerByID :one
|
const getServerByID = `-- name: GetServerByID :one
|
||||||
select id, ip, ip_version, user_id, account_id, hostname, stratum, in_pool, in_server_list, netspeed, netspeed_target, created_on, updated_on, score_ts, score_raw, deletion_on, flags from servers
|
select id, ip, ip_version, user_id, account_id, hostname, stratum, in_pool, in_server_list, netspeed, netspeed_target, created_on, updated_on, score_ts, score_raw, deletion_on, flags from servers
|
||||||
where
|
where
|
||||||
id = $1
|
id = ?
|
||||||
`
|
`
|
||||||
|
|
||||||
func (q *Queries) GetServerByID(ctx context.Context, id int64) (Server, error) {
|
func (q *Queries) GetServerByID(ctx context.Context, id uint32) (Server, error) {
|
||||||
row := q.db.QueryRow(ctx, getServerByID, id)
|
row := q.db.QueryRowContext(ctx, getServerByID, id)
|
||||||
var i Server
|
var i Server
|
||||||
err := row.Scan(
|
err := row.Scan(
|
||||||
&i.ID,
|
&i.ID,
|
||||||
@@ -132,11 +146,11 @@ func (q *Queries) GetServerByID(ctx context.Context, id int64) (Server, error) {
|
|||||||
const getServerByIP = `-- name: GetServerByIP :one
|
const getServerByIP = `-- name: GetServerByIP :one
|
||||||
select id, ip, ip_version, user_id, account_id, hostname, stratum, in_pool, in_server_list, netspeed, netspeed_target, created_on, updated_on, score_ts, score_raw, deletion_on, flags from servers
|
select id, ip, ip_version, user_id, account_id, hostname, stratum, in_pool, in_server_list, netspeed, netspeed_target, created_on, updated_on, score_ts, score_raw, deletion_on, flags from servers
|
||||||
where
|
where
|
||||||
ip = $1
|
ip = ?
|
||||||
`
|
`
|
||||||
|
|
||||||
func (q *Queries) GetServerByIP(ctx context.Context, ip string) (Server, error) {
|
func (q *Queries) GetServerByIP(ctx context.Context, ip string) (Server, error) {
|
||||||
row := q.db.QueryRow(ctx, getServerByIP, ip)
|
row := q.db.QueryRowContext(ctx, getServerByIP, ip)
|
||||||
var i Server
|
var i Server
|
||||||
err := row.Scan(
|
err := row.Scan(
|
||||||
&i.ID,
|
&i.ID,
|
||||||
@@ -161,20 +175,20 @@ func (q *Queries) GetServerByIP(ctx context.Context, ip string) (Server, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
const getServerLogScores = `-- name: GetServerLogScores :many
|
const getServerLogScores = `-- name: GetServerLogScores :many
|
||||||
select id, monitor_id, server_id, ts, score, step, "offset", rtt, attributes from log_scores
|
select id, monitor_id, server_id, ts, score, step, offset, rtt, attributes from log_scores
|
||||||
where
|
where
|
||||||
server_id = $1
|
server_id = ?
|
||||||
order by ts desc
|
order by ts desc
|
||||||
limit $2
|
limit ?
|
||||||
`
|
`
|
||||||
|
|
||||||
type GetServerLogScoresParams struct {
|
type GetServerLogScoresParams struct {
|
||||||
ServerID int64 `db:"server_id" json:"server_id"`
|
ServerID uint32 `db:"server_id" json:"server_id"`
|
||||||
Limit int32 `db:"limit" json:"limit"`
|
Limit int32 `db:"limit" json:"limit"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queries) GetServerLogScores(ctx context.Context, arg GetServerLogScoresParams) ([]LogScore, error) {
|
func (q *Queries) GetServerLogScores(ctx context.Context, arg GetServerLogScoresParams) ([]LogScore, error) {
|
||||||
rows, err := q.db.Query(ctx, getServerLogScores, arg.ServerID, arg.Limit)
|
rows, err := q.db.QueryContext(ctx, getServerLogScores, arg.ServerID, arg.Limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -197,6 +211,9 @@ func (q *Queries) GetServerLogScores(ctx context.Context, arg GetServerLogScores
|
|||||||
}
|
}
|
||||||
items = append(items, i)
|
items = append(items, i)
|
||||||
}
|
}
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -204,22 +221,22 @@ func (q *Queries) GetServerLogScores(ctx context.Context, arg GetServerLogScores
|
|||||||
}
|
}
|
||||||
|
|
||||||
const getServerLogScoresByMonitorID = `-- name: GetServerLogScoresByMonitorID :many
|
const getServerLogScoresByMonitorID = `-- name: GetServerLogScoresByMonitorID :many
|
||||||
select id, monitor_id, server_id, ts, score, step, "offset", rtt, attributes from log_scores
|
select id, monitor_id, server_id, ts, score, step, offset, rtt, attributes from log_scores
|
||||||
where
|
where
|
||||||
server_id = $1 AND
|
server_id = ? AND
|
||||||
monitor_id = $2
|
monitor_id = ?
|
||||||
order by ts desc
|
order by ts desc
|
||||||
limit $3
|
limit ?
|
||||||
`
|
`
|
||||||
|
|
||||||
type GetServerLogScoresByMonitorIDParams struct {
|
type GetServerLogScoresByMonitorIDParams struct {
|
||||||
ServerID int64 `db:"server_id" json:"server_id"`
|
ServerID uint32 `db:"server_id" json:"server_id"`
|
||||||
MonitorID pgtype.Int8 `db:"monitor_id" json:"monitor_id"`
|
MonitorID sql.NullInt32 `db:"monitor_id" json:"monitor_id"`
|
||||||
Limit int32 `db:"limit" json:"limit"`
|
Limit int32 `db:"limit" json:"limit"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queries) GetServerLogScoresByMonitorID(ctx context.Context, arg GetServerLogScoresByMonitorIDParams) ([]LogScore, error) {
|
func (q *Queries) GetServerLogScoresByMonitorID(ctx context.Context, arg GetServerLogScoresByMonitorIDParams) ([]LogScore, error) {
|
||||||
rows, err := q.db.Query(ctx, getServerLogScoresByMonitorID, arg.ServerID, arg.MonitorID, arg.Limit)
|
rows, err := q.db.QueryContext(ctx, getServerLogScoresByMonitorID, arg.ServerID, arg.MonitorID, arg.Limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -242,6 +259,9 @@ func (q *Queries) GetServerLogScoresByMonitorID(ctx context.Context, arg GetServ
|
|||||||
}
|
}
|
||||||
items = append(items, i)
|
items = append(items, i)
|
||||||
}
|
}
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -249,12 +269,12 @@ func (q *Queries) GetServerLogScoresByMonitorID(ctx context.Context, arg GetServ
|
|||||||
}
|
}
|
||||||
|
|
||||||
const getServerNetspeed = `-- name: GetServerNetspeed :one
|
const getServerNetspeed = `-- name: GetServerNetspeed :one
|
||||||
select netspeed from servers where ip = $1
|
select netspeed from servers where ip = ?
|
||||||
`
|
`
|
||||||
|
|
||||||
func (q *Queries) GetServerNetspeed(ctx context.Context, ip string) (int64, error) {
|
func (q *Queries) GetServerNetspeed(ctx context.Context, ip string) (uint32, error) {
|
||||||
row := q.db.QueryRow(ctx, getServerNetspeed, ip)
|
row := q.db.QueryRowContext(ctx, getServerNetspeed, ip)
|
||||||
var netspeed int64
|
var netspeed uint32
|
||||||
err := row.Scan(&netspeed)
|
err := row.Scan(&netspeed)
|
||||||
return netspeed, err
|
return netspeed, err
|
||||||
}
|
}
|
||||||
@@ -267,28 +287,39 @@ select
|
|||||||
inner join monitors m
|
inner join monitors m
|
||||||
on (m.id=ss.monitor_id)
|
on (m.id=ss.monitor_id)
|
||||||
where
|
where
|
||||||
server_id = $1 AND
|
server_id = ? AND
|
||||||
monitor_id = ANY($2::bigint[])
|
monitor_id in (/*SLICE:MonitorIDs*/?)
|
||||||
`
|
`
|
||||||
|
|
||||||
type GetServerScoresParams struct {
|
type GetServerScoresParams struct {
|
||||||
ServerID int64 `db:"server_id" json:"server_id"`
|
ServerID uint32 `db:"server_id" json:"server_id"`
|
||||||
MonitorIDs []int64 `db:"MonitorIDs" json:"MonitorIDs"`
|
MonitorIDs []uint32 `db:"MonitorIDs" json:"MonitorIDs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type GetServerScoresRow struct {
|
type GetServerScoresRow struct {
|
||||||
ID int64 `db:"id" json:"id"`
|
ID uint32 `db:"id" json:"id"`
|
||||||
Hostname string `db:"hostname" json:"hostname"`
|
Hostname string `db:"hostname" json:"hostname"`
|
||||||
TlsName pgtype.Text `db:"tls_name" json:"tls_name"`
|
TlsName sql.NullString `db:"tls_name" json:"tls_name"`
|
||||||
Location string `db:"location" json:"location"`
|
Location string `db:"location" json:"location"`
|
||||||
Type MonitorsType `db:"type" json:"type"`
|
Type MonitorsType `db:"type" json:"type"`
|
||||||
ScoreRaw float64 `db:"score_raw" json:"score_raw"`
|
ScoreRaw float64 `db:"score_raw" json:"score_raw"`
|
||||||
ScoreTs pgtype.Timestamptz `db:"score_ts" json:"score_ts"`
|
ScoreTs sql.NullTime `db:"score_ts" json:"score_ts"`
|
||||||
Status ServerScoresStatus `db:"status" json:"status"`
|
Status ServerScoresStatus `db:"status" json:"status"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queries) GetServerScores(ctx context.Context, arg GetServerScoresParams) ([]GetServerScoresRow, error) {
|
func (q *Queries) GetServerScores(ctx context.Context, arg GetServerScoresParams) ([]GetServerScoresRow, error) {
|
||||||
rows, err := q.db.Query(ctx, getServerScores, arg.ServerID, arg.MonitorIDs)
|
query := getServerScores
|
||||||
|
var queryParams []interface{}
|
||||||
|
queryParams = append(queryParams, arg.ServerID)
|
||||||
|
if len(arg.MonitorIDs) > 0 {
|
||||||
|
for _, v := range arg.MonitorIDs {
|
||||||
|
queryParams = append(queryParams, v)
|
||||||
|
}
|
||||||
|
query = strings.Replace(query, "/*SLICE:MonitorIDs*/?", strings.Repeat(",?", len(arg.MonitorIDs))[1:], 1)
|
||||||
|
} else {
|
||||||
|
query = strings.Replace(query, "/*SLICE:MonitorIDs*/?", "NULL", 1)
|
||||||
|
}
|
||||||
|
rows, err := q.db.QueryContext(ctx, query, queryParams...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -310,6 +341,9 @@ func (q *Queries) GetServerScores(ctx context.Context, arg GetServerScoresParams
|
|||||||
}
|
}
|
||||||
items = append(items, i)
|
items = append(items, i)
|
||||||
}
|
}
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -319,11 +353,11 @@ func (q *Queries) GetServerScores(ctx context.Context, arg GetServerScoresParams
|
|||||||
const getZoneByName = `-- name: GetZoneByName :one
|
const getZoneByName = `-- name: GetZoneByName :one
|
||||||
select id, name, description, parent_id, dns from zones
|
select id, name, description, parent_id, dns from zones
|
||||||
where
|
where
|
||||||
name = $1
|
name = ?
|
||||||
`
|
`
|
||||||
|
|
||||||
func (q *Queries) GetZoneByName(ctx context.Context, name string) (Zone, error) {
|
func (q *Queries) GetZoneByName(ctx context.Context, name string) (Zone, error) {
|
||||||
row := q.db.QueryRow(ctx, getZoneByName, name)
|
row := q.db.QueryRowContext(ctx, getZoneByName, name)
|
||||||
var i Zone
|
var i Zone
|
||||||
err := row.Scan(
|
err := row.Scan(
|
||||||
&i.ID,
|
&i.ID,
|
||||||
@@ -337,12 +371,12 @@ func (q *Queries) GetZoneByName(ctx context.Context, name string) (Zone, error)
|
|||||||
|
|
||||||
const getZoneCounts = `-- name: GetZoneCounts :many
|
const getZoneCounts = `-- name: GetZoneCounts :many
|
||||||
select id, zone_id, ip_version, date, count_active, count_registered, netspeed_active from zone_server_counts
|
select id, zone_id, ip_version, date, count_active, count_registered, netspeed_active from zone_server_counts
|
||||||
where zone_id = $1
|
where zone_id = ?
|
||||||
order by date
|
order by date
|
||||||
`
|
`
|
||||||
|
|
||||||
func (q *Queries) GetZoneCounts(ctx context.Context, zoneID int64) ([]ZoneServerCount, error) {
|
func (q *Queries) GetZoneCounts(ctx context.Context, zoneID uint32) ([]ZoneServerCount, error) {
|
||||||
rows, err := q.db.Query(ctx, getZoneCounts, zoneID)
|
rows, err := q.db.QueryContext(ctx, getZoneCounts, zoneID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -363,6 +397,9 @@ func (q *Queries) GetZoneCounts(ctx context.Context, zoneID int64) ([]ZoneServer
|
|||||||
}
|
}
|
||||||
items = append(items, i)
|
items = append(items, i)
|
||||||
}
|
}
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -371,7 +408,7 @@ func (q *Queries) GetZoneCounts(ctx context.Context, zoneID int64) ([]ZoneServer
|
|||||||
|
|
||||||
const getZoneStatsData = `-- name: GetZoneStatsData :many
|
const getZoneStatsData = `-- name: GetZoneStatsData :many
|
||||||
SELECT zc.date, z.name, zc.ip_version, count_active, count_registered, netspeed_active
|
SELECT zc.date, z.name, zc.ip_version, count_active, count_registered, netspeed_active
|
||||||
FROM zone_server_counts zc
|
FROM zone_server_counts zc USE INDEX (date_idx)
|
||||||
INNER JOIN zones z
|
INNER JOIN zones z
|
||||||
ON(zc.zone_id=z.id)
|
ON(zc.zone_id=z.id)
|
||||||
WHERE date IN (SELECT max(date) from zone_server_counts)
|
WHERE date IN (SELECT max(date) from zone_server_counts)
|
||||||
@@ -379,16 +416,16 @@ ORDER BY name
|
|||||||
`
|
`
|
||||||
|
|
||||||
type GetZoneStatsDataRow struct {
|
type GetZoneStatsDataRow struct {
|
||||||
Date pgtype.Date `db:"date" json:"date"`
|
Date time.Time `db:"date" json:"date"`
|
||||||
Name string `db:"name" json:"name"`
|
Name string `db:"name" json:"name"`
|
||||||
IpVersion ZoneServerCountsIpVersion `db:"ip_version" json:"ip_version"`
|
IpVersion ZoneServerCountsIpVersion `db:"ip_version" json:"ip_version"`
|
||||||
CountActive int32 `db:"count_active" json:"count_active"`
|
CountActive uint32 `db:"count_active" json:"count_active"`
|
||||||
CountRegistered int32 `db:"count_registered" json:"count_registered"`
|
CountRegistered uint32 `db:"count_registered" json:"count_registered"`
|
||||||
NetspeedActive int `db:"netspeed_active" json:"netspeed_active"`
|
NetspeedActive int `db:"netspeed_active" json:"netspeed_active"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queries) GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow, error) {
|
func (q *Queries) GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow, error) {
|
||||||
rows, err := q.db.Query(ctx, getZoneStatsData)
|
rows, err := q.db.QueryContext(ctx, getZoneStatsData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -408,6 +445,9 @@ func (q *Queries) GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow,
|
|||||||
}
|
}
|
||||||
items = append(items, i)
|
items = append(items, i)
|
||||||
}
|
}
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -415,14 +455,15 @@ func (q *Queries) GetZoneStatsData(ctx context.Context) ([]GetZoneStatsDataRow,
|
|||||||
}
|
}
|
||||||
|
|
||||||
const getZoneStatsV2 = `-- name: GetZoneStatsV2 :many
|
const getZoneStatsV2 = `-- name: GetZoneStatsV2 :many
|
||||||
|
select zone_name, netspeed_active+0 as netspeed_active FROM (
|
||||||
SELECT
|
SELECT
|
||||||
z.name as zone_name,
|
z.name as zone_name,
|
||||||
CAST(SUM(
|
SUM(
|
||||||
CASE WHEN deletion_on IS NULL AND score_raw > 10
|
IF (deletion_on IS NULL AND score_raw > 10,
|
||||||
THEN netspeed
|
netspeed,
|
||||||
ELSE 0
|
0
|
||||||
END
|
)
|
||||||
) AS int) AS netspeed_active
|
) AS netspeed_active
|
||||||
FROM
|
FROM
|
||||||
servers s
|
servers s
|
||||||
INNER JOIN server_zones sz ON (sz.server_id = s.id)
|
INNER JOIN server_zones sz ON (sz.server_id = s.id)
|
||||||
@@ -431,22 +472,23 @@ FROM
|
|||||||
select zone_id, s.ip_version
|
select zone_id, s.ip_version
|
||||||
from server_zones sz
|
from server_zones sz
|
||||||
inner join servers s on (s.id=sz.server_id)
|
inner join servers s on (s.id=sz.server_id)
|
||||||
where s.ip=$1
|
where s.ip=?
|
||||||
) as srvz on (srvz.zone_id=z.id AND srvz.ip_version=s.ip_version)
|
) as srvz on (srvz.zone_id=z.id AND srvz.ip_version=s.ip_version)
|
||||||
WHERE
|
WHERE
|
||||||
(deletion_on IS NULL OR deletion_on > NOW())
|
(deletion_on IS NULL OR deletion_on > NOW())
|
||||||
AND in_pool = 1
|
AND in_pool = 1
|
||||||
AND netspeed > 0
|
AND netspeed > 0
|
||||||
GROUP BY z.name
|
GROUP BY z.name)
|
||||||
|
AS server_netspeed
|
||||||
`
|
`
|
||||||
|
|
||||||
type GetZoneStatsV2Row struct {
|
type GetZoneStatsV2Row struct {
|
||||||
ZoneName string `db:"zone_name" json:"zone_name"`
|
ZoneName string `db:"zone_name" json:"zone_name"`
|
||||||
NetspeedActive int32 `db:"netspeed_active" json:"netspeed_active"`
|
NetspeedActive int `db:"netspeed_active" json:"netspeed_active"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queries) GetZoneStatsV2(ctx context.Context, ip string) ([]GetZoneStatsV2Row, error) {
|
func (q *Queries) GetZoneStatsV2(ctx context.Context, ip string) ([]GetZoneStatsV2Row, error) {
|
||||||
rows, err := q.db.Query(ctx, getZoneStatsV2, ip)
|
rows, err := q.db.QueryContext(ctx, getZoneStatsV2, ip)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -459,6 +501,9 @@ func (q *Queries) GetZoneStatsV2(ctx context.Context, ip string) ([]GetZoneStats
|
|||||||
}
|
}
|
||||||
items = append(items, i)
|
items = append(items, i)
|
||||||
}
|
}
|
||||||
|
if err := rows.Close(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if err := rows.Err(); err != nil {
|
if err := rows.Err(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
69
ntpdb/tx.go
69
ntpdb/tx.go
@@ -2,11 +2,7 @@ package ntpdb
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"database/sql"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
|
||||||
"go.ntppool.org/common/logger"
|
|
||||||
"go.opentelemetry.io/otel/trace"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type QuerierTx interface {
|
type QuerierTx interface {
|
||||||
@@ -15,17 +11,14 @@ type QuerierTx interface {
|
|||||||
Begin(ctx context.Context) (QuerierTx, error)
|
Begin(ctx context.Context) (QuerierTx, error)
|
||||||
Commit(ctx context.Context) error
|
Commit(ctx context.Context) error
|
||||||
Rollback(ctx context.Context) error
|
Rollback(ctx context.Context) error
|
||||||
|
|
||||||
// Conn returns the connection used by this transaction
|
|
||||||
Conn() *pgx.Conn
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Beginner interface {
|
type Beginner interface {
|
||||||
Begin(context.Context) (pgx.Tx, error)
|
Begin(context.Context) (sql.Tx, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Tx interface {
|
type Tx interface {
|
||||||
Begin(context.Context) (pgx.Tx, error)
|
Begin(context.Context) (sql.Tx, error)
|
||||||
Commit(ctx context.Context) error
|
Commit(ctx context.Context) error
|
||||||
Rollback(ctx context.Context) error
|
Rollback(ctx context.Context) error
|
||||||
}
|
}
|
||||||
@@ -35,33 +28,21 @@ func (q *Queries) Begin(ctx context.Context) (QuerierTx, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Queries{db: tx}, nil
|
return &Queries{db: &tx}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queries) Commit(ctx context.Context) error {
|
func (q *Queries) Commit(ctx context.Context) error {
|
||||||
tx, ok := q.db.(Tx)
|
tx, ok := q.db.(Tx)
|
||||||
if !ok {
|
if !ok {
|
||||||
// Commit called on Queries with dbpool, so treat as transaction already committed
|
return sql.ErrTxDone
|
||||||
return pgx.ErrTxClosed
|
|
||||||
}
|
}
|
||||||
return tx.Commit(ctx)
|
return tx.Commit(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Queries) Conn() *pgx.Conn {
|
|
||||||
// pgx.Tx is an interface that has Conn() method
|
|
||||||
tx, ok := q.db.(pgx.Tx)
|
|
||||||
if !ok {
|
|
||||||
logger.Setup().Error("could not get connection from QuerierTx")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return tx.Conn()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *Queries) Rollback(ctx context.Context) error {
|
func (q *Queries) Rollback(ctx context.Context) error {
|
||||||
tx, ok := q.db.(Tx)
|
tx, ok := q.db.(Tx)
|
||||||
if !ok {
|
if !ok {
|
||||||
// Rollback called on Queries with dbpool, so treat as transaction already committed
|
return sql.ErrTxDone
|
||||||
return pgx.ErrTxClosed
|
|
||||||
}
|
}
|
||||||
return tx.Rollback(ctx)
|
return tx.Rollback(ctx)
|
||||||
}
|
}
|
||||||
@@ -81,41 +62,3 @@ func (wq *WrappedQuerier) Begin(ctx context.Context) (QuerierTx, error) {
|
|||||||
}
|
}
|
||||||
return NewWrappedQuerier(q), nil
|
return NewWrappedQuerier(q), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wq *WrappedQuerier) Conn() *pgx.Conn {
|
|
||||||
return wq.QuerierTxWithTracing.Conn()
|
|
||||||
}
|
|
||||||
|
|
||||||
// LogRollback logs and performs a rollback if the transaction is still active
|
|
||||||
func LogRollback(ctx context.Context, tx QuerierTx) {
|
|
||||||
if !isInTransaction(tx) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log := logger.FromContext(ctx)
|
|
||||||
log.WarnContext(ctx, "transaction rollback called on an active transaction")
|
|
||||||
|
|
||||||
// if caller ctx is done we still need rollback to happen
|
|
||||||
// so Rollback gets a fresh context with span copied over
|
|
||||||
rbCtx := context.Background()
|
|
||||||
if span := trace.SpanFromContext(ctx); span != nil {
|
|
||||||
rbCtx = trace.ContextWithSpan(rbCtx, span)
|
|
||||||
}
|
|
||||||
if err := tx.Rollback(rbCtx); err != nil && !errors.Is(err, pgx.ErrTxClosed) {
|
|
||||||
log.ErrorContext(ctx, "rollback failed", "err", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func isInTransaction(tx QuerierTx) bool {
|
|
||||||
if tx == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
conn := tx.Conn()
|
|
||||||
if conn == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// 'I' means idle, so if it's not idle, we're in a transaction
|
|
||||||
return conn.PgConn().TxStatus() != 'I'
|
|
||||||
}
|
|
||||||
|
|||||||
44
query.sql
44
query.sql
@@ -1,6 +1,6 @@
|
|||||||
-- name: GetZoneStatsData :many
|
-- name: GetZoneStatsData :many
|
||||||
SELECT zc.date, z.name, zc.ip_version, count_active, count_registered, netspeed_active
|
SELECT zc.date, z.name, zc.ip_version, count_active, count_registered, netspeed_active
|
||||||
FROM zone_server_counts zc
|
FROM zone_server_counts zc USE INDEX (date_idx)
|
||||||
INNER JOIN zones z
|
INNER JOIN zones z
|
||||||
ON(zc.zone_id=z.id)
|
ON(zc.zone_id=z.id)
|
||||||
WHERE date IN (SELECT max(date) from zone_server_counts)
|
WHERE date IN (SELECT max(date) from zone_server_counts)
|
||||||
@@ -8,17 +8,18 @@ ORDER BY name;
|
|||||||
|
|
||||||
|
|
||||||
-- name: GetServerNetspeed :one
|
-- name: GetServerNetspeed :one
|
||||||
select netspeed from servers where ip = $1;
|
select netspeed from servers where ip = ?;
|
||||||
|
|
||||||
-- name: GetZoneStatsV2 :many
|
-- name: GetZoneStatsV2 :many
|
||||||
|
select zone_name, netspeed_active+0 as netspeed_active FROM (
|
||||||
SELECT
|
SELECT
|
||||||
z.name as zone_name,
|
z.name as zone_name,
|
||||||
CAST(SUM(
|
SUM(
|
||||||
CASE WHEN deletion_on IS NULL AND score_raw > 10
|
IF (deletion_on IS NULL AND score_raw > 10,
|
||||||
THEN netspeed
|
netspeed,
|
||||||
ELSE 0
|
0
|
||||||
END
|
)
|
||||||
) AS int) AS netspeed_active
|
) AS netspeed_active
|
||||||
FROM
|
FROM
|
||||||
servers s
|
servers s
|
||||||
INNER JOIN server_zones sz ON (sz.server_id = s.id)
|
INNER JOIN server_zones sz ON (sz.server_id = s.id)
|
||||||
@@ -27,18 +28,19 @@ FROM
|
|||||||
select zone_id, s.ip_version
|
select zone_id, s.ip_version
|
||||||
from server_zones sz
|
from server_zones sz
|
||||||
inner join servers s on (s.id=sz.server_id)
|
inner join servers s on (s.id=sz.server_id)
|
||||||
where s.ip=$1
|
where s.ip=?
|
||||||
) as srvz on (srvz.zone_id=z.id AND srvz.ip_version=s.ip_version)
|
) as srvz on (srvz.zone_id=z.id AND srvz.ip_version=s.ip_version)
|
||||||
WHERE
|
WHERE
|
||||||
(deletion_on IS NULL OR deletion_on > NOW())
|
(deletion_on IS NULL OR deletion_on > NOW())
|
||||||
AND in_pool = 1
|
AND in_pool = 1
|
||||||
AND netspeed > 0
|
AND netspeed > 0
|
||||||
GROUP BY z.name;
|
GROUP BY z.name)
|
||||||
|
AS server_netspeed;
|
||||||
|
|
||||||
-- name: GetServerByID :one
|
-- name: GetServerByID :one
|
||||||
select * from servers
|
select * from servers
|
||||||
where
|
where
|
||||||
id = $1;
|
id = ?;
|
||||||
|
|
||||||
-- name: GetServerByIP :one
|
-- name: GetServerByIP :one
|
||||||
select * from servers
|
select * from servers
|
||||||
@@ -50,13 +52,13 @@ select * from monitors
|
|||||||
where
|
where
|
||||||
tls_name like sqlc.arg('tls_name') AND
|
tls_name like sqlc.arg('tls_name') AND
|
||||||
(ip_version = sqlc.arg('ip_version') OR (type = 'score' AND ip_version IS NULL)) AND
|
(ip_version = sqlc.arg('ip_version') OR (type = 'score' AND ip_version IS NULL)) AND
|
||||||
is_current = true
|
is_current = 1
|
||||||
order by id
|
order by id
|
||||||
limit 1;
|
limit 1;
|
||||||
|
|
||||||
-- name: GetMonitorsByID :many
|
-- name: GetMonitorsByID :many
|
||||||
select * from monitors
|
select * from monitors
|
||||||
where id = ANY(sqlc.arg('MonitorIDs')::bigint[]);
|
where id in (sqlc.slice('MonitorIDs'));
|
||||||
|
|
||||||
-- name: GetServerScores :many
|
-- name: GetServerScores :many
|
||||||
select
|
select
|
||||||
@@ -66,23 +68,23 @@ select
|
|||||||
inner join monitors m
|
inner join monitors m
|
||||||
on (m.id=ss.monitor_id)
|
on (m.id=ss.monitor_id)
|
||||||
where
|
where
|
||||||
server_id = $1 AND
|
server_id = ? AND
|
||||||
monitor_id = ANY(sqlc.arg('MonitorIDs')::bigint[]);
|
monitor_id in (sqlc.slice('MonitorIDs'));
|
||||||
|
|
||||||
-- name: GetServerLogScores :many
|
-- name: GetServerLogScores :many
|
||||||
select * from log_scores
|
select * from log_scores
|
||||||
where
|
where
|
||||||
server_id = $1
|
server_id = ?
|
||||||
order by ts desc
|
order by ts desc
|
||||||
limit $2;
|
limit ?;
|
||||||
|
|
||||||
-- name: GetServerLogScoresByMonitorID :many
|
-- name: GetServerLogScoresByMonitorID :many
|
||||||
select * from log_scores
|
select * from log_scores
|
||||||
where
|
where
|
||||||
server_id = $1 AND
|
server_id = ? AND
|
||||||
monitor_id = $2
|
monitor_id = ?
|
||||||
order by ts desc
|
order by ts desc
|
||||||
limit $3;
|
limit ?;
|
||||||
|
|
||||||
-- name: GetZoneByName :one
|
-- name: GetZoneByName :one
|
||||||
select * from zones
|
select * from zones
|
||||||
@@ -91,5 +93,5 @@ where
|
|||||||
|
|
||||||
-- name: GetZoneCounts :many
|
-- name: GetZoneCounts :many
|
||||||
select * from zone_server_counts
|
select * from zone_server_counts
|
||||||
where zone_id = $1
|
where zone_id = ?
|
||||||
order by date;
|
order by date;
|
||||||
|
|||||||
3838
schema.sql
3838
schema.sql
File diff suppressed because it is too large
Load Diff
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
|
|
||||||
go install github.com/goreleaser/goreleaser/v2@v2.12.3
|
go install github.com/goreleaser/goreleaser/v2@v2.13.3
|
||||||
|
|
||||||
if [ ! -z "${harbor_username:-}" ]; then
|
if [ ! -z "${harbor_username:-}" ]; then
|
||||||
DOCKER_FILE=~/.docker/config.json
|
DOCKER_FILE=~/.docker/config.json
|
||||||
@@ -13,11 +13,11 @@ if [ ! -z "${harbor_username:-}" ]; then
|
|||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
DRONE_TAG=${DRONE_TAG-""}
|
CI_TAG=${CI_COMMIT_TAG:-${DRONE_TAG:-""}}
|
||||||
|
|
||||||
is_snapshot=""
|
is_snapshot=""
|
||||||
|
|
||||||
if [ -z "$DRONE_TAG" ]; then
|
if [ -z "$CI_TAG" ]; then
|
||||||
is_snapshot="--snapshot"
|
is_snapshot="--snapshot"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|||||||
@@ -1,11 +1,11 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
|
||||||
"github.com/labstack/echo/v4"
|
"github.com/labstack/echo/v4"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
@@ -56,7 +56,7 @@ func (srv *Server) dnsAnswers(c echo.Context) error {
|
|||||||
queryGroup, ctx := errgroup.WithContext(ctx)
|
queryGroup, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
var zoneStats []ntpdb.GetZoneStatsV2Row
|
var zoneStats []ntpdb.GetZoneStatsV2Row
|
||||||
var serverNetspeed int64
|
var serverNetspeed uint32
|
||||||
|
|
||||||
queryGroup.Go(func() error {
|
queryGroup.Go(func() error {
|
||||||
var err error
|
var err error
|
||||||
@@ -64,7 +64,7 @@ func (srv *Server) dnsAnswers(c echo.Context) error {
|
|||||||
|
|
||||||
serverNetspeed, err = q.GetServerNetspeed(ctx, ip.String())
|
serverNetspeed, err = q.GetServerNetspeed(ctx, ip.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, pgx.ErrNoRows) {
|
if !errors.Is(err, sql.ErrNoRows) {
|
||||||
log.Error("GetServerNetspeed", "err", err)
|
log.Error("GetServerNetspeed", "err", err)
|
||||||
}
|
}
|
||||||
return err // this will return if the server doesn't exist
|
return err // this will return if the server doesn't exist
|
||||||
@@ -116,7 +116,7 @@ func (srv *Server) dnsAnswers(c echo.Context) error {
|
|||||||
|
|
||||||
err = queryGroup.Wait()
|
err = queryGroup.Wait()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, pgx.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
return c.String(http.StatusNotFound, "Not found")
|
return c.String(http.StatusNotFound, "Not found")
|
||||||
}
|
}
|
||||||
log.Error("query error", "err", err)
|
log.Error("query error", "err", err)
|
||||||
@@ -130,7 +130,7 @@ func (srv *Server) dnsAnswers(c echo.Context) error {
|
|||||||
if zn == "@" {
|
if zn == "@" {
|
||||||
zn = ""
|
zn = ""
|
||||||
}
|
}
|
||||||
zoneTotals[zn] = int(z.NetspeedActive) // binary.BigEndian.Uint64(...)
|
zoneTotals[zn] = z.NetspeedActive // binary.BigEndian.Uint64(...)
|
||||||
// log.Info("zone netspeed", "cc", z.ZoneName, "speed", z.NetspeedActive)
|
// log.Info("zone netspeed", "cc", z.ZoneName, "speed", z.NetspeedActive)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,12 +2,12 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
|
||||||
"go.ntppool.org/common/logger"
|
"go.ntppool.org/common/logger"
|
||||||
"go.ntppool.org/common/tracing"
|
"go.ntppool.org/common/tracing"
|
||||||
"go.ntppool.org/data-api/ntpdb"
|
"go.ntppool.org/data-api/ntpdb"
|
||||||
@@ -22,7 +22,7 @@ func (srv *Server) FindServer(ctx context.Context, serverID string) (ntpdb.Serve
|
|||||||
var serverData ntpdb.Server
|
var serverData ntpdb.Server
|
||||||
var dberr error
|
var dberr error
|
||||||
if id, err := strconv.Atoi(serverID); id > 0 && err == nil {
|
if id, err := strconv.Atoi(serverID); id > 0 && err == nil {
|
||||||
serverData, dberr = q.GetServerByID(ctx, int64(id))
|
serverData, dberr = q.GetServerByID(ctx, uint32(id))
|
||||||
} else {
|
} else {
|
||||||
ip, err := netip.ParseAddr(serverID)
|
ip, err := netip.ParseAddr(serverID)
|
||||||
if err != nil || !ip.IsValid() {
|
if err != nil || !ip.IsValid() {
|
||||||
@@ -31,7 +31,7 @@ func (srv *Server) FindServer(ctx context.Context, serverID string) (ntpdb.Serve
|
|||||||
serverData, dberr = q.GetServerByIP(ctx, ip.String())
|
serverData, dberr = q.GetServerByIP(ctx, ip.String())
|
||||||
}
|
}
|
||||||
if dberr != nil {
|
if dberr != nil {
|
||||||
if !errors.Is(dberr, pgx.ErrNoRows) {
|
if !errors.Is(dberr, sql.ErrNoRows) {
|
||||||
log.Error("could not query server id", "err", dberr)
|
log.Error("could not query server id", "err", dberr)
|
||||||
return serverData, dberr
|
return serverData, dberr
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -195,7 +195,7 @@ func transformToGrafanaTableFormat(history *logscores.LogScoreHistory, monitors
|
|||||||
skippedInvalidMonitors++
|
skippedInvalidMonitors++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
monitorID := int(ls.MonitorID.Int64)
|
monitorID := int(ls.MonitorID.Int32)
|
||||||
monitorData[monitorID] = append(monitorData[monitorID], ls)
|
monitorData[monitorID] = append(monitorData[monitorID], ls)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -275,7 +275,7 @@ func transformToGrafanaTableFormat(history *logscores.LogScoreHistory, monitors
|
|||||||
var values [][]interface{}
|
var values [][]interface{}
|
||||||
for _, ls := range logScores {
|
for _, ls := range logScores {
|
||||||
// Convert timestamp to milliseconds
|
// Convert timestamp to milliseconds
|
||||||
timestampMs := ls.Ts.Time.Unix() * 1000
|
timestampMs := ls.Ts.Unix() * 1000
|
||||||
|
|
||||||
// Create row: [time, score, rtt, offset]
|
// Create row: [time, score, rtt, offset]
|
||||||
row := []interface{}{
|
row := []interface{}{
|
||||||
@@ -382,7 +382,7 @@ func (srv *Server) scoresTimeRange(c echo.Context) error {
|
|||||||
"time_range_duration", params.to.Sub(params.from).String(),
|
"time_range_duration", params.to.Sub(params.from).String(),
|
||||||
)
|
)
|
||||||
|
|
||||||
logScores, err := srv.ch.LogscoresTimeRange(ctx, int(server.ID), int(params.monitorID), params.from, params.to, params.maxDataPoints)
|
logScores, err := srv.ch.LogscoresTimeRange(ctx, int(server.ID), params.monitorID, params.from, params.to, params.maxDataPoints)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.ErrorContext(ctx, "clickhouse time range query", "err", err,
|
log.ErrorContext(ctx, "clickhouse time range query", "err", err,
|
||||||
"server_id", server.ID,
|
"server_id", server.ID,
|
||||||
@@ -397,8 +397,8 @@ func (srv *Server) scoresTimeRange(c echo.Context) error {
|
|||||||
log.InfoContext(ctx, "clickhouse query results",
|
log.InfoContext(ctx, "clickhouse query results",
|
||||||
"server_id", server.ID,
|
"server_id", server.ID,
|
||||||
"rows_returned", len(logScores),
|
"rows_returned", len(logScores),
|
||||||
"first_few_ids", func() []int64 {
|
"first_few_ids", func() []uint64 {
|
||||||
ids := make([]int64, 0, 3)
|
ids := make([]uint64, 0, 3)
|
||||||
for i, ls := range logScores {
|
for i, ls := range logScores {
|
||||||
if i >= 3 {
|
if i >= 3 {
|
||||||
break
|
break
|
||||||
@@ -416,10 +416,10 @@ func (srv *Server) scoresTimeRange(c echo.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get monitor names for the returned data
|
// Get monitor names for the returned data
|
||||||
monitorIDs := []int64{}
|
monitorIDs := []uint32{}
|
||||||
for _, ls := range logScores {
|
for _, ls := range logScores {
|
||||||
if ls.MonitorID.Valid {
|
if ls.MonitorID.Valid {
|
||||||
monitorID := ls.MonitorID.Int64
|
monitorID := uint32(ls.MonitorID.Int32)
|
||||||
if _, exists := history.Monitors[int(monitorID)]; !exists {
|
if _, exists := history.Monitors[int(monitorID)]; !exists {
|
||||||
history.Monitors[int(monitorID)] = ""
|
history.Monitors[int(monitorID)] = ""
|
||||||
monitorIDs = append(monitorIDs, monitorID)
|
monitorIDs = append(monitorIDs, monitorID)
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package server
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"encoding/csv"
|
"encoding/csv"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -14,8 +15,6 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
|
||||||
"github.com/jackc/pgx/v5/pgtype"
|
|
||||||
"github.com/labstack/echo/v4"
|
"github.com/labstack/echo/v4"
|
||||||
"go.ntppool.org/common/logger"
|
"go.ntppool.org/common/logger"
|
||||||
"go.ntppool.org/common/tracing"
|
"go.ntppool.org/common/tracing"
|
||||||
@@ -64,7 +63,7 @@ func paramHistoryMode(s string) historyMode {
|
|||||||
|
|
||||||
type historyParameters struct {
|
type historyParameters struct {
|
||||||
limit int
|
limit int
|
||||||
monitorID int64
|
monitorID int
|
||||||
server ntpdb.Server
|
server ntpdb.Server
|
||||||
since time.Time
|
since time.Time
|
||||||
fullHistory bool
|
fullHistory bool
|
||||||
@@ -91,7 +90,7 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context, ser
|
|||||||
|
|
||||||
monitorParam := c.QueryParam("monitor")
|
monitorParam := c.QueryParam("monitor")
|
||||||
|
|
||||||
var monitorID int64
|
var monitorID uint32
|
||||||
switch monitorParam {
|
switch monitorParam {
|
||||||
case "":
|
case "":
|
||||||
name := "recentmedian.scores.ntp.dev"
|
name := "recentmedian.scores.ntp.dev"
|
||||||
@@ -102,7 +101,7 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context, ser
|
|||||||
ipVersion = ntpdb.NullMonitorsIpVersion{MonitorsIpVersion: ntpdb.MonitorsIpVersionV6, Valid: true}
|
ipVersion = ntpdb.NullMonitorsIpVersion{MonitorsIpVersion: ntpdb.MonitorsIpVersionV6, Valid: true}
|
||||||
}
|
}
|
||||||
monitor, err := q.GetMonitorByNameAndIPVersion(ctx, ntpdb.GetMonitorByNameAndIPVersionParams{
|
monitor, err := q.GetMonitorByNameAndIPVersion(ctx, ntpdb.GetMonitorByNameAndIPVersionParams{
|
||||||
TlsName: pgtype.Text{Valid: true, String: name},
|
TlsName: sql.NullString{Valid: true, String: name},
|
||||||
IpVersion: ipVersion,
|
IpVersion: ipVersion,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -112,9 +111,9 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context, ser
|
|||||||
case "*":
|
case "*":
|
||||||
monitorID = 0 // don't filter on monitor ID
|
monitorID = 0 // don't filter on monitor ID
|
||||||
default:
|
default:
|
||||||
mID, err := strconv.ParseInt(monitorParam, 10, 64)
|
mID, err := strconv.ParseUint(monitorParam, 10, 32)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
monitorID = mID
|
monitorID = uint32(mID)
|
||||||
} else {
|
} else {
|
||||||
// only accept the name prefix; no wildcards; trust the database
|
// only accept the name prefix; no wildcards; trust the database
|
||||||
// to filter out any other crazy
|
// to filter out any other crazy
|
||||||
@@ -130,11 +129,11 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context, ser
|
|||||||
ipVersion = ntpdb.NullMonitorsIpVersion{MonitorsIpVersion: ntpdb.MonitorsIpVersionV6, Valid: true}
|
ipVersion = ntpdb.NullMonitorsIpVersion{MonitorsIpVersion: ntpdb.MonitorsIpVersionV6, Valid: true}
|
||||||
}
|
}
|
||||||
monitor, err := q.GetMonitorByNameAndIPVersion(ctx, ntpdb.GetMonitorByNameAndIPVersionParams{
|
monitor, err := q.GetMonitorByNameAndIPVersion(ctx, ntpdb.GetMonitorByNameAndIPVersionParams{
|
||||||
TlsName: pgtype.Text{Valid: true, String: monitorParam},
|
TlsName: sql.NullString{Valid: true, String: monitorParam},
|
||||||
IpVersion: ipVersion,
|
IpVersion: ipVersion,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, pgx.ErrNoRows) {
|
if err == sql.ErrNoRows {
|
||||||
return p, echo.NewHTTPError(http.StatusNotFound, "monitor not found").WithInternal(err)
|
return p, echo.NewHTTPError(http.StatusNotFound, "monitor not found").WithInternal(err)
|
||||||
}
|
}
|
||||||
log.WarnContext(ctx, "could not find monitor", "name", monitorParam, "ip_version", server.IpVersion, "err", err)
|
log.WarnContext(ctx, "could not find monitor", "name", monitorParam, "ip_version", server.IpVersion, "err", err)
|
||||||
@@ -145,7 +144,7 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context, ser
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.monitorID = monitorID
|
p.monitorID = int(monitorID)
|
||||||
log.DebugContext(ctx, "monitor param", "monitor", monitorID, "ip_version", server.IpVersion)
|
log.DebugContext(ctx, "monitor param", "monitor", monitorID, "ip_version", server.IpVersion)
|
||||||
|
|
||||||
since, _ := strconv.ParseInt(c.QueryParam("since"), 10, 64) // defaults to 0 so don't care if it parses
|
since, _ := strconv.ParseInt(c.QueryParam("since"), 10, 64) // defaults to 0 so don't care if it parses
|
||||||
@@ -171,8 +170,8 @@ func (srv *Server) getHistoryParameters(ctx context.Context, c echo.Context, ser
|
|||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srv *Server) getHistoryPostgres(ctx context.Context, _ echo.Context, p historyParameters) (*logscores.LogScoreHistory, error) {
|
func (srv *Server) getHistoryMySQL(ctx context.Context, _ echo.Context, p historyParameters) (*logscores.LogScoreHistory, error) {
|
||||||
ls, err := logscores.GetHistoryPostgres(ctx, srv.db, p.server.ID, p.monitorID, p.since, p.limit)
|
ls, err := logscores.GetHistoryMySQL(ctx, srv.db, p.server.ID, uint32(p.monitorID), p.since, p.limit)
|
||||||
return ls, err
|
return ls, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -231,9 +230,9 @@ func (srv *Server) history(c echo.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if sourceParam == "m" {
|
if sourceParam == "m" {
|
||||||
history, err = srv.getHistoryPostgres(ctx, c, p)
|
history, err = srv.getHistoryMySQL(ctx, c, p)
|
||||||
} else {
|
} else {
|
||||||
history, err = logscores.GetHistoryClickHouse(ctx, srv.ch, srv.db, p.server.ID, p.monitorID, p.since, p.limit, p.fullHistory)
|
history, err = logscores.GetHistoryClickHouse(ctx, srv.ch, srv.db, p.server.ID, uint32(p.monitorID), p.since, p.limit, p.fullHistory)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var httpError *echo.HTTPError
|
var httpError *echo.HTTPError
|
||||||
@@ -277,7 +276,7 @@ func (srv *Server) historyJSON(ctx context.Context, c echo.Context, server ntpdb
|
|||||||
}
|
}
|
||||||
|
|
||||||
type MonitorEntry struct {
|
type MonitorEntry struct {
|
||||||
ID int64 `json:"id"`
|
ID uint32 `json:"id"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
Ts string `json:"ts"`
|
Ts string `json:"ts"`
|
||||||
@@ -298,9 +297,9 @@ func (srv *Server) historyJSON(ctx context.Context, c echo.Context, server ntpdb
|
|||||||
|
|
||||||
// log.InfoContext(ctx, "monitor id list", "ids", history.MonitorIDs)
|
// log.InfoContext(ctx, "monitor id list", "ids", history.MonitorIDs)
|
||||||
|
|
||||||
monitorIDs := []int64{}
|
monitorIDs := []uint32{}
|
||||||
for k := range history.Monitors {
|
for k := range history.Monitors {
|
||||||
monitorIDs = append(monitorIDs, int64(k))
|
monitorIDs = append(monitorIDs, uint32(k))
|
||||||
}
|
}
|
||||||
|
|
||||||
q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db))
|
q := ntpdb.NewWrappedQuerier(ntpdb.New(srv.db))
|
||||||
@@ -319,12 +318,12 @@ func (srv *Server) historyJSON(ctx context.Context, c echo.Context, server ntpdb
|
|||||||
// log.InfoContext(ctx, "got logScoreMonitors", "count", len(logScoreMonitors))
|
// log.InfoContext(ctx, "got logScoreMonitors", "count", len(logScoreMonitors))
|
||||||
|
|
||||||
// Calculate average RTT per monitor
|
// Calculate average RTT per monitor
|
||||||
monitorRttSums := make(map[int64]float64)
|
monitorRttSums := make(map[uint32]float64)
|
||||||
monitorRttCounts := make(map[int64]int)
|
monitorRttCounts := make(map[uint32]int)
|
||||||
|
|
||||||
for _, ls := range history.LogScores {
|
for _, ls := range history.LogScores {
|
||||||
if ls.MonitorID.Valid && ls.Rtt.Valid {
|
if ls.MonitorID.Valid && ls.Rtt.Valid {
|
||||||
monitorID := ls.MonitorID.Int64
|
monitorID := uint32(ls.MonitorID.Int32)
|
||||||
monitorRttSums[monitorID] += float64(ls.Rtt.Int32) / 1000.0
|
monitorRttSums[monitorID] += float64(ls.Rtt.Int32) / 1000.0
|
||||||
monitorRttCounts[monitorID]++
|
monitorRttCounts[monitorID]++
|
||||||
}
|
}
|
||||||
@@ -363,8 +362,8 @@ func (srv *Server) historyJSON(ctx context.Context, c echo.Context, server ntpdb
|
|||||||
x := float64(1000000000000)
|
x := float64(1000000000000)
|
||||||
score := math.Round(ls.Score*x) / x
|
score := math.Round(ls.Score*x) / x
|
||||||
res.History[i] = ScoresEntry{
|
res.History[i] = ScoresEntry{
|
||||||
TS: ls.Ts.Time.Unix(),
|
TS: ls.Ts.Unix(),
|
||||||
MonitorID: int(ls.MonitorID.Int64),
|
MonitorID: int(ls.MonitorID.Int32),
|
||||||
Step: ls.Step,
|
Step: ls.Step,
|
||||||
Score: score,
|
Score: score,
|
||||||
}
|
}
|
||||||
@@ -415,7 +414,7 @@ func (srv *Server) historyCSV(ctx context.Context, c echo.Context, history *logs
|
|||||||
score := ff(l.Score)
|
score := ff(l.Score)
|
||||||
var monName string
|
var monName string
|
||||||
if l.MonitorID.Valid {
|
if l.MonitorID.Valid {
|
||||||
monName = history.Monitors[int(l.MonitorID.Int64)]
|
monName = history.Monitors[int(l.MonitorID.Int32)]
|
||||||
}
|
}
|
||||||
var leap string
|
var leap string
|
||||||
if l.Attributes.Leap != 0 {
|
if l.Attributes.Leap != 0 {
|
||||||
@@ -428,13 +427,13 @@ func (srv *Server) historyCSV(ctx context.Context, c echo.Context, history *logs
|
|||||||
}
|
}
|
||||||
|
|
||||||
err := w.Write([]string{
|
err := w.Write([]string{
|
||||||
strconv.Itoa(int(l.Ts.Time.Unix())),
|
strconv.Itoa(int(l.Ts.Unix())),
|
||||||
// l.Ts.Format(time.RFC3339),
|
// l.Ts.Format(time.RFC3339),
|
||||||
l.Ts.Time.Format("2006-01-02 15:04:05"),
|
l.Ts.Format("2006-01-02 15:04:05"),
|
||||||
offset,
|
offset,
|
||||||
step,
|
step,
|
||||||
score,
|
score,
|
||||||
fmt.Sprintf("%d", l.MonitorID.Int64),
|
fmt.Sprintf("%d", l.MonitorID.Int32),
|
||||||
monName,
|
monName,
|
||||||
rtt,
|
rtt,
|
||||||
leap,
|
leap,
|
||||||
@@ -465,7 +464,7 @@ func setHistoryCacheControl(c echo.Context, history *logscores.LogScoreHistory)
|
|||||||
if len(history.LogScores) == 0 ||
|
if len(history.LogScores) == 0 ||
|
||||||
// cache for longer if data hasn't updated for a while; or we didn't
|
// cache for longer if data hasn't updated for a while; or we didn't
|
||||||
// find any.
|
// find any.
|
||||||
(time.Now().Add(-8 * time.Hour).After(history.LogScores[len(history.LogScores)-1].Ts.Time)) {
|
(time.Now().Add(-8 * time.Hour).After(history.LogScores[len(history.LogScores)-1].Ts)) {
|
||||||
hdr.Set("Cache-Control", "s-maxage=260,max-age=360")
|
hdr.Set("Cache-Control", "s-maxage=260,max-age=360")
|
||||||
} else {
|
} else {
|
||||||
if len(history.LogScores) == 1 {
|
if len(history.LogScores) == 1 {
|
||||||
|
|||||||
@@ -2,16 +2,17 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
|
||||||
"github.com/labstack/echo-contrib/echoprometheus"
|
"github.com/labstack/echo-contrib/echoprometheus"
|
||||||
"github.com/labstack/echo/v4"
|
"github.com/labstack/echo/v4"
|
||||||
"github.com/labstack/echo/v4/middleware"
|
"github.com/labstack/echo/v4/middleware"
|
||||||
@@ -35,7 +36,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
db *pgxpool.Pool
|
db *sql.DB
|
||||||
ch *chdb.ClickHouse
|
ch *chdb.ClickHouse
|
||||||
config *config.Config
|
config *config.Config
|
||||||
|
|
||||||
@@ -54,7 +55,7 @@ func NewServer(ctx context.Context, configFile string) (*Server, error) {
|
|||||||
}
|
}
|
||||||
db, err := ntpdb.OpenDB(ctx, configFile)
|
db, err := ntpdb.OpenDB(ctx, configFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("postgres open: %w", err)
|
return nil, fmt.Errorf("mysql open: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
conf := config.New()
|
conf := config.New()
|
||||||
@@ -302,9 +303,22 @@ func healthHandler(srv *Server, log *slog.Logger) func(w http.ResponseWriter, re
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
g, ctx := errgroup.WithContext(ctx)
|
g, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
stats := srv.db.Stat()
|
stats := srv.db.Stats()
|
||||||
if stats.TotalConns() > 3 {
|
if stats.OpenConnections > 3 {
|
||||||
log.InfoContext(ctx, "health requests", "url", req.URL.String(), "total_conns", stats.TotalConns())
|
log.InfoContext(ctx, "health requests", "url", req.URL.String(), "stats", stats)
|
||||||
|
}
|
||||||
|
|
||||||
|
if resetParam := req.URL.Query().Get("reset"); resetParam != "" {
|
||||||
|
reset, err := strconv.ParseBool(resetParam)
|
||||||
|
log.InfoContext(ctx, "db reset request", "err", err, "reset", reset)
|
||||||
|
|
||||||
|
if err == nil && reset {
|
||||||
|
// this feature was to debug some specific problem
|
||||||
|
log.InfoContext(ctx, "setting idle db conns to zero")
|
||||||
|
srv.db.SetConnMaxLifetime(30 * time.Second)
|
||||||
|
srv.db.SetMaxIdleConns(0)
|
||||||
|
srv.db.SetMaxIdleConns(4)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
@@ -326,7 +340,7 @@ func healthHandler(srv *Server, log *slog.Logger) func(w http.ResponseWriter, re
|
|||||||
})
|
})
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
err := srv.db.Ping(ctx)
|
err := srv.db.PingContext(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WarnContext(ctx, "db ping", "err", err)
|
log.WarnContext(ctx, "db ping", "err", err)
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
|
||||||
"github.com/labstack/echo/v4"
|
"github.com/labstack/echo/v4"
|
||||||
"go.ntppool.org/common/logger"
|
"go.ntppool.org/common/logger"
|
||||||
"go.ntppool.org/common/tracing"
|
"go.ntppool.org/common/tracing"
|
||||||
@@ -27,7 +27,7 @@ func (srv *Server) zoneCounts(c echo.Context) error {
|
|||||||
|
|
||||||
zone, err := q.GetZoneByName(ctx, c.Param("zone_name"))
|
zone, err := q.GetZoneByName(ctx, c.Param("zone_name"))
|
||||||
if err != nil || zone.ID == 0 {
|
if err != nil || zone.ID == 0 {
|
||||||
if errors.Is(err, pgx.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
return c.String(http.StatusNotFound, "Not found")
|
return c.String(http.StatusNotFound, "Not found")
|
||||||
}
|
}
|
||||||
log.ErrorContext(ctx, "could not query for zone", "err", err)
|
log.ErrorContext(ctx, "could not query for zone", "err", err)
|
||||||
@@ -37,7 +37,7 @@ func (srv *Server) zoneCounts(c echo.Context) error {
|
|||||||
|
|
||||||
counts, err := q.GetZoneCounts(ctx, zone.ID)
|
counts, err := q.GetZoneCounts(ctx, zone.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, pgx.ErrNoRows) {
|
if !errors.Is(err, sql.ErrNoRows) {
|
||||||
log.ErrorContext(ctx, "get counts", "err", err)
|
log.ErrorContext(ctx, "get counts", "err", err)
|
||||||
span.RecordError(err)
|
span.RecordError(err)
|
||||||
return c.String(http.StatusInternalServerError, "internal error")
|
return c.String(http.StatusInternalServerError, "internal error")
|
||||||
@@ -71,7 +71,7 @@ func (srv *Server) zoneCounts(c echo.Context) error {
|
|||||||
count := 0
|
count := 0
|
||||||
dates := map[int64]bool{}
|
dates := map[int64]bool{}
|
||||||
for _, c := range counts {
|
for _, c := range counts {
|
||||||
ep := c.Date.Time.Unix()
|
ep := c.Date.Unix()
|
||||||
if _, ok := dates[ep]; !ok {
|
if _, ok := dates[ep]; !ok {
|
||||||
count++
|
count++
|
||||||
dates[ep] = true
|
dates[ep] = true
|
||||||
@@ -99,13 +99,13 @@ func (srv *Server) zoneCounts(c echo.Context) error {
|
|||||||
lastSkip := int64(0)
|
lastSkip := int64(0)
|
||||||
skipThreshold := 0.5
|
skipThreshold := 0.5
|
||||||
for _, c := range counts {
|
for _, c := range counts {
|
||||||
cDate := c.Date.Time.Unix()
|
cDate := c.Date.Unix()
|
||||||
if (toSkip <= skipThreshold && cDate != lastSkip) ||
|
if (toSkip <= skipThreshold && cDate != lastSkip) ||
|
||||||
lastDate == cDate ||
|
lastDate == cDate ||
|
||||||
mostRecentDate == cDate {
|
mostRecentDate == cDate {
|
||||||
// log.Info("adding date", "date", c.Date.Time.Format(time.DateOnly))
|
// log.Info("adding date", "date", c.Date.Format(time.DateOnly))
|
||||||
rv.History = append(rv.History, historyEntry{
|
rv.History = append(rv.History, historyEntry{
|
||||||
D: c.Date.Time.Format(time.DateOnly),
|
D: c.Date.Format(time.DateOnly),
|
||||||
Ts: int(cDate),
|
Ts: int(cDate),
|
||||||
Ac: int(c.CountActive),
|
Ac: int(c.CountActive),
|
||||||
Rc: int(c.CountRegistered),
|
Rc: int(c.CountRegistered),
|
||||||
|
|||||||
@@ -2,17 +2,16 @@ version: "2"
|
|||||||
sql:
|
sql:
|
||||||
- schema: "schema.sql"
|
- schema: "schema.sql"
|
||||||
queries: "query.sql"
|
queries: "query.sql"
|
||||||
engine: "postgresql"
|
engine: "mysql"
|
||||||
strict_order_by: false
|
|
||||||
gen:
|
gen:
|
||||||
go:
|
go:
|
||||||
package: "ntpdb"
|
package: "ntpdb"
|
||||||
out: "ntpdb"
|
out: "ntpdb"
|
||||||
sql_package: "pgx/v5"
|
|
||||||
emit_json_tags: true
|
emit_json_tags: true
|
||||||
emit_db_tags: true
|
emit_db_tags: true
|
||||||
omit_unused_structs: true
|
omit_unused_structs: true
|
||||||
emit_interface: true
|
emit_interface: true
|
||||||
|
# emit_all_enum_values: true
|
||||||
rename:
|
rename:
|
||||||
servers.Ip: IP
|
servers.Ip: IP
|
||||||
overrides:
|
overrides:
|
||||||
|
|||||||
Reference in New Issue
Block a user