MENU
カタログクリップ
本ページはプロモーションを含みます。

PM2.5グラフをhttpサーバで表示する。SPS30 + Raspberry pi pico 2 Wを使って。

2025 9/04
広告
電子工作
2025年9月4日

微小粒子状物質センサーSPS30とRaspberry pi pico 2 Wで、PM2.5濃度を1602LCDに表示するとともに、pico 2 w上に簡易httpサーバーをたててこれまでのPM2.5濃度をグラフ表示するようにしました。

前回はSPS30をPCに直接繋いでグラフ表示をしていました。その続き。

あわせて読みたい
SensirionのSPS30粒子状物質(PM2.5)センサーを使ったメモ SensirionのSPS30という粒子状物質(PM2.5)センサーを使ったときのメモです。 センサー センサー類はセンシリオンがよい。性能が良くて、その割に安価。 センサー公称精…
目次

raspberry pi pico 2 Wをつかう

秋月電子で税込1,375円。アリエク(AliExpress)より安い。

キュアメイドのテーブル…

はじめてのraspberry piです。アーキテクチャがわかりませんでしたが、初回にUSBマウントしてmicropythonのファームウェアを書き込みしておくと、起動時にmain.pyを探して実行してくれるらしい。

今2025/9ですが、当たり前ながら2025/7に発表のあったRP2350A4という修正版ではありませんでした。A2でした。A4はいつ流通するのでしょうか。1年後くらい?

1602はたまたま手元にあったLCM1602 (PCF8574T)を積んだI2C通信できるものです。ドライバは3.3Vで駆動しますが、1602液晶のほうが5V駆動です。でもモジュールとして一体になっていて剥がせないので、液晶も3.3Vで駆動させます。なので文字が薄いです。(そのうちグラフィカル液晶がアリエクから届くのでいまのところ。)

Raspberry Pi PicoのI/Oピン(通信)は3.3Vらしいです。SPS30は電源が5Vで、通信は5Vでも3.3Vでも可能です。

raspberry piのUART0 (GPIO0/GPIO1)にSPS30、I2C SDA=GP4, SCL=GP5にLCD(PCF8574T)をつなぎました。

見にくいですが、PM2.5, PM1.0, PM10濃度がLCDに表示され、1秒ごとに更新されています。

httpサーバー

raspberry pi pico 2 wを自宅wifiにつなぎ、pico上でhttpサーバーを起動します。パソコンなどから、picoのipアドレス(例えば: 192.168.1.123)に接続します。すると下画像のようなグラフが表示されます。

サーバーでは、最上部に現在値のPM値を表示し、グラフで直近1日(1分平均)、7日(5分平均)、1ヶ月(30分平均)、1年(2時間平均)を表示するようにしました。表示は自動で更新されます。また、各グラフのデータをcsvでダウンロードできます。データはフラッシュに保存され、次回しばらくしてから測定を再開しても過去のデータが表示されるようになっています。

ソースコード

MicroPythonのコードは次のようにしました。

# pico2w_sps30_persist_debug_v6k.py
# 簡素永続化(.tmp→renameのみ)。初回は強制保存。
# 1分平均(1日), 5分平均(7日), 30分平均(30日), 2時間平均(1年)
from machine import UART, I2C, Pin
import struct, time, socket, network, ujson, os, gc
from array import array

# ===== Settings =====
# Wi-Fi credentials passed to wlan.connect() in wifi_connect_once();
# replace the placeholders before deploying.
SSID, PASS = "YOUR_SSID", "YOUR_PASSWORD"
# TCP port the embedded HTTP server binds to (see http_listen()).
HTTP_PORT = 80
# Seconds (9 h) added to stored timestamps before they are formatted as
# chart labels / CSV datetimes, i.e. output is shown in JST.
JST_OFFSET = 9*3600
DEBUG = True


def dbg(*a):
    """Print the given values when DEBUG is enabled.

    Logging is best-effort: a failing print() must never take the logger
    down, so ordinary exceptions are dropped. Only ``Exception`` is caught
    so that KeyboardInterrupt / SystemExit still propagate.
    """
    if not DEBUG:
        return
    try:
        print(*a)
    except Exception:
        # Deliberately ignored: debug output is optional.
        pass

# When true, wifi_connect_once() logs the interface configuration on success.
VERBOSE_WIFI = DEBUG
# Signature written at the start of the state file by save_state().
HEADER = b"SPS30STv6k\0"
# Older signatures that _try_load_from() still accepts.
OLD_HEADERS = (b"SPS30STv6j\0", b"SPS30STv6i\0", b"SPS30STv6h\0")
# Samples whose minute label is below this are treated as invalid and skipped
# by the payload/CSV builders.
MIN_VALID_MIN = 1672531200 // 60  # 2023-01-01Z

# I2C LCD
I2C_ID, SDA_PIN, SCL_PIN, I2C_FREQ = 0, 4, 5, 100_000
LCD_ADDR = 0x27

# UART (SPS30)
uart = UART(0, baudrate=115200, bits=8, parity=None, stop=1, tx=Pin(0), rx=Pin(1), timeout=100)

# Persistence
# save_state() writes STATE_TMP first and then renames it to STATE_FILE.
STATE_FILE = "/sps30_state.bin"
STATE_TMP  = "/sps30_state.tmp"
# Minimum spacing between two non-forced saves.
SAVE_MIN_INTERVAL_MS = 180_000
# ticks_ms() value of the last successful save (0 = none yet).
_last_state_save_ms = 0
# Set by load_state(); while False, save_state() refuses to overwrite an
# existing state file unless forced.
loaded_ok = False
# ticks_ms() at start-up; used for the boot cooldown in save_state().
BOOT_TS = time.ticks_ms()

def pad16(s):
    """Return `s` as exactly 16 characters: truncated, or right-padded with spaces."""
    width = 16
    clipped = s[:width]
    return clipped + " " * (width - len(clipped))

# ===== LCD =====
class I2cLcd1602:
    """Minimal driver for a 16x2 character LCD reached through an I2C expander.

    Every byte is sent as two 4-bit transfers (high nibble first). Each
    transfer is written twice: once with bit 0x04 set and once with it
    cleared (presumably the enable strobe of the backpack -- confirm against
    the module's wiring).
    """

    def __init__(self, i2c, addr, cols=16, rows=2):
        # `cols` and `rows` are accepted but not used by this driver.
        self.i2c = i2c
        self.addr = addr
        # OR-ed into every transfer (attribute name suggests the backlight bit).
        self.bl = 0x08
        time.sleep_ms(40)
        # Start-up sequence: 0x30 three times, then 0x20, then the setup
        # commands, then a clear.
        remaining = 3
        while remaining:
            self._w4(0x30, 0)
            time.sleep_ms(5)
            remaining -= 1
        self._w4(0x20, 0)
        for setup_cmd in (0x28, 0x0C, 0x06):
            self.cmd(setup_cmd)
        self.clear()

    def _w4(self, data, rs):
        """Send the high nibble of `data` with register-select bit `rs`."""
        frame = (data & 0xF0) | self.bl | rs
        bus, target = self.i2c, self.addr
        bus.writeto(target, bytes([frame | 0x04]))
        time.sleep_us(500)
        bus.writeto(target, bytes([frame & ~0x04]))
        time.sleep_us(50)

    def cmd(self, c):
        """Send command byte `c` (rs=0) as two nibbles."""
        self._w4(c, 0)
        self._w4(c << 4, 0)

    def ch(self, b):
        """Send data byte `b` (rs=1) as two nibbles."""
        self._w4(b, 1)
        self._w4(b << 4, 1)

    def write(self, s):
        """Write each element of `s`; ints are sent as-is, characters via ord()."""
        for item in s:
            code = item if isinstance(item, int) else ord(item)
            self.ch(code)

    def locate(self, c, r):
        """Move the cursor to column `c` of row `r` (row stride 0x40)."""
        self.cmd(0x80 | (0x40*r + c))

    def clear(self):
        """Send command 0x01 and wait 2 ms for it to finish."""
        self.cmd(0x01)
        time.sleep_ms(2)

