weather/internal/storage/s3.go

115 lines
2.9 KiB
Go

package storage
import (
"fmt"
"go/weather/pkg/logger"
"io"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
toml "github.com/pelletier/go-toml"
"go.uber.org/zap"
)
// AwsDefaultSection holds the settings decoded from a "default" configuration
// section. It is used as the Default field of CustomAwsConfig and is embedded
// in WeatherS3StorageConfig.
type AwsDefaultSection struct {
// EndpointURL is read from the "endpoint_url" key (TOML and HCL tags).
// The field must stay exported so that go-toml can populate it.
EndpointURL string `toml:"endpoint_url" hcl:"endpoint_url"`
}
// CustomAwsConfig is the TOML decoding target that NewS3Storage fills from the
// AWS shared config file when no explicit configuration is supplied.
// NOTE(review): the AWS shared config file is normally INI-formatted; confirm
// that the files used here are also valid TOML (e.g. quoted string values).
type CustomAwsConfig struct {
// Default maps the "default" TOML section.
Default AwsDefaultSection `toml:"default"`
}
// WeatherS3StorageConfig is the explicit S3 configuration accepted by
// NewS3Storage. Its fields carry HCL tags.
type WeatherS3StorageConfig struct {
// AwsDefaultSection is embedded (HCL "nested") and provides EndpointURL,
// which NewS3Storage uses as the AWS endpoint.
AwsDefaultSection `hcl:",nested"`
// Region is passed to the AWS configuration as the region.
Region string `hcl:"region"`
// AwsAccessKeyID and AwsSecretAccessKey are used by NewS3Storage to build
// static credentials.
AwsAccessKeyID string `hcl:"aws_access_key_id"`
AwsSecretAccessKey string `hcl:"aws_secret_access_key"`
}
// s3Storage uploads the content of an io.Reader to an S3 bucket.
type s3Storage struct {
// logger receives the errors raised while creating the session or uploading.
logger *logger.WeatherLogger
// session is the AWS session created by NewS3Storage and used for uploads.
session *session.Session
// bucket is the name of the destination bucket.
bucket string
}
// NewS3Storage builds an S3-backed Storage that uploads into bucket.
//
// When config is nil, the endpoint URL is taken from the "default" section of
// the AWS shared config file, which is decoded as TOML. Otherwise the endpoint,
// region and static credentials come from config. In both cases SSL is
// disabled, path-style addressing is forced and the AWS log level is taken
// from log.
//
// A failure to read the shared config file or to create the session is
// reported through log.Fatal (presumably terminating the process; confirm
// against WeatherLogger).
func NewS3Storage(log *logger.WeatherLogger, config *WeatherS3StorageConfig, bucket string) Storage {
	store := &s3Storage{logger: log, bucket: bucket}

	var awsCfg aws.Config
	if config != nil {
		// Explicit configuration supplied by the caller.
		awsCfg.Endpoint = &config.EndpointURL
		awsCfg.Credentials = credentials.NewStaticCredentials(config.AwsAccessKeyID, config.AwsSecretAccessKey, "")
		awsCfg.Region = &config.Region
	} else {
		// No explicit configuration: fall back to the endpoint declared in
		// the AWS shared config file.
		var shared CustomAwsConfig
		if err := decodeFile(defaults.SharedConfigFilename(), &shared); err != nil {
			store.logger.Fatal("Storage error", zap.Error(err))
		}
		awsCfg.Endpoint = &shared.Default.EndpointURL
	}

	// Settings applied on top of either configuration source.
	overrides := aws.Config{
		DisableSSL:       aws.Bool(true),
		LogLevel:         aws.LogLevel(log.GetAwsLevel()),
		S3ForcePathStyle: aws.Bool(true),
	}
	awsCfg.MergeIn(&overrides)

	sess, err := session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
		Config:            awsCfg,
	})
	if err != nil {
		store.logger.Fatal("Storage error", zap.Error(err))
	}
	store.session = sess

	return store
}
// Store uploads content to the configured bucket. The object key is the
// current UTC time formatted as RFC 3339, followed by ".json". An upload
// failure is logged and not returned to the caller.
func (ss *s3Storage) Store(content io.Reader) {
	uploader := s3manager.NewUploader(ss.session)

	objectKey := fmt.Sprintf("%s.json", time.Now().UTC().Format(time.RFC3339))
	input := &s3manager.UploadInput{
		Bucket: aws.String(ss.bucket),
		Key:    aws.String(objectKey),
		Body:   content,
	}

	if _, err := uploader.Upload(input); err != nil {
		ss.logger.Error("Storage error", zap.Error(err))
	}
}
// decodeFile opens the file at fpath and decodes its TOML content into v,
// which must be a pointer to the destination value.
//
// It returns the error from os.Open unchanged when the file cannot be opened,
// and the decoder's error wrapped with the file path when decoding fails.
func decodeFile(fpath string, v interface{}) error {
	f, err := os.Open(fpath)
	if err != nil {
		return err
	}
	// The file is only read, so a Close error carries no useful information;
	// closing is still required to release the descriptor.
	defer f.Close()

	if err := toml.NewDecoder(f).Decode(v); err != nil {
		return fmt.Errorf("decoding TOML file %q: %w", fpath, err)
	}
	return nil
}