Saves data from the TIC (Enedis Tele Information Client) received from MQTT to a TimescaleDB
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

268 lines
6.9 KiB

/*
Copyright © 2022 Nicolas MASSE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package lib
import (
"database/sql"
"encoding/json"
"fmt"
"log"
"strconv"
"strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
_ "github.com/jackc/pgx/v4/stdlib"
)
// An SqlConfig stores connection details to the database
type SqlConfig struct {
Url string // Database URL (driver://user:password@hostname:port/db?opts)
}
// A ProcessorConfig stores the configuration of a processor
type ProcessorConfig struct {
Sql SqlConfig
Mqtt MqttConfig
Logger *log.Logger
}
// A UnixEpoch is a time.Time that serializes / deserializes as Unix epoch
type UnixEpoch time.Time
// MarshalJSON returns the current value as JSON
func (t UnixEpoch) MarshalJSON() ([]byte, error) {
t2 := time.Time(t)
return []byte(fmt.Sprintf("%d", t2.Unix())), nil
}
// UnmarshalJSON initialises the current object from its JSON representation
func (t *UnixEpoch) UnmarshalJSON(b []byte) error {
unix, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return err
}
*t = UnixEpoch(time.Unix(unix, 0))
return nil
}
// A TicMessage represents data received from the TIC (Tele Information Client)
type TicMessage struct {
Timestamp UnixEpoch `json:"ts"`
Field string `json:"-"`
Value string `json:"val"`
}
// A Processor receives events from the MQTT broker and saves data to the database
type Processor struct {
Config ProcessorConfig // the configuration
client mqtt.Client // the MQTT client
messages chan TicMessage // channel to send events from the MQTT go routines to the main method
conn *sql.DB // the database connection
}
const (
// How many in-flight MQTT messages to buffer
MESSAGE_CHANNEL_LENGTH = 10
// SQL Query to store current data
UpsertCurrentQuery string = `
INSERT INTO current VALUES ($1, $2, $3)
ON CONFLICT (time, phase) DO UPDATE
SET current = excluded.current`
// SQL Query to store power data
UpsertPowerQuery string = `
INSERT INTO power VALUES ($1, $2)
ON CONFLICT (time) DO UPDATE
SET power = excluded.power`
// SQL Query to store energy data
UpsertEnergyQuery string = `
INSERT INTO energy VALUES ($1, $2, $3)
ON CONFLICT (time, tariff) DO UPDATE
SET reading = excluded.reading`
)
// NewProcessor creates a new processor from its configuration
func NewProcessor(c ProcessorConfig) *Processor {
processor := Processor{
Config: c,
messages: make(chan TicMessage, MESSAGE_CHANNEL_LENGTH),
}
return &processor
}
// usefulTopics is a list of topics of interest
var usefulTopics map[string]bool = map[string]bool{
"IINST": true,
"IINST1": true,
"IINST2": true,
"IINST3": true,
"PAPP": true,
"BASE": true,
"HCHP": true,
"HCHC": true,
}
// Process receives MQTT messages and saves data to the SQL database
func (processor *Processor) Process() error {
var err error
// connect to the SQL Database
processor.Config.Logger.Println("Connecting to PostgreSQL server...")
processor.conn, err = sql.Open("pgx", processor.Config.Sql.Url)
if err != nil {
return err
}
defer processor.conn.Close()
// do SQL Schema migrations
processor.Config.Logger.Println("Ensuring db schema is up-to-date...")
err = MigrateDb(processor.conn)
if err != nil {
return err
}
// connect to the MQTT broker
SetMqttLogger(processor.Config.Logger)
processor.Config.Logger.Println("Connecting to MQTT server...")
processor.client, err = NewMqttClient(processor.Config.Mqtt)
if err != nil {
return err
}
// subscribe to topics
topics := "esp-tic/status/tic/#"
processor.Config.Logger.Printf("Subscribing to topics %s...", topics)
st := processor.client.Subscribe(topics, MQTT_QOS_2, processor.processMessage)
if !st.WaitTimeout(processor.Config.Mqtt.Timeout) {
return fmt.Errorf("mqtt: timeout waiting for subscribe")
}
// process MQTT messages
for {
msg := <-processor.messages
processor.Config.Logger.Printf("%s: %s", msg.Field, msg.Value)
var err error
if msg.Field == "IINST" || msg.Field == "IINST1" || msg.Field == "IINST2" || msg.Field == "IINST3" {
err = processor.processCurrent(msg)
} else if msg.Field == "PAPP" {
err = processor.processPower(msg)
} else if msg.Field == "BASE" || msg.Field == "HCHP" || msg.Field == "HCHC" {
err = processor.processEnergy(msg)
}
if err != nil {
processor.Config.Logger.Println(err)
}
}
}
// processCurrent saves current data to the database
func (processor *Processor) processCurrent(msg TicMessage) error {
phase := 0
if msg.Field != "IINST" {
phase = int(msg.Field[5] - '0')
}
value, err := strconv.ParseInt(msg.Value, 10, 32)
if err != nil {
return err
}
rows, err := processor.conn.Query(UpsertCurrentQuery,
time.Time(msg.Timestamp),
phase,
value)
if err != nil {
return err
}
rows.Close()
return nil
}
// processPower saves power data to the database
func (processor *Processor) processPower(msg TicMessage) error {
value, err := strconv.ParseInt(msg.Value, 10, 32)
if err != nil {
return err
}
rows, err := processor.conn.Query(UpsertPowerQuery,
time.Time(msg.Timestamp),
value)
if err != nil {
return err
}
rows.Close()
return nil
}
// processEnergy saves energy readings to the database
func (processor *Processor) processEnergy(msg TicMessage) error {
value, err := strconv.ParseInt(msg.Value, 10, 32)
if err != nil {
return err
}
rows, err := processor.conn.Query(UpsertEnergyQuery,
time.Time(msg.Timestamp),
msg.Field,
value)
if err != nil {
return err
}
rows.Close()
return nil
}
// processMessage is the callback routine called by the MQTT library to process
// events.
func (processor *Processor) processMessage(c mqtt.Client, m mqtt.Message) {
if m.Retained() {
return
}
topic := m.Topic()
pos := strings.LastIndexByte(topic, '/')
if pos == -1 {
return
}
field := topic[pos+1:]
var ok bool
if _, ok = usefulTopics[field]; !ok {
return
}
var msg TicMessage
err := json.Unmarshal(m.Payload(), &msg)
if err != nil {
processor.Config.Logger.Println(err)
return
}
msg.Field = field
processor.messages <- msg
}