2026年3月23日 星期一

Esp32 直接寫入到TimescaleDB

1.先將 PostgreSQLpg_hba.conf 做設定

notepad "C:\Program Files\PostgreSQL\15\data\pg_hba.conf"
在檔案中的最後一行加上
host all all 192.168.0.0/24 trust

如下
# TYPE DATABASE USER ADDRESS METHOD


# "local" is for Unix domain socket connections only

local all all scram-sha-256

# IPv4 local connections:

host all all 127.0.0.1/32 scram-sha-256

# IPv6 local connections:

host all all ::1/128 scram-sha-256

# Allow replication connections from localhost, by a user with the

# replication privilege.

local replication all scram-sha-256

host replication all 127.0.0.1/32 scram-sha-256

host replication all ::1/128 scram-sha-256

host all all 192.168.0.0/24 trust


存檔關閉,重啟 PostgreSQL 服務 。

SELECT type, address, auth_method FROM pg_hba_file_rules WHERE type = 'host';

192.168.0.0 trust 代表有生效


2.建立 table

-- 啟用 TimescaleDB extension

CREATE EXTENSION IF NOT EXISTS timescaledb;


-- 建立 table

CREATE TABLE IF NOT EXISTS sensor_data (

time TIMESTAMPTZ NOT NULL DEFAULT NOW(),

device_id TEXT NOT NULL,

temperature FLOAT

);


-- 轉成 hypertable

SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);

3.esp32程式碼:

#include <SPI.h>

#include <Ethernet2.h>

#include <ModbusMaster.h>


// ===== W5500 =====

#define W5500_CS 6

#define W5500_RST 14

byte mac[] = { 0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED };

IPAddress dbServer(192, 168, 0, 220); // 你電腦的 IP

EthernetClient client;


// ===== RS485 =====

#define RS_TX_PIN 5

#define RS_RX_PIN 4

#define RS_ENAMBLE_232_PIN 21

#define RS_ENAMBLE_422_PIN 16

#define RS_ENAMBLE_485_PIN 15

#define RS_BAUDRATE 9600

ModbusMaster node;


// ===== DB 設定 =====

const char* DB_NAME = "postgres";

const char* DB_USER = "postgres";


// ─────────────────────────────────────────────

// PostgreSQL wire protocol helpers

// ─────────────────────────────────────────────


// 讀掉一個完整的 server message,回傳 type byte

char pgReadMessage() {

unsigned long t = millis();

while (!client.available()) {

if (millis() - t > 5000) { Serial.println("PG timeout"); return 0; }

delay(10);

}

char msgType = (char)client.read();


// 4 bytes 長度

uint8_t lb[4] = {0};

int got = 0;

t = millis();

while (got < 4) {

if (client.available()) lb[got++] = client.read();

if (millis() - t > 2000) break;

}

int bodyLen = (((int)lb[0]<<24)|((int)lb[1]<<16)|((int)lb[2]<<8)|(int)lb[3]) - 4;


// 如果是錯誤訊息,印出來

if (msgType == 'E' && bodyLen > 0 && bodyLen < 400) {

uint8_t body[400];

got = 0; t = millis();

while (got < bodyLen) {

if (client.available()) body[got++] = client.read();

if (millis() - t > 2000) break;

}

// PG 錯誤格式:一堆 "field_type + string\0" 組合

// 印出所有可讀字元

Serial.print("PG Error: ");

for (int i = 0; i < got; i++) {

if (body[i] >= 32 && body[i] < 127) Serial.print((char)body[i]);

else if (body[i] == 0) Serial.print(" | ");

}

Serial.println();

return msgType;

}


// 其他訊息:讀掉 body

got = 0; t = millis();

while (got < bodyLen) {

if (client.available()) { client.read(); got++; }

if (millis() - t > 2000) break;

}

return msgType;

}


// 送出有 type byte message(一般訊息用)

void pgSendMessage(char type, const uint8_t* body, int bodyLen) {

int totalLen = bodyLen + 4;

uint8_t hdr[5];

hdr[0] = (uint8_t)type;

hdr[1] = (totalLen >> 24) & 0xFF;

hdr[2] = (totalLen >> 16) & 0xFF;

hdr[3] = (totalLen >> 8) & 0xFF;

hdr[4] = (totalLen ) & 0xFF;

client.write(hdr, 5);

if (bodyLen > 0) client.write(body, bodyLen);

}


// 等到收到 ReadyForQuery ('Z')

void pgWaitReady() {

char r = 0;

for (int i = 0; i < 15 && r != 'Z'; i++) r = pgReadMessage();

}


// ─────────────────────────────────────────────

// 連線到 PostgreSQLStartup → Auth → Ready

// ─────────────────────────────────────────────

