From 7a64bcc64d8d6365e82afe3f47fa75cfb8ad420d Mon Sep 17 00:00:00 2001 From: Nicolas MASSE Date: Tue, 22 Feb 2022 12:20:34 +0100 Subject: [PATCH] implement the replay function --- archive.go | 5 +- cli/cmd/archive.go | 8 +- cli/cmd/list.go | 114 ++++++++++++++++++ cli/cmd/replay.go | 125 ++++++++++++++++++++ cli/cmd/root.go | 25 ++++ mqtt.go | 4 +- replay.go | 279 +++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 553 insertions(+), 7 deletions(-) create mode 100644 cli/cmd/list.go create mode 100644 cli/cmd/replay.go create mode 100644 replay.go diff --git a/archive.go b/archive.go index b422d5e..4e72e81 100644 --- a/archive.go +++ b/archive.go @@ -80,6 +80,9 @@ type Archiver struct { const ( // JSON filename format (one file per day) logfileFormat = "20060102.json" + + // JSON file rotation interval + rotationInterval = 24 * time.Hour ) // StartArchive starts the archiving process. It is not safe to call @@ -93,7 +96,7 @@ func (archiver *Archiver) StartArchive() error { SetMqttLogger(archiver.Logger) archiver.Logger.Println("Connecting to MQTT server...") var err error - archiver.client, err = NewMqttClient(archiver.MqttConfig) + archiver.client, err = NewMqttClient(archiver.MqttConfig, false) if err != nil { return err } diff --git a/cli/cmd/archive.go b/cli/cmd/archive.go index ac8458e..0e724d3 100644 --- a/cli/cmd/archive.go +++ b/cli/cmd/archive.go @@ -37,6 +37,10 @@ var archiveCmd = &cobra.Command{ Short: "Archive MQTT events from the broker to S3", Long: `TODO`, Run: func(cmd *cobra.Command, args []string) { + // Each main feature gets its own default client id to prevent the replay + // feature from colliding with the archive function + viper.SetDefault("mqtt.clientId", "mqtt-archiver-archive") + ok := true if viper.GetString("s3.endpoint") == "" { logger.Println("No S3 endpoint defined in configuration") @@ -104,8 +108,4 @@ var archiveCmd = &cobra.Command{ func init() { rootCmd.AddCommand(archiveCmd) - - // Each main feature gets its own default client id to prevent the replay - // feature from colliding with the archive function - viper.SetDefault("mqtt.clientId", "mqtt-archiver-archive") } diff --git a/cli/cmd/list.go b/cli/cmd/list.go new file mode 100644 index 0000000..b04b72f --- /dev/null +++ b/cli/cmd/list.go @@ -0,0 +1,114 @@ +/* +Copyright © 2022 Nicolas MASSE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ +package cmd + +import ( + "fmt" + "os" + + mqttArchiver "github.com/nmasse-itix/mqtt-archiver" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +// listCmd represents the list command +var listCmd = &cobra.Command{ + Use: "list", + Short: "List available archives", + Long: `TODO`, + Run: func(cmd *cobra.Command, args []string) { + ok := true + if viper.GetString("s3.endpoint") == "" { + logger.Println("No S3 endpoint defined in configuration") + ok = false + } + if viper.GetString("s3.accessKey") == "" { + logger.Println("No S3 access key defined in configuration") + ok = false + } + if viper.GetString("s3.secretKey") == "" { + logger.Println("No S3 secret key defined in configuration") + ok = false + } + if viper.GetString("s3.bucket") == "" { + logger.Println("No S3 bucket name defined in configuration") + ok = false + } + if from.time.IsZero() { + logger.Println("Please specify the beginning of the replay period") + ok = false + } + if to.time.IsZero() { + to.Set("now") + } + if !ok { + os.Exit(1) + } + + config := mqttArchiver.ReplayerConfig{ + S3Config: mqttArchiver.S3Config{ + Endpoint: viper.GetString("s3.endpoint"), + AccessKey: viper.GetString("s3.accessKey"), + SecretKey: viper.GetString("s3.secretKey"), + UseSSL: viper.GetBool("s3.ssl"), + BucketName: viper.GetString("s3.bucket"), + }, + WorkingDir: viper.GetString("workingDir"), + Logger: logger, + Follow: follow, + From: from.time, + To: to.time, + } + replayer, err := mqttArchiver.NewReplayer(config) + if err != nil { + logger.Fatalln(err) + } + + files := make(chan mqttArchiver.Archive) + errors := make(chan error) + eol := make(chan struct{}) + + go replayer.ListArchives(files, errors, eol) + + for { + select { + case <-eol: + return + case err := <-errors: + logger.Println(err) + case file := <-files: + file.Reader.Close() + fmt.Println(file.FileName) + } + } + + }, +} + +func init() { + listCmd.Flags().BoolVarP(&follow, "follow", "f", false, "list archives as they are produced") + listCmd.Flags().Var(&from, "from", "beginning of list period") + listCmd.Flags().Var(&to, "to", "end of list period") + + rootCmd.AddCommand(listCmd) + +} diff --git a/cli/cmd/replay.go b/cli/cmd/replay.go new file mode 100644 index 0000000..57a9800 --- /dev/null +++ b/cli/cmd/replay.go @@ -0,0 +1,125 @@ +/* +Copyright © 2022 Nicolas MASSE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ +package cmd + +import ( + "os" + "os/signal" + "syscall" + + mqttArchiver "github.com/nmasse-itix/mqtt-archiver" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +// listCmd represents the list command +var replayCmd = &cobra.Command{ + Use: "replay", + Short: "Replays event from archives to MQTT broker", + Long: `TODO`, + Run: func(cmd *cobra.Command, args []string) { + // Each main feature gets its own default client id to prevent the replay + // feature from colliding with the archive function + viper.SetDefault("mqtt.clientId", "mqtt-archiver-replay") + + ok := true + if viper.GetString("s3.endpoint") == "" { + logger.Println("No S3 endpoint defined in configuration") + ok = false + } + if viper.GetString("s3.accessKey") == "" { + logger.Println("No S3 access key defined in configuration") + ok = false + } + if viper.GetString("s3.secretKey") == "" { + logger.Println("No S3 secret key defined in configuration") + ok = false + } + if viper.GetString("s3.bucket") == "" { + logger.Println("No S3 bucket name defined in configuration") + ok = false + } + if viper.GetString("mqtt.broker") == "" { + logger.Println("No MQTT broker defined in configuration") + ok = false + } + if from.time.IsZero() { + logger.Println("Please specify the beginning of the replay period") + ok = false + } + if to.time.IsZero() { + to.Set("now") + } + if !ok { + os.Exit(1) + } + + config := mqttArchiver.ReplayerConfig{ + S3Config: mqttArchiver.S3Config{ + Endpoint: viper.GetString("s3.endpoint"), + AccessKey: viper.GetString("s3.accessKey"), + SecretKey: viper.GetString("s3.secretKey"), + UseSSL: viper.GetBool("s3.ssl"), + BucketName: viper.GetString("s3.bucket"), + }, + MqttConfig: mqttArchiver.MqttConfig{ + BrokerURL: viper.GetString("mqtt.broker"), + Username: viper.GetString("mqtt.username"), + Password: viper.GetString("mqtt.password"), + ClientID: viper.GetString("mqtt.clientId"), + Timeout: viper.GetDuration("mqtt.timeout"), + GracePeriod: viper.GetDuration("mqtt.gracePeriod"), + }, + WorkingDir: viper.GetString("workingDir"), + Logger: logger, + TopicPrefix: prefix, + From: from.time, + To: to.time, + } + replayer, err := mqttArchiver.NewReplayer(config) + if err != nil { + logger.Fatalln(err) + } + + go func() { + // trap SIGINT and SIGTEM to gracefully stop + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + // Wait for SIGTERM or SIGINT + sig := <-sigs + logger.Printf("Received signal %s", sig) + replayer.StopReplay() + }() + + logger.Println("Starting the replay process...") + replayer.StartReplay() + }, +} + +func init() { + replayCmd.Flags().Var(&from, "from", "beginning of replay period") + replayCmd.Flags().Var(&to, "to", "end of replay period") + replayCmd.Flags().StringVarP(&prefix, "prefix", "p", "", "prefix MQTT topic with the supplied string") + + rootCmd.AddCommand(replayCmd) +} diff --git a/cli/cmd/root.go b/cli/cmd/root.go index ff926ec..58e1822 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -31,6 +31,31 @@ import ( "github.com/spf13/viper" ) +type timeArg struct { + time time.Time +} + +func (t *timeArg) String() string { + return t.time.String() +} + +func (t *timeArg) Set(val string) error { + var err error + if val == "now" { + t.time = time.Now().UTC() + } else { + t.time, err = time.ParseInLocation("2006-01-02T15:04:05", val, time.UTC) + } + return err +} + +func (t *timeArg) Type() string { + return "time" +} + +var follow bool = false +var from, to timeArg +var prefix string var cfgFile string var logger *log.Logger diff --git a/mqtt.go b/mqtt.go index fae2cba..5851f18 100644 --- a/mqtt.go +++ b/mqtt.go @@ -56,7 +56,7 @@ func SetMqttLogger(logger *log.Logger) { } // NewMqttClient creates a new MQTT client and connects to the broker -func NewMqttClient(config MqttConfig) (mqtt.Client, error) { +func NewMqttClient(config MqttConfig, clean bool) (mqtt.Client, error) { if config.BrokerURL == "" { return nil, fmt.Errorf("MQTT broker URL is empty") } @@ -67,7 +67,7 @@ func NewMqttClient(config MqttConfig) (mqtt.Client, error) { opts.SetConnectRetry(true) opts.SetConnectRetryInterval(config.Timeout) opts.SetOrderMatters(false) - opts.SetCleanSession(false) + opts.SetCleanSession(clean) opts.SetClientID(config.ClientID) if config.Username != "" && config.Password != "" { opts.SetUsername(config.Username) diff --git a/replay.go b/replay.go new file mode 100644 index 0000000..1d32a67 --- /dev/null +++ b/replay.go @@ -0,0 +1,279 @@ +/* +Copyright © 2022 Nicolas MASSE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ +package lib + +import ( + "bufio" + "compress/gzip" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "os" + "path" + "sync" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +const ( + // Maximum length of a JSON entry (bytes) + MaxTokenSize int = 1024 * 1024 +) + +// A ReplayerConfig holds the configuration data of a Replayer +type ReplayerConfig struct { + S3Config S3Config // credentials to connect to S3 + MqttConfig MqttConfig // credentials to connect to MQTT + WorkingDir string // location to store JSON files + Follow bool // when listing archives, wait for new archives as they are produced + Logger *log.Logger // a logger + From time.Time // begining of the replay period + To time.Time // end of the replay period + TopicPrefix string // prefix topic with this string +} + +// A Replayer replays events from JSON archives to the MQTT broker +type Replayer struct { + Config ReplayerConfig // the replayer public configuration + wg sync.WaitGroup // a wait group to keep track of each running go routine + s3Client *minio.Client // the s3 client + mqttClient mqtt.Client // the MQTT client + done chan bool // a channel to signal the replayer it must ends gracefully +} + +// An Archive represents an archive that has been opened (file descriptor is open). +// Any method receiving this structure is responsible for calling Reader.Close(). +type Archive struct { + Timestamp time.Time + Reader io.ReadCloser + FileName string +} + +// NewReplayer builds a replayer by its public configuration +func NewReplayer(c ReplayerConfig) (*Replayer, error) { + var replayer Replayer = Replayer{ + Config: c, + } + + var err error + replayer.s3Client, err = minio.New(replayer.Config.S3Config.Endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(replayer.Config.S3Config.AccessKey, replayer.Config.S3Config.SecretKey, ""), + Secure: replayer.Config.S3Config.UseSSL, + }) + if err != nil { + return nil, err + } + + ctx := context.Background() + exists, err := replayer.s3Client.BucketExists(ctx, replayer.Config.S3Config.BucketName) + if err != nil { + return nil, err + } + if !exists { + return nil, fmt.Errorf("s3 bucket does not exist") + } + + // There are two consumers of this channel (ListArchives and StartReplay) + replayer.done = make(chan bool, 2) + + return &replayer, nil +} + +// ListArchives sends a list of available archives (and potential errors) +// through the files (and errors) channels. +// If replayer.Config.Follow is false, the eol channel is used to signal +// the end of the list. +func (replayer *Replayer) ListArchives(files chan Archive, errors chan error, eol chan struct{}) { + replayer.wg.Add(1) + defer replayer.wg.Done() + + i := replayer.Config.From + + var end time.Time = replayer.Config.To + var loose bool = false +main: + for { + var retries = 5 + for i.Before(end) { + fd, err := replayer.OpenArchive(i) + if err != nil { + errors <- err + } else if fd != nil { + var archive Archive = Archive{ + Timestamp: i, + FileName: i.Format(logfileFormat), + Reader: fd, + } + files <- archive + } else if loose && retries > 0 { + // When following archive production, the productor might not + // had the time to rotate its archives. Retry for a few seconds... + retries-- + time.Sleep(time.Second) + continue + } + + i = i.Add(rotationInterval) + } + + if !replayer.Config.Follow { + eol <- struct{}{} // Signal end of list + break + } + + // End or continue after a one second pause + select { + case <-replayer.done: + break main + case <-time.After(time.Second): + + } + + end = time.Now().UTC() + loose = true + } +} + +// StopReplay initiates a graceful stop of the replay process. +func (replayer *Replayer) StopReplay() { + replayer.done <- true + replayer.done <- true + replayer.wg.Wait() +} + +// StartReplay starts the replay process. +func (replayer *Replayer) StartReplay() { + var err error + + replayer.wg.Add(1) + defer replayer.wg.Done() + + // initialize the MQTT library + SetMqttLogger(replayer.Config.Logger) + replayer.mqttClient, err = NewMqttClient(replayer.Config.MqttConfig, true) + if err != nil { + replayer.Config.Logger.Println(err) + return + } + + files := make(chan Archive) + errors := make(chan error) + eol := make(chan struct{}) + go replayer.ListArchives(files, errors, eol) + +main: + for { + var archive Archive + select { + case <-eol: + break main + case err := <-errors: + replayer.Config.Logger.Println(err) + continue + case archive = <-files: + err = replayer.ReplayArchive(archive) + if err != nil { + replayer.Config.Logger.Println(err) + } + case <-replayer.done: + break main + } + + } +} + +// ReplayArchive replays a specific archive. +func (replayer *Replayer) ReplayArchive(archive Archive) error { + defer archive.Reader.Close() + + replayer.Config.Logger.Printf("Replaying archive %s...", archive.FileName) + + var buffer []byte = make([]byte, MaxTokenSize) + scanner := bufio.NewScanner(archive.Reader) + scanner.Buffer(buffer, MaxTokenSize) + for scanner.Scan() { + var evt EventLogEntry + err := json.Unmarshal(scanner.Bytes(), &evt) + if err != nil { + return err + } + + var topic string = evt.Topic + if replayer.Config.TopicPrefix != "" { + topic = replayer.Config.TopicPrefix + topic + } + token := replayer.mqttClient.Publish(topic, MQTT_QOS_2, false, evt.Payload) + if !token.WaitTimeout(replayer.Config.MqttConfig.Timeout) { + return fmt.Errorf("timeout") + } + } + + return nil +} + +// OpenArchive opens the archive containing to a specific point in time. +// It handle the following cases: +// - .json file in the working directory +// - .json.gz file in the working directory +// - .json.gz file in the S3 bucket +func (replayer *Replayer) OpenArchive(moment time.Time) (io.ReadCloser, error) { + fileName := moment.Format(logfileFormat) + logFilePath := path.Join(replayer.Config.WorkingDir, fileName) + fd, err := os.OpenFile(logFilePath, os.O_RDONLY, 0755) + if err == nil { + return fd, nil + } else if !errors.Is(err, os.ErrNotExist) { + return nil, err + } + + fileName += ".gz" + fd, err = os.OpenFile(logFilePath, os.O_RDONLY, 0755) + if err == nil { + return gzip.NewReader(fd) + } else if !errors.Is(err, os.ErrNotExist) { + return nil, err + } + + ctx := context.Background() + s3FileName := path.Join(string(fileName[0:4]), fileName) + obj, err := replayer.s3Client.GetObject(ctx, replayer.Config.S3Config.BucketName, s3FileName, minio.GetObjectOptions{}) + if err != nil { + return nil, err + } + _, err = obj.Stat() + if err == nil { + return gzip.NewReader(obj) + } + + var s3err minio.ErrorResponse + if errors.As(err, &s3err) && s3err.Code == "NoSuchKey" { + return nil, nil + } + + return nil, err +}