Merge pull request #90 from s-allius/s-allius/issue56

S allius/issue56
This commit is contained in:
Stefan Allius
2024-06-14 00:05:48 +02:00
committed by GitHub
19 changed files with 247 additions and 87 deletions

View File

@@ -1,5 +1,6 @@
import logging
import traceback
from asyncio import StreamReader, StreamWriter
from messages import hex_dump_memory
logger = logging.getLogger('conn')
@@ -7,7 +8,8 @@ logger = logging.getLogger('conn')
class AsyncStream():
def __init__(self, reader, writer, addr) -> None:
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr) -> None:
logger.debug('AsyncStream.__init__')
self.reader = reader
self.writer = writer

View File

@@ -1,5 +1,6 @@
import logging
# import gc
from asyncio import StreamReader, StreamWriter
from async_stream import AsyncStream
from gen3.talent import Talent
@@ -8,12 +9,13 @@ logger = logging.getLogger('conn')
class ConnectionG3(AsyncStream, Talent):
def __init__(self, reader, writer, addr, remote_stream, server_side: bool,
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, remote_stream: 'ConnectionG3', server_side: bool,
id_str=b'') -> None:
AsyncStream.__init__(self, reader, writer, addr)
Talent.__init__(self, server_side, id_str)
self.remoteStream = remote_stream
self.remoteStream: 'ConnectionG3' = remote_stream
'''
Our puplic methods

View File

@@ -1,7 +1,8 @@
import asyncio
import logging
import traceback
import json
import asyncio
from asyncio import StreamReader, StreamWriter
from config import Config
from inverter import Inverter
from gen3.connection_g3 import ConnectionG3
@@ -44,7 +45,7 @@ class InverterG3(Inverter, ConnectionG3):
destroyed
'''
def __init__(self, reader, writer, addr):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
super().__init__(reader, writer, addr, None, True)
self.__ha_restarts = -1

View File

