Custom Starlark

Introduction

The custom Starlark MQTT integration attempts to make it easy to connect your current MQTT application or Edge controller towards Neowit and write code that transforms the topics and payloads into something that Neowit understands.

Technical details

Starlark

The integration uses the Starlark language and our extension modules to the language in order to define devices and metrics that can be stored in the Neowit application. The Starlark language is a simple Python subset that should be easy to learn. It is fairly limited, but should contain enough power to deal with normal JSON MQTT payloads.

See Starlark documentation.

Connecting to the MQTT broker

The integration settings page will show the connection details you need to use to connect to our MQTT broker. See our MQTT page for more details on this.

Troubleshooting

The Starlark implementation is intended for advanced users, and we currently don't have any good ways to help you debug the Starlark code.

  • If your code does not compile correctly due to syntax errors, the integration will show up as not connected. The built-in editor should give you some hints of what is wrong, but it's not perfect.

  • If no devices or metrics show up, you're code may be throwing errors. We currently don't have any good way of showing that to you, but feel free to contact support so that we may help you out.

Writing code

Our Starlark executor looks for a function called on_publish with accepts the arguments topic and payload. The topic arguments is the MQTT topic that the payload was published to.

The goal of the code you need to define is to convert the topic and payload into something that Neowit understands using the following modules:

Examples

Simple example

Consider the following JSON payload published to topic mytopic/location1/device1. The code following would create one unique device and publish a temperature value to the Neowit metrics store.

PUBLISH mytopic/location1/device2
{ at: 1719065842, "temperature": 30.3}
# called when MQTT receives a PUBLISH from the app or edge controller
def on_publish(topic, payload):
    # the payload is JSON formatted, we need to decode
    # it into a starlark data structure.
    data = json.decode(payload)

    # ignore payloads that doesnt have the expected input
    if not "at" in data or not "temperature" in data:
        return
    
    # the topic here includes the information we
    # need to identity a unique device, we use this
    # as our basis. The other attributes are mocked up,
    # but could also be defined if the data is available
    # in this or other payloads.
    external_id = topic.replace("mytopic/", "")
    device = devices.Device(
        external_id=external_id,
        name="My device 1",
        vendor="My device vendor",
        model="My device model",
        status="STATUS_CONNECTED",
        status_reason="Received something"
    )

    # this will register or update the device.
    devices.upsert(device)

    # publish the metric with the given unix timestamp
    # and value on the sensor.TEMP sensor.
    series.publish(external_id, data["at"], sensors.TEMP, data["temperature"])
        

More advanced example

This example decodes a JSON structure and upserts devices and series extracted from the json document.

time_layout = "2006-01-02 15:04:05.000-0700"

##
## Main entry point
##
def on_publish(topic, payload):
  data = json.decode(payload)
  ts = time.parse_time(data["updated"], time_layout)
  if "rooms" in data:
    for room in data["rooms"]:
      publish_room(ts, room)
  if "dampers" in data:
    for damper in data["dampers"]:
      publish_damper(ts, damper)

##
## Upsert damper device and publish series
##
def publish_damper(ts, damper):
  # '564.001-SQ402 Rom 5.1826 .....'
  descr = damper["airflowDescription"].split(" ")
  if len(descr) < 3:
    return

  code, room = descr[0], descr[2]
  id = 'damper%s@%s' % (code, room)
  device = devices.Device(
    external_id = id,
    name = "Damper: %s %s" % (code, room),
    vendor = "My damper vendor",
    model = "My damper model",
    status = "STATUS_CONNECTED",
    status_reason = "OK"
  )
  devices.upsert(device)
  series.publish(id, ts.unix, sensors.AIR_FLOW_CUBIC_HOUR, get_number_or_none(damper, "airflow"))
  series.publish(id, ts.unix, sensors.AIR_FLOW_DEMAND_PERCENTAGE, get_number_or_none(damper, "demandInPrc"))

##
## Upsert room device and publish series
##
def publish_room(ts, room):
  device = devices.Device(
    external_id = room["location"],
    name = "Room: %s" % room["location"],
    vendor = "My vendor",
    model = "My model",
    status = "STATUS_CONNECTED",
    status_reason = "OK"
  )
  devices.upsert(device)
  series.publish(device.external_id, ts.unix, sensors.TEMP, get_number_or_none(room, "temperature"))
  series.publish(device.external_id, ts.unix, sensors.CO2, get_number_or_none(room, "airQuality"))
  series.publish(device.external_id, ts.unix, sensors.MOTION_DETECTED, get_bool_or_none(room, "pir"))
  series.publish(device.external_id, ts.unix, sensors.LIGHT_LUX, get_number_or_none(room, "lightIntensity"))
  series.publish(device.external_id, ts.unix, sensors.AIR_FLOW_DEMAND_PERCENTAGE, get_number_or_none(room, "airDemand"))
  series.publish(device.external_id, ts.unix, sensors.SETPOINT_CO2_PPM, get_number_or_none(room, "setpointCo2Actual"))
  series.publish(device.external_id, ts.unix, sensors.SETPOINT_TEMPERATURE_C, get_number_or_none(room, "setpointActual"))


##
## Helpers
##

def get_number_or_none(room, key):
  if key not in room:
    return None
  value = room[key]
  return None if value == "NULL" else value

def get_bool_or_none(room, key):
  if key not in room:
    return None
  value = room[key]
  return None if value == "NULL" else value

Last updated