# ===== LCDトースト =====
# Default time a toast message stays on the LCD.
TOAST_MS = 2000
# ticks_ms() deadline until which the toast is considered active.
toast_until = 0
# The two 16-character toast lines (drawn by the main loop while active).
toast_l1 = toast_l2 = ""
# Last PM lines that show_pm() wrote to the LCD; used to skip redundant writes.
last_pm0 = last_pm1 = ""


def toast(l1, l2="", ms=TOAST_MS):
    """Schedule a two-line message to be shown on the LCD for `ms` milliseconds.

    The text is only stored here; the main loop draws it while
    toast_active() is true.

    The show_pm() line cache is invalidated as well. The toast overwrites
    the LCD, so without this a PM line whose text did not change while the
    toast was visible would never be redrawn and the toast text would stay
    on screen.
    """
    global toast_until, toast_l1, toast_l2, last_pm0, last_pm1
    toast_l1, toast_l2 = pad16(l1), pad16(l2)
    toast_until = time.ticks_add(time.ticks_ms(), ms)
    # Force show_pm() to repaint both lines once the toast has expired.
    last_pm0 = last_pm1 = ""
    dbg("TOAST:", l1, "|", l2)


def toast_active():
    """Return True while the toast deadline has not passed yet."""
    return time.ticks_diff(toast_until, time.ticks_ms()) > 0
def show_pm(pm1, pm25, pm10):
    """Show the current PM2.5 (row 0) and PM1.0 / PM10 (row 1) values on the LCD.

    Nothing is drawn while a toast is active. A row is rewritten only when
    its text differs from what was last written.
    """
    global last_pm0, last_pm1
    top = pad16("PM2.5 Now:{:5.2f}".format(pm25))
    bottom = pad16("1:{:5.2f} 10:{:5.2f}".format(pm1, pm10))
    if toast_active():
        return
    if top != last_pm0:
        lcd.locate(0, 0)
        lcd.write(top)
        last_pm0 = top
    if bottom != last_pm1:
        lcd.locate(0, 1)
        lcd.write(bottom)
        last_pm1 = bottom

# ===== SPS30 (UART/SHDLC float) =====
START, ESC, XON, XOFF = 0x7E, 0x7D, 0x11, 0x13
ADR = 0x00
CMD_START, CMD_STOP, CMD_READ = 0x00, 0x01, 0x03
def _stuff(b): return bytes([ESC, b ^ 0x20]) if b in (START,ESC,XON,XOFF) else bytes([b])
def _unstuff(bs):
    out=bytearray(); it=iter(bs)
    for x in it:
        if x==ESC:
            y=next(it,None)
            if y is None: break
            out.append(y^0x20)
        else:
            out.append(x)
    return bytes(out)
def _chk(payload): return (~(sum(payload)&0xFF)) & 0xFF
def _frame(cmd,data=b''):
    core=bytes([ADR,cmd,len(data)])+data; chk=_chk(core)
    st=b''.join(_stuff(x) for x in core+bytes([chk]))
    return bytes([START])+st+bytes([START])
def _wr(cmd,data=b''): uart.write(_frame(cmd,data))
def _rd(timeout_ms=150):
    """Read one reply frame from the UART.

    Returns (cmd, data) for a valid frame, or None on timeout or when the
    frame fails validation. `timeout_ms` is measured from entry and covers
    both waiting for the opening START byte and reading the frame body.
    """
    t0 = time.ticks_ms()
    # Phase 1: poll until a START byte arrives; other bytes are discarded.
    while True:
        if uart.any() and uart.read(1)==bytes([START]): break
        if time.ticks_diff(time.ticks_ms(), t0) > timeout_ms: return None
    buf = bytearray()
    # Phase 2: collect raw (still escaped) bytes up to the closing START byte.
    while True:
        b = uart.read(1)
        if not b:
            if time.ticks_diff(time.ticks_ms(), t0) > timeout_ms: return None
            continue
        if b[0]==START: break
        buf.extend(b)
    raw = _unstuff(buf)
    # Shortest valid reply: address, command, state, length, checksum.
    if len(raw)<5: return None
    adr,cmd,state,L = raw[0],raw[1],raw[2],raw[3]
    data,chk = raw[4:-1],raw[-1]
    # Reject wrong address, bad checksum, non-zero state byte, or a length
    # field that does not match the payload actually received.
    if adr!=ADR or _chk(raw[:-1])!=chk or state!=0x00 or len(data)!=L: return None
    return cmd,data
def sps30_start_float():
    """Send the start command with payload 0x01, 0x03 and consume the reply.

    The reply is read (up to 300 ms) but its content is not checked.
    """
    _wr(CMD_START, bytes([0x01, 0x03]))
    _rd(300)
    dbg("SPS30 start float")


def sps30_read():
    """Request one measurement and return it as a dict, or None on failure.

    The first 40 payload bytes are decoded as ten big-endian floats. Values
    0..3 are returned as pm1 / pm25 / pm4 / pm10 and value 9 as tps.
    """
    _wr(CMD_READ)
    reply = _rd(300)
    if not reply:
        return None
    cmd, data = reply
    if cmd != CMD_READ or len(data) < 40:
        return None
    vals = struct.unpack('>10f', data[:40])
    return {"pm1": vals[0], "pm25": vals[1], "pm4": vals[2], "pm10": vals[3], "tps": vals[9]}

# ===== 30秒平均 =====
class AvgBuf:
    """Fixed-capacity window holding the most recent samples, with their mean."""

    def __init__(self, cap):
        self.cap = cap
        self.buf = array('f', [0.0]*cap)
        self.n = 0

    def add(self, v):
        """Append `v`; once full, the oldest sample is dropped."""
        if self.n >= self.cap:
            # Window is full: shift everything one slot left, newest goes last.
            for dst in range(1, self.cap):
                self.buf[dst - 1] = self.buf[dst]
            self.buf[self.cap - 1] = v
            return
        self.buf[self.n] = v
        self.n += 1

    def avg(self):
        """Return the mean of the stored samples, or NaN when empty."""
        if self.n == 0:
            return float('nan')
        total = 0.0
        pos = 0
        while pos < self.n:
            total += self.buf[pos]
            pos += 1
        return total / self.n


