weather/internal/storage/s3.go

111 lines
2.9 KiB
Go
Raw Normal View History

package storage
import (
"context"
"fmt"
"go/weather/pkg/logger"
"io"
"time"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"go.elastic.co/apm"
"go.elastic.co/apm/module/apmzap"
"go.uber.org/zap"
)
//AwsDefaultSection toml default section
type AwsDefaultSection struct {
// attribute should be public ! for go-toml
EndpointURL string `hcl:"endpoint_url"`
BucketName string `hcl:"bucket_name"`
}
//WeatherS3StorageConfig interface abstracting storageConfig
type WeatherS3StorageConfig struct {
AwsDefaultSection `hcl:",nested"`
Region string `hcl:"region"`
AwsAccessKeyID string `hcl:"aws_access_key_id"`
AwsSecretAccessKey string `hcl:"aws_secret_access_key"`
}
//s3Storage classe used to store oi.Reader to s3
type S3Storage struct {
logger *logger.WeatherLogger
Session *minio.Client
S3Config *WeatherS3StorageConfig
}
//NewS3Storage instanciate storage object
func NewS3Storage(log *logger.WeatherLogger, config *WeatherS3StorageConfig) *S3Storage {
s3 := S3Storage{
logger: log,
S3Config: config,
}
var err error
// Initialize minio client object.
s3.Session, err = minio.New(config.EndpointURL, &minio.Options{
Region: config.Region,
Creds: credentials.NewStaticV4(config.AwsAccessKeyID, config.AwsSecretAccessKey, ""),
Secure: false,
})
2021-03-12 23:57:41 +00:00
if err != nil {
s3.logger.Fatal("Storage error", zap.Error(err))
}
return &s3
}
//Store send data to s3 bucket
func (ss *S3Storage) Store(ctx context.Context, content io.Reader) {
span, ctx := apm.StartSpan(ctx, "s3StoreWeatherInfo", "custom")
defer span.End()
name := fmt.Sprintf("%s.json", time.Now().UTC().Format(time.RFC3339))
_, err := ss.Session.PutObject(ctx, ss.S3Config.BucketName, name, content, -1, minio.PutObjectOptions{
ContentType: "application/json",
})
2021-03-12 23:57:41 +00:00
if err != nil {
ss.logger.Error("Storage error", zap.Error(err))
} else {
ss.logger.Debug("Storage success", apmzap.TraceContext(ctx)...)
2021-03-12 23:57:41 +00:00
}
}
2021-04-07 21:02:36 +00:00
//GetAtDate retrieve one data from bucket
func (ss *S3Storage) GetAtDate(ctx context.Context, atDate time.Time) *minio.Object {
filename := fmt.Sprintf("%s.json", atDate.Format(time.RFC3339))
reader, err := ss.Session.GetObject(ctx, ss.S3Config.BucketName, filename, minio.GetObjectOptions{})
if err != nil {
ss.logger.Error("Storage get s3Object failed", zap.Error(err))
return nil
}
return reader
}
type Streamable interface{}
type WeatherFile struct {
Streamable `json:"-"`
Name string `json:"name"`
}
//GetAll list all date available
func (ss *S3Storage) GetAll(ctx context.Context) <-chan Streamable {
// []string
objectStatCh := make(chan Streamable, 1)
go func() {
for object := range ss.Session.ListObjects(ctx, ss.S3Config.BucketName, minio.ListObjectsOptions{}) {
if object.Err != nil {
fmt.Println(object.Err)
continue
}
objectStatCh <- WeatherFile{Name: object.Key}
}
}()
return objectStatCh
}