Connect to Enedis smart meter (Linky) and send data to MQTT
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

250 lines
5.7 KiB

package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"os"
"reflect"
"strings"
"sync"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
diff "github.com/r3labs/diff/v2"
"go.bug.st/serial"
yaml "gopkg.in/yaml.v3"
)
// QoS levels usable as the per-topic byte value when subscribing through the
// MQTT client (main subscribes with MQTT_QOS_1).
const (
	MQTT_QOS_0 = iota // 0
	MQTT_QOS_1        // 1
	MQTT_QOS_2        // 2
)
// TicData is the JSON payload decoded from each MQTT message received by the
// test subscription.
type TicData struct {
	// Value is read from the "val" JSON key; it is the string compared
	// against the expected values of a test case.
	Value string `json:"val"`

	// Timestamp is read from the "ts" JSON key. It is decoded but not
	// inspected in this file (unit not visible here — TODO confirm).
	Timestamp int64 `json:"ts"`
}
// main runs exactly one end-to-end test case, selected by name on the command
// line.
//
// It optionally connects to an MQTT broker (-s), opens the serial port (-p)
// with the line settings matching the test case mode, subscribes to every
// topic listed in the test case's Expected map, writes the test case's frames
// to the serial port, and then polls for up to 15 seconds until the values
// observed on MQTT are deeply equal to Expected.
//
// The process exits with status 1 on usage errors, an unknown test case name,
// MQTT connection/subscription failures, or a mismatch between expected and
// observed values. Serial port failures and an unknown mode panic.
func main() {
	// Uncomment to print the MQTT client library's internal logs on stdout
	// (requires importing "log").
	//mqtt.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
	//mqtt.CRITICAL = log.New(os.Stdout, "[CRIT] ", 0)
	//mqtt.WARN = log.New(os.Stdout, "[WARN] ", 0)
	//mqtt.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)

	var (
		portName     string
		mqttUri      string
		mqttUsername string
		mqttPassword string
		mqttClientId string
		mqttClient   mqtt.Client // stays nil when no MQTT URI is given
	)

	flag.StringVar(&portName, "p", "/dev/ttyUSB1", "Serial port to use")
	flag.StringVar(&mqttUri, "s", "", "MQTT Server URI (tcp://server:port or ssl://server:port)")
	flag.StringVar(&mqttUsername, "u", "", "MQTT Username")
	flag.StringVar(&mqttPassword, "w", "", "MQTT Password")
	flag.StringVar(&mqttClientId, "c", "esptic-e2e-test", "MQTT Client ID")
	flag.Parse()

	printUsage := func() {
		fmt.Println("Usage: e2e [options] test_case")
		fmt.Println()
		fmt.Println("Options:")
		flag.PrintDefaults()
		fmt.Println()
		fmt.Println("Available test cases:")
		for _, testCase := range testCases {
			fmt.Printf("- %s\n", testCase.Name)
		}
	}

	// Exactly one positional argument is accepted: the test case name.
	cases := flag.Args()
	if len(cases) != 1 {
		printUsage()
		os.Exit(1)
	}
	testcaseName := cases[0]

	// Reject unknown names before touching the broker or the serial port;
	// otherwise the run would silently do nothing and report "Done.".
	known := false
	for _, testCase := range testCases {
		if testCase.Name == testcaseName {
			known = true
			break
		}
	}
	if !known {
		fmt.Printf("Unknown test case %q\n", testcaseName)
		fmt.Println()
		printUsage()
		os.Exit(1)
	}

	if mqttUri != "" {
		opts := mqtt.NewClientOptions()
		opts.AddBroker(mqttUri)
		opts.SetAutoReconnect(false)
		opts.SetConnectRetry(false)
		opts.SetOrderMatters(false)
		opts.SetCleanSession(true)
		opts.SetClientID(mqttClientId)
		if mqttUsername != "" {
			opts.SetUsername(mqttUsername)
		}
		if mqttPassword != "" {
			opts.SetPassword(mqttPassword)
		}

		mqttClient = mqtt.NewClient(opts)
		fmt.Printf("Connecting to MQTT server %s...\n", mqttUri)
		ct := mqttClient.Connect()
		if !ct.WaitTimeout(30 * time.Second) {
			fmt.Println("Timeout waiting for MQTT server!")
			os.Exit(1)
		}
		if err := ct.Error(); err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
	}

	for _, testCase := range testCases {
		if testCase.Name != testcaseName {
			continue
		}
		fmt.Printf("Running test case %s...\n", testCase.Name)

		// Both modes use 7 data bits, even parity and one stop bit; only the
		// baud rate differs.
		var mode serial.Mode
		var modeName string
		switch testCase.Mode {
		case TIC_MODE_HISTORIQUE:
			mode = serial.Mode{
				BaudRate: 1200,
				DataBits: 7,
				Parity:   serial.EvenParity,
				StopBits: serial.OneStopBit,
			}
			modeName = "historique"
		case TIC_MODE_STANDARD:
			mode = serial.Mode{
				BaudRate: 9600,
				DataBits: 7,
				Parity:   serial.EvenParity,
				StopBits: serial.OneStopBit,
			}
			modeName = "standard"
		default:
			panic("Unknown mode")
		}

		fmt.Printf("Opening port %s with mode %s...\n", portName, modeName)
		port, err := serial.Open(portName, &mode)
		if err != nil {
			panic(err)
		}
		// The loop always breaks after this iteration, so the deferred close
		// runs when main returns on the success path.
		defer port.Close()

		// mut guards mqttObservedValues, which is written from the MQTT
		// message callback and read from this goroutine.
		var mut sync.Mutex
		mqttObservedValues := make(map[string][]string, len(testCase.Expected))

		if mqttUri != "" {
			topics := make(map[string]byte, len(testCase.Expected))
			for topic := range testCase.Expected {
				topics[topic] = MQTT_QOS_1
			}
			st := mqttClient.SubscribeMultiple(topics, func(c mqtt.Client, m mqtt.Message) {
				// Retained messages are ignored: only messages published
				// while the test is running are recorded.
				if m.Retained() {
					return
				}
				var data TicData
				if err := json.Unmarshal(m.Payload(), &data); err != nil {
					fmt.Printf("Cannot unmarshal JSON payload '%s'\n", string(m.Payload()))
					return
				}
				mut.Lock()
				defer mut.Unlock()
				// Appending to the missing (nil) entry creates it.
				mqttObservedValues[m.Topic()] = append(mqttObservedValues[m.Topic()], data.Value)
			})
			st.Wait() // Happy path: there won't be a timeout...
			// A failed subscription would otherwise show up only as a
			// confusing "nothing received" test failure 15 seconds later.
			if err := st.Error(); err != nil {
				fmt.Println(err)
				os.Exit(1)
			}
		}

		for i, frame := range testCase.Sent {
			fmt.Printf("Sending trame %d...\n", i)

			// A frame is 0x02, then each info group wrapped as "\n<info>\r",
			// then 0x03.
			var b bytes.Buffer
			b.WriteByte(0x02)
			for _, info := range frame {
				fmt.Fprintf(&b, "\n%s\r", info)
			}
			b.WriteByte(0x03)

			buffer := b.Bytes()
			n, err := port.Write(buffer)
			if err != nil {
				panic(err)
			}
			if n != len(buffer) {
				panic("Cannot send bytes to serial port!")
			}

			// Can be any value between 16.7 and 33.4 ms
			time.Sleep(33 * time.Millisecond)
		}

		fmt.Println("Waiting for MQTT messages...")
		success := false
		for i := 0; i < 15; i++ {
			mut.Lock()
			ok := reflect.DeepEqual(mqttObservedValues, testCase.Expected)
			mut.Unlock()
			if ok {
				success = true
				break
			}

			// there may be ongoing MQTT messages waiting to be sent/delivered
			// wait one second and try again
			time.Sleep(time.Second)
		}

		if !success {
			// The subscription is still active, so keep the callback out of
			// the map while it is diffed and marshalled. The lock is never
			// released because every path below ends in os.Exit.
			mut.Lock()

			changelog, err := diff.Diff(testCase.Expected, mqttObservedValues)
			if err != nil {
				fmt.Println(err)
				os.Exit(1)
			}

			fmt.Println("TEST FAILED!")
			fmt.Println()
			fmt.Println("Changelog:")
			fmt.Println()
			for _, change := range changelog {
				fmt.Printf("%s: %s: %s -> %s\n", change.Type, strings.Join(change.Path, "."), str(change.From), str(change.To))
			}
			fmt.Println()
			// Marshal errors are ignored on purpose: this is best-effort
			// diagnostic output right before exiting with a failure.
			fmt.Println("Expected:")
			b, _ := yaml.Marshal(testCase.Expected)
			fmt.Println(string(b))
			fmt.Println()
			fmt.Println("Got:")
			b, _ = yaml.Marshal(mqttObservedValues)
			fmt.Println(string(b))
			fmt.Println()
			os.Exit(1)
		}

		if mqttUri != "" {
			topics := make([]string, 0, len(testCase.Expected))
			for topic := range testCase.Expected {
				topics = append(topics, topic)
			}
			ut := mqttClient.Unsubscribe(topics...)
			ut.Wait() // Happy path: there won't be a timeout...
		}

		fmt.Println("TEST SUCCEEDED!")
		break
	}

	// mqttClient is a nil interface when no MQTT URI was supplied; calling a
	// method on it would panic.
	if mqttClient != nil && mqttClient.IsConnected() {
		mqttClient.Disconnect(0)
	}
	fmt.Println("Done.")
}
// str renders s for the failure changelog printed by main.
//
// A nil interface value is rendered as the literal "nil"; anything else is
// formatted with the %s verb, so non-string values that do not implement
// Stringer or error come out in fmt's "%!s(type=value)" form.
func str(s interface{}) string {
	if s != nil {
		return fmt.Sprintf("%s", s)
	}
	return "nil"
}