# One 30-sample window per measured channel.
avg_pm1, avg_pm25, avg_pm4, avg_pm10, avg_tps = [AvgBuf(30) for _ in range(5)]

# ===== Ring buffer layout =====
# Number of slots in each ring.
N1D = 24*60       # 1440
N7D5 = 7*24*12    # 2016
N30D30 = 30*24*2  # 1440
N2H = 365*12      # 4380

# --- 1-minute ring ---
# labels*: minute timestamp of each slot (the main loop stores time.time()//60).
# idx*: next slot to write; cnt*: number of valid slots (capped at ring size).
labels1m = array('I', [0]*N1D)
pm1_1m  = array('f', [0.0]*N1D)
pm25_1m = array('f', [0.0]*N1D)
pm4_1m  = array('f', [0.0]*N1D)
pm10_1m = array('f', [0.0]*N1D)
tps_1m  = array('f', [0.0]*N1D)
idx1m = 0; cnt1m = 0

# --- 5-minute ring ---
labels5 = array('I', [0]*N7D5)
pm1_5  = array('f', [0.0]*N7D5)
pm25_5 = array('f', [0.0]*N7D5)
pm4_5  = array('f', [0.0]*N7D5)
pm10_5 = array('f', [0.0]*N7D5)
tps_5  = array('f', [0.0]*N7D5)
idx5 = 0; cnt5 = 0
# Sliding window over the last 5 one-minute values of the 5 channels:
# sum5 holds the running per-channel sums, win5 the individual values
# (slot*5 + channel), i5 the next window slot, filled5 how many are filled.
sum5 = array('f', [0.0]*5)
win5 = array('f', [0.0]*25)   # 5×5
i5 = 0; filled5 = 0

# --- 30-minute ring (same scheme, 30-slot window) ---
labels30m = array('I', [0]*N30D30)
pm1_30  = array('f', [0.0]*N30D30)
pm25_30 = array('f', [0.0]*N30D30)
pm4_30  = array('f', [0.0]*N30D30)
pm10_30 = array('f', [0.0]*N30D30)
tps_30  = array('f', [0.0]*N30D30)
idx30 = 0; cnt30 = 0
sum30 = array('f', [0.0]*5)
win30 = array('f', [0.0]*150) # 30×5
i30 = 0; filled30 = 0
# Fixed-point scale of the 2-hour ring: values are stored as value * SCALE1Y.
SCALE1Y = 100


def enc1y(v):
    """Encode a reading as an unsigned 16-bit fixed-point value.

    NaN encodes as 0, negative values are clamped to 0 and anything that
    would exceed 65535 after scaling is clamped to 65535. The clamp happens
    before rounding so that +inf saturates instead of raising OverflowError
    in int(round(inf)).
    """
    if v != v:  # NaN is the only value that differs from itself
        return 0
    scaled = max(0.0, v) * SCALE1Y
    if scaled >= 65535:
        return 65535
    return int(round(scaled))


def dec1y(x):
    """Decode a value produced by enc1y() back to a float with 2 decimals."""
    return round(x / SCALE1Y, 2)

# --- 2-hour ring ---
# Minute timestamp of the first 2-hour record; set by the main loop when the
# ring is still empty.
base2h_min = 0
# Per-slot offset from base2h_min in units of 120 minutes
# (label minute = base2h_min + 120 * offset).
labels2h_off = array('H', [0]*N2H)
# Values are stored as 16-bit integers produced by enc1y().
pm1_2h  = array('H', [0]*N2H)
pm25_2h = array('H', [0]*N2H)
pm4_2h  = array('H', [0]*N2H)
pm10_2h = array('H', [0]*N2H)
tps_2h  = array('H', [0]*N2H)
idx2h = 0; cnt2h = 0
# Sliding window over the last 120 one-minute values of the 5 channels.
sum120 = array('f', [0.0]*5)
win120 = array('f', [0.0]*600)  # 120×5
i120 = 0; filled120 = 0

# ===== 永続化 =====
def _exists(p):
    try: os.stat(p); return True
    except OSError: return False

def _arr_write(f, arr):
    mv = memoryview(arr)
    try:
        mv = mv.cast('B')
    except:
        f.write(bytes(arr)); return
    off = 0; n = len(mv)
    while off < n:
        end = off + 4096
        f.write(mv[off:end]); off = end

def _arr_read_full(f, arr, itemsize):
    """Fill `arr` completely with raw bytes read from the open binary file `f`.

    `itemsize` is the size in bytes of one element of `arr`; it is only used
    by the fallback path. Raises OSError("short read") when the file ends
    before the array is full.
    """
    mv = memoryview(arr)
    try:
        mv = mv.cast('B')
    except:
        # Fallback when the memoryview cannot be cast: read the whole array
        # in one call and assign it through a temporary array.
        need = len(arr)*itemsize
        b = f.read(need)
        if b is None or len(b) != need: raise OSError("short read")
        # NOTE(review): the temporary array uses an integer typecode chosen
        # only by item size, also when `arr` holds floats. This presumably
        # relies on the target runtime copying raw bytes for same-size items
        # -- confirm on the deployed MicroPython build.
        arr[:] = array({1:'B',2:'H',4:'I'}.get(itemsize,'B'), b)
        return
    # Byte-view path: keep calling readinto() until every byte is filled.
    need = len(mv)
    got = 0
    while got < need:
        r = f.readinto(mv[got:])
        if not r: raise OSError("short read")
        got += r

def _expected_size():
    """Return the exact byte size of a state file written by save_state()."""
    # Three packed counter groups: 12, 2 and 1 unsigned 32-bit integers.
    meta = 48 + 8 + 4
    # Sliding windows: sum5/win5, sum30/win30, sum120/win120 (4-byte floats).
    windows = 4 * (5 + 25 + 5 + 150 + 5 + 600)
    # One label array per ring: 4-byte labels, except 2-byte offsets for 2 h.
    labels = 4 * (N1D + N7D5 + N30D30) + 2 * N2H
    # Five value arrays per ring with the same item sizes as above.
    values = 5 * (4 * (N1D + N7D5 + N30D30) + 2 * N2H)
    return len(HEADER) + meta + windows + labels + values

