Data Logger with Anomaly Detection

Example file: examples/data-logger.plcproj
Difficulty: Intermediate–Advanced
Key instructions: INTERVALMS, SCRIPT, GRT, OTL/OTU, ADD, MOV

Overview

This example turns a Raspberry Pi PLC into a data logger that records sensor readings to a SQLite database and flags anomalous power consumption by comparing each reading against statistics computed from the previous 24 hours of data, so the baseline adapts as new readings arrive.

The ladder logic handles the real-time I/O — periodic sampling, manual triggers, alarm latching, indicator LEDs — while a Python script handles what ladder logic cannot: database storage and standard-deviation-based anomaly detection.

What It Does

  1. Every 60 seconds, an INTERVALMS instruction triggers a sample
  2. A Python SCRIPT reads Temperature, Humidity, and Power values, inserts them into a SQLite database (~/plc_log.db), and compares the current power reading against the rolling 24-hour average
  3. If more than 10 readings are available and power deviates by more than 2 standard deviations from their mean, the script returns 1 (anomaly detected); otherwise it returns 0
  4. Ladder logic latches an alarm and lights a warning LED until the operator acknowledges it
  5. A manual sample button allows on-demand readings at any time
  6. A sample counter tracks the total number of readings taken

I/O Map

Inputs

Address  Symbol        Description
I:0/0    ManualSample  Pushbutton: take an immediate reading
I:0/1    ResetAlarm    Pushbutton: acknowledge and clear the anomaly alarm

Outputs

Address  Symbol      Description
O:0/0    SampleLED   Flashes briefly each time a sample is recorded
O:0/1    AnomalyLED  On when anomalous power usage detected (stays on until reset)

Internal Registers

Address  Symbol          Description
B:0/0    SampleDue       One-shot trigger bit: a sample needs to be taken
B:0/1    AnomalyActive   Latched alarm flag
N:0      Temperature     Current temperature reading
N:1      Humidity        Current humidity reading
N:2      PowerWatts      Current power consumption (watts)
N:3      AnomalyResult   Script output: 0 = normal, 1 = anomaly
N:4      SampleCount     Running total of samples taken
V:0      SampleInterval  INTERVALMS periodic timer (60,000 ms)

Ladder Logic — Rung by Rung

Rung 0 — Periodic Sampling Interval

──[INTERVALMS V:0 60000ms]──(OTL B:0/0)──

The INTERVALMS instruction produces a one-scan TRUE pulse every 60 seconds. This latches the SampleDue bit, signaling that a reading should be taken.
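
The one-scan pulse described above can be pictured with a small Python model. This is only a sketch of the behavior, not the PLC runtime's implementation: the class IntervalPulse, the function simulate, and the 10 ms scan time are all assumptions made for illustration.

```python
class IntervalPulse:
    """Toy model of a periodic one-scan pulse (not the PLC runtime's code)."""

    def __init__(self, period_ms):
        self.period_ms = period_ms
        self.last_fire_ms = 0  # time of the previous pulse

    def scan(self, now_ms):
        """Return True only on the scan where the period has elapsed."""
        if now_ms - self.last_fire_ms >= self.period_ms:
            self.last_fire_ms = now_ms
            return True
        return False


def simulate(period_ms, scan_ms, total_ms):
    """Scan every scan_ms up to total_ms; return the times the pulse fired."""
    timer = IntervalPulse(period_ms)
    fired = []
    for now in range(scan_ms, total_ms + 1, scan_ms):
        if timer.scan(now):
            fired.append(now)
    return fired


# Three minutes of 10 ms scans with the default 60,000 ms period.
pulses = simulate(period_ms=60_000, scan_ms=10, total_ms=180_000)
print(pulses)  # [60000, 120000, 180000]
```

Out of 18,000 simulated scans, the pulse is TRUE on exactly three, which is why Rung 0 latches SampleDue with OTL rather than relying on the pulse itself.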

Tip: Change the period to adjust logging frequency — use 5000 for testing (every 5 seconds) or 300000 for 5-minute intervals in production.

Rung 1 — Manual Sample Trigger

──[XIC I:0/0]──(OTL B:0/0)──

Pressing the ManualSample button also latches SampleDue, allowing the operator to take a reading at any time without waiting for the interval.

Rung 2 — Data Logging Script (the core)

──[XIC B:0/0]──[SCRIPT  Args: N:0, N:1, N:2  Dest: N:3  Timeout: 10s]──

On the rising edge of SampleDue, this SCRIPT instruction launches a Python script that:

  1. Opens (or creates) ~/plc_log.db
  2. Creates the sensor_log table if it doesn't exist
  3. Inserts the current Temperature, Humidity, and PowerWatts values with a timestamp
  4. Queries all power readings from the last 24 hours
  5. Calculates the mean and standard deviation
  6. Returns 1 if more than 10 readings are available and the current power reading is more than 2 standard deviations from the mean, 0 otherwise

Python Script:

import sqlite3, datetime, statistics, os

# Open or create the log database
db_path = os.path.expanduser("~/plc_log.db")
db = sqlite3.connect(db_path)
db.execute("""CREATE TABLE IF NOT EXISTS sensor_log(
    ts TEXT, temp INT, hum INT, pwr INT)""")

# Record the current sensor readings. The timestamp is stored as UTC in
# 'YYYY-MM-DD HH:MM:SS' form, the same text format SQLite's datetime()
# returns, so the string comparison in the query below is a true time cutoff.
now = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
db.execute("INSERT INTO sensor_log VALUES(?,?,?,?)",
    (now, Temperature, Humidity, PowerWatts))
db.commit()

# Anomaly detection: compare current power against
# the rolling 24-hour average +/- 2 standard deviations
rows = db.execute(
    "SELECT pwr FROM sensor_log "
    "WHERE ts > datetime('now','-24 hours')"
).fetchall()

# Require more than 10 readings so the statistics are meaningful
if len(rows) > 10:
    values = [r[0] for r in rows]
    mean = statistics.mean(values)
    stdev = statistics.stdev(values)
    # stdev == 0 means every reading is identical, so nothing is flagged
    if stdev > 0 and abs(PowerWatts - mean) > 2 * stdev:
        AnomalyResult = 1
    else:
        AnomalyResult = 0
else:
    AnomalyResult = 0

db.close()

The script uses the symbol-name method (recommended): Temperature, Humidity, and PowerWatts are auto-injected as Python variables from the source addresses, and AnomalyResult is auto-printed because it matches the destination symbol name.
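
Because the injected variables only exist when the PLC runs the script, it can help to exercise the same logic on a desktop first. The sketch below is one way to do that, under several assumptions: the function name log_and_check and its min_samples and k parameters are hypothetical, the database is an in-memory one rather than ~/plc_log.db, and each row is stamped by SQLite's own datetime('now') so the stored text is directly comparable with the WHERE clause.

```python
import sqlite3
import statistics


def log_and_check(db, temperature, humidity, power_watts, min_samples=10, k=2.0):
    """Insert one reading, then return 1 if power is an outlier, else 0."""
    db.execute(
        "CREATE TABLE IF NOT EXISTS sensor_log("
        "ts TEXT, temp INT, hum INT, pwr INT)"
    )
    # Let SQLite generate the timestamp so it matches datetime('now', ...) below.
    db.execute(
        "INSERT INTO sensor_log VALUES(datetime('now'),?,?,?)",
        (temperature, humidity, power_watts),
    )
    db.commit()
    rows = db.execute(
        "SELECT pwr FROM sensor_log WHERE ts > datetime('now','-24 hours')"
    ).fetchall()
    if len(rows) <= min_samples:
        return 0  # not enough history yet
    values = [r[0] for r in rows]
    mean = statistics.mean(values)
    stdev = statistics.stdev(values)
    return 1 if stdev > 0 and abs(power_watts - mean) > k * stdev else 0


db = sqlite3.connect(":memory:")
# Twenty made-up readings alternating between 99 W and 101 W form a baseline.
baseline = [log_and_check(db, 22, 45, 99 + 2 * (i % 2)) for i in range(20)]
# A made-up 500 W reading should stand out against that baseline.
spike = log_and_check(db, 22, 45, 500)
db.close()
print(baseline, spike)
```

All twenty baseline calls return 0 and the 500 W reading returns 1. Passing the values as arguments stands in for the auto-injected Temperature, Humidity, and PowerWatts variables, and the return value stands in for AnomalyResult.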

Rung 3 — Sample Counter

──[XIC B:0/0]──[ADD  N:4 + 1 → N:4]──

Increments the SampleCount register each time a sample is taken. Useful for monitoring in the watch panel.

Rung 4 — Sample Indicator LED

──[XIC B:0/0]──(OTE O:0/0)──

The SampleLED turns on for the duration of the trigger scan — a brief flash indicating a reading was just taken.

Rung 5 — Clear Trigger

──[XIC B:0/0]──(OTU B:0/0)──

Unlatches SampleDue so the next INTERVALMS pulse or manual press creates a fresh rising edge for the SCRIPT instruction.

Rung 6 — Latch Anomaly Alarm

──[GRT  N:3 > 0]──(OTL B:0/1)──

If the SCRIPT's AnomalyResult is greater than 0, the AnomalyActive alarm is latched. Because the SCRIPT runs asynchronously (result appears on the scan after completion), this comparison runs every scan to catch the result whenever it arrives.

Rung 7 — Reset Alarm

──[XIC I:0/1]──(OTU B:0/1)──

The operator presses the ResetAlarm button to acknowledge and clear the anomaly alarm.

Rung 8 — Clear Anomaly Register

──[XIC I:0/1]──[MOV  0 → N:3]──

Also clears the AnomalyResult register to 0 when the alarm is reset, preventing the alarm from immediately re-latching on the next scan.

Rung 9 — Anomaly Warning Light

──[XIC B:0/1]──(OTE O:0/1)──

The AnomalyLED output follows the AnomalyActive flag — it stays on from the moment an anomaly is detected until the operator presses ResetAlarm.

How It Works: Timing Diagram

Scan:     1      2      3      ...    N      N+1    N+2
          ─────────────────────────────────────────────
INTMS:    FALSE  FALSE  FALSE  ...    TRUE   FALSE  FALSE
B:0/0:    FALSE  FALSE  FALSE  ...    TRUE   FALSE  FALSE
SCRIPT:   -      -      -      ...    LAUNCH RUNNING DONE → N:3 written
GRT N:3:  FALSE  FALSE  FALSE  ...    FALSE  FALSE  TRUE (if anomaly)
B:0/1:    FALSE  FALSE  FALSE  ...    FALSE  FALSE  LATCHED
O:0/1:    OFF    OFF    OFF    ...    OFF    OFF    ON

Key point: The SCRIPT result arrives one or more scans after the trigger, so the GRT comparison on Rung 6 catches it asynchronously.
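
The alarm half of the program (Rungs 6 to 9) can be traced scan by scan with a short Python model. This is a sketch under assumptions, not the runtime's code: rungs are evaluated top to bottom once per scan, the asynchronous SCRIPT result is simply written into N:3 between scans, and the names alarm_scan, state, and events are hypothetical.

```python
def alarm_scan(state, reset_pressed):
    """Evaluate Rungs 6-9 once, top to bottom, updating the state dict."""
    if state["N3"] > 0:            # Rung 6: GRT N:3 > 0 -> OTL B:0/1
        state["alarm"] = True
    if reset_pressed:              # Rung 7: XIC I:0/1 -> OTU B:0/1
        state["alarm"] = False
    if reset_pressed:              # Rung 8: XIC I:0/1 -> MOV 0 -> N:3
        state["N3"] = 0
    state["led"] = state["alarm"]  # Rung 9: XIC B:0/1 -> OTE O:0/1
    return state["led"]


state = {"N3": 0, "alarm": False, "led": False}
history = []
# One tuple per scan: (value the script wrote before this scan, reset button).
events = [(None, False), (None, False), (1, False), (None, False),
          (None, True), (None, False)]
for script_result, reset_pressed in events:
    if script_result is not None:
        state["N3"] = script_result  # asynchronous SCRIPT result lands in N:3
    history.append(alarm_scan(state, reset_pressed))
print(history)  # [False, False, True, True, False, False]
```

The LED turns on in the scan where the result arrives, stays on while the alarm is latched, and turns off in the scan where the reset button is pressed. Because Rung 8 zeroes N:3 in that same scan, Rung 6 has nothing to re-latch on the following scan.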

Prerequisites

  • Python 3 installed on the Raspberry Pi (usually pre-installed)
  • Python's sqlite3 and statistics modules (both in the standard library)
  • Write access to the home directory for the database file

Customization Ideas

Change the Sampling Interval

Edit the INTERVALMS period in Rung 0:

  • 5000: every 5 seconds (testing)
  • 60000: every 60 seconds (default)
  • 300000: every 5 minutes (low-frequency monitoring)
  • 3600000: every hour (long-term trends)

Monitor Temperature or Humidity Instead of Power

Change the anomaly detection target in the Python script. Replace pwr with temp or hum in the SQL query:

rows = db.execute(
    "SELECT temp FROM sensor_log "
    "WHERE ts > datetime('now','-24 hours')"
).fetchall()
# ... and compare against Temperature instead of PowerWatts

Add Data Retention (Cleanup Old Records)

Add this line after the INSERT and before db.commit(), so that the deletion is committed along with the new row, to keep only the last 30 days:

db.execute("DELETE FROM sensor_log WHERE ts < datetime('now','-30 days')")
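
The effect of that DELETE can be checked on a throwaway in-memory database. This is a minimal sketch with two made-up rows. The cutoff is a plain text comparison, so it only acts as a date cutoff when ts is stored in the same 'YYYY-MM-DD HH:MM:SS' UTC form that SQLite's datetime() returns; here the rows are stamped by SQLite itself to guarantee that.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE sensor_log(ts TEXT, temp INT, hum INT, pwr INT)")
# Two illustrative rows: one 45 days old, one written just now (both UTC).
db.execute("INSERT INTO sensor_log VALUES(datetime('now','-45 days'),20,40,100)")
db.execute("INSERT INTO sensor_log VALUES(datetime('now'),21,41,105)")

cur = db.execute("DELETE FROM sensor_log WHERE ts < datetime('now','-30 days')")
deleted = cur.rowcount  # number of rows removed by the DELETE
db.commit()             # without a commit, the deletion is rolled back on close
remaining = db.execute("SELECT count(*) FROM sensor_log").fetchone()[0]
db.close()
print(deleted, remaining)  # 1 1
```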

Export to CSV

Add a second SCRIPT on a manual trigger to export the database:

import sqlite3, csv, os
db = sqlite3.connect(os.path.expanduser("~/plc_log.db"))
rows = db.execute("SELECT * FROM sensor_log ORDER BY ts").fetchall()
with open(os.path.expanduser("~/plc_export.csv"), "w", newline="") as f:
    w = csv.writer(f)
    w.writerow(["Timestamp", "Temperature", "Humidity", "Power"])
    w.writerows(rows)
db.close()
print(len(rows))

Adjust Anomaly Sensitivity

Change the threshold from 2 standard deviations to a different value:

  • 1.5 * stdev: more sensitive (more false alarms)
  • 3 * stdev: less sensitive (only extreme spikes)
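
To see how the multiplier changes the outcome, the comparison can be pulled into a small function and run against the same reading with different thresholds. This is a sketch: the function name is_anomaly, the parameter k, and the sample wattages are hypothetical, and the history is passed in separately rather than read from the database.

```python
import statistics


def is_anomaly(current, history, k=2.0):
    """True if current is more than k sample standard deviations from the mean."""
    mean = statistics.mean(history)
    stdev = statistics.stdev(history)
    return stdev > 0 and abs(current - mean) > k * stdev


# Made-up power history in watts: mean 100, sample standard deviation about 7.9.
history = [90, 95, 100, 105, 110]
# The same 114 W reading, 14 W above the mean, tested at three sensitivities.
results = {k: is_anomaly(114, history, k) for k in (1.5, 2.0, 3.0)}
print(results)  # {1.5: True, 2.0: False, 3.0: False}
```

A reading 14 W above the mean trips the 1.5 threshold (about 11.9 W) but not the default 2.0 threshold (about 15.8 W) or the 3.0 threshold (about 23.7 W).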

Viewing the Logged Data

Query the database directly from the Raspberry Pi terminal:

# View the last 10 readings
sqlite3 ~/plc_log.db "SELECT * FROM sensor_log ORDER BY ts DESC LIMIT 10"

# View daily averages
sqlite3 ~/plc_log.db "SELECT date(ts), avg(temp), avg(hum), avg(pwr) FROM sensor_log GROUP BY date(ts)"

# Count total records
sqlite3 ~/plc_log.db "SELECT count(*) FROM sensor_log"

Concepts Demonstrated

Concept                                        Where
Periodic triggering with INTERVALMS            Rung 0
SCRIPT instruction with symbol-name variables  Rung 2
Python database access (SQLite)                Rung 2 script
Statistical anomaly detection                  Rung 2 script
Asynchronous SCRIPT result handling            Rungs 2 + 6
Latch/unlatch alarm pattern (OTL/OTU)          Rungs 6 + 7
Manual trigger alongside automatic interval    Rungs 0 + 1
Sample counting with ADD                       Rung 3
Register clearing with MOV                     Rung 8