Browse Source

feat: get all endpoint as stream

master
RouxAntoine 6 months ago
parent
commit
bf72b1df6b
Signed by: antoine <antoinroux@hotmail.fr> GPG Key ID: 098FB66FC0475E70
10 changed files with 155 additions and 27 deletions
  1. +2
    -0
      .vscode/launch.json
  2. +40
    -9
      cmd/weather/main.go
  3. +2
    -0
      config.sample.hcl
  4. +9
    -0
      internal/config.go
  5. +35
    -0
      internal/storage/s3.go
  6. +48
    -9
      internal/web/handler.go
  7. +2
    -2
      manifests/common.tf
  8. +4
    -1
      pkg/headers/header.go
  9. +12
    -6
      pkg/logger/logger.go
  10. +1
    -0
      pkg/logger/writer.go

+ 2
- 0
.vscode/launch.json View File

@@ -9,6 +9,7 @@
"type": "go",
"request": "launch",
"mode": "auto",
"buildFlags": "-tags dev",
"program": "${workspaceFolder}/cmd/weather/main.go",
"cwd": "${workspaceFolder}"
},
@@ -17,6 +18,7 @@
"type": "go",
"request": "launch",
"mode": "auto",
"buildFlags": "-tags dev",
"program": "${workspaceFolder}/cmd/poller/main.go",
"cwd": "${workspaceFolder}"
}


+ 40
- 9
cmd/weather/main.go View File

