chore(exporter): rework exporter to fix wrong histogram usage and cache metric data
All checks were successful
ci/woodpecker/push/lint Pipeline was successful
ci/woodpecker/push/test Pipeline was successful
ci/woodpecker/push/build Pipeline was successful
ci/woodpecker/push/deploy Pipeline was successful

This commit is contained in:
Tom Neuber 2025-01-10 14:33:44 +01:00
parent 4d2a7acebc
commit 8b7f45563a
Signed by: tom
GPG key ID: F17EFE4272D89FF6
6 changed files with 180 additions and 312 deletions

View file

@ -17,6 +17,7 @@ type AppSettings struct {
ReadHeaderTimeout time.Duration ReadHeaderTimeout time.Duration
EnableExporter bool EnableExporter bool
ExporterAddress string ExporterAddress string
ExporterInterval time.Duration
} }
//nolint:lll // ignore line length //nolint:lll // ignore line length
@ -28,6 +29,7 @@ type CLI struct {
ReadHeaderTimeout string `name:"read-header-timeout" env:"GEOIP_READ_HEADER_TIMEOUT" help:"timeout for reading http header" default:"${default_read_header_timeout}"` ReadHeaderTimeout string `name:"read-header-timeout" env:"GEOIP_READ_HEADER_TIMEOUT" help:"timeout for reading http header" default:"${default_read_header_timeout}"`
EnableExporter bool `name:"enable-exporter" env:"GEOIP_ENABLE_EXPORTER" help:"enable prometheus exporter" default:"${default_enable_exporter}"` EnableExporter bool `name:"enable-exporter" env:"GEOIP_ENABLE_EXPORTER" help:"enable prometheus exporter" default:"${default_enable_exporter}"`
ExporterAddress string `name:"exporter-address" env:"GEOIP_EXPORTER_ADDRESS" help:"Address to use for the prometheus metrics server" default:"${default_exporter_address}"` ExporterAddress string `name:"exporter-address" env:"GEOIP_EXPORTER_ADDRESS" help:"Address to use for the prometheus metrics server" default:"${default_exporter_address}"`
ExporterInterval string `name:"exporter-interval" env:"GEOIP_EXPORTER_INTERVAL" help:"Interval to scrape the new metrics data" default:"${default_exporter_interval}"`
} }
func (c *CLI) Parse() (*AppSettings, error) { func (c *CLI) Parse() (*AppSettings, error) {
@ -40,6 +42,7 @@ func (c *CLI) Parse() (*AppSettings, error) {
"default_read_header_timeout": "3s", "default_read_header_timeout": "3s",
"default_enable_exporter": "false", "default_enable_exporter": "false",
"default_exporter_address": ":9191", "default_exporter_address": ":9191",
"default_exporter_interval": "5s",
}, },
kong.Name("country-geo-locations"), kong.Name("country-geo-locations"),
kong.Description("🚀 Start a simple web server for GeoIP data"), kong.Description("🚀 Start a simple web server for GeoIP data"),
@ -56,6 +59,11 @@ func (c *CLI) Parse() (*AppSettings, error) {
return nil, err return nil, err
} }
exporterInterval, err := time.ParseDuration(c.ExporterInterval)
if err != nil {
return nil, err
}
if c.DataURL == "" { if c.DataURL == "" {
return nil, ErrInvalidDataURL return nil, ErrInvalidDataURL
} }
@ -68,5 +76,6 @@ func (c *CLI) Parse() (*AppSettings, error) {
ReadHeaderTimeout: readHeaderTimeout, ReadHeaderTimeout: readHeaderTimeout,
EnableExporter: c.EnableExporter, EnableExporter: c.EnableExporter,
ExporterAddress: c.ExporterAddress, ExporterAddress: c.ExporterAddress,
ExporterInterval: exporterInterval,
}, nil }, nil
} }

View file

