sunlit_solar/sensor.py

317 lines
16 KiB
Python

from homeassistant.components.sensor import SensorEntity
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.entity import DeviceInfo
from .const import DOMAIN
from .helper import get_merged_device_data
async def async_setup_entry(hass, entry, async_add_entities):
coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"]
data = get_merged_device_data(coordinator)
sensors = []
for family_key in data:
family = data[family_key]
if "devices" not in family:
continue
for device_key in family["devices"]:
device = family["devices"][device_key]
device_info = DeviceInfo(
identifiers={(DOMAIN, device["deviceId"], device.get("deviceSn",""))},
name=device.get("stationName", "Unbekanntes Gerät"),
serial_number=device.get("deviceSn", "Unbekannt"),
suggested_area=family.get("name", "Unbekannt"),
manufacturer="Sunlit Solar",
model=device.get("deviceType", "Unbekanntes Modell"),
)
if device["deviceType"] == "ENERGY_STORAGE_BATTERY":
device_info = DeviceInfo(
identifiers={(DOMAIN, device["deviceId"], device.get("deviceSn",""))},
name="Speicher",
serial_number=device.get("deviceSn", "Unbekannt"),
suggested_area=family.get("name", "Unbekannt"),
manufacturer="Sunlit Solar",
model=device.get("deviceType", "Unbekanntes Modell"),
)
sensor_keys = [
{'id': 'status', 'name': 'Status', 'device_class': None, 'state_class': None, 'native_unit_of_measurement': None},
{'id': 'batteryLevel', 'name': 'Ladung Speicher komplett', 'device_class': "battery", 'state_class': None, 'native_unit_of_measurement': "%"},
{'id': 'batterySoc', 'name': 'Ladung Speicher 1 (Kopf)', 'device_class': "battery", 'state_class': None, 'native_unit_of_measurement': "%"},
{'id': 'deviceCount', 'name': 'Speicheranzahl', 'device_class': None, 'state_class': None, 'native_unit_of_measurement': None},
{'id': 'chargeRemaining', 'name': 'verbleibende Ladung', 'device_class': "battery", 'state_class': None, 'native_unit_of_measurement': "%"},
{'id': 'dischargeRemaining', 'name': 'verbleibende Entladung', 'device_class': "battery", 'state_class': None, 'native_unit_of_measurement': "%"},
{'id': 'inputPower', 'name': 'Eingangsleistung', 'device_class': "power", 'state_class': None, 'native_unit_of_measurement': "W"},
{'id': 'outputPower', 'name': 'Ausgangsleistung', 'device_class': "power", 'state_class': None, 'native_unit_of_measurement': "W"},
{'id': 'firmwareVersion', 'name': 'Firmware-Version', 'device_class': None, 'state_class': None, 'native_unit_of_measurement': None},
# {'id': 'status', 'name': 'Status', 'value': device.get('status', 'Unbekannt'), 'device_class': None, 'state_class': None, 'native_unit_of_measurement': None},
# {'id': 'status', 'name': 'Status', 'value': device.get('status', 'Unbekannt'), 'device_class': None, 'state_class': None, 'native_unit_of_measurement': None},
# {'id': 'status', 'name': 'Status', 'value': device.get('status', 'Unbekannt'), 'device_class': None, 'state_class': None, 'native_unit_of_measurement': None},
]
if device.get('battery1Soc',None) != None:
sensor_keys.append(
{'id': 'battery1Soc', 'name': 'Ladung Speicher 2', 'device_class': "battery", 'state_class': None, 'native_unit_of_measurement': "%"},
)
if device.get('battery2Soc',None) != None:
sensor_keys.append(
{'id': 'battery2Soc', 'name': 'Ladung Speicher 3', 'device_class': "battery", 'state_class': None, 'native_unit_of_measurement': "%"},
)
if device.get('battery3Soc',None) != None:
sensor_keys.append(
{'id': 'battery3Soc', 'name': 'Ladung Speicher 4', 'device_class': "battery", 'state_class': None, 'native_unit_of_measurement': "%"},
)
for sensor in sensor_keys:
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"{sensor['id']}",
family_key,
device_key,
sensor['name'],
sensor['device_class'],
sensor['state_class'],
sensor['native_unit_of_measurement']
))
if isinstance(device['batteryMppt1Data'], dict):
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInVol",
family_key,
device_key,
f'Speicher 1 MPPT 1 Eingangsspannung',
"voltage", #sensor['device_class'],
None, #sensor['state_class'],
"V", #sensor['native_unit_of_measurement']
"batteryMppt1Data"
))
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInCur",
family_key,
device_key,
f'Speicher 1 MPPT 1 Eingangsstrom ',
"current", #sensor['device_class'],
None, #sensor['state_class'],
"A", #sensor['native_unit_of_measurement']
"batteryMppt1Data"
))
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInPower",
family_key,
device_key,
f'Speicher 1 MPPT 1 Eingangsleistung',
"power", #sensor['device_class'],
None, #sensor['state_class'],
"W", #sensor['native_unit_of_measurement']
"batteryMppt1Data"
))
if isinstance(device['batteryMppt2Data'], dict):
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
"batteryMpptInVol",
family_key,
device_key,
f'Speicher 1 MPPT 2 Eingangsspannung',
"voltage", #sensor['device_class'],
None, #sensor['state_class'],
"V", #sensor['native_unit_of_measurement']
"batteryMppt2Data"
))
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInCur",
family_key,
device_key,
f'Speicher 1 MPPT 2 Eingangsstrom ',
"current", #sensor['device_class'],
None, #sensor['state_class'],
"A", #sensor['native_unit_of_measurement']
"batteryMppt2Data"
))
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInPower",
family_key,
device_key,
f'Speicher 1 MPPT 2 Eingangsleistung',
"power", #sensor['device_class'],
None, #sensor['state_class'],
"W", #sensor['native_unit_of_measurement']
"batteryMppt2Data"
))
if isinstance(device['battery1MpptData'], dict):
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInVol",
family_key,
device_key,
f'Speicher 2 MPPT 1 Eingangsspannung',
"voltage", #sensor['device_class'],
None, #sensor['state_class'],
"V", #sensor['native_unit_of_measurement']
"battery1MpptData"
))
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInCur",
family_key,
device_key,
f'Speicher 2 MPPT 1 Eingangsstrom ',
"current", #sensor['device_class'],
None, #sensor['state_class'],
"A", #sensor['native_unit_of_measurement']
"battery1MpptData"
))
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInPower",
family_key,
device_key,
f'Speicher 2 MPPT 1 Eingangsleistung',
"power", #sensor['device_class'],
None, #sensor['state_class'],
"W", #sensor['native_unit_of_measurement']
"battery1MpptData"
))
if isinstance(device['battery2MpptData'], dict):
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInVol",
family_key,
device_key,
f'Speicher 3 MPPT 1 Eingangsspannung',
"voltage", #sensor['device_class'],
None, #sensor['state_class'],
"V", #sensor['native_unit_of_measurement']
"battery2MpptData"
))
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
"batteryMpptInCur",
family_key,
device_key,
f'Speicher 3 MPPT 1 Eingangsstrom ',
"current", #sensor['device_class'],
None, #sensor['state_class'],
"A", #sensor['native_unit_of_measurement']
"battery2MpptData"
))
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInPower",
family_key,
device_key,
f'Speicher 3 MPPT 1 Eingangsleistung',
"power", #sensor['device_class'],
None, #sensor['state_class'],
"W", #sensor['native_unit_of_measurement']
"battery2MpptData"
))
if isinstance(device['battery3MpptData'], dict):
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
"batteryMpptInVol",
family_key,
device_key,
f'Speicher 4 MPPT 1 Eingangsspannung',
"voltage", #sensor['device_class'],
None, #sensor['state_class'],
"V", #sensor['native_unit_of_measurement']
"battery3MpptData"
))
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
f"batteryMpptInCur",
family_key,
device_key,
f'Speicher 4 MPPT 1 Eingangsstrom ',
"current", #sensor['device_class'],
None, #sensor['state_class'],
"A", #sensor['native_unit_of_measurement']
"battery3MpptData"
))
sensors.append(Sensor(
coordinator,
device_info,
entry.entry_id,
"batteryMpptInPower",
family_key,
device_key,
f'Speicher 4 MPPT 1 Eingangsleistung',
"power", #sensor['device_class'],
None, #sensor['state_class'],
"W", #sensor['native_unit_of_measurement']
"battery3MpptData"
))
async_add_entities(sensors)
class Sensor(CoordinatorEntity, SensorEntity):
def __init__(self, coordinator, deviceinfo, entry_id, id, family_key, device_key, name, device_class, state_class, native_unit_of_measurement, subkey=None):
super().__init__(coordinator)
self._entry_id = entry_id
self._deviceinfo = deviceinfo
self._family_key = family_key
self._device_key = device_key
self._subkey = subkey
self._id = id
self._attr_name = name
self._attr_unique_id = f"{entry_id}_{family_key}_{device_key}_{id}"
if subkey:
self._attr_unique_id += f"{entry_id}_{family_key}_{device_key}_{subkey}_{id}"
self._attr_native_unit_of_measurement = native_unit_of_measurement
self._attr_device_class = device_class
self._attr_state_class = state_class
@property
def native_value(self):
data = get_merged_device_data(self.coordinator)
if self._family_key not in data:
return None
family = data[self._family_key]
if "devices" not in family or self._device_key not in family["devices"]:
return None
device = family["devices"][self._device_key]
if self._subkey:
if self._subkey in device and isinstance(device[self._subkey], dict):
return device[self._subkey].get(self._id, 'Unbekannt')
else:
return device.get(self._subkey, [])[self._id] if isinstance(device.get(self._subkey, []), list) else device.get(self._subkey, {}).get(self._id, 'Unbekannt')
return device.get(self._id, 'Unbekannt')
@property
def device_info(self) -> DeviceInfo:
return self._deviceinfo