@@ -10,28 +10,40 @@ import (
"log"
"os"
"os/signal"
"strings"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type flagParameter struct {
configFile string
logLevel zapcore.Level
logOutput string
wait time.Duration
}

func main() {
var wait time.Duration
var configFile string
flag.DurationVar(&wait, "graceful-timeout", time.Second*15, "the duration for which the server gracefully wait for existing connections to finish - e.g. 15s or 1m")
flag.StringVar(&configFile, "filename", "config.hcl", "configuration filename")
f := flagParameter{}
var logLevel string
flag.DurationVar(&f.wait, "graceful-timeout", time.Second*15, "the duration for which the server gracefully wait for existing connections to finish - e.g. 15s or 1m")
flag.StringVar(&f.configFile, "filename", "config.hcl", "configuration filename")
flag.StringVar(&logLevel, "logLevel", "info", "Log level")
flag.StringVar(&f.logOutput, "logOutput", "logs/weather.log", "Output log path")
flag.Parse()
f.logLevel = parseLogLevel(logLevel)

//logger
loggerLevel := zap.NewAtomicLevel()
defaultLogger := logger.NewLogger("weather", "weather.log", loggerLevel)
loggerLevel := zap.NewAtomicLevelAt(f.logLevel)
defaultLogger := logger.NewLogger("weather", f.logOutput, loggerLevel)
defer defaultLogger.Sync()

//configuration parsing
config := internal.ParseConfiguration(defaultLogger.Sugar(), configFile)
config := internal.ParseConfiguration(defaultLogger.Sugar(), f.configFile)

//http
addr := web.NewListenAddr("0.0.0.0", 8080)
addr := web.NewListenAddr(config.Listen, config.Port)

defaultLogger.Sugar().Infof("Weather server is listening on %s", addr)
server := web.New(defaultLogger, addr, version.String()).
@@ -50,7 +62,7 @@ func main() {
signal.Notify(c, os.Interrupt)

<-c
ctx, cancel := context.WithTimeout(context.Background(), wait)
ctx, cancel := context.WithTimeout(context.Background(), f.wait)
defer cancel()

server.Shutdown(ctx)
@@ -58,3 +70,22 @@ func main() {
log.Println("shutting down")
os.Exit(0)
}

func parseLogLevel(logLevel string) zapcore.Level {
zapLevel := zap.InfoLevel
switch strings.ToUpper(logLevel) {
case "DEBUG":
zapLevel = zapcore.DebugLevel
case "INFO":
zapLevel = zapcore.InfoLevel
case "WARN":
zapLevel = zapcore.WarnLevel
case "ERROR":
zapLevel = zapcore.ErrorLevel
case "PANIC":
zapLevel = zapcore.PanicLevel
case "FATAL":
zapLevel = zapcore.FatalLevel
}
return zapLevel
}

+ 2
- 0
config.sample.hcl View File

@@ -1,4 +1,6 @@
openweather_secret = ""
# port = 443
# listen_addr = "127.0.0.1"

s3 {
endpoint_url = ""


+ 9
- 0
internal/config.go View File

@@ -18,6 +18,8 @@ import (
type WeatherConfig struct {
OpenweatherSecret string `hcl:"openweather_secret"`
S3Storage storage.WeatherS3StorageConfig `hcl:"s3,block"`
Port int `hcl:"port,optional"`
Listen string `hcl:"listen_addr,optional"`
}

//ParseConfiguration parse configuration from filename path
@@ -56,5 +58,12 @@ func ParseConfiguration(sLogger *zap.SugaredLogger, filename string) *WeatherCon
if config.OpenweatherSecret == "" {
sLogger.Fatal("Missing required parameter : openweather-secret")
}
if config.Listen == "" {
config.Listen = "0.0.0.0"
}
if config.Port == 0 {
config.Port = 8080
}

return config
}

+ 35
- 0
internal/storage/s3.go View File

@@ -73,3 +73,38 @@ func (ss *S3Storage) Store(ctx context.Context, content io.Reader) {
ss.logger.Debug("Storage success", apmzap.TraceContext(ctx)...)
}
}

//GetAtDate retrieve one data from bucket
func (ss *S3Storage) GetAtDate(ctx context.Context, atDate time.Time) *minio.Object {
filename := fmt.Sprintf("%s.json", atDate.Format(time.RFC3339))
reader, err := ss.Session.GetObject(ctx, ss.S3Config.BucketName, filename, minio.GetObjectOptions{})
if err != nil {
ss.logger.Error("Storage get s3Object failed", zap.Error(err))
return nil
}
return reader
}

type Streamable interface{}

type WeatherFile struct {
Streamable `json:"-"`
Name string `json:"name"`
}

//GetAll list all date available
func (ss *S3Storage) GetAll(ctx context.Context) <-chan Streamable {
// []string
objectStatCh := make(chan Streamable, 1)

go func() {
for object := range ss.Session.ListObjects(ctx, ss.S3Config.BucketName, minio.ListObjectsOptions{}) {
if object.Err != nil {
fmt.Println(object.Err)
continue
}
objectStatCh <- WeatherFile{Name: object.Key}
}
}()
return objectStatCh
}

+ 48
- 9
internal/web/handler.go View File

@@ -11,7 +11,6 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/minio/minio-go/v7"
"go.elastic.co/apm"
"go.elastic.co/apm/module/apmzap"
"go.uber.org/zap"
@@ -70,13 +69,24 @@ func (wh *WeatherHandler) RegisterApi(version string) {
span, ctx := apm.StartSpan(r.Context(), "s3GetAllWeather", "custom")
defer span.End()

// application/stream+json
// application/x-ndjson
// text/event-stream
rw.Header().Set(headers.ContentType, "text/plain")
rw.Header().Set(headers.CacheControl, "no-cache")
rw.Header().Set(headers.Connection, "keep-alive")

// List all objects from a bucket-name with a matching prefix.
for object := range wh.storage.Session.ListObjects(ctx, wh.storage.S3Config.BucketName, minio.ListObjectsOptions{}) {
if object.Err != nil {
fmt.Println(object.Err)
wh.ResponseStream(rw, r, wh.storage.GetAll(ctx), func(enc *json.Encoder, obj storage.Streamable) {
if o, ok := obj.(storage.WeatherFile); ok {
fmt.Printf("name : %s\n", o.Name)
}
fmt.Println(object)
}

err := enc.Encode(obj)
if err != nil {
wh.wlogger.Sugar().Errorf("All encoding error %v\n", obj)
}
})
})

api.HandleFunc("/at/{atDate}", func(rw http.ResponseWriter, r *http.Request) {
@@ -93,9 +103,9 @@ func (wh *WeatherHandler) RegisterApi(version string) {
apmzap.TraceContext(ctx),
zap.String("name", fmt.Sprintf("%s.json", atDate.Format(time.RFC3339))),
)...)
reader, err := wh.storage.Session.GetObject(ctx, wh.storage.S3Config.BucketName, fmt.Sprintf("%s.json", atDate.Format(time.RFC3339)), minio.GetObjectOptions{})
if err != nil {
wh.wlogger.Error("AtDate get s3Object failed", zap.Error(err))
reader := wh.storage.GetAtDate(ctx, atDate)
if reader == nil {
rw.WriteHeader(http.StatusInternalServerError)
}
defer reader.Close()
@@ -111,3 +121,32 @@ func (wh *WeatherHandler) RegisterApi(version string) {
}
})
}

func (wh *WeatherHandler) ResponseStream(
w http.ResponseWriter, r *http.Request, stream <-chan storage.Streamable, f func(enc *json.Encoder, obj storage.Streamable),
) {
flusher, ok := w.(http.Flusher)
if !ok {
wh.wlogger.Debug("Flusher cast error")
http.NotFound(w, r)
return
}

w.Header().Set(headers.TransferEncoding, "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()

enc := json.NewEncoder(w)

cn := r.Context()
for object := range stream {
select {
case <-cn.Done():
wh.wlogger.Debug("Client stopped listening", zap.Error(cn.Err()))
return
default:
f(enc, object)
flusher.Flush()
}
}
}

+ 2
- 2
manifests/common.tf View File

@@ -31,7 +31,7 @@ module "poller_application" {
kubernetes_namespace = kubernetes_namespace.application_namespace
application_image = format("docker.registry/weather/poller:%s", var.poller_version)
kubernetes_config_map = kubernetes_config_map.weather_config.metadata.0
application_args = ["-filename", "/conf/config.hcl", "-logLevel", "info", "-logOutput", "/logs/weather.log", "-check-interval", "1h"]
application_args = ["-filename", "/conf/config.hcl", "-logLevel", "info", "-logOutput", "/logs/poller.log", "-check-interval", "1h"]
}
// deploy weather server application
module "weather_server_application" {
@@ -43,5 +43,5 @@ module "weather_server_application" {
kubernetes_config_map = kubernetes_config_map.weather_config.metadata.0
expose_application = true
application_dns = "weather.localdomain"
application_args = ["-filename", "/conf/config.hcl"]
application_args = ["-filename", "/conf/config.hcl", "-logOutput", "/logs/weather.log"]
}

+ 4
- 1
pkg/headers/header.go View File

@@ -2,5 +2,8 @@ package headers

const (
//ContentType ...
ContentType = "Content-Type"
ContentType = "Content-Type"
TransferEncoding = "Transfer-Encoding"
Connection = "Connection"
CacheControl = "Cache-Control"
)

+ 12
- 6
pkg/logger/logger.go View File

@@ -93,12 +93,18 @@ func (wl *WeatherLogger) HTTPLogHandler(next http.Handler) http.Handler {
ctxtt := apm.ContextWithTransaction(r.Context(), tx)
r = r.WithContext(ctxtt)

ww := ResponseWriter{
ResponseWriter: w,
if flusher, ok := w.(http.Flusher); ok {
ww := ResponseWriter{
ResponseWriter: w,
Flusher: flusher,
}
wl.LogHTTPRequest(r)
next.ServeHTTP(&ww, r)
wl.LogHTTPResponse(ww)
} else {
wl.Error("Request don't support flusher and stream")
w.WriteHeader(http.StatusInternalServerError)
}
wl.LogHTTPRequest(r)
next.ServeHTTP(&ww, r)
wl.LogHTTPResponse(ww)
})
}

@@ -118,7 +124,7 @@ func (wl *WeatherLogger) LogHTTPRequest(r *http.Request) {
zap.String("http.request.method", r.Method),
// zap.Int64("http.request.bytes", r.Header ContentLength), // total body+header len
zap.String("http.request.mime_type", r.Header.Get(headers.ContentType)),
// zap.String(""),
zap.String("url.path", r.RequestURI),
// zap.String(""),
}...)



+ 1
- 0
pkg/logger/writer.go View File

@@ -5,6 +5,7 @@ import "net/http"
//ResponseWriter ResponseWriter
type ResponseWriter struct {
http.ResponseWriter
http.Flusher
Status int
Length int
}


Loading…
Cancel
Save