@ -1,46 +0,0 @@
package exporter
import (
"git.ar21.de/yolokube/country-geo-locations/internal/cache"
"git.ar21.de/yolokube/country-geo-locations/internal/cmd"
"git.ar21.de/yolokube/country-geo-locations/internal/database"
"github.com/prometheus/client_golang/prometheus"
)
// Collector implements the prometheus.Collector interface and gathers
// cache, database and request metrics on every scrape.
type Collector struct {
	config  *cmd.AppSettings   // application settings (cache TTL is exported)
	cache   *cache.Cache       // request cache whose entry count is exported
	db      *database.Database // GeoIP database (timestamp / readiness metrics)
	metrics *Metrics           // prometheus metric descriptors
	queue   *RequestDataQueue  // queue of per-request latency samples
}
// NewCollector wires a Collector to its configuration, cache, database
// and request queue, attaching a fresh set of metric descriptors.
func NewCollector(
	config *cmd.AppSettings,
	cache *cache.Cache,
	db *database.Database,
	queue *RequestDataQueue,
) *Collector {
	collector := Collector{
		config:  config,
		cache:   cache,
		db:      db,
		queue:   queue,
		metrics: NewMetrics(),
	}
	return &collector
}
// Describe sends every metric descriptor to ch, as required by the
// prometheus.Collector interface.
func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
	descs := []*prometheus.Desc{
		c.metrics.metricCacheTTL,
		c.metrics.metricCurrentlyCached,
		c.metrics.metricDatabaseTimestamp,
		c.metrics.metricDatabaseReady,
	}
	for _, desc := range descs {
		ch <- desc
	}
}
// Collect gathers the current metric values and writes them to ch, as
// required by the prometheus.Collector interface; it runs on every scrape.
func (c *Collector) Collect(ch chan<- prometheus.Metric) {
	c.metrics.collectCacheTTLMetric(ch, c.config.CacheTTL.Seconds())
	c.metrics.collectCurrentlyCachedMetric(ch, float64(c.cache.Count()))
	c.metrics.collectDatabaseTimestampMetric(ch, c.db)
	c.metrics.collectDatabaseReadyMetric(ch, c.db.IsReady())
	// NOTE(review): "Reqeust" is a typo in this method name — rename to
	// collectRequestDataMetrics together with its definition in metrics.go.
	c.metrics.collectReqeustDataMetrics(ch, c.queue)
}

View file