def _try_load_from(path):
    """Load the persisted state from `path`; return True on success.

    The file must have exactly the expected size and start with the current
    or a legacy signature. Counters are decoded into locals and range-checked
    first, and are committed to the globals only after every array has been
    read. A truncated or corrupt file therefore cannot leave out-of-range
    ring indices behind or raise during start-up. On a read error the
    sliding-window arrays are zeroed so partially loaded sums cannot skew
    later averages.
    """
    global idx1m,cnt1m,idx5,cnt5,idx30,cnt30,idx2h,cnt2h
    global i5,filled5,i30,filled30,i120,filled120,base2h_min
    if not _exists(path):
        return False
    size = os.stat(path)[6]
    exp = _expected_size()
    if size != exp:
        dbg("SIZE NG", path, size, "!=", exp)
        return False
    # (target array, item size in bytes, name for the debug log), in file order.
    arrays = (
        (sum5,4,"sum5"),(win5,4,"win5"),(sum30,4,"sum30"),(win30,4,"win30"),(sum120,4,"sum120"),(win120,4,"win120"),
        (labels1m,4,"labels1m"),(labels5,4,"labels5"),(labels30m,4,"labels30m"),(labels2h_off,2,"labels2h_off"),
        (pm1_1m,4,"pm1_1m"),(pm25_1m,4,"pm25_1m"),(pm4_1m,4,"pm4_1m"),(pm10_1m,4,"pm10_1m"),(tps_1m,4,"tps_1m"),
        (pm1_5,4,"pm1_5"),(pm25_5,4,"pm25_5"),(pm4_5,4,"pm4_5"),(pm10_5,4,"pm10_5"),(tps_5,4,"tps_5"),
        (pm1_30,4,"pm1_30"),(pm25_30,4,"pm25_30"),(pm4_30,4,"pm4_30"),(pm10_30,4,"pm10_30"),(tps_30,4,"tps_30"),
        (pm1_2h,2,"pm1_2h"),(pm25_2h,2,"pm25_2h"),(pm4_2h,2,"pm4_2h"),(pm10_2h,2,"pm10_2h"),(tps_2h,2,"tps_2h"),
    )
    # (position of the index/counter pair in the header, ring or window size).
    limits = ((0, N1D), (2, N7D5), (4, N30D30), (6, N2H), (8, 5), (10, 30), (12, 120))
    try:
        with open(path, "rb") as f:
            sig = f.read(len(HEADER))
            if sig != HEADER:
                if sig not in OLD_HEADERS:
                    dbg("HEADER NG")
                    return False
                dbg("legacy header", sig)
            # 12 + 2 + 1 little-endian unsigned 32-bit counters.
            raw = f.read(60)
            if raw is None or len(raw) != 60:
                raise OSError("short read")
            meta = struct.unpack("<15I", raw)
            for pos, limit in limits:
                # An index must address a slot; a fill count may equal the size.
                if meta[pos] >= limit or meta[pos + 1] > limit:
                    dbg("META NG", pos, meta[pos], meta[pos + 1])
                    return False
            for arr, itemsize, name in arrays:
                _arr_read_full(f, arr, itemsize)
                dbg("LOAD", name, "OK")
    except (OSError, ValueError) as e:
        # Arrays may be half-filled. The counters were not committed, so the
        # rings still read as empty; reset the windows to match.
        for arr in (sum5, win5, sum30, win30, sum120, win120):
            for k in range(len(arr)):
                arr[k] = 0.0
        dbg("LOAD err:", path, e)
        return False
    (idx1m, cnt1m,
     idx5,  cnt5,
     idx30, cnt30,
     idx2h, cnt2h,
     i5, filled5,
     i30, filled30,
     i120, filled120,
     base2h_min) = meta
    dbg("LOAD OK from", path, "idx/cnt:", idx1m,cnt1m, idx5,cnt5, idx30,cnt30, idx2h,cnt2h)
    return True

def save_state(force=False):
    """Persist all rings and sliding windows to flash.

    Unless `force` is true the save is skipped when
    - a state file exists and fewer than 180 s have passed since boot,
    - a state file exists but was not loaded successfully (never overwrite
      data that could not be read), or
    - the last successful save is more recent than SAVE_MIN_INTERVAL_MS.

    The data goes to STATE_TMP first and is then moved over STATE_FILE.
    Errors are logged and never raised.
    """
    global _last_state_save_ms
    now = time.ticks_ms()

    if not force:
        # The boot cooldown only applies when there is a file to protect.
        if _exists(STATE_FILE) and time.ticks_diff(now, BOOT_TS) < 180000:
            dbg("SAVE skip: boot cooldown (file exists)")
            return
        # Never overwrite an existing file that failed to load.
        if (not loaded_ok) and _exists(STATE_FILE):
            dbg("SAVE skip: loaded_ok=False and file exists")
            return
        if time.ticks_diff(now, _last_state_save_ms) < SAVE_MIN_INTERVAL_MS:
            return

    try:
        with open(STATE_TMP, "wb") as out:
            out.write(HEADER)
            # 12 + 2 + 1 counters, same bytes as three separate packs.
            out.write(struct.pack(
                "<15I",
                idx1m, cnt1m, idx5, cnt5, idx30, cnt30, idx2h, cnt2h,
                i5, filled5, i30, filled30,
                i120, filled120,
                base2h_min,
            ))
            rings = (
                sum5, win5, sum30, win30, sum120, win120,
                labels1m, labels5, labels30m, labels2h_off,
                pm1_1m, pm25_1m, pm4_1m, pm10_1m, tps_1m,
                pm1_5, pm25_5, pm4_5, pm10_5, tps_5,
                pm1_30, pm25_30, pm4_30, pm10_30, tps_30,
                pm1_2h, pm25_2h, pm4_2h, pm10_2h, tps_2h,
            )
            for ring in rings:
                _arr_write(out, ring)
        try:
            # Remove the old file first; it is fine if it does not exist.
            try:
                os.remove(STATE_FILE)
            except OSError:
                pass
            os.rename(STATE_TMP, STATE_FILE)
            _last_state_save_ms = now
            dbg("SAVE OK; exists:", _exists(STATE_FILE), "size:", os.stat(STATE_FILE)[6])
        except OSError as e:
            dbg("SAVE rename err:", e)
    except Exception as e:
        dbg("SAVE err:", e)

def load_state():
    """Restore the persisted state and record the outcome in `loaded_ok`.

    STATE_FILE is tried first; STATE_TMP is the fallback (it is what remains
    when a save was interrupted between remove and rename).
    """
    global loaded_ok
    loaded_ok = False
    loaded_ok = _try_load_from(STATE_FILE) or _try_load_from(STATE_TMP)
    if not _exists(STATE_FILE):
        dbg("STATE exists:", False)
        return
    dbg("STATE exists:", True, "size:", os.stat(STATE_FILE)[6], "loaded_ok:", loaded_ok)

# ===== Wi-Fi + NTP =====
wifi_err_msg = ""; _last_wifi_try = 0
time_synced = False; ntp_err_msg = ""; _last_ntp_try = 0
_prev_wifi = None; _prev_ntp = None
def wifi_status_text(st):
    table = {0:"idle",1:"connecting",2:"got_ip",3:"got_ip",-1:"connect_fail",-2:"no_ap",-3:"bad_pass"}
    return table.get(st, "unknown({})".format(st))
