Browse Source

implement the replay function

main
Nicolas Massé 4 years ago
parent
commit
7a64bcc64d
  1. 5
      archive.go
  2. 8
      cli/cmd/archive.go
  3. 114
      cli/cmd/list.go
  4. 125
      cli/cmd/replay.go
  5. 25
      cli/cmd/root.go
  6. 4
      mqtt.go
  7. 279
      replay.go

5
archive.go

@ -80,6 +80,9 @@ type Archiver struct {
const ( const (
// JSON filename format (one file per day) // JSON filename format (one file per day)
logfileFormat = "20060102.json" logfileFormat = "20060102.json"
// JSON file rotation interval
rotationInterval = 24 * time.Hour
) )
// StartArchive starts the archiving process. It is not safe to call // StartArchive starts the archiving process. It is not safe to call
@ -93,7 +96,7 @@ func (archiver *Archiver) StartArchive() error {
SetMqttLogger(archiver.Logger) SetMqttLogger(archiver.Logger)
archiver.Logger.Println("Connecting to MQTT server...") archiver.Logger.Println("Connecting to MQTT server...")
var err error var err error
archiver.client, err = NewMqttClient(archiver.MqttConfig) archiver.client, err = NewMqttClient(archiver.MqttConfig, false)
if err != nil { if err != nil {
return err return err
} }

8
cli/cmd/archive.go

@ -37,6 +37,10 @@ var archiveCmd = &cobra.Command{
Short: "Archive MQTT events from the broker to S3", Short: "Archive MQTT events from the broker to S3",
Long: `TODO`, Long: `TODO`,
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
// Each main feature gets its own default client id to prevent the replay
// feature from colliding with the archive function
viper.SetDefault("mqtt.clientId", "mqtt-archiver-archive")
ok := true ok := true
if viper.GetString("s3.endpoint") == "" { if viper.GetString("s3.endpoint") == "" {
logger.Println("No S3 endpoint defined in configuration") logger.Println("No S3 endpoint defined in configuration")
@ -104,8 +108,4 @@ var archiveCmd = &cobra.Command{
func init() { func init() {
rootCmd.AddCommand(archiveCmd) rootCmd.AddCommand(archiveCmd)
// Each main feature gets its own default client id to prevent the replay
// feature from colliding with the archive function
viper.SetDefault("mqtt.clientId", "mqtt-archiver-archive")
} }

114
cli/cmd/list.go

@ -0,0 +1,114 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package cmd
import (
"fmt"
"os"
mqttArchiver "github.com/nmasse-itix/mqtt-archiver"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// listCmd represents the list command
var listCmd = &cobra.Command{
Use: "list",
Short: "List available archives",
Long: `TODO`,
Run: func(cmd *cobra.Command, args []string) {
ok := true
if viper.GetString("s3.endpoint") == "" {
logger.Println("No S3 endpoint defined in configuration")
ok = false
}
if viper.GetString("s3.accessKey") == "" {
logger.Println("No S3 access key defined in configuration")
ok = false
}
if viper.GetString("s3.secretKey") == "" {
logger.Println("No S3 secret key defined in configuration")
ok = false
}
if viper.GetString("s3.bucket") == "" {
logger.Println("No S3 bucket name defined in configuration")
ok = false
}
if from.time.IsZero() {
logger.Println("Please specify the beginning of the replay period")
ok = false
}
if to.time.IsZero() {
to.Set("now")
}
if !ok {
os.Exit(1)
}
config := mqttArchiver.ReplayerConfig{
S3Config: mqttArchiver.S3Config{
Endpoint: viper.GetString("s3.endpoint"),
AccessKey: viper.GetString("s3.accessKey"),
SecretKey: viper.GetString("s3.secretKey"),
UseSSL: viper.GetBool("s3.ssl"),
BucketName: viper.GetString("s3.bucket"),
},
WorkingDir: viper.GetString("workingDir"),
Logger: logger,
Follow: follow,
From: from.time,
To: to.time,
}
replayer, err := mqttArchiver.NewReplayer(config)
if err != nil {
logger.Fatalln(err)
}
files := make(chan mqttArchiver.Archive)
errors := make(chan error)
eol := make(chan struct{})
go replayer.ListArchives(files, errors, eol)
for {
select {
case <-eol:
return
case err := <-errors:
logger.Println(err)
case file := <-files:
file.Reader.Close()
fmt.Println(file.FileName)
}
}
},
}
func init() {
listCmd.Flags().BoolVarP(&follow, "follow", "f", false, "list archives as they are produced")
listCmd.Flags().Var(&from, "from", "beginning of list period")
listCmd.Flags().Var(&to, "to", "end of list period")
rootCmd.AddCommand(listCmd)
}

125
cli/cmd/replay.go

@ -0,0 +1,125 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package cmd
import (
"os"
"os/signal"
"syscall"
mqttArchiver "github.com/nmasse-itix/mqtt-archiver"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// listCmd represents the list command
var replayCmd = &cobra.Command{
Use: "replay",
Short: "Replays event from archives to MQTT broker",
Long: `TODO`,
Run: func(cmd *cobra.Command, args []string) {
// Each main feature gets its own default client id to prevent the replay
// feature from colliding with the archive function
viper.SetDefault("mqtt.clientId", "mqtt-archiver-replay")
ok := true
if viper.GetString("s3.endpoint") == "" {
logger.Println("No S3 endpoint defined in configuration")
ok = false
}
if viper.GetString("s3.accessKey") == "" {
logger.Println("No S3 access key defined in configuration")
ok = false
}
if viper.GetString("s3.secretKey") == "" {
logger.Println("No S3 secret key defined in configuration")
ok = false
}
if viper.GetString("s3.bucket") == "" {
logger.Println("No S3 bucket name defined in configuration")
ok = false
}
if viper.GetString("mqtt.broker") == "" {
logger.Println("No MQTT broker defined in configuration")
ok = false
}
if from.time.IsZero() {
logger.Println("Please specify the beginning of the replay period")
ok = false
}
if to.time.IsZero() {
to.Set("now")
}
if !ok {
os.Exit(1)
}
config := mqttArchiver.ReplayerConfig{
S3Config: mqttArchiver.S3Config{
Endpoint: viper.GetString("s3.endpoint"),
AccessKey: viper.GetString("s3.accessKey"),
SecretKey: viper.GetString("s3.secretKey"),
UseSSL: viper.GetBool("s3.ssl"),
BucketName: viper.GetString("s3.bucket"),
},
MqttConfig: mqttArchiver.MqttConfig{
BrokerURL: viper.GetString("mqtt.broker"),
Username: viper.GetString("mqtt.username"),
Password: viper.GetString("mqtt.password"),
ClientID: viper.GetString("mqtt.clientId"),
Timeout: viper.GetDuration("mqtt.timeout"),
GracePeriod: viper.GetDuration("mqtt.gracePeriod"),
},
WorkingDir: viper.GetString("workingDir"),
Logger: logger,
TopicPrefix: prefix,
From: from.time,
To: to.time,
}
replayer, err := mqttArchiver.NewReplayer(config)
if err != nil {
logger.Fatalln(err)
}
go func() {
// trap SIGINT and SIGTEM to gracefully stop
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
// Wait for SIGTERM or SIGINT
sig := <-sigs
logger.Printf("Received signal %s", sig)
replayer.StopReplay()
}()
logger.Println("Starting the replay process...")
replayer.StartReplay()
},
}
func init() {
replayCmd.Flags().Var(&from, "from", "beginning of replay period")
replayCmd.Flags().Var(&to, "to", "end of replay period")
replayCmd.Flags().StringVarP(&prefix, "prefix", "p", "", "prefix MQTT topic with the supplied string")
rootCmd.AddCommand(replayCmd)
}

25
cli/cmd/root.go

@ -31,6 +31,31 @@ import (
"github.com/spf13/viper" "github.com/spf13/viper"
) )
type timeArg struct {
time time.Time
}
func (t *timeArg) String() string {
return t.time.String()
}
func (t *timeArg) Set(val string) error {
var err error
if val == "now" {
t.time = time.Now().UTC()
} else {
t.time, err = time.ParseInLocation("2006-01-02T15:04:05", val, time.UTC)
}
return err
}
func (t *timeArg) Type() string {
return "time"
}
var follow bool = false
var from, to timeArg
var prefix string
var cfgFile string var cfgFile string
var logger *log.Logger var logger *log.Logger

4
mqtt.go

@ -56,7 +56,7 @@ func SetMqttLogger(logger *log.Logger) {
} }
// NewMqttClient creates a new MQTT client and connects to the broker // NewMqttClient creates a new MQTT client and connects to the broker
func NewMqttClient(config MqttConfig) (mqtt.Client, error) { func NewMqttClient(config MqttConfig, clean bool) (mqtt.Client, error) {
if config.BrokerURL == "" { if config.BrokerURL == "" {
return nil, fmt.Errorf("MQTT broker URL is empty") return nil, fmt.Errorf("MQTT broker URL is empty")
} }
@ -67,7 +67,7 @@ func NewMqttClient(config MqttConfig) (mqtt.Client, error) {
opts.SetConnectRetry(true) opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(config.Timeout) opts.SetConnectRetryInterval(config.Timeout)
opts.SetOrderMatters(false) opts.SetOrderMatters(false)
opts.SetCleanSession(false) opts.SetCleanSession(clean)
opts.SetClientID(config.ClientID) opts.SetClientID(config.ClientID)
if config.Username != "" && config.Password != "" { if config.Username != "" && config.Password != "" {
opts.SetUsername(config.Username) opts.SetUsername(config.Username)

279
replay.go

@ -0,0 +1,279 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package lib
import (
"bufio"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
"path"
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
const (
// Maximum length of a JSON entry (bytes)
MaxTokenSize int = 1024 * 1024
)
// A ReplayerConfig holds the configuration data of a Replayer
type ReplayerConfig struct {
S3Config S3Config // credentials to connect to S3
MqttConfig MqttConfig // credentials to connect to MQTT
WorkingDir string // location to store JSON files
Follow bool // when listing archives, wait for new archives as they are produced
Logger *log.Logger // a logger
From time.Time // begining of the replay period
To time.Time // end of the replay period
TopicPrefix string // prefix topic with this string
}
// A Replayer replays events from JSON archives to the MQTT broker
type Replayer struct {
Config ReplayerConfig // the replayer public configuration
wg sync.WaitGroup // a wait group to keep track of each running go routine
s3Client *minio.Client // the s3 client
mqttClient mqtt.Client // the MQTT client
done chan bool // a channel to signal the replayer it must ends gracefully
}
// An Archive represents an archive that has been opened (file descriptor is open).
// Any method receiving this structure is responsible for calling Reader.Close().
type Archive struct {
Timestamp time.Time
Reader io.ReadCloser
FileName string
}
// NewReplayer builds a replayer by its public configuration
func NewReplayer(c ReplayerConfig) (*Replayer, error) {
var replayer Replayer = Replayer{
Config: c,
}
var err error
replayer.s3Client, err = minio.New(replayer.Config.S3Config.Endpoint, &minio.Options{
Creds: credentials.NewStaticV4(replayer.Config.S3Config.AccessKey, replayer.Config.S3Config.SecretKey, ""),
Secure: replayer.Config.S3Config.UseSSL,
})
if err != nil {
return nil, err
}
ctx := context.Background()
exists, err := replayer.s3Client.BucketExists(ctx, replayer.Config.S3Config.BucketName)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("s3 bucket does not exist")
}
// There are two consumers of this channel (ListArchives and StartReplay)
replayer.done = make(chan bool, 2)
return &replayer, nil
}
// ListArchives sends a list of available archives (and potential errors)
// through the files (and errors) channels.
// If replayer.Config.Follow is false, the eol channel is used to signal
// the end of the list.
func (replayer *Replayer) ListArchives(files chan Archive, errors chan error, eol chan struct{}) {
replayer.wg.Add(1)
defer replayer.wg.Done()
i := replayer.Config.From
var end time.Time = replayer.Config.To
var loose bool = false
main:
for {
var retries = 5
for i.Before(end) {
fd, err := replayer.OpenArchive(i)
if err != nil {
errors <- err
} else if fd != nil {
var archive Archive = Archive{
Timestamp: i,
FileName: i.Format(logfileFormat),
Reader: fd,
}
files <- archive
} else if loose && retries > 0 {
// When following archive production, the productor might not
// had the time to rotate its archives. Retry for a few seconds...
retries--
time.Sleep(time.Second)
continue
}
i = i.Add(rotationInterval)
}
if !replayer.Config.Follow {
eol <- struct{}{} // Signal end of list
break
}
// End or continue after a one second pause
select {
case <-replayer.done:
break main
case <-time.After(time.Second):
}
end = time.Now().UTC()
loose = true
}
}
// StopReplay initiates a graceful stop of the replay process.
func (replayer *Replayer) StopReplay() {
replayer.done <- true
replayer.done <- true
replayer.wg.Wait()
}
// StartReplay starts the replay process.
func (replayer *Replayer) StartReplay() {
var err error
replayer.wg.Add(1)
defer replayer.wg.Done()
// initialize the MQTT library
SetMqttLogger(replayer.Config.Logger)
replayer.mqttClient, err = NewMqttClient(replayer.Config.MqttConfig, true)
if err != nil {
replayer.Config.Logger.Println(err)
return
}
files := make(chan Archive)
errors := make(chan error)
eol := make(chan struct{})
go replayer.ListArchives(files, errors, eol)
main:
for {
var archive Archive
select {
case <-eol:
break main
case err := <-errors:
replayer.Config.Logger.Println(err)
continue
case archive = <-files:
err = replayer.ReplayArchive(archive)
if err != nil {
replayer.Config.Logger.Println(err)
}
case <-replayer.done:
break main
}
}
}
// ReplayArchive replays a specific archive.
func (replayer *Replayer) ReplayArchive(archive Archive) error {
defer archive.Reader.Close()
replayer.Config.Logger.Printf("Replaying archive %s...", archive.FileName)
var buffer []byte = make([]byte, MaxTokenSize)
scanner := bufio.NewScanner(archive.Reader)
scanner.Buffer(buffer, MaxTokenSize)
for scanner.Scan() {
var evt EventLogEntry
err := json.Unmarshal(scanner.Bytes(), &evt)
if err != nil {
return err
}
var topic string = evt.Topic
if replayer.Config.TopicPrefix != "" {
topic = replayer.Config.TopicPrefix + topic
}
token := replayer.mqttClient.Publish(topic, MQTT_QOS_2, false, evt.Payload)
if !token.WaitTimeout(replayer.Config.MqttConfig.Timeout) {
return fmt.Errorf("timeout")
}
}
return nil
}
// OpenArchive opens the archive containing to a specific point in time.
// It handle the following cases:
// - .json file in the working directory
// - .json.gz file in the working directory
// - .json.gz file in the S3 bucket
func (replayer *Replayer) OpenArchive(moment time.Time) (io.ReadCloser, error) {
fileName := moment.Format(logfileFormat)
logFilePath := path.Join(replayer.Config.WorkingDir, fileName)
fd, err := os.OpenFile(logFilePath, os.O_RDONLY, 0755)
if err == nil {
return fd, nil
} else if !errors.Is(err, os.ErrNotExist) {
return nil, err
}
fileName += ".gz"
fd, err = os.OpenFile(logFilePath, os.O_RDONLY, 0755)
if err == nil {
return gzip.NewReader(fd)
} else if !errors.Is(err, os.ErrNotExist) {
return nil, err
}
ctx := context.Background()
s3FileName := path.Join(string(fileName[0:4]), fileName)
obj, err := replayer.s3Client.GetObject(ctx, replayer.Config.S3Config.BucketName, s3FileName, minio.GetObjectOptions{})
if err != nil {
return nil, err
}
_, err = obj.Stat()
if err == nil {
return gzip.NewReader(obj)
}
var s3err minio.ErrorResponse
if errors.As(err, &s3err) && s3err.Code == "NoSuchKey" {
return nil, nil
}
return nil, err
}
Loading…
Cancel
Save