Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bb39567d05 | ||
|
|
b6431f8448 | ||
|
|
714dd92f35 | ||
|
|
02861f70af |
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.5.2] - 2023-11-09
|
||||||
|
|
||||||
|
- add int64 data type to info parser
|
||||||
|
- allow multiple calls to Message.close()
|
||||||
|
- check for race cond. on closing and establishing client connections
|
||||||
|
|
||||||
## [0.5.1] - 2023-11-05
|
## [0.5.1] - 2023-11-05
|
||||||
|
|
||||||
- fixes f-string by limes007
|
- fixes f-string by limes007
|
||||||
|
|||||||
@@ -16,11 +16,15 @@ class AsyncStream(Message):
|
|||||||
self.writer = writer
|
self.writer = writer
|
||||||
self.remoteStream = remote_stream
|
self.remoteStream = remote_stream
|
||||||
self.addr = addr
|
self.addr = addr
|
||||||
|
self.r_addr = ''
|
||||||
|
self.l_addr = ''
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Our puplic methods
|
Our puplic methods
|
||||||
'''
|
'''
|
||||||
async def loop(self) -> None:
|
async def loop(self):
|
||||||
|
self.r_addr = self.writer.get_extra_info('peername')
|
||||||
|
self.l_addr = self.writer.get_extra_info('sockname')
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -35,26 +39,27 @@ class AsyncStream(Message):
|
|||||||
ConnectionAbortedError,
|
ConnectionAbortedError,
|
||||||
BrokenPipeError,
|
BrokenPipeError,
|
||||||
RuntimeError) as error:
|
RuntimeError) as error:
|
||||||
logger.warning(f'In loop for {self.addr}: {error}')
|
logger.warning(f'In loop for l{self.l_addr} | '
|
||||||
|
f'r{self.r_addr}: {error}')
|
||||||
self.close()
|
self.close()
|
||||||
return
|
return self
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.error(
|
logger.error(
|
||||||
f"Exception for {self.addr}:\n"
|
f"Exception for {self.addr}:\n"
|
||||||
f"{traceback.format_exc()}")
|
f"{traceback.format_exc()}")
|
||||||
self.close()
|
self.close()
|
||||||
return
|
return self
|
||||||
|
|
||||||
def disc(self) -> None:
|
def disc(self) -> None:
|
||||||
logger.debug(f'in AsyncStream.disc() {self.addr}')
|
logger.debug(f'in AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
|
||||||
self.writer.close()
|
self.writer.close()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
logger.debug(f'in AsyncStream.close() {self.addr}')
|
logger.debug(f'in AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
|
||||||
self.writer.close()
|
self.writer.close()
|
||||||
super().close() # call close handler in the parent class
|
super().close() # call close handler in the parent class
|
||||||
|
|
||||||
# logger.info (f'AsyncStream refs: {gc.get_referrers(self)}')
|
# logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Our private methods
|
Our private methods
|
||||||
@@ -96,4 +101,4 @@ class AsyncStream(Message):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
logging.debug(f"AsyncStream.__del__ {self.addr}")
|
logging.debug(f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}")
|
||||||
|
|||||||
@@ -341,6 +341,9 @@ class Infos:
|
|||||||
elif data_type == 0x46: # 'F' -> float32
|
elif data_type == 0x46: # 'F' -> float32
|
||||||
result = round(struct.unpack_from('!f', buf, ind)[0], 2)
|
result = round(struct.unpack_from('!f', buf, ind)[0], 2)
|
||||||
ind += 4
|
ind += 4
|
||||||
|
elif data_type == 0x4c: # 'L' -> int64
|
||||||
|
result = struct.unpack_from('!q', buf, ind)[0]
|
||||||
|
ind += 8
|
||||||
else:
|
else:
|
||||||
self.inc_counter('Invalid_Data_Type')
|
self.inc_counter('Invalid_Data_Type')
|
||||||
logging.error(f"Infos.parse: data_type: {data_type}"
|
logging.error(f"Infos.parse: data_type: {data_type}"
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ class Inverter(AsyncStream):
|
|||||||
self.inc_counter('Inverter_Cnt')
|
self.inc_counter('Inverter_Cnt')
|
||||||
await self.loop()
|
await self.loop()
|
||||||
self.dec_counter('Inverter_Cnt')
|
self.dec_counter('Inverter_Cnt')
|
||||||
logging.info(f'Server loop stopped for {addr}')
|
logging.info(f'Server loop stopped for r{self.r_addr}')
|
||||||
|
|
||||||
# if the server connection closes, we also have to disconnect
|
# if the server connection closes, we also have to disconnect
|
||||||
# the connection to te TSUN cloud
|
# the connection to te TSUN cloud
|
||||||
@@ -121,15 +121,22 @@ class Inverter(AsyncStream):
|
|||||||
|
|
||||||
async def client_loop(self, addr):
|
async def client_loop(self, addr):
|
||||||
'''Loop for receiving messages from the TSUN cloud (client-side)'''
|
'''Loop for receiving messages from the TSUN cloud (client-side)'''
|
||||||
await self.remoteStream.loop()
|
clientStream = await self.remoteStream.loop()
|
||||||
logging.info(f'Client loop stopped for {addr}')
|
logging.info(f'Client loop stopped for l{clientStream.l_addr}')
|
||||||
|
|
||||||
# if the client connection closes, we don't touch the server
|
# if the client connection closes, we don't touch the server
|
||||||
# connection. Instead we erase the client connection stream,
|
# connection. Instead we erase the client connection stream,
|
||||||
# thus on the next received packet from the inverter, we can
|
# thus on the next received packet from the inverter, we can
|
||||||
# establish a new connection to the TSUN cloud
|
# establish a new connection to the TSUN cloud
|
||||||
self.remoteStream.remoteStream = None # erase backlink to inverter
|
|
||||||
self.remoteStream = None # than erase client connection
|
# erase backlink to inverter
|
||||||
|
clientStream.remoteStream = None
|
||||||
|
|
||||||
|
if self.remoteStream == clientStream:
|
||||||
|
# logging.debug(f'Client l{clientStream.l_addr} refs:'
|
||||||
|
# f' {gc.get_referrers(clientStream)}')
|
||||||
|
# than erase client connection
|
||||||
|
self.remoteStream = None
|
||||||
|
|
||||||
async def async_create_remote(self) -> None:
|
async def async_create_remote(self) -> None:
|
||||||
'''Establish a client connection to the TSUN cloud'''
|
'''Establish a client connection to the TSUN cloud'''
|
||||||
@@ -197,7 +204,7 @@ class Inverter(AsyncStream):
|
|||||||
f"/{node_id}{id}/config", data_json)
|
f"/{node_id}{id}/config", data_json)
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
logging.debug(f'Inverter.close() {self.addr}')
|
logging.debug(f'Inverter.close() l{self.l_addr} | r{self.r_addr}')
|
||||||
super().close() # call close handler in the parent class
|
super().close() # call close handler in the parent class
|
||||||
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}')
|
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}')
|
||||||
|
|
||||||
|
|||||||
@@ -110,7 +110,7 @@ class Message(metaclass=IterRegistry):
|
|||||||
# we have refernces to methods of this class in self.switch
|
# we have refernces to methods of this class in self.switch
|
||||||
# so we have to erase self.switch, otherwise this instance can't be
|
# so we have to erase self.switch, otherwise this instance can't be
|
||||||
# deallocated by the garbage collector ==> we get a memory leak
|
# deallocated by the garbage collector ==> we get a memory leak
|
||||||
del self.switch
|
self.switch.clear()
|
||||||
|
|
||||||
def inc_counter(self, counter: str) -> None:
|
def inc_counter(self, counter: str) -> None:
|
||||||
self.db.inc_counter(counter)
|
self.db.inc_counter(counter)
|
||||||
|
|||||||
Reference in New Issue
Block a user