bool pgConnect() {

Serial.print("TCP -> DB...");

if (!client.connect(dbServer, 5432)) {

Serial.println("FAIL");

return false;

}

Serial.println("OK");


// ── Startup message(沒有 type byte,格式特殊)──

uint8_t buf[128];

int pos = 0;

// Protocol 3.0

buf[pos++]=0x00; buf[pos++]=0x03; buf[pos++]=0x00; buf[pos++]=0x00;

// "user\0<user>\0"

memcpy(buf+pos,"user",4); pos+=4; buf[pos++]=0;

int ul=strlen(DB_USER); memcpy(buf+pos,DB_USER,ul); pos+=ul; buf[pos++]=0;

// "database\0<db>\0"

memcpy(buf+pos,"database",8); pos+=8; buf[pos++]=0;

int dl=strlen(DB_NAME); memcpy(buf+pos,DB_NAME,dl); pos+=dl; buf[pos++]=0;

buf[pos++]=0; // terminator


int totalLen = pos + 4;

uint8_t lenBytes[4] = {

(uint8_t)(totalLen>>24),(uint8_t)(totalLen>>16),

(uint8_t)(totalLen>>8), (uint8_t)(totalLen)

};

client.write(lenBytes, 4);

client.write(buf, pos);


// 伺服器回 'R'auth request)或直接 'Z'trust mode

char r = pgReadMessage();

Serial.printf("Startup resp: '%c'\n", r);


if (r == 'R') {

// trust mode 下伺服器送 AuthenticationOk 然後一堆 parameter status

// 直接等 ReadyForQuery

pgWaitReady();

} else if (r != 'Z') {

Serial.println("Unexpected startup response");

client.stop();

return false;

}


Serial.println("DB ready!");

return true;

}


// ─────────────────────────────────────────────

// 執行 INSERT SQL

// ─────────────────────────────────────────────

bool pgInsert(float temperature) {

char sql[200];

snprintf(sql, sizeof(sql),

"INSERT INTO sensor_data (time, device_id, temperature) "

"VALUES (NOW(), 'CH4_SENSOR', %.2f);",

temperature

);


int sLen = strlen(sql);

uint8_t body[256];

memcpy(body, sql, sLen);

body[sLen] = 0;

pgSendMessage('Q', body, sLen + 1);


char resp = pgReadMessage();

Serial.printf("Insert resp: '%c'\n", resp);


if (resp == 'E') {

Serial.println("SQL Error!");

pgWaitReady();

return false;

}

pgWaitReady();

return true;

}


// ─────────────────────────────────────────────

// Modbus callbacks

// ─────────────────────────────────────────────

void preTransmission() {

while (Serial2.available()) Serial2.read();

}

void postTransmission() {

Serial2.flush();

delayMicroseconds(100);

while (Serial2.available()) Serial2.read();

}


// ─────────────────────────────────────────────

void setup() {

Serial.begin(115200);


// W5500 reset

pinMode(W5500_RST, OUTPUT);

digitalWrite(W5500_RST, LOW); delay(20);

digitalWrite(W5500_RST, HIGH); delay(100);


SPI.begin(12, 13, 11, W5500_CS);

Ethernet.init(W5500_CS);

IPAddress ip(192, 168, 0, 50);

IPAddress gateway(192, 168, 0, 1);

IPAddress subnet(255, 255, 255, 0);

Ethernet.begin(mac, ip, gateway, gateway, subnet);

Serial.print("IP: "); Serial.println(Ethernet.localIP());


// RS485

pinMode(RS_ENAMBLE_232_PIN, OUTPUT);

pinMode(RS_ENAMBLE_422_PIN, OUTPUT);

pinMode(RS_ENAMBLE_485_PIN, OUTPUT);

digitalWrite(RS_ENAMBLE_232_PIN, LOW);

digitalWrite(RS_ENAMBLE_422_PIN, LOW);

digitalWrite(RS_ENAMBLE_485_PIN, HIGH);

Serial2.begin(RS_BAUDRATE, SERIAL_8N1, RS_RX_PIN, RS_TX_PIN);

node.begin(1, Serial2);

node.preTransmission(preTransmission);

node.postTransmission(postTransmission);


Serial.println("System Ready");

pgConnect();

}


void loop() {

uint8_t result = node.readHoldingRegisters(43, 1);


if (result == node.ku8MBSuccess) {

uint16_t raw = node.getResponseBuffer(0);

float temperature = raw / 10.0;

Serial.print("Temp: "); Serial.println(temperature);


// DB 連線斷掉就重連

if (!client.connected()) {

Serial.println("Reconnecting...");

pgConnect();

}


if (pgInsert(temperature)) {

Serial.println("Saved to DB!");

}

} else {

Serial.print("Modbus error: 0x");

Serial.println(result, HEX);

}


delay(2000);

}

2-1.符合格式及建stored procedure

{"id":10,"add":400082,"dt":1773408584755,"v":17754}


