|
@@ -9,7 +9,6 @@ import (
|
|
|
"fmt"
|
|
|
"log"
|
|
|
"math"
|
|
|
- "os"
|
|
|
"regexp"
|
|
|
"strconv"
|
|
|
"strings"
|
|
@@ -17,24 +16,37 @@ import (
|
|
|
|
|
|
"github.com/ClickHouse/clickhouse-go/v2"
|
|
|
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
|
|
|
+ "github.com/fsnotify/fsnotify"
|
|
|
"github.com/minio/minio-go/v7"
|
|
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
|
|
"github.com/minio/minio-go/v7/pkg/notification"
|
|
|
"gorm.io/driver/mysql"
|
|
|
"gorm.io/gorm"
|
|
|
"gorm.io/gorm/logger"
|
|
|
+ // "github.com/natefinch/lumberjack"
|
|
|
+ // "github.com/sirupsen/logrus"
|
|
|
+ "github.com/spf13/viper"
|
|
|
)
|
|
|
|
|
|
-var MYSQL_USER = "root"
|
|
|
-var MYSQL_PASSWORD = ""
|
|
|
-var MYSQL_HOST = "localhost:3306"
|
|
|
-var MYSQL_DATABASE = "minio-into-stck-db"
|
|
|
-var MINIO_ACCESS_KEY = "25cSXPqzdHrPwJkSIRkM"
|
|
|
-var MINIO_SECRET = "FN3AhQaVo7z1wgvce3IWiI1CI68T02OVeSUKCeRf"
|
|
|
-var MINIO_BUCKET = "bucket"
|
|
|
-var MINIO_HOST = "$(hostname).local:9000"
|
|
|
-var CLICKHOUSE_HOST = "localhost:9000"
|
|
|
-var CLICKHOUSE_TABLE = "tsdb_cpp"
|
|
|
+// AppInitConfig is the initial configuration of the application, loaded from the config file.
|
|
|
+type AppInitConfig struct {
|
|
|
+ Mysql struct {
|
|
|
+ User string `mapstructure:"user"`
|
|
|
+ Password string `mapstructure:"password"`
|
|
|
+ Host string `mapstructure:"host"`
|
|
|
+ Database string `mapstructure:"database"`
|
|
|
+ } `mapstructure:"mysql"`
|
|
|
+ Minio struct {
|
|
|
+ AccessKey string `mapstructure:"accessKey"`
|
|
|
+ Secret string `mapstructure:"secret"`
|
|
|
+ Bucket string `mapstructure:"bucket"`
|
|
|
+ Host string `mapstructure:"host"`
|
|
|
+ } `mapstructure:"minio"`
|
|
|
+ Stck struct {
|
|
|
+ Host string `mapstructure:"host"`
|
|
|
+ Table string `mapstructure:"table"`
|
|
|
+ } `mapstructure:"stck"`
|
|
|
+}
|
|
|
|
|
|
type AppConfig struct {
|
|
|
// List of name regex patterns to exclude from import.
|
|
@@ -68,50 +80,41 @@ type StreamMetadata struct {
|
|
|
TotalPoints int `json:"total_points"`
|
|
|
}
|
|
|
|
|
|
+var appInitCfg *AppInitConfig = &AppInitConfig{}
|
|
|
+
|
|
|
+func initLoadConfig() {
|
|
|
+ viper.SetConfigFile("./config/application.yaml")
|
|
|
+ viper.WatchConfig()
|
|
|
+ viper.OnConfigChange(func(e fsnotify.Event) {
|
|
|
+ log.Println("Config file changed:", e.Name)
|
|
|
+ var newAppInitConfig AppInitConfig
|
|
|
+ err := viper.Unmarshal(&newAppInitConfig)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("Failed to unmarshal config:", err)
|
|
|
+ }
|
|
|
+ appInitCfg = &newAppInitConfig
|
|
|
+ })
|
|
|
+ err := viper.ReadInConfig()
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("Failed to read config file: %v", err)
|
|
|
+ }
|
|
|
+ err = viper.Unmarshal(appInitCfg)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("Failed to unmarshal config: %v\n", err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func main() {
|
|
|
var err error
|
|
|
|
|
|
log.Println("Starting application...")
|
|
|
|
|
|
- // Load environment variables
|
|
|
- if os.Getenv("MYSQL_USER") != "" {
|
|
|
- MYSQL_USER = os.Getenv("MYSQL_USER")
|
|
|
- }
|
|
|
- if os.Getenv("MYSQL_PASSWORD") != "" {
|
|
|
- MYSQL_PASSWORD = os.Getenv("MYSQL_PASSWORD")
|
|
|
- }
|
|
|
- if os.Getenv("MYSQL_HOST") != "" {
|
|
|
- MYSQL_HOST = os.Getenv("MYSQL_HOST")
|
|
|
- }
|
|
|
- if os.Getenv("MYSQL_DATABASE") != "" {
|
|
|
- MYSQL_DATABASE = os.Getenv("MYSQL_DATABASE")
|
|
|
- }
|
|
|
- if os.Getenv("MINIO_ACCESS_KEY") != "" {
|
|
|
- MINIO_ACCESS_KEY = os.Getenv("MINIO_ACCESS_KEY")
|
|
|
- }
|
|
|
- if os.Getenv("MINIO_SECRET") != "" {
|
|
|
- MINIO_SECRET = os.Getenv("MINIO_SECRET")
|
|
|
- }
|
|
|
- if os.Getenv("MINIO_BUCKET") != "" {
|
|
|
- MINIO_BUCKET = os.Getenv("MINIO_BUCKET")
|
|
|
- }
|
|
|
- if os.Getenv("MINIO_HOST") != "" {
|
|
|
- MINIO_HOST = os.Getenv("MINIO_HOST")
|
|
|
- }
|
|
|
- MINIO_HOST, err = util.ExpandShellString(MINIO_HOST)
|
|
|
- if err != nil {
|
|
|
- log.Fatalf("Failed to expand shell string: %v", err)
|
|
|
- }
|
|
|
- if os.Getenv("CLICKHOUSE_HOST") != "" {
|
|
|
- CLICKHOUSE_HOST = os.Getenv("CLICKHOUSE_HOST")
|
|
|
- }
|
|
|
- if os.Getenv("CLICKHOUSE_TABLE") != "" {
|
|
|
- CLICKHOUSE_TABLE = os.Getenv("CLICKHOUSE_TABLE")
|
|
|
- }
|
|
|
+ // Load configuration from file
|
|
|
+ initLoadConfig()
|
|
|
|
|
|
// Connect to MySQL
|
|
|
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local",
|
|
|
- MYSQL_USER, MYSQL_PASSWORD, MYSQL_HOST, MYSQL_DATABASE)
|
|
|
+ appInitCfg.Mysql.User, appInitCfg.Mysql.Password, appInitCfg.Mysql.Host, appInitCfg.Mysql.Database)
|
|
|
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
|
|
|
if err != nil {
|
|
|
log.Fatalf("Failed to connect to MySQL: %v", err)
|
|
@@ -140,17 +143,17 @@ func main() {
|
|
|
log.Println("Auto migration completed")
|
|
|
|
|
|
// Connect to MinIO
|
|
|
- minioClient, err := minio.New(MINIO_HOST, &minio.Options{
|
|
|
- Creds: credentials.NewStaticV4(MINIO_ACCESS_KEY, MINIO_SECRET, ""),
|
|
|
+ minioClient, err := minio.New(appInitCfg.Minio.Host, &minio.Options{
|
|
|
+ Creds: credentials.NewStaticV4(appInitCfg.Minio.AccessKey, appInitCfg.Minio.Secret, ""),
|
|
|
Secure: false,
|
|
|
})
|
|
|
if err == nil {
|
|
|
- bucketExists, err := minioClient.BucketExists(context.Background(), MINIO_BUCKET)
|
|
|
+ bucketExists, err := minioClient.BucketExists(context.Background(), appInitCfg.Minio.Bucket)
|
|
|
if err != nil {
|
|
|
- log.Fatalf("Failed to check if bucket %s exists: %v", MINIO_BUCKET, err)
|
|
|
+ log.Fatalf("Failed to check if bucket %s exists: %v", appInitCfg.Minio.Bucket, err)
|
|
|
}
|
|
|
if !bucketExists {
|
|
|
- log.Fatalf("Bucket %s does not exist", MINIO_BUCKET)
|
|
|
+ log.Fatalf("Bucket %s does not exist", appInitCfg.Minio.Bucket)
|
|
|
}
|
|
|
}
|
|
|
if err != nil {
|
|
@@ -160,7 +163,7 @@ func main() {
|
|
|
|
|
|
// Connect to ClickHouse
|
|
|
ckConn, err := clickhouse.Open(&clickhouse.Options{
|
|
|
- Addr: []string{CLICKHOUSE_HOST},
|
|
|
+ Addr: []string{appInitCfg.Stck.Host},
|
|
|
})
|
|
|
if err == nil {
|
|
|
err = ckConn.Ping(context.Background())
|
|
@@ -213,7 +216,7 @@ func main_worker(app AppCtx) {
|
|
|
|
|
|
// Register bucket notification
|
|
|
notifys := app.minioClient.ListenBucketNotification(
|
|
|
- ctx, MINIO_BUCKET, "", "", []string{string(notification.ObjectCreatedAll)})
|
|
|
+ ctx, appInitCfg.Minio.Bucket, "", "", []string{string(notification.ObjectCreatedAll)})
|
|
|
|
|
|
// Start the notification listener
|
|
|
go func() {
|
|
@@ -308,7 +311,7 @@ func trigger_full_upload(app AppCtx, objToUploadChan chan<- string) {
|
|
|
options := minio.ListObjectsOptions{
|
|
|
Recursive: false,
|
|
|
}
|
|
|
- objectsCh := app.minioClient.ListObjects(context.Background(), MINIO_BUCKET, options)
|
|
|
+ objectsCh := app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options)
|
|
|
for objInfo := range objectsCh {
|
|
|
if objInfo.Err != nil {
|
|
|
log.Printf("Error listing objects: %v\n", objInfo.Err)
|
|
@@ -344,7 +347,7 @@ func upload_one_stream(app AppCtx, streamName string, partUploadChan chan PartUp
|
|
|
Prefix: streamObjPath,
|
|
|
Recursive: false,
|
|
|
}
|
|
|
- for objInfo := range app.minioClient.ListObjects(context.Background(), MINIO_BUCKET, options) {
|
|
|
+ for objInfo := range app.minioClient.ListObjects(context.Background(), appInitCfg.Minio.Bucket, options) {
|
|
|
if objInfo.Err != nil {
|
|
|
return false, objInfo.Err
|
|
|
}
|
|
@@ -373,7 +376,7 @@ func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string,
|
|
|
dryRun := false
|
|
|
if !dryRun {
|
|
|
// Get the part data from MinIO
|
|
|
- obj, err := app.minioClient.GetObject(context.Background(), MINIO_BUCKET, partName, minio.GetObjectOptions{})
|
|
|
+ obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket, partName, minio.GetObjectOptions{})
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to get part data: %w", err)
|
|
|
}
|
|
@@ -426,7 +429,7 @@ func upload_one_part(app AppCtx, streamInfo *StreamMetadata, streamName string,
|
|
|
partPointsCount := len(partData) / 8
|
|
|
|
|
|
// Insert the part data into ClickHouse
|
|
|
- batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+CLICKHOUSE_TABLE)
|
|
|
+ batch, err := app.ckConn.PrepareBatch(context.Background(), "INSERT INTO "+appInitCfg.Stck.Table)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("failed to insert part data into ClickHouse: %w", err)
|
|
|
}
|
|
@@ -513,7 +516,8 @@ func part_already_uploaded(app AppCtx, streamName string, partName string) bool
|
|
|
func get_stream_metadata(app AppCtx, streamName string) (*StreamMetadata, error) {
|
|
|
// Get the stream metadata from MinIO
|
|
|
metadataObjPath := streamName + "/metadata.json"
|
|
|
- obj, err := app.minioClient.GetObject(context.Background(), MINIO_BUCKET, metadataObjPath, minio.GetObjectOptions{})
|
|
|
+ obj, err := app.minioClient.GetObject(context.Background(), appInitCfg.Minio.Bucket,
|
|
|
+ metadataObjPath, minio.GetObjectOptions{})
|
|
|
if err != nil {
|
|
|
return nil, fmt.Errorf("failed to get stream metadata: %w", err)
|
|
|
}
|