Merge branch 'refactoring-async-stream' of https://github.com/s-allius/tsun-gen3-proxy into titan-scan

This commit is contained in:
Stefan Allius
2024-09-29 20:27:28 +02:00
19 changed files with 798 additions and 895 deletions

File diff suppressed because it is too large Load Diff

Before

Width:  |  Height:  |  Size: 49 KiB

After

Width:  |  Height:  |  Size: 47 KiB

View File

@@ -5,9 +5,9 @@
[note: You can stick notes on diagrams too!{bg:cornsilk}] [note: You can stick notes on diagrams too!{bg:cornsilk}]
[Mqtt;<<Singleton>>|<static>ha_restarts;<static>__client;<static>__cb_MqttIsUp|<async>publish();<async>close()] [Mqtt;<<Singleton>>|<static>ha_restarts;<static>__client;<static>__cb_MqttIsUp|<async>publish();<async>close()]
[Inverter|cls.db_stat;cls.entity_prfx;cls.discovery_prfx;cls.proxy_node_id;cls.proxy_unique_id;cls.mqtt:Mqtt|] [Inverter|cls.db_stat;cls.entity_prfx;cls.discovery_prfx;cls.proxy_node_id;cls.proxy_unique_id;cls.mqtt:Mqtt;;__ha_restarts|async_create_remote(inv_prot, conn_class)async_publ_mqtt()]
[Inverter]^[InverterG3|__ha_restarts|async_create_remote();async_publ_mqtt();;close()] [Inverter]^[InverterG3|addr;remote:StreamPtr;local:StreamPtr|async_create_remote();;close()]
[Inverter]^[InverterG3P|__ha_restarts|async_create_remote(;)async_publ_mqtt();close()] [Inverter]^[InverterG3P|addr;remote:StreamPtr;local:StreamPtr|async_create_remote();;close()]
[Mqtt;<<Singleton>>]<-++[Inverter] [Mqtt;<<Singleton>>]<-++[Inverter]
[IterRegistry||__iter__]^[Message|server_side:bool;header_valid:bool;header_len:unsigned;data_len:unsigned;unique_id;node_id;sug_area;_recv_buffer:bytearray;_send_buffer:bytearray;_forward_buffer:bytearray;db:Infos;new_data:list;state|_read():void<abstract>;close():void;inc_counter():void;dec_counter():void] [IterRegistry||__iter__]^[Message|server_side:bool;header_valid:bool;header_len:unsigned;data_len:unsigned;unique_id;node_id;sug_area;_recv_buffer:bytearray;_send_buffer:bytearray;_forward_buffer:bytearray;db:Infos;new_data:list;state|_read():void<abstract>;close():void;inc_counter():void;dec_counter():void]
@@ -23,35 +23,27 @@
[AsyncStream]<-[AsyncStreamClient] [AsyncStream]<-[AsyncStreamClient]
[ConnectionG3|remote.stream:ConnectionG3|healthy()] [ConnectionG3||]
[ConnectionG3Client|remote.stream:ConnectionG3|close()] [ConnectionG3]<remote-[InverterG3]
[ConnectionG3Server|remote.stream:ConnectionG3|;close()] [InverterG3]-remote>[AsyncStreamClient]
[ConnectionG3]^[ConnectionG3Client] [ConnectionG3]<-local++[InverterG3]
[ConnectionG3]^[ConnectionG3Server] [InverterG3]++local->[AsyncStreamServer]
[ConnectionG3Client]<-[ConnectionG3Server]
[ConnectionG3Client]++-1>[AsyncStreamClient]
[ConnectionG3Server]^[InverterG3]
[ConnectionG3Server]++-1>[AsyncStreamServer]
[ConnectionG3P|remote.stream:ConnectionG3P|healthy();close()] [ConnectionG3P||]
[ConnectionG3PClient|remote.stream:ConnectionG3P|close()] [ConnectionG3P]<remote-[InverterG3P]
[ConnectionG3PServer|remote.stream:ConnectionG3P|;close()] [InverterG3P]-remote>[AsyncStreamClient]
[ConnectionG3P]^[ConnectionG3PClient] [ConnectionG3P]<-local++[InverterG3P]
[ConnectionG3P]^[ConnectionG3PServer] [InverterG3P]++local->[AsyncStreamServer]
[ConnectionG3PClient]<-[ConnectionG3PServer]
[ConnectionG3PClient]++-1>[AsyncStreamClient]
[ConnectionG3PServer]^[InverterG3P]
[ConnectionG3PServer]++-1>[AsyncStreamServer]
[Infos|stat;new_stat_data;info_dev|static_init();dev_value();inc_counter();dec_counter();ha_proxy_conf;ha_conf;ha_remove;update_db;set_db_def_value;get_db_value;ignore_this_device] [Infos|stat;new_stat_data;info_dev|static_init();dev_value();inc_counter();dec_counter();ha_proxy_conf;ha_conf;ha_remove;update_db;set_db_def_value;get_db_value;ignore_this_device]
[Infos]^[InfosG3||ha_confs();parse()] [Infos]^[InfosG3||ha_confs();parse()]
[Infos]^[InfosG3P||ha_confs();parse()] [Infos]^[InfosG3P||ha_confs();parse()]
[Talent|await_conn_resp_cnt;id_str;contact_name;contact_mail;db:InfosG3;mb:Modbus;switch|msg_contact_info();msg_ota_update();msg_get_time();msg_collector_data();msg_inverter_data();msg_unknown();;close()] [Talent|ifc:AsyncIfc;conn_no;addr;;await_conn_resp_cnt;id_str;contact_name;contact_mail;db:InfosG3;mb:Modbus;switch|msg_contact_info();msg_ota_update();msg_get_time();msg_collector_data();msg_inverter_data();msg_unknown();;healthy();close()]
[Talent]^[ConnectionG3] [Talent]^[ConnectionG3]
[Talent]use->[<<AsyncIfc>>] [Talent]use->[<<AsyncIfc>>]
[Talent]->[InfosG3] [Talent]->[InfosG3]
[SolarmanV5|control;serial;snr;db:InfosG3P;mb:Modbus;switch|msg_unknown();;close()] [SolarmanV5|ifc:AsyncIfc;conn_no;addr;;control;serial;snr;db:InfosG3P;mb:Modbus;switch|msg_unknown();;healthy();close()]
[SolarmanV5]^[ConnectionG3P] [SolarmanV5]^[ConnectionG3P]
[SolarmanV5]use->[<<AsyncIfc>>] [SolarmanV5]use->[<<AsyncIfc>>]
[SolarmanV5]->[InfosG3P] [SolarmanV5]->[InfosG3P]

View File

@@ -1,61 +1,16 @@
import logging import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3.connection_g3": if __name__ == "app.src.gen3.connection_g3":
from app.src.async_stream import AsyncStreamServer
from app.src.async_stream import AsyncStreamClient, StreamPtr
from app.src.gen3.talent import Talent from app.src.gen3.talent import Talent
else: # pragma: no cover else: # pragma: no cover
from async_stream import AsyncStreamServer
from async_stream import AsyncStreamClient, StreamPtr
from gen3.talent import Talent from gen3.talent import Talent
logger = logging.getLogger('conn') logger = logging.getLogger('conn')
class ConnectionG3(Talent): class ConnectionG3(Talent):
async def async_create_remote(self) -> None: def __init__(self, addr, ifc, server_side, id_str=b'') -> None:
pass # virtual interface # pragma: no cover super().__init__(addr, server_side, ifc, id_str)
async def async_publ_mqtt(self) -> None:
pass # virtual interface # pragma: no cover
def healthy(self) -> bool:
logger.debug('ConnectionG3 healthy()')
return self._ifc.healthy()
def close(self): def close(self):
self._ifc.close() super().close()
Talent.close(self)
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
class ConnectionG3Server(ConnectionG3):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3Client',
id_str=b'') -> None:
server_side = True
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
Talent.__init__(self, server_side, self._ifc, id_str)
class ConnectionG3Client(ConnectionG3):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3Server',
id_str=b'') -> None:
server_side = False
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamClient(reader, writer,
self.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
Talent.__init__(self, server_side, self._ifc, id_str)

View File

@@ -2,58 +2,40 @@ import logging
from asyncio import StreamReader, StreamWriter from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3.inverter_g3": if __name__ == "app.src.gen3.inverter_g3":
from app.src.inverter import Inverter from app.src.inverter_base import InverterBase
from app.src.gen3.connection_g3 import ConnectionG3Server from app.src.async_stream import StreamPtr
from app.src.gen3.connection_g3 import ConnectionG3Client from app.src.async_stream import AsyncStreamServer
from app.src.gen3.connection_g3 import ConnectionG3
else: # pragma: no cover else: # pragma: no cover
from inverter import Inverter from inverter_base import InverterBase
from gen3.connection_g3 import ConnectionG3Server from async_stream import StreamPtr
from gen3.connection_g3 import ConnectionG3Client from async_stream import AsyncStreamServer
from gen3.connection_g3 import ConnectionG3
logger_mqtt = logging.getLogger('mqtt') logger_mqtt = logging.getLogger('mqtt')
class InverterG3(Inverter, ConnectionG3Server): class InverterG3(InverterBase):
'''class Inverter is a derivation of an Async_Stream
The class has some class method for managing common resources like a
connection to the MQTT broker or proxy error counter which are common
for all inverter connection
Instances of the class are connections to an inverter and can have an
optional link to an remote connection to the TSUN cloud. A remote
connection dies with the inverter connection.
class methods:
class_init(): initialize the common resources of the proxy (MQTT
broker, Proxy DB, etc). Must be called before the
first inverter instance can be created
class_close(): release the common resources of the proxy. Should not
be called before any instances of the class are
destroyed
methods:
server_loop(addr): Async loop method for receiving messages from the
inverter (server-side)
client_loop(addr): Async loop method for receiving messages from the
TSUN cloud (client-side)
async_create_remote(): Establish a client connection to the TSUN cloud
async_publ_mqtt(): Publish data to MQTT broker
close(): Release method which must be called before a instance can be
destroyed
'''
def __init__(self, reader: StreamReader, writer: StreamWriter, addr): def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
Inverter.__init__(self) super().__init__()
ConnectionG3Server.__init__(self, reader, writer, addr, None)
self.addr = addr self.addr = addr
self.remote = StreamPtr(None)
ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
self.remote = StreamPtr(None)
self.local = StreamPtr(
ConnectionG3(addr, ifc, True)
)
async def async_create_remote(self) -> None: async def async_create_remote(self) -> None:
await Inverter.async_create_remote( await InverterBase.async_create_remote(
self, 'tsun', ConnectionG3Client) self, 'tsun', ConnectionG3)
def close(self) -> None: def close(self) -> None:
logging.debug(f'InverterG3.close() {self.addr}') logging.debug(f'InverterG3.close() {self.addr}')
ConnectionG3Server.close(self) self.local.stream.close()
# logging.info(f'Inverter refs: {gc.get_referrers(self)}') # logging.info(f'Inverter refs: {gc.get_referrers(self)}')

View File

@@ -46,13 +46,15 @@ class Talent(Message):
MB_REGULAR_TIMEOUT = 60 MB_REGULAR_TIMEOUT = 60
TXT_UNKNOWN_CTRL = 'Unknown Ctrl' TXT_UNKNOWN_CTRL = 'Unknown Ctrl'
def __init__(self, server_side: bool, ifc: "AsyncIfc", id_str=b''): def __init__(self, addr, server_side: bool, ifc: "AsyncIfc", id_str=b''):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=15) super().__init__(server_side, self.send_modbus_cb, mb_timeout=15)
ifc.rx_set_cb(self.read) ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout) ifc.prot_set_timeout_cb(self._timeout)
ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn) ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn)
ifc.prot_set_update_header_cb(self._update_header) ifc.prot_set_update_header_cb(self._update_header)
self.addr = addr
self.ifc = ifc self.ifc = ifc
self.conn_no = ifc.get_conn_no()
self.await_conn_resp_cnt = 0 self.await_conn_resp_cnt = 0
self.id_str = id_str self.id_str = id_str
self.contact_name = b'' self.contact_name = b''
@@ -93,6 +95,10 @@ class Talent(Message):
''' '''
Our puplic methods Our puplic methods
''' '''
def healthy(self) -> bool:
logger.debug('Talent healthy()')
return self.ifc.healthy()
def close(self) -> None: def close(self) -> None:
logging.debug('Talent.close()') logging.debug('Talent.close()')
if self.server_side: if self.server_side:
@@ -110,6 +116,7 @@ class Talent(Message):
self.log_lvl.clear() self.log_lvl.clear()
self.state = State.closed self.state = State.closed
self.mb_timer.close() self.mb_timer.close()
self.ifc.close()
self.ifc.rx_set_cb(None) self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None) self.ifc.prot_set_timeout_cb(None)
self.ifc.prot_set_init_new_client_conn_cb(None) self.ifc.prot_set_init_new_client_conn_cb(None)

View File

@@ -1,61 +1,18 @@
import logging import logging
from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3plus.connection_g3p": if __name__ == "app.src.gen3plus.connection_g3p":
from app.src.async_stream import AsyncStreamServer
from app.src.async_stream import AsyncStreamClient, StreamPtr
from app.src.gen3plus.solarman_v5 import SolarmanV5 from app.src.gen3plus.solarman_v5 import SolarmanV5
else: # pragma: no cover else: # pragma: no cover
from async_stream import AsyncStreamServer
from async_stream import AsyncStreamClient, StreamPtr
from gen3plus.solarman_v5 import SolarmanV5 from gen3plus.solarman_v5 import SolarmanV5
logger = logging.getLogger('conn') logger = logging.getLogger('conn')
class ConnectionG3P(SolarmanV5): class ConnectionG3P(SolarmanV5):
async def async_create_remote(self) -> None: def __init__(self, addr, ifc, server_side,
pass # virtual interface # pragma: no cover client_mode: bool = False) -> None:
super().__init__(addr, server_side, client_mode, ifc)
async def async_publ_mqtt(self) -> None:
pass # virtual interface # pragma: no cover
def healthy(self) -> bool:
logger.debug('ConnectionG3P healthy()')
return self._ifc.healthy()
def close(self): def close(self):
self._ifc.close() super().close()
SolarmanV5.close(self)
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}') # logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
class ConnectionG3PServer(ConnectionG3P):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3PClient',
client_mode: bool) -> None:
server_side = True
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)
class ConnectionG3PClient(ConnectionG3P):
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, rstream: 'ConnectionG3PServer') -> None:
server_side = False
client_mode = False
self.remote = StreamPtr(rstream)
self._ifc = AsyncStreamClient(reader, writer, self.remote)
self.conn_no = self._ifc.get_conn_no()
self.addr = addr
SolarmanV5.__init__(self, server_side, client_mode, self._ifc)

View File

@@ -2,60 +2,40 @@ import logging
from asyncio import StreamReader, StreamWriter from asyncio import StreamReader, StreamWriter
if __name__ == "app.src.gen3plus.inverter_g3p": if __name__ == "app.src.gen3plus.inverter_g3p":
from app.src.inverter import Inverter from app.src.inverter_base import InverterBase
from app.src.gen3plus.connection_g3p import ConnectionG3PServer from app.src.async_stream import StreamPtr
from app.src.gen3plus.connection_g3p import ConnectionG3PClient from app.src.async_stream import AsyncStreamServer
from app.src.gen3plus.connection_g3p import ConnectionG3P
else: # pragma: no cover else: # pragma: no cover
from inverter import Inverter from inverter_base import InverterBase
from gen3plus.connection_g3p import ConnectionG3PServer from async_stream import StreamPtr
from gen3plus.connection_g3p import ConnectionG3PClient from async_stream import AsyncStreamServer
from gen3plus.connection_g3p import ConnectionG3P
logger_mqtt = logging.getLogger('mqtt') logger_mqtt = logging.getLogger('mqtt')
class InverterG3P(Inverter, ConnectionG3PServer): class InverterG3P(InverterBase):
'''class Inverter is a derivation of an Async_Stream
The class has some class method for managing common resources like a
connection to the MQTT broker or proxy error counter which are common
for all inverter connection
Instances of the class are connections to an inverter and can have an
optional link to an remote connection to the TSUN cloud. A remote
connection dies with the inverter connection.
class methods:
class_init(): initialize the common resources of the proxy (MQTT
broker, Proxy DB, etc). Must be called before the
first inverter instance can be created
class_close(): release the common resources of the proxy. Should not
be called before any instances of the class are
destroyed
methods:
server_loop(addr): Async loop method for receiving messages from the
inverter (server-side)
client_loop(addr): Async loop method for receiving messages from the
TSUN cloud (client-side)
async_create_remote(): Establish a client connection to the TSUN cloud
async_publ_mqtt(): Publish data to MQTT broker
close(): Release method which must be called before a instance can be
destroyed
'''
def __init__(self, reader: StreamReader, writer: StreamWriter, addr, def __init__(self, reader: StreamReader, writer: StreamWriter, addr,
client_mode: bool = False): client_mode: bool = False):
Inverter.__init__(self) super().__init__()
ConnectionG3PServer.__init__(
self, reader, writer, addr, None, client_mode=client_mode)
self.addr = addr self.addr = addr
self.remote = StreamPtr(None)
ifc = AsyncStreamServer(reader, writer,
self.async_publ_mqtt,
self.async_create_remote,
self.remote)
self.local = StreamPtr(
ConnectionG3P(addr, ifc, True, client_mode)
)
async def async_create_remote(self) -> None: async def async_create_remote(self) -> None:
await Inverter.async_create_remote( await InverterBase.async_create_remote(
self, 'solarman', ConnectionG3PClient) self, 'solarman', ConnectionG3P)
def close(self) -> None: def close(self) -> None:
logging.debug(f'InverterG3P.close() {self.addr}') logging.debug(f'InverterG3P.close() {self.addr}')
ConnectionG3PServer.close(self) self.local.stream.close()
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}') # logger.debug (f'Inverter refs: {gc.get_referrers(self)}')

View File

@@ -62,14 +62,17 @@ class SolarmanV5(Message):
HDR_FMT = '<BLLL' HDR_FMT = '<BLLL'
'''format string for packing of the header''' '''format string for packing of the header'''
def __init__(self, server_side: bool, client_mode: bool, ifc: "AsyncIfc"): def __init__(self, addr, server_side: bool, client_mode: bool,
ifc: "AsyncIfc"):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=8) super().__init__(server_side, self.send_modbus_cb, mb_timeout=8)
ifc.rx_set_cb(self.read) ifc.rx_set_cb(self.read)
ifc.prot_set_timeout_cb(self._timeout) ifc.prot_set_timeout_cb(self._timeout)
ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn) ifc.prot_set_init_new_client_conn_cb(self._init_new_client_conn)
ifc.prot_set_update_header_cb(self._update_header) ifc.prot_set_update_header_cb(self._update_header)
self.addr = addr
self.ifc = ifc self.ifc = ifc
self.conn_no = ifc.get_conn_no()
self.header_len = 11 # overwrite construcor in class Message self.header_len = 11 # overwrite construcor in class Message
self.control = 0 self.control = 0
self.seq = Sequence(server_side) self.seq = Sequence(server_side)
@@ -152,6 +155,10 @@ class SolarmanV5(Message):
''' '''
Our puplic methods Our puplic methods
''' '''
def healthy(self) -> bool:
logger.debug('SolarmanV5 healthy()')
return self.ifc.healthy()
def close(self) -> None: def close(self) -> None:
logging.debug('Solarman.close()') logging.debug('Solarman.close()')
if self.server_side: if self.server_side:
@@ -169,6 +176,7 @@ class SolarmanV5(Message):
self.log_lvl.clear() self.log_lvl.clear()
self.state = State.closed self.state = State.closed
self.mb_timer.close() self.mb_timer.close()
self.ifc.close()
self.ifc.rx_set_cb(None) self.ifc.rx_set_cb(None)
self.ifc.prot_set_timeout_cb(None) self.ifc.prot_set_timeout_cb(None)
self.ifc.prot_set_init_new_client_conn_cb(None) self.ifc.prot_set_init_new_client_conn_cb(None)

View File

@@ -1,8 +1,6 @@
import asyncio import asyncio
import logging import logging
import traceback
import json import json
from aiomqtt import MqttCodeError
if __name__ == "app.src.inverter": if __name__ == "app.src.inverter":
from app.src.config import Config from app.src.config import Config
@@ -17,6 +15,28 @@ logger_mqtt = logging.getLogger('mqtt')
class Inverter(): class Inverter():
'''class Inverter is a baseclass
The class has some class method for managing common resources like a
connection to the MQTT broker or proxy error counter which are common
for all inverter connection
Instances of the class are connections to an inverter and can have an
optional link to an remote connection to the TSUN cloud. A remote
connection dies with the inverter connection.
class methods:
class_init(): initialize the common resources of the proxy (MQTT
broker, Proxy DB, etc). Must be called before the
first inverter instance can be created
class_close(): release the common resources of the proxy. Should not
be called before any instances of the class are
destroyed
methods:
async_create_remote(): Establish a client connection to the TSUN cloud
async_publ_mqtt(): Publish data to MQTT broker
'''
@classmethod @classmethod
def class_init(cls) -> None: def class_init(cls) -> None:
logging.debug('Inverter.class_init') logging.debug('Inverter.class_init')
@@ -84,86 +104,3 @@ class Inverter():
logging.info('Close MQTT Task') logging.info('Close MQTT Task')
loop.run_until_complete(cls.mqtt.close()) loop.run_until_complete(cls.mqtt.close())
cls.mqtt = None cls.mqtt = None
def __init__(self):
self.__ha_restarts = -1
async def async_create_remote(self, inv_prot: str, conn_class) -> None:
'''Establish a client connection to the TSUN cloud'''
tsun = Config.get(inv_prot)
host = tsun['host']
port = tsun['port']
addr = (host, port)
try:
logging.info(f'[{self.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port)
reader, writer = await connect
if hasattr(self, 'id_str'):
self.remote.stream = conn_class(
reader, writer, addr, self, self.id_str)
else:
self.remote.stream = conn_class(
reader, writer, addr, self)
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self.remote.ifc.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error:
logging.info(f'{error}')
except Exception:
self.inc_counter('SW_Exception')
logging.error(
f"Inverter: Exception for {addr}:\n"
f"{traceback.format_exc()}")
async def async_publ_mqtt(self) -> None:
'''publish data to MQTT broker'''
if not self.unique_id:
return
# check if new inverter or collector infos are available or when the
# home assistant has changed the status back to online
try:
if (('inverter' in self.new_data and self.new_data['inverter'])
or ('collector' in self.new_data and
self.new_data['collector'])
or self.mqtt.ha_restarts != self.__ha_restarts):
await self._register_proxy_stat_home_assistant()
await self.__register_home_assistant()
self.__ha_restarts = self.mqtt.ha_restarts
for key in self.new_data:
await self.__async_publ_mqtt_packet(key)
for key in Infos.new_stat_data:
await Inverter._async_publ_mqtt_proxy_stat(key)
except MqttCodeError as error:
logging.error(f'Mqtt except: {error}')
except Exception:
self.inc_counter('SW_Exception')
logging.error(
f"Inverter: Exception:\n"
f"{traceback.format_exc()}")
async def __async_publ_mqtt_packet(self, key):
db = self.db.db
if key in db and self.new_data[key]:
data_json = json.dumps(db[key])
node_id = self.node_id
logger_mqtt.debug(f'{key}: {data_json}')
await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
self.new_data[key] = False
async def __register_home_assistant(self) -> None:
'''register all our topics at home assistant'''
for data_json, component, node_id, id in self.db.ha_confs(
self.entity_prfx, self.node_id, self.unique_id,
self.sug_area):
logger_mqtt.debug(f"MQTT Register: cmp:'{component}'"
f" node_id:'{node_id}' {data_json}")
await self.mqtt.publish(f"{self.discovery_prfx}{component}"
f"/{node_id}{id}/config", data_json)
self.db.reg_clr_at_midnight(f'{self.entity_prfx}{self.node_id}')

108
app/src/inverter_base.py Normal file
View File

@@ -0,0 +1,108 @@
import asyncio
import logging
import traceback
import json
from aiomqtt import MqttCodeError
if __name__ == "app.src.inverter_base":
from app.src.inverter import Inverter
from app.src.async_stream import AsyncStreamClient
from app.src.config import Config
from app.src.infos import Infos
else: # pragma: no cover
from inverter import Inverter
from async_stream import AsyncStreamClient
from config import Config
from infos import Infos
logger_mqtt = logging.getLogger('mqtt')
class InverterBase(Inverter):
def __init__(self):
self.__ha_restarts = -1
async def async_create_remote(self, inv_prot: str, conn_class) -> None:
'''Establish a client connection to the TSUN cloud'''
tsun = Config.get(inv_prot)
host = tsun['host']
port = tsun['port']
addr = (host, port)
stream = self.local.stream
try:
logging.info(f'[{stream.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port)
reader, writer = await connect
ifc = AsyncStreamClient(reader, writer,
self.remote)
if hasattr(stream, 'id_str'):
self.remote.stream = conn_class(
addr, ifc, False, stream.id_str)
else:
self.remote.stream = conn_class(
addr, ifc, False)
logging.info(f'[{self.remote.stream.node_id}:'
f'{self.remote.stream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self.remote.ifc.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error:
logging.info(f'{error}')
except Exception:
Infos.inc_counter('SW_Exception')
logging.error(
f"Inverter: Exception for {addr}:\n"
f"{traceback.format_exc()}")
async def async_publ_mqtt(self) -> None:
'''publish data to MQTT broker'''
stream = self.local.stream
if not stream.unique_id:
return
# check if new inverter or collector infos are available or when the
# home assistant has changed the status back to online
try:
if (('inverter' in stream.new_data and stream.new_data['inverter'])
or ('collector' in stream.new_data and
stream.new_data['collector'])
or self.mqtt.ha_restarts != self.__ha_restarts):
await self._register_proxy_stat_home_assistant()
await self.__register_home_assistant(stream)
self.__ha_restarts = self.mqtt.ha_restarts
for key in stream.new_data:
await self.__async_publ_mqtt_packet(stream, key)
for key in Infos.new_stat_data:
await Inverter._async_publ_mqtt_proxy_stat(key)
except MqttCodeError as error:
logging.error(f'Mqtt except: {error}')
except Exception:
Infos.inc_counter('SW_Exception')
logging.error(
f"Inverter: Exception:\n"
f"{traceback.format_exc()}")
async def __async_publ_mqtt_packet(self, stream, key):
db = stream.db.db
if key in db and stream.new_data[key]:
data_json = json.dumps(db[key])
node_id = stream.node_id
logger_mqtt.debug(f'{key}: {data_json}')
await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
stream.new_data[key] = False
async def __register_home_assistant(self, stream) -> None:
'''register all our topics at home assistant'''
for data_json, component, node_id, id in stream.db.ha_confs(
self.entity_prfx, stream.node_id, stream.unique_id,
stream.sug_area):
logger_mqtt.debug(f"MQTT Register: cmp:'{component}'"
f" node_id:'{node_id}' {data_json}")
await self.mqtt.publish(f"{self.discovery_prfx}{component}"
f"/{node_id}{id}/config", data_json)
stream.db.reg_clr_at_midnight(f'{self.entity_prfx}{stream.node_id}')

View File

@@ -19,24 +19,25 @@ class ModbusConn():
self.host = host self.host = host
self.port = port self.port = port
self.addr = (host, port) self.addr = (host, port)
self.stream = None self.inverter = None
async def __aenter__(self) -> 'InverterG3P': async def __aenter__(self) -> 'InverterG3P':
'''Establish a client connection to the TSUN cloud''' '''Establish a client connection to the TSUN cloud'''
connection = asyncio.open_connection(self.host, self.port) connection = asyncio.open_connection(self.host, self.port)
reader, writer = await connection reader, writer = await connection
self.stream = InverterG3P(reader, writer, self.addr, self.inverter = InverterG3P(reader, writer, self.addr,
client_mode=True) client_mode=True)
logging.info(f'[{self.stream.node_id}:{self.stream.conn_no}] ' stream = self.inverter.local.stream
logging.info(f'[{stream.node_id}:{stream.conn_no}] '
f'Connected to {self.addr}') f'Connected to {self.addr}')
Infos.inc_counter('Inverter_Cnt') Infos.inc_counter('Inverter_Cnt')
await self.stream._ifc.publish_outstanding_mqtt() await self.inverter.local.ifc.publish_outstanding_mqtt()
return self.stream return self.inverter
async def __aexit__(self, exc_type, exc, tb): async def __aexit__(self, exc_type, exc, tb):
Infos.dec_counter('Inverter_Cnt') Infos.dec_counter('Inverter_Cnt')
await self.stream._ifc.publish_outstanding_mqtt() await self.inverter.local.ifc.publish_outstanding_mqtt()
self.stream.close() self.inverter.close()
class ModbusTcp(): class ModbusTcp():
@@ -61,9 +62,10 @@ class ModbusTcp():
'''Loop for receiving messages from the TSUN cloud (client-side)''' '''Loop for receiving messages from the TSUN cloud (client-side)'''
while True: while True:
try: try:
async with ModbusConn(host, port) as stream: async with ModbusConn(host, port) as inverter:
stream = inverter.local.stream
await stream.send_start_cmd(snr, host) await stream.send_start_cmd(snr, host)
await stream._ifc.loop() await stream.ifc.loop()
logger.info(f'[{stream.node_id}:{stream.conn_no}] ' logger.info(f'[{stream.node_id}:{stream.conn_no}] '
f'Connection closed - Shutdown: ' f'Connection closed - Shutdown: '
f'{stream.shutdown_started}') f'{stream.shutdown_started}')

View File

@@ -4,10 +4,24 @@ import asyncio
from itertools import count from itertools import count
from mock import patch from mock import patch
from app.src.async_stream import AsyncStream, AsyncIfcImpl from app.src.async_stream import StreamPtr
from app.src.gen3.connection_g3 import ConnectionG3Server from app.src.async_stream import AsyncStream, AsyncStreamServer, AsyncIfcImpl
from app.src.gen3.connection_g3 import ConnectionG3
from app.src.gen3.talent import Talent from app.src.gen3.talent import Talent
class FakeInverter():
async def async_publ_mqtt(self) -> None:
pass # dummy funcion
async def async_create_remote(self, inv_prot: str, conn_class) -> None:
pass # dummy function
def __init__ (self):
self.remote = StreamPtr(None)
self.local = StreamPtr(None)
@pytest.fixture @pytest.fixture
def patch_async_init(): def patch_async_init():
with patch.object(AsyncStream, '__init__') as conn: with patch.object(AsyncStream, '__init__') as conn:
@@ -61,24 +75,26 @@ class FakeWriter():
def test_method_calls(patch_talent_init, patch_healthy, patch_async_close, patch_talent_close): def test_method_calls(patch_healthy, patch_async_close):
AsyncIfcImpl._ids = count(5) AsyncIfcImpl._ids = count(5)
spy2 = patch_talent_init
spy3 = patch_healthy spy3 = patch_healthy
spy4 = patch_async_close spy4 = patch_async_close
spy5 = patch_talent_close
reader = FakeReader() reader = FakeReader()
writer = FakeWriter() writer = FakeWriter()
id_str = "id_string" id_str = "id_string"
addr = ('proxy.local', 10000) addr = ('proxy.local', 10000)
conn = ConnectionG3Server(reader, writer, addr, inv = FakeInverter()
rstream= None, id_str=id_str) ifc = AsyncStreamServer(reader, writer,
assert 5 == conn._ifc.get_conn_no() inv.async_publ_mqtt,
spy2.assert_called_once_with(conn, True, conn._ifc, id_str) inv.async_create_remote,
inv.remote)
conn = ConnectionG3(addr, ifc, server_side=True, id_str=id_str)
assert 5 == conn.conn_no
assert 5 == conn.ifc.get_conn_no()
conn.healthy() conn.healthy()
spy3.assert_called_once() spy3.assert_called_once()
conn.close() conn.close()
spy4.assert_called_once() spy4.assert_called_once()
spy5.assert_called_once()

View File

@@ -5,10 +5,24 @@ import asyncio
from itertools import count from itertools import count
from mock import patch from mock import patch
from app.src.singleton import Singleton from app.src.singleton import Singleton
from app.src.async_stream import AsyncStream, AsyncIfcImpl from app.src.async_stream import StreamPtr
from app.src.gen3plus.connection_g3p import ConnectionG3PServer from app.src.async_stream import AsyncStream, AsyncStreamServer, AsyncIfcImpl
from app.src.gen3plus.connection_g3p import ConnectionG3P
from app.src.gen3plus.solarman_v5 import SolarmanV5 from app.src.gen3plus.solarman_v5 import SolarmanV5
class FakeInverter():
async def async_publ_mqtt(self) -> None:
pass # dummy funcion
async def async_create_remote(self, inv_prot: str, conn_class) -> None:
pass # dummy function
def __init__ (self):
self.remote = StreamPtr(None)
self.local = StreamPtr(None)
@pytest.fixture @pytest.fixture
def patch_async_init(): def patch_async_init():
with patch.object(AsyncStream, '__init__', return_value= None) as conn: with patch.object(AsyncStream, '__init__', return_value= None) as conn:
@@ -67,24 +81,25 @@ class FakeWriter():
def test_method_calls(patch_solarman_init, patch_healthy, patch_async_close, patch_solarman_close): def test_method_calls(patch_healthy, patch_async_close):
AsyncIfcImpl._ids = count(5) AsyncIfcImpl._ids = count(5)
spy2 = patch_solarman_init
spy3 = patch_healthy spy3 = patch_healthy
spy4 = patch_async_close spy4 = patch_async_close
spy5 = patch_solarman_close
reader = FakeReader() reader = FakeReader()
writer = FakeWriter() writer = FakeWriter()
addr = ('proxy.local', 10000) addr = ('proxy.local', 10000)
conn = ConnectionG3PServer(reader, writer, addr, inv = FakeInverter()
rstream= None, client_mode=False) ifc = AsyncStreamServer(reader, writer,
assert 5 == conn._ifc.get_conn_no() inv.async_publ_mqtt,
spy2.assert_called_once_with(conn, True, False, conn._ifc) inv.async_create_remote,
inv.remote)
conn = ConnectionG3P(addr, ifc, server_side=True, client_mode=False)
assert 5 == conn.conn_no
assert 5 == conn.ifc.get_conn_no()
conn.healthy() conn.healthy()
spy3.assert_called_once() spy3.assert_called_once()
conn.close() conn.close()
spy4.assert_called_once() spy4.assert_called_once()
spy5.assert_called_once()

View File

@@ -8,7 +8,7 @@ from app.src.infos import Infos
from app.src.config import Config from app.src.config import Config
from app.src.inverter import Inverter from app.src.inverter import Inverter
from app.src.singleton import Singleton from app.src.singleton import Singleton
from app.src.gen3.connection_g3 import ConnectionG3Server from app.src.gen3.connection_g3 import ConnectionG3
from app.src.gen3.inverter_g3 import InverterG3 from app.src.gen3.inverter_g3 import InverterG3
from app.tests.test_modbus_tcp import patch_mqtt_err, patch_mqtt_except, test_port, test_hostname from app.tests.test_modbus_tcp import patch_mqtt_err, patch_mqtt_except, test_port, test_hostname
@@ -44,12 +44,12 @@ def module_init():
@pytest.fixture @pytest.fixture
def patch_conn_init(): def patch_conn_init():
with patch.object(ConnectionG3Server, '__init__', return_value= None) as conn: with patch.object(ConnectionG3, '__init__', return_value= None) as conn:
yield conn yield conn
@pytest.fixture @pytest.fixture
def patch_conn_close(): def patch_conn_close():
with patch.object(ConnectionG3Server, 'close') as conn: with patch.object(ConnectionG3, 'close') as conn:
yield conn yield conn
class FakeReader(): class FakeReader():
@@ -104,18 +104,14 @@ def patch_open_connection():
yield conn yield conn
def test_method_calls(patch_conn_init, patch_conn_close): def test_method_calls(patch_conn_close):
spy1 = patch_conn_init
spy2 = patch_conn_close spy2 = patch_conn_close
reader = FakeReader() reader = FakeReader()
writer = FakeWriter() writer = FakeWriter()
addr = ('proxy.local', 10000) addr = ('proxy.local', 10000)
inverter = InverterG3(reader, writer, addr) inverter = InverterG3(reader, writer, addr)
inverter.l_addr = '' assert inverter.local.stream
inverter.r_addr = '' assert inverter.local.ifc
spy1.assert_called_once()
spy1.assert_called_once_with(inverter, reader, writer, addr, None)
inverter.close() inverter.close()
spy2.assert_called_once() spy2.assert_called_once()
@@ -171,18 +167,19 @@ async def test_mqtt_publish(config_conn, patch_open_connection, patch_conn_close
Inverter.class_init() Inverter.class_init()
inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000))
stream = inverter.local.stream
await inverter.async_publ_mqtt() # check call with invalid unique_id await inverter.async_publ_mqtt() # check call with invalid unique_id
inverter._Talent__set_serial_no(serial_no= "123344") stream._Talent__set_serial_no(serial_no= "123344")
inverter.new_data['inverter'] = True stream.new_data['inverter'] = True
inverter.db.db['inverter'] = {} stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt() await inverter.async_publ_mqtt()
assert inverter.new_data['inverter'] == False assert stream.new_data['inverter'] == False
inverter.new_data['env'] = True stream.new_data['env'] = True
inverter.db.db['env'] = {} stream.db.db['env'] = {}
await inverter.async_publ_mqtt() await inverter.async_publ_mqtt()
assert inverter.new_data['env'] == False assert stream.new_data['env'] == False
Infos.new_stat_data['proxy'] = True Infos.new_stat_data['proxy'] = True
await inverter.async_publ_mqtt() await inverter.async_publ_mqtt()
@@ -203,12 +200,12 @@ async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err, patc
Inverter.class_init() Inverter.class_init()
inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000))
inverter._Talent__set_serial_no(serial_no= "123344") stream = inverter.local.stream
stream._Talent__set_serial_no(serial_no= "123344")
inverter.new_data['inverter'] = True stream.new_data['inverter'] = True
inverter.db.db['inverter'] = {} stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt() await inverter.async_publ_mqtt()
assert inverter.new_data['inverter'] == True assert stream.new_data['inverter'] == True
inverter.close() inverter.close()
spy1.assert_called_once() spy1.assert_called_once()
@@ -225,12 +222,13 @@ async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except
Inverter.class_init() Inverter.class_init()
inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000))
inverter._Talent__set_serial_no(serial_no= "123344") stream = inverter.local.stream
stream._Talent__set_serial_no(serial_no= "123344")
inverter.new_data['inverter'] = True stream.new_data['inverter'] = True
inverter.db.db['inverter'] = {} stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt() await inverter.async_publ_mqtt()
assert inverter.new_data['inverter'] == True assert stream.new_data['inverter'] == True
inverter.close() inverter.close()
spy1.assert_called_once() spy1.assert_called_once()

View File

@@ -8,7 +8,7 @@ from app.src.infos import Infos
from app.src.config import Config from app.src.config import Config
from app.src.inverter import Inverter from app.src.inverter import Inverter
from app.src.singleton import Singleton from app.src.singleton import Singleton
from app.src.gen3plus.connection_g3p import ConnectionG3PServer from app.src.gen3plus.connection_g3p import ConnectionG3P
from app.src.gen3plus.inverter_g3p import InverterG3P from app.src.gen3plus.inverter_g3p import InverterG3P
from app.tests.test_modbus_tcp import patch_mqtt_err, patch_mqtt_except, test_port, test_hostname from app.tests.test_modbus_tcp import patch_mqtt_err, patch_mqtt_except, test_port, test_hostname
@@ -45,12 +45,12 @@ def module_init():
@pytest.fixture @pytest.fixture
def patch_conn_init(): def patch_conn_init():
with patch.object(ConnectionG3PServer, '__init__', return_value= None) as conn: with patch.object(ConnectionG3P, '__init__', return_value= None) as conn:
yield conn yield conn
@pytest.fixture @pytest.fixture
def patch_conn_close(): def patch_conn_close():
with patch.object(ConnectionG3PServer, 'close') as conn: with patch.object(ConnectionG3P, 'close') as conn:
yield conn yield conn
class FakeReader(): class FakeReader():
@@ -105,18 +105,14 @@ def patch_open_connection():
yield conn yield conn
def test_method_calls(patch_conn_init, patch_conn_close): def test_method_calls(patch_conn_close):
spy1 = patch_conn_init
spy2 = patch_conn_close spy2 = patch_conn_close
reader = FakeReader() reader = FakeReader()
writer = FakeWriter() writer = FakeWriter()
addr = ('proxy.local', 10000) addr = ('proxy.local', 10000)
inverter = InverterG3P(reader, writer, addr, client_mode=False) inverter = InverterG3P(reader, writer, addr, client_mode=False)
inverter.l_addr = '' assert inverter.local.stream
inverter.r_addr = '' assert inverter.local.ifc
spy1.assert_called_once()
spy1.assert_called_once_with(inverter, reader, writer, addr, None, client_mode=False)
inverter.close() inverter.close()
spy2.assert_called_once() spy2.assert_called_once()
@@ -172,18 +168,19 @@ async def test_mqtt_publish(config_conn, patch_open_connection, patch_conn_close
Inverter.class_init() Inverter.class_init()
inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False)
stream = inverter.local.stream
await inverter.async_publ_mqtt() # check call with invalid unique_id await inverter.async_publ_mqtt() # check call with invalid unique_id
inverter._SolarmanV5__set_serial_no(snr= 123344) stream._SolarmanV5__set_serial_no(snr= 123344)
inverter.new_data['inverter'] = True stream.new_data['inverter'] = True
inverter.db.db['inverter'] = {} stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt() await inverter.async_publ_mqtt()
assert inverter.new_data['inverter'] == False assert stream.new_data['inverter'] == False
inverter.new_data['env'] = True stream.new_data['env'] = True
inverter.db.db['env'] = {} stream.db.db['env'] = {}
await inverter.async_publ_mqtt() await inverter.async_publ_mqtt()
assert inverter.new_data['env'] == False assert stream.new_data['env'] == False
Infos.new_stat_data['proxy'] = True Infos.new_stat_data['proxy'] = True
await inverter.async_publ_mqtt() await inverter.async_publ_mqtt()
@@ -204,12 +201,12 @@ async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err, patc
Inverter.class_init() Inverter.class_init()
inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False)
inverter._SolarmanV5__set_serial_no(snr= 123344) stream = inverter.local.stream
stream._SolarmanV5__set_serial_no(snr= 123344)
inverter.new_data['inverter'] = True stream.new_data['inverter'] = True
inverter.db.db['inverter'] = {} stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt() await inverter.async_publ_mqtt()
assert inverter.new_data['inverter'] == True assert stream.new_data['inverter'] == True
inverter.close() inverter.close()
spy1.assert_called_once() spy1.assert_called_once()
@@ -226,12 +223,13 @@ async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except
Inverter.class_init() Inverter.class_init()
inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False)
inverter._SolarmanV5__set_serial_no(snr= 123344) stream = inverter.local.stream
stream._SolarmanV5__set_serial_no(snr= 123344)
inverter.new_data['inverter'] = True stream.new_data['inverter'] = True
inverter.db.db['inverter'] = {} stream.db.db['inverter'] = {}
await inverter.async_publ_mqtt() await inverter.async_publ_mqtt()
assert inverter.new_data['inverter'] == True assert stream.new_data['inverter'] == True
inverter.close() inverter.close()
spy1.assert_called_once() spy1.assert_called_once()

View File

@@ -150,11 +150,12 @@ async def test_modbus_conn(patch_open):
_ = patch_open _ = patch_open
assert Infos.stat['proxy']['Inverter_Cnt'] == 0 assert Infos.stat['proxy']['Inverter_Cnt'] == 0
async with ModbusConn('test.local', 1234) as stream: async with ModbusConn('test.local', 1234) as inverter:
stream = inverter.local.stream
assert stream.node_id == 'G3P' assert stream.node_id == 'G3P'
assert stream.addr == ('test.local', 1234) assert stream.addr == ('test.local', 1234)
assert type(stream._ifc._reader) is FakeReader assert type(stream.ifc._reader) is FakeReader
assert type(stream._ifc._writer) is FakeWriter assert type(stream.ifc._writer) is FakeWriter
assert Infos.stat['proxy']['Inverter_Cnt'] == 1 assert Infos.stat['proxy']['Inverter_Cnt'] == 1
assert Infos.stat['proxy']['Inverter_Cnt'] == 0 assert Infos.stat['proxy']['Inverter_Cnt'] == 0
@@ -205,7 +206,7 @@ async def test_modbus_cnf2(config_conn, patch_no_mqtt, patch_open):
test += 1 test += 1
assert Infos.stat['proxy']['Inverter_Cnt'] == 1 assert Infos.stat['proxy']['Inverter_Cnt'] == 1
m.shutdown_started = True m.shutdown_started = True
m._ifc._reader.on_recv.set() m.ifc._reader.on_recv.set()
del m del m
assert 1 == test assert 1 == test
@@ -265,14 +266,14 @@ async def test_mqtt_err(config_conn, patch_mqtt_err, patch_open):
test += 1 test += 1
if test == 1: if test == 1:
m.shutdown_started = False m.shutdown_started = False
m._ifc._reader.on_recv.set() m.ifc._reader.on_recv.set()
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
assert m.state == State.closed assert m.state == State.closed
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
else: else:
m.shutdown_started = True m.shutdown_started = True
m._ifc._reader.on_recv.set() m.ifc._reader.on_recv.set()
del m del m
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
@@ -298,13 +299,13 @@ async def test_mqtt_except(config_conn, patch_mqtt_except, patch_open):
test += 1 test += 1
if test == 1: if test == 1:
m.shutdown_started = False m.shutdown_started = False
m._ifc._reader.on_recv.set() m.ifc._reader.on_recv.set()
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
assert m.state == State.closed assert m.state == State.closed
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
else: else:
m.shutdown_started = True m.shutdown_started = True
m._ifc._reader.on_recv.set() m.ifc._reader.on_recv.set()
del m del m
await asyncio.sleep(0.01) await asyncio.sleep(0.01)

View File

@@ -45,7 +45,7 @@ def config_no_conn(test_port):
@pytest.fixture @pytest.fixture
def spy_at_cmd(): def spy_at_cmd():
conn = SolarmanV5(server_side=True, client_mode= False, ifc=AsyncIfcImpl()) conn = SolarmanV5(('test.local', 1234), server_side=True, client_mode= False, ifc=AsyncIfcImpl())
conn.node_id = 'inv_2/' conn.node_id = 'inv_2/'
with patch.object(conn, 'send_at_cmd', wraps=conn.send_at_cmd) as wrapped_conn: with patch.object(conn, 'send_at_cmd', wraps=conn.send_at_cmd) as wrapped_conn:
yield wrapped_conn yield wrapped_conn
@@ -53,7 +53,7 @@ def spy_at_cmd():
@pytest.fixture @pytest.fixture
def spy_modbus_cmd(): def spy_modbus_cmd():
conn = SolarmanV5(server_side=True, client_mode= False, ifc=AsyncIfcImpl()) conn = SolarmanV5(('test.local', 1234), server_side=True, client_mode= False, ifc=AsyncIfcImpl())
conn.node_id = 'inv_1/' conn.node_id = 'inv_1/'
with patch.object(conn, 'send_modbus_cmd', wraps=conn.send_modbus_cmd) as wrapped_conn: with patch.object(conn, 'send_modbus_cmd', wraps=conn.send_modbus_cmd) as wrapped_conn:
yield wrapped_conn yield wrapped_conn
@@ -61,7 +61,7 @@ def spy_modbus_cmd():
@pytest.fixture @pytest.fixture
def spy_modbus_cmd_client(): def spy_modbus_cmd_client():
conn = SolarmanV5(server_side=False, client_mode= False, ifc=AsyncIfcImpl()) conn = SolarmanV5(('test.local', 1234), server_side=False, client_mode= False, ifc=AsyncIfcImpl())
conn.node_id = 'inv_1/' conn.node_id = 'inv_1/'
with patch.object(conn, 'send_modbus_cmd', wraps=conn.send_modbus_cmd) as wrapped_conn: with patch.object(conn, 'send_modbus_cmd', wraps=conn.send_modbus_cmd) as wrapped_conn:
yield wrapped_conn yield wrapped_conn

View File

@@ -35,7 +35,7 @@ class Mqtt():
class MemoryStream(SolarmanV5): class MemoryStream(SolarmanV5):
def __init__(self, msg, chunks = (0,), server_side: bool = True): def __init__(self, msg, chunks = (0,), server_side: bool = True):
_ifc = AsyncIfcImpl() _ifc = AsyncIfcImpl()
super().__init__(server_side, client_mode=False, ifc=_ifc) super().__init__(('test.local', 1234), server_side, client_mode=False, ifc=_ifc)
if server_side: if server_side:
self.mb.timeout = 0.4 # overwrite for faster testing self.mb.timeout = 0.4 # overwrite for faster testing
self.remote = StreamPtr(None) self.remote = StreamPtr(None)
@@ -1236,9 +1236,9 @@ def test_build_logger_modell(config_tsun_allow_all, device_ind_msg):
def test_msg_iterator(): def test_msg_iterator():
Message._registry.clear() Message._registry.clear()
m1 = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfcImpl()) m1 = SolarmanV5(('test1.local', 1234), server_side=True, client_mode=False, ifc=AsyncIfcImpl())
m2 = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfcImpl()) m2 = SolarmanV5(('test2.local', 1234), server_side=True, client_mode=False, ifc=AsyncIfcImpl())
m3 = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfcImpl()) m3 = SolarmanV5(('test3.local', 1234), server_side=True, client_mode=False, ifc=AsyncIfcImpl())
m3.close() m3.close()
del m3 del m3
test1 = 0 test1 = 0
@@ -1256,7 +1256,7 @@ def test_msg_iterator():
assert test2 == 1 assert test2 == 1
def test_proxy_counter(): def test_proxy_counter():
m = SolarmanV5(server_side=True, client_mode=False, ifc=AsyncIfcImpl()) m = SolarmanV5(('test.local', 1234), server_side=True, client_mode=False, ifc=AsyncIfcImpl())
assert m.new_data == {} assert m.new_data == {}
m.db.stat['proxy']['Unknown_Msg'] = 0 m.db.stat['proxy']['Unknown_Msg'] = 0
Infos.new_stat_data['proxy'] = False Infos.new_stat_data['proxy'] = False

View File

@@ -20,7 +20,7 @@ tracer = logging.getLogger('tracer')
class MemoryStream(Talent): class MemoryStream(Talent):
def __init__(self, msg, chunks = (0,), server_side: bool = True): def __init__(self, msg, chunks = (0,), server_side: bool = True):
self.ifc = AsyncIfcImpl() self.ifc = AsyncIfcImpl()
super().__init__(server_side, self.ifc) super().__init__(('test.local', 1234), server_side, self.ifc)
if server_side: if server_side:
self.mb.timeout = 0.4 # overwrite for faster testing self.mb.timeout = 0.4 # overwrite for faster testing
self.remote = StreamPtr(None) self.remote = StreamPtr(None)
@@ -1639,9 +1639,9 @@ def test_ctrl_byte():
def test_msg_iterator(): def test_msg_iterator():
m1 = Talent(server_side=True, ifc=AsyncIfcImpl()) m1 = Talent(('test1.local', 1234), server_side=True, ifc=AsyncIfcImpl())
m2 = Talent(server_side=True, ifc=AsyncIfcImpl()) m2 = Talent(('test2.local', 1234), server_side=True, ifc=AsyncIfcImpl())
m3 = Talent(server_side=True, ifc=AsyncIfcImpl()) m3 = Talent(('test3.local', 1234), server_side=True, ifc=AsyncIfcImpl())
m3.close() m3.close()
del m3 del m3
test1 = 0 test1 = 0