建立 table :

CREATE TABLE IF NOT EXISTS public.raw_sensor_data (

ts TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,

machine_id INTEGER NOT NULL,

tag_addr INTEGER NOT NULL,

val DOUBLE PRECISION

);


SELECT create_hypertable('public.raw_sensor_data', 'ts', if_not_exists => TRUE);


建立 stored procedure :

CREATE OR REPLACE FUNCTION insert_machine_data_json(IN p_json_data jsonb)

RETURNS void AS $$

BEGIN

INSERT INTO public.raw_sensor_data (ts, machine_id, tag_addr, val)

VALUES (

CURRENT_TIMESTAMP,

(p_json_data->>'id')::INTEGER,

(p_json_data->>'add')::INTEGER,

(p_json_data->>'v')::DOUBLE PRECISION

);

END;

$$ LANGUAGE plpgsql;


測試 procedure 可否寫入 :

SELECT insert_machine_data_json('{"id":10,"add":400082,"v":177.54}');


-- 確認有寫入

SELECT * FROM public.raw_sensor_data ORDER BY ts DESC LIMIT 5;


2-2.調整esp32程式:

#include <SPI.h>

#include <Ethernet2.h>

#include <ModbusMaster.h>


// ===== W5500 =====

#define W5500_CS 6

#define W5500_RST 14

byte mac[] = { 0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED };

IPAddress dbServer(192, 168, 0, 220); // 你電腦的 IP

EthernetClient client;


// ===== RS485 =====

#define RS_TX_PIN 5

#define RS_RX_PIN 4

#define RS_ENAMBLE_232_PIN 21

#define RS_ENAMBLE_422_PIN 16

#define RS_ENAMBLE_485_PIN 15

#define RS_BAUDRATE 9600

ModbusMaster node;


// ===== DB 設定 =====

const char* DB_NAME = "postgres";

const char* DB_USER = "postgres";


// ─────────────────────────────────────────────

// PostgreSQL wire protocol helpers

// ─────────────────────────────────────────────


// 讀掉一個完整的 server message,回傳 type byte

char pgReadMessage() {

unsigned long t = millis();

while (!client.available()) {

if (millis() - t > 5000) { Serial.println("PG timeout"); return 0; }

delay(10);

}

char msgType = (char)client.read();


// 4 bytes 長度

uint8_t lb[4] = {0};

int got = 0;

t = millis();

while (got < 4) {

if (client.available()) lb[got++] = client.read();

if (millis() - t > 2000) break;

}

int bodyLen = (((int)lb[0]<<24)|((int)lb[1]<<16)|((int)lb[2]<<8)|(int)lb[3]) - 4;


// 如果是錯誤訊息,印出來

if (msgType == 'E' && bodyLen > 0 && bodyLen < 400) {

uint8_t body[400];

got = 0; t = millis();

while (got < bodyLen) {

if (client.available()) body[got++] = client.read();

if (millis() - t > 2000) break;

}

// PG 錯誤格式:一堆 "field_type + string\0" 組合

// 印出所有可讀字元

Serial.print("PG Error: ");

for (int i = 0; i < got; i++) {

if (body[i] >= 32 && body[i] < 127) Serial.print((char)body[i]);

else if (body[i] == 0) Serial.print(" | ");

}

Serial.println();

return msgType;

}


// 其他訊息:讀掉 body

got = 0; t = millis();

while (got < bodyLen) {

if (client.available()) { client.read(); got++; }

if (millis() - t > 2000) break;

}

return msgType;

}


// 送出有 type byte message(一般訊息用)

void pgSendMessage(char type, const uint8_t* body, int bodyLen) {

int totalLen = bodyLen + 4;

uint8_t hdr[5];

hdr[0] = (uint8_t)type;

hdr[1] = (totalLen >> 24) & 0xFF;

hdr[2] = (totalLen >> 16) & 0xFF;

hdr[3] = (totalLen >> 8) & 0xFF;

hdr[4] = (totalLen ) & 0xFF;

client.write(hdr, 5);

if (bodyLen > 0) client.write(body, bodyLen);

}


// 等到收到 ReadyForQuery ('Z')

void pgWaitReady() {

char r = 0;

for (int i = 0; i < 15 && r != 'Z'; i++) r = pgReadMessage();

}


// ─────────────────────────────────────────────

// 連線到 PostgreSQLStartup → Auth → Ready

// ─────────────────────────────────────────────