def wifi_connect_once(timeout_ms=15000):
    """Make one blocking attempt to join the configured Wi-Fi network.

    Starts a connection if the station is not connected, then polls every
    200 ms until it is connected or `timeout_ms` has elapsed. The WLAN
    object is always returned; callers check isconnected(). A toast is shown
    only when the outcome differs from the previous attempt.
    """
    global wifi_err_msg, _prev_wifi
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    try:
        if not wlan.isconnected():
            toast("WiFi CONNECT..", SSID[:16])
            wlan.connect(SSID, PASS)
        started = time.ticks_ms()
        while True:
            if wlan.isconnected():
                break
            if time.ticks_diff(time.ticks_ms(), started) >= timeout_ms:
                break
            time.sleep_ms(200)
        st = wlan.status()
        if not wlan.isconnected():
            reason = wifi_status_text(st)
            wifi_err_msg = "WiFi err: " + reason
            dbg(wifi_err_msg)
            if _prev_wifi is not False:
                toast("WiFi ERROR", reason[:16])
            _prev_wifi = False
            return wlan
        wifi_err_msg = ""
        if VERBOSE_WIFI:
            dbg("WiFi OK:", wlan.ifconfig())
        if _prev_wifi is not True:
            toast("WiFi OK", wlan.ifconfig()[0])
        _prev_wifi = True
        return wlan
    except Exception as e:
        wifi_err_msg = "WiFi exc: {}".format(repr(e))
        dbg(wifi_err_msg)
        if _prev_wifi is not False:
            toast("WiFi EXC", repr(e)[:16])
        _prev_wifi = False
        return wlan
def ensure_wifi(period_ms=10_000):
    """Return the station WLAN object, re-running the connect routine at most every `period_ms`."""
    global _last_wifi_try
    now = time.ticks_ms()
    if time.ticks_diff(now, _last_wifi_try) >= period_ms:
        _last_wifi_try = now
        return wifi_connect_once()
    # Too soon since the last attempt: just hand back the interface.
    return network.WLAN(network.STA_IF)
def is_time_synced():
    """Return True when the local clock reports year 2023 or later; False on any error."""
    try:
        year = time.localtime()[0]
    except:
        return False
    return year >= 2023
def attempt_ntp_if_needed(period_ms=30_000):
    """Set the clock via NTP once, retrying at most every `period_ms`.

    Does nothing when the time is already synced or Wi-Fi is down. A toast
    is shown only when the outcome differs from the previous attempt.
    """
    global time_synced, ntp_err_msg, _last_ntp_try, _prev_ntp
    if time_synced:
        return
    if not network.WLAN(network.STA_IF).isconnected():
        return
    now = time.ticks_ms()
    if time.ticks_diff(now, _last_ntp_try) < period_ms:
        return
    _last_ntp_try = now
    try:
        toast("NTP SYNC...", "")
        import ntptime
        ntptime.settime()
        if is_time_synced():
            time_synced = True
            ntp_err_msg = ""
            dbg("NTP OK")
            if _prev_ntp is not True:
                toast("NTP OK", "")
                _prev_ntp = True
        else:
            ntp_err_msg = "NTP no-sync"
            dbg(ntp_err_msg)
            if _prev_ntp is not False:
                toast("NTP ERROR", "no-sync")
                _prev_ntp = False
    except Exception as e:
        ntp_err_msg = repr(e)
        dbg("NTP err:", ntp_err_msg)
        if _prev_ntp is not False:
            toast("NTP ERROR", repr(e)[:16])
            _prev_ntp = False

# ===== HTTP =====
def _sendall(conn, buf):
    mv = memoryview(buf)
    while len(mv):
        try: n = conn.send(mv)
        except OSError: return
        if not n: continue
        mv = mv[n:]

HTML = """<!doctype html><meta charset="utf-8">
<title>SPS30 Monitor</title><meta name="viewport" content="width=device-width,initial-scale=1">
<script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
<style>
body{font-family:sans-serif;margin:16px}
#cards{display:grid;grid-template-columns:repeat(auto-fit,minmax(160px,1fr));gap:8px}
.card{padding:8px;border:1px solid #ddd;border-radius:8px}
h2{margin:8px 0} section{margin-top:16px} canvas{max-height:320px}
a.btn{display:inline-block;margin-right:8px}
</style>
<h2>SPS30</h2>
<div id="cards">
  <div class="card">PM1.0: <b id="pm1">-</b> µg/m³</div>
  <div class="card">PM2.5: <b id="pm25">-</b> µg/m³</div>
  <div class="card">PM4.0: <b id="pm4">-</b> µg/m³</div>
  <div class="card">PM10: <b id="pm10">-</b> µg/m³</div>
  <div class="card">Typical Size: <b id="tps">-</b> µm</div>
  <div class="card">
    <a class="btn" href="/csv1d">CSV 1d</a>
    <a class="btn" href="/csv7d">CSV 7d</a>
    <a class="btn" href="/csv30d">CSV 30d</a>
    <a class="btn" href="/csv1y">CSV 1y</a>
  </div>
</div>
<section><h3>1日(<b>1分平均</b>)</h3><canvas id="c1d"></canvas></section>
<section><h3>7日(<b>5分平均</b>)</h3><canvas id="c7d"></canvas></section>
<section><h3>1か月(<b>30分平均</b>)</h3><canvas id="c30d"></canvas></section>
<section><h3>1年(<b>2時間平均</b>)</h3><canvas id="c1y"></canvas></section>
<script>
function card(v){ return (v==null||isNaN(v))?'-':Number(v).toFixed(2); }
let C=null;
function ensureCharts(){
  if(!window.Chart) return false;
  if(C) return true;
  function mk(id){
    return new Chart(document.getElementById(id).getContext('2d'),{
      type:'line',
      data:{labels:[],datasets:[
        {label:'PM1.0',data:[],spanGaps:true},
        {label:'PM2.5',data:[],spanGaps:true},
        {label:'PM4.0',data:[],spanGaps:true},
        {label:'PM10',data:[],spanGaps:true},
        {label:'Typical Size (µm)',data:[],spanGaps:true,yAxisID:'y2'}
      ]},
      options:{responsive:true,animation:false,interaction:{mode:'nearest',intersect:false},
        scales:{y:{title:{display:true,text:'µg/m³'},beginAtZero:true},
                y2:{position:'right',grid:{drawOnChartArea:false},title:{display:true,text:'µm'},beginAtZero:true}},
        plugins:{legend:{position:'bottom'}}
      }
    });
  }
  try{ C={c1d:mk('c1d'), c7d:mk('c7d'), c30d:mk('c30d'), c1y:mk('c1y')}; return true; }catch(e){ C=null; return false; }
}
async function updateCards(){
  const now = await (await fetch('/now',{cache:'no-store'})).json();
  document.getElementById('pm1').textContent  = card(now.pm1);
  document.getElementById('pm25').textContent = card(now.pm25);
  document.getElementById('pm4').textContent  = card(now.pm4);
  document.getElementById('pm10').textContent = card(now.pm10);
  document.getElementById('tps').textContent  = card(now.tps);
}
async function updateChart(ch,url){
  const d = await (await fetch(url,{cache:'no-store'})).json();
  ch.data.labels=d.labels;
  ch.data.datasets[0].data=d.pm1;
  ch.data.datasets[1].data=d.pm25;
  ch.data.datasets[2].data=d.pm4;
  ch.data.datasets[3].data=d.pm10;
  ch.data.datasets[4].data=d.tps;
  ch.update('none');
}
async function tick(){
  try{ await updateCards(); }catch(e){}
  if(ensureCharts()){
    try{ await updateChart(C.c1d,'/data1d'); }catch(e){}
    try{ await updateChart(C.c7d,'/data7d'); }catch(e){}
    try{ await updateChart(C.c30d,'/data30d'); }catch(e){}
    try{ await updateChart(C.c1y,'/data1y'); }catch(e){}
  }
}
tick(); setInterval(tick,15000);
</script>"""

