From 090a8197a687ef1ce01bd716574c59db0f1019b2 Mon Sep 17 00:00:00 2001 From: Nicolas MASSE Date: Thu, 17 Feb 2022 20:44:54 +0100 Subject: [PATCH] add an option to filter out topics matching a regex --- archive.go | 13 +++++++++++-- cli/cmd/archive.go | 1 + mqtt-archiver.yaml | 1 + 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/archive.go b/archive.go index 473a5fa..e535239 100644 --- a/archive.go +++ b/archive.go @@ -31,6 +31,7 @@ import ( "os" "path" "path/filepath" + "regexp" "sync" "time" @@ -67,6 +68,8 @@ type Archiver struct { WorkingDir string // location to store JSON files Logger log.Logger // a logger SubscribePattern string // the pattern (ie. "#") to subscribe to + FilterRegex string // topics matching this regex will filtered out + filter *regexp.Regexp // compiled regex: topics matching this regex will filtered out currentFilename string // the current JSON filename wg sync.WaitGroup // a wait group to keep track of each running go routine client mqtt.Client // the MQTT client @@ -84,6 +87,7 @@ const ( func (archiver *Archiver) StartArchive() error { archiver.done = make(chan bool) archiver.eventLog = make(chan EventLogEntry, 10) + archiver.filter = regexp.MustCompile(archiver.FilterRegex) // connect to MQTT server SetMqttLogger(&archiver.Logger) @@ -388,12 +392,17 @@ func (archiver *Archiver) openLogFile(fileName string) (*os.File, error) { // processMessage is the callback routine called by the MQTT library to process // events. func (archiver *Archiver) processMessage(c mqtt.Client, m mqtt.Message) { + archiver.wg.Add(1) + defer archiver.wg.Done() + if m.Retained() { return } - archiver.wg.Add(1) - defer archiver.wg.Done() + if archiver.filter.Match([]byte(m.Topic())) { + return + } + entry := EventLogEntry{ Version: 1, Timestamp: time.Now().UTC(), diff --git a/cli/cmd/archive.go b/cli/cmd/archive.go index 2859009..4f17645 100644 --- a/cli/cmd/archive.go +++ b/cli/cmd/archive.go @@ -80,6 +80,7 @@ var archiveCmd = &cobra.Command{ }, SubscribePattern: viper.GetString("subscribePattern"), WorkingDir: viper.GetString("workingDir"), + FilterRegex: viper.GetString("exclude"), Logger: *logger, } diff --git a/mqtt-archiver.yaml b/mqtt-archiver.yaml index 69a10bc..b898c35 100644 --- a/mqtt-archiver.yaml +++ b/mqtt-archiver.yaml @@ -5,6 +5,7 @@ mqtt: timeout: 5s gracePeriod: 2s workingDir: .podman-compose/mqtt-archiver/ +exclude: ^test s3: endpoint: localhost:9000 accessKey: dev