diff --git a/.vscode/launch.json b/.vscode/launch.json index 9d9bfa7..7977fdd 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -6,7 +6,7 @@ "configurations": [ { "name": "Python: Aktuelle Datei", - "type": "python", + "type": "debugpy", "request": "launch", "program": "${file}", "console": "integratedTerminal", diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a4454d..934a71e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,33 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [unreleased] + +## [0.9.0] - 2024-07-01 + +- fix exception in MODBUS timeout callback + +## [0.9.0-RC1] - 2024-06-29 + +- add asyncio log and debug mode +- stop the HTTP server on shutdown gracefully +- Synchronize regular MODBUS commands with the status of the inverter to prevent the inverter from crashing due to + unexpected packets. [#111](https://github.com/s-allius/tsun-gen3-proxy/issues/111) +- GEN3: avoid sending MODBUS commands to the inverter during the inverter's reporting phase +- GEN3: determine the connection timeout based on the connection state +- GEN3: support more data encodings for DSP version V5.0.17 [#108](https://github.com/s-allius/tsun-gen3-proxy/issues/108) +- detect dead connections [#100](https://github.com/s-allius/tsun-gen3-proxy/issues/100) +- improve connection logging with a unique connection id +- Add healthcheck, readiness and liveness checks [#91](https://github.com/s-allius/tsun-gen3-proxy/issues/91) +- MODBUS close handler releases internal resource [#93](https://github.com/s-allius/tsun-gen3-proxy/issues/93) +- add exception handling for message forwarding [#94](https://github.com/s-allius/tsun-gen3-proxy/issues/94) +- GEN3: make timestamp handling stateless, to avoid blocking when the TSUN cloud is down [#56](https://github.com/s-allius/tsun-gen3-proxy/issues/56) +- GEN3PLUS: dump invalid packets with wrong start or stop byte +- label debug images as `debug` +- print image build time during proxy start +- add type annotations +- improve async unit tests and fix pytest warnings +- run github tests even for pulls on issue branches ## [0.8.1] - 2024-06-21 @@ -24,7 +50,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - catch all OSError errors in the read loop - log Modbus traces with different log levels - add Modbus fifo and timeout handler -- build version string in the same format as TSUN for GEN3 invterts +- build version string in the same format as TSUN for GEN3 inverters - add graceful shutdown - parse Modbus values and store them in the database - add cron task to request the output power every minute diff --git a/app/Dockerfile b/app/Dockerfile index 30cdae8..68440d1 100644 --- a/app/Dockerfile +++ b/app/Dockerfile @@ -15,7 +15,7 @@ RUN apk upgrade --no-cache && \ # # second stage for building wheels packages -FROM base as builder +FROM base AS builder # copy the dependencies file to the root dir and install requirements COPY ./requirements.txt /root/ @@ -26,7 +26,7 @@ RUN apk add --no-cache build-base && \ # # third stage for our runtime image -FROM base as runtime +FROM base AS runtime ARG SERVICE_NAME ARG VERSION ARG UID @@ -63,8 +63,8 @@ RUN python -m pip install --no-cache --no-index /root/wheels/* && \ COPY --chmod=0700 entrypoint.sh /root/entrypoint.sh COPY config . COPY src . 
- -EXPOSE 5005 +RUN date > /build-date.txt +EXPOSE 5005 8127 10000 # command to run on container start ENTRYPOINT ["/root/entrypoint.sh"] @@ -73,7 +73,7 @@ CMD [ "python3", "./server.py" ] LABEL org.opencontainers.image.title="TSUN Gen3 Proxy" LABEL org.opencontainers.image.authors="Stefan Allius" -LABEL org.opencontainers.image.source https://github.com/s-allius/tsun-gen3-proxy -LABEL org.opencontainers.image.description 'This proxy enables a reliable connection between TSUN third generation inverters (eg. TSOL MS600, MS800, MS2000) and an MQTT broker to integrate the inverter into typical home automations.' +LABEL org.opencontainers.image.source=https://github.com/s-allius/tsun-gen3-proxy +LABEL org.opencontainers.image.description='This proxy enables a reliable connection between TSUN third generation inverters (eg. TSOL MS600, MS800, MS2000) and an MQTT broker to integrate the inverter into typical home automations.' LABEL org.opencontainers.image.licenses="BSD-3-Clause" LABEL org.opencontainers.image.vendor="Stefan Allius" diff --git a/app/build.sh b/app/build.sh index c654c1c..dbb1f86 100755 --- a/app/build.sh +++ b/app/build.sh @@ -4,7 +4,7 @@ # rc: release candidate build # rel: release build and push to ghcr.io # Note: for release build, you need to set GHCR_TOKEN -# export GHCR_TOKEN= in your .profile +# export GHCR_TOKEN= in your .zprofile # see also: https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry @@ -31,7 +31,7 @@ fi echo version: $VERSION build-date: $BUILD_DATE image: $IMAGE if [[ $1 == debug ]];then -docker build --build-arg "VERSION=${VERSION}" --build-arg environment=dev --build-arg "LOG_LVL=DEBUG" --label "org.opencontainers.image.created=${BUILD_DATE}" --label "org.opencontainers.image.version=${VERSION}" --label "org.opencontainers.image.revision=${BRANCH}" -t ${IMAGE}:dev app +docker build --build-arg "VERSION=${VERSION}" --build-arg environment=dev --build-arg "LOG_LVL=DEBUG" --label "org.opencontainers.image.created=${BUILD_DATE}" --label "org.opencontainers.image.version=${VERSION}" --label "org.opencontainers.image.revision=${BRANCH}" -t ${IMAGE}:debug app elif [[ $1 == dev ]];then docker build --build-arg "VERSION=${VERSION}" --build-arg environment=production --label "org.opencontainers.image.created=${BUILD_DATE}" --label "org.opencontainers.image.version=${VERSION}" --label "org.opencontainers.image.revision=${BRANCH}" -t ${IMAGE}:dev app @@ -39,16 +39,16 @@ elif [[ $1 == rc ]];then docker build --build-arg "VERSION=${VERSION}" --build-arg environment=production --label "org.opencontainers.image.created=${BUILD_DATE}" --label "org.opencontainers.image.version=${VERSION}" --label "org.opencontainers.image.revision=${BRANCH}" -t ${IMAGE}:rc -t ${IMAGE}:${VERSION} app echo 'login to ghcr.io' echo $GHCR_TOKEN | docker login ghcr.io -u s-allius --password-stdin -docker push ghcr.io/s-allius/tsun-gen3-proxy:rc -docker push ghcr.io/s-allius/tsun-gen3-proxy:${VERSION} +docker push -q ghcr.io/s-allius/tsun-gen3-proxy:rc +docker push -q ghcr.io/s-allius/tsun-gen3-proxy:${VERSION} elif [[ $1 == rel ]];then docker build --no-cache --build-arg "VERSION=${VERSION}" --build-arg environment=production --label "org.opencontainers.image.created=${BUILD_DATE}" --label "org.opencontainers.image.version=${VERSION}" --label "org.opencontainers.image.revision=${BRANCH}" -t ${IMAGE}:latest -t ${IMAGE}:${MAJOR} -t ${IMAGE}:${VERSION} app echo 'login to ghcr.io' echo $GHCR_TOKEN | docker login ghcr.io -u s-allius 
--password-stdin -docker push ghcr.io/s-allius/tsun-gen3-proxy:latest -docker push ghcr.io/s-allius/tsun-gen3-proxy:${MAJOR} -docker push ghcr.io/s-allius/tsun-gen3-proxy:${VERSION} +docker push -q ghcr.io/s-allius/tsun-gen3-proxy:latest +docker push -q ghcr.io/s-allius/tsun-gen3-proxy:${MAJOR} +docker push -q ghcr.io/s-allius/tsun-gen3-proxy:${VERSION} fi echo 'check docker-compose.yaml file' diff --git a/app/entrypoint.sh b/app/entrypoint.sh index 7698d41..b6f3d11 100644 --- a/app/entrypoint.sh +++ b/app/entrypoint.sh @@ -5,6 +5,7 @@ user="$(id -u)" echo "######################################################" echo "# prepare: '$SERVICE_NAME' Version:$VERSION" echo "# for running with UserID:$UID, GroupID:$GID" +echo "# Image built: $(cat /build-date.txt) " echo "#" if [ "$user" = '0' ]; then diff --git a/app/hardening_final.sh b/app/hardening_final.sh index c6896bb..279e1b6 100644 --- a/app/hardening_final.sh +++ b/app/hardening_final.sh @@ -17,6 +17,5 @@ if [ "$environment" = "production" ] ; then \ -name od -o \ -name strings -o \ -name su -o \ - -name wget -o \ \) -delete \ ; fi diff --git a/app/requirements.txt b/app/requirements.txt index b151101..ed9dcb0 100644 --- a/app/requirements.txt +++ b/app/requirements.txt @@ -1,3 +1,4 @@ aiomqtt==2.0.1 schema==0.7.5 - aiocron==1.8 \ No newline at end of file + aiocron==1.8 + aiohttp==3.9.5 \ No newline at end of file diff --git a/app/src/async_stream.py b/app/src/async_stream.py index 17f5f59..f4b0ff9 100644 --- a/app/src/async_stream.py +++ b/app/src/async_stream.py @@ -1,44 +1,77 @@ +import asyncio import logging import traceback -from messages import hex_dump_memory +import time +from asyncio import StreamReader, StreamWriter +from messages import hex_dump_memory, State +from typing import Self +from itertools import count +import gc logger = logging.getLogger('conn') class AsyncStream(): + _ids = count(0) + MAX_PROC_TIME = 2 + '''maximum processing time for a received msg in sec''' + MAX_START_TIME = 400 + '''maximum time without a received msg in sec''' + MAX_INV_IDLE_TIME = 90 + '''maximum time without a received msg from the inverter in sec''' + MAX_CLOUD_IDLE_TIME = 360 + '''maximum time without a received msg from cloud side in sec''' - def __init__(self, reader, writer, addr) -> None: + def __init__(self, reader: StreamReader, writer: StreamWriter, + addr) -> None: logger.debug('AsyncStream.__init__') self.reader = reader self.writer = writer self.addr = addr self.r_addr = '' self.l_addr = '' + self.conn_no = next(self._ids) + self.proc_start = None # start processing start timestamp + self.proc_max = 0 - async def server_loop(self, addr): + def __timeout(self) -> int: + if self.state == State.init: + to = self.MAX_START_TIME + else: + if self.server_side: + to = self.MAX_INV_IDLE_TIME + else: + to = self.MAX_CLOUD_IDLE_TIME + return to + + async def server_loop(self, addr: str) -> None: '''Loop for receiving messages from the inverter (server-side)''' - logging.info(f'[{self.node_id}] Accept connection from {addr}') + logger.info(f'[{self.node_id}:{self.conn_no}] ' + f'Accept connection from {addr}') self.inc_counter('Inverter_Cnt') await self.loop() self.dec_counter('Inverter_Cnt') - logging.info(f'[{self.node_id}] Server loop stopped for' - f' r{self.r_addr}') + logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for' + f' r{self.r_addr}') # if the server connection closes, we also have to disconnect # the connection to te TSUN cloud if self.remoteStream: - logging.debug("disconnect client connection") + 
logger.info(f'[{self.node_id}:{self.conn_no}] disc client ' + f'connection: [{self.remoteStream.node_id}:' + f'{self.remoteStream.conn_no}]') await self.remoteStream.disc() try: await self._async_publ_mqtt_proxy_stat('proxy') except Exception: pass - async def client_loop(self, addr): + async def client_loop(self, addr: str) -> None: '''Loop for receiving messages from the TSUN cloud (client-side)''' clientStream = await self.remoteStream.loop() - logging.info(f'[{self.node_id}] Client loop stopped for' - f' l{clientStream.l_addr}') + logger.info(f'[{clientStream.node_id}:{clientStream.conn_no}] ' + 'Client loop stopped for' + f' l{clientStream.l_addr}') # if the client connection closes, we don't touch the server # connection. Instead we erase the client connection stream, @@ -54,28 +87,45 @@ class AsyncStream(): # than erase client connection self.remoteStream = None - async def loop(self): + async def loop(self) -> Self: + """Async loop handler for precessing all received messages""" self.r_addr = self.writer.get_extra_info('peername') self.l_addr = self.writer.get_extra_info('sockname') - + self.proc_start = time.time() while True: try: - await self.__async_read() + proc = time.time() - self.proc_start + if proc > self.proc_max: + self.proc_max = proc + self.proc_start = None + dead_conn_to = self.__timeout() + await asyncio.wait_for(self.__async_read(), + dead_conn_to) if self.unique_id: await self.async_write() await self.__async_forward() await self.async_publ_mqtt() + except asyncio.TimeoutError: + logger.warning(f'[{self.node_id}:{self.conn_no}] Dead ' + f'connection timeout ({dead_conn_to}s) ' + f'for {self.l_addr}') + await self.disc() + self.close() + return self + except OSError as error: - logger.error(f'[{self.node_id}] {error} for l{self.l_addr} | ' + logger.error(f'[{self.node_id}:{self.conn_no}] ' + f'{error} for l{self.l_addr} | ' f'r{self.r_addr}') await self.disc() self.close() return self except RuntimeError as error: - logger.info(f"[{self.node_id}] {error} for {self.l_addr}") + logger.info(f'[{self.node_id}:{self.conn_no}] ' + f'{error} for {self.l_addr}') await self.disc() self.close() return self @@ -86,31 +136,8 @@ class AsyncStream(): f"Exception for {self.addr}:\n" f"{traceback.format_exc()}") - async def disc(self) -> None: - if self.writer.is_closing(): - return - logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}') - self.writer.close() - await self.writer.wait_closed() - - def close(self): - if self.writer.is_closing(): - return - logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}') - self.writer.close() - - ''' - Our private methods - ''' - async def __async_read(self) -> None: - data = await self.reader.read(4096) - if data: - self._recv_buffer += data - self.read() # call read in parent class - else: - raise RuntimeError("Peer closed.") - - async def async_write(self, headline='Transmit to ') -> None: + async def async_write(self, headline: str = 'Transmit to ') -> None: + """Async write handler to transmit the send_buffer""" if self._send_buffer: hex_dump_memory(logging.INFO, f'{headline}{self.addr}:', self._send_buffer, len(self._send_buffer)) @@ -118,8 +145,57 @@ class AsyncStream(): await self.writer.drain() self._send_buffer = bytearray(0) # self._send_buffer[sent:] + async def disc(self) -> None: + """Async disc handler for graceful disconnect""" + if self.writer.is_closing(): + return + logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}') + self.writer.close() + await self.writer.wait_closed() + + def 
close(self) -> None: + """close handler for a no waiting disconnect + + hint: must be called before releasing the connection instance + """ + self.reader.feed_eof() # abort awaited read + if self.writer.is_closing(): + return + logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}') + self.writer.close() + + def healthy(self) -> bool: + elapsed = 0 + if self.proc_start is not None: + elapsed = time.time() - self.proc_start + if self.state == State.closed or elapsed > self.MAX_PROC_TIME: + logging.debug(f'[{self.node_id}:{self.conn_no}:' + f'{type(self).__name__}]' + f' act:{round(1000*elapsed)}ms' + f' max:{round(1000*self.proc_max)}ms') + logging.debug(f'Healthy()) refs: {gc.get_referrers(self)}') + return elapsed < 5 + + ''' + Our private methods + ''' + async def __async_read(self) -> None: + """Async read handler to read received data from TCP stream""" + data = await self.reader.read(4096) + if data: + self.proc_start = time.time() + self._recv_buffer += data + wait = self.read() # call read in parent class + if wait > 0: + await asyncio.sleep(wait) + else: + raise RuntimeError("Peer closed.") + async def __async_forward(self) -> None: - if self._forward_buffer: + """forward handler transmits data over the remote connection""" + if not self._forward_buffer: + return + try: if not self.remoteStream: await self.async_create_remote() if self.remoteStream: @@ -136,6 +212,30 @@ class AsyncStream(): await self.remoteStream.writer.drain() self._forward_buffer = bytearray(0) + except OSError as error: + if self.remoteStream: + rmt = self.remoteStream + self.remoteStream = None + logger.error(f'[{rmt.node_id}:{rmt.conn_no}] Fwd: {error} for ' + f'l{rmt.l_addr} | r{rmt.r_addr}') + await rmt.disc() + rmt.close() + + except RuntimeError as error: + if self.remoteStream: + rmt = self.remoteStream + self.remoteStream = None + logger.info(f'[{rmt.node_id}:{rmt.conn_no}] ' + f'Fwd: {error} for {rmt.l_addr}') + await rmt.disc() + rmt.close() + + except Exception: + self.inc_counter('SW_Exception') + logger.error( + f"Fwd Exception for {self.addr}:\n" + f"{traceback.format_exc()}") + def __del__(self): logger.debug( f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}") diff --git a/app/src/config.py b/app/src/config.py index e1ef749..8121c86 100644 --- a/app/src/config.py +++ b/app/src/config.py @@ -84,7 +84,7 @@ class Config(): ) @classmethod - def class_init(cls): # pragma: no cover + def class_init(cls) -> None | str: # pragma: no cover try: # make the default config transparaent by copying it # in the config.example file @@ -94,7 +94,9 @@ class Config(): "config/config.example.toml") except Exception: pass - cls.read() + err_str = cls.read() + del cls.conf_schema + return err_str @classmethod def _read_config_file(cls) -> dict: # pragma: no cover diff --git a/app/src/gen3/connection_g3.py b/app/src/gen3/connection_g3.py index c93156e..7e7b96d 100644 --- a/app/src/gen3/connection_g3.py +++ b/app/src/gen3/connection_g3.py @@ -1,5 +1,6 @@ import logging # import gc +from asyncio import StreamReader, StreamWriter from async_stream import AsyncStream from gen3.talent import Talent @@ -8,12 +9,13 @@ logger = logging.getLogger('conn') class ConnectionG3(AsyncStream, Talent): - def __init__(self, reader, writer, addr, remote_stream, server_side: bool, + def __init__(self, reader: StreamReader, writer: StreamWriter, + addr, remote_stream: 'ConnectionG3', server_side: bool, id_str=b'') -> None: AsyncStream.__init__(self, reader, writer, addr) Talent.__init__(self, server_side, id_str) - 
self.remoteStream = remote_stream + self.remoteStream: 'ConnectionG3' = remote_stream ''' Our puplic methods @@ -29,6 +31,10 @@ class ConnectionG3(AsyncStream, Talent): async def async_publ_mqtt(self) -> None: pass + def healthy(self) -> bool: + logger.debug('ConnectionG3 healthy()') + return AsyncStream.healthy(self) + ''' Our private methods ''' diff --git a/app/src/gen3/infos_g3.py b/app/src/gen3/infos_g3.py index 0dc6a35..04e5c69 100644 --- a/app/src/gen3/infos_g3.py +++ b/app/src/gen3/infos_g3.py @@ -132,11 +132,24 @@ class InfosG3(Infos): errors='replace') ind += str_len+1 + elif data_type == 0x00: # 'Nul' -> end + i = elms # abort the loop + + elif data_type == 0x41: # 'A' -> Nop ?? + # result = struct.unpack_from('!l', buf, ind)[0] + ind += 0 + i += 1 + continue + + elif data_type == 0x42: # 'B' -> byte, int8 + result = struct.unpack_from('!B', buf, ind)[0] + ind += 1 + elif data_type == 0x49: # 'I' -> int32 result = struct.unpack_from('!l', buf, ind)[0] ind += 4 - elif data_type == 0x53: # 'S' -> short + elif data_type == 0x53: # 'S' -> short, int16 result = struct.unpack_from('!h', buf, ind)[0] ind += 2 @@ -144,13 +157,14 @@ class InfosG3(Infos): result = round(struct.unpack_from('!f', buf, ind)[0], 2) ind += 4 - elif data_type == 0x4c: # 'L' -> int64 + elif data_type == 0x4c: # 'L' -> long, int64 result = struct.unpack_from('!q', buf, ind)[0] ind += 8 else: self.inc_counter('Invalid_Data_Type') logging.error(f"Infos.parse: data_type: {data_type}" + f" @0x{addr:04x} No:{i}" " not supported") return diff --git a/app/src/gen3/inverter_g3.py b/app/src/gen3/inverter_g3.py index 1930f0e..a20dc77 100644 --- a/app/src/gen3/inverter_g3.py +++ b/app/src/gen3/inverter_g3.py @@ -1,7 +1,8 @@ -import asyncio import logging import traceback import json +import asyncio +from asyncio import StreamReader, StreamWriter from config import Config from inverter import Inverter from gen3.connection_g3 import ConnectionG3 @@ -44,7 +45,7 @@ class InverterG3(Inverter, ConnectionG3): destroyed ''' - def __init__(self, reader, writer, addr): + def __init__(self, reader: StreamReader, writer: StreamWriter, addr): super().__init__(reader, writer, addr, None, True) self.__ha_restarts = -1 @@ -56,11 +57,14 @@ class InverterG3(Inverter, ConnectionG3): addr = (host, port) try: - logging.info(f'[{self.node_id}] Connected to {addr}') + logging.info(f'[{self.node_id}] Connect to {addr}') connect = asyncio.open_connection(host, port) reader, writer = await connect self.remoteStream = ConnectionG3(reader, writer, addr, self, False, self.id_str) + logging.info(f'[{self.remoteStream.node_id}:' + f'{self.remoteStream.conn_no}] ' + f'Connected to {addr}') asyncio.create_task(self.client_loop(addr)) except (ConnectionRefusedError, TimeoutError) as error: @@ -121,7 +125,7 @@ class InverterG3(Inverter, ConnectionG3): def close(self) -> None: logging.debug(f'InverterG3.close() l{self.l_addr} | r{self.r_addr}') super().close() # call close handler in the parent class -# logger.debug (f'Inverter refs: {gc.get_referrers(self)}') +# logging.info(f'Inverter refs: {gc.get_referrers(self)}') def __del__(self): logging.debug("InverterG3.__del__") diff --git a/app/src/gen3/talent.py b/app/src/gen3/talent.py index 18a336f..c6b0091 100644 --- a/app/src/gen3/talent.py +++ b/app/src/gen3/talent.py @@ -4,13 +4,15 @@ import time from datetime import datetime if __name__ == "app.src.gen3.talent": - from app.src.messages import hex_dump_memory, Message + from app.src.messages import hex_dump_memory, Message, State from app.src.modbus import 
Modbus + from app.src.my_timer import Timer from app.src.config import Config from app.src.gen3.infos_g3 import InfosG3 else: # pragma: no cover - from messages import hex_dump_memory, Message + from messages import hex_dump_memory, Message, State from modbus import Modbus + from my_timer import Timer from config import Config from gen3.infos_g3 import InfosG3 @@ -35,12 +37,16 @@ class Control: class Talent(Message): + MB_START_TIMEOUT = 40 + MB_REGULAR_TIMEOUT = 60 + def __init__(self, server_side: bool, id_str=b''): super().__init__(server_side, self.send_modbus_cb, mb_timeout=11) self.await_conn_resp_cnt = 0 self.id_str = id_str self.contact_name = b'' self.contact_mail = b'' + self.ts_offset = 0 # time offset between tsun cloud and local self.db = InfosG3() self.switch = { 0x00: self.msg_contact_info, @@ -64,19 +70,20 @@ class Talent(Message): } self.modbus_elms = 0 # for unit tests self.node_id = 'G3' # will be overwritten in __set_serial_no - # self.forwarding = Config.get('tsun')['enabled'] + self.mb_timer = Timer(self.mb_timout_cb, self.node_id) ''' Our puplic methods ''' def close(self) -> None: logging.debug('Talent.close()') - # we have refernces to methods of this class in self.switch + # we have references to methods of this class in self.switch # so we have to erase self.switch, otherwise this instance can't be # deallocated by the garbage collector ==> we get a memory leak self.switch.clear() self.log_lvl.clear() - self.state = self.STATE_CLOSED + self.state = State.closed + self.mb_timer.close() super().close() def __set_serial_no(self, serial_no: str): @@ -105,7 +112,7 @@ class Talent(Message): self.unique_id = serial_no - def read(self) -> None: + def read(self) -> float: self._read() if not self.header_valid: @@ -113,6 +120,9 @@ class Talent(Message): if self.header_valid and len(self._recv_buffer) >= (self.header_len + self.data_len): + if self.state == State.init: + self.state = State.received # received 1st package + log_lvl = self.log_lvl.get(self.msg_id, logging.WARNING) if callable(log_lvl): log_lvl = log_lvl() @@ -123,7 +133,7 @@ class Talent(Message): self.__set_serial_no(self.id_str.decode("utf-8")) self.__dispatch_msg() self.__flush_recv_msg() - return + return 0.5 # wait 500ms before sending a response def forward(self, buffer, buflen) -> None: tsun = Config.get('tsun') @@ -140,9 +150,9 @@ class Talent(Message): return def send_modbus_cb(self, modbus_pdu: bytearray, log_lvl: int, state: str): - if self.state != self.STATE_UP: - logger.warn(f'[{self.node_id}] ignore MODBUS cmd,' - ' cause the state is not UP anymore') + if self.state != State.up: + logger.warning(f'[{self.node_id}] ignore MODBUS cmd,' + ' cause the state is not UP anymore') return self.__build_header(0x70, 0x77) @@ -156,13 +166,25 @@ class Talent(Message): self.writer.write(self._send_buffer) self._send_buffer = bytearray(0) # self._send_buffer[sent:] - async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None: - if self.state != self.STATE_UP: + def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None: + if self.state != State.up: logger.log(log_lvl, f'[{self.node_id}] ignore MODBUS cmd,' ' as the state is not UP') return self.mb.build_msg(Modbus.INV_ADDR, func, addr, val, log_lvl) + async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None: + self._send_modbus_cmd(func, addr, val, log_lvl) + + def mb_timout_cb(self, exp_cnt): + self.mb_timer.start(self.MB_REGULAR_TIMEOUT) + + if 0 == (exp_cnt % 30): + # logging.info("Regular Modbus Status request") + 
self._send_modbus_cmd(Modbus.READ_REGS, 0x2007, 2, logging.DEBUG) + else: + self._send_modbus_cmd(Modbus.READ_REGS, 0x3008, 21, logging.DEBUG) + def _init_new_client_conn(self) -> bool: contact_name = self.contact_name contact_mail = self.contact_mail @@ -204,6 +226,24 @@ class Talent(Message): ts = (datetime.now() - datetime(1970, 1, 1)).total_seconds() return round(ts*1000) + def _update_header(self, _forward_buffer): + '''update header for message before forwarding, + add time offset to timestamp''' + _len = len(_forward_buffer) + result = struct.unpack_from('!lB', _forward_buffer, 0) + id_len = result[1] # len of variable id string + if _len < 2*id_len + 21: + return + + result = struct.unpack_from('!B', _forward_buffer, id_len+6) + msg_code = result[0] + if msg_code == 0x71 or msg_code == 0x04: + result = struct.unpack_from('!q', _forward_buffer, 13+2*id_len) + ts = result[0] + self.ts_offset + logger.debug(f'offset: {self.ts_offset:08x}' + f' proxy-time: {ts:08x}') + struct.pack_into('!q', _forward_buffer, 13+2*id_len, ts) + # check if there is a complete header in the buffer, parse it # and set # self.header_len @@ -256,7 +296,8 @@ class Talent(Message): fnc = self.switch.get(self.msg_id, self.msg_unknown) if self.unique_id: logger.info(self.__flow_str(self.server_side, 'rx') + - f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}') + f' Ctl: {int(self.ctrl):#02x} ({self.state}) ' + f'Msg: {fnc.__name__!r}') fnc() else: logger.info(self.__flow_str(self.server_side, 'drop') + @@ -305,39 +346,37 @@ class Talent(Message): return True def msg_get_time(self): - tsun = Config.get('tsun') - if tsun['enabled']: - if self.ctrl.is_ind(): - if self.data_len >= 8: - ts = self._timestamp() - result = struct.unpack_from('!q', self._recv_buffer, - self.header_len) - logger.debug(f'tsun-time: {result[0]:08x}' - f' proxy-time: {ts:08x}') - else: - logger.warning('Unknown Ctrl') - self.inc_counter('Unknown_Ctrl') - self.forward(self._recv_buffer, self.header_len+self.data_len) + if self.ctrl.is_ind(): + if self.data_len == 0: + self.state = State.pend # block MODBUS cmds + self.mb_timer.start(self.MB_START_TIMEOUT) + ts = self._timestamp() + logger.debug(f'time: {ts:08x}') + self.__build_header(0x91) + self._send_buffer += struct.pack('!q', ts) + self.__finish_send_msg() + + elif self.data_len >= 8: + ts = self._timestamp() + result = struct.unpack_from('!q', self._recv_buffer, + self.header_len) + self.ts_offset = result[0]-ts + logger.debug(f'tsun-time: {int(result[0]):08x}' + f' proxy-time: {ts:08x}' + f' offset: {self.ts_offset}') + return # ignore received response else: - if self.ctrl.is_ind(): - if self.data_len == 0: - ts = self._timestamp() - logger.debug(f'time: {ts:08x}') + logger.warning('Unknown Ctrl') + self.inc_counter('Unknown_Ctrl') - self.__build_header(0x91) - self._send_buffer += struct.pack('!q', ts) - self.__finish_send_msg() - - else: - logger.warning('Unknown Ctrl') - self.inc_counter('Unknown_Ctrl') + self.forward(self._recv_buffer, self.header_len+self.data_len) def parse_msg_header(self): result = struct.unpack_from('!lB', self._recv_buffer, self.header_len) data_id = result[0] # len of complete message id_len = result[1] # len of variable id string - logger.debug(f'Data_ID: {data_id} id_len: {id_len}') + logger.debug(f'Data_ID: 0x{data_id:08x} id_len: {id_len}') msg_hdr_len = 5+id_len+9 @@ -356,7 +395,6 @@ class Talent(Message): self._send_buffer += b'\x01' self.__finish_send_msg() self.__process_data() - self.state = self.STATE_UP elif self.ctrl.is_resp(): return # ignore 
received response @@ -372,7 +410,7 @@ class Talent(Message): self._send_buffer += b'\x01' self.__finish_send_msg() self.__process_data() - self.state = self.STATE_UP + self.state = State.up # allow MODBUS cmds elif self.ctrl.is_resp(): return # ignore received response @@ -432,8 +470,13 @@ class Talent(Message): else: self.inc_counter('Invalid_Msg_Format') elif self.ctrl.is_ind(): - # logger.debug(f'Modbus Ind MsgLen: {modbus_len}') self.modbus_elms = 0 + # logger.debug(f'Modbus Ind MsgLen: {modbus_len}') + if not self.server_side: + logger.warning('Unknown Message') + self.inc_counter('Unknown_Msg') + return + for key, update, _ in self.mb.recv_resp(self.db, data[ hdr_len:], self.node_id): diff --git a/app/src/gen3plus/connection_g3p.py b/app/src/gen3plus/connection_g3p.py index a9362ce..352ba5e 100644 --- a/app/src/gen3plus/connection_g3p.py +++ b/app/src/gen3plus/connection_g3p.py @@ -1,5 +1,6 @@ import logging # import gc +from asyncio import StreamReader, StreamWriter from async_stream import AsyncStream from gen3plus.solarman_v5 import SolarmanV5 @@ -8,12 +9,13 @@ logger = logging.getLogger('conn') class ConnectionG3P(AsyncStream, SolarmanV5): - def __init__(self, reader, writer, addr, remote_stream, + def __init__(self, reader: StreamReader, writer: StreamWriter, + addr, remote_stream: 'ConnectionG3P', server_side: bool) -> None: AsyncStream.__init__(self, reader, writer, addr) SolarmanV5.__init__(self, server_side) - self.remoteStream = remote_stream + self.remoteStream: 'ConnectionG3P' = remote_stream ''' Our puplic methods @@ -29,6 +31,10 @@ class ConnectionG3P(AsyncStream, SolarmanV5): async def async_publ_mqtt(self) -> None: pass + def healthy(self) -> bool: + logger.debug('ConnectionG3P healthy()') + return AsyncStream.healthy(self) + ''' Our private methods ''' diff --git a/app/src/gen3plus/inverter_g3p.py b/app/src/gen3plus/inverter_g3p.py index 487fe1e..74c9b6b 100644 --- a/app/src/gen3plus/inverter_g3p.py +++ b/app/src/gen3plus/inverter_g3p.py @@ -1,7 +1,8 @@ -import asyncio import logging import traceback import json +import asyncio +from asyncio import StreamReader, StreamWriter from config import Config from inverter import Inverter from gen3plus.connection_g3p import ConnectionG3P @@ -44,7 +45,7 @@ class InverterG3P(Inverter, ConnectionG3P): destroyed ''' - def __init__(self, reader, writer, addr): + def __init__(self, reader: StreamReader, writer: StreamWriter, addr): super().__init__(reader, writer, addr, None, True) self.__ha_restarts = -1 @@ -56,11 +57,14 @@ class InverterG3P(Inverter, ConnectionG3P): addr = (host, port) try: - logging.info(f'[{self.node_id}] Connected to {addr}') + logging.info(f'[{self.node_id}] Connect to {addr}') connect = asyncio.open_connection(host, port) reader, writer = await connect self.remoteStream = ConnectionG3P(reader, writer, addr, self, False) + logging.info(f'[{self.remoteStream.node_id}:' + f'{self.remoteStream.conn_no}] ' + f'Connected to {addr}') asyncio.create_task(self.client_loop(addr)) except (ConnectionRefusedError, TimeoutError) as error: diff --git a/app/src/gen3plus/solarman_v5.py b/app/src/gen3plus/solarman_v5.py index a498a2b..432ec2e 100644 --- a/app/src/gen3plus/solarman_v5.py +++ b/app/src/gen3plus/solarman_v5.py @@ -6,15 +6,17 @@ import asyncio from datetime import datetime if __name__ == "app.src.gen3plus.solarman_v5": - from app.src.messages import hex_dump_memory, Message + from app.src.messages import hex_dump_memory, Message, State from app.src.modbus import Modbus + from app.src.my_timer import Timer from 
app.src.config import Config from app.src.gen3plus.infos_g3p import InfosG3P from app.src.infos import Register else: # pragma: no cover - from messages import hex_dump_memory, Message + from messages import hex_dump_memory, Message, State from config import Config from modbus import Modbus + from my_timer import Timer from gen3plus.infos_g3p import InfosG3P from infos import Register # import traceback @@ -51,6 +53,8 @@ class Sequence(): class SolarmanV5(Message): AT_CMD = 1 MB_RTU_CMD = 2 + MB_START_TIMEOUT = 40 + MB_REGULAR_TIMEOUT = 60 def __init__(self, server_side: bool): super().__init__(server_side, self.send_modbus_cb, mb_timeout=5) @@ -123,19 +127,20 @@ class SolarmanV5(Message): self.at_acl = g3p_cnf['at_acl'] self.node_id = 'G3P' # will be overwritten in __set_serial_no - # self.forwarding = Config.get('solarman')['enabled'] + self.mb_timer = Timer(self.mb_timout_cb, self.node_id) ''' Our puplic methods ''' def close(self) -> None: logging.debug('Solarman.close()') - # we have refernces to methods of this class in self.switch + # we have references to methods of this class in self.switch # so we have to erase self.switch, otherwise this instance can't be # deallocated by the garbage collector ==> we get a memory leak self.switch.clear() self.log_lvl.clear() - self.state = self.STATE_CLOSED + self.state = State.closed + self.mb_timer.close() super().close() def __set_serial_no(self, snr: int): @@ -169,7 +174,7 @@ class SolarmanV5(Message): self.unique_id = serial_no - def read(self) -> None: + def read(self) -> float: self._read() if not self.header_valid: @@ -184,10 +189,13 @@ class SolarmanV5(Message): self._recv_buffer, self.header_len+self.data_len+2) if self.__trailer_is_ok(self._recv_buffer, self.header_len + self.data_len + 2): + if self.state == State.init: + self.state = State.received + self.__set_serial_no(self.snr) self.__dispatch_msg() self.__flush_recv_msg() - return + return 0 # wait 0s before sending a response def forward(self, buffer, buflen) -> None: tsun = Config.get('solarman') @@ -253,6 +261,10 @@ class SolarmanV5(Message): self.snr = result[4] if start != 0xA5: + hex_dump_memory(logging.ERROR, + 'Drop packet w invalid start byte from' + f' {self.addr}:', buf, buf_len) + self.inc_counter('Invalid_Msg_Format') # erase broken recv buffer self._recv_buffer = bytearray() @@ -264,6 +276,9 @@ class SolarmanV5(Message): crc = buf[self.data_len+11] stop = buf[self.data_len+12] if stop != 0x15: + hex_dump_memory(logging.ERROR, + 'Drop packet w invalid stop byte from ' + f'{self.addr}:', buf, buf_len) self.inc_counter('Invalid_Msg_Format') if len(self._recv_buffer) > (self.data_len+13): next_start = buf[self.data_len+13] @@ -338,9 +353,9 @@ class SolarmanV5(Message): self.__finish_send_msg() def send_modbus_cb(self, pdu: bytearray, log_lvl: int, state: str): - if self.state != self.STATE_UP: - logger.warn(f'[{self.node_id}] ignore MODBUS cmd,' - ' cause the state is not UP anymore') + if self.state != State.up: + logger.warning(f'[{self.node_id}] ignore MODBUS cmd,' + ' cause the state is not UP anymore') return self.__build_header(0x4510) self._send_buffer += struct.pack(' None: - if self.state != self.STATE_UP: + def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None: + if self.state != State.up: logger.log(log_lvl, f'[{self.node_id}] ignore MODBUS cmd,' ' as the state is not UP') return self.mb.build_msg(Modbus.INV_ADDR, func, addr, val, log_lvl) + async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None: + self._send_modbus_cmd(func, addr, val, 
log_lvl) + + def mb_timout_cb(self, exp_cnt): + self.mb_timer.start(self.MB_REGULAR_TIMEOUT) + + self._send_modbus_cmd(Modbus.READ_REGS, 0x3008, 21, logging.DEBUG) + + if 0 == (exp_cnt % 30): + # logging.info("Regular Modbus Status request") + self._send_modbus_cmd(Modbus.READ_REGS, 0x2007, 2, logging.DEBUG) + def at_cmd_forbidden(self, cmd: str, connection: str) -> bool: return not cmd.startswith(tuple(self.at_acl[connection]['allow'])) or \ cmd.startswith(tuple(self.at_acl[connection]['block'])) async def send_at_cmd(self, AT_cmd: str) -> None: - if self.state != self.STATE_UP: - logger.warn(f'[{self.node_id}] ignore AT+ cmd,' - ' as the state is not UP') + if self.state != State.up: + logger.warning(f'[{self.node_id}] ignore AT+ cmd,' + ' as the state is not UP') return AT_cmd = AT_cmd.strip() @@ -375,8 +402,7 @@ class SolarmanV5(Message): node_id = self.node_id key = 'at_resp' logger.info(f'{key}: {data_json}') - asyncio.ensure_future( - self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json)) # noqa: E501 + await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 return self.forward_at_cmd_resp = False @@ -465,7 +491,9 @@ class SolarmanV5(Message): self.__process_data(ftype) self.__forward_msg() self.__send_ack_rsp(0x1210, ftype) - self.state = self.STATE_UP + if self.state is not State.up: + self.state = State.up + self.mb_timer.start(self.MB_START_TIMEOUT) def msg_sync_start(self): data = self._recv_buffer[self.header_len:] @@ -499,13 +527,15 @@ class SolarmanV5(Message): __forward_msg): self.inc_counter('Modbus_Command') else: + logger.error('Invalid Modbus Msg') self.inc_counter('Invalid_Msg_Format') return self.__forward_msg() - async def publish_mqtt(self, key, data): - await self.mqtt.publish(key, data) # pragma: no cover + def publish_mqtt(self, key, data): + asyncio.ensure_future( + self.mqtt.publish(key, data)) def get_cmd_rsp_log_lvl(self) -> int: ftype = self._recv_buffer[self.header_len] @@ -529,8 +559,7 @@ class SolarmanV5(Message): node_id = self.node_id key = 'at_resp' logger.info(f'{key}: {data_json}') - asyncio.ensure_future( - self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json)) # noqa: E501 + self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501 return elif ftype == self.MB_RTU_CMD: valid = data[1] @@ -561,7 +590,9 @@ class SolarmanV5(Message): self.__forward_msg() self.__send_ack_rsp(0x1710, ftype) - self.state = self.STATE_UP + if self.state is not State.up: + self.state = State.up + self.mb_timer.start(self.MB_START_TIMEOUT) def msg_sync_end(self): data = self._recv_buffer[self.header_len:] diff --git a/app/src/infos.py b/app/src/infos.py index ee5a38a..a32e5b7 100644 --- a/app/src/infos.py +++ b/app/src/infos.py @@ -343,7 +343,7 @@ class Infos: dict[counter] -= 1 def ha_proxy_confs(self, ha_prfx: str, node_id: str, snr: str) \ - -> Generator[tuple[dict, str], None, None]: + -> Generator[tuple[str, str, str, str], None, None]: '''Generator function yields json register struct for home-assistant auto configuration and the unique entity string, for all proxy registers diff --git a/app/src/logging.ini b/app/src/logging.ini index 8bbc7da..34db695 100644 --- a/app/src/logging.ini +++ b/app/src/logging.ini @@ -1,5 +1,5 @@ [loggers] -keys=root,tracer,mesg,conn,data,mqtt +keys=root,tracer,mesg,conn,data,mqtt,asyncio [handlers] keys=console_handler,file_handler_name1,file_handler_name2 @@ -24,6 +24,12 @@ handlers=console_handler,file_handler_name1 propagate=0 qualname=mqtt +[logger_asyncio] 
+level=INFO +handlers=console_handler,file_handler_name1 +propagate=0 +qualname=asyncio + [logger_data] level=DEBUG handlers=file_handler_name1 diff --git a/app/src/messages.py b/app/src/messages.py index 6736b0b..bec2994 100644 --- a/app/src/messages.py +++ b/app/src/messages.py @@ -1,6 +1,7 @@ import logging import weakref -from typing import Callable +from typing import Callable, Generator +from enum import Enum if __name__ == "app.src.messages": @@ -45,21 +46,32 @@ def hex_dump_memory(level, info, data, num): class IterRegistry(type): - def __iter__(cls): + def __iter__(cls) -> Generator['Message', None, None]: for ref in cls._registry: obj = ref() if obj is not None: yield obj +class State(Enum): + '''state of the logical connection''' + init = 0 + '''just created''' + received = 1 + '''at least one packet received''' + up = 2 + '''at least one cmd-rsp transaction''' + pend = 3 + '''inverter transaction pending, don't send MODBUS cmds''' + closed = 4 + '''connection closed''' + + class Message(metaclass=IterRegistry): _registry = [] - STATE_INIT = 0 - STATE_UP = 2 - STATE_CLOSED = 3 def __init__(self, server_side: bool, send_modbus_cb: - Callable[[bytes, int, str], None], mb_timeout): + Callable[[bytes, int, str], None], mb_timeout: int): self._registry.append(weakref.ref(self)) self.server_side = server_side @@ -78,7 +90,7 @@ class Message(metaclass=IterRegistry): self._send_buffer = bytearray(0) self._forward_buffer = bytearray(0) self.new_data = {} - self.state = self.STATE_INIT + self.state = State.init ''' Empty methods, that have to be implemented in any child class which @@ -97,7 +109,7 @@ class Message(metaclass=IterRegistry): ''' def close(self) -> None: if self.mb: - del self.mb + self.mb.close() self.mb = None pass # pragma: no cover diff --git a/app/src/modbus.py b/app/src/modbus.py index 8f3778b..7425b56 100644 --- a/app/src/modbus.py +++ b/app/src/modbus.py @@ -105,7 +105,17 @@ class Modbus(): self.req_pend = False self.tim = None + def close(self): + """free the queue and erase the callback handlers""" + logging.debug('Modbus close:') + self.__stop_timer() + self.rsp_handler = None + self.snd_handler = None + while not self.que.empty: + self.que.get_nowait() + def __del__(self): + """log statistics on the deleting of a MODBUS instance""" logging.debug(f'Modbus __del__:\n {self.counter}') def build_msg(self, addr: int, func: int, reg: int, val: int, @@ -243,6 +253,7 @@ class Modbus(): # logging.debug(f'Modbus stop timer {self}') if self.tim: self.tim.cancel() + self.tim = None def __timeout_cb(self) -> None: '''Rsponse timeout handler retransmit pdu or send next pdu''' diff --git a/app/src/my_timer.py b/app/src/my_timer.py new file mode 100644 index 0000000..46435bd --- /dev/null +++ b/app/src/my_timer.py @@ -0,0 +1,35 @@ +import asyncio +import logging +from itertools import count + + +class Timer: + def __init__(self, cb, id_str: str = ''): + self.__timeout_cb = cb + self.loop = asyncio.get_event_loop() + self.tim = None + self.id_str = id_str + self.exp_count = count(0) + + def start(self, timeout: float) -> None: + '''Start timer with timeout seconds''' + if self.tim: + self.tim.cancel() + self.tim = self.loop.call_later(timeout, self.__timeout) + logging.debug(f'[{self.id_str}]Start timer') + + def stop(self) -> None: + '''Stop timer''' + logging.debug(f'[{self.id_str}]Stop timer') + if self.tim: + self.tim.cancel() + self.tim = None + + def __timeout(self) -> None: + '''timer expired handler''' + logging.debug(f'[{self.id_str}]Timer expired') + 
self.__timeout_cb(next(self.exp_count)) + + def close(self) -> None: + self.stop() + self.__timeout_cb = None diff --git a/app/src/scheduler.py b/app/src/scheduler.py index 6f54b6d..3c1d25a 100644 --- a/app/src/scheduler.py +++ b/app/src/scheduler.py @@ -3,8 +3,6 @@ import json from mqtt import Mqtt from aiocron import crontab from infos import ClrAtMidnight -from modbus import Modbus -from messages import Message logger_mqtt = logging.getLogger('mqtt') @@ -21,9 +19,6 @@ class Schedule: crontab('0 0 * * *', func=cls.atmidnight, start=True) - # every minute - crontab('* * * * *', func=cls.regular_modbus_cmds, start=True) - @classmethod async def atmidnight(cls) -> None: '''Clear daily counters at midnight''' @@ -33,15 +28,3 @@ class Schedule: logger_mqtt.debug(f'{key}: {data}') data_json = json.dumps(data) await cls.mqtt.publish(f"{key}", data_json) - - @classmethod - async def regular_modbus_cmds(cls): - for m in Message: - if m.server_side: - fnc = getattr(m, "send_modbus_cmd", None) - if callable(fnc): - await fnc(Modbus.READ_REGS, 0x3008, 21, logging.DEBUG) - # if 0 == (cls.count % 30): - # # logging.info("Regular Modbus Status request") - # await fnc(Modbus.READ_REGS, 0x2007, 2, logging.DEBUG) - cls.count += 1 diff --git a/app/src/server.py b/app/src/server.py index 18dc401..d835e4b 100644 --- a/app/src/server.py +++ b/app/src/server.py @@ -2,6 +2,8 @@ import logging import asyncio import signal import os +from asyncio import StreamReader, StreamWriter +from aiohttp import web from logging import config # noqa F401 from messages import Message from inverter import Inverter @@ -10,25 +12,83 @@ from gen3plus.inverter_g3p import InverterG3P from scheduler import Schedule from config import Config +routes = web.RouteTableDef() +proxy_is_up = False -async def handle_client(reader, writer): + +@routes.get('/') +async def hello(request): + return web.Response(text="Hello, world") + + +@routes.get('/-/ready') +async def ready(request): + if proxy_is_up: + status = 200 + text = 'Is ready' + else: + status = 503 + text = 'Not ready' + return web.Response(status=status, text=text) + + +@routes.get('/-/healthy') +async def healthy(request): + + if proxy_is_up: + # logging.info('web reqeust healthy()') + for stream in Message: + try: + res = stream.healthy() + if not res: + return web.Response(status=503, text="I have a problem") + except Exception as err: + logging.info(f'Exception:{err}') + + return web.Response(status=200, text="I'm fine") + + +async def webserver(addr, port): + '''coro running our webserver''' + app = web.Application() + app.add_routes(routes) + runner = web.AppRunner(app) + + await runner.setup() + site = web.TCPSite(runner, addr, port) + await site.start() + logging.info(f'HTTP server listen on port: {port}') + + try: + # Normal interaction with aiohttp + while True: + await asyncio.sleep(3600) # sleep forever + except asyncio.CancelledError: + logging.info('HTTP server cancelled') + await runner.cleanup() + logging.debug('HTTP cleanup done') + + +async def handle_client(reader: StreamReader, writer: StreamWriter): '''Handles a new incoming connection and starts an async loop''' addr = writer.get_extra_info('peername') await InverterG3(reader, writer, addr).server_loop(addr) -async def handle_client_v2(reader, writer): +async def handle_client_v2(reader: StreamReader, writer: StreamWriter): '''Handles a new incoming connection and starts an async loop''' addr = writer.get_extra_info('peername') await InverterG3P(reader, writer, addr).server_loop(addr) -async def 
handle_shutdown(loop): +async def handle_shutdown(web_task): '''Close all TCP connections and stop the event loop''' logging.info('Shutdown due to SIGTERM') + global proxy_is_up + proxy_is_up = False # # first, disc all open TCP connections gracefully @@ -38,7 +98,7 @@ async def handle_shutdown(loop): await asyncio.wait_for(stream.disc(), 2) except Exception: pass - logging.info('Disconnecting done') + logging.info('Proxy disconnecting done') # # second, close all open TCP connections @@ -46,12 +106,20 @@ async def handle_shutdown(loop): for stream in Message: stream.close() - # - # at last, we stop the loop - # - loop.stop() + await asyncio.sleep(0.1) # give time for closing + logging.info('Proxy closing done') - logging.info('Shutdown complete') + # + # third, cancel the web server + # + web_task.cancel() + await web_task + + # + # at last, start a coro for stopping the loop + # + logging.debug("Stop event loop") + loop.stop() def get_log_level() -> int: @@ -84,17 +152,28 @@ if __name__ == "__main__": logging.getLogger('conn').setLevel(log_level) logging.getLogger('data').setLevel(log_level) logging.getLogger('tracer').setLevel(log_level) + logging.getLogger('asyncio').setLevel(log_level) # logging.getLogger('mqtt').setLevel(log_level) - # read config file - Config.class_init() - loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) + # read config file + ConfigErr = Config.class_init() + if ConfigErr is not None: + logging.info(f'ConfigErr: {ConfigErr}') Inverter.class_init() Schedule.start() + # + # Create tasks for our listening servers. These must be tasks! If we call + # start_server directly out of our main task, the eventloop will be blocked + # and we can't receive and handle the UNIX signals! + # + loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005)) + loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000)) + web_task = loop.create_task(webserver('0.0.0.0', 8127)) + # # Register some UNIX Signal handler for a gracefully server shutdown # on Docker restart and stop @@ -102,22 +181,18 @@ if __name__ == "__main__": for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), lambda loop=loop: asyncio.create_task( - handle_shutdown(loop))) - - # - # Create taska for our listening servera. These must be tasks! If we call - # start_server directly out of our main task, the eventloop will be blocked - # and we can't receive and handle the UNIX signals! 
- # - loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005)) - loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000)) + handle_shutdown(web_task))) + loop.set_debug(log_level == logging.DEBUG) try: + if ConfigErr is None: + proxy_is_up = True loop.run_forever() except KeyboardInterrupt: pass finally: + logging.info("Event loop is stopped") Inverter.class_close(loop) - logging.info('Close event loop') + logging.debug('Close event loop') loop.close() logging.info(f'Finally, exit Server "{serv_name}"') diff --git a/app/tests/test_infos_g3.py b/app/tests/test_infos_g3.py index 193536e..d335db8 100644 --- a/app/tests/test_infos_g3.py +++ b/app/tests/test_infos_g3.py @@ -140,6 +140,82 @@ def InvDataSeq2(): # Data indication from the controller msg += b'\x53\x00\x00' return msg +@pytest.fixture +def InvDataNew(): # Data indication from DSP V5.0.17 + msg = b'\x00\x00\x00\xa3\x00\x00\x00\x00\x53\x00\x00' + msg += b'\x00\x00\x00\x80\x53\x00\x00\x00\x00\x01\x04\x53\x00\x00\x00\x00' + msg += b'\x01\x90\x41\x00\x00\x01\x91\x53\x00\x00\x00\x00\x01\x90\x53\x00' + msg += b'\x00\x00\x00\x01\x91\x53\x00\x00\x00\x00\x01\x90\x53\x00\x00\x00' + msg += b'\x00\x01\x91\x53\x00\x00\x00\x00\x01\x94\x53\x00\x00\x00\x00\x01' + msg += b'\x95\x53\x00\x00\x00\x00\x01\x98\x53\x00\x00\x00\x00\x01\x99\x53' + msg += b'\x00\x00\x00\x00\x01\x80\x53\x00\x00\x00\x00\x01\x90\x41\x00\x00' + msg += b'\x01\x94\x53\x00\x00\x00\x00\x01\x94\x53\x00\x00\x00\x00\x01\x96' + msg += b'\x53\x00\x00\x00\x00\x01\x98\x53\x00\x00\x00\x00\x01\xa0\x53\x00' + msg += b'\x00\x00\x00\x01\xf0\x41\x00\x00\x01\xf1\x53\x00\x00\x00\x00\x01' + msg += b'\xf4\x53\x00\x00\x00\x00\x01\xf5\x53\x00\x00\x00\x00\x01\xf8\x53' + msg += b'\x00\x00\x00\x00\x01\xf9\x53\x00\x00\x00\x00\x00\x00\x53\x00\x00' + msg += b'\x00\x00\x00\x01\x53\x00\x00\x00\x00\x00\x00\x53\x00\x00\x00\x00' + msg += b'\x00\x01\x53\x00\x00\x00\x00\x00\x04\x53\x00\x00\x00\x00\x00\x58' + msg += b'\x41\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00' + msg += b'\x00\x02\x02\x53\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02' + msg += b'\x04\x53\x00\x00\x00\x00\x02\x58\x41\x00\x00\x02\x59\x53\x00\x00' + msg += b'\x00\x00\x02\x40\x53\x00\x00\x00\x00\x02\x41\x53\x00\x00\x00\x00' + msg += b'\x02\x40\x53\x00\x00\x00\x00\x02\x41\x53\x00\x00\x00\x00\x02\x44' + msg += b'\x53\x00\x00\x00\x00\x02\x45\x53\x00\x00\x00\x00\x02\x60\x53\x00' + msg += b'\x00\x00\x00\x02\x61\x53\x00\x00\x00\x00\x02\x60\x53\x00\x00\x00' + msg += b'\x00\x02\x20\x41\x00\x00\x02\x24\x53\x00\x00\x00\x00\x02\x24\x53' + msg += b'\x00\x00\x00\x00\x02\x26\x53\x00\x00\x00\x00\x02\x40\x53\x00\x00' + msg += b'\x00\x00\x02\x40\x53\x00\x00\x00\x00\x02\x80\x41\x00\x00\x02\x81' + msg += b'\x53\x00\x00\x00\x00\x02\x84\x53\x00\x00\x00\x00\x02\x85\x53\x00' + msg += b'\x00\x00\x00\x02\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00' + msg += b'\x00\x02\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00\x00\x02' + msg += b'\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00\x00\x02\xc4\x53' + msg += b'\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02\x80\x53\x00\x00' + msg += b'\x00\x00\x02\xc8\x42\x00\x00\x00\x00\x48\x42\x00\x00\x00\x00\x80' + msg += b'\x42\x00\x00\x00\x00\x04\x53\x00\x00\x00\x00\x01\x20\x53\x00\x00' + msg += b'\x00\x00\x01\x84\x53\x00\x10\x00\x00\x02\x40\x46\x00\x00\x00\x00' + msg += b'\x00\x00\x04\x04\x46\x02\x00\x46\x02\x00\x00\x04\x00\x46\x00\x00' + msg += b'\x00\x00\x00\x00\x05\x04\x42\x00\x00\x00\x05\x50\x42\x00\x00\x00' + msg += 
b'\x00\x14\x42\x00\x00\x00\x00\x00\x46\x00\x00\x00\x00\x00\x00\x00' + msg += b'\xa4\x46\x00\x00\x00\x00\x00\x00\x01\x00\x46\x00\x00\x00\x00\x00' + msg += b'\x00\x01\x44\x46\x00\x00\x00\x00\x00\x00\x02\x00\x46\x00\x00\x00' + msg += b'\x00\x00\x00\x08\x04\x46\x00\x00\x00\x00\x00\x00\x08\x90\x46\x00' + msg += b'\x00\x00\x00\x00\x00\x08\x54\x46\x00\x00\x00\x00\x00\x00\x09\x20' + msg += b'\x46\x00\x00\x00\x00\x00\x00\x08\x04\x46\x00\x00\x00\x00\x00\x00' + msg += b'\x08\x00\x46\x00\x00\x00\x00\x00\x00\x08\x84\x46\x00\x00\x00\x00' + msg += b'\x00\x00\x08\x40\x46\x00\x00\x00\x00\x00\x00\x09\x04\x46\x00\x00' + msg += b'\x00\x00\x00\x00\x0a\x10\x46\x00\x00\x00\x00\x00\x00\x0c\x14\x46' + msg += b'\x00\x00\x00\x00\x00\x00\x0c\x80\x46\x00\x00\x00\x00\x00\x00\x0c' + msg += b'\x24\x42\x00\x00\x00\x0d\x00\x42\x00\x00\x00\x00\x04\x42\x00\x00' + msg += b'\x00\x00\x00\x42\x00\x00\x00\x00\x44\x42\x00\x00\x00\x00\x10\x42' + msg += b'\x00\x00\x00\x01\x14\x53\x00\x00\x00\x00\x01\xa0\x53\x00\x00\x00' + msg += b'\x00\x10\x04\x53\x55\xaa\x00\x00\x10\x40\x53\x00\x00\x00\x00\x10' + msg += b'\x04\x53\x00\x00\x00\x00\x11\x00\x53\x00\x00\x00\x00\x11\x84\x53' + msg += b'\x00\x00\x00\x00\x10\x50\x53\xff\xff\x00\x00\x10\x14\x53\x03\x20' + msg += b'\x00\x00\x10\x00\x53\x00\x00\x00\x00\x11\x24\x53\x00\x00\x00\x00' + msg += b'\x03\x00\x53\x00\x00\x00\x00\x03\x64\x53\x00\x00\x00\x00\x04\x50' + msg += b'\x53\x00\x00\x00\x00\x00\x34\x53\x00\x00\x00\x00\x00\x00\x42\x02' + msg += b'\x00\x00\x01\x04\x42\x00\x00\x00\x21\x00\x42\x00\x00\x00\x21\x44' + msg += b'\x42\x00\x00\x00\x22\x10\x53\x00\x00\x00\x00\x28\x14\x42\x01\x00' + msg += b'\x00\x28\xa0\x46\x42\x48\x00\x00\x00\x00\x29\x04\x42\x00\x00\x00' + msg += b'\x29\x40\x42\x00\x00\x00\x28\x04\x46\x42\x10\x00\x00\x00\x00\x28' + msg += b'\x00\x42\x00\x00\x00\x28\x84\x42\x00\x00\x00\x28\x50\x42\x00\x00' + msg += b'\x00\x29\x14\x42\x00\x00\x00\x2a\x00\x42\x00\x00\x00\x2c\x24\x46' + msg += b'\x42\x10\x00\x00\x00\x00\x2c\x80\x42\x00\x00\x00\x2c\x44\x53\x00' + msg += b'\x02\x00\x00\x2d\x00\x42\x00\x00\x00\x20\x04\x46\x42\x4d\x00\x00' + msg += b'\x00\x00\x20\x10\x42\x00\x00\x00\x20\x54\x42\x00\x00\x00\x20\x20' + msg += b'\x42\x00\x00\x00\x21\x04\x53\x00\x01\x00\x00\x22\x00\x42\x00\x00' + msg += b'\x00\x30\x04\x42\x00\x00\x00\x30\x40\x53\x00\x00\x00\x00\x30\x04' + msg += b'\x53\x00\x00\x00\x00\x31\x10\x42\x00\x00\x00\x31\x94\x53\x00\x04' + msg += b'\x00\x00\x30\x00\x53\x00\x00\x00\x00\x30\x24\x53\x00\x00\x00\x00' + msg += b'\x30\x00\x53\x00\x00\x00\x00\x31\x04\x53\x00\x00\x00\x00\x31\x80' + msg += b'\x53\x00\x00\x00\x00\x32\x44\x53\x00\x00\x00\x00\x30\x00\x53\x00' + msg += b'\x00\x00\x00\x30\x80\x53\x00\x00\x00\x00\x30\x00\x53\x00\x00\x00' + msg += b'\x00\x30\x80\x53\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x03\x00' + msg += b'\x00\x00\x00\x00' + return msg + @pytest.fixture def InvDataSeq2_Zero(): # Data indication from the controller msg = b'\x00\x00\x00\xa3\x00\x00\x00\x64\x53\x00\x01\x00\x00\x00\xc8\x53\x00\x02\x00\x00\x01\x2c\x53\x00\x00\x00\x00\x01\x90\x49\x00\x00\x00\x00\x00\x00\x01\x91\x53\x00\x00' @@ -391,6 +467,25 @@ def test_must_incr_total2(InvDataSeq2, InvDataSeq2_Zero): assert json.dumps(i.db['total']) == json.dumps({'Daily_Generation': 1.7, 'Total_Generation': 17.36}) assert json.dumps(i.db['input']) == json.dumps({"pv1": {"Voltage": 33.6, "Current": 1.91, "Power": 64.5, "Daily_Generation": 1.08, "Total_Generation": 9.74}, "pv2": {"Voltage": 33.5, "Current": 1.36, "Power": 45.7, "Daily_Generation": 
0.62, "Total_Generation": 7.62}, "pv3": {"Voltage": 0.0, "Current": 0.0, "Power": 0.0}, "pv4": {"Voltage": 0.0, "Current": 0.0, "Power": 0.0}}) +def test_new_data_types(InvDataNew): + i = InfosG3() + tests = 0 + for key, update in i.parse (InvDataNew): + if key == 'events': + tests +=1 + elif key == 'inverter': + assert update == True + tests +=1 + elif key == 'input': + assert update == False + tests +=1 + else: + assert False + + assert tests==15 + assert json.dumps(i.db['inverter']) == json.dumps({"Manufacturer": 0}) + assert json.dumps(i.db['input']) == json.dumps({"pv1": {}}) + assert json.dumps(i.db['events']) == json.dumps({"401_": 0, "404_": 0, "405_": 0, "408_": 0, "409_No_Utility": 0, "406_": 0, "416_": 0}) def test_invalid_data_type(InvalidDataSeq): i = InfosG3() diff --git a/app/tests/test_modbus.py b/app/tests/test_modbus.py index b44cb12..0009182 100644 --- a/app/tests/test_modbus.py +++ b/app/tests/test_modbus.py @@ -5,9 +5,9 @@ from app.src.modbus import Modbus from app.src.infos import Infos, Register pytest_plugins = ('pytest_asyncio',) -pytestmark = pytest.mark.asyncio(scope="module") +# pytestmark = pytest.mark.asyncio(scope="module") -class TestHelper(Modbus): +class ModbusTestHelper(Modbus): def __init__(self): super().__init__(self.send_cb) self.db = Infos() @@ -35,7 +35,7 @@ def test_modbus_crc(): def test_build_modbus_pdu(): '''Check building and sending a MODBUS RTU''' - mb = TestHelper() + mb = ModbusTestHelper() mb.build_msg(1,6,0x2000,0x12) assert mb.pdu == b'\x01\x06\x20\x00\x00\x12\x02\x07' assert mb._Modbus__check_crc(mb.pdu) @@ -47,7 +47,7 @@ def test_build_modbus_pdu(): def test_recv_req(): '''Receive a valid request, which must transmitted''' - mb = TestHelper() + mb = ModbusTestHelper() assert mb.recv_req(b'\x01\x06\x20\x00\x00\x12\x02\x07') assert mb.last_fcode == 6 assert mb.last_reg == 0x2000 @@ -56,7 +56,7 @@ def test_recv_req(): def test_recv_req_crc_err(): '''Receive a request with invalid CRC, which must be dropped''' - mb = TestHelper() + mb = ModbusTestHelper() assert not mb.recv_req(b'\x01\x06\x20\x00\x00\x12\x02\x08') assert mb.pdu == None assert mb.last_fcode == 0 @@ -66,7 +66,7 @@ def test_recv_req_crc_err(): def test_recv_resp_crc_err(): '''Receive a response with invalid CRC, which must be dropped''' - mb = TestHelper() + mb = ModbusTestHelper() # simulate a transmitted request mb.req_pend = True mb.last_addr = 1 @@ -86,7 +86,7 @@ def test_recv_resp_crc_err(): def test_recv_resp_invalid_addr(): '''Receive a response with wrong server addr, which must be dropped''' - mb = TestHelper() + mb = ModbusTestHelper() mb.req_pend = True # simulate a transmitted request mb.last_addr = 1 @@ -109,7 +109,7 @@ def test_recv_resp_invalid_addr(): def test_recv_recv_fcode(): '''Receive a response with wrong function code, which must be dropped''' - mb = TestHelper() + mb = ModbusTestHelper() mb.build_msg(1,4,0x300e,2) assert mb.que.qsize() == 0 assert mb.req_pend @@ -130,7 +130,7 @@ def test_recv_recv_fcode(): def test_recv_resp_len(): '''Receive a response with wrong data length, which must be dropped''' - mb = TestHelper() + mb = ModbusTestHelper() mb.build_msg(1,3,0x300e,3) assert mb.que.qsize() == 0 assert mb.req_pend @@ -152,7 +152,7 @@ def test_recv_resp_len(): def test_recv_unexpect_resp(): '''Receive a response when we havb't sent a request''' - mb = TestHelper() + mb = ModbusTestHelper() assert not mb.req_pend # check unexpected response, which must be dropped @@ -167,7 +167,7 @@ def test_recv_unexpect_resp(): def test_parse_resp(): '''Receive 
matching response and parse the values''' - mb = TestHelper() + mb = ModbusTestHelper() mb.build_msg(1,3,0x3007,6) assert mb.que.qsize() == 0 assert mb.req_pend @@ -191,7 +191,7 @@ def test_parse_resp(): assert not mb.req_pend def test_queue(): - mb = TestHelper() + mb = ModbusTestHelper() mb.build_msg(1,3,0x3022,4) assert mb.que.qsize() == 0 assert mb.req_pend @@ -210,7 +210,7 @@ def test_queue(): def test_queue2(): '''Check queue handling for build_msg() calls''' - mb = TestHelper() + mb = ModbusTestHelper() mb.build_msg(1,3,0x3007,6) mb.build_msg(1,6,0x2008,4) assert mb.que.qsize() == 1 @@ -258,7 +258,7 @@ def test_queue2(): def test_queue3(): '''Check queue handling for recv_req() calls''' - mb = TestHelper() + mb = ModbusTestHelper() assert mb.recv_req(b'\x01\x03\x30\x07\x00\x06{\t', mb.resp_handler) assert mb.recv_req(b'\x01\x06\x20\x08\x00\x04\x02\x0b', mb.resp_handler) assert mb.que.qsize() == 1 @@ -315,7 +315,7 @@ def test_queue3(): async def test_timeout(): '''Test MODBUS response timeout and RTU retransmitting''' assert asyncio.get_running_loop() - mb = TestHelper() + mb = ModbusTestHelper() mb.max_retries = 2 mb.timeout = 0.1 # 100ms timeout for fast testing, expect a time resolution of at least 10ms assert asyncio.get_running_loop() == mb.loop @@ -363,7 +363,7 @@ async def test_timeout(): def test_recv_unknown_data(): '''Receive a response with an unknwon register''' - mb = TestHelper() + mb = ModbusTestHelper() assert 0x9000 not in mb.map mb.map[0x9000] = {'reg': Register.TEST_REG1, 'fmt': '!H', 'ratio': 1} diff --git a/app/tests/test_solarman.py b/app/tests/test_solarman.py index fca2b41..54e4878 100644 --- a/app/tests/test_solarman.py +++ b/app/tests/test_solarman.py @@ -7,6 +7,7 @@ from app.src.gen3plus.solarman_v5 import SolarmanV5 from app.src.config import Config from app.src.infos import Infos, Register from app.src.modbus import Modbus +from app.src.messages import State pytest_plugins = ('pytest_asyncio',) @@ -24,12 +25,24 @@ class Writer(): def write(self, pdu: bytearray): self.sent_pdu = pdu + +class Mqtt(): + def __init__(self): + self.key = '' + self.data = '' + + async def publish(self, key, data): + self.key = key + self.data = data + + class MemoryStream(SolarmanV5): def __init__(self, msg, chunks = (0,), server_side: bool = True): super().__init__(server_side) if server_side: self.mb.timeout = 1 # overwrite for faster testing self.writer = Writer() + self.mqtt = Mqtt() self.__msg = msg self.__msg_len = len(msg) self.__chunks = chunks @@ -43,6 +56,8 @@ class MemoryStream(SolarmanV5): self.test_exception_async_write = False self.entity_prfx = '' self.at_acl = {'mqtt': {'allow': ['AT+'], 'block': ['AT+WEBU']}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE', 'AT+TIME'], 'block': ['AT+WEBU']}} + self.key = '' + self.data = '' def _timestamp(self): return timestamp @@ -54,6 +69,10 @@ class MemoryStream(SolarmanV5): self.__msg += msg self.__msg_len += len(msg) + def publish_mqtt(self, key, data): + self.key = key + self.data = data + def _read(self) -> int: copied_bytes = 0 try: @@ -481,9 +500,10 @@ def AtCommandIndMsgBlock(): # 0x4510 @pytest.fixture def AtCommandRspMsg(): # 0x1510 - msg = b'\xa5\x0a\x00\x10\x15\x03\x03' +get_sn() +b'\x01\x01' + msg = b'\xa5\x11\x00\x10\x15\x03\x03' +get_sn() +b'\x01\x01' msg += total() msg += hb() + msg += b'\x00\x00\x00\x00+ok' msg += correct_checksum(msg) msg += b'\x15' return msg @@ -832,8 +852,7 @@ def test_read_message_in_chunks2(ConfigTsunInv1, DeviceIndMsg): assert m.data_len == 0xd4 assert m.msg_count == 1 assert 
m.db.stat['proxy']['Invalid_Msg_Format'] == 0 - while m.read(): # read rest of message - pass + m.read() # read rest of message assert m.msg_count == 1 assert not m.header_valid # must be invalid, since msg was handled and buffer flushed assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0 @@ -1255,48 +1274,64 @@ async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg, m.close() @pytest.mark.asyncio -async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, AtCommandIndMsg): +async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, AtCommandIndMsg, AtCommandRspMsg): ConfigTsunAllowAll m = MemoryStream(DeviceIndMsg, (0,), True) m.append_msg(InverterIndMsg) - m.read() + m.append_msg(AtCommandRspMsg) + m.read() # read device ind assert m.control == 0x4110 assert str(m.seq) == '01:01' - assert m._recv_buffer==InverterIndMsg # unhandled next message + assert m._recv_buffer==InverterIndMsg + AtCommandRspMsg # unhandled next message assert m._send_buffer==DeviceRspMsg assert m._forward_buffer==DeviceIndMsg m._send_buffer = bytearray(0) # clear send buffer for next test m._forward_buffer = bytearray(0) # clear send buffer for next test await m.send_at_cmd('AT+TIME=214028,1,60,120') - assert m._recv_buffer==InverterIndMsg # unhandled next message + assert m._recv_buffer==InverterIndMsg + AtCommandRspMsg # unhandled next message assert m._send_buffer==b'' assert m._forward_buffer==b'' assert str(m.seq) == '01:01' + assert m.mqtt.key == '' + assert m.mqtt.data == "" - m.read() + m.read() # read inverter ind assert m.control == 0x4210 assert str(m.seq) == '02:02' - assert m._recv_buffer==b'' + assert m._recv_buffer==AtCommandRspMsg # unhandled next message assert m._send_buffer==InverterRspMsg assert m._forward_buffer==InverterIndMsg m._send_buffer = bytearray(0) # clear send buffer for next test m._forward_buffer = bytearray(0) # clear send buffer for next test await m.send_at_cmd('AT+TIME=214028,1,60,120') - assert m._recv_buffer==b'' + assert m._recv_buffer==AtCommandRspMsg # unhandled next message assert m._send_buffer==AtCommandIndMsg assert m._forward_buffer==b'' assert str(m.seq) == '02:03' + assert m.mqtt.key == '' + assert m.mqtt.data == "" m._send_buffer = bytearray(0) # clear send buffer for next test + m.read() # read at resp + assert m.control == 0x1510 + assert str(m.seq) == '03:03' + assert m._recv_buffer==b'' + assert m._send_buffer==b'' + assert m._forward_buffer==b'' + assert m.key == 'at_resp' + assert m.data == "+ok" + m.test_exception_async_write = True await m.send_at_cmd('AT+TIME=214028,1,60,120') assert m._recv_buffer==b'' assert m._send_buffer==b'' assert m._forward_buffer==b'' - assert str(m.seq) == '02:04' + assert str(m.seq) == '03:04' assert m.forward_at_cmd_resp == False + assert m.mqtt.key == '' + assert m.mqtt.data == "" m.close() @pytest.mark.asyncio @@ -1318,6 +1353,8 @@ async def test_AT_cmd_blocked(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, In assert m._send_buffer==b'' assert m._forward_buffer==b'' assert str(m.seq) == '01:01' + assert m.mqtt.key == '' + assert m.mqtt.data == "" m.read() assert m.control == 0x4210 @@ -1334,6 +1371,8 @@ async def test_AT_cmd_blocked(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, In assert m._forward_buffer==b'' assert str(m.seq) == '02:02' assert m.forward_at_cmd_resp == False + assert m.mqtt.key == 'at_resp' + assert m.mqtt.data == "'AT+WEBU' is forbidden" m.close() def test_AT_cmd_ind(ConfigTsunInv1, 
AtCommandIndMsg): @@ -1398,7 +1437,7 @@ def test_msg_at_command_rsp1(ConfigTsunInv1, AtCommandRspMsg): assert m.control == 0x1510 assert str(m.seq) == '03:03' assert m.header_len==11 - assert m.data_len==10 + assert m.data_len==17 assert m._forward_buffer==AtCommandRspMsg assert m._send_buffer==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 @@ -1417,7 +1456,7 @@ def test_msg_at_command_rsp2(ConfigTsunInv1, AtCommandRspMsg): assert m.control == 0x1510 assert str(m.seq) == '03:03' assert m.header_len==11 - assert m.data_len==10 + assert m.data_len==17 assert m._forward_buffer==b'' assert m._send_buffer==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 @@ -1428,7 +1467,7 @@ def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd, MsgModbusCmdFwd): ConfigTsunInv1 m = MemoryStream(b'') m.snr = get_sn_int() - m.state = m.STATE_UP + m.state = State.up c = m.createClientStream(MsgModbusCmd) m.db.stat['proxy']['Unknown_Ctrl'] = 0 @@ -1455,7 +1494,7 @@ def test_msg_modbus_req2(ConfigTsunInv1, MsgModbusCmdCrcErr): ConfigTsunInv1 m = MemoryStream(b'') m.snr = get_sn_int() - m.state = m.STATE_UP + m.state = State.up c = m.createClientStream(MsgModbusCmdCrcErr) m.db.stat['proxy']['Unknown_Ctrl'] = 0 @@ -1661,21 +1700,21 @@ def test_zombie_conn(ConfigTsunInv1, MsgInverterInd): m1 = MemoryStream(MsgInverterInd, (0,)) m2 = MemoryStream(MsgInverterInd, (0,)) m3 = MemoryStream(MsgInverterInd, (0,)) - assert m1.state == m1.STATE_INIT - assert m2.state == m2.STATE_INIT - assert m3.state == m3.STATE_INIT + assert m1.state == m1.State.init + assert m2.state == m2.State.init + assert m3.state == m3.State.init m1.read() # read complete msg, and set unique_id - assert m1.state == m1.STATE_INIT - assert m2.state == m2.STATE_INIT - assert m3.state == m3.STATE_INIT + assert m1.state == m1.State.init + assert m2.state == m2.State.init + assert m3.state == m3.State.init m2.read() # read complete msg, and set unique_id - assert m1.state == m1.STATE_CLOSED - assert m2.state == m2.STATE_INIT - assert m3.state == m3.STATE_INIT + assert m1.state == m1.State.closed + assert m2.state == m2.State.init + assert m3.state == m3.State.init m3.read() # read complete msg, and set unique_id - assert m1.state == m1.STATE_CLOSED - assert m2.state == m2.STATE_CLOSED - assert m3.state == m3.STATE_INIT + assert m1.state == m1.State.closed + assert m2.state == m2.State.closed + assert m3.state == m3.State.init m1.close() m2.close() m3.close() diff --git a/app/tests/test_talent.py b/app/tests/test_talent.py index 102979c..f8efca0 100644 --- a/app/tests/test_talent.py +++ b/app/tests/test_talent.py @@ -4,6 +4,7 @@ from app.src.gen3.talent import Talent, Control from app.src.config import Config from app.src.infos import Infos, Register from app.src.modbus import Modbus +from app.src.messages import State pytest_plugins = ('pytest_asyncio',) @@ -60,7 +61,8 @@ class MemoryStream(Talent): return copied_bytes def _timestamp(self): - return 1700260990000 + # return 1700260990000 + return 1691246944000 def createClientStream(self, msg, chunks = (0,)): c = MemoryStream(msg, chunks, False) @@ -113,6 +115,10 @@ def MsgGetTime(): # Get Time Request message def MsgTimeResp(): # Get Time Response message return b'\x00\x00\x00\x1b\x10R170000000000001\x91\x22\x00\x00\x01\x89\xc6\x63\x4d\x80' +@pytest.fixture +def MsgTimeRespInv(): # invalid, truncated Get Time Response message + return b'\x00\x00\x00\x17\x10R170000000000001\x91\x22\x00\x00\x01\x89' + @pytest.fixture def MsgTimeInvalid(): # Get Time Request message return b'\x00\x00\x00\x13\x10R170000000000001\x94\x22' @@
-129,6 +135,18 @@ def MsgControllerInd(): # Data indication from the controller msg += b'\x49\x00\x00\x00\x02\x00\x0d\x04\x08\x49\x00\x00\x00\x00\x00\x07\xa1\x84\x49\x00\x00\x00\x01\x00\x0c\x50\x59\x49\x00\x00\x00\x4c\x00\x0d\x1f\x60\x49\x00\x00\x00\x00' return msg +@pytest.fixture +def MsgControllerIndTsOffs(): # Data indication from the controller - offset 0x1000 + msg = b'\x00\x00\x01\x2f\x10R170000000000001\x91\x71\x0e\x10\x00\x00\x10R170000000000001' + msg += b'\x01\x00\x00\x01\x89\xc6\x63\x45\x50' + msg += b'\x00\x00\x00\x15\x00\x09\x2b\xa8\x54\x10\x52\x53\x57\x5f\x34\x30\x30\x5f\x56\x31\x2e\x30\x30\x2e\x30\x36\x00\x09\x27\xc0\x54\x06\x52\x61\x79\x6d\x6f' + msg += b'\x6e\x00\x09\x2f\x90\x54\x0b\x52\x53\x57\x2d\x31\x2d\x31\x30\x30\x30\x31\x00\x09\x5a\x88\x54\x0f\x74\x2e\x72\x61\x79\x6d\x6f\x6e\x69\x6f\x74\x2e\x63\x6f\x6d\x00\x09\x5a\xec\x54' + msg += b'\x1c\x6c\x6f\x67\x67\x65\x72\x2e\x74\x61\x6c\x65\x6e\x74\x2d\x6d\x6f\x6e\x69\x74\x6f\x72\x69\x6e\x67\x2e\x63\x6f\x6d\x00\x0d\x00\x20\x49\x00\x00\x00\x01\x00\x0c\x35\x00\x49\x00' + msg += b'\x00\x00\x64\x00\x0c\x96\xa8\x49\x00\x00\x00\x1d\x00\x0c\x7f\x38\x49\x00\x00\x00\x01\x00\x0c\xfc\x38\x49\x00\x00\x00\x01\x00\x0c\xf8\x50\x49\x00\x00\x01\x2c\x00\x0c\x63\xe0\x49' + msg += b'\x00\x00\x00\x00\x00\x0c\x67\xc8\x49\x00\x00\x00\x00\x00\x0c\x50\x58\x49\x00\x00\x00\x01\x00\x09\x5e\x70\x49\x00\x00\x13\x8d\x00\x09\x5e\xd4\x49\x00\x00\x13\x8d\x00\x09\x5b\x50' + msg += b'\x49\x00\x00\x00\x02\x00\x0d\x04\x08\x49\x00\x00\x00\x00\x00\x07\xa1\x84\x49\x00\x00\x00\x01\x00\x0c\x50\x59\x49\x00\x00\x00\x4c\x00\x0d\x1f\x60\x49\x00\x00\x00\x00' + return msg + @pytest.fixture def MsgControllerAck(): # Get Time Request message return b'\x00\x00\x00\x14\x10R170000000000001\x99\x71\x01' @@ -145,6 +163,92 @@ def MsgInverterInd(): # Data indication from the controller msg += b'\x54\x10T170000000000001\x00\x00\x00\x32\x54\x0a\x54\x53\x4f\x4c\x2d\x4d\x53\x36\x30\x30\x00\x00\x00\x3c\x54\x05\x41\x2c\x42\x2c\x43' return msg +@pytest.fixture +def MsgInverterIndTsOffs(): # Data indication from the controller + offset 256 + msg = b'\x00\x00\x00\x8b\x10R170000000000001\x91\x04\x01\x90\x00\x01\x10R170000000000001' + msg += b'\x01\x00\x00\x01\x89\xc6\x63\x62\x08' + msg += b'\x00\x00\x00\x06\x00\x00\x00\x0a\x54\x08\x4d\x69\x63\x72\x6f\x69\x6e\x76\x00\x00\x00\x14\x54\x04\x54\x53\x55\x4e\x00\x00\x00\x1E\x54\x07\x56\x35\x2e\x30\x2e\x31\x31\x00\x00\x00\x28' + msg += b'\x54\x10T170000000000001\x00\x00\x00\x32\x54\x0a\x54\x53\x4f\x4c\x2d\x4d\x53\x36\x30\x30\x00\x00\x00\x3c\x54\x05\x41\x2c\x42\x2c\x43' + return msg + +@pytest.fixture +def MsgInverterIndNew(): # Data indication from DSP V5.0.17 + msg = b'\x00\x00\x04\xa0\x10R170000000000001\x91\x04\x01\x90\x00\x01\x10R170000000000001' + msg += b'\x01\x00\x00\x01' + msg += b'\x90\x31\x4d\x68\x78\x00\x00\x00\xa3\x00\x00\x00\x00\x53\x00\x00' + msg += b'\x00\x00\x00\x80\x53\x00\x00\x00\x00\x01\x04\x53\x00\x00\x00\x00' + msg += b'\x01\x90\x41\x00\x00\x01\x91\x53\x00\x00\x00\x00\x01\x90\x53\x00' + msg += b'\x00\x00\x00\x01\x91\x53\x00\x00\x00\x00\x01\x90\x53\x00\x00\x00' + msg += b'\x00\x01\x91\x53\x00\x00\x00\x00\x01\x94\x53\x00\x00\x00\x00\x01' + msg += b'\x95\x53\x00\x00\x00\x00\x01\x98\x53\x00\x00\x00\x00\x01\x99\x53' + msg += b'\x00\x00\x00\x00\x01\x80\x53\x00\x00\x00\x00\x01\x90\x41\x00\x00' + msg += b'\x01\x94\x53\x00\x00\x00\x00\x01\x94\x53\x00\x00\x00\x00\x01\x96' + msg += b'\x53\x00\x00\x00\x00\x01\x98\x53\x00\x00\x00\x00\x01\xa0\x53\x00' + msg += b'\x00\x00\x00\x01\xf0\x41\x00\x00\x01\xf1\x53\x00\x00\x00\x00\x01' + msg += 
b'\xf4\x53\x00\x00\x00\x00\x01\xf5\x53\x00\x00\x00\x00\x01\xf8\x53' + msg += b'\x00\x00\x00\x00\x01\xf9\x53\x00\x00\x00\x00\x00\x00\x53\x00\x00' + msg += b'\x00\x00\x00\x01\x53\x00\x00\x00\x00\x00\x00\x53\x00\x00\x00\x00' + msg += b'\x00\x01\x53\x00\x00\x00\x00\x00\x04\x53\x00\x00\x00\x00\x00\x58' + msg += b'\x41\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00' + msg += b'\x00\x02\x02\x53\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02' + msg += b'\x04\x53\x00\x00\x00\x00\x02\x58\x41\x00\x00\x02\x59\x53\x00\x00' + msg += b'\x00\x00\x02\x40\x53\x00\x00\x00\x00\x02\x41\x53\x00\x00\x00\x00' + msg += b'\x02\x40\x53\x00\x00\x00\x00\x02\x41\x53\x00\x00\x00\x00\x02\x44' + msg += b'\x53\x00\x00\x00\x00\x02\x45\x53\x00\x00\x00\x00\x02\x60\x53\x00' + msg += b'\x00\x00\x00\x02\x61\x53\x00\x00\x00\x00\x02\x60\x53\x00\x00\x00' + msg += b'\x00\x02\x20\x41\x00\x00\x02\x24\x53\x00\x00\x00\x00\x02\x24\x53' + msg += b'\x00\x00\x00\x00\x02\x26\x53\x00\x00\x00\x00\x02\x40\x53\x00\x00' + msg += b'\x00\x00\x02\x40\x53\x00\x00\x00\x00\x02\x80\x41\x00\x00\x02\x81' + msg += b'\x53\x00\x00\x00\x00\x02\x84\x53\x00\x00\x00\x00\x02\x85\x53\x00' + msg += b'\x00\x00\x00\x02\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00' + msg += b'\x00\x02\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00\x00\x02' + msg += b'\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00\x00\x02\xc4\x53' + msg += b'\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02\x80\x53\x00\x00' + msg += b'\x00\x00\x02\xc8\x42\x00\x00\x00\x00\x48\x42\x00\x00\x00\x00\x80' + msg += b'\x42\x00\x00\x00\x00\x04\x53\x00\x00\x00\x00\x01\x20\x53\x00\x00' + msg += b'\x00\x00\x01\x84\x53\x00\x10\x00\x00\x02\x40\x46\x00\x00\x00\x00' + msg += b'\x00\x00\x04\x04\x46\x02\x00\x46\x02\x00\x00\x04\x00\x46\x00\x00' + msg += b'\x00\x00\x00\x00\x05\x04\x42\x00\x00\x00\x05\x50\x42\x00\x00\x00' + msg += b'\x00\x14\x42\x00\x00\x00\x00\x00\x46\x00\x00\x00\x00\x00\x00\x00' + msg += b'\xa4\x46\x00\x00\x00\x00\x00\x00\x01\x00\x46\x00\x00\x00\x00\x00' + msg += b'\x00\x01\x44\x46\x00\x00\x00\x00\x00\x00\x02\x00\x46\x00\x00\x00' + msg += b'\x00\x00\x00\x08\x04\x46\x00\x00\x00\x00\x00\x00\x08\x90\x46\x00' + msg += b'\x00\x00\x00\x00\x00\x08\x54\x46\x00\x00\x00\x00\x00\x00\x09\x20' + msg += b'\x46\x00\x00\x00\x00\x00\x00\x08\x04\x46\x00\x00\x00\x00\x00\x00' + msg += b'\x08\x00\x46\x00\x00\x00\x00\x00\x00\x08\x84\x46\x00\x00\x00\x00' + msg += b'\x00\x00\x08\x40\x46\x00\x00\x00\x00\x00\x00\x09\x04\x46\x00\x00' + msg += b'\x00\x00\x00\x00\x0a\x10\x46\x00\x00\x00\x00\x00\x00\x0c\x14\x46' + msg += b'\x00\x00\x00\x00\x00\x00\x0c\x80\x46\x00\x00\x00\x00\x00\x00\x0c' + msg += b'\x24\x42\x00\x00\x00\x0d\x00\x42\x00\x00\x00\x00\x04\x42\x00\x00' + msg += b'\x00\x00\x00\x42\x00\x00\x00\x00\x44\x42\x00\x00\x00\x00\x10\x42' + msg += b'\x00\x00\x00\x01\x14\x53\x00\x00\x00\x00\x01\xa0\x53\x00\x00\x00' + msg += b'\x00\x10\x04\x53\x55\xaa\x00\x00\x10\x40\x53\x00\x00\x00\x00\x10' + msg += b'\x04\x53\x00\x00\x00\x00\x11\x00\x53\x00\x00\x00\x00\x11\x84\x53' + msg += b'\x00\x00\x00\x00\x10\x50\x53\xff\xff\x00\x00\x10\x14\x53\x03\x20' + msg += b'\x00\x00\x10\x00\x53\x00\x00\x00\x00\x11\x24\x53\x00\x00\x00\x00' + msg += b'\x03\x00\x53\x00\x00\x00\x00\x03\x64\x53\x00\x00\x00\x00\x04\x50' + msg += b'\x53\x00\x00\x00\x00\x00\x34\x53\x00\x00\x00\x00\x00\x00\x42\x02' + msg += b'\x00\x00\x01\x04\x42\x00\x00\x00\x21\x00\x42\x00\x00\x00\x21\x44' + msg += b'\x42\x00\x00\x00\x22\x10\x53\x00\x00\x00\x00\x28\x14\x42\x01\x00' + msg += b'\x00\x28\xa0\x46\x42\x48\x00\x00\x00\x00\x29\x04\x42\x00\x00\x00' + msg += 
b'\x29\x40\x42\x00\x00\x00\x28\x04\x46\x42\x10\x00\x00\x00\x00\x28' + msg += b'\x00\x42\x00\x00\x00\x28\x84\x42\x00\x00\x00\x28\x50\x42\x00\x00' + msg += b'\x00\x29\x14\x42\x00\x00\x00\x2a\x00\x42\x00\x00\x00\x2c\x24\x46' + msg += b'\x42\x10\x00\x00\x00\x00\x2c\x80\x42\x00\x00\x00\x2c\x44\x53\x00' + msg += b'\x02\x00\x00\x2d\x00\x42\x00\x00\x00\x20\x04\x46\x42\x4d\x00\x00' + msg += b'\x00\x00\x20\x10\x42\x00\x00\x00\x20\x54\x42\x00\x00\x00\x20\x20' + msg += b'\x42\x00\x00\x00\x21\x04\x53\x00\x01\x00\x00\x22\x00\x42\x00\x00' + msg += b'\x00\x30\x04\x42\x00\x00\x00\x30\x40\x53\x00\x00\x00\x00\x30\x04' + msg += b'\x53\x00\x00\x00\x00\x31\x10\x42\x00\x00\x00\x31\x94\x53\x00\x04' + msg += b'\x00\x00\x30\x00\x53\x00\x00\x00\x00\x30\x24\x53\x00\x00\x00\x00' + msg += b'\x30\x00\x53\x00\x00\x00\x00\x31\x04\x53\x00\x00\x00\x00\x31\x80' + msg += b'\x53\x00\x00\x00\x00\x32\x44\x53\x00\x00\x00\x00\x30\x00\x53\x00' + msg += b'\x00\x00\x00\x30\x80\x53\x00\x00\x00\x00\x30\x00\x53\x00\x00\x00' + msg += b'\x00\x30\x80\x53\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x03\x00' + msg += b'\x00\x00\x00\x00' + return msg + @pytest.fixture def MsgInverterAck(): # Get Time Request message return b'\x00\x00\x00\x14\x10R170000000000001\x99\x04\x01' @@ -331,8 +435,7 @@ def test_read_message_in_chunks2(MsgContactInfo): assert int(m.ctrl)==145 assert m.msg_id==0 assert m.msg_count == 1 - while m.read(): # read rest of message - pass + m.read() # read rest of message assert m.msg_count == 1 assert not m.header_valid # must be invalid, since msg was handled and buffer flushed m.close() @@ -471,9 +574,10 @@ def test_msg_get_time(ConfigTsunInv1, MsgGetTime): assert int(m.ctrl)==145 assert m.msg_id==34 assert m.header_len==23 + assert m.ts_offset==0 assert m.data_len==0 assert m._forward_buffer==MsgGetTime - assert m._send_buffer==b'' + assert m._send_buffer==b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -489,9 +593,10 @@ def test_msg_get_time_autark(ConfigNoTsunInv1, MsgGetTime): assert int(m.ctrl)==145 assert m.msg_id==34 assert m.header_len==23 + assert m.ts_offset==0 assert m.data_len==0 assert m._forward_buffer==b'' - assert m._send_buffer==b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x8b\xdfs\xcc0' + assert m._send_buffer==bytearray(b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00') assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -507,8 +612,9 @@ def test_msg_time_resp(ConfigTsunInv1, MsgTimeResp): assert int(m.ctrl)==145 assert m.msg_id==34 assert m.header_len==23 + assert m.ts_offset==3600000 assert m.data_len==8 - assert m._forward_buffer==MsgTimeResp + assert m._forward_buffer==b'' assert m._send_buffer==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -525,12 +631,32 @@ def test_msg_time_resp_autark(ConfigNoTsunInv1, MsgTimeResp): assert int(m.ctrl)==145 assert m.msg_id==34 assert m.header_len==23 + assert m.ts_offset==3600000 assert m.data_len==8 assert m._forward_buffer==b'' assert m._send_buffer==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() +def test_msg_time_inv_resp(ConfigTsunInv1, MsgTimeRespInv): + ConfigTsunInv1 + m = MemoryStream(MsgTimeRespInv, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.id_str 
== b"R170000000000001" + assert m.unique_id == 'R170000000000001' + assert int(m.ctrl)==145 + assert m.msg_id==34 + assert m.header_len==23 + assert m.ts_offset==0 + assert m.data_len==4 + assert m._forward_buffer==MsgTimeRespInv + assert m._send_buffer==b'' + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + m.close() + def test_msg_time_invalid(ConfigTsunInv1, MsgTimeInvalid): ConfigTsunInv1 m = MemoryStream(MsgTimeInvalid, (0,), False) @@ -543,6 +669,7 @@ def test_msg_time_invalid(ConfigTsunInv1, MsgTimeInvalid): assert int(m.ctrl)==148 assert m.msg_id==34 assert m.header_len==23 + assert m.ts_offset==0 assert m.data_len==0 assert m._forward_buffer==MsgTimeInvalid assert m._send_buffer==b'' @@ -560,6 +687,7 @@ def test_msg_time_invalid_autark(ConfigNoTsunInv1, MsgTimeInvalid): assert m.unique_id == 'R170000000000001' assert int(m.ctrl)==148 assert m.msg_id==34 + assert m.ts_offset==0 assert m.header_len==23 assert m.data_len==0 assert m._forward_buffer==b'' @@ -567,7 +695,7 @@ def test_msg_time_invalid_autark(ConfigNoTsunInv1, MsgTimeInvalid): assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 m.close() -def test_msg_cntrl_ind(ConfigTsunInv1, MsgControllerInd, MsgControllerAck): +def test_msg_cntrl_ind(ConfigTsunInv1, MsgControllerInd, MsgControllerIndTsOffs, MsgControllerAck): ConfigTsunInv1 m = MemoryStream(MsgControllerInd, (0,)) m.db.stat['proxy']['Unknown_Ctrl'] = 0 @@ -580,7 +708,12 @@ def test_msg_cntrl_ind(ConfigTsunInv1, MsgControllerInd, MsgControllerAck): assert m.msg_id==113 assert m.header_len==23 assert m.data_len==284 + m.ts_offset = 0 + m._update_header(m._forward_buffer) assert m._forward_buffer==MsgControllerInd + m.ts_offset = -4096 + m._update_header(m._forward_buffer) + assert m._forward_buffer==MsgControllerIndTsOffs assert m._send_buffer==MsgControllerAck assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() @@ -616,12 +749,17 @@ def test_msg_cntrl_invalid(ConfigTsunInv1, MsgControllerInvalid): assert m.msg_id==113 assert m.header_len==23 assert m.data_len==1 + m.ts_offset = 0 + m._update_header(m._forward_buffer) + assert m._forward_buffer==MsgControllerInvalid + m.ts_offset = -4096 + m._update_header(m._forward_buffer) assert m._forward_buffer==MsgControllerInvalid assert m._send_buffer==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 m.close() -def test_msg_inv_ind(ConfigTsunInv1, MsgInverterInd, MsgInverterAck): +def test_msg_inv_ind(ConfigTsunInv1, MsgInverterInd, MsgInverterIndTsOffs, MsgInverterAck): ConfigTsunInv1 tracer.setLevel(logging.DEBUG) m = MemoryStream(MsgInverterInd, (0,)) @@ -635,11 +773,62 @@ def test_msg_inv_ind(ConfigTsunInv1, MsgInverterInd, MsgInverterAck): assert m.msg_id==4 assert m.header_len==23 assert m.data_len==120 + m.ts_offset = 0 + m._update_header(m._forward_buffer) assert m._forward_buffer==MsgInverterInd + m.ts_offset = +256 + m._update_header(m._forward_buffer) + assert m._forward_buffer==MsgInverterIndTsOffs assert m._send_buffer==MsgInverterAck assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 m.close() - + +def test_msg_inv_ind2(ConfigTsunInv1, MsgInverterIndNew, MsgInverterIndTsOffs, MsgInverterAck): + ConfigTsunInv1 + tracer.setLevel(logging.DEBUG) + m = MemoryStream(MsgInverterIndNew, (0,)) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Invalid_Data_Type'] = 0 + m.read() # read complete msg, and dispatch msg + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + assert m.db.stat['proxy']['Invalid_Data_Type'] == 0 + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert 
m.msg_count == 1 + assert m.id_str == b"R170000000000001" + assert m.unique_id == 'R170000000000001' + assert int(m.ctrl)==145 + assert m.msg_id==4 + assert m.header_len==23 + assert m.data_len==1165 + m.ts_offset = 0 + m._update_header(m._forward_buffer) + assert m._forward_buffer==MsgInverterIndNew + assert m._send_buffer==MsgInverterAck + m.close() + +def test_msg_inv_ind2(ConfigTsunInv1, MsgInverterIndNew, MsgInverterIndTsOffs, MsgInverterAck): + ConfigTsunInv1 + tracer.setLevel(logging.DEBUG) + m = MemoryStream(MsgInverterIndNew, (0,)) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Invalid_Data_Type'] = 0 + m.read() # read complete msg, and dispatch msg + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + assert m.db.stat['proxy']['Invalid_Data_Type'] == 0 + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.id_str == b"R170000000000001" + assert m.unique_id == 'R170000000000001' + assert int(m.ctrl)==145 + assert m.msg_id==4 + assert m.header_len==23 + assert m.data_len==1165 + m.ts_offset = 0 + m._update_header(m._forward_buffer) + assert m._forward_buffer==MsgInverterIndNew + assert m._send_buffer==MsgInverterAck + m.close() + def test_msg_inv_ack(ConfigTsunInv1, MsgInverterAck): ConfigTsunInv1 tracer.setLevel(logging.ERROR) @@ -673,6 +862,11 @@ def test_msg_inv_invalid(ConfigTsunInv1, MsgInverterInvalid): assert m.msg_id==4 assert m.header_len==23 assert m.data_len==1 + m.ts_offset = 0 + m._update_header(m._forward_buffer) + assert m._forward_buffer==MsgInverterInvalid + m.ts_offset = 256 + m._update_header(m._forward_buffer) assert m._forward_buffer==MsgInverterInvalid assert m._send_buffer==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 @@ -692,6 +886,11 @@ def test_msg_ota_req(ConfigTsunInv1, MsgOtaReq): assert m.msg_id==19 assert m.header_len==23 assert m.data_len==259 + m.ts_offset = 0 + m._update_header(m._forward_buffer) + assert m._forward_buffer==MsgOtaReq + m.ts_offset = 4096 + m._update_header(m._forward_buffer) assert m._forward_buffer==MsgOtaReq assert m._send_buffer==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 @@ -714,6 +913,11 @@ def test_msg_ota_ack(ConfigTsunInv1, MsgOtaAck): assert m.msg_id==19 assert m.header_len==23 assert m.data_len==1 + m.ts_offset = 0 + m._update_header(m._forward_buffer) + assert m._forward_buffer==MsgOtaAck + m.ts_offset = 256 + m._update_header(m._forward_buffer) assert m._forward_buffer==MsgOtaAck assert m._send_buffer==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 @@ -734,7 +938,12 @@ def test_msg_ota_invalid(ConfigTsunInv1, MsgOtaInvalid): assert m.msg_id==19 assert m.header_len==23 assert m.data_len==1 + m.ts_offset = 0 + m._update_header(m._forward_buffer) assert m._forward_buffer==MsgOtaInvalid + m.ts_offset = 4096 + assert m._forward_buffer==MsgOtaInvalid + m._update_header(m._forward_buffer) assert m._send_buffer==b'' assert m.db.stat['proxy']['Unknown_Ctrl'] == 1 assert m.db.stat['proxy']['OTA_Start_Msg'] == 0 @@ -847,7 +1056,7 @@ def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd): ConfigTsunInv1 m = MemoryStream(b'') m.id_str = b"R170000000000001" - m.state = m.STATE_UP + m.state = State.up c = m.createClientStream(MsgModbusCmd) @@ -953,6 +1162,29 @@ def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp): assert m.db.stat['proxy']['Modbus_Command'] == 0 m.close() +def test_msg_modbus_cloud_rsp(ConfigTsunInv1, MsgModbusRsp): + '''Modbus response from TSUN without a valid Modbus request must be dropped''' + ConfigTsunInv1 + m = 
MemoryStream(MsgModbusRsp, (0,), False) + m.db.stat['proxy']['Unknown_Ctrl'] = 0 + m.db.stat['proxy']['Unknown_Msg'] = 0 + m.db.stat['proxy']['Modbus_Command'] = 0 + m.read() # read complete msg, and dispatch msg + assert not m.header_valid # must be invalid, since msg was handled and buffer flushed + assert m.msg_count == 1 + assert m.id_str == b"R170000000000001" + assert m.unique_id == 'R170000000000001' + assert int(m.ctrl)==145 + assert m.msg_id==119 + assert m.header_len==23 + assert m.data_len==13 + assert m._forward_buffer==b'' + assert m._send_buffer==b'' + assert m.db.stat['proxy']['Unknown_Msg'] == 1 + assert m.db.stat['proxy']['Unknown_Ctrl'] == 0 + assert m.db.stat['proxy']['Modbus_Command'] == 0 + m.close() + def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusResp20): '''Modbus response with a valid Modbus request must be forwarded''' ConfigTsunInv1 @@ -1097,7 +1329,7 @@ async def test_msg_build_modbus_req(ConfigTsunInv1, MsgModbusCmd): assert m._send_buffer == b'' assert m.writer.sent_pdu == b'' - m.state = m.STATE_UP + m.state = State.up await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0, logging.DEBUG) assert 0 == m.send_msg_ofs assert m._forward_buffer == b'' @@ -1127,21 +1359,21 @@ def test_zombie_conn(ConfigTsunInv1, MsgInverterInd): m3 = MemoryStream(MsgInverterInd, (0,)) assert MemoryStream._RefNo == 3 + start_val assert m3.RefNo == 3 + start_val - assert m1.state == m1.STATE_INIT - assert m2.state == m2.STATE_INIT - assert m3.state == m3.STATE_INIT + assert m1.state == m1.State.init + assert m2.state == m2.State.init + assert m3.state == m3.State.init m1.read() # read complete msg, and set unique_id - assert m1.state == m1.STATE_UP - assert m2.state == m2.STATE_INIT - assert m3.state == m3.STATE_INIT + assert m1.state == m1.State.up + assert m2.state == m2.State.init + assert m3.state == m3.State.init m2.read() # read complete msg, and set unique_id - assert m1.state == m1.STATE_CLOSED - assert m2.state == m2.STATE_UP - assert m3.state == m3.STATE_INIT + assert m1.state == m1.State.closed + assert m2.state == m2.State.up + assert m3.state == m3.State.init m3.read() # read complete msg, and set unique_id - assert m1.state == m1.STATE_CLOSED - assert m2.state == m2.STATE_CLOSED - assert m3.state == m3.STATE_UP + assert m1.state == m1.State.closed + assert m2.state == m2.State.closed + assert m3.state == m3.State.up m1.close() m2.close() m3.close() diff --git a/app/tests/timestamp_old.svg b/app/tests/timestamp_old.svg new file mode 100644 index 0000000..739a420 --- /dev/null +++ b/app/tests/timestamp_old.svg @@ -0,0 +1,2 @@ + +InverterInverterProxyProxyCloudCloudMQTT-BrokerMQTT-BrokerContactIndstore Contact Infoin proxyContactRsp (Ok)getTimeReqContactIndContactRsp (Ok)getTimeReqTimeRsp (time)TimeRsp (time)set clock ininverterDataInd (ts:=time)DataRspDataInd (ts)DataIndDataRspDataInd (ts:=time)DataRspDataInd (ts)DataIndDataRsp \ No newline at end of file diff --git a/app/tests/timestamp_old.yuml b/app/tests/timestamp_old.yuml new file mode 100644 index 0000000..8f2e99c --- /dev/null +++ b/app/tests/timestamp_old.yuml @@ -0,0 +1,26 @@ +// {type:sequence} +// {generate:true} + +[Inverter]ContactInd>[Proxy] +[Proxy]-[note: store Contact Info in proxy{bg:cornsilk}] +[Proxy]ContactRsp (Ok).>[Inverter] + +[Inverter]getTimeReq>[Proxy] +[Proxy]ContactInd>[Cloud] +[Cloud]ContactRsp (Ok).>[Proxy] +[Proxy]getTimeReq>[Cloud] +[Cloud]TimeRsp (time).>[Proxy] +[Proxy]TimeRsp (time).>[Inverter] +[Inverter]-[note: set clock in inverter{bg:cornsilk}] + +[Inverter]DataInd 
(ts:=time)>[Proxy] +[Proxy]DataRsp>[Inverter] +[Proxy]DataInd (ts)>>[Cloud] +[Proxy]DataInd>>[MQTT-Broker] +[Cloud]DataRsp>>[Proxy] + +[Inverter]DataInd (ts:=time)>[Proxy] +[Proxy]DataRsp>[Inverter] +[Proxy]DataInd (ts)>>[Cloud] +[Proxy]DataInd>>[MQTT-Broker] +[Cloud]DataRsp>>[Proxy] diff --git a/docker-compose.yaml b/docker-compose.yaml index 4566a80..9b4f9e8 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -77,10 +77,15 @@ services: - ${DNS2:-4.4.4.4} ports: - 5005:5005 + - 8127:8127 - 10000:10000 volumes: - ${PROJECT_DIR:-./}tsun-proxy/log:/home/tsun-proxy/log - ${PROJECT_DIR:-./}tsun-proxy/config:/home/tsun-proxy/config + healthcheck: + test: wget --no-verbose --tries=1 --spider http://localhost:8127/-/healthy || exit 1 + interval: 10s + timeout: 3s networks: - outside diff --git a/system_tests/test_tcp_socket_v2.py b/system_tests/test_tcp_socket_v2.py index c41b41c..5e978de 100644 --- a/system_tests/test_tcp_socket_v2.py +++ b/system_tests/test_tcp_socket_v2.py @@ -89,6 +89,24 @@ def MsgDataResp(): # Contact Response message return msg +@pytest.fixture +def MsgInvalidInfo(): # Contact Info message with a wrong start byte + msg = b'\x47\xd4\x00\x10\x41\x00\x01' +get_sn() +b'\x02\xba\xd2\x00\x00' + msg += b'\x19\x00\x00\x00\x00\x00\x00\x00\x05\x3c\x78\x01\x64\x01\x4c\x53' + msg += b'\x57\x35\x42\x4c\x45\x5f\x31\x37\x5f\x30\x32\x42\x30\x5f\x31\x2e' + msg += b'\x30\x35\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x40\x2a\x8f\x4f\x51\x54\x31\x39\x32\x2e' + msg += b'\x31\x36\x38\x2e\x38\x30\x2e\x34\x39\x00\x00\x00\x0f\x00\x01\xb0' + msg += b'\x02\x0f\x00\xff\x56\x31\x2e\x31\x2e\x30\x30\x2e\x30\x42\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xfe\xfe\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x41\x6c\x6c\x69\x75\x73\x2d\x48\x6f' + msg += b'\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x3c' + msg += b'\x15' + return msg @pytest.fixture(scope="session") @@ -155,4 +173,22 @@ def test_data_ind(ClientConnection,MsgDataInd, MsgDataResp): # time.sleep(2.5) checkResponse(data, MsgDataResp) +def test_invalid_msg(ClientConnection,MsgInvalidInfo,MsgContactInfo, MsgContactResp): # the proxy must drop the invalid frame and still answer the following Contact Info + s = ClientConnection + try: + s.sendall(MsgInvalidInfo) + # time.sleep(2.5) + data = s.recv(1024) + except TimeoutError: + pass + # time.sleep(2.5) + try: + s.sendall(MsgContactInfo) + # time.sleep(2.5) + data = s.recv(1024) + except TimeoutError: + pass + # time.sleep(2.5) + checkResponse(data, MsgContactResp) + \ No newline at end of file