def http_listen():
    """Create, bind and return the listening socket of the HTTP server.

    The socket listens on all interfaces at HTTP_PORT with a backlog of 4.
    Its timeout is 0, so accept() does not block the main loop.
    """
    listener = socket.socket()
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(("0.0.0.0", HTTP_PORT))
    listener.listen(4)
    listener.settimeout(0)
    dbg("HTTP listening on", HTTP_PORT)
    return listener

def http_listen_and_reply(conn, path):
    """Answer one HTTP request for `path` (bytes) on the connected socket `conn`.

    Routes:
      /now                        current readings as JSON (2 decimals, None if unknown)
      /data1d /data7d /data30d /data1y   chart payloads as JSON
      /csv1d /csv7d /csv30d /csv1y       CSV download, generated into /sps30.csv first
      anything else               the HTML dashboard
    """
    def _send_ok(body, ctype=b"text/html; charset=utf-8"):
        tail = b"\r\nConnection: close\r\nContent-Length: %d\r\n\r\n" % len(body)
        _sendall(conn, b"HTTP/1.1 200 OK\r\nContent-Type: " + ctype + tail + body)

    def _r2(key):
        val = curr_now[key]
        return None if val is None else round(val, 2)

    if path == b"/now":
        snapshot = ujson.dumps({
            "pm1": _r2("pm1"),
            "pm25": _r2("pm25"),
            "pm4": _r2("pm4"),
            "pm10": _r2("pm10"),
            "tps": _r2("tps"),
        }).encode()
        _send_ok(snapshot, b"application/json")
        return

    # Chart payload endpoints: pick the builder, then serialise once.
    builder = None
    if path == b"/data1d":
        builder = build_payload_1d
    elif path == b"/data7d":
        builder = build_payload_7d
    elif path == b"/data30d":
        builder = build_payload_30d
    elif path == b"/data1y":
        builder = build_payload_1y
    if builder is not None:
        _send_ok(ujson.dumps(builder()).encode(), b"application/json")
        return

    if path in (b"/csv1d", b"/csv7d", b"/csv30d", b"/csv1y"):
        try:
            fn = "/sps30.csv"
            if path == b"/csv1d":
                writer = write_csv_1d
            elif path == b"/csv7d":
                writer = write_csv_7d
            elif path == b"/csv30d":
                writer = write_csv_30d
            else:
                writer = write_csv_1y
            writer(fn)
            sz = os.stat(fn)[6]
            dbg("CSV send", path, "size", sz)
            head = b"HTTP/1.1 200 OK\r\nContent-Type: text/csv\r\n" \
                   b"Content-Disposition: attachment; filename=\"sps30.csv\"\r\n" \
                   b"Connection: close\r\nContent-Length: %d\r\n\r\n" % sz
            _sendall(conn, head)
            # Stream the file in 1 KiB pieces instead of loading it whole.
            with open(fn, "rb") as src:
                while True:
                    piece = src.read(1024)
                    if not piece:
                        break
                    _sendall(conn, piece)
        except Exception as e:
            msg = ("error," + repr(e)).encode()
            err = b"HTTP/1.1 500 Internal Server Error\r\nContent-Type: text/plain\r\n" \
                  b"Connection: close\r\nContent-Length: %d\r\n\r\n" % len(msg)
            _sendall(conn, err + msg)
        return

    _send_ok(HTML.encode(), b"text/html; charset=utf-8")

def http_serve_once(s):
    """Accept at most one pending connection on `s` and answer its request.

    Returns immediately when no client is waiting (accept() raises OSError
    on the non-blocking listener). The request head is read for at most
    1.5 s or 4096 bytes; the path comes from the request line and defaults
    to b"/" when nothing usable was received.

    Changes from the earlier version:
    - An empty recv() means the peer closed the connection, so reading
      stops at once instead of polling until the deadline.
    - Any exception while handling the request (for example MemoryError
      while building a large JSON payload) is logged instead of ending the
      measurement loop.
    - Garbage is collected afterwards to release per-request buffers.
    """
    try:
        conn, addr = s.accept()
    except OSError:
        return
    try:
        conn.settimeout(2)
        deadline = time.ticks_add(time.ticks_ms(), 1500)
        req = b""
        while b"\r\n\r\n" not in req and len(req) < 4096:
            if time.ticks_diff(deadline, time.ticks_ms()) <= 0:
                break
            try:
                chunk = conn.recv(512)
            except OSError:
                break
            if not chunk:
                # End of stream: the client will not send anything more.
                break
            req += chunk
        path = b"/"
        if req:
            first = req.split(b"\r\n", 1)[0]
            sp = first.split()
            if len(sp) >= 2:
                path = sp[1]
        dbg("HTTP", addr, path)
        http_listen_and_reply(conn, path)
    except Exception as e:
        # Request boundary: one failed request must not stop the logger.
        dbg("HTTP err:", e)
    finally:
        try:
            conn.close()
        except Exception:
            pass
        gc.collect()

# ===== JSTラベル/日時 =====
def fmt_label_from_min(mins):
    """Format a minute timestamp as 'MM/DD hh:mm' shifted by JST_OFFSET.

    Falls back to str(mins) when the conversion fails.
    """
    try:
        month, day, hour, minute = time.localtime(mins*60 + JST_OFFSET)[1:5]
        return "{:02d}/{:02d} {:02d}:{:02d}".format(month, day, hour, minute)
    except:
        return str(mins)
def fmt_dt_from_min(mins):
    """Format a minute timestamp as 'YYYY-MM-DD hh:mm' shifted by JST_OFFSET."""
    year, month, day, hour, minute = time.localtime(mins*60 + JST_OFFSET)[:5]
    return "{:04d}-{:02d}-{:02d} {:02d}:{:02d}".format(year, month, day, hour, minute)

# ===== ペイロード =====
def _append_if_valid(dstL, dsts, lab_min, vals):
    """Append one sample to the output lists unless its label predates MIN_VALID_MIN.

    `dstL` receives the formatted label; each list in `dsts` receives the
    matching value from `vals`, rounded to 2 decimals.
    """
    if lab_min < MIN_VALID_MIN:
        return
    dstL.append(fmt_label_from_min(lab_min))
    for pos in range(min(len(dsts), len(vals))):
        dsts[pos].append(round(vals[pos], 2))
def build_payload_1d():
    """Return the chart payload for the 1-minute ring, oldest sample first."""
    # Same walk as the other rings; the count is clamped to the ring size.
    return _build_payload_generic(
        min(cnt1m, N1D), idx1m, labels1m,
        (pm1_1m, pm25_1m, pm4_1m, pm10_1m, tps_1m),
    )