@ -0,0 +1,61 @@
package exporter
import (
"net/http"
"git.ar21.de/yolokube/country-geo-locations/internal/cache"
"git.ar21.de/yolokube/country-geo-locations/internal/cmd"
"git.ar21.de/yolokube/country-geo-locations/internal/database"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Exporter periodically refreshes cached metric values and serves them
// over a dedicated prometheus HTTP endpoint.
type Exporter struct {
	config   *cmd.AppSettings   // runtime settings (exporter address, timeouts, TTL)
	cache    *cache.Cache       // request cache whose entry count is exported
	database *database.Database // GeoIP database (timestamp / readiness metrics)
	metrics  *Metrics           // registered prometheus metric instances
}
// NewExporter builds an Exporter for the given settings, cache and
// database and binds a fresh metrics set back to it.
func NewExporter(config *cmd.AppSettings, c *cache.Cache, db *database.Database) *Exporter {
	e := Exporter{
		config:   config,
		cache:    c,
		database: db,
	}
	// The metrics need a back-reference to read config/cache/database state.
	e.metrics = NewMetrics(&e)
	return &e
}
// Collect refreshes every cached metric value; it is intended to be
// driven periodically (e.g. by a ticker) rather than on each scrape.
func (e *Exporter) Collect() {
	refreshers := []func(){
		e.metrics.collectCacheTTLMetric,
		e.metrics.collectCurrentlyCachedMetric,
		e.metrics.collectDatabaseReadyMetric,
		e.metrics.collectDatabaseTimestampMetric,
	}
	for _, refresh := range refreshers {
		refresh()
	}
}
// Middleware returns a chi-compatible middleware that records the
// request counter and latency histogram for every handled request.
func (e *Exporter) Middleware() func(next http.Handler) http.Handler {
	mw := Middleware{metrics: e.metrics}
	return mw.handler
}
// Start registers all metrics with the default prometheus registry and
// serves them via promhttp on the configured exporter address. It
// blocks until the server stops and returns the server's error.
func (e *Exporter) Start() error {
	collectors := []prometheus.Collector{
		e.metrics.metricCacheTTL,
		e.metrics.metricCurrentlyCached,
		e.metrics.metricDatabaseTimestamp,
		e.metrics.metricDatabaseReady,
		e.metrics.metricRequestsTotal,
		e.metrics.metricRequestLatency,
	}
	// MustRegister panics on duplicate registration, so Start must only
	// be called once per process.
	prometheus.MustRegister(collectors...)

	srv := http.Server{
		Addr:              e.config.ExporterAddress,
		Handler:           promhttp.Handler(),
		ReadHeaderTimeout: e.config.ReadHeaderTimeout,
	}
	return srv.ListenAndServe()
}

View file

@ -2,9 +2,7 @@ package exporter
import ( import (
"log" "log"
"time"
"git.ar21.de/yolokube/country-geo-locations/internal/database"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -13,205 +11,102 @@ const (
cacheSubsystem string = "cache" cacheSubsystem string = "cache"
dbSubsystem string = "db" dbSubsystem string = "db"
metricLabelCacheTTL string = "ttl" metricNameCacheTTL string = "ttl"
metricLabelCurrentlyCached string = "currently_cached" metricNameCurrentlyCached string = "currently_cached"
metricLabelDatabaseTimestamp string = "timestamp" metricNameDatabaseTimestamp string = "timestamp"
metricLabelDatabaseReady string = "ready" metricNameDatabaseReady string = "ready"
metricLabelRequestsTotal string = "requests_total" metricNameRequestsTotal string = "requests_total"
metricLabelRequestLatency string = "request_latency" metricNameRequestLatency string = "request_latency"
) )
type Metrics struct { type Metrics struct {
metricCacheTTL *prometheus.Desc exporter *Exporter
metricCurrentlyCached *prometheus.Desc
metricDatabaseTimestamp *prometheus.Desc
metricDatabaseReady *prometheus.Desc
metricRequestsTotal *prometheus.Desc
metricRequestLatency *prometheus.Desc
counter uint metricCacheTTL prometheus.Gauge
metricCurrentlyCached prometheus.Gauge
metricDatabaseTimestamp *prometheus.GaugeVec
metricDatabaseReady prometheus.Gauge
metricRequestsTotal prometheus.Counter
metricRequestLatency prometheus.Histogram
} }
func NewMetrics() *Metrics { func NewMetrics(e *Exporter) *Metrics {
return &Metrics{ return &Metrics{
metricCacheTTL: prometheus.NewDesc( exporter: e,
prometheus.BuildFQName( metricCacheTTL: prometheus.NewGauge(
namespace, prometheus.GaugeOpts{
cacheSubsystem, Namespace: namespace,
metricLabelCacheTTL, Subsystem: cacheSubsystem,
), Name: metricNameCacheTTL,
"Duration for cached requests", Help: "Duration for cached requests",
nil, },
nil,
), ),
metricCurrentlyCached: prometheus.NewDesc( metricCurrentlyCached: prometheus.NewGauge(
prometheus.BuildFQName( prometheus.GaugeOpts{
namespace, Namespace: namespace,
cacheSubsystem, Subsystem: cacheSubsystem,
metricLabelCurrentlyCached, Name: metricNameCurrentlyCached,
), Help: "Number of cached entries",
"Number of cached entries", },
nil,
nil,
), ),
metricDatabaseTimestamp: prometheus.NewDesc( metricDatabaseTimestamp: prometheus.NewGaugeVec(
prometheus.BuildFQName( prometheus.GaugeOpts{
namespace, Namespace: namespace,
dbSubsystem, Subsystem: dbSubsystem,
metricLabelDatabaseTimestamp, Name: metricNameDatabaseTimestamp,
), Help: "Timestamp of the CSV file",
"Timestamp of the CSV file", },
[]string{metricLabelDatabaseTimestamp}, []string{metricNameDatabaseTimestamp},
nil,
), ),
metricDatabaseReady: prometheus.NewDesc( metricDatabaseReady: prometheus.NewGauge(
prometheus.BuildFQName( prometheus.GaugeOpts{
namespace, Namespace: namespace,
dbSubsystem, Subsystem: dbSubsystem,
metricLabelDatabaseReady, Name: metricNameDatabaseReady,
), Help: "Ready status of the database",
"Ready status of the database", },
nil,
nil,
), ),
metricRequestsTotal: prometheus.NewDesc( metricRequestsTotal: prometheus.NewCounter(
prometheus.BuildFQName( prometheus.CounterOpts{
namespace, Namespace: namespace,
"", Name: metricNameRequestsTotal,
metricLabelRequestsTotal, Help: "Counter for total requests",
), },
"Counter for total requests",
nil,
nil,
), ),
metricRequestLatency: prometheus.NewDesc( metricRequestLatency: prometheus.NewHistogram(
prometheus.BuildFQName( prometheus.HistogramOpts{
namespace, Namespace: namespace,
"", Name: metricNameRequestLatency,
metricLabelRequestLatency, Help: "Latency statistics for requests",
), Buckets: prometheus.ExponentialBuckets(10, 1.5, 30),
"Latency statistics for requests", },
nil,
nil,
), ),
} }
} }
func (m *Metrics) collectCacheTTLMetric(ch chan<- prometheus.Metric, ttl float64) { func (m *Metrics) collectCacheTTLMetric() {
ch <- prometheus.MustNewConstMetric( m.metricCacheTTL.Set(m.exporter.config.CacheTTL.Seconds())
m.metricCacheTTL,
prometheus.GaugeValue,
ttl,
)
} }
func (m *Metrics) collectCurrentlyCachedMetric(ch chan<- prometheus.Metric, count float64) { func (m *Metrics) collectCurrentlyCachedMetric() {
ch <- prometheus.MustNewConstMetric( m.metricCurrentlyCached.Set(float64(m.exporter.cache.Count()))
m.metricCurrentlyCached,
prometheus.GaugeValue,
count,
)
} }
func (m *Metrics) collectDatabaseTimestampMetric(ch chan<- prometheus.Metric, db *database.Database) { func (m *Metrics) collectDatabaseTimestampMetric() {
timestamp, err := db.Timestamp() timestamp, err := m.exporter.database.Timestamp()
if err == nil { if err != nil {
ch <- prometheus.MustNewConstMetric( log.Printf("failed to read file timestamp: %v", err)
m.metricDatabaseTimestamp, return
prometheus.GaugeValue,
float64(timestamp.Unix()),
timestamp.String(),
)
} else {
log.Printf("failed to read file timestamp: %v\n", err)
} }
m.metricDatabaseTimestamp.WithLabelValues(timestamp.String()).Set(float64(timestamp.Unix()))
} }
func (m *Metrics) collectDatabaseReadyMetric(ch chan<- prometheus.Metric, ready bool) { func (m *Metrics) collectDatabaseReadyMetric() {
var dbReady uint8 var dbReady float64
if ready { if m.exporter.database.IsReady() {
dbReady = 1 dbReady = 1
} }
ch <- prometheus.MustNewConstMetric( m.metricDatabaseReady.Set(dbReady)
m.metricDatabaseReady,
prometheus.GaugeValue,
float64(dbReady),
)
}
func (m *Metrics) collectReqeustDataMetrics(ch chan<- prometheus.Metric, queue *RequestDataQueue) {
var (
count uint64
sum float64
)
buckets := make(map[float64]uint64)
bucketBounds := []float64{
10,
20,
30,
40,
50,
60,
70,
80,
90,
100,
200,
300,
400,
500,
600,
700,
800,
900,
1000,
1500,
2000,
2500,
3000,
3500,
4000,
4500,
5000,
10000,
20000,
30000,
40000,
50000,
100000,
200000,
300000,
400000,
500000,
1000000,
}
data := queue.ConsumeAll()
for _, r := range data {
latency := float64(r.Latency.Microseconds())
sum += latency
count++
for _, bound := range bucketBounds {
if latency <= bound {
buckets[bound]++
}
}
}
m.counter += uint(len(data))
ch <- prometheus.MustNewConstMetric(
m.metricRequestsTotal,
prometheus.CounterValue,
float64(m.counter),
)
ch <- prometheus.MustNewConstHistogramWithCreatedTimestamp(
m.metricRequestLatency,
count,
sum,
buckets,
time.Now(),
)
} }

View file

@ -2,67 +2,19 @@ package exporter
import ( import (
"net/http" "net/http"
"sync"
"time"
"github.com/go-chi/chi/v5/middleware" "github.com/prometheus/client_golang/prometheus"
) )
type RequestData struct {
Latency time.Duration
Request *http.Request
Start time.Time
}
type RequestDataQueue struct {
mu sync.Mutex
data []RequestData
}
func NewRequestDataQueue() *RequestDataQueue {
return &RequestDataQueue{
data: []RequestData{},
}
}
func (q *RequestDataQueue) Add(data RequestData) {
q.mu.Lock()
defer q.mu.Unlock()
q.data = append(q.data, data)
}
func (q *RequestDataQueue) ConsumeAll() []RequestData {
q.mu.Lock()
defer q.mu.Unlock()
data := q.data
q.data = nil
return data
}
type Middleware struct { type Middleware struct {
queue *RequestDataQueue metrics *Metrics
}
func NewMiddleware(queue *RequestDataQueue) func(next http.Handler) http.Handler {
m := Middleware{
queue: queue,
}
return m.handler
} }
func (m Middleware) handler(next http.Handler) http.Handler { func (m Middleware) handler(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now() timer := prometheus.NewTimer(m.metrics.metricRequestLatency)
ww := middleware.NewWrapResponseWriter(w, r.ProtoMajor) next.ServeHTTP(w, r)
next.ServeHTTP(ww, r) m.metrics.metricRequestsTotal.Inc()
timer.ObserveDuration()
m.queue.Add( })
RequestData{
Latency: time.Since(start),
Request: r,
Start: start,
},
)
}
return http.HandlerFunc(fn)
} }

