Browse Source

initial commit

main
Nicolas Massé 4 years ago
commit
237afad89a
  1. 1
      .gitignore
  2. 21
      LICENSE
  3. 14
      README.md
  4. 37
      cli/cmd/db.go
  5. 99
      cli/cmd/migrate.go
  6. 83
      cli/cmd/process.go
  7. 88
      cli/cmd/root.go
  8. 28
      cli/main.go
  9. 12
      go.mod
  10. 1271
      go.sum
  11. 48
      migrations.go
  12. 83
      mqtt.go
  13. 22
      podman-compose.yaml
  14. 268
      process.go
  15. 28
      schemas/001_initial.sql
  16. 12
      tic-tsdb.yaml

1
.gitignore

@ -0,0 +1 @@
.podman-compose

21
LICENSE

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

14
README.md

@ -0,0 +1,14 @@
# Saves TIC events to TimescaleDB
## Testing
```sh
podman-compose up -d
go run cli/main.go process
declare -a fields=(IINST IINST1 IINST2 IINST3 PAPP BASE HCHP HCHC)
while sleep 1; do
value=$((1 + RANDOM % 100))
field=${fields[1 + $((RANDOM % ${#fields[@]}))]}
echo "{\"ts\":$EPOCHSECONDS,\"val\":\"$(printf %03d $value)\"}" | pub -broker mqtt://localhost:1883 -topic esp-tic/status/tic/$field -username dev -password secret -qos 1
done
```

37
cli/cmd/db.go

@ -0,0 +1,37 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package cmd
import (
"github.com/spf13/cobra"
)
// dbCmd represents the db command
var dbCmd = &cobra.Command{
Use: "db",
Short: "Interact with the underlying DB",
Long: `TODO`,
}
func init() {
rootCmd.AddCommand(dbCmd)
}

99
cli/cmd/migrate.go