def _build_payload_generic(n, idx, labels, arrs):
    """Collect the newest `n` samples of a ring into a JSON-ready dict.

    `idx` is the ring's next write position, `labels` its minute labels and
    `arrs` the five value arrays in the order pm1, pm25, pm4, pm10, tps.
    Samples are emitted oldest first; invalid labels are skipped.
    """
    L, P1, P25, P4, P10, TPS = [], [], [], [], [], []
    if n > 0:
        size = len(labels)
        k = (idx - n) % size
        for _ in range(n):
            _append_if_valid(L, (P1, P25, P4, P10, TPS), labels[k],
                             (arrs[0][k], arrs[1][k], arrs[2][k], arrs[3][k], arrs[4][k]))
            k = (k + 1) % size
    return {"labels":L,"pm1":P1,"pm25":P25,"pm4":P4,"pm10":P10,"tps":TPS}


def build_payload_7d():
    """Return the chart payload for the 5-minute ring."""
    series = (pm1_5, pm25_5, pm4_5, pm10_5, tps_5)
    return _build_payload_generic(cnt5, idx5, labels5, series)


def build_payload_30d():
    """Return the chart payload for the 30-minute ring."""
    series = (pm1_30, pm25_30, pm4_30, pm10_30, tps_30)
    return _build_payload_generic(min(cnt30, N30D30), idx30, labels30m, series)
def build_payload_1y():
    """Return the chart payload for the 2-hour ring.

    Labels are rebuilt from base2h_min plus the stored 120-minute offsets;
    values are decoded from their 16-bit form with dec1y().
    """
    L, P1, P25, P4, P10, TPS = [], [], [], [], [], []
    n = cnt2h
    if n > 0:
        k = (idx2h - n) % N2H
        for _ in range(n):
            lab_min = base2h_min + 120*labels2h_off[k]
            if lab_min >= MIN_VALID_MIN:
                L.append(fmt_label_from_min(lab_min))
                P1.append(dec1y(pm1_2h[k]))
                P25.append(dec1y(pm25_2h[k]))
                P4.append(dec1y(pm4_2h[k]))
                P10.append(dec1y(pm10_2h[k]))
                TPS.append(dec1y(tps_2h[k]))
            k = (k + 1) % N2H
    return {"labels":L,"pm1":P1,"pm25":P25,"pm4":P4,"pm10":P10,"tps":TPS}

# ===== CSV =====
def _csv_header(f):
    """Write the CSV column header line."""
    f.write("datetime,pm1,pm2_5,pm4,pm10,typical_size\n")


def _csv_same(x):
    """Identity conversion for rings that store plain floats."""
    return x


def _csv_dump_ring(path, n, idx, labels, cols, to_min, conv):
    """Write the newest `n` samples of one ring to `path` as CSV, oldest first.

    `to_min` turns a stored label into a minute timestamp and `conv` turns a
    stored value into a float. Rows with a label below MIN_VALID_MIN are
    skipped. The header is written even when there is no data.
    """
    size = len(labels)
    with open(path, "w") as f:
        _csv_header(f)
        if n <= 0:
            return
        k = (idx - n) % size
        for _ in range(n):
            stamp = to_min(labels[k])
            if stamp >= MIN_VALID_MIN:
                f.write("{},{:.2f},{:.2f},{:.2f},{:.2f},{:.2f}\n".format(
                    fmt_dt_from_min(stamp),
                    conv(cols[0][k]), conv(cols[1][k]), conv(cols[2][k]),
                    conv(cols[3][k]), conv(cols[4][k])))
            k = (k + 1) % size


def write_csv_1d(path):
    """Export the 1-minute ring to `path`."""
    _csv_dump_ring(path, min(cnt1m, N1D), idx1m, labels1m,
                   (pm1_1m, pm25_1m, pm4_1m, pm10_1m, tps_1m), _csv_same, _csv_same)


def write_csv_7d(path):
    """Export the 5-minute ring to `path`."""
    _csv_dump_ring(path, cnt5, idx5, labels5,
                   (pm1_5, pm25_5, pm4_5, pm10_5, tps_5), _csv_same, _csv_same)


def write_csv_30d(path):
    """Export the 30-minute ring to `path`."""
    _csv_dump_ring(path, min(cnt30, N30D30), idx30, labels30m,
                   (pm1_30, pm25_30, pm4_30, pm10_30, tps_30), _csv_same, _csv_same)


def write_csv_1y(path):
    """Export the 2-hour ring to `path` (offset labels, 16-bit encoded values)."""
    _csv_dump_ring(path, cnt2h, idx2h, labels2h_off,
                   (pm1_2h, pm25_2h, pm4_2h, pm10_2h, tps_2h),
                   lambda off: base2h_min + 120*off, dec1y)

# ===== 初期化 =====
def fs_free_kb():
    """Return the free space of the root filesystem in KiB, or -1 if it cannot be read."""
    try:
        stats = os.statvfs("/")
        # Field 4 is the available block count, field 1 the fragment size.
        frsize, bavail = stats[1], stats[4]
        return (bavail * frsize) // 1024
    except:
        return -1

# Bring up the LCD first so progress is visible during start-up.
i2c = I2C(I2C_ID, sda=Pin(SDA_PIN), scl=Pin(SCL_PIN), freq=I2C_FREQ)
lcd = I2cLcd1602(i2c, LCD_ADDR)
lcd.locate(0,0); lcd.write(pad16("Booting..."))
lcd.locate(0,1); lcd.write(pad16(""))

dbg("=== SPS30 app start ===")
dbg("FS free ~{} KB".format(fs_free_kb()))
# Start the sensor and wait 3 s before doing anything else.
sps30_start_float(); time.sleep(3)
# Restore the rings from flash, then connect Wi-Fi and open the HTTP listener.
load_state()
wlan = wifi_connect_once()
srv = http_listen()

# Latest raw reading served by /now (None until the first successful read).
curr_now = {"pm1":None, "pm25":None, "pm4":None, "pm10":None, "tps":None}
# Minute number of the last recorded sample; -1 forces a record on the first pass.
last_min_seen = -1