bool pgConnect() {

Serial.print("TCP -> DB...");

if (!client.connect(dbServer, 5432)) {

Serial.println("FAIL");

return false;

}

Serial.println("OK");


// ── Startup message(沒有 type byte,格式特殊)──

uint8_t buf[128];

int pos = 0;

// Protocol 3.0

buf[pos++]=0x00; buf[pos++]=0x03; buf[pos++]=0x00; buf[pos++]=0x00;

// "user\0<user>\0"

memcpy(buf+pos,"user",4); pos+=4; buf[pos++]=0;

int ul=strlen(DB_USER); memcpy(buf+pos,DB_USER,ul); pos+=ul; buf[pos++]=0;

// "database\0<db>\0"

memcpy(buf+pos,"database",8); pos+=8; buf[pos++]=0;

int dl=strlen(DB_NAME); memcpy(buf+pos,DB_NAME,dl); pos+=dl; buf[pos++]=0;

buf[pos++]=0; // terminator


int totalLen = pos + 4;

uint8_t lenBytes[4] = {

(uint8_t)(totalLen>>24),(uint8_t)(totalLen>>16),

(uint8_t)(totalLen>>8), (uint8_t)(totalLen)

};

client.write(lenBytes, 4);

client.write(buf, pos);


// 伺服器回 'R'auth request)或直接 'Z'trust mode

char r = pgReadMessage();

Serial.printf("Startup resp: '%c'\n", r);


if (r == 'R') {

// trust mode 下伺服器送 AuthenticationOk 然後一堆 parameter status

// 直接等 ReadyForQuery

pgWaitReady();

} else if (r != 'Z') {

Serial.println("Unexpected startup response");

client.stop();

return false;

}


Serial.println("DB ready!");

return true;

}


// ─────────────────────────────────────────────

// 執行 INSERT SQL

// ─────────────────────────────────────────────

bool pgInsert(float val, int machine_id, int tag_addr) {

char sql[256];

snprintf(sql, sizeof(sql),

"SELECT insert_machine_data_json('{\"id\":%d,\"add\":%d,\"v\":%.4f}');",

machine_id, tag_addr, val

);


int sLen = strlen(sql);

uint8_t body[256];

memcpy(body, sql, sLen);

body[sLen] = 0;

pgSendMessage('Q', body, sLen + 1);


char resp = pgReadMessage();

Serial.printf("Insert resp: '%c'\n", resp);


if (resp == 'E') {

Serial.println("SQL Error!");

pgWaitReady();

return false;

}

pgWaitReady();

return true;

}


// ─────────────────────────────────────────────

// Modbus callbacks

// ─────────────────────────────────────────────

void preTransmission() {

while (Serial2.available()) Serial2.read();

}

void postTransmission() {

Serial2.flush();

delayMicroseconds(100);

while (Serial2.available()) Serial2.read();

}


// ─────────────────────────────────────────────

void setup() {

Serial.begin(115200);


// W5500 reset

pinMode(W5500_RST, OUTPUT);

digitalWrite(W5500_RST, LOW); delay(20);

digitalWrite(W5500_RST, HIGH); delay(100);


SPI.begin(12, 13, 11, W5500_CS);

Ethernet.init(W5500_CS);

IPAddress ip(192, 168, 0, 50);

IPAddress gateway(192, 168, 0, 1);

IPAddress subnet(255, 255, 255, 0);

Ethernet.begin(mac, ip, gateway, gateway, subnet);

Serial.print("IP: "); Serial.println(Ethernet.localIP());


// RS485

pinMode(RS_ENAMBLE_232_PIN, OUTPUT);

pinMode(RS_ENAMBLE_422_PIN, OUTPUT);

pinMode(RS_ENAMBLE_485_PIN, OUTPUT);

digitalWrite(RS_ENAMBLE_232_PIN, LOW);

digitalWrite(RS_ENAMBLE_422_PIN, LOW);

digitalWrite(RS_ENAMBLE_485_PIN, HIGH);

Serial2.begin(RS_BAUDRATE, SERIAL_8N1, RS_RX_PIN, RS_TX_PIN);

node.begin(1, Serial2);

node.preTransmission(preTransmission);

node.postTransmission(postTransmission);


Serial.println("System Ready");

pgConnect();

}


void loop() {

uint8_t result = node.readHoldingRegisters(43, 1);

if (result == node.ku8MBSuccess) {

uint16_t raw = node.getResponseBuffer(0);

float val = raw / 10.0;

Serial.print("Temp: "); Serial.println(val);


// DB 連線斷掉就重連

if (!client.connected()) {

Serial.println("Reconnecting...");

pgConnect();

}


// machine_id=10, tag_addr=43 ( Modbus register)

if (pgInsert(val, 10, 43)) {

Serial.println("Saved to DB!");

}

} else {

Serial.print("Modbus error: 0x");

Serial.println(result, HEX);

}

delay(2000);

}

成功寫入:





沒有留言:

張貼留言

Raspberry Pi 做 PostgreSQL(Timescale) 讓資料可以寫入到PostgreSQL

先安裝樹梅派系統 https://asps5414.blogspot.com/2026/03/raspberry-pi-os64-bit-litegui.html sudo apt install postgresql -y sudo systemctl status postg...