Compare commits

...

11 Commits

Author SHA1 Message Date
Stefan Allius
c3430f509e Version 0.5.3 2023-11-12 15:23:43 +01:00
Stefan Allius
32a669d0d1 Merge pull request #27 from s-allius/s-allius/issue26
S allius/issue26
2023-11-12 15:19:48 +01:00
Stefan Allius
4d9f00221c fix the palnt offline problem in tsun cloud
- use TSUN timestamp instead of local time,
  as TSUN also expects Central European Summer
  Time in winter
2023-11-12 15:15:30 +01:00
Stefan Allius
27c723b0c8 init contact_mail and contact_name 2023-11-12 01:06:24 +01:00
Stefan Allius
4bd59b91b3 send contact info every time a client connection is established 2023-11-11 23:49:06 +01:00
Stefan Allius
3a3c6142b8 ignore build.sh 2023-11-09 20:43:46 +01:00
Stefan Allius
5d36397f2f remover apk from the final image 2023-11-09 20:17:19 +01:00
Stefan Allius
bb39567d05 Version 0.5.2 2023-11-09 20:05:56 +01:00
Stefan Allius
b6431f8448 improve client conn disconection
- check for race cond. on closing and establishing
  client connections
- improve connection trace
2023-11-09 20:03:09 +01:00
Stefan Allius
714dd92f35 allow multiple calls to Message.close() 2023-11-08 18:57:56 +01:00
Stefan Allius
02861f70af - add int64 data type to info parser 2023-11-07 00:19:48 +01:00
9 changed files with 144 additions and 40 deletions

View File

@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.5.3] - 2023-11-12
- remove apk packet manager from the final image
- send contact info every time a client connection is established
- change timestamp from local time to utc
## [0.5.2] - 2023-11-09
- add int64 data type to info parser
- allow multiple calls to Message.close()
- check for race cond. on closing and establishing client connections
## [0.5.1] - 2023-11-05 ## [0.5.1] - 2023-11-05
- fixes f-string by limes007 - fixes f-string by limes007

View File

@@ -1,4 +1,5 @@
tests/ tests/
**/__pycache__ **/__pycache__
*.pyc *.pyc
.DS_Store .DS_Store
build.sh

View File

