From e22207c8e4dd692e83eb832a849a8fe02677a04c Mon Sep 17 00:00:00 2001 From: Nicolas MASSE Date: Wed, 16 Feb 2022 15:19:24 +0100 Subject: [PATCH] add doc --- archive.go | 153 ++++++++++++++++++++++++++++++++++----------- cli/cmd/archive.go | 38 ++++++++--- cli/cmd/root.go | 3 +- mqtt.go | 51 +++++++++++---- 4 files changed, 188 insertions(+), 57 deletions(-) diff --git a/archive.go b/archive.go index 809e9a8..473a5fa 100644 --- a/archive.go +++ b/archive.go @@ -1,3 +1,24 @@ +/* +Copyright © 2022 Nicolas MASSE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ package lib import ( @@ -18,38 +39,48 @@ import ( "github.com/minio/minio-go/v7/pkg/credentials" ) +// An S3Config represents the mandatory parameters to connect to an S3 service type S3Config struct { - Endpoint string - AccessKey string - SecretKey string - UseSSL bool - BucketName string + Endpoint string // S3 endpoint. 
Format is "hostname:port"
+	AccessKey  string // S3 access key (or username)
+	SecretKey  string // S3 secret key (or token)
+	UseSSL     bool   // Enable/disable the use of TLS to connect to S3
+	BucketName string // The name of the bucket where to store archives
 }
 
+// An EventLogEntry represents a single event received from the MQTT broker
 type EventLogEntry struct {
-	Version   int
-	Timestamp time.Time
-	Topic     string
-	Payload   []byte
+	Version   int       `json:"v"`            // Version number (in case the format changes in the future)
+	Timestamp time.Time `json:"ts,omitempty"` // An optional timestamp
+	Topic     string    `json:"topic"`        // The topic this event comes from
+	Payload   []byte    `json:"data"`         // The actual data (is base64 encoded in the JSON structure)
 }
 
+// An Archiver represents the process of archiving MQTT events to S3.
+// The Archiver will subscribe to topics designated by the SubscribePattern,
+// save events to a JSON file in the WorkingDir.
+// It will rotate the JSON file every day, compress it and send it to S3.
+// The files are stored in S3 in a folder per year.
 type Archiver struct {
-	S3Config         S3Config
-	MqttConfig       MqttConfig
-	WorkingDir       string
-	Logger           log.Logger
-	SubscribePattern string
-	currentFilename  string
-	wg               sync.WaitGroup
-	client           mqtt.Client
-	done             chan bool
-	eventLog         chan EventLogEntry
+	S3Config         S3Config   // credentials to connect to S3
+	MqttConfig       MqttConfig // credentials to connect to MQTT
+	WorkingDir       string     // location to store JSON files
+	Logger           log.Logger // a logger
+	SubscribePattern string     // the pattern (ie. 
"#") to subscribe to + currentFilename string // the current JSON filename + wg sync.WaitGroup // a wait group to keep track of each running go routine + client mqtt.Client // the MQTT client + done chan bool // channel to trigger the end of the archiving process + eventLog chan EventLogEntry // channel to send events from the MQTT go routines to the go routine that write the JSON file } const ( + // JSON filename format (one file per day) logfileFormat = "20060102.json" ) +// StartArchive starts the archiving process. It is not safe to call +// this method multiple times on the same object. func (archiver *Archiver) StartArchive() error { archiver.done = make(chan bool) archiver.eventLog = make(chan EventLogEntry, 10) @@ -70,25 +101,36 @@ func (archiver *Archiver) StartArchive() error { return fmt.Errorf("mqtt: timeout waiting for subscribe") } + // The events are written to the JSON files by a dedicated go routine + // to avoid synchronization issues or race conditions. go archiver.eventLogWriter() return nil } +// StopArchive stops the archiving process. It is not safe to call +// this method multiple times on the same object. func (archiver *Archiver) StopArchive() { + // First, disconnect from the MQTT broker and leave a grace time period + // for the MQTT library to process the last inflight messages. archiver.Logger.Println("Closing connection to the broker...") archiver.client.Disconnect(uint(archiver.MqttConfig.GracePeriod.Milliseconds())) - archiver.Logger.Println("Waiting for all goroutines to complete...") + + // Signal the eventLogWriter method that it needs to stop archiver.done <- true + + // Then, wait for all go routines to complete. + archiver.Logger.Println("Waiting for all goroutines to complete...") archiver.wg.Wait() } +// eventLogWriter receives events from the eventLog channel and stores them +// to a JSON file. 
func (archiver *Archiver) eventLogWriter() { archiver.wg.Add(1) defer archiver.wg.Done() - tick := time.NewTicker(time.Second) - defer tick.Stop() + // Open the JSON file var fd *os.File var err error if archiver.currentFilename == "" { @@ -101,14 +143,26 @@ func (archiver *Archiver) eventLogWriter() { defer fd.Close() } + // Before actually writing the JSON file, catch up with any file that might + // have been left by a previous run. + // The catchUp method will discover files that need to be compressed and + // sent to S3, carefully excluding the current file (archiver.currentFilename) + // Once the file list has been built, a go routine will take care of the + // actual operations asynchronously. + // It is important to build the file list after having computed the currentFilename + // but before writing events since we do not want to have a race condition + // between the catchUp job and the log rotation... archiver.catchUp() + tick := time.NewTicker(time.Second) + defer tick.Stop() for { select { - case <-archiver.done: + case <-archiver.done: // We need to stop return case entry := <-archiver.eventLog: if fd == nil { + // It can happen if for instance the log rotation fails archiver.Logger.Println("A message has been lost because the file descriptor is nil") continue } @@ -156,36 +210,51 @@ func (archiver *Archiver) eventLogWriter() { } } +// catchUp discovers files that need to be compressed and sent to S3, +// carefully excluding the current file (archiver.currentFilename). +// Once the file list has been built, a go routine takes care of the +// actual operations asynchronously. 
func (archiver *Archiver) catchUp() {
 	var toCompress []string
 	var toSend []string
-	matches, err := filepath.Glob(path.Join(archiver.WorkingDir, "*.json.gz"))
+	var dedup map[string]bool = make(map[string]bool)
+
+	matches, err := filepath.Glob(path.Join(archiver.WorkingDir, "*.json"))
 	if err != nil {
 		archiver.Logger.Println(err)
 		return
 	}
 	for _, match := range matches {
 		_, file := filepath.Split(match)
-		toSend = append(toSend, file)
+		if file == archiver.currentFilename {
+			continue
+		}
+		dedup[file] = true
+		toCompress = append(toCompress, file)
 	}
 
-	matches, err = filepath.Glob(path.Join(archiver.WorkingDir, "*.json"))
+	matches, err = filepath.Glob(path.Join(archiver.WorkingDir, "*.json.gz"))
 	if err != nil {
 		archiver.Logger.Println(err)
 		return
 	}
 	for _, match := range matches {
 		_, file := filepath.Split(match)
-		if file == archiver.currentFilename {
-			continue
+		_, seen := dedup[file[:len(file)-len(".gz")]]
+
+		// Never trust a .gz file if there is also the corresponding .json file
+		// the compression process might have failed and left a corrupted .gz file
+		if !seen {
+			toSend = append(toSend, file)
 		}
-		toCompress = append(toCompress, file)
 	}
 
 	go archiver.doCatchUp(toCompress, toSend)
 }
 
+// doCatchUp sends already compressed files to S3 and compress + send the
+// remaining files
 func (archiver *Archiver) doCatchUp(toCompress, toSend []string) {
 	if len(toCompress)+len(toSend) > 0 {
 		archiver.Logger.Printf("Catching up with %d files...", len(toCompress)+len(toSend))
@@ -199,6 +268,7 @@ func (archiver *Archiver) doCatchUp(toCompress, toSend []string) {
 	}
 }
 
+// compressFile compresses a file using gzip
 func (archiver *Archiver) compressFile(source, target string) error {
 	tfd, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0755)
 	if err != nil {
@@ -228,21 +298,31 @@ func (archiver *Archiver) compressFile(source, target string) error {
 	return nil
 }
 
-func (archiver *Archiver) compressAndSendToS3(fileName string) {
+// compressAndSendToS3 compresses a JSON file and sends 
it to S3 +func (archiver *Archiver) compressAndSendToS3(fileName string) error { logFilePath := path.Join(archiver.WorkingDir, fileName) err := archiver.compressFile(logFilePath, logFilePath+".gz") if err != nil { archiver.Logger.Println(err) - return + return err } - archiver.sendToS3(fileName + ".gz") + + // the sendToS3 method can take very long time to terminate (days) + // because of the embedded backoff mechanism + go archiver.sendToS3(fileName + ".gz") + + return nil } +// sendToS3 takes a compressed JSON file and sends it to S3. An exponential +// backoff retry mechanism takes care of possible errors during the process. +// Note: in case of errors, the sendToS3 method can take very long time to +// terminate (days). func (archiver *Archiver) sendToS3(fileName string) { logFilePath := path.Join(archiver.WorkingDir, fileName) var delay time.Duration - for i := 0; i < 8; i++ { + for i := 0; i < 9; i++ { ctx := context.Background() if delay != 0 { @@ -252,6 +332,7 @@ func (archiver *Archiver) sendToS3(fileName string) { time.Sleep(delay) // Exponential backoff + // 5s, 30s, 3m, 18m, ~2h, 10h, ~3d, ~2w if delay == 0 { delay = 5 * time.Second } else { @@ -297,12 +378,15 @@ func (archiver *Archiver) sendToS3(fileName string) { } } +// openLogFile opens the JSON file for writing. func (archiver *Archiver) openLogFile(fileName string) (*os.File, error) { - archiver.Logger.Printf("Opening log file %s...", archiver.currentFilename) + archiver.Logger.Printf("Opening log file %s...", fileName) logFilePath := path.Join(archiver.WorkingDir, fileName) return os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755) } +// processMessage is the callback routine called by the MQTT library to process +// events. 
func (archiver *Archiver) processMessage(c mqtt.Client, m mqtt.Message) { if m.Retained() { return @@ -310,7 +394,6 @@ func (archiver *Archiver) processMessage(c mqtt.Client, m mqtt.Message) { archiver.wg.Add(1) defer archiver.wg.Done() - //fmt.Printf("%d: %s: %s\n", m.MessageID(), m.Topic(), (string)(m.Payload())) entry := EventLogEntry{ Version: 1, Timestamp: time.Now().UTC(), diff --git a/cli/cmd/archive.go b/cli/cmd/archive.go index 2352179..2859009 100644 --- a/cli/cmd/archive.go +++ b/cli/cmd/archive.go @@ -37,7 +37,30 @@ var archiveCmd = &cobra.Command{ Short: "Archive MQTT events from the broker to S3", Long: `TODO`, Run: func(cmd *cobra.Command, args []string) { - logger.Println("Archiving...") + ok := true + if viper.GetString("s3.endpoint") == "" { + logger.Println("No S3 endpoint defined in configuration") + ok = false + } + if viper.GetString("s3.accessKey") == "" { + logger.Println("No S3 access key defined in configuration") + ok = false + } + if viper.GetString("s3.secretKey") == "" { + logger.Println("No S3 secret key defined in configuration") + ok = false + } + if viper.GetString("s3.bucket") == "" { + logger.Println("No S3 bucket name defined in configuration") + ok = false + } + if viper.GetString("mqtt.broker") == "" { + logger.Println("No MQTT broker defined in configuration") + ok = false + } + if !ok { + os.Exit(1) + } archiver := mqttArchiver.Archiver{ S3Config: mqttArchiver.S3Config{ @@ -64,6 +87,7 @@ var archiveCmd = &cobra.Command{ sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + logger.Println("Starting the archiving process...") err := archiver.StartArchive() if err != nil { logger.Fatalln(err) @@ -80,13 +104,7 @@ var archiveCmd = &cobra.Command{ func init() { rootCmd.AddCommand(archiveCmd) - // Here you will define your flags and configuration settings. 
- - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // archiveCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // archiveCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") + // Each main feature gets its own default client id to prevent the replay + // feature from colliding with the archive function + viper.SetDefault("mqtt.clientId", "mqtt-archiver-archive") } diff --git a/cli/cmd/root.go b/cli/cmd/root.go index 7a637b8..ff926ec 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -55,10 +55,11 @@ func init() { logger = log.New(os.Stderr, "", 0) // Set default configuration - viper.SetDefault("mqtt.clientId", "mqtt-archiver") viper.SetDefault("mqtt.timeout", 30*time.Second) viper.SetDefault("mqtt.gracePeriod", 5*time.Second) viper.SetDefault("subscribePattern", "#") + viper.SetDefault("s3.ssl", true) + viper.SetDefault("workingDir", ".") cobra.OnInitialize(initConfig) rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $PWD/mqtt-archiver.yaml)") diff --git a/mqtt.go b/mqtt.go index 6294fd7..fae2cba 100644 --- a/mqtt.go +++ b/mqtt.go @@ -1,3 +1,25 @@ +/* +Copyright © 2022 Nicolas MASSE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. 
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
 package lib
 
 import (
@@ -8,27 +30,32 @@ import (
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 )
 
+// Those flags define the MQTT Quality of Service (QoS) levels
 const (
-	MQTT_QOS_0 = 0
-	MQTT_QOS_1 = 1
-	MQTT_QOS_2 = 2
+	MQTT_QOS_0 = 0 // QoS 0
+	MQTT_QOS_1 = 1 // QoS 1
+	MQTT_QOS_2 = 2 // QoS 2
 )
 
+// An MqttConfig represents the required information to connect to an MQTT
+// broker.
 type MqttConfig struct {
-	BrokerURL   string
-	Username    string
-	Password    string
-	ClientID    string
-	Timeout     time.Duration
-	GracePeriod time.Duration
+	BrokerURL   string        // broker url (tcp://hostname:port or ssl://hostname:port)
+	Username    string        // username (optional)
+	Password    string        // password (optional)
+	ClientID    string        // MQTT ClientID
+	Timeout     time.Duration // how much time to wait for connect and subscribe operations to complete
+	GracePeriod time.Duration // how much time to wait for the disconnect operation to complete
 }
 
+// SetMqttLogger sets the logger to be used by the underlying MQTT library
 func SetMqttLogger(logger *log.Logger) {
 	mqtt.CRITICAL = logger
 	mqtt.ERROR = logger
 	mqtt.WARN = logger
}
 
+// NewMqttClient creates a new MQTT client and connects to the broker
 func NewMqttClient(config MqttConfig) (mqtt.Client, error) {
 	if config.BrokerURL == "" {
 		return nil, fmt.Errorf("MQTT broker URL is empty")
@@ -42,8 +69,10 @@ func NewMqttClient(config MqttConfig) (mqtt.Client, error) {
 	opts.SetOrderMatters(false)
 	opts.SetCleanSession(false)
 	opts.SetClientID(config.ClientID)
-	opts.SetUsername(config.Username)
-	
opts.SetPassword(config.Password) + if config.Username != "" && config.Password != "" { + opts.SetUsername(config.Username) + opts.SetPassword(config.Password) + } client := mqtt.NewClient(opts) ct := client.Connect()