diff --git a/.gitignore b/.gitignore index e7e8e06..1da811a 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,5 @@ tsun_proxy/** Doku/** .DS_Store .coverage +.env coverage.xml diff --git a/README.md b/README.md index 7e9baec..9e3ef05 100644 --- a/README.md +++ b/README.md @@ -94,11 +94,16 @@ You find more details here: https://toml.io/en/v1.0.0 ```toml -# configuration to reach tsun cloud +# configuration for tsun cloud for 'generation 3' inverters tsun.enabled = true # false: disables connecting to the tsun cloud, and avoids updates tsun.host = 'logger.talent-monitoring.com' tsun.port = 5005 +# configuration for solarman cloud for 'generation 3 plus' inverters +solarman.enabled = true # false: disables connecting to the tsun cloud, and avoids updates +solarman.host = 'iot.talent-monitoring.com' +solarman.port = 10000 + # mqtt broker configuration mqtt.host = 'mqtt' # URL or IP address of the mqtt broker @@ -125,10 +130,23 @@ inverters.allow_all = false # True: allow inverters, even if we have no invert [inverters."R17xxxxxxxxxxxx1"] node_id = 'inv1' # Optional, MQTT replacement for inverters serial number suggested_area = 'roof' # Optional, suggested installation area for home-assistant +pv1 = {type = 'RSM40-8-410M', manufacturer = 'Risen'} +pv2 = {type = 'RSM40-8-410M', manufacturer = 'Risen'} [inverters."R17xxxxxxxxxxxx2"] node_id = 'inv2' # Optional, MQTT replacement for inverters serial number suggested_area = 'balcony' # Optional, suggested installation area for home-assistant +pv1 = {type = 'RSM40-8-410M', manufacturer = 'Risen'} +pv2 = {type = 'RSM40-8-410M', manufacturer = 'Risen'} + +[inverters."Y17xxxxxxxxxxxx1"] +monitor_sn = 2000000000 +node_id = 'inv_3' # MQTT replacement for inverters serial number +suggested_area = 'Garage' # suggested installation place for home-assistant +pv1 = {type = 'RSM40-8-410M', manufacturer = 'Risen'} +pv2 = {type = 'RSM40-8-410M', manufacturer = 'Risen'} +pv3 = {type = 'RSM40-8-405M', manufacturer = 'Risen'} +pv4 = {type = 'RSM40-8-405M', manufacturer = 'Risen'} ``` diff --git a/app/config/default_config.toml b/app/config/default_config.toml index 6aa1119..5d62a14 100644 --- a/app/config/default_config.toml +++ b/app/config/default_config.toml @@ -3,6 +3,10 @@ tsun.enabled = true # false: disables connecting to the tsun cloud, and avoids tsun.host = 'logger.talent-monitoring.com' tsun.port = 5005 +# configuration to reach the new tsun cloud for G3 Plus inverters +solarman.enabled = true # false: disables connecting to the tsun cloud, and avoids updates +solarman.host = 'iot.talent-monitoring.com' +solarman.port = 10000 # mqtt broker configuration mqtt.host = 'mqtt' # URL or IP address of the mqtt broker diff --git a/app/proxy.svg b/app/proxy.svg new file mode 100644 index 0000000..5d2d1d7 --- /dev/null +++ b/app/proxy.svg @@ -0,0 +1,278 @@ + + + + + + +G + + + +A0 + + + +You can stick notes +on diagrams too! + + + +A1 + +Singleton + + + +A2 + +Mqtt + +<static>ha_restarts +<static>__client +<static>__cb_MqttIsUp + +<async>publish() +<async>close() + + + +A1->A2 + + + + + +A10 + +Inverter + +cls.db_stat +cls.entity_prfx +cls.discovery_prfx +cls.proxy_node_id +cls.proxy_unique_id +cls.mqtt:Mqtt + + + + +A2->A10 + + + + +A3 + +IterRegistry + + +__iter__ + + + +A4 + +Message + +server_side:bool +header_valid:bool +header_len:unsigned +data_len:unsigned +unique_id +node_id +sug_area +_recv_buffer:bytearray +_send_buffer:bytearray +_forward_buffer:bytearray +db:Infos +new_data:list + +_read():void<abstract> +close():void +inc_counter():void +dec_counter():void + + + +A3->A4 + + + + + +A5 + +Talent + +await_conn_resp_cnt +id_str +contact_name +contact_mail +switch + +msg_contact_info() +msg_ota_update() +msg_get_time() +msg_collector_data() +msg_inverter_data() +msg_unknown() +close() + + + +A4->A5 + + + + + +A6 + +SolarmanV5 + +control +serial +snr +switch + +msg_unknown() +close() + + + +A4->A6 + + + + + +A7 + +ConnectionG3 + +remoteStream:ConnectionG3 + +close() + + + +A5->A7 + + + + + +A8 + +ConnectionG3P + +remoteStream:ConnectionG3P + +close() + + + +A6->A8 + + + + + +A7->A7 + + +0..1 +has + + + +A11 + +InverterG3 + +__ha_restarts + +async_create_remote() +close() + + + +A7->A11 + + + + + +A8->A8 + + +0..1 +has + + + +A12 + +InverterG3P + +__ha_restarts + +async_create_remote() +close() + + + +A8->A12 + + + + + +A9 + +AsyncStream + +reader +writer +addr +r_addr +l_addr + +<async>server_loop() +<async>client_loop() +<async>loop +disc() +close() +__async_read() +__async_write() +__async_forward() + + + +A9->A7 + + + + + +A9->A8 + + + + + +A10->A11 + + + + + +A10->A12 + + + + + diff --git a/app/proxy.yuml b/app/proxy.yuml new file mode 100644 index 0000000..a7c47c9 --- /dev/null +++ b/app/proxy.yuml @@ -0,0 +1,21 @@ +// {type:class} +// {direction:topDown} +// {generate:true} + +[note: You can stick notes on diagrams too!{bg:cornsilk}] +[Singleton]^[Mqtt|ha_restarts;__client;__cb_MqttIsUp|publish();close()] + +[IterRegistry||__iter__]^[Message|server_side:bool;header_valid:bool;header_len:unsigned;data_len:unsigned;unique_id;node_id;sug_area;_recv_buffer:bytearray;_send_buffer:bytearray;_forward_buffer:bytearray;db:Infos;new_data:list|_read():void;close():void;inc_counter():void;dec_counter():void] +[Message]^[Talent|await_conn_resp_cnt;id_str;contact_name;contact_mail;switch|msg_contact_info();msg_ota_update();msg_get_time();msg_collector_data();msg_inverter_data();msg_unknown();;close()] +[Message]^[SolarmanV5|control;serial;snr;switch|msg_unknown();;close()] +[Talent]^[ConnectionG3|remoteStream:ConnectionG3|close()] +[SolarmanV5]^[ConnectionG3P|remoteStream:ConnectionG3P|close()] +[AsyncStream|reader;writer;addr;r_addr;l_addr|server_loop();client_loop();loop;disc();close();;__async_read();__async_write();__async_forward()]^[ConnectionG3] +[AsyncStream]^[ConnectionG3P] +[Inverter|cls.db_stat;cls.entity_prfx;cls.discovery_prfx;cls.proxy_node_id;cls.proxy_unique_id;cls.mqtt:Mqtt|]^[InverterG3|__ha_restarts|async_create_remote();;close()] +[Inverter]^[InverterG3P|__ha_restarts|async_create_remote();;close()] +[Mqtt]-[Inverter] +[ConnectionG3]^[InverterG3] +[ConnectionG3]has-0..1>[ConnectionG3] +[ConnectionG3P]^[InverterG3P] +[ConnectionG3P]has-0..1>[ConnectionG3P] diff --git a/app/src/async_stream.py b/app/src/async_stream.py index 7b2909a..f4f841b 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -1,27 +1,57 @@ import logging import traceback -# from config import Config -# import gc -from messages import Message, hex_dump_memory +from messages import hex_dump_memory logger = logging.getLogger('conn') -class AsyncStream(Message): +class AsyncStream(): - def __init__(self, reader, writer, addr, remote_stream, server_side: bool, - id_str=b'') -> None: - super().__init__(server_side, id_str) + def __init__(self, reader, writer, addr) -> None: + logger.debug('AsyncStream.__init__') self.reader = reader self.writer = writer - self.remoteStream = remote_stream self.addr = addr self.r_addr = '' self.l_addr = '' - ''' - Our puplic methods - ''' + async def server_loop(self, addr): + '''Loop for receiving messages from the inverter (server-side)''' + logging.info(f'Accept connection from {addr} to {self.l_addr}') + self.inc_counter('Inverter_Cnt') + await self.loop() + self.dec_counter('Inverter_Cnt') + logging.info(f'Server loop stopped for r{self.r_addr}') + + # if the server connection closes, we also have to disconnect + # the connection to te TSUN cloud + if self.remoteStream: + logging.debug("disconnect client connection") + self.remoteStream.disc() + try: + await self._async_publ_mqtt_proxy_stat('proxy') + except Exception: + pass + + async def client_loop(self, addr): + '''Loop for receiving messages from the TSUN cloud (client-side)''' + clientStream = await self.remoteStream.loop() + logging.info(f'Client loop stopped for l{clientStream.l_addr}') + + # if the client connection closes, we don't touch the server + # connection. Instead we erase the client connection stream, + # thus on the next received packet from the inverter, we can + # establish a new connection to the TSUN cloud + + # erase backlink to inverter + clientStream.remoteStream = None + + if self.remoteStream == clientStream: + # logging.debug(f'Client l{clientStream.l_addr} refs:' + # f' {gc.get_referrers(clientStream)}') + # than erase client connection + self.remoteStream = None + async def loop(self): self.r_addr = self.writer.get_extra_info('peername') self.l_addr = self.writer.get_extra_info('sockname') @@ -52,15 +82,12 @@ class AsyncStream(Message): return self def disc(self) -> None: - logger.debug(f'in AsyncStream.disc() l{self.l_addr} | r{self.r_addr}') + logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}') self.writer.close() def close(self): - logger.debug(f'in AsyncStream.close() l{self.l_addr} | r{self.r_addr}') + logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}') self.writer.close() - super().close() # call close handler in the parent class - -# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}') ''' Our private methods @@ -86,9 +113,8 @@ class AsyncStream(Message): if not self.remoteStream: await self.async_create_remote() if self.remoteStream: - self.remoteStream._init_new_client_conn(self.contact_name, - self.contact_mail) - await self.remoteStream.__async_write() + if self.remoteStream._init_new_client_conn(): + await self.remoteStream.__async_write() if self.remoteStream: hex_dump_memory(logging.INFO, @@ -99,11 +125,6 @@ class AsyncStream(Message): await self.remoteStream.writer.drain() self._forward_buffer = bytearray(0) - async def async_create_remote(self) -> None: - pass - - async def async_publ_mqtt(self) -> None: - pass - def __del__(self): - logging.debug(f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}") + logger.debug( + f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}") diff --git a/app/src/config.py b/app/src/config.py index ada2f90..3778e09 100644 --- a/app/src/config.py +++ b/app/src/config.py @@ -19,6 +19,11 @@ class Config(): 'host': Use(str), 'port': And(Use(int), lambda n: 1024 <= n <= 65535) }, + 'solarman': { + 'enabled': Use(bool), + 'host': Use(str), + 'port': And(Use(int), lambda n: 1024 <= n <= 65535) + }, 'mqtt': { 'host': Use(str), 'port': And(Use(int), lambda n: 1024 <= n <= 65535), @@ -34,6 +39,7 @@ class Config(): }, 'inverters': { 'allow_all': Use(bool), And(Use(str), lambda s: len(s) == 16): { + Optional('monitor_sn', default=0): Use(int), Optional('node_id', default=""): And(Use(str), Use(lambda s: s + '/' if len(s) > 0 and @@ -67,6 +73,8 @@ class Config(): usr_config = tomllib.load(f) config['tsun'] = def_config['tsun'] | usr_config['tsun'] + config['solarman'] = def_config['solarman'] | \ + usr_config['solarman'] config['mqtt'] = def_config['mqtt'] | usr_config['mqtt'] config['ha'] = def_config['ha'] | usr_config['ha'] config['inverters'] = def_config['inverters'] | \ diff --git a/app/src/gen3/connection_g3.py b/app/src/gen3/connection_g3.py new file mode 100644 index 0000000..c93156e --- /dev/null +++ b/app/src/gen3/connection_g3.py @@ -0,0 +1,36 @@ +import logging +# import gc +from async_stream import AsyncStream +from gen3.talent import Talent + +logger = logging.getLogger('conn') + + +class ConnectionG3(AsyncStream, Talent): + + def __init__(self, reader, writer, addr, remote_stream, server_side: bool, + id_str=b'') -> None: + AsyncStream.__init__(self, reader, writer, addr) + Talent.__init__(self, server_side, id_str) + + self.remoteStream = remote_stream + + ''' + Our puplic methods + ''' + def close(self): + AsyncStream.close(self) + Talent.close(self) + # logger.info(f'AsyncStream refs: {gc.get_referrers(self)}') + + async def async_create_remote(self) -> None: + pass + + async def async_publ_mqtt(self) -> None: + pass + + ''' + Our private methods + ''' + def __del__(self): + super().__del__() diff --git a/app/src/gen3/inverter_g3.py b/app/src/gen3/inverter_g3.py new file mode 100644 index 0000000..b3e33d4 --- /dev/null +++ b/app/src/gen3/inverter_g3.py @@ -0,0 +1,126 @@ +import asyncio +import logging +import traceback +import json +from config import Config +from inverter import Inverter +from gen3.connection_g3 import ConnectionG3 +from aiomqtt import MqttCodeError +from infos import Infos + +# import gc + +# logger = logging.getLogger('conn') +logger_mqtt = logging.getLogger('mqtt') + + +class InverterG3(Inverter, ConnectionG3): + '''class Inverter is a derivation of an Async_Stream + + The class has some class method for managing common resources like a + connection to the MQTT broker or proxy error counter which are common + for all inverter connection + + Instances of the class are connections to an inverter and can have an + optional link to an remote connection to the TSUN cloud. A remote + connection dies with the inverter connection. + + class methods: + class_init(): initialize the common resources of the proxy (MQTT + broker, Proxy DB, etc). Must be called before the + first inverter instance can be created + class_close(): release the common resources of the proxy. Should not + be called before any instances of the class are + destroyed + + methods: + server_loop(addr): Async loop method for receiving messages from the + inverter (server-side) + client_loop(addr): Async loop method for receiving messages from the + TSUN cloud (client-side) + async_create_remote(): Establish a client connection to the TSUN cloud + async_publ_mqtt(): Publish data to MQTT broker + close(): Release method which must be called before a instance can be + destroyed + ''' + + def __init__(self, reader, writer, addr): + super().__init__(reader, writer, addr, None, True) + self.__ha_restarts = -1 + + async def async_create_remote(self) -> None: + '''Establish a client connection to the TSUN cloud''' + tsun = Config.get('tsun') + host = tsun['host'] + port = tsun['port'] + addr = (host, port) + + try: + logging.info(f'Connected to {addr}') + connect = asyncio.open_connection(host, port) + reader, writer = await connect + self.remoteStream = ConnectionG3(reader, writer, addr, self, + False, self.id_str) + asyncio.create_task(self.client_loop(addr)) + + except (ConnectionRefusedError, TimeoutError) as error: + logging.info(f'{error}') + except Exception: + self.inc_counter('SW_Exception') + logging.error( + f"Inverter: Exception for {addr}:\n" + f"{traceback.format_exc()}") + + async def async_publ_mqtt(self) -> None: + '''publish data to MQTT broker''' + # check if new inverter or collector infos are available or when the + # home assistant has changed the status back to online + try: + if (('inverter' in self.new_data and self.new_data['inverter']) + or ('collector' in self.new_data and + self.new_data['collector']) + or self.mqtt.ha_restarts != self.__ha_restarts): + await self._register_proxy_stat_home_assistant() + await self.__register_home_assistant() + self.__ha_restarts = self.mqtt.ha_restarts + + for key in self.new_data: + await self.__async_publ_mqtt_packet(key) + for key in Infos.new_stat_data: + await self._async_publ_mqtt_proxy_stat(key) + + except MqttCodeError as error: + logging.error(f'Mqtt except: {error}') + except Exception: + self.inc_counter('SW_Exception') + logging.error( + f"Inverter: Exception:\n" + f"{traceback.format_exc()}") + + async def __async_publ_mqtt_packet(self, key): + db = self.db.db + if key in db and self.new_data[key]: + data_json = json.dumps(db[key]) + node_id = self.node_id + logger_mqtt.debug(f'{key}: {data_json}') + await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 + self.new_data[key] = False + + async def __register_home_assistant(self) -> None: + '''register all our topics at home assistant''' + for data_json, component, node_id, id in self.db.ha_confs( + self.entity_prfx, self.node_id, self.unique_id, + False, self.sug_area): + logger_mqtt.debug(f"MQTT Register: cmp:'{component}'" + f" node_id:'{node_id}' {data_json}") + await self.mqtt.publish(f"{self.discovery_prfx}{component}" + f"/{node_id}{id}/config", data_json) + + def close(self) -> None: + logging.debug(f'InverterG3.close() l{self.l_addr} | r{self.r_addr}') + super().close() # call close handler in the parent class +# logger.debug (f'Inverter refs: {gc.get_referrers(self)}') + + def __del__(self): + logging.debug("InverterG3.__del__") + super().__del__() diff --git a/app/src/gen3/talent.py b/app/src/gen3/talent.py new file mode 100644 index 0000000..cac7ca2 --- /dev/null +++ b/app/src/gen3/talent.py @@ -0,0 +1,350 @@ +import struct +import logging +import time +from datetime import datetime + +if __name__ == "app.src.gen3.talent": + from app.src.messages import hex_dump_memory, Message + from app.src.config import Config +else: # pragma: no cover + from messages import hex_dump_memory, Message + from config import Config + +logger = logging.getLogger('msg') + + +class Control: + def __init__(self, ctrl: int): + self.ctrl = ctrl + + def __int__(self) -> int: + return self.ctrl + + def is_ind(self) -> bool: + return (self.ctrl == 0x91) + + def is_req(self) -> bool: + return (self.ctrl == 0x70) + + def is_resp(self) -> bool: + return (self.ctrl == 0x99) + + +class Talent(Message): + + def __init__(self, server_side: bool, id_str=b''): + super().__init__(server_side) + self.await_conn_resp_cnt = 0 + self.id_str = id_str + self.contact_name = b'' + self.contact_mail = b'' + self.switch = { + 0x00: self.msg_contact_info, + 0x13: self.msg_ota_update, + 0x22: self.msg_get_time, + 0x71: self.msg_collector_data, + 0x04: self.msg_inverter_data, + } + + ''' + Our puplic methods + ''' + def close(self) -> None: + logging.debug('Talent.close()') + # we have refernces to methods of this class in self.switch + # so we have to erase self.switch, otherwise this instance can't be + # deallocated by the garbage collector ==> we get a memory leak + self.switch.clear() + + def set_serial_no(self, serial_no: str): + + if self.unique_id == serial_no: + logger.debug(f'SerialNo: {serial_no}') + else: + inverters = Config.get('inverters') + # logger.debug(f'Inverters: {inverters}') + + if serial_no in inverters: + inv = inverters[serial_no] + self.node_id = inv['node_id'] + self.sug_area = inv['suggested_area'] + logger.debug(f'SerialNo {serial_no} allowed! area:{self.sug_area}') # noqa: E501 + else: + self.node_id = '' + self.sug_area = '' + if 'allow_all' not in inverters or not inverters['allow_all']: + self.inc_counter('Unknown_SNR') + self.unique_id = None + logger.warning(f'ignore message from unknow inverter! (SerialNo: {serial_no})') # noqa: E501 + return + logger.debug(f'SerialNo {serial_no} not known but accepted!') + + self.unique_id = serial_no + + def read(self) -> None: + self._read() + + if not self.header_valid: + self.__parse_header(self._recv_buffer, len(self._recv_buffer)) + + if self.header_valid and len(self._recv_buffer) >= (self.header_len + + self.data_len): + hex_dump_memory(logging.INFO, f'Received from {self.addr}:', + self._recv_buffer, self.header_len+self.data_len) + + self.set_serial_no(self.id_str.decode("utf-8")) + self.__dispatch_msg() + self.__flush_recv_msg() + return + + def forward(self, buffer, buflen) -> None: + tsun = Config.get('tsun') + if tsun['enabled']: + self._forward_buffer = buffer[:buflen] + hex_dump_memory(logging.DEBUG, 'Store for forwarding:', + buffer, buflen) + + self.__parse_header(self._forward_buffer, + len(self._forward_buffer)) + fnc = self.switch.get(self.msg_id, self.msg_unknown) + logger.info(self.__flow_str(self.server_side, 'forwrd') + + f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}') + return + + def _init_new_client_conn(self) -> bool: + contact_name = self.contact_name + contact_mail = self.contact_mail + logger.info(f'name: {contact_name} mail: {contact_mail}') + self.msg_id = 0 + self.await_conn_resp_cnt += 1 + self.__build_header(0x91) + self._send_buffer += struct.pack(f'!{len(contact_name)+1}p' + f'{len(contact_mail)+1}p', + contact_name, contact_mail) + + self.__finish_send_msg() + return True + + ''' + Our private methods + ''' + def __flow_str(self, server_side: bool, type: str): # noqa: F821 + switch = { + 'rx': ' <', + 'tx': ' >', + 'forwrd': '<< ', + 'drop': ' xx', + 'rxS': '> ', + 'txS': '< ', + 'forwrdS': ' >>', + 'dropS': 'xx ', + } + if server_side: + type += 'S' + return switch.get(type, '???') + + def _timestamp(self): # pragma: no cover + if False: + # utc as epoche + ts = time.time() + else: + # convert localtime in epoche + ts = (datetime.now() - datetime(1970, 1, 1)).total_seconds() + return round(ts*1000) + + # check if there is a complete header in the buffer, parse it + # and set + # self.header_len + # self.data_len + # self.id_str + # self.ctrl + # self.msg_id + # + # if the header is incomplete, than self.header_len is still 0 + # + def __parse_header(self, buf: bytes, buf_len: int) -> None: + + if (buf_len < 5): # enough bytes to read len and id_len? + return + result = struct.unpack_from('!lB', buf, 0) + len = result[0] # len of complete message + id_len = result[1] # len of variable id string + + hdr_len = 5+id_len+2 + + if (buf_len < hdr_len): # enough bytes for complete header? + return + + result = struct.unpack_from(f'!{id_len+1}pBB', buf, 4) + + # store parsed header values in the class + self.id_str = result[0] + self.ctrl = Control(result[1]) + self.msg_id = result[2] + self.data_len = len-id_len-3 + self.header_len = hdr_len + self.header_valid = True + return + + def __build_header(self, ctrl) -> None: + self.send_msg_ofs = len(self._send_buffer) + self._send_buffer += struct.pack(f'!l{len(self.id_str)+1}pBB', + 0, self.id_str, ctrl, self.msg_id) + fnc = self.switch.get(self.msg_id, self.msg_unknown) + logger.info(self.__flow_str(self.server_side, 'tx') + + f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}') + + def __finish_send_msg(self) -> None: + _len = len(self._send_buffer) - self.send_msg_ofs + struct.pack_into('!l', self._send_buffer, self.send_msg_ofs, _len-4) + + def __dispatch_msg(self) -> None: + fnc = self.switch.get(self.msg_id, self.msg_unknown) + if self.unique_id: + logger.info(self.__flow_str(self.server_side, 'rx') + + f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}') + fnc() + else: + logger.info(self.__flow_str(self.server_side, 'drop') + + f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}') + + def __flush_recv_msg(self) -> None: + self._recv_buffer = self._recv_buffer[(self.header_len+self.data_len):] + self.header_valid = False + + ''' + Message handler methods + ''' + def msg_contact_info(self): + if self.ctrl.is_ind(): + if self.server_side and self.__process_contact_info(): + self.__build_header(0x91) + self._send_buffer += b'\x01' + self.__finish_send_msg() + # don't forward this contact info here, we will build one + # when the remote connection is established + elif self.await_conn_resp_cnt > 0: + self.await_conn_resp_cnt -= 1 + else: + self.forward(self._recv_buffer, self.header_len+self.data_len) + return + else: + logger.warning('Unknown Ctrl') + self.inc_counter('Unknown_Ctrl') + self.forward(self._recv_buffer, self.header_len+self.data_len) + + def __process_contact_info(self) -> bool: + result = struct.unpack_from('!B', self._recv_buffer, self.header_len) + name_len = result[0] + if self.data_len < name_len+2: + return False + result = struct.unpack_from(f'!{name_len+1}pB', self._recv_buffer, + self.header_len) + self.contact_name = result[0] + mail_len = result[1] + logger.info(f'name: {self.contact_name}') + + result = struct.unpack_from(f'!{mail_len+1}p', self._recv_buffer, + self.header_len+name_len+1) + self.contact_mail = result[0] + logger.info(f'mail: {self.contact_mail}') + return True + + def msg_get_time(self): + tsun = Config.get('tsun') + if tsun['enabled']: + if self.ctrl.is_ind(): + if self.data_len >= 8: + ts = self._timestamp() + result = struct.unpack_from('!q', self._recv_buffer, + self.header_len) + logger.debug(f'tsun-time: {result[0]:08x}' + f' proxy-time: {ts:08x}') + else: + logger.warning('Unknown Ctrl') + self.inc_counter('Unknown_Ctrl') + self.forward(self._recv_buffer, self.header_len+self.data_len) + else: + if self.ctrl.is_ind(): + if self.data_len == 0: + ts = self._timestamp() + logger.debug(f'time: {ts:08x}') + + self.__build_header(0x91) + self._send_buffer += struct.pack('!q', ts) + self.__finish_send_msg() + + else: + logger.warning('Unknown Ctrl') + self.inc_counter('Unknown_Ctrl') + + def parse_msg_header(self): + result = struct.unpack_from('!lB', self._recv_buffer, self.header_len) + + data_id = result[0] # len of complete message + id_len = result[1] # len of variable id string + logger.debug(f'Data_ID: {data_id} id_len: {id_len}') + + msg_hdr_len = 5+id_len+9 + + result = struct.unpack_from(f'!{id_len+1}pBq', self._recv_buffer, + self.header_len + 4) + + logger.debug(f'ID: {result[0]} B: {result[1]}') + logger.debug(f'time: {result[2]:08x}') + # logger.info(f'time: {datetime.utcfromtimestamp(result[2]).strftime( + # "%Y-%m-%d %H:%M:%S")}') + return msg_hdr_len + + def msg_collector_data(self): + if self.ctrl.is_ind(): + self.__build_header(0x99) + self._send_buffer += b'\x01' + self.__finish_send_msg() + self.__process_data() + + elif self.ctrl.is_resp(): + return # ignore received response + else: + logger.warning('Unknown Ctrl') + self.inc_counter('Unknown_Ctrl') + + self.forward(self._recv_buffer, self.header_len+self.data_len) + + def msg_inverter_data(self): + if self.ctrl.is_ind(): + self.__build_header(0x99) + self._send_buffer += b'\x01' + self.__finish_send_msg() + self.__process_data() + + elif self.ctrl.is_resp(): + return # ignore received response + else: + logger.warning('Unknown Ctrl') + self.inc_counter('Unknown_Ctrl') + + self.forward(self._recv_buffer, self.header_len+self.data_len) + + def __process_data(self): + msg_hdr_len = self.parse_msg_header() + + for key, update in self.db.parse(self._recv_buffer, self.header_len + + msg_hdr_len): + if update: + self.new_data[key] = True + + def msg_ota_update(self): + if self.ctrl.is_req(): + self.inc_counter('OTA_Start_Msg') + elif self.ctrl.is_ind(): + pass + else: + logger.warning('Unknown Ctrl') + self.inc_counter('Unknown_Ctrl') + self.forward(self._recv_buffer, self.header_len+self.data_len) + + def msg_unknown(self): + logger.warning(f"Unknow Msg: ID:{self.msg_id}") + self.inc_counter('Unknown_Msg') + self.forward(self._recv_buffer, self.header_len+self.data_len) diff --git a/app/src/gen3plus/connection_g3p.py b/app/src/gen3plus/connection_g3p.py new file mode 100644 index 0000000..a9362ce --- /dev/null +++ b/app/src/gen3plus/connection_g3p.py @@ -0,0 +1,36 @@ +import logging +# import gc +from async_stream import AsyncStream +from gen3plus.solarman_v5 import SolarmanV5 + +logger = logging.getLogger('conn') + + +class ConnectionG3P(AsyncStream, SolarmanV5): + + def __init__(self, reader, writer, addr, remote_stream, + server_side: bool) -> None: + AsyncStream.__init__(self, reader, writer, addr) + SolarmanV5.__init__(self, server_side) + + self.remoteStream = remote_stream + + ''' + Our puplic methods + ''' + def close(self): + AsyncStream.close(self) + SolarmanV5.close(self) + # logger.info(f'AsyncStream refs: {gc.get_referrers(self)}') + + async def async_create_remote(self) -> None: + pass + + async def async_publ_mqtt(self) -> None: + pass + + ''' + Our private methods + ''' + def __del__(self): + super().__del__() diff --git a/app/src/gen3plus/inverter_g3p.py b/app/src/gen3plus/inverter_g3p.py new file mode 100644 index 0000000..e8f979a --- /dev/null +++ b/app/src/gen3plus/inverter_g3p.py @@ -0,0 +1,126 @@ +import asyncio +import logging +import traceback +import json +from config import Config +from inverter import Inverter +from gen3plus.connection_g3p import ConnectionG3P +from aiomqtt import MqttCodeError +from infos import Infos + +# import gc + +# logger = logging.getLogger('conn') +logger_mqtt = logging.getLogger('mqtt') + + +class InverterG3P(Inverter, ConnectionG3P): + '''class Inverter is a derivation of an Async_Stream + + The class has some class method for managing common resources like a + connection to the MQTT broker or proxy error counter which are common + for all inverter connection + + Instances of the class are connections to an inverter and can have an + optional link to an remote connection to the TSUN cloud. A remote + connection dies with the inverter connection. + + class methods: + class_init(): initialize the common resources of the proxy (MQTT + broker, Proxy DB, etc). Must be called before the + first inverter instance can be created + class_close(): release the common resources of the proxy. Should not + be called before any instances of the class are + destroyed + + methods: + server_loop(addr): Async loop method for receiving messages from the + inverter (server-side) + client_loop(addr): Async loop method for receiving messages from the + TSUN cloud (client-side) + async_create_remote(): Establish a client connection to the TSUN cloud + async_publ_mqtt(): Publish data to MQTT broker + close(): Release method which must be called before a instance can be + destroyed + ''' + + def __init__(self, reader, writer, addr): + super().__init__(reader, writer, addr, None, True) + self.__ha_restarts = -1 + + async def async_create_remote(self) -> None: + '''Establish a client connection to the TSUN cloud''' + tsun = Config.get('solarman') + host = tsun['host'] + port = tsun['port'] + addr = (host, port) + + try: + logging.info(f'Connected to {addr}') + connect = asyncio.open_connection(host, port) + reader, writer = await connect + self.remoteStream = ConnectionG3P(reader, writer, addr, self, + False) + asyncio.create_task(self.client_loop(addr)) + + except (ConnectionRefusedError, TimeoutError) as error: + logging.info(f'{error}') + except Exception: + self.inc_counter('SW_Exception') + logging.error( + f"Inverter: Exception for {addr}:\n" + f"{traceback.format_exc()}") + + async def async_publ_mqtt(self) -> None: + '''publish data to MQTT broker''' + # check if new inverter or collector infos are available or when the + # home assistant has changed the status back to online + try: + if (('inverter' in self.new_data and self.new_data['inverter']) + or ('collector' in self.new_data and + self.new_data['collector']) + or self.mqtt.ha_restarts != self.__ha_restarts): + await self._register_proxy_stat_home_assistant() + await self.__register_home_assistant() + self.__ha_restarts = self.mqtt.ha_restarts + + for key in self.new_data: + await self.__async_publ_mqtt_packet(key) + for key in Infos.new_stat_data: + await self._async_publ_mqtt_proxy_stat(key) + + except MqttCodeError as error: + logging.error(f'Mqtt except: {error}') + except Exception: + self.inc_counter('SW_Exception') + logging.error( + f"Inverter: Exception:\n" + f"{traceback.format_exc()}") + + async def __async_publ_mqtt_packet(self, key): + db = self.db.db + if key in db and self.new_data[key]: + data_json = json.dumps(db[key]) + node_id = self.node_id + logger_mqtt.debug(f'{key}: {data_json}') + await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 + self.new_data[key] = False + + async def __register_home_assistant(self) -> None: + '''register all our topics at home assistant''' + for data_json, component, node_id, id in self.db.ha_confs( + self.entity_prfx, self.node_id, self.unique_id, + False, self.sug_area): + logger_mqtt.debug(f"MQTT Register: cmp:'{component}'" + f" node_id:'{node_id}' {data_json}") + await self.mqtt.publish(f"{self.discovery_prfx}{component}" + f"/{node_id}{id}/config", data_json) + + def close(self) -> None: + logging.debug(f'InverterG3P.close() l{self.l_addr} | r{self.r_addr}') + super().close() # call close handler in the parent class +# logger.debug (f'Inverter refs: {gc.get_referrers(self)}') + + def __del__(self): + logging.debug("InverterG3P.__del__") + super().__del__() diff --git a/app/src/gen3plus/solarman_v5.py b/app/src/gen3plus/solarman_v5.py new file mode 100644 index 0000000..391f315 --- /dev/null +++ b/app/src/gen3plus/solarman_v5.py @@ -0,0 +1,279 @@ +import struct +import logging +# import time +from datetime import datetime + +if __name__ == "app.src.gen3plus.solarman_v5": + from app.src.messages import hex_dump_memory, Message + from app.src.config import Config +else: # pragma: no cover + from messages import hex_dump_memory, Message + from config import Config +# import traceback + +logger = logging.getLogger('msg') + + +class SolarmanV5(Message): + + def __init__(self, server_side: bool): + super().__init__(server_side) + + self.header_len = 11 # overwrite construcor in class Message + self.control = 0 + self.serial = 0 + self.snr = 0 + # self.await_conn_resp_cnt = 0 + # self.id_str = id_str + self.switch = { + 0x4110: self.msg_dev_ind, # hello + 0x1110: self.msg_dev_rsp, + 0x4210: self.msg_unknown, # data + 0x1210: self.msg_data_rsp, + 0x4310: self.msg_unknown, + 0x4710: self.msg_unknown, # heatbeat + 0x1710: self.msg_hbeat_rsp, + 0x4810: self.msg_unknown, # hello end + } + + ''' + Our puplic methods + ''' + def close(self) -> None: + logging.debug('Solarman.close()') + # we have refernces to methods of this class in self.switch + # so we have to erase self.switch, otherwise this instance can't be + # deallocated by the garbage collector ==> we get a memory leak + self.switch.clear() + + def set_serial_no(self, snr: int): + serial_no = str(snr) + if self.unique_id == serial_no: + logger.debug(f'SerialNo: {serial_no}') + else: + found = False + inverters = Config.get('inverters') + # logger.debug(f'Inverters: {inverters}') + + for key, inv in inverters.items(): + # logger.debug(f'key: {key} -> {inv}') + if (type(inv) is dict and 'monitor_sn' in inv + and inv['monitor_sn'] == snr): + found = True + self.node_id = inv['node_id'] + self.sug_area = inv['suggested_area'] + logger.debug(f'SerialNo {serial_no} allowed! area:{self.sug_area}') # noqa: E501 + + if not found: + self.node_id = '' + self.sug_area = '' + if 'allow_all' not in inverters or not inverters['allow_all']: + self.inc_counter('Unknown_SNR') + self.unique_id = None + logger.warning(f'ignore message from unknow inverter! (SerialNo: {serial_no})') # noqa: E501 + return + logger.debug(f'SerialNo {serial_no} not known but accepted!') + + self.unique_id = serial_no + + def read(self) -> None: + self._read() + + if not self.header_valid: + self.__parse_header(self._recv_buffer, len(self._recv_buffer)) + + if self.header_valid and len(self._recv_buffer) >= (self.header_len + + self.data_len+2): + hex_dump_memory(logging.INFO, f'Received from {self.addr}:', + self._recv_buffer, self.header_len+self.data_len+2) + if self.__trailer_is_ok(self._recv_buffer, self.header_len + + self.data_len + 2): + self.set_serial_no(self.snr) + self.__dispatch_msg() + self.__flush_recv_msg() + return + + def forward(self, buffer, buflen) -> None: + tsun = Config.get('solarman') + if tsun['enabled']: + self._forward_buffer = buffer[:buflen] + hex_dump_memory(logging.DEBUG, 'Store for forwarding:', + buffer, buflen) + + self.__parse_header(self._forward_buffer, + len(self._forward_buffer)) + fnc = self.switch.get(self.control, self.msg_unknown) + logger.info(self.__flow_str(self.server_side, 'forwrd') + + f' Ctl: {int(self.control):#04x}' + f' Msg: {fnc.__name__!r}') + return + + def _init_new_client_conn(self) -> bool: + # self.__build_header(0x91) + # self._send_buffer += struct.pack(f'!{len(contact_name)+1}p' + # f'{len(contact_mail)+1}p', + # contact_name, contact_mail) + + # self.__finish_send_msg() + return False + + ''' + Our private methods + ''' + def __flow_str(self, server_side: bool, type: str): # noqa: F821 + switch = { + 'rx': ' <', + 'tx': ' >', + 'forwrd': '<< ', + 'drop': ' xx', + 'rxS': '> ', + 'txS': '< ', + 'forwrdS': ' >>', + 'dropS': 'xx ', + } + if server_side: + type += 'S' + return switch.get(type, '???') + + def __parse_header(self, buf: bytes, buf_len: int) -> None: + + if (buf_len < self.header_len): # enough bytes for complete header? + return + + result = struct.unpack_from(' bool: + crc = buf[self.data_len+11] + stop = buf[self.data_len+12] + if stop != 0x15: + return False + check = sum(buf[1:buf_len-2]) & 0xff + if check != crc: + logger.debug(f'CRC {int(crc):#02x} {int(check):#08x}' + f' Stop:{int(stop):#02x}') + return False + + return True + + def __dispatch_msg(self) -> None: + fnc = self.switch.get(self.control, self.msg_unknown) + if self.unique_id: + logger.info(self.__flow_str(self.server_side, 'rx') + + f' Ctl: {int(self.control):#04x}' + + f' Msg: {fnc.__name__!r}') + fnc() + else: + logger.info(self.__flow_str(self.server_side, 'drop') + + f' Ctl: {int(self.control):#04x}' + + f' Msg: {fnc.__name__!r}') + + def __flush_recv_msg(self) -> None: + self._recv_buffer = self._recv_buffer[(self.header_len + + self.data_len+2):] + self.header_valid = False + ''' + def modbus(self, data): + POLY = 0xA001 + + crc = 0xFFFF + for byte in data: + crc ^= byte + for _ in range(8): + crc = ((crc >> 1) ^ POLY + if (crc & 0x0001) + else crc >> 1) + return crc + + def validate_modbus_crc(self, frame): + # Calculate crc with all but the last 2 bytes of + # the frame (they contain the crc) + calc_crc = 0xFFFF + for pos in frame[:-2]: + calc_crc ^= pos + for i in range(8): + if (calc_crc & 1) != 0: + calc_crc >>= 1 + calc_crc ^= 0xA001 # bitwise 'or' with modbus magic + # number (0xa001 == bitwise + # reverse of 0x8005) + else: + calc_crc >>= 1 + + # Compare calculated crc with the one supplied in the frame.... + frame_crc, = struct.unpack(' None: + def parse(self, buf, ind=0): # -> None | tuple[str,bool]: '''parse a data sequence received from the inverter and stores the values in Infos.db diff --git a/app/src/inverter.py b/app/src/inverter.py index e96dcbe..1053bd5 100644 --- a/app/src/inverter.py +++ b/app/src/inverter.py @@ -1,48 +1,15 @@ import asyncio import logging -import traceback import json from config import Config -from async_stream import AsyncStream from mqtt import Mqtt -from aiomqtt import MqttCodeError from infos import Infos -# import gc - # logger = logging.getLogger('conn') logger_mqtt = logging.getLogger('mqtt') -class Inverter(AsyncStream): - '''class Inverter is a derivation of an Async_Stream - - The class has some class method for managing common resources like a - connection to the MQTT broker or proxy error counter which are common - for all inverter connection - - Instances of the class are connections to an inverter and can have an - optional link to an remote connection to the TSUN cloud. A remote - connection dies with the inverter connection. - - class methods: - class_init(): initialize the common resources of the proxy (MQTT - broker, Proxy DB, etc). Must be called before the - first inverter instance can be created - class_close(): release the common resources of the proxy. Should not - be called before any instances of the class are - destroyed - - methods: - server_loop(addr): Async loop method for receiving messages from the - inverter (server-side) - client_loop(addr): Async loop method for receiving messages from the - TSUN cloud (client-side) - async_create_remote(): Establish a client connection to the TSUN cloud - async_publ_mqtt(): Publish data to MQTT broker - close(): Release method which must be called before a instance can be - destroyed - ''' +class Inverter(): @classmethod def class_init(cls) -> None: logging.debug('Inverter.class_init') @@ -57,21 +24,21 @@ class Inverter(AsyncStream): cls.proxy_unique_id = ha['proxy_unique_id'] # call Mqtt singleton to establisch the connection to the mqtt broker - cls.mqtt = Mqtt(cls.__cb_mqtt_is_up) + cls.mqtt = Mqtt(cls._cb_mqtt_is_up) @classmethod - async def __cb_mqtt_is_up(cls) -> None: + async def _cb_mqtt_is_up(cls) -> None: logging.info('Initialize proxy device on home assistant') # register proxy status counters at home assistant - await cls.__register_proxy_stat_home_assistant() + await cls._register_proxy_stat_home_assistant() # send values of the proxy status counters await asyncio.sleep(0.5) # wait a bit, before sending data - cls.new_stat_data['proxy'] = True # force sending data to sync ha - await cls.__async_publ_mqtt_proxy_stat('proxy') + Infos.new_stat_data['proxy'] = True # force sending data to sync ha + await cls._async_publ_mqtt_proxy_stat('proxy') @classmethod - async def __register_proxy_stat_home_assistant(cls) -> None: + async def _register_proxy_stat_home_assistant(cls) -> None: '''register all our topics at home assistant''' for data_json, component, node_id, id in cls.db_stat.ha_confs( cls.entity_prfx, cls.proxy_node_id, @@ -80,15 +47,15 @@ class Inverter(AsyncStream): await cls.mqtt.publish(f'{cls.discovery_prfx}{component}/{node_id}{id}/config', data_json) # noqa: E501 @classmethod - async def __async_publ_mqtt_proxy_stat(cls, key) -> None: + async def _async_publ_mqtt_proxy_stat(cls, key) -> None: stat = Infos.stat - if key in stat and cls.new_stat_data[key]: + if key in stat and Infos.new_stat_data[key]: data_json = json.dumps(stat[key]) node_id = cls.proxy_node_id logger_mqtt.debug(f'{key}: {data_json}') await cls.mqtt.publish(f"{cls.entity_prfx}{node_id}{key}", data_json) - cls.new_stat_data[key] = False + Infos.new_stat_data[key] = False @classmethod def class_close(cls, loop) -> None: @@ -96,121 +63,3 @@ class Inverter(AsyncStream): logging.info('Close MQTT Task') loop.run_until_complete(cls.mqtt.close()) cls.mqtt = None - - def __init__(self, reader, writer, addr): - super().__init__(reader, writer, addr, None, True) - self.ha_restarts = -1 - - async def server_loop(self, addr): - '''Loop for receiving messages from the inverter (server-side)''' - logging.info(f'Accept connection from {addr}') - self.inc_counter('Inverter_Cnt') - await self.loop() - self.dec_counter('Inverter_Cnt') - logging.info(f'Server loop stopped for r{self.r_addr}') - - # if the server connection closes, we also have to disconnect - # the connection to te TSUN cloud - if self.remoteStream: - logging.debug("disconnect client connection") - self.remoteStream.disc() - try: - await self.__async_publ_mqtt_proxy_stat('proxy') - except Exception: - pass - - async def client_loop(self, addr): - '''Loop for receiving messages from the TSUN cloud (client-side)''' - clientStream = await self.remoteStream.loop() - logging.info(f'Client loop stopped for l{clientStream.l_addr}') - - # if the client connection closes, we don't touch the server - # connection. Instead we erase the client connection stream, - # thus on the next received packet from the inverter, we can - # establish a new connection to the TSUN cloud - - # erase backlink to inverter - clientStream.remoteStream = None - - if self.remoteStream == clientStream: - # logging.debug(f'Client l{clientStream.l_addr} refs:' - # f' {gc.get_referrers(clientStream)}') - # than erase client connection - self.remoteStream = None - - async def async_create_remote(self) -> None: - '''Establish a client connection to the TSUN cloud''' - tsun = Config.get('tsun') - host = tsun['host'] - port = tsun['port'] - addr = (host, port) - - try: - logging.info(f'Connected to {addr}') - connect = asyncio.open_connection(host, port) - reader, writer = await connect - self.remoteStream = AsyncStream(reader, writer, addr, self, - False, self.id_str) - asyncio.create_task(self.client_loop(addr)) - - except ConnectionRefusedError as error: - logging.info(f'{error}') - except Exception: - self.inc_counter('SW_Exception') - logging.error( - f"Inverter: Exception for {addr}:\n" - f"{traceback.format_exc()}") - - async def async_publ_mqtt(self) -> None: - '''publish data to MQTT broker''' - # check if new inverter or collector infos are available or when the - # home assistant has changed the status back to online - try: - if (('inverter' in self.new_data and self.new_data['inverter']) - or ('collector' in self.new_data and - self.new_data['collector']) - or self.mqtt.ha_restarts != self.ha_restarts): - await self.__register_proxy_stat_home_assistant() - await self.__register_home_assistant() - self.ha_restarts = self.mqtt.ha_restarts - - for key in self.new_data: - await self.__async_publ_mqtt_packet(key) - for key in self.new_stat_data: - await self.__async_publ_mqtt_proxy_stat(key) - - except MqttCodeError as error: - logging.error(f'Mqtt except: {error}') - except Exception: - self.inc_counter('SW_Exception') - logging.error( - f"Inverter: Exception:\n" - f"{traceback.format_exc()}") - - async def __async_publ_mqtt_packet(self, key): - db = self.db.db - if key in db and self.new_data[key]: - data_json = json.dumps(db[key]) - node_id = self.node_id - logger_mqtt.debug(f'{key}: {data_json}') - await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 - self.new_data[key] = False - - async def __register_home_assistant(self) -> None: - '''register all our topics at home assistant''' - for data_json, component, node_id, id in self.db.ha_confs( - self.entity_prfx, self.node_id, self.unique_id, - False, self.sug_area): - logger_mqtt.debug(f"MQTT Register: cmp:'{component}'" - f" node_id:'{node_id}' {data_json}") - await self.mqtt.publish(f"{self.discovery_prfx}{component}" - f"/{node_id}{id}/config", data_json) - - def close(self) -> None: - logging.debug(f'Inverter.close() l{self.l_addr} | r{self.r_addr}') - super().close() # call close handler in the parent class -# logger.debug (f'Inverter refs: {gc.get_referrers(self)}') - - def __del__(self): - logging.debug("Inverter.__del__") - super().__del__() diff --git a/app/src/messages.py b/app/src/messages.py index 82e702f..f348446 100644 --- a/app/src/messages.py +++ b/app/src/messages.py @@ -1,15 +1,10 @@ -import struct import logging -import time -from datetime import datetime import weakref if __name__ == "app.src.messages": from app.src.infos import Infos - from app.src.config import Config else: # pragma: no cover from infos import Infos - from config import Config logger = logging.getLogger('msg') @@ -45,23 +40,6 @@ def hex_dump_memory(level, info, data, num): tracer.log(level, '\n'.join(lines)) -class Control: - def __init__(self, ctrl: int): - self.ctrl = ctrl - - def __int__(self) -> int: - return self.ctrl - - def is_ind(self) -> bool: - return (self.ctrl == 0x91) - - def is_req(self) -> bool: - return (self.ctrl == 0x70) - - def is_resp(self) -> bool: - return (self.ctrl == 0x99) - - class IterRegistry(type): def __iter__(cls): for ref in cls._registry: @@ -72,10 +50,10 @@ class IterRegistry(type): class Message(metaclass=IterRegistry): _registry = [] - new_stat_data = {} - def __init__(self, server_side: bool, id_str=b''): + def __init__(self, server_side: bool): self._registry.append(weakref.ref(self)) + self.server_side = server_side self.header_valid = False self.header_len = 0 @@ -83,22 +61,11 @@ class Message(metaclass=IterRegistry): self.unique_id = 0 self.node_id = '' self.sug_area = '' - self.await_conn_resp_cnt = 0 - self.id_str = id_str - self.contact_name = b'' - self.contact_mail = b'' self._recv_buffer = bytearray(0) self._send_buffer = bytearray(0) self._forward_buffer = bytearray(0) self.db = Infos() self.new_data = {} - self.switch = { - 0x00: self.msg_contact_info, - 0x13: self.msg_ota_update, - 0x22: self.msg_get_time, - 0x71: self.msg_collector_data, - 0x04: self.msg_inverter_data, - } ''' Empty methods, that have to be implemented in any child class which @@ -112,306 +79,12 @@ class Message(metaclass=IterRegistry): Our puplic methods ''' def close(self) -> None: - # we have refernces to methods of this class in self.switch - # so we have to erase self.switch, otherwise this instance can't be - # deallocated by the garbage collector ==> we get a memory leak - self.switch.clear() + pass # pragma: no cover def inc_counter(self, counter: str) -> None: self.db.inc_counter(counter) - self.new_stat_data['proxy'] = True + Infos.new_stat_data['proxy'] = True def dec_counter(self, counter: str) -> None: self.db.dec_counter(counter) - self.new_stat_data['proxy'] = True - - def set_serial_no(self, serial_no: str): - - if self.unique_id == serial_no: - logger.debug(f'SerialNo: {serial_no}') - else: - inverters = Config.get('inverters') - # logger.debug(f'Inverters: {inverters}') - - if serial_no in inverters: - inv = inverters[serial_no] - self.node_id = inv['node_id'] - self.sug_area = inv['suggested_area'] - logger.debug(f'SerialNo {serial_no} allowed! area:{self.sug_area}') # noqa: E501 - else: - self.node_id = '' - self.sug_area = '' - if 'allow_all' not in inverters or not inverters['allow_all']: - self.inc_counter('Unknown_SNR') - self.unique_id = None - logger.warning(f'ignore message from unknow inverter! (SerialNo: {serial_no})') # noqa: E501 - return - logger.debug(f'SerialNo {serial_no} not known but accepted!') - - self.unique_id = serial_no - - def read(self) -> None: - self._read() - - if not self.header_valid: - self.__parse_header(self._recv_buffer, len(self._recv_buffer)) - - if self.header_valid and len(self._recv_buffer) >= (self.header_len + - self.data_len): - hex_dump_memory(logging.INFO, f'Received from {self.addr}:', - self._recv_buffer, self.header_len+self.data_len) - - self.set_serial_no(self.id_str.decode("utf-8")) - self.__dispatch_msg() - self.__flush_recv_msg() - return - - def forward(self, buffer, buflen) -> None: - tsun = Config.get('tsun') - if tsun['enabled']: - self._forward_buffer = buffer[:buflen] - hex_dump_memory(logging.DEBUG, 'Store for forwarding:', - buffer, buflen) - - self.__parse_header(self._forward_buffer, - len(self._forward_buffer)) - fnc = self.switch.get(self.msg_id, self.msg_unknown) - logger.info(self.__flow_str(self.server_side, 'forwrd') + - f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}') - return - - def _init_new_client_conn(self, contact_name, contact_mail) -> None: - logger.info(f'name: {contact_name} mail: {contact_mail}') - self.msg_id = 0 - self.await_conn_resp_cnt += 1 - self.__build_header(0x91) - self._send_buffer += struct.pack(f'!{len(contact_name)+1}p' - f'{len(contact_mail)+1}p', - contact_name, contact_mail) - - self.__finish_send_msg() - - ''' - Our private methods - ''' - def __flow_str(self, server_side: bool, type: - ('rx', 'tx', 'forwrd', 'drop')): # noqa: F821 - switch = { - 'rx': ' <', - 'tx': ' >', - 'forwrd': '<< ', - 'drop': ' xx', - 'rxS': '> ', - 'txS': '< ', - 'forwrdS': ' >>', - 'dropS': 'xx ', - } - if server_side: - type += 'S' - return switch.get(type, '???') - - def _timestamp(self): # pragma: no cover - if False: - # utc as epoche - ts = time.time() - else: - # convert localtime in epoche - ts = (datetime.now() - datetime(1970, 1, 1)).total_seconds() - return round(ts*1000) - - # check if there is a complete header in the buffer, parse it - # and set - # self.header_len - # self.data_len - # self.id_str - # self.ctrl - # self.msg_id - # - # if the header is incomplete, than self.header_len is still 0 - # - def __parse_header(self, buf: bytes, buf_len: int) -> None: - - if (buf_len < 5): # enough bytes to read len and id_len? - return - result = struct.unpack_from('!lB', buf, 0) - len = result[0] # len of complete message - id_len = result[1] # len of variable id string - - hdr_len = 5+id_len+2 - - if (buf_len < hdr_len): # enough bytes for complete header? - return - - result = struct.unpack_from(f'!{id_len+1}pBB', buf, 4) - - # store parsed header values in the class - self.id_str = result[0] - self.ctrl = Control(result[1]) - self.msg_id = result[2] - self.data_len = len-id_len-3 - self.header_len = hdr_len - self.header_valid = True - return - - def __build_header(self, ctrl) -> None: - self.send_msg_ofs = len(self._send_buffer) - self._send_buffer += struct.pack(f'!l{len(self.id_str)+1}pBB', - 0, self.id_str, ctrl, self.msg_id) - fnc = self.switch.get(self.msg_id, self.msg_unknown) - logger.info(self.__flow_str(self.server_side, 'tx') + - f' Ctl: {int(ctrl):#02x} Msg: {fnc.__name__!r}') - - def __finish_send_msg(self) -> None: - _len = len(self._send_buffer) - self.send_msg_ofs - struct.pack_into('!l', self._send_buffer, self.send_msg_ofs, _len-4) - - def __dispatch_msg(self) -> None: - fnc = self.switch.get(self.msg_id, self.msg_unknown) - if self.unique_id: - logger.info(self.__flow_str(self.server_side, 'rx') + - f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}') - fnc() - else: - logger.info(self.__flow_str(self.server_side, 'drop') + - f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}') - - def __flush_recv_msg(self) -> None: - self._recv_buffer = self._recv_buffer[(self.header_len+self.data_len):] - self.header_valid = False - - ''' - Message handler methods - ''' - def msg_contact_info(self): - if self.ctrl.is_ind(): - if self.server_side and self.__process_contact_info(): - self.__build_header(0x91) - self._send_buffer += b'\x01' - self.__finish_send_msg() - # don't forward this contact info here, we will build one - # when the remote connection is established - elif self.await_conn_resp_cnt > 0: - self.await_conn_resp_cnt -= 1 - else: - self.forward(self._recv_buffer, self.header_len+self.data_len) - return - else: - logger.warning('Unknown Ctrl') - self.inc_counter('Unknown_Ctrl') - self.forward(self._recv_buffer, self.header_len+self.data_len) - - def __process_contact_info(self) -> bool: - result = struct.unpack_from('!B', self._recv_buffer, self.header_len) - name_len = result[0] - if self.data_len < name_len+2: - return False - result = struct.unpack_from(f'!{name_len+1}pB', self._recv_buffer, - self.header_len) - self.contact_name = result[0] - mail_len = result[1] - logger.info(f'name: {self.contact_name}') - - result = struct.unpack_from(f'!{mail_len+1}p', self._recv_buffer, - self.header_len+name_len+1) - self.contact_mail = result[0] - logger.info(f'mail: {self.contact_mail}') - return True - - def msg_get_time(self): - tsun = Config.get('tsun') - if tsun['enabled']: - if self.ctrl.is_ind(): - if self.data_len >= 8: - ts = self._timestamp() - result = struct.unpack_from('!q', self._recv_buffer, - self.header_len) - logger.debug(f'tsun-time: {result[0]:08x}' - f' proxy-time: {ts:08x}') - else: - logger.warning('Unknown Ctrl') - self.inc_counter('Unknown_Ctrl') - self.forward(self._recv_buffer, self.header_len+self.data_len) - else: - if self.ctrl.is_ind(): - if self.data_len == 0: - ts = self._timestamp() - logger.debug(f'time: {ts:08x}') - - self.__build_header(0x91) - self._send_buffer += struct.pack('!q', ts) - self.__finish_send_msg() - - else: - logger.warning('Unknown Ctrl') - self.inc_counter('Unknown_Ctrl') - - def parse_msg_header(self): - result = struct.unpack_from('!lB', self._recv_buffer, self.header_len) - - data_id = result[0] # len of complete message - id_len = result[1] # len of variable id string - logger.debug(f'Data_ID: {data_id} id_len: {id_len}') - - msg_hdr_len = 5+id_len+9 - - result = struct.unpack_from(f'!{id_len+1}pBq', self._recv_buffer, - self.header_len + 4) - - logger.debug(f'ID: {result[0]} B: {result[1]}') - logger.debug(f'time: {result[2]:08x}') - # logger.info(f'time: {datetime.utcfromtimestamp(result[2]).strftime( - # "%Y-%m-%d %H:%M:%S")}') - return msg_hdr_len - - def msg_collector_data(self): - if self.ctrl.is_ind(): - self.__build_header(0x99) - self._send_buffer += b'\x01' - self.__finish_send_msg() - self.__process_data() - - elif self.ctrl.is_resp(): - return # ignore received response - else: - logger.warning('Unknown Ctrl') - self.inc_counter('Unknown_Ctrl') - - self.forward(self._recv_buffer, self.header_len+self.data_len) - - def msg_inverter_data(self): - if self.ctrl.is_ind(): - self.__build_header(0x99) - self._send_buffer += b'\x01' - self.__finish_send_msg() - self.__process_data() - - elif self.ctrl.is_resp(): - return # ignore received response - else: - logger.warning('Unknown Ctrl') - self.inc_counter('Unknown_Ctrl') - - self.forward(self._recv_buffer, self.header_len+self.data_len) - - def __process_data(self): - msg_hdr_len = self.parse_msg_header() - - for key, update in self.db.parse(self._recv_buffer, self.header_len - + msg_hdr_len): - if update: - self.new_data[key] = True - - def msg_ota_update(self): - if self.ctrl.is_req(): - self.inc_counter('OTA_Start_Msg') - elif self.ctrl.is_ind(): - pass - else: - logger.warning('Unknown Ctrl') - self.inc_counter('Unknown_Ctrl') - self.forward(self._recv_buffer, self.header_len+self.data_len) - - def msg_unknown(self): - logger.warning(f"Unknow Msg: ID:{self.msg_id}") - self.inc_counter('Unknown_Msg') - self.forward(self._recv_buffer, self.header_len+self.data_len) + Infos.new_stat_data['proxy'] = True diff --git a/app/src/mqtt.py b/app/src/mqtt.py index 1bf8bb7..413daf4 100644 --- a/app/src/mqtt.py +++ b/app/src/mqtt.py @@ -18,8 +18,8 @@ class Singleton(type): class Mqtt(metaclass=Singleton): - client = None - cb_MqttIsUp = None + __client = None + __cb_MqttIsUp = None def __init__(self, cb_MqttIsUp): logger_mqtt.debug('MQTT: __init__') @@ -50,8 +50,8 @@ class Mqtt(metaclass=Singleton): async def publish(self, topic: str, payload: str | bytes | bytearray | int | float | None = None) -> None: - if self.client: - await self.client.publish(topic, payload) + if self.__client: + await self.__client.publish(topic, payload) async def __loop(self) -> None: mqtt = Config.get('mqtt') @@ -59,22 +59,24 @@ class Mqtt(metaclass=Singleton): logger_mqtt.info(f'start MQTT: host:{mqtt["host"]} port:' f'{mqtt["port"]} ' f'user:{mqtt["user"]}') - self.client = aiomqtt.Client(hostname=mqtt['host'], port=mqtt['port'], - username=mqtt['user'], - password=mqtt['passwd']) + self.__client = aiomqtt.Client(hostname=mqtt['host'], + port=mqtt['port'], + username=mqtt['user'], + password=mqtt['passwd']) interval = 5 # Seconds while True: try: - async with self.client: + async with self.__client: logger_mqtt.info('MQTT broker connection established') if self.cb_MqttIsUp: await self.cb_MqttIsUp() - async with self.client.messages() as messages: - await self.client.subscribe(f"{ha['auto_conf_prefix']}" - "/status") + async with self.__client.messages() as messages: + await self.__client.subscribe( + f"{ha['auto_conf_prefix']}" + "/status") async for message in messages: status = message.payload.decode("UTF-8") logger_mqtt.info('Home-Assistant Status:' @@ -89,5 +91,5 @@ class Mqtt(metaclass=Singleton): await asyncio.sleep(interval) except asyncio.CancelledError: logger_mqtt.debug("MQTT task cancelled") - self.client = None + self.__client = None return diff --git a/app/src/server.py b/app/src/server.py index 1bbbcd1..cefbde5 100644 --- a/app/src/server.py +++ b/app/src/server.py @@ -4,8 +4,10 @@ import signal import functools import os from logging import config # noqa F401 -from async_stream import AsyncStream +from messages import Message from inverter import Inverter +from gen3.inverter_g3 import InverterG3 +from gen3plus.inverter_g3p import InverterG3P from config import Config @@ -13,7 +15,14 @@ async def handle_client(reader, writer): '''Handles a new incoming connection and starts an async loop''' addr = writer.get_extra_info('peername') - await Inverter(reader, writer, addr).server_loop(addr) + await InverterG3(reader, writer, addr).server_loop(addr) + + +async def handle_client_v2(reader, writer): + '''Handles a new incoming connection and starts an async loop''' + + addr = writer.get_extra_info('peername') + await InverterG3P(reader, writer, addr).server_loop(addr) def handle_SIGTERM(loop): @@ -24,7 +33,7 @@ def handle_SIGTERM(loop): # # first, close all open TCP connections # - for stream in AsyncStream: + for stream in Message: stream.close() # @@ -81,11 +90,12 @@ if __name__ == "__main__": functools.partial(handle_SIGTERM, loop)) # - # Create a task for our listening server. This must be a task! If we call + # Create taska for our listening servera. These must be tasks! If we call # start_server directly out of our main task, the eventloop will be blocked # and we can't receive and handle the UNIX signals! # loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005)) + loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000)) try: loop.run_forever() diff --git a/app/tests/test_messages.py b/app/tests/test_messages.py index 582d562..0b75dbe 100644 --- a/app/tests/test_messages.py +++ b/app/tests/test_messages.py @@ -1,6 +1,6 @@ # test_with_pytest.py import pytest, logging -from app.src.messages import Message, Control +from app.src.gen3.talent import Talent, Control from app.src.config import Config from app.src.infos import Infos @@ -9,7 +9,7 @@ Infos.static_init() tracer = logging.getLogger('tracer') -class MemoryStream(Message): +class MemoryStream(Talent): def __init__(self, msg, chunks = (0,), server_side: bool = True): super().__init__(server_side) self.__msg = msg @@ -45,8 +45,8 @@ class MemoryStream(Message): def _timestamp(self): return 1700260990000 - def _Message__flush_recv_msg(self) -> None: - super()._Message__flush_recv_msg() + def _Talent__flush_recv_msg(self) -> None: + super()._Talent__flush_recv_msg() self.msg_count += 1 return @@ -290,8 +290,10 @@ def test_read_two_messages(ConfigTsunAllowAll, Msg2ContactInfo,MsgContactResp,Ms assert m._send_buffer==MsgContactResp assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 - m._send_buffer = bytearray(0) # clear send buffer for next test - m._init_new_client_conn(b'solarhub', b'solarhub@123456') + m._send_buffer = bytearray(0) # clear send buffer for next test + m.contact_name = b'solarhub' + m.contact_mail = b'solarhub@123456' + m._init_new_client_conn() assert m._send_buffer==b'\x00\x00\x00,\x10R170000000000001\x91\x00\x08solarhub\x0fsolarhub@123456' m._send_buffer = bytearray(0) # clear send buffer for next test @@ -308,8 +310,10 @@ def test_read_two_messages(ConfigTsunAllowAll, Msg2ContactInfo,MsgContactResp,Ms assert m._send_buffer==MsgContactResp2 assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 - m._send_buffer = bytearray(0) # clear send buffer for next test - m._init_new_client_conn(b'solarhub', b'solarhub@123456') + m._send_buffer = bytearray(0) # clear send buffer for next test + m.contact_name = b'solarhub' + m.contact_mail = b'solarhub@123456' + m._init_new_client_conn() assert m._send_buffer==b'\x00\x00\x00,\x10R170000000000002\x91\x00\x08solarhub\x0fsolarhub@123456' m.close() @@ -700,14 +704,14 @@ def test_ctrl_byte(): def test_msg_iterator(): - m1 = Message(server_side=True) - m2 = Message(server_side=True) - m3 = Message(server_side=True) + m1 = Talent(server_side=True) + m2 = Talent(server_side=True) + m3 = Talent(server_side=True) m3.close() del m3 test1 = 0 test2 = 0 - for key in Message: + for key in Talent: if key == m1: test1+=1 elif key == m2: @@ -718,19 +722,19 @@ def test_msg_iterator(): assert test2 == 1 def test_proxy_counter(): - m = Message(server_side=True) + m = Talent(server_side=True) assert m.new_data == {} m.db.stat['proxy']['Unknown_Msg'] = 0 - m.new_stat_data['proxy'] = False + Infos.new_stat_data['proxy'] = False m.inc_counter('Unknown_Msg') assert m.new_data == {} - assert m.new_stat_data == {'proxy': True} + assert Infos.new_stat_data == {'proxy': True} assert 1 == m.db.stat['proxy']['Unknown_Msg'] - m.new_stat_data['proxy'] = False + Infos.new_stat_data['proxy'] = False m.dec_counter('Unknown_Msg') assert m.new_data == {} - assert m.new_stat_data == {'proxy': True} + assert Infos.new_stat_data == {'proxy': True} assert 0 == m.db.stat['proxy']['Unknown_Msg'] m.close() diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py new file mode 100644 index 0000000..af4931e --- /dev/null +++ b/app/tests/test_solarman.py @@ -0,0 +1,94 @@ +import pytest, json +from app.src.gen3plus.solarman_v5 import SolarmanV5 +from app.src.infos import Infos + +# initialize the proxy statistics +Infos.static_init() + +class MemoryStream(SolarmanV5): + def __init__(self, msg, chunks = (0,), server_side: bool = True): + super().__init__(server_side) + self.__msg = msg + self.__msg_len = len(msg) + self.__chunks = chunks + self.__offs = 0 + self.__chunk_idx = 0 + self.msg_count = 0 + self.addr = 'Test: SrvSide' + + def append_msg(self, msg): + self.__msg += msg + self.__msg_len += len(msg) + + def _read(self) -> int: + copied_bytes = 0 + try: + if (self.__offs < self.__msg_len): + len = self.__chunks[self.__chunk_idx] + self.__chunk_idx += 1 + if len!=0: + self._recv_buffer += self.__msg[self.__offs:len] + copied_bytes = len - self.__offs + self.__offs = len + else: + self._recv_buffer += self.__msg[self.__offs:] + copied_bytes = self.__msg_len - self.__offs + self.__offs = self.__msg_len + except: + pass + return copied_bytes + + def _timestamp(self): + return 1700260990000 + + def _SolarmanV5__flush_recv_msg(self) -> None: + super()._SolarmanV5__flush_recv_msg() + self.msg_count += 1 + return + + +def get_sn() -> bytes: + return b'\xc8\x1e\x4d\x7b' + +def get_inv_no() -> bytes: + return b'T170000000000001' + +def get_invalid_sn(): + return b'R170000000000002' + + +@pytest.fixture +def TestMsg(): # Contact Info message + msg = b'\xa5\xd4\x00\x10\x41\x00\x01' +get_sn() +b'\x02\xba\xd2\x00\x00' + msg += b'\x19\x00\x00\x00\x00\x00\x00\x00\x05\x3c\x78\x01\x64\x01\x4c\x53' + msg += b'\x57\x35\x42\x4c\x45\x5f\x31\x37\x5f\x30\x32\x42\x30\x5f\x31\x2e' + msg += b'\x30\x35\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x40\x2a\x8f\x4f\x51\x54\x31\x39\x32\x2e' + msg += b'\x31\x36\x38\x2e\x38\x30\x2e\x34\x39\x00\x00\x00\x0f\x00\x01\xb0' + msg += b'\x02\x0f\x00\xff\x56\x31\x2e\x31\x2e\x30\x30\x2e\x30\x42\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xfe\xfe\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x41\x6c\x6c\x69\x75\x73\x2d\x48\x6f' + msg += b'\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x3c' + msg += b'\x15' + return msg + +def test_read_message(TestMsg): + m = MemoryStream(TestMsg, (0,)) + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + # assert m.id_str == b"R170000000000001" + assert m.snr == 2068651720 + # assert m.unique_id == None + # assert int(m.ctrl)==145 + # assert m.msg_id==0 + assert m.control == 0x4110 + assert m.serial == 0x0100 + assert m.data_len == 0xd4 + assert m._forward_buffer==b'' + m.close() + diff --git a/docker-compose.yaml b/docker-compose.yaml index da08278..de3e5e5 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -80,6 +80,7 @@ services: - $(DNS2:-4.4.4.4} ports: - 5005:5005 + - 10000:10000 volumes: - ${PROJECT_DIR}./tsun-proxy/log:/home/tsun-proxy/log - ${PROJECT_DIR}./tsun-proxy/config:/home/tsun-proxy/config diff --git a/system_tests/test_tcp_socket.py b/system_tests/test_tcp_socket.py index f559c00..606ea68 100644 --- a/system_tests/test_tcp_socket.py +++ b/system_tests/test_tcp_socket.py @@ -128,6 +128,7 @@ def ClientConnection(): s.connect((host, port)) s.settimeout(1) yield s + time.sleep(2.5) s.close() def tempClientConnection(): diff --git a/system_tests/test_tcp_socket_v2.py b/system_tests/test_tcp_socket_v2.py new file mode 100644 index 0000000..ec26476 --- /dev/null +++ b/system_tests/test_tcp_socket_v2.py @@ -0,0 +1,101 @@ +# test_with_pytest.py and scapy +# +import pytest, socket, time, os +from dotenv import load_dotenv + +#from scapy.all import * +#from scapy.layers.inet import IP, TCP, TCP_client + +load_dotenv() + +SOLARMAN_SNR = os.getenv('SOLARMAN_SNR', '00000080') + +def get_sn() -> bytes: + return bytes.fromhex(SOLARMAN_SNR) + +def get_inv_no() -> bytes: + return b'T170000000000001' + +def get_invalid_sn(): + return b'R170000000000002' + + +@pytest.fixture +def MsgContactInfo(): # Contact Info message + msg = b'\xa5\xd4\x00\x10\x41\x00\x01' +get_sn() +b'\x02\xba\xd2\x00\x00' + msg += b'\x19\x00\x00\x00\x00\x00\x00\x00\x05\x3c\x78\x01\x64\x01\x4c\x53' + msg += b'\x57\x35\x42\x4c\x45\x5f\x31\x37\x5f\x30\x32\x42\x30\x5f\x31\x2e' + msg += b'\x30\x35\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x40\x2a\x8f\x4f\x51\x54\x31\x39\x32\x2e' + msg += b'\x31\x36\x38\x2e\x38\x30\x2e\x34\x39\x00\x00\x00\x0f\x00\x01\xb0' + msg += b'\x02\x0f\x00\xff\x56\x31\x2e\x31\x2e\x30\x30\x2e\x30\x42\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xfe\xfe\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x41\x6c\x6c\x69\x75\x73\x2d\x48\x6f' + msg += b'\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x3c' + msg += b'\x15' + return msg + +@pytest.fixture +def MsgContactResp(): # Contact Response message + msg = b'\xa5\x0a\x00\x10\x11\x01\x01' +get_sn() +b'\x02\x01\x6a\xfd\x8f' + msg += b'\x65\x3c\x00\x00\x00\x75\x15' + return msg + + + +@pytest.fixture(scope="session") +def ClientConnection(): + #host = '172.16.30.7' + host = 'logger.talent-monitoring.com' + #host = 'iot.talent-monitoring.com' + #host = '127.0.0.1' + port = 10000 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((host, port)) + s.settimeout(1) + yield s + s.close() + +def checkResponse(data, Msg): + check = bytearray(data) + check[5]= Msg[5] # ignore seq + check[13:17]= Msg[13:17] # ignore timestamp + check[21]= Msg[21] # ignore crc + assert check == Msg + + +def tempClientConnection(): + #host = '172.16.30.7' + host = 'logger.talent-monitoring.com' + #host = 'iot.talent-monitoring.com' + #host = '127.0.0.1' + port = 10000 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((host, port)) + s.settimeout(1) + yield s + time.sleep(2.5) + s.close() + +def test_open_close(): + try: + for s in tempClientConnection(): + pass + except: + assert False + assert True + +def test_conn_msg(ClientConnection,MsgContactInfo, MsgContactResp): + s = ClientConnection + try: + s.sendall(MsgContactInfo) + # time.sleep(2.5) + data = s.recv(1024) + except TimeoutError: + pass + # time.sleep(2.5) + checkResponse(data, MsgContactResp) \ No newline at end of file