@@ -52,6 +52,8 @@ COPY --from=builder /root/wheels /root/wheels
RUN python -m pip install --no-cache --no-index /root/wheels/* && \ RUN python -m pip install --no-cache --no-index /root/wheels/* && \
rm -rf /root/wheels rm -rf /root/wheels
RUN apk --purge del apk-tools
# copy the content of the local src and config directory to the working directory # copy the content of the local src and config directory to the working directory
COPY --chmod=0700 entrypoint.sh /root/entrypoint.sh COPY --chmod=0700 entrypoint.sh /root/entrypoint.sh
COPY config . COPY config .

View File

@@ -9,18 +9,22 @@ logger = logging.getLogger('conn')
class AsyncStream(Message): class AsyncStream(Message):
def __init__(self, reader, writer, addr, remote_stream, server_side: bool def __init__(self, reader, writer, addr, remote_stream, server_side: bool,
) -> None: id_str=b'') -> None:
super().__init__(server_side) super().__init__(server_side, id_str)
self.reader = reader self.reader = reader
self.writer = writer self.writer = writer
self.remoteStream = remote_stream self.remoteStream = remote_stream
self.addr = addr self.addr = addr
self.r_addr = ''
self.l_addr = ''
''' '''
Our puplic methods Our puplic methods
''' '''
async def loop(self) -> None: async def loop(self):
self.r_addr = self.writer.get_extra_info('peername')
self.l_addr = self.writer.get_extra_info('sockname')
while True: while True:
try: try:
@@ -35,26 +39,27 @@ class AsyncStream(Message):
ConnectionAbortedError, ConnectionAbortedError,
BrokenPipeError, BrokenPipeError,
RuntimeError) as error: RuntimeError) as error:
logger.warning(f'In loop for {self.addr}: {error}') logger.warning(f'In loop for l{self.l_addr} | '
f'r{self.r_addr}: {error}')
self.close() self.close()
return return self
except Exception: except Exception:
logger.error( logger.error(
f"Exception for {self.addr}:\n" f"Exception for {self.addr}:\n"
f"{traceback.format_exc()}") f"{traceback.format_exc()}")
self.close() self.close()
return return self
def disc(self) -> None: def disc(self) -> None:
logger.debug(f'in AsyncStream.disc() {self.addr}') logger.debug(f'in AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
self.writer.close() self.writer.close()
def close(self): def close(self):
logger.debug(f'in AsyncStream.close() {self.addr}') logger.debug(f'in AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
self.writer.close() self.writer.close()
super().close() # call close handler in the parent class super().close() # call close handler in the parent class
# logger.info (f'AsyncStream refs: {gc.get_referrers(self)}') # logger.info(f'AsyncStream refs: {gc.get_referrers(self)}')
''' '''
Our private methods Our private methods
@@ -79,6 +84,10 @@ class AsyncStream(Message):
if self._forward_buffer: if self._forward_buffer:
if not self.remoteStream: if not self.remoteStream:
await self.async_create_remote() await self.async_create_remote()
if self.remoteStream:
self.remoteStream._init_new_client_conn(self.contact_name,
self.contact_mail)
await self.remoteStream.__async_write()
if self.remoteStream: if self.remoteStream:
hex_dump_memory(logging.INFO, hex_dump_memory(logging.INFO,
@@ -96,4 +105,4 @@ class AsyncStream(Message):
pass pass
def __del__(self): def __del__(self):
logging.debug(f"AsyncStream.__del__ {self.addr}") logging.debug(f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}")

View File

@@ -341,6 +341,9 @@ class Infos:
elif data_type == 0x46: # 'F' -> float32 elif data_type == 0x46: # 'F' -> float32
result = round(struct.unpack_from('!f', buf, ind)[0], 2) result = round(struct.unpack_from('!f', buf, ind)[0], 2)
ind += 4 ind += 4
elif data_type == 0x4c: # 'L' -> int64
result = struct.unpack_from('!q', buf, ind)[0]
ind += 8
else: else:
self.inc_counter('Invalid_Data_Type') self.inc_counter('Invalid_Data_Type')
logging.error(f"Infos.parse: data_type: {data_type}" logging.error(f"Infos.parse: data_type: {data_type}"

View File

@@ -107,7 +107,7 @@ class Inverter(AsyncStream):
self.inc_counter('Inverter_Cnt') self.inc_counter('Inverter_Cnt')
await self.loop() await self.loop()
self.dec_counter('Inverter_Cnt') self.dec_counter('Inverter_Cnt')
logging.info(f'Server loop stopped for {addr}') logging.info(f'Server loop stopped for r{self.r_addr}')
# if the server connection closes, we also have to disconnect # if the server connection closes, we also have to disconnect
# the connection to te TSUN cloud # the connection to te TSUN cloud
@@ -121,15 +121,22 @@ class Inverter(AsyncStream):
async def client_loop(self, addr): async def client_loop(self, addr):
'''Loop for receiving messages from the TSUN cloud (client-side)''' '''Loop for receiving messages from the TSUN cloud (client-side)'''
await self.remoteStream.loop() clientStream = await self.remoteStream.loop()
logging.info(f'Client loop stopped for {addr}') logging.info(f'Client loop stopped for l{clientStream.l_addr}')
# if the client connection closes, we don't touch the server # if the client connection closes, we don't touch the server
# connection. Instead we erase the client connection stream, # connection. Instead we erase the client connection stream,
# thus on the next received packet from the inverter, we can # thus on the next received packet from the inverter, we can
# establish a new connection to the TSUN cloud # establish a new connection to the TSUN cloud
self.remoteStream.remoteStream = None # erase backlink to inverter
self.remoteStream = None # than erase client connection # erase backlink to inverter
clientStream.remoteStream = None
if self.remoteStream == clientStream:
# logging.debug(f'Client l{clientStream.l_addr} refs:'
# f' {gc.get_referrers(clientStream)}')
# than erase client connection
self.remoteStream = None
async def async_create_remote(self) -> None: async def async_create_remote(self) -> None:
'''Establish a client connection to the TSUN cloud''' '''Establish a client connection to the TSUN cloud'''
@@ -142,7 +149,8 @@ class Inverter(AsyncStream):
logging.info(f'Connected to {addr}') logging.info(f'Connected to {addr}')
connect = asyncio.open_connection(host, port) connect = asyncio.open_connection(host, port)
reader, writer = await connect reader, writer = await connect
self.remoteStream = AsyncStream(reader, writer, addr, self, False) self.remoteStream = AsyncStream(reader, writer, addr, self,
False, self.id_str)
asyncio.create_task(self.client_loop(addr)) asyncio.create_task(self.client_loop(addr))
except ConnectionRefusedError as error: except ConnectionRefusedError as error:
@@ -197,7 +205,7 @@ class Inverter(AsyncStream):
f"/{node_id}{id}/config", data_json) f"/{node_id}{id}/config", data_json)
def close(self) -> None: def close(self) -> None:
logging.debug(f'Inverter.close() {self.addr}') logging.debug(f'Inverter.close() l{self.l_addr} | r{self.r_addr}')
super().close() # call close handler in the parent class super().close() # call close handler in the parent class
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}') # logger.debug (f'Inverter refs: {gc.get_referrers(self)}')

View File

@@ -74,7 +74,7 @@ class Message(metaclass=IterRegistry):
_registry = [] _registry = []
new_stat_data = {} new_stat_data = {}
def __init__(self, server_side: bool): def __init__(self, server_side: bool, id_str=b''):
self._registry.append(weakref.ref(self)) self._registry.append(weakref.ref(self))
self.server_side = server_side self.server_side = server_side
self.header_valid = False self.header_valid = False
@@ -83,6 +83,9 @@ class Message(metaclass=IterRegistry):
self.unique_id = 0 self.unique_id = 0
self.node_id = '' self.node_id = ''
self.sug_area = '' self.sug_area = ''
self.id_str = id_str
self.contact_name = b''
self.contact_mail = b''
self._recv_buffer = b'' self._recv_buffer = b''
self._send_buffer = bytearray(0) self._send_buffer = bytearray(0)
self._forward_buffer = bytearray(0) self._forward_buffer = bytearray(0)
@@ -110,7 +113,7 @@ class Message(metaclass=IterRegistry):
# we have refernces to methods of this class in self.switch # we have refernces to methods of this class in self.switch
# so we have to erase self.switch, otherwise this instance can't be # so we have to erase self.switch, otherwise this instance can't be
# deallocated by the garbage collector ==> we get a memory leak # deallocated by the garbage collector ==> we get a memory leak
del self.switch self.switch.clear()
def inc_counter(self, counter: str) -> None: def inc_counter(self, counter: str) -> None:
self.db.inc_counter(counter) self.db.inc_counter(counter)
@@ -175,6 +178,16 @@ class Message(metaclass=IterRegistry):
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}') f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
return return
def _init_new_client_conn(self, contact_name, contact_mail) -> None:
logger.info(f'name: {contact_name} mail: {contact_mail}')
self.msg_id = 0
self.__build_header(0x91)
self._send_buffer += struct.pack(f'!{len(contact_name)+1}p'
f'{len(contact_mail)+1}p',
contact_name, contact_mail)
self.__finish_send_msg()
''' '''
Our private methods Our private methods
''' '''
@@ -271,29 +284,56 @@ class Message(metaclass=IterRegistry):
self.__build_header(0x99) self.__build_header(0x99)
self._send_buffer += b'\x01' self._send_buffer += b'\x01'
self.__finish_send_msg() self.__finish_send_msg()
self.__process_contact_info()
# don't forward this contact info here, we will build one
# when the remote connection is established
return
elif self.ctrl.is_resp(): elif self.ctrl.is_resp():
return # ignore received response from tsun return # ignore received response from tsun
else: else:
self.inc_counter('Unknown_Ctrl') self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
self.forward(self._recv_buffer, self.header_len+self.data_len) def __process_contact_info(self):
result = struct.unpack_from('!B', self._recv_buffer, self.header_len)
name_len = result[0]
result = struct.unpack_from(f'!{name_len+1}pB', self._recv_buffer,
self.header_len)
self.contact_name = result[0]
mail_len = result[1]
logger.info(f'name: {self.contact_name}')
result = struct.unpack_from(f'!{mail_len+1}p', self._recv_buffer,
self.header_len+name_len+1)
self.contact_mail = result[0]
logger.info(f'mail: {self.contact_mail}')
def msg_get_time(self): def msg_get_time(self):
if self.ctrl.is_ind(): tsun = Config.get('tsun')
ts = self.__timestamp() if tsun['enabled']:
logger.debug(f'time: {ts:08x}') if self.ctrl.is_resp():
ts = self.__timestamp()
self.__build_header(0x99) result = struct.unpack_from('!q', self._recv_buffer,
self._send_buffer += struct.pack('!q', ts) self.header_len)
self.__finish_send_msg() logger.debug(f'tsun-time: {result[0]:08x}'
f' proxy-time: {ts:08x}')
elif self.ctrl.is_resp():
result = struct.unpack_from('!q', self._recv_buffer,
self.header_len)
logger.debug(f'tsun-time: {result[0]:08x}')
return # ignore received response from tsun
else: else:
self.inc_counter('Unknown_Ctrl') if self.ctrl.is_ind():
ts = self.__timestamp()
logger.debug(f'time: {ts:08x}')
self.__build_header(0x99)
self._send_buffer += struct.pack('!q', ts)
self.__finish_send_msg()
elif self.ctrl.is_resp():
result = struct.unpack_from('!q', self._recv_buffer,
self.header_len)
logger.debug(f'tsun-time: {result[0]:08x}')
return # ignore received response from tsun
else:
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len) self.forward(self._recv_buffer, self.header_len+self.data_len)
def parse_msg_header(self): def parse_msg_header(self):

View File

@@ -67,6 +67,10 @@ def Msg2ContactInfo(): # two Contact Info messages
def MsgContactResp(): # Contact Response message def MsgContactResp(): # Contact Response message
return b'\x00\x00\x00\x14\x10R170000000000001\x99\x00\x01' return b'\x00\x00\x00\x14\x10R170000000000001\x99\x00\x01'
@pytest.fixture
def MsgContactResp2(): # Contact Response message
return b'\x00\x00\x00\x14\x10R170000000000002\x99\x00\x01'
@pytest.fixture @pytest.fixture
def MsgContactInvalid(): # Contact Response message def MsgContactInvalid(): # Contact Response message
return b'\x00\x00\x00\x14\x10R170000000000001\x93\x00\x01' return b'\x00\x00\x00\x14\x10R170000000000001\x93\x00\x01'
@@ -246,7 +250,7 @@ def test_read_message_in_chunks2(MsgContactInfo):
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
m.close() m.close()
def test_read_two_messages(ConfigTsunAllowAll, Msg2ContactInfo): def test_read_two_messages(ConfigTsunAllowAll, Msg2ContactInfo,MsgContactResp,MsgContactResp2):
ConfigTsunAllowAll ConfigTsunAllowAll
m = MemoryStream(Msg2ContactInfo, (0,)) m = MemoryStream(Msg2ContactInfo, (0,))
m.db.stat['proxy']['Unknown_Ctrl'] = 0 m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -259,9 +263,15 @@ def test_read_two_messages(ConfigTsunAllowAll, Msg2ContactInfo):
assert m.msg_id==0 assert m.msg_id==0
assert m.header_len==23 assert m.header_len==23
assert m.data_len==25 assert m.data_len==25
assert m._forward_buffer==b'\x00\x00\x00,\x10R170000000000001\x91\x00\x08solarhub\x0fsolarhub@123456' assert m._forward_buffer==b''
assert m._send_buffer==MsgContactResp
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
m._send_buffer = bytearray(0) # clear send buffer for next test
m._init_new_client_conn(b'solarhub', b'solarhub@123456')
assert m._send_buffer==b'\x00\x00\x00,\x10R170000000000001\x91\x00\x08solarhub\x0fsolarhub@123456'
m._send_buffer = bytearray(0) # clear send buffer for next test
m.read() # read complete msg, and dispatch msg m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 2 assert m.msg_count == 2
@@ -271,8 +281,13 @@ def test_read_two_messages(ConfigTsunAllowAll, Msg2ContactInfo):
assert m.msg_id==0 assert m.msg_id==0
assert m.header_len==23 assert m.header_len==23
assert m.data_len==25 assert m.data_len==25
assert m._forward_buffer==b'\x00\x00\x00,\x10R170000000000002\x91\x00\x08solarhub\x0fsolarhub@123456' assert m._forward_buffer==b''
assert m._send_buffer==MsgContactResp2
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
m._send_buffer = bytearray(0) # clear send buffer for next test
m._init_new_client_conn(b'solarhub', b'solarhub@123456')
assert m._send_buffer==b'\x00\x00\x00,\x10R170000000000002\x91\x00\x08solarhub\x0fsolarhub@123456'
m.close() m.close()
def test_msg_contact_resp(ConfigTsunInv1, MsgContactResp): def test_msg_contact_resp(ConfigTsunInv1, MsgContactResp):

View File

@@ -137,6 +137,7 @@ def test_send_contact_info1(ClientConnection, MsgContactInfo, MsgContactResp):
pass pass
assert data == MsgContactResp assert data == MsgContactResp
def test_send_contact_info2(ClientConnection, MsgContactInfo2, MsgContactInfo, MsgContactResp): def test_send_contact_info2(ClientConnection, MsgContactInfo2, MsgContactInfo, MsgContactResp):
s = ClientConnection s = ClientConnection
try: try:
@@ -154,7 +155,20 @@ def test_send_contact_info2(ClientConnection, MsgContactInfo2, MsgContactInfo, M
pass pass
assert data == MsgContactResp assert data == MsgContactResp
def test_send_contact_info3(ClientConnection, MsgContactInfo, MsgContactResp, MsgTimeStampReq):
s = ClientConnection
try:
s.sendall(MsgContactInfo)
data = s.recv(1024)
except TimeoutError:
pass
assert data == MsgContactResp
try:
s.sendall(MsgTimeStampReq)
data = s.recv(1024)
except TimeoutError:
pass
def test_send_contact_resp(ClientConnection, MsgContactResp): def test_send_contact_resp(ClientConnection, MsgContactResp):
s = ClientConnection s = ClientConnection