63
main.go
View file

@ -6,6 +6,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time"
apiv1 "git.ar21.de/yolokube/country-geo-locations/api/v1" apiv1 "git.ar21.de/yolokube/country-geo-locations/api/v1"
"git.ar21.de/yolokube/country-geo-locations/internal/cache" "git.ar21.de/yolokube/country-geo-locations/internal/cache"
@ -17,29 +18,24 @@ import (
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware" "github.com/go-chi/chi/v5/middleware"
"github.com/go-chi/render" "github.com/go-chi/render"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
) )
func main() { func main() {
cli := cmd.CLI{} cli := cmd.CLI{}
queue := exporter.NewRequestDataQueue() config, err := cli.Parse()
appSettings, err := cli.Parse()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
handleGracefulShutdown() handleGracefulShutdown()
exporterMiddleware := exporter.NewMiddleware(queue)
r := chi.NewRouter() r := chi.NewRouter()
r.Use(middleware.RequestID) r.Use(middleware.RequestID)
r.Use(middleware.Logger) r.Use(middleware.Logger)
r.Use(middleware.Recoverer) r.Use(middleware.Recoverer)
r.Use(exporterMiddleware)
r.Use(render.SetContentType(render.ContentTypeJSON)) r.Use(render.SetContentType(render.ContentTypeJSON))
ctx := downloader.NewContext(appSettings.DataFile, appSettings.DataURL) ctx := downloader.NewContext(config.DataFile, config.DataURL)
if !ctx.FileExists() { if !ctx.FileExists() {
if downloadErr := ctx.Download(); downloadErr != nil { if downloadErr := ctx.Download(); downloadErr != nil {
log.Fatal(downloadErr) log.Fatal(downloadErr)
@ -47,24 +43,42 @@ func main() {
log.Printf("saved file to %s\n", ctx.Filename) log.Printf("saved file to %s\n", ctx.Filename)
} }
cache := cache.NewCache(appSettings.CacheTTL) cache := cache.NewCache(config.CacheTTL)
db, err := database.NewDatabase(appSettings) db, err := database.NewDatabase(config)
if err != nil { if err != nil {
log.Fatal("database creation failed", err) log.Fatal("database creation failed", err)
} }
if appSettings.EnableExporter { if config.EnableExporter {
exporter := exporter.NewExporter(config, cache, db)
r.Use(exporter.Middleware())
ticker := time.NewTicker(config.ExporterInterval)
exit := make(chan struct{})
go func() { go func() {
err = enableExporter(appSettings, cache, db, queue) for {
select {
case <-ticker.C:
exporter.Collect()
case <-exit:
ticker.Stop()
return
}
}
}()
log.Println("prometheus exporter refreshes metric data every", config.ExporterInterval)
go func() {
err = exporter.Start()
if err != nil { if err != nil {
log.Panic(err) log.Panic(err)
} }
}() }()
log.Println("prometheus exporter started at", appSettings.ExporterAddress) log.Println("prometheus exporter started at", config.ExporterAddress)
} }
log.Println("importing data from file", appSettings.DataFile) log.Println("importing data from file", config.DataFile)
err = csvimporter.ImportCSV(appSettings.DataFile, db) err = csvimporter.ImportCSV(config.DataFile, db)
if err != nil { if err != nil {
log.Fatal("data Import from file failed", err) log.Fatal("data Import from file failed", err)
} }
@ -74,9 +88,9 @@ func main() {
r.Mount("/api/v1", apiv1.NewRouter(lh)) r.Mount("/api/v1", apiv1.NewRouter(lh))
server := &http.Server{ server := &http.Server{
Addr: appSettings.ServerAddress, Addr: config.ServerAddress,
Handler: r, Handler: r,
ReadHeaderTimeout: appSettings.ReadHeaderTimeout, ReadHeaderTimeout: config.ReadHeaderTimeout,
} }
log.Println("starting server at", server.Addr) log.Println("starting server at", server.Addr)
@ -85,23 +99,6 @@ func main() {
} }
} }
func enableExporter(
settings *cmd.AppSettings,
cache *cache.Cache,
db *database.Database,
queue *exporter.RequestDataQueue,
) error {
prometheus.MustRegister(exporter.NewCollector(settings, cache, db, queue))
metricsServer := &http.Server{
Addr: settings.ExporterAddress,
Handler: promhttp.Handler(),
ReadHeaderTimeout: settings.ReadHeaderTimeout,
}
return metricsServer.ListenAndServe()
}
func handleGracefulShutdown() { func handleGracefulShutdown() {
var signals = make(chan os.Signal, 1) var signals = make(chan os.Signal, 1)