356 lines
14 KiB
Python
356 lines
14 KiB
Python
"""
|
|
|
|
uPurifier is a firmware for ESP8266-based custom PCBs that connects
|
|
IKEA Air Purifiers to Home Assistant via MQTT
|
|
|
|
(C) Copyright Gergo Horvath @2023.
|
|
Released under the GPL v3.0 licence.
|
|
|
|
Official repos related to this project:
|
|
https://github.com/horvathgergo/uPurifier
|
|
https://github.com/horvathgergo/esp8266-for-uppatvind
|
|
https://github.com/horvathgergo/esp8266-for-fornuftig
|
|
|
|
"""
|
|
|
|
import gc
|
|
from umqtt.simple import MQTTClient
|
|
import socket
|
|
import network
|
|
from machine import Pin, PWM, reset, unique_id
|
|
import ubinascii
|
|
import json
|
|
import time
|
|
import os
|
|
gc.collect()
|
|
|
|
|
|
class SmartAirPurifier():
|
|
|
|
def __init__(self):
|
|
self.client_id = ubinascii.hexlify(unique_id())
|
|
self.client_type = os.uname().sysname.upper()
|
|
self.wifi = network.WLAN(network.STA_IF)
|
|
self.wifi.active(True)
|
|
self.ap = network.WLAN(network.AP_IF)
|
|
self.ap.active(False)
|
|
self.device_type = None
|
|
self.config = {}
|
|
self.modes = {
|
|
0: {'state':'OFF', 'speed': 0, 'freq': 1, 'duty': 0, 'preset': 'off' },
|
|
1: {'state':'ON' , 'speed': 33, 'freq':152, 'duty':512, 'preset': 'low' },
|
|
2: {'state':'ON' , 'speed': 66, 'freq':225, 'duty':512, 'preset': 'medium'},
|
|
3: {'state':'ON' , 'speed':100, 'freq':300, 'duty':512, 'preset': 'high' },
|
|
'ON': {'state':'ON' , 'speed':100, 'freq':300, 'duty':512, 'preset': 'high' },
|
|
'OFF': {'state':'OFF', 'speed': 0, 'freq': 1, 'duty': 0, 'preset': 'off' },
|
|
'off': {'state':'OFF', 'speed': 0, 'freq': 1, 'duty': 0, 'preset': 'off' },
|
|
'low': {'state':'ON' , 'speed': 33, 'freq':152, 'duty':512, 'preset': 'low' },
|
|
'medium': {'state':'ON' , 'speed': 66, 'freq':225, 'duty':512, 'preset': 'medium' },
|
|
'high': {'state':'ON' , 'speed':100, 'freq':300, 'duty':512, 'preset': 'high' },
|
|
}
|
|
|
|
|
|
def configure(self):
|
|
"""
|
|
Configure ESP based on device type.
|
|
"""
|
|
if self.device_type == 'fornuftig':
|
|
self.btn3 = Pin(14, Pin.IN, Pin.PULL_UP)
|
|
self.btn2 = Pin(12, Pin.IN, Pin.PULL_UP)
|
|
self.btn1 = Pin(13, Pin.IN, Pin.PULL_UP)
|
|
self.max_freq = 300 #Hz
|
|
self.pwm = PWM(Pin(5, Pin.OUT), freq=1, duty=0)
|
|
self.fg = Pin(4, Pin.IN, Pin.PULL_UP)
|
|
self.url = 'https://github.com/horvathgergo/esp8266-for-fornuftig'
|
|
|
|
elif self.device_type == 'uppatvind':
|
|
self.btn = Pin(13, Pin.IN, Pin.PULL_UP)
|
|
self.max_freq = 300 #Hz
|
|
self.pwm = PWM(Pin(5, Pin.OUT), freq=1, duty=0)
|
|
self.fg = Pin(4, Pin.IN, Pin.PULL_UP)
|
|
self.url = 'https://github.com/horvathgergo/esp8266-for-uppatvind'
|
|
|
|
|
|
def load_config(self):
|
|
"""
|
|
Handle (read/parse) configuration settings provided by the user.
|
|
"""
|
|
with open('config.json', 'r') as f:
|
|
self.config = json.load(f)
|
|
return self.config
|
|
|
|
|
|
def save_config(self):
|
|
"""
|
|
Handle (write/dump) configuration settings provided by the user.
|
|
"""
|
|
with open('config.json', 'w') as f:
|
|
json.dump(self.config, f)
|
|
|
|
|
|
def connect_wifi(self):
|
|
"""
|
|
Connect to wifi at boot/reboot.
|
|
If wifi connection fails then activate fallback mechanism.
|
|
"""
|
|
try:
|
|
self.config = self.load_config()
|
|
self.wifi.connect(self.config["wifi_ssid"], self.config["wifi_psw"])
|
|
if not self.wifi.isconnected():
|
|
time.sleep(4)
|
|
if not self.wifi.isconnected():
|
|
self.open_captive_portal()
|
|
print("wifi connection successful")
|
|
except:
|
|
self.open_captive_portal()
|
|
|
|
|
|
def connect_mqtt(self):
|
|
"""
|
|
Connect to mqtt at boot/reboot.
|
|
Subscibe to state, speed and preset topics.
|
|
If wifi connection fails then activate fallback mechanism.
|
|
"""
|
|
try:
|
|
self.device_type = self.config['purifier']
|
|
bas_t = '/{}/{}/'.format(self.device_type, self.client_id.decode())
|
|
self.mqtt_client = MQTTClient(self.client_id,
|
|
self.config['mqtt_broker'],
|
|
1883,
|
|
self.config['mqtt_user'],
|
|
self.config['mqtt_psw'])
|
|
self.mqtt_client.set_callback(self.mqtt_callback)
|
|
self.mqtt_client.set_last_will(bas_t + 'availability/', 'offline', retain=True)
|
|
self.mqtt_client.connect()
|
|
time.sleep(2)
|
|
self.stat_t = bas_t + 'state/'
|
|
self.cmd_t = bas_t + 'set/'
|
|
self.pct_stat_t = bas_t + 'speed_state/'
|
|
self.pct_cmd_t = bas_t + 'speed_set/'
|
|
self.pr_mode_stat_t = bas_t + 'mode_state/'
|
|
self.pr_mode_cmd_t = bas_t + 'mode_set/'
|
|
self.avty_t = bas_t + 'availability/'
|
|
self.mqtt_client.subscribe(self.cmd_t)
|
|
self.mqtt_client.subscribe(self.pct_cmd_t)
|
|
self.mqtt_client.subscribe(self.pr_mode_cmd_t)
|
|
|
|
self.mqtt_client.publish(self.stat_t, str('OFF').encode())
|
|
self.mqtt_client.publish(self.pct_stat_t, str(0).encode())
|
|
self.mqtt_client.publish(self.pr_mode_stat_t, str('off').encode())
|
|
gc.collect()
|
|
except:
|
|
self.open_captive_portal()
|
|
|
|
|
|
def connect_ha(self):
|
|
"""
|
|
Enable mqtt autodiscovery by sending config entry to Home Assistant at startup.
|
|
"""
|
|
self.friendly_name = self.config['entity_id'].replace('_',' ')
|
|
self.friendly_name = self.friendly_name[0].upper() + self.friendly_name[1:]
|
|
|
|
ha_config = {
|
|
'name' : self.friendly_name,
|
|
'uniq_id' : self.client_id.decode(),
|
|
'obj_id' : self.config['entity_id'],
|
|
'avty_t' : self.avty_t,
|
|
'pl_avail' : 'online',
|
|
'pl_not_avail' : 'offline',
|
|
'stat_t' : self.stat_t,
|
|
'cmd_t' : self.cmd_t,
|
|
'pct_stat_t' : self.pct_stat_t,
|
|
'pct_cmd_t' : self.pct_cmd_t,
|
|
'pr_mode_stat_t' : self.pr_mode_stat_t,
|
|
'pr_mode_cmd_t' : self.pr_mode_cmd_t,
|
|
'pr_modes' : ['off','low','medium','high'],
|
|
'device': {
|
|
'ids' : [self.device_type, self.client_id.decode()],
|
|
'mf' : '@horvathgergo',
|
|
'model' : self.device_type[0].upper() + self.device_type[1:] + ' with ' + self.client_type,
|
|
'name' : 'Purifiers with ' + self.client_type,
|
|
'sw' : 'v0.1.0-beta',
|
|
'cu' : self.url,
|
|
}
|
|
|
|
}
|
|
payload = json.dumps(ha_config)
|
|
discovery_topic = 'homeassistant/fan/'+ self.client_id.decode() + '/config'
|
|
self.mqtt_client.publish(discovery_topic, payload)
|
|
self.mqtt_client.publish(self.avty_t, 'online', retain=True)
|
|
gc.collect()
|
|
|
|
|
|
def parse_request(self, request):
|
|
"""
|
|
Parse HTTP GET request string submitted through the captive portal.
|
|
"""
|
|
request = request.split(' ')[1]
|
|
request_params = request[2:].split('&')
|
|
for param in request_params:
|
|
key, value = param.split('=')
|
|
self.config[key] = value
|
|
return self.config
|
|
|
|
|
|
def css(self):
|
|
"""
|
|
Return pre-defined css style for captivel portal.
|
|
"""
|
|
with open('style.css', 'r') as f:
|
|
css = f.read()
|
|
return css
|
|
|
|
|
|
def html(self):
|
|
"""
|
|
Return pre-defined structure for captivel portal.
|
|
"""
|
|
with open('index.html', 'r') as f:
|
|
html = f.read()
|
|
html = html.replace('{{ css }}', self.css())
|
|
html = html.replace('{{ client_type }}', self.client_type)
|
|
html = html.replace('{{ client_id }}', self.client_id.decode())
|
|
html = html.encode()
|
|
return html
|
|
|
|
|
|
def open_captive_portal(self):
|
|
"""
|
|
Activate captive portal to be able to set or override wifi/mqtt settings.
|
|
"""
|
|
self.ap.active(False)
|
|
self.ap.active(True)
|
|
cp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
cp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
gc.collect()
|
|
try:
|
|
cp.bind(('', 80))
|
|
except:
|
|
cp.bind(('192.168.4.2', 80))
|
|
cp.listen(5)
|
|
while True:
|
|
conn, addr = cp.accept()
|
|
request = conn.recv(1024)
|
|
request = request.decode().split('\n',1)[0]
|
|
if '?' in request:
|
|
self.config = self.parse_request(request)
|
|
self.save_config()
|
|
conn.close()
|
|
cp.close()
|
|
self.ap.active(False)
|
|
del request, conn, addr, cp
|
|
gc.collect()
|
|
break
|
|
conn.send('HTTP/1.1 200 OK\r\n')
|
|
conn.send('Content-Type: text/html\r\n')
|
|
conn.send('Connection: close\r\n\r\n')
|
|
conn.sendall(self.html())
|
|
conn.close()
|
|
gc.collect()
|
|
reset()
|
|
|
|
|
|
def mqtt_callback(self, topic, msg):
|
|
"""
|
|
Handle mqtt callback. Activated when message received on any subscribed topics.
|
|
It evaluates if the msg is valid or not and processes/discards it accordingly.
|
|
Doesn't care too much with the topic :)
|
|
"""
|
|
msg = msg.decode()
|
|
try:
|
|
msg = int(msg)
|
|
except:
|
|
pass
|
|
if msg in list(self.modes.keys()) or (0 <= msg <= 100):
|
|
topic = topic.decode()
|
|
try:
|
|
if topic == self.pct_cmd_t:
|
|
if msg > 66:
|
|
preset = 3
|
|
elif msg > 33:
|
|
preset = 2
|
|
elif msg > 0:
|
|
preset = 1
|
|
else:
|
|
preset = 0
|
|
|
|
if msg > 0:
|
|
self.pwm.freq(min(int(round(float(msg)*2.2+80)), self.max_freq))
|
|
self.pwm.duty(512)
|
|
self.mqtt_client.publish(self.stat_t, 'ON')
|
|
self.mqtt_client.publish(self.pr_mode_stat_t, str(self.modes[preset]['preset']).encode())
|
|
|
|
else:
|
|
self.pwm.freq(1)
|
|
self.pwm.duty(0)
|
|
self.mqtt_client.publish(self.stat_t, 'OFF')
|
|
self.mqtt_client.publish(self.pr_mode_stat_t, str(self.modes[preset]['preset']).encode())
|
|
self.mqtt_client.publish(self.pct_stat_t, str(msg).encode())
|
|
else:
|
|
self.pwm.freq(self.modes[msg]['freq'])
|
|
self.pwm.duty(self.modes[msg]['duty'])
|
|
self.mqtt_client.publish(self.stat_t, str(self.modes[msg]['state']).encode())
|
|
self.mqtt_client.publish(self.pct_stat_t, str(self.modes[msg]['speed']).encode())
|
|
self.mqtt_client.publish(self.pr_mode_stat_t, str(self.modes[msg]['preset']).encode())
|
|
except:
|
|
pass
|
|
gc.collect()
|
|
|
|
|
|
def btn_callback(self, btn_state):
|
|
"""
|
|
Handle mqtt callback. Activated when button is pressed.
|
|
"""
|
|
self.pwm.freq(self.modes[btn_state]['freq'])
|
|
self.pwm.duty(self.modes[btn_state]['duty'])
|
|
self.mqtt_client.publish(self.stat_t, str(self.modes[btn_state]['state']).encode())
|
|
self.mqtt_client.publish(self.pct_stat_t, str(self.modes[btn_state]['speed']).encode())
|
|
self.mqtt_client.publish(self.pr_mode_stat_t, str(self.modes[btn_state]['preset']).encode())
|
|
|
|
|
|
def main(self, btn_state=0, btn_prev=0, attempts=5):
|
|
"""
|
|
This is the main (syncronous) function that check periodically...
|
|
...physical switch/button actions and mqtt msg on subscribed topics.
|
|
When valid cmd is received it activates callback functions.
|
|
"""
|
|
while True:
|
|
try:
|
|
self.mqtt_client.check_msg()
|
|
except:
|
|
if attempts:
|
|
self.mqtt_client.connect(False)
|
|
attempts -= 1
|
|
else:
|
|
pass
|
|
|
|
if self.device_type == 'fornuftig':
|
|
if not self.btn1.value():
|
|
btn_state = 1
|
|
elif not self.btn2.value():
|
|
btn_state = 2
|
|
elif not self.btn3.value():
|
|
btn_state = 3
|
|
else:
|
|
btn_state = 0
|
|
if btn_prev != btn_state:
|
|
print("state: " + str(btn_state))
|
|
self.btn_callback(btn_state)
|
|
btn_prev = btn_state
|
|
|
|
elif self.device_type == 'uppatvind':
|
|
if not self.btn.value():
|
|
btn_state = (btn_state + 1) % 4
|
|
print("state: " + str(btn_state))
|
|
self.btn_callback(btn_state)
|
|
time.sleep(0.5)
|
|
|
|
# Run
|
|
purifier = SmartAirPurifier()
|
|
purifier.connect_wifi()
|
|
purifier.connect_mqtt()
|
|
purifier.configure()
|
|
purifier.connect_ha()
|
|
purifier.main()
|
|
|