@@ -41,6 +41,7 @@ class Talent(Message):
self.id_str = id_str
self.contact_name = b''
self.contact_mail = b''
self.ts_offset = 0 # time offset between tsun cloud and local
self.db = InfosG3()
self.switch = {
0x00: self.msg_contact_info,
@@ -141,8 +142,8 @@ class Talent(Message):
def send_modbus_cb(self, modbus_pdu: bytearray, log_lvl: int, state: str):
if self.state != self.STATE_UP:
logger.warn(f'[{self.node_id}] ignore MODBUS cmd,'
' cause the state is not UP anymore')
logger.warning(f'[{self.node_id}] ignore MODBUS cmd,'
' cause the state is not UP anymore')
return
self.__build_header(0x70, 0x77)
@@ -204,6 +205,24 @@ class Talent(Message):
ts = (datetime.now() - datetime(1970, 1, 1)).total_seconds()
return round(ts*1000)
def _update_header(self, _forward_buffer):
'''update header for message before forwarding,
add time offset to timestamp'''
_len = len(_forward_buffer)
result = struct.unpack_from('!lB', _forward_buffer, 0)
id_len = result[1] # len of variable id string
if _len < 2*id_len + 21:
return
result = struct.unpack_from('!B', _forward_buffer, id_len+6)
msg_code = result[0]
if msg_code == 0x71 or msg_code == 0x04:
result = struct.unpack_from('!q', _forward_buffer, 13+2*id_len)
ts = result[0] + self.ts_offset
logger.debug(f'offset: {self.ts_offset:08x}'
f' proxy-time: {ts:08x}')
struct.pack_into('!q', _forward_buffer, 13+2*id_len, ts)
# check if there is a complete header in the buffer, parse it
# and set
# self.header_len
@@ -305,39 +324,35 @@ class Talent(Message):
return True
def msg_get_time(self):
tsun = Config.get('tsun')
if tsun['enabled']:
if self.ctrl.is_ind():
if self.data_len >= 8:
ts = self._timestamp()
result = struct.unpack_from('!q', self._recv_buffer,
self.header_len)
logger.debug(f'tsun-time: {result[0]:08x}'
f' proxy-time: {ts:08x}')
else:
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
if self.ctrl.is_ind():
if self.data_len == 0:
ts = self._timestamp()
logger.debug(f'time: {ts:08x}')
self.__build_header(0x91)
self._send_buffer += struct.pack('!q', ts)
self.__finish_send_msg()
elif self.data_len >= 8:
ts = self._timestamp()
result = struct.unpack_from('!q', self._recv_buffer,
self.header_len)
self.ts_offset = result[0]-ts
logger.debug(f'tsun-time: {int(result[0]):08x}'
f' proxy-time: {ts:08x}'
f' offset: {self.ts_offset}')
return # ignore received response
else:
if self.ctrl.is_ind():
if self.data_len == 0:
ts = self._timestamp()
logger.debug(f'time: {ts:08x}')
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.__build_header(0x91)
self._send_buffer += struct.pack('!q', ts)
self.__finish_send_msg()
else:
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
def parse_msg_header(self):
result = struct.unpack_from('!lB', self._recv_buffer, self.header_len)
data_id = result[0] # len of complete message
id_len = result[1] # len of variable id string
logger.debug(f'Data_ID: {data_id} id_len: {id_len}')
logger.debug(f'Data_ID: 0x{data_id:08x} id_len: {id_len}')
msg_hdr_len = 5+id_len+9

View File

@@ -1,5 +1,6 @@
import logging
# import gc
from asyncio import StreamReader, StreamWriter
from async_stream import AsyncStream
from gen3plus.solarman_v5 import SolarmanV5
@@ -8,12 +9,13 @@ logger = logging.getLogger('conn')
class ConnectionG3P(AsyncStream, SolarmanV5):
def __init__(self, reader, writer, addr, remote_stream,
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, remote_stream: 'ConnectionG3P',
server_side: bool) -> None:
AsyncStream.__init__(self, reader, writer, addr)
SolarmanV5.__init__(self, server_side)
self.remoteStream = remote_stream
self.remoteStream: 'ConnectionG3P' = remote_stream
'''
Our puplic methods

View File

@@ -1,7 +1,8 @@
import asyncio
import logging
import traceback
import json
import asyncio
from asyncio import StreamReader, StreamWriter
from config import Config
from inverter import Inverter
from gen3plus.connection_g3p import ConnectionG3P
@@ -44,7 +45,7 @@ class InverterG3P(Inverter, ConnectionG3P):
destroyed
'''
def __init__(self, reader, writer, addr):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
super().__init__(reader, writer, addr, None, True)
self.__ha_restarts = -1

View File

@@ -346,8 +346,8 @@ class SolarmanV5(Message):
def send_modbus_cb(self, pdu: bytearray, log_lvl: int, state: str):
if self.state != self.STATE_UP:
logger.warn(f'[{self.node_id}] ignore MODBUS cmd,'
' cause the state is not UP anymore')
logger.warning(f'[{self.node_id}] ignore MODBUS cmd,'
' cause the state is not UP anymore')
return
self.__build_header(0x4510)
self._send_buffer += struct.pack('<BHLLL', self.MB_RTU_CMD,
@@ -372,8 +372,8 @@ class SolarmanV5(Message):
async def send_at_cmd(self, AT_cmd: str) -> None:
if self.state != self.STATE_UP:
logger.warn(f'[{self.node_id}] ignore AT+ cmd,'
' as the state is not UP')
logger.warning(f'[{self.node_id}] ignore AT+ cmd,'
' as the state is not UP')
return
AT_cmd = AT_cmd.strip()
@@ -382,8 +382,7 @@ class SolarmanV5(Message):
node_id = self.node_id
key = 'at_resp'
logger.info(f'{key}: {data_json}')
asyncio.ensure_future(
self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json)) # noqa: E501
await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
return
self.forward_at_cmd_resp = False
@@ -511,8 +510,9 @@ class SolarmanV5(Message):
self.__forward_msg()
async def publish_mqtt(self, key, data):
await self.mqtt.publish(key, data) # pragma: no cover
def publish_mqtt(self, key, data):
asyncio.ensure_future(
self.mqtt.publish(key, data))
def get_cmd_rsp_log_lvl(self) -> int:
ftype = self._recv_buffer[self.header_len]
@@ -536,8 +536,7 @@ class SolarmanV5(Message):
node_id = self.node_id
key = 'at_resp'
logger.info(f'{key}: {data_json}')
asyncio.ensure_future(
self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json)) # noqa: E501
self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
return
elif ftype == self.MB_RTU_CMD:
valid = data[1]

View File

@@ -343,7 +343,7 @@ class Infos:
dict[counter] -= 1
def ha_proxy_confs(self, ha_prfx: str, node_id: str, snr: str) \
-> Generator[tuple[dict, str], None, None]:
-> Generator[tuple[str, str, str, str], None, None]:
'''Generator function yields json register struct for home-assistant
auto configuration and the unique entity string, for all proxy
registers

View File

@@ -1,6 +1,6 @@
import logging
import weakref
from typing import Callable
from typing import Callable, Generator
if __name__ == "app.src.messages":
@@ -45,7 +45,7 @@ def hex_dump_memory(level, info, data, num):
class IterRegistry(type):
def __iter__(cls):
def __iter__(cls) -> Generator['Message', None, None]:
for ref in cls._registry:
obj = ref()
if obj is not None:
@@ -59,7 +59,7 @@ class Message(metaclass=IterRegistry):
STATE_CLOSED = 3
def __init__(self, server_side: bool, send_modbus_cb:
Callable[[bytes, int, str], None], mb_timeout):
Callable[[bytes, int, str], None], mb_timeout: int):
self._registry.append(weakref.ref(self))
self.server_side = server_side

View File

@@ -2,6 +2,7 @@ import logging
import asyncio
import signal
import os
from asyncio import StreamReader, StreamWriter
from logging import config # noqa F401
from messages import Message
from inverter import Inverter
@@ -11,14 +12,14 @@ from scheduler import Schedule
from config import Config
async def handle_client(reader, writer):
async def handle_client(reader: StreamReader, writer: StreamWriter):
'''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername')
await InverterG3(reader, writer, addr).server_loop(addr)
async def handle_client_v2(reader, writer):
async def handle_client_v2(reader: StreamReader, writer: StreamWriter):
'''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername')