From 39aba31bbdf2a168e5fbcd1e5c9f65ff57d5404b Mon Sep 17 00:00:00 2001 From: Stefan Allius Date: Tue, 1 Oct 2024 19:50:42 +0200 Subject: [PATCH] refactor close handling --- app/src/async_ifc.py | 4 - app/src/async_stream.py | 68 ++++----------- app/src/gen3/connection_g3.py | 3 - app/src/gen3/inverter_g3.py | 5 -- app/src/gen3/talent.py | 9 +- app/src/gen3plus/connection_g3p.py | 4 - app/src/gen3plus/inverter_g3p.py | 5 -- app/src/gen3plus/solarman_v5.py | 6 +- app/src/inverter_base.py | 30 ++++++- app/src/modbus_tcp.py | 3 +- app/src/server.py | 3 +- app/tests/test_inverter_g3.py | 134 +++++++++++----------------- app/tests/test_inverter_g3p.py | 135 +++++++++++------------------ app/tests/test_solarman.py | 12 ++- app/tests/test_talent.py | 16 ++-- 15 files changed, 173 insertions(+), 264 deletions(-) diff --git a/app/src/async_ifc.py b/app/src/async_ifc.py index 99204a9..144f270 100644 --- a/app/src/async_ifc.py +++ b/app/src/async_ifc.py @@ -117,7 +117,3 @@ class AsyncIfc(ABC): @abstractmethod def prot_set_update_header_cb(self, callback): pass # pragma: no cover - - @abstractmethod - def prot_set_close_header_cb(self, callback): - pass # pragma: no cover diff --git a/app/src/async_stream.py b/app/src/async_stream.py index e08bdb4..3d9957d 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -35,14 +35,12 @@ class AsyncIfcImpl(AsyncIfc): self.timeout_cb = None self.init_new_client_conn_cb = None self.update_header_cb = None - self.close_cb = None def close(self): self.timeout_cb = None self.fwd_fifo.reg_trigger(None) self.tx_fifo.reg_trigger(None) self.rx_fifo.reg_trigger(None) - self.close_cb = None def set_node_id(self, value: str): self.node_id = value @@ -126,9 +124,6 @@ class AsyncIfcImpl(AsyncIfc): def prot_set_update_header_cb(self, callback): self.update_header_cb = callback - def prot_set_close_header_cb(self, callback): - self.close_cb = callback - class StreamPtr(): '''Descr StreamPtr''' @@ -212,7 +207,6 @@ class AsyncStream(AsyncIfcImpl): f'connection timeout ({dead_conn_to}s) ' f'for {self.l_addr}') await self.disc() - self.close() return self except OSError as error: @@ -220,14 +214,12 @@ class AsyncStream(AsyncIfcImpl): f'{error} for l{self.l_addr} | ' f'r{self.r_addr}') await self.disc() - self.close() return self except RuntimeError as error: logger.info(f'[{self.node_id}:{self.conn_no}] ' f'{error} for {self.l_addr}') await self.disc() - self.close() return self except Exception: @@ -251,10 +243,7 @@ class AsyncStream(AsyncIfcImpl): hint: must be called before releasing the connection instance """ - close_cb = self.close_cb super().close() - if close_cb: - close_cb() self._reader.feed_eof() # abort awaited read if self._writer.is_closing(): return @@ -304,21 +293,22 @@ class AsyncStream(AsyncIfcImpl): except OSError as error: if self.remote.stream: - rmt = self.remote.stream - self.remote.stream = None - logger.error(f'[{rmt.node_id}:{rmt.conn_no}] Fwd: {error} for ' - f'l{rmt._ifc.l_addr} | r{rmt._ifc.r_addr}') - await rmt._ifc.disc() - rmt._ifc.close() + rmt = self.remote + logger.error(f'[{rmt.stream.node_id}:{rmt.stream.conn_no}] ' + f'Fwd: {error} for ' + f'l{rmt.ifc.l_addr} | r{rmt.ifc.r_addr}') + await rmt.ifc.disc() + if rmt.ifc.close_cb: + rmt.ifc.close_cb() except RuntimeError as error: if self.remote.stream: - rmt = self.remote.stream - self.remote.stream = None - logger.info(f'[{rmt.node_id}:{rmt.conn_no}] ' - f'Fwd: {error} for {rmt._ifc.l_addr}') - await rmt._ifc.disc() - rmt._ifc.close() + rmt = self.remote + logger.info(f'[{rmt.stream.node_id}:{rmt.stream.conn_no}] ' + f'Fwd: {error} for {rmt.ifc.l_addr}') + await rmt.ifc.disc() + if rmt.ifc.close_cb: + rmt.ifc.close_cb() except Exception: Infos.inc_counter('SW_Exception') @@ -381,46 +371,22 @@ class AsyncStreamServer(AsyncStream): except Exception: pass - def close(self) -> None: - """close handler for a no waiting disconnect - - hint: must be called before releasing the connection instance - """ - logging.info( - f'AsyncStreamServer.close() l{self.l_addr} | r{self.r_addr}') - self.async_create_remote = None - self.async_publ_mqtt = None - super().close() - class AsyncStreamClient(AsyncStream): def __init__(self, reader: StreamReader, writer: StreamWriter, - rstream: "StreamPtr") -> None: + rstream: "StreamPtr", close_cb) -> None: AsyncStream.__init__(self, reader, writer, rstream) + self.close_cb = close_cb async def client_loop(self, _: str) -> None: '''Loop for receiving messages from the TSUN cloud (client-side)''' - logging.info(f'AsynStream.client_loop{self} rem-> {self.remote}') await self.loop() logger.info(f'[{self.node_id}:{self.conn_no}] ' 'Client loop stopped for' f' l{self.l_addr}') - server_ifc = self.remote.ifc - - # if the client connection closes, we don't touch the server - # connection. Instead we erase the client connection stream, - # thus on the next received packet from the inverter, we can - # establish a new connection to the TSUN cloud - - if server_ifc and server_ifc.remote.ifc == self: - # logging.debug(f'Client l{client_stream.l_addr} refs:' - # f' {gc.get_referrers(client_stream)}') - # than erase client connection - server_ifc.remote.stream = None # erases stream and ifc link - - # erase backlink to inverter - self.remote.stream = None + if self.close_cb: + self.close_cb() async def _async_forward(self) -> None: """forward handler transmits data over the remote connection""" diff --git a/app/src/gen3/connection_g3.py b/app/src/gen3/connection_g3.py index 6123693..b1e8b8b 100644 --- a/app/src/gen3/connection_g3.py +++ b/app/src/gen3/connection_g3.py @@ -11,6 +11,3 @@ logger = logging.getLogger('conn') class ConnectionG3(Talent): def __init__(self, addr, ifc, server_side, id_str=b'') -> None: super().__init__(addr, server_side, ifc, id_str) - - def close(self): - super().close() diff --git a/app/src/gen3/inverter_g3.py b/app/src/gen3/inverter_g3.py index c1eaf61..05f12ec 100644 --- a/app/src/gen3/inverter_g3.py +++ b/app/src/gen3/inverter_g3.py @@ -33,8 +33,3 @@ class InverterG3(InverterBase): async def async_create_remote(self) -> None: await InverterBase.async_create_remote( self, 'tsun', ConnectionG3) - - def close(self) -> None: - logging.debug(f'InverterG3.close() {self.addr}') - self.local.stream.close() -# logging.info(f'Inverter refs: {gc.get_referrers(self)}') diff --git a/app/src/gen3/talent.py b/app/src/gen3/talent.py index 4f9b309..f0c3246 100644 --- a/app/src/gen3/talent.py +++ b/app/src/gen3/talent.py @@ -438,8 +438,8 @@ class Talent(Message): result = struct.unpack_from('!q', self.ifc.rx_peek(), self.header_len) self.ts_offset = result[0]-ts - if self.remote.stream: - self.remote.stream.ts_offset = self.ts_offset + if self.ifc.remote.stream: + self.ifc.remote.stream.ts_offset = self.ts_offset logger.debug(f'tsun-time: {int(result[0]):08x}' f' proxy-time: {ts:08x}' f' offset: {self.ts_offset}') @@ -597,9 +597,8 @@ class Talent(Message): self.header_len+self.data_len] if self.ctrl.is_req(): - if self.remote.stream.mb.recv_req(data[hdr_len:], - self.remote.stream. - msg_forward): + rstream = self.ifc.remote.stream + if rstream.mb.recv_req(data[hdr_len:], rstream.msg_forward): self.inc_counter('Modbus_Command') else: self.inc_counter('Invalid_Msg_Format') diff --git a/app/src/gen3plus/connection_g3p.py b/app/src/gen3plus/connection_g3p.py index b86592e..66d327e 100644 --- a/app/src/gen3plus/connection_g3p.py +++ b/app/src/gen3plus/connection_g3p.py @@ -12,7 +12,3 @@ class ConnectionG3P(SolarmanV5): def __init__(self, addr, ifc, server_side, client_mode: bool = False) -> None: super().__init__(addr, server_side, client_mode, ifc) - - def close(self): - super().close() - # logger.info(f'AsyncStream refs: {gc.get_referrers(self)}') diff --git a/app/src/gen3plus/inverter_g3p.py b/app/src/gen3plus/inverter_g3p.py index f8fbfc7..a362e4f 100644 --- a/app/src/gen3plus/inverter_g3p.py +++ b/app/src/gen3plus/inverter_g3p.py @@ -34,8 +34,3 @@ class InverterG3P(InverterBase): async def async_create_remote(self) -> None: await InverterBase.async_create_remote( self, 'solarman', ConnectionG3P) - - def close(self) -> None: - logging.debug(f'InverterG3P.close() {self.addr}') - self.local.stream.close() -# logger.debug (f'Inverter refs: {gc.get_referrers(self)}') diff --git a/app/src/gen3plus/solarman_v5.py b/app/src/gen3plus/solarman_v5.py index a9dfb79..c0cd827 100644 --- a/app/src/gen3plus/solarman_v5.py +++ b/app/src/gen3plus/solarman_v5.py @@ -603,9 +603,9 @@ class SolarmanV5(Message): self.forward_at_cmd_resp = True elif ftype == self.MB_RTU_CMD: - if self.remote.stream.mb.recv_req(data[15:], - self.remote.stream. - __forward_msg): + rstream = self.ifc.remote.stream + if rstream.mb.recv_req(data[15:], + rstream.__forward_msg): self.inc_counter('Modbus_Command') else: logger.error('Invalid Modbus Msg') diff --git a/app/src/inverter_base.py b/app/src/inverter_base.py index ac9ecd7..0b6fad7 100644 --- a/app/src/inverter_base.py +++ b/app/src/inverter_base.py @@ -22,6 +22,32 @@ class InverterBase(Inverter): def __init__(self): self.__ha_restarts = -1 + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb) -> None: + logging.info(f'Inverter.__exit__() {self.addr}') + self.__del_remote() + if self.local.stream: + self.local.stream.close() + self.local.stream = None + + if self.local.ifc: + self.local.ifc.close() + self.local.ifc = None + + def __del__(self) -> None: + logging.info(f'Inverter.__del__() {self.addr}') + + def __del_remote(self): + if self.remote.stream: + self.remote.stream.close() + self.remote.stream = None + + if self.remote.ifc: + self.remote.ifc.close() + self.remote.ifc = None + async def async_create_remote(self, inv_prot: str, conn_class) -> None: '''Establish a client connection to the TSUN cloud''' tsun = Config.get(inv_prot) @@ -36,8 +62,8 @@ class InverterBase(Inverter): logging.info(f'[{stream.node_id}] Connect to {addr}') connect = asyncio.open_connection(host, port) reader, writer = await connect - ifc = AsyncStreamClient(reader, writer, - self.local) + ifc = AsyncStreamClient( + reader, writer, self.local, self.__del_remote) if hasattr(stream, 'id_str'): self.remote.stream = conn_class( diff --git a/app/src/modbus_tcp.py b/app/src/modbus_tcp.py index bac6243..8bacce3 100644 --- a/app/src/modbus_tcp.py +++ b/app/src/modbus_tcp.py @@ -27,6 +27,7 @@ class ModbusConn(): reader, writer = await connection self.inverter = InverterG3P(reader, writer, self.addr, client_mode=True) + self.inverter.__enter__() stream = self.inverter.local.stream logging.info(f'[{stream.node_id}:{stream.conn_no}] ' f'Connected to {self.addr}') @@ -37,7 +38,7 @@ class ModbusConn(): async def __aexit__(self, exc_type, exc, tb): Infos.dec_counter('Inverter_Cnt') await self.inverter.local.ifc.publish_outstanding_mqtt() - self.inverter.close() + self.inverter.__exit__(exc_type, exc, tb) class ModbusTcp(): diff --git a/app/src/server.py b/app/src/server.py index a40a178..bbd7f83 100644 --- a/app/src/server.py +++ b/app/src/server.py @@ -74,7 +74,8 @@ async def handle_client(reader: StreamReader, writer: StreamWriter, inv_class): '''Handles a new incoming connection and starts an async loop''' addr = writer.get_extra_info('peername') - await inv_class(reader, writer, addr).local.ifc.server_loop() + with inv_class(reader, writer, addr) as inv: + await inv.local.ifc.server_loop() async def handle_shutdown(web_task): diff --git a/app/tests/test_inverter_g3.py b/app/tests/test_inverter_g3.py index e30f472..27ee51b 100644 --- a/app/tests/test_inverter_g3.py +++ b/app/tests/test_inverter_g3.py @@ -47,11 +47,6 @@ def patch_conn_init(): with patch.object(ConnectionG3, '__init__', return_value= None) as conn: yield conn -@pytest.fixture -def patch_conn_close(): - with patch.object(ConnectionG3, 'close') as conn: - yield conn - class FakeReader(): def __init__(self): self.on_recv = asyncio.Event() @@ -104,131 +99,102 @@ def patch_open_connection(): yield conn -def test_method_calls(patch_conn_close): - spy2 = patch_conn_close +def test_method_calls(): reader = FakeReader() writer = FakeWriter() addr = ('proxy.local', 10000) - inverter = InverterG3(reader, writer, addr) - assert inverter.local.stream - assert inverter.local.ifc - - inverter.close() - spy2.assert_called_once() + with InverterG3(reader, writer, addr) as inverter: + assert inverter.local.stream + assert inverter.local.ifc @pytest.mark.asyncio -async def test_remote_conn(config_conn, patch_open_connection, patch_conn_close): +async def test_remote_conn(config_conn, patch_open_connection): _ = config_conn _ = patch_open_connection assert asyncio.get_running_loop() - spy1 = patch_conn_close - - inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) - - await inverter.async_create_remote() - await asyncio.sleep(0) - assert inverter.remote.stream - inverter.close() - spy1.assert_called_once() + with InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) as inverter: + await inverter.async_create_remote() + await asyncio.sleep(0) + assert inverter.remote.stream @pytest.mark.asyncio -async def test_remote_except(config_conn, patch_open_connection, patch_conn_close): +async def test_remote_except(config_conn, patch_open_connection): _ = config_conn _ = patch_open_connection assert asyncio.get_running_loop() - spy1 = patch_conn_close - global test test = TestType.RD_TEST_TIMEOUT - inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) + with InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) as inverter: + await inverter.async_create_remote() + await asyncio.sleep(0) + assert inverter.remote.stream==None - await inverter.async_create_remote() - await asyncio.sleep(0) - assert inverter.remote.stream==None - - test = TestType.RD_TEST_EXCEPT - await inverter.async_create_remote() - await asyncio.sleep(0) - assert inverter.remote.stream==None - inverter.close() - spy1.assert_called_once() + test = TestType.RD_TEST_EXCEPT + await inverter.async_create_remote() + await asyncio.sleep(0) + assert inverter.remote.stream==None @pytest.mark.asyncio -async def test_mqtt_publish(config_conn, patch_open_connection, patch_conn_close): +async def test_mqtt_publish(config_conn, patch_open_connection): _ = config_conn _ = patch_open_connection assert asyncio.get_running_loop() - spy1 = patch_conn_close - Inverter.class_init() - inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) - stream = inverter.local.stream - await inverter.async_publ_mqtt() # check call with invalid unique_id - stream._Talent__set_serial_no(serial_no= "123344") - - stream.new_data['inverter'] = True - stream.db.db['inverter'] = {} - await inverter.async_publ_mqtt() - assert stream.new_data['inverter'] == False + with InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) as inverter: + stream = inverter.local.stream + await inverter.async_publ_mqtt() # check call with invalid unique_id + stream._Talent__set_serial_no(serial_no= "123344") - stream.new_data['env'] = True - stream.db.db['env'] = {} - await inverter.async_publ_mqtt() - assert stream.new_data['env'] == False + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} + await inverter.async_publ_mqtt() + assert stream.new_data['inverter'] == False - Infos.new_stat_data['proxy'] = True - await inverter.async_publ_mqtt() - assert Infos.new_stat_data['proxy'] == False + stream.new_data['env'] = True + stream.db.db['env'] = {} + await inverter.async_publ_mqtt() + assert stream.new_data['env'] == False - inverter.close() - spy1.assert_called_once() + Infos.new_stat_data['proxy'] = True + await inverter.async_publ_mqtt() + assert Infos.new_stat_data['proxy'] == False @pytest.mark.asyncio -async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err, patch_conn_close): +async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err): _ = config_conn _ = patch_open_connection _ = patch_mqtt_err assert asyncio.get_running_loop() - spy1 = patch_conn_close - Inverter.class_init() - inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) - stream = inverter.local.stream - stream._Talent__set_serial_no(serial_no= "123344") - stream.new_data['inverter'] = True - stream.db.db['inverter'] = {} - await inverter.async_publ_mqtt() - assert stream.new_data['inverter'] == True - - inverter.close() - spy1.assert_called_once() + with InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) as inverter: + stream = inverter.local.stream + stream._Talent__set_serial_no(serial_no= "123344") + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} + await inverter.async_publ_mqtt() + assert stream.new_data['inverter'] == True @pytest.mark.asyncio -async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except, patch_conn_close): +async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except): _ = config_conn _ = patch_open_connection _ = patch_mqtt_except assert asyncio.get_running_loop() - spy1 = patch_conn_close - Inverter.class_init() - inverter = InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) - stream = inverter.local.stream - stream._Talent__set_serial_no(serial_no= "123344") - - stream.new_data['inverter'] = True - stream.db.db['inverter'] = {} - await inverter.async_publ_mqtt() - assert stream.new_data['inverter'] == True + with InverterG3(FakeReader(), FakeWriter(), ('proxy.local', 10000)) as inverter: + stream = inverter.local.stream + stream._Talent__set_serial_no(serial_no= "123344") - inverter.close() - spy1.assert_called_once() + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} + await inverter.async_publ_mqtt() + assert stream.new_data['inverter'] == True diff --git a/app/tests/test_inverter_g3p.py b/app/tests/test_inverter_g3p.py index 5a4e5a4..ff5564a 100644 --- a/app/tests/test_inverter_g3p.py +++ b/app/tests/test_inverter_g3p.py @@ -48,11 +48,6 @@ def patch_conn_init(): with patch.object(ConnectionG3P, '__init__', return_value= None) as conn: yield conn -@pytest.fixture -def patch_conn_close(): - with patch.object(ConnectionG3P, 'close') as conn: - yield conn - class FakeReader(): def __init__(self): self.on_recv = asyncio.Event() @@ -104,132 +99,102 @@ def patch_open_connection(): with patch.object(asyncio, 'open_connection', new_open) as conn: yield conn - -def test_method_calls(patch_conn_close): - spy2 = patch_conn_close +def test_method_calls(): reader = FakeReader() writer = FakeWriter() addr = ('proxy.local', 10000) - inverter = InverterG3P(reader, writer, addr, client_mode=False) - assert inverter.local.stream - assert inverter.local.ifc - - inverter.close() - spy2.assert_called_once() + with InverterG3P(reader, writer, addr, client_mode=False) as inverter: + assert inverter.local.stream + assert inverter.local.ifc @pytest.mark.asyncio -async def test_remote_conn(config_conn, patch_open_connection, patch_conn_close): +async def test_remote_conn(config_conn, patch_open_connection): _ = config_conn _ = patch_open_connection assert asyncio.get_running_loop() - spy1 = patch_conn_close - - inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) - - await inverter.async_create_remote() - await asyncio.sleep(0) - assert inverter.remote.stream - inverter.close() - spy1.assert_called_once() + with InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) as inverter: + await inverter.async_create_remote() + await asyncio.sleep(0) + assert inverter.remote.stream @pytest.mark.asyncio -async def test_remote_except(config_conn, patch_open_connection, patch_conn_close): +async def test_remote_except(config_conn, patch_open_connection): _ = config_conn _ = patch_open_connection assert asyncio.get_running_loop() - - spy1 = patch_conn_close global test test = TestType.RD_TEST_TIMEOUT - inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) + with InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) as inverter: + await inverter.async_create_remote() + await asyncio.sleep(0) + assert inverter.remote.stream==None - await inverter.async_create_remote() - await asyncio.sleep(0) - assert inverter.remote.stream==None - - test = TestType.RD_TEST_EXCEPT - await inverter.async_create_remote() - await asyncio.sleep(0) - assert inverter.remote.stream==None - inverter.close() - spy1.assert_called_once() + test = TestType.RD_TEST_EXCEPT + await inverter.async_create_remote() + await asyncio.sleep(0) + assert inverter.remote.stream==None @pytest.mark.asyncio -async def test_mqtt_publish(config_conn, patch_open_connection, patch_conn_close): +async def test_mqtt_publish(config_conn, patch_open_connection): _ = config_conn _ = patch_open_connection assert asyncio.get_running_loop() - spy1 = patch_conn_close - Inverter.class_init() - inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) - stream = inverter.local.stream - await inverter.async_publ_mqtt() # check call with invalid unique_id - stream._SolarmanV5__set_serial_no(snr= 123344) - - stream.new_data['inverter'] = True - stream.db.db['inverter'] = {} - await inverter.async_publ_mqtt() - assert stream.new_data['inverter'] == False + with InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) as inverter: + stream = inverter.local.stream + await inverter.async_publ_mqtt() # check call with invalid unique_id + stream._SolarmanV5__set_serial_no(snr= 123344) - stream.new_data['env'] = True - stream.db.db['env'] = {} - await inverter.async_publ_mqtt() - assert stream.new_data['env'] == False + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} + await inverter.async_publ_mqtt() + assert stream.new_data['inverter'] == False - Infos.new_stat_data['proxy'] = True - await inverter.async_publ_mqtt() - assert Infos.new_stat_data['proxy'] == False + stream.new_data['env'] = True + stream.db.db['env'] = {} + await inverter.async_publ_mqtt() + assert stream.new_data['env'] == False - inverter.close() - spy1.assert_called_once() + Infos.new_stat_data['proxy'] = True + await inverter.async_publ_mqtt() + assert Infos.new_stat_data['proxy'] == False @pytest.mark.asyncio -async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err, patch_conn_close): +async def test_mqtt_err(config_conn, patch_open_connection, patch_mqtt_err): _ = config_conn _ = patch_open_connection _ = patch_mqtt_err assert asyncio.get_running_loop() - spy1 = patch_conn_close - Inverter.class_init() - inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) - stream = inverter.local.stream - stream._SolarmanV5__set_serial_no(snr= 123344) - stream.new_data['inverter'] = True - stream.db.db['inverter'] = {} - await inverter.async_publ_mqtt() - assert stream.new_data['inverter'] == True - - inverter.close() - spy1.assert_called_once() + with InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) as inverter: + stream = inverter.local.stream + stream._SolarmanV5__set_serial_no(snr= 123344) + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} + await inverter.async_publ_mqtt() + assert stream.new_data['inverter'] == True @pytest.mark.asyncio -async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except, patch_conn_close): +async def test_mqtt_except(config_conn, patch_open_connection, patch_mqtt_except): _ = config_conn _ = patch_open_connection _ = patch_mqtt_except assert asyncio.get_running_loop() - spy1 = patch_conn_close - Inverter.class_init() - inverter = InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) - stream = inverter.local.stream - stream._SolarmanV5__set_serial_no(snr= 123344) - - stream.new_data['inverter'] = True - stream.db.db['inverter'] = {} - await inverter.async_publ_mqtt() - assert stream.new_data['inverter'] == True + with InverterG3P(FakeReader(), FakeWriter(), ('proxy.local', 10000), client_mode=False) as inverter: + stream = inverter.local.stream + stream._SolarmanV5__set_serial_no(snr= 123344) - inverter.close() - spy1.assert_called_once() + stream.new_data['inverter'] = True + stream.db.db['inverter'] = {} + await inverter.async_publ_mqtt() + assert stream.new_data['inverter'] == True diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index 6da7d23..795b32f 100644 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -32,13 +32,17 @@ class Mqtt(): self.data = data +class FakeIfc(AsyncIfcImpl): + def __init__(self): + super().__init__() + self.remote = StreamPtr(None) + class MemoryStream(SolarmanV5): def __init__(self, msg, chunks = (0,), server_side: bool = True): - _ifc = AsyncIfcImpl() + _ifc = FakeIfc() super().__init__(('test.local', 1234), server_side, client_mode=False, ifc=_ifc) if server_side: self.mb.timeout = 0.4 # overwrite for faster testing - self.remote = StreamPtr(None) self.mb_first_timeout = 0.5 self.mb_timeout = 0.5 self.sent_pdu = b'' @@ -101,8 +105,8 @@ class MemoryStream(SolarmanV5): def createClientStream(self, msg, chunks = (0,)): c = MemoryStream(msg, chunks, False) - self.remote.stream = c - c. remote.stream = self + self.ifc.remote.stream = c + c.ifc.remote.stream = self return c def _SolarmanV5__flush_recv_msg(self) -> None: diff --git a/app/tests/test_talent.py b/app/tests/test_talent.py index 38b0edd..d082885 100644 --- a/app/tests/test_talent.py +++ b/app/tests/test_talent.py @@ -16,14 +16,17 @@ Infos.static_init() tracer = logging.getLogger('tracer') +class FakeIfc(AsyncIfcImpl): + def __init__(self): + super().__init__() + self.remote = StreamPtr(None) class MemoryStream(Talent): def __init__(self, msg, chunks = (0,), server_side: bool = True): - self.ifc = AsyncIfcImpl() + self.ifc = FakeIfc() super().__init__(('test.local', 1234), server_side, self.ifc) if server_side: self.mb.timeout = 0.4 # overwrite for faster testing - self.remote = StreamPtr(None) self.mb_first_timeout = 0.5 self.mb_timeout = 0.5 self.sent_pdu = b'' @@ -37,7 +40,6 @@ class MemoryStream(Talent): self.addr = 'Test: SrvSide' self.send_msg_ofs = 0 self.msg_recvd = [] - self.remote.stream = None def write_cb(self): self.sent_pdu = self.ifc.tx_fifo.get() @@ -73,8 +75,8 @@ class MemoryStream(Talent): def createClientStream(self, msg, chunks = (0,)): c = MemoryStream(msg, chunks, False) - self.remote.stream = c - c. remote.stream = self + self.ifc.remote.stream = c + c.ifc.remote.stream = self return c def _Talent__flush_recv_msg(self) -> None: @@ -1059,7 +1061,7 @@ def test_msg_time_resp(config_tsun_inv1, msg_time_rsp): m = MemoryStream(msg_time_rsp, (0,), False) s = MemoryStream(b'', (0,), True) assert s.ts_offset==0 - m.remote.stream = s + m.ifc.remote.stream = s m.db.stat['proxy']['Unknown_Ctrl'] = 0 m.read() # read complete msg, and dispatch msg assert not m.header_valid # must be invalid, since msg was handled and buffer flushed @@ -1075,7 +1077,7 @@ def test_msg_time_resp(config_tsun_inv1, msg_time_rsp): assert m.ifc.fwd_fifo.get()==b'' assert m.ifc.tx_fifo.get()==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 - m.remote.stream = None + m.ifc.remote.stream = None s.close() m.close()