# ===== Main loop =====
# One pass per second: keep Wi-Fi/NTP alive, read the sensor, record once per
# wall-clock minute, serve pending HTTP requests, then sleep out the rest of
# the second.
while True:
    t0 = time.ticks_ms()

    wlan = ensure_wifi()
    if wlan.isconnected(): attempt_ntp_if_needed()

    m = sps30_read()
    if m:
        show_pm(m["pm1"], m["pm25"], m["pm10"])
        curr_now.update(m)
        avg_pm1.add(m["pm1"]); avg_pm25.add(m["pm25"]); avg_pm4.add(m["pm4"]); avg_pm10.add(m["pm10"]); avg_tps.add(m["tps"])

    # Nothing is recorded until the clock has been set via NTP.
    if time_synced and m:
        mnow = time.time() // 60
        if mnow != last_min_seen:
            last_min_seen = mnow
            # Mean of the samples currently held in each AvgBuf (at most 30).
            a1,a25,a4,a10,atps = avg_pm1.avg(), avg_pm25.avg(), avg_pm4.avg(), avg_pm10.avg(), avg_tps.avg()

            # 1-minute ring
            labels1m[idx1m] = int(mnow)
            pm1_1m[idx1m], pm25_1m[idx1m], pm4_1m[idx1m], pm10_1m[idx1m], tps_1m[idx1m] = a1,a25,a4,a10,atps
            dbg("WRITE 1m idx={},t={}".format(idx1m, labels1m[idx1m]))
            idx1m = (idx1m + 1) % N1D
            if cnt1m < N1D: cnt1m += 1

            # Force the very first save when no state file exists yet
            if not _exists(STATE_FILE):
                dbg("FIRST SAVE (no file yet)")
                save_state(force=True)

            # 5-minute ring: update the running sums by replacing the oldest
            # window slot; a record is written each time the window index
            # wraps to 0 with a full window.
            for j,val in enumerate((a1,a25,a4,a10,atps)):
                off = i5*5 + j; sum5[j] += val - win5[off]; win5[off] = val
            i5 = (i5 + 1) % 5
            if filled5 < 5: filled5 += 1
            if filled5 == 5 and i5 == 0:
                labels5[idx5] = int(mnow)
                pm1_5[idx5]  = sum5[0]/5.0; pm25_5[idx5] = sum5[1]/5.0; pm4_5[idx5] = sum5[2]/5.0; pm10_5[idx5] = sum5[3]/5.0; tps_5[idx5] = sum5[4]/5.0
                dbg("WRITE 5m idx={},t={}".format(idx5, labels5[idx5]))
                idx5 = (idx5 + 1) % N7D5
                if cnt5 < N7D5: cnt5 += 1

            # 30-minute ring (same scheme with a 30-slot window)
            for j,val in enumerate((a1,a25,a4,a10,atps)):
                off = i30*5 + j; sum30[j] += val - win30[off]; win30[off] = val
            i30 = (i30 + 1) % 30
            if filled30 < 30: filled30 += 1
            if filled30 == 30 and i30 == 0:
                labels30m[idx30] = int(mnow)
                pm1_30[idx30]  = sum30[0]/30.0; pm25_30[idx30] = sum30[1]/30.0; pm4_30[idx30] = sum30[2]/30.0; pm10_30[idx30] = sum30[3]/30.0; tps_30[idx30] = sum30[4]/30.0
                dbg("WRITE 30m idx={},t={}".format(idx30, labels30m[idx30]))
                idx30 = (idx30 + 1) % N30D30
                if cnt30 < N30D30: cnt30 += 1

            # 2-hour ring (120-slot window); labels are stored as offsets from
            # base2h_min in 120-minute units, values as enc1y() integers.
            for j,val in enumerate((a1,a25,a4,a10,atps)):
                off = i120*5 + j; sum120[j] += val - win120[off]; win120[off] = val
            i120 = (i120 + 1) % 120
            if filled120 < 120: filled120 += 1
            if filled120 == 120 and i120 == 0:
                end_min = int(mnow)
                # The first record ever defines the base timestamp.
                if cnt2h == 0: base2h_min = end_min
                labels2h_off[idx2h] = (end_min - base2h_min) // 120
                pm1_2h[idx2h]  = enc1y(sum120[0]/120.0); pm25_2h[idx2h] = enc1y(sum120[1]/120.0)
                pm4_2h[idx2h]  = enc1y(sum120[2]/120.0); pm10_2h[idx2h] = enc1y(sum120[3]/120.0); tps_2h[idx2h] = enc1y(sum120[4]/120.0)
                dbg("WRITE 2h idx={},off={},base={}".format(idx2h, labels2h_off[idx2h], base2h_min))
                idx2h = (idx2h + 1) % N2H
                if cnt2h < N2H: cnt2h += 1

            # Rate-limited save; save_state() decides whether to write.
            save_state(False)

    # Serve up to three pending HTTP connections per pass.
    for _ in range(3): http_serve_once(srv)
    # While a toast is active it is redrawn every pass; show_pm() stays silent.
    if toast_active(): lcd.locate(0,0); lcd.write(toast_l1); lcd.locate(0,1); lcd.write(toast_l2)

    # Sleep for whatever is left of the 1-second period.
    dt = time.ticks_diff(time.ticks_ms(), t0)
    if dt < 1000: time.sleep_ms(1000 - dt)
電子工作

関連記事

  • SensirionのSPS30粒子状物質(PM2.5)センサーを使ったメモ
    2025年9月2日
  • 購入前に知っておきたいUSB接続・PC接続型デジタルオシロスコープの利点と欠点。スタンドアロン型との違いは。
    2018年8月9日
  • SWR値はどのくらいまで大丈夫?SWR値から分かる反射波電力の割合。
    2018年7月23日
  • アマチュア無線用SWR計の選び方とダイヤモンド・コメット・ダイワの製品一覧。おすすめは?
    2018年7月23日
  • OWONのデジタルオシロスコープSDS5032Eの基本的な使い方。
    2018年7月10日
  • PCXpresso NXP LPC1769の評価ボード(OM13000)にmbedのバイナリを書き込む方法
    2017年5月13日
  • L, Cを測ることを考える
    2014年8月10日
  • Raspberry piで大気圧と温度を記録してグラフにして他のPCからグラフを見る
    2014年7月26日
カテゴリー
  • コンピューター
    • gnuplot & eps
    • mac
    • matplotlib
    • wordpress
  • ホーム・家電
    • アイロン
    • オーディオ
    • オーラルケア
      • ジェットウォッシャー
      • 音波振動歯ブラシ
    • カメラ
    • カー用品
    • クリーナー
    • テレビ、レコーダー
    • ドアホン
    • メンズ美容家電
      • ラムダッシュ
    • ルンバ
    • 一覧比較
    • 工具
    • 浄水器
    • 温水洗浄便座
    • 炊飯器
    • 空気清浄機・加除湿機
    • 空調・季節家電
    • 美容家電
      • フェイスケア
      • ヘアケア
      • ボディーケア
    • 血圧計
    • 調理器具
    • 電子レンジ
  • 健康
  • 家事
    • パン
    • 料理
    • 育児
    • 食品
      • おせち
      • コーヒー
  • 書籍
  • 知識
  • 趣味
    • ペン字
    • ロードバイク・クロスバイク
    • 車
    • 鉄道模型
    • 電子工作
サイト内検索
最近の投稿
  • 東芝のGR-W15BZ1とGR-W15BSの3つの違い [東芝 冷蔵庫]
  • PM2.5グラフをhttpサーバで表示する。SPS30 + Raspberry pi pico 2 Wを使って。
  • MR-WZ55MとMR-WZ55Kの3つの違い [三菱電機 冷蔵庫]
  • SensirionのSPS30粒子状物質(PM2.5)センサーを使ったメモ
  • 5+1、4+1カットシステムとは [ブラウン シェーバー]
  • Archer BE550とArcher BE450の8つの違い [TP-Link Wi-Fi 7ルーター]
  1. ホーム
  2. 趣味
  3. 電子工作
  4. PM2.5グラフをhttpサーバで表示する。SPS30 + Raspberry pi pico 2 Wを使って。
  • ホーム
  • プライバシーポリシー

© カタログクリップ
contact@beiznotes.org

目次