@ -0,0 +1,99 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package cmd
import (
"database/sql"
"os"
_ "github.com/jackc/pgx/v4/stdlib"
ticTsdb "github.com/nmasse-itix/tic-tsdb"
"github.com/pressly/goose/v3"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// migrateCmd represents the migrate command
var migrateCmd = &cobra.Command{
Use: "migrate",
Short: "Run database schema migrations",
Long: `Goose commands:
up Migrate the DB to the most recent version available
up-by-one Migrate the DB up by 1
up-to VERSION Migrate the DB to a specific VERSION
down Roll back the version by 1
down-to VERSION Roll back to a specific VERSION
redo Re-run the latest migration
reset Roll back all migrations
status Dump the migration status for the current DB
version Print the current version of the database
create NAME [sql|go] Creates new migration file with the current timestamp
fix Apply sequential ordering to migrations
`,
Run: func(cmd *cobra.Command, args []string) {
ok := true
if viper.GetString("sql.database") == "" {
logger.Println("No database name defined in configuration")
ok = false
}
if viper.GetString("sql.hostname") == "" {
logger.Println("No database server defined in configuration")
ok = false
}
if len(args) < 1 {
logger.Println("Please specify goose command!")
ok = false
}
if !ok {
logger.Println()
cmd.Help()
os.Exit(1)
}
dbUrl := getDatabaseUrl()
logger.Println("Connecting to PostgreSQL server...")
db, err := sql.Open("pgx", dbUrl)
if err != nil {
logger.Println(err)
os.Exit(1)
}
defer db.Close()
goose.SetBaseFS(ticTsdb.SqlMigrationFS)
if err := goose.SetDialect("postgres"); err != nil {
logger.Println(err)
os.Exit(1)
}
gooseCmd := args[0]
gooseOpts := args[1:]
if err := goose.Run(gooseCmd, db, "schemas", gooseOpts...); err != nil {
logger.Println(err)
os.Exit(1)
}
},
}
func init() {
dbCmd.AddCommand(migrateCmd)
}

83
cli/cmd/process.go

@ -0,0 +1,83 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package cmd
import (
"os"
ticTsdb "github.com/nmasse-itix/tic-tsdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// processCmd represents the process command
var processCmd = &cobra.Command{
Use: "process",
Short: "Saves MQTT events to TimescaleDB",
Long: `TODO`,
Run: func(cmd *cobra.Command, args []string) {
ok := true
if viper.GetString("sql.database") == "" {
logger.Println("No database name defined in configuration")
ok = false
}
if viper.GetString("sql.hostname") == "" {
logger.Println("No database server defined in configuration")
ok = false
}
if viper.GetString("mqtt.broker") == "" {
logger.Println("No MQTT broker defined in configuration")
ok = false
}
if !ok {
logger.Println()
cmd.Help()
os.Exit(1)
}
logger.Println("Dispatching...")
config := ticTsdb.ProcessorConfig{
Sql: ticTsdb.SqlConfig{
Url: getDatabaseUrl(),
},
Mqtt: ticTsdb.MqttConfig{
BrokerURL: viper.GetString("mqtt.broker"),
Username: viper.GetString("mqtt.username"),
Password: viper.GetString("mqtt.password"),
ClientID: viper.GetString("mqtt.clientId"),
Timeout: viper.GetDuration("mqtt.timeout"),
GracePeriod: viper.GetDuration("mqtt.gracePeriod"),
},
Logger: logger,
}
processor := ticTsdb.NewProcessor(config)
err := processor.Process()
if err != nil {
logger.Println(err)
os.Exit(1)
}
},
}
func init() {
rootCmd.AddCommand(processCmd)
}

88
cli/cmd/root.go

@ -0,0 +1,88 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package cmd
import (
"fmt"
"log"
"os"
"time"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
var cfgFile string
var logger *log.Logger
func getDatabaseUrl() string {
return fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=disable", viper.GetString("sql.username"), viper.GetString("sql.password"), viper.GetString("sql.hostname"), viper.GetInt("sql.port"), viper.GetString("sql.database"))
}
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: "tic-tsdb",
Short: "Saves MQTT events to TimescaleDB",
Long: ``,
}
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
func init() {
// Initializes a new logger without timestamps
logger = log.New(os.Stderr, "", 0)
// Set default configuration
viper.SetDefault("sql.port", 5432)
viper.SetDefault("mqtt.clientId", "tic-tsdb")
viper.SetDefault("mqtt.timeout", 30*time.Second)
viper.SetDefault("mqtt.gracePeriod", 5*time.Second)
cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $PWD/tic-tsdb.yaml)")
}
// initConfig reads in config file and ENV variables if set.
func initConfig() {
if cfgFile != "" {
// Use config file from the flag.
viper.SetConfigFile(cfgFile)
} else {
// Search working directory with name "tic-tsdb" (without extension).
viper.AddConfigPath(".")
viper.SetConfigName("tic-tsdb")
}
viper.AutomaticEnv() // read in environment variables that match
// If a config file is found, read it in.
if err := viper.ReadInConfig(); err == nil {
fmt.Println("Using config file:", viper.ConfigFileUsed())
}
}

28
cli/main.go

@ -0,0 +1,28 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package main
import "github.com/nmasse-itix/tic-tsdb/cli/cmd"
func main() {
cmd.Execute()
}

12
go.mod

@ -0,0 +1,12 @@
module github.com/nmasse-itix/tic-tsdb
go 1.16
require (
github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/jackc/pgx/v4 v4.15.0
github.com/pressly/goose/v3 v3.5.3
github.com/rubenv/sql-migrate v1.1.1
github.com/spf13/cobra v1.3.0
github.com/spf13/viper v1.10.1
)

1271
go.sum

File diff suppressed because it is too large

48
migrations.go

@ -0,0 +1,48 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package lib
import (
"database/sql"
"embed"
goose "github.com/pressly/goose/v3"
)
// SqlMigrationFS stores a list of database schema migration scripts
//go:embed schemas/*.sql
var SqlMigrationFS embed.FS
// MigrateDb migrates the provided database to the most recent schema
func MigrateDb(db *sql.DB) error {
goose.SetBaseFS(SqlMigrationFS)
if err := goose.SetDialect("postgres"); err != nil {
return err
}
if err := goose.Up(db, "schemas"); err != nil {
return err
}
return nil
}

83
mqtt.go

@ -0,0 +1,83 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package lib
import (
"fmt"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Those flags define the MQTT Quality of Service (QoS) levels
const (
MQTT_QOS_0 = 0 // QoS 1
MQTT_QOS_1 = 1 // QoS 2
MQTT_QOS_2 = 2 // QoS 3
)
// An MqttConfig represents the required information to connect to an MQTT
// broker.
type MqttConfig struct {
BrokerURL string // broker url (tcp://hostname:port or ssl://hostname:port)
Username string // username (optional)
Password string // password (optional)
ClientID string // MQTT ClientID
Timeout time.Duration // how much time to wait for connect and subscribe operations to complete
GracePeriod time.Duration // how much time to wait for the disconnect operation to complete
}
// SetMqttLogger sets the logger to be used by the underlying MQTT library
func SetMqttLogger(logger *log.Logger) {
mqtt.CRITICAL = logger
mqtt.ERROR = logger
mqtt.WARN = logger
}
// NewMqttClient creates a new MQTT client and connects to the broker
func NewMqttClient(config MqttConfig) (mqtt.Client, error) {
if config.BrokerURL == "" {
return nil, fmt.Errorf("MQTT broker URL is empty")
}
opts := mqtt.NewClientOptions()
opts.AddBroker(config.BrokerURL)
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(config.Timeout)
opts.SetOrderMatters(false)
opts.SetCleanSession(false)
opts.SetClientID(config.ClientID)
if config.Username != "" {
opts.SetUsername(config.Username)
opts.SetPassword(config.Password)
}
client := mqtt.NewClient(opts)
ct := client.Connect()
if !ct.WaitTimeout(config.Timeout) {
return nil, fmt.Errorf("mqtt: timeout waiting for connection")
}
return client, nil
}

22
podman-compose.yaml

@ -0,0 +1,22 @@
version: '3.1'
services:
timescale:
image: docker.io/timescale/timescaledb:latest-pg12
ports:
- "5432:5432"
volumes:
- ./.podman-compose/pg-data:/var/lib/postgresql/data:z
restart: always
environment:
POSTGRES_PASSWORD: secret
POSTGRES_USER: tic
POSTGRES_HOST_AUTH_METHOD: scram-sha-256
POSTGRES_INITDB_ARGS: --auth-host=scram-sha-256
POSTGRES_DB: tic
mosquitto:
image: docker.io/library/eclipse-mosquitto:2.0
ports:
- "1883:1883"
volumes:
- ./.podman-compose/mosquitto-data:/mosquitto/data:z
- ./.podman-compose/mosquitto-config:/mosquitto/config:z

268
process.go

@ -0,0 +1,268 @@
/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package lib
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"strconv"
"strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
_ "github.com/jackc/pgx/v4/stdlib"
)
// An SqlConfig stores connection details to the database
type SqlConfig struct {
Url string // Database URL (driver://user:password@hostname:port/db?opts)
}
// A ProcessorConfig stores the configuration of a processor
type ProcessorConfig struct {
Sql SqlConfig
Mqtt MqttConfig
Logger *log.Logger
}
// A UnixEpoch is a time.Time that serializes / deserializes as Unix epoch
type UnixEpoch time.Time
// MarshalJSON returns the current value as JSON
func (t UnixEpoch) MarshalJSON() ([]byte, error) {
t2 := time.Time(t)
return []byte(fmt.Sprintf("%d", t2.Unix())), nil
}
// UnmarshalJSON initialises the current object from its JSON representation
func (t *UnixEpoch) UnmarshalJSON(b []byte) error {
unix, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return err
}
*t = UnixEpoch(time.Unix(unix, 0))
return nil
}
// A TicMessage represents data received from the TIC (Tele Information Client)
type TicMessage struct {
Timestamp UnixEpoch `json:"ts"`
Field string `json:"-"`
Value string `json:"val"`
}
// A Processor receives events from the MQTT broker and saves data to the database
type Processor struct {
Config ProcessorConfig // the configuration
client mqtt.Client // the MQTT client
messages chan TicMessage // channel to send events from the MQTT go routines to the main method
conn *sql.DB // the database connection
}
const (
// How many in-flight MQTT messages to buffer
MESSAGE_CHANNEL_LENGTH = 10
// SQL Query to store current data
UpsertCurrentQuery string = `
INSERT INTO current VALUES ($1, $2, $3)
ON CONFLICT (time, phase) DO UPDATE
SET current = excluded.current`
// SQL Query to store power data
UpsertPowerQuery string = `
INSERT INTO power VALUES ($1, $2)
ON CONFLICT (time) DO UPDATE
SET power = excluded.power`
// SQL Query to store energy data
UpsertEnergyQuery string = `
INSERT INTO energy VALUES ($1, $2, $3)
ON CONFLICT (time, tariff) DO UPDATE
SET reading = excluded.reading`
)
// NewProcessor creates a new processor from its configuration
func NewProcessor(c ProcessorConfig) *Processor {
processor := Processor{
Config: c,
messages: make(chan TicMessage, MESSAGE_CHANNEL_LENGTH),
}
return &processor
}
// usefulTopics is a list of topics of interest
var usefulTopics map[string]bool = map[string]bool{
"IINST": true,
"IINST1": true,
"IINST2": true,
"IINST3": true,
"PAPP": true,
"BASE": true,
"HCHP": true,
"HCHC": true,
}
// Process receives MQTT messages and saves data to the SQL database
func (processor *Processor) Process() error {
var err error
// connect to the SQL Database
processor.Config.Logger.Println("Connecting to PostgreSQL server...")
processor.conn, err = sql.Open("pgx", processor.Config.Sql.Url)
if err != nil {
return err
}
defer processor.conn.Close()
// do SQL Schema migrations
processor.Config.Logger.Println("Ensuring db schema is up-to-date...")
err = MigrateDb(processor.conn)
if err != nil {
return err
}
// connect to the MQTT broker
SetMqttLogger(processor.Config.Logger)
processor.Config.Logger.Println("Connecting to MQTT server...")
processor.client, err = NewMqttClient(processor.Config.Mqtt)
if err != nil {
return err
}
// subscribe to topics
topics := "esp-tic/status/tic/#"
processor.Config.Logger.Printf("Subscribing to topics %s...", topics)
st := processor.client.Subscribe(topics, MQTT_QOS_2, processor.processMessage)
if !st.WaitTimeout(processor.Config.Mqtt.Timeout) {
return fmt.Errorf("mqtt: timeout waiting for subscribe")
}
// process MQTT messages
for {
msg := <-processor.messages
processor.Config.Logger.Printf("%s: %s", msg.Field, msg.Value)
var err error
if msg.Field == "IINST" || msg.Field == "IINST1" || msg.Field == "IINST2" || msg.Field == "IINST3" {
err = processor.processCurrent(msg)
} else if msg.Field == "PAPP" {
err = processor.processPower(msg)
} else if msg.Field == "BASE" || msg.Field == "HCHP" || msg.Field == "HCHC" {
err = processor.processEnergy(msg)
}
if err != nil {
processor.Config.Logger.Println(err)
}
}
}
// processCurrent saves current data to the database
func (processor *Processor) processCurrent(msg TicMessage) error {
phase := 0
if msg.Field != "IINST" {
phase = int(msg.Field[5] - '0')
}
value, err := strconv.ParseInt(msg.Value, 10, 32)
if err != nil {
return err
}
rows, err := processor.conn.Query(UpsertCurrentQuery,
time.Time(msg.Timestamp),
phase,
value)
if err != nil {
return err
}
rows.Close()
return nil
}
// processPower saves power data to the database
func (processor *Processor) processPower(msg TicMessage) error {
value, err := strconv.ParseInt(msg.Value, 10, 32)
if err != nil {
return err
}
rows, err := processor.conn.Query(UpsertPowerQuery,
time.Time(msg.Timestamp),
value)
if err != nil {
return err
}
rows.Close()
return nil
}
// processEnergy saves energy readings to the database
func (processor *Processor) processEnergy(msg TicMessage) error {
value, err := strconv.ParseInt(msg.Value, 10, 32)
if err != nil {
return err
}
rows, err := processor.conn.Query(UpsertEnergyQuery,
time.Time(msg.Timestamp),
msg.Field,
value)
if err != nil {
return err
}
rows.Close()
return nil
}
// processMessage is the callback routine called by the MQTT library to process
// events.
func (processor *Processor) processMessage(c mqtt.Client, m mqtt.Message) {
if m.Retained() {
return
}
topic := m.Topic()
pos := strings.LastIndexByte(topic, '/')
if pos == -1 {
return
}
field := topic[pos+1:]
var ok bool
if _, ok = usefulTopics[field]; !ok {
return
}
var msg TicMessage
err := json.Unmarshal(m.Payload(), &msg)
if err != nil {
processor.Config.Logger.Println(err)
return
}
msg.Field = field
processor.messages <- msg
}

28
schemas/001_initial.sql

@ -0,0 +1,28 @@
-- +goose Up
CREATE TABLE current (
time TIMESTAMP (0) WITHOUT TIME ZONE NOT NULL,
phase INTEGER NOT NULL DEFAULT(0),
current INTEGER NOT NULL,
UNIQUE (time, phase)
);
CREATE TABLE power (
time TIMESTAMP (0) WITHOUT TIME ZONE UNIQUE NOT NULL,
power INTEGER NOT NULL
);
CREATE TABLE energy (
time TIMESTAMP (0) WITHOUT TIME ZONE NOT NULL,
tariff TEXT NOT NULL,
reading INTEGER NOT NULL,
UNIQUE (time, tariff)
);
SELECT create_hypertable('current','time');
SELECT create_hypertable('power','time');
SELECT create_hypertable('energy','time');
-- +goose Down
DROP TABLE current;
DROP TABLE power;
DROP TABLE energy;

12
tic-tsdb.yaml

@ -0,0 +1,12 @@
mqtt:
broker: tcp://localhost:1883
username: dev
password: secret
timeout: 5s
gracePeriod: 2s
sql:
database: tic
username: tic
password: secret
hostname: localhost
port: 5432
Loading…
Cancel
Save