Compare commits

...

85 Commits

Author SHA1 Message Date
Stefan Allius
9420a137dc update changelog 2024-07-01 22:33:10 +02:00
Stefan Allius
4abc308445 fix exception in MODBUS timeout callback 2024-06-30 00:27:58 +02:00
Stefan Allius
f527dfbbec update changelog 2024-06-30 00:06:39 +02:00
Stefan Allius
a79b36c361 update changelog 2024-06-29 23:52:12 +02:00
Stefan Allius
1357b0f665 cleanup shutdown
- stop webserver on shutdown
- enable asyncio debug mode for debug versions
2024-06-29 20:07:06 +02:00
Stefan Allius
d9b7b9e858 add asyncio log 2024-06-29 20:05:08 +02:00
Stefan Allius
7c48ee4065 rename python to debugpy 2024-06-27 21:01:32 +02:00
Stefan Allius
4e89abd2c9 fix timeout calculation 2024-06-27 21:00:50 +02:00
Stefan Allius
f304aa009e add quit flag to docker push 2024-06-27 21:00:17 +02:00
Stefan Allius
9e218fdf41 fix Config.class_init()
- return error string or None
- release Schema structure after building the config
2024-06-25 23:28:34 +02:00
Stefan Allius
18f6332784 fix timer cleanup 2024-06-25 23:13:59 +02:00
Stefan Allius
26aebbcab8 fix buildx warnings 2024-06-23 23:56:37 +02:00
Stefan Allius
a9c7ea386e S allius/issue111 (#112)
Synchronize regular MODBUS commands with the status of the inverter to prevent the inverter from crashing due to unexpected packets.

* initial checkin

* remove crontab entry for regular MODBUS cmds

* add timer for regular MODBUS polling

* fix Stop method call for already stopped timer

* optimize MB_START_TIMEOUT value

* cleanup

* update changelog
2024-06-23 22:23:48 +02:00
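A minimal sketch of the synchronization idea behind #112 above, assuming simplified stand-ins for the proxy's State enum and timer handling (the real implementation, further down in this diff, uses a call_later based Timer class and the _send_modbus_cmd helper):

import asyncio
import logging
from enum import Enum

class State(Enum):           # simplified stand-in for the State enum added below
    init = 0
    up = 2
    closed = 4

class PollSketch:
    MB_START_TIMEOUT = 40    # seconds before the first regular poll
    MB_REGULAR_TIMEOUT = 60  # seconds between regular polls

    def __init__(self) -> None:
        self.state = State.init

    async def poll_loop(self) -> None:
        """Send regular MODBUS read requests, but only while the state is up."""
        await asyncio.sleep(self.MB_START_TIMEOUT)
        while self.state is not State.closed:
            if self.state is State.up:
                logging.debug('send regular MODBUS read request')
            else:
                logging.warning('ignore MODBUS cmd, state is not UP')
            await asyncio.sleep(self.MB_REGULAR_TIMEOUT)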
Stefan Allius
6332976c4a S allius/issue102 (#110)
* hotfix: don't send two MODBUS commands together

* fix unit tests

* remove read loop

* optional sleep between msg read and sending rsp

* wait after read 0.5s before sending a response

* add pending state

* fix state definitions

* determine the connection timeout by the conn state

* avoid sending MODBUS cmds in the inverter's reporting phase

* update changelog
2024-06-23 15:06:43 +02:00
Stefan Allius
cc233dcb17 S allius/issue108 (#109)
* add more data types

* adapt unittests

* improve test coverage

* fix linter warning

* update changelog
2024-06-23 00:52:42 +02:00
Stefan Allius
9a9cf79aac fix unittests 2024-06-21 23:38:07 +02:00
Stefan Allius
3ce29d4a96 fix merge conflict 2024-06-21 19:26:27 +02:00
Stefan Allius
a09d489c94 Merge branch 'main' of https://github.com/s-allius/tsun-gen3-proxy into dev-0.9 2024-06-21 19:25:37 +02:00
Stefan Allius
2d4679a361 S allius/issue100 (#101)
* detect dead connections

- disconnect connection on Msg receive timeout
- improve connection trace (add connection id)

* update changelog
2024-06-17 23:10:54 +02:00
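The dead-connection detection from #101 above boils down to wrapping the stream read in a deadline; a minimal standalone sketch of that pattern (the real code in async_stream.py below derives the timeout from the connection state):

import asyncio

async def read_with_deadline(reader: asyncio.StreamReader,
                             timeout: float) -> bytes:
    """Return received data, or raise if the peer stays silent too long."""
    try:
        data = await asyncio.wait_for(reader.read(4096), timeout)
    except asyncio.TimeoutError:
        raise ConnectionError(f'dead connection, no data for {timeout}s')
    if not data:
        raise ConnectionError('peer closed')
    return data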
Stefan Allius
9ff1453922 Merge pull request #99 from s-allius/health_check
Health check
2024-06-16 23:08:12 +02:00
Stefan Allius
5b36efc5e9 Merge branch 'dev-0.9.0' into health_check 2024-06-16 23:07:47 +02:00
Stefan Allius
c71994c839 update changelog 2024-06-16 22:58:04 +02:00
Stefan Allius
7d058e74fe log healthcheck infos with DEBUG level 2024-06-16 22:54:56 +02:00
Stefan Allius
373916bead add healthy method 2024-06-16 22:47:45 +02:00
Stefan Allius
f4b434cfef set new state State.received 2024-06-16 22:45:13 +02:00
Stefan Allius
d14cbe87a2 add docstrings to state enum 2024-06-16 22:43:59 +02:00
Stefan Allius
8aa1ef59ce update changelog 2024-06-16 19:35:38 +02:00
Stefan Allius
3d55ac57a8 Update CHANGELOG.md 2024-06-16 19:17:11 +02:00
Stefan Allius
8088e6ab3c cleanup 2024-06-16 18:13:07 +02:00
Stefan Allius
4372e49a1e add HTTP server for healthcheck 2024-06-16 17:51:51 +02:00
Stefan Allius
da832232bb calc processing time for healthcheck 2024-06-16 17:51:14 +02:00
Stefan Allius
e0568291f6 use Enum class for State 2024-06-16 17:50:09 +02:00
Stefan Allius
f5e7aa4292 add aiohttp 2024-06-16 17:48:17 +02:00
Stefan Allius
5e360e1139 add wget for healthcheck 2024-06-16 17:47:46 +02:00
Stefan Allius
94f7f5faa2 complete exposed port list 2024-06-16 17:47:13 +02:00
Stefan Allius
4600fc9577 add healthcheck 2024-06-16 17:46:51 +02:00
Stefan Allius
fa7bfe9e16 log unreleased references 2024-06-16 13:29:43 +02:00
Stefan Allius
3cebab40c8 add healthy handler 2024-06-16 13:26:05 +02:00
Stefan Allius
4649beb075 Merge pull request #97 from s-allius/s-allius/issue93
S allius/issue93
2024-06-16 13:09:16 +02:00
Stefan Allius
9138affdb9 update changelog 2024-06-16 13:05:20 +02:00
Stefan Allius
80183598ca cleanup 2024-06-16 13:03:33 +02:00
Stefan Allius
b688d04836 isolate Modbus fix 2024-06-16 13:00:02 +02:00
Stefan Allius
377c09bc66 Merge branch 'dev-0.9.0' of https://github.com/s-allius/tsun-gen3-proxy into s-allius/issue93 2024-06-16 12:39:56 +02:00
Stefan Allius
abb9e7c280 Merge pull request #96 from s-allius/s-allius/issue94
S allius/issue94
2024-06-16 12:35:12 +02:00
Stefan Allius
d78e32dd12 update changelog 2024-06-16 12:26:55 +02:00
Stefan Allius
30a6f75430 Merge branch 'dev-0.9.0' of https://github.com/s-allius/tsun-gen3-proxy into s-allius/issue94 2024-06-16 12:23:57 +02:00
Stefan Allius
e22ad78dcd add exception handling for forward handler 2024-06-16 12:23:13 +02:00
Stefan Allius
453d8b2aa2 call modbus close handler on a close call 2024-06-16 11:57:51 +02:00
Stefan Allius
f9b02f3486 add a close handler to release internal resources 2024-06-16 11:56:03 +02:00
Stefan Allius
b053c7e576 Update async_stream.py
- check if processing time is < 5 sec
2024-06-16 02:08:15 +02:00
Stefan Allius
10346e888f log ConfigErr with DEBUG level 2024-06-16 01:52:34 +02:00
Stefan Allius
f629246dbd fix typo 2024-06-16 01:18:06 +02:00
Stefan Allius
dbff66affd add healthy check methods 2024-06-15 23:36:59 +02:00
Stefan Allius
ac534c20ed calculate msg processing time 2024-06-15 23:34:11 +02:00
Stefan Allius
ff3ed83b49 add http server for healthcheck 2024-06-15 23:29:27 +02:00
Stefan Allius
ae94cd62fc use config validation for healthcheck 2024-06-15 23:23:57 +02:00
Stefan Allius
a16a19cc2c add aiohttp 2024-06-15 23:21:15 +02:00
Stefan Allius
dd351176bd add wget for healthcheck 2024-06-15 23:20:38 +02:00
Stefan Allius
cc8674d108 add exposed ports and healthcheck 2024-06-15 23:19:10 +02:00
Stefan Allius
d7767cb5ea update changelog 2024-06-14 20:11:17 +02:00
Stefan Allius
1e3bb31ef8 Merge pull request #90 from s-allius/s-allius/issue56
S allius/issue56
2024-06-14 00:05:48 +02:00
Stefan Allius
d6a44d9173 update changelog 2024-06-13 23:52:13 +02:00
Stefan Allius
43a2ef5712 add systemtest with invalid start byte 2024-06-13 23:45:22 +02:00
Stefan Allius
3209ebabde fix warnings 2024-06-13 23:44:57 +02:00
Stefan Allius
aac6cfd629 dump dropped packages 2024-06-13 23:43:05 +02:00
Stefan Allius
e8d32b45a5 label debug images with debug 2024-06-13 23:41:30 +02:00
Stefan Allius
06b63f554d adapt unit test 2024-06-09 11:41:29 +02:00
Stefan Allius
53f6a5447d cleanup msg_get_time handler 2024-06-09 11:41:01 +02:00
Stefan Allius
d6093e6b11 fix pytest collect warning 2024-06-09 11:40:08 +02:00
Stefan Allius
c8113e2f60 update changelog 2024-06-09 11:29:43 +02:00
Stefan Allius
57d6785f15 print image build time during proxy start 2024-06-09 11:22:23 +02:00
Stefan Allius
ff8adb5632 fix solarman unit tests
- fake Mqtt class
2024-06-09 11:02:43 +02:00
Stefan Allius
1deab4be6a fix imports 2024-06-09 11:01:04 +02:00
Stefan Allius
730229cfb0 don't mark all test as async 2024-06-09 01:26:21 +02:00
Stefan Allius
7b9550773d don't use deprecated warn anymore 2024-06-09 01:25:06 +02:00
Stefan Allius
3bc2b262b5 add more type annotations 2024-06-08 23:59:13 +02:00
Stefan Allius
37c2246132 fix names of issue branches 2024-06-08 23:57:46 +02:00
Stefan Allius
d0bd599420 fix Generator annotation for ha_proxy_confs 2024-06-08 23:54:52 +02:00
Stefan Allius
661f699444 Merge branch 'main' of https://github.com/s-allius/tsun-gen3-proxy into s-allius/issue56 2024-06-08 23:35:18 +02:00
Stefan Allius
a499c5e6b0 add more type annotations 2024-06-08 23:33:25 +02:00
Stefan Allius
9985917ad2 add more type annotations 2024-06-08 23:15:38 +02:00
Stefan Allius
851bd54d8f Merge branch 'dev-0.9.0' of https://github.com/s-allius/tsun-gen3-proxy into s-allius/issue56 2024-06-08 00:08:54 +02:00
Stefan Allius
81d551e47f initial version 2024-04-30 11:49:59 +02:00
Stefan Allius
63547bb51f adapt tests for stateless timestamp handling 2024-04-29 22:51:31 +02:00
Stefan Allius
6eebd0c852 make timestamp handling stateless 2024-04-29 22:48:41 +02:00
31 changed files with 1047 additions and 253 deletions

2
.vscode/launch.json vendored
View File

@@ -6,7 +6,7 @@
"configurations": [
{
"name": "Python: Aktuelle Datei",
"type": "python",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",

View File

@@ -5,7 +5,33 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [unreleased]
## [0.9.0] - 2024-07-01
- fix exception in MODBUS timeout callback
## [0.9.0-RC1] - 2024-06-29
- add asyncio log and debug mode
- stop the HTTP server on shutdown gracefully
- Synchronize regular MODBUS commands with the status of the inverter to prevent the inverter from crashing due to
unexpected packets. [#111](https://github.com/s-allius/tsun-gen3-proxy/issues/111)
- GEN3: avoid sending MODBUS commands to the inverter during the inverter's reporting phase
- GEN3: determine the connection timeout based on the connection state
- GEN3: support more data encodings for DSP version V5.0.17 [#108](https://github.com/s-allius/tsun-gen3-proxy/issues/108)
- detect dead connections [#100](https://github.com/s-allius/tsun-gen3-proxy/issues/100)
- improve connection logging with a unique connection id
- Add healthcheck, readiness and liveness checks [#91](https://github.com/s-allius/tsun-gen3-proxy/issues/91)
- MODBUS close handler releases internal resource [#93](https://github.com/s-allius/tsun-gen3-proxy/issues/93)
- add exception handling for message forwarding [#94](https://github.com/s-allius/tsun-gen3-proxy/issues/94)
- GEN3: make timestamp handling stateless, to avoid blocking when the TSUN cloud is down [#56](https://github.com/s-allius/tsun-gen3-proxy/issues/56)
- GEN3PLUS: dump invalid packages with wrong start or stop byte
- label debug images as `debug`
- print image build time during proxy start
- add type annotations
- improve async unit test and fix pytest warnings
- run github tests even for pulls on issue branches
## [0.8.1] - 2024-06-21
@@ -24,7 +50,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- catch all OSError errors in the read loop
- log Modbus traces with different log levels
- add Modbus fifo and timeout handler
- build version string in the same format as TSUN for GEN3 invterts
- build version string in the same format as TSUN for GEN3 inverters
- add graceful shutdown
- parse Modbus values and store them in the database
- add cron task to request the output power every minute

View File

@@ -15,7 +15,7 @@ RUN apk upgrade --no-cache && \
#
# second stage for building wheels packages
FROM base as builder
FROM base AS builder
# copy the dependencies file to the root dir and install requirements
COPY ./requirements.txt /root/
@@ -26,7 +26,7 @@ RUN apk add --no-cache build-base && \
#
# third stage for our runtime image
FROM base as runtime
FROM base AS runtime
ARG SERVICE_NAME
ARG VERSION
ARG UID
@@ -63,8 +63,8 @@ RUN python -m pip install --no-cache --no-index /root/wheels/* && \
COPY --chmod=0700 entrypoint.sh /root/entrypoint.sh
COPY config .
COPY src .
EXPOSE 5005
RUN date > /build-date.txt
EXPOSE 5005 8127 10000
# command to run on container start
ENTRYPOINT ["/root/entrypoint.sh"]
@@ -73,7 +73,7 @@ CMD [ "python3", "./server.py" ]
LABEL org.opencontainers.image.title="TSUN Gen3 Proxy"
LABEL org.opencontainers.image.authors="Stefan Allius"
LABEL org.opencontainers.image.source https://github.com/s-allius/tsun-gen3-proxy
LABEL org.opencontainers.image.description 'This proxy enables a reliable connection between TSUN third generation inverters (eg. TSOL MS600, MS800, MS2000) and an MQTT broker to integrate the inverter into typical home automations.'
LABEL org.opencontainers.image.source=https://github.com/s-allius/tsun-gen3-proxy
LABEL org.opencontainers.image.description='This proxy enables a reliable connection between TSUN third generation inverters (eg. TSOL MS600, MS800, MS2000) and an MQTT broker to integrate the inverter into typical home automations.'
LABEL org.opencontainers.image.licenses="BSD-3-Clause"
LABEL org.opencontainers.image.vendor="Stefan Allius"

View File

@@ -4,7 +4,7 @@
# rc: release candidate build
# rel: release build and push to ghcr.io
# Note: for release build, you need to set GHCR_TOKEN
# export GHCR_TOKEN=<YOUR_GITHUB_TOKEN> in your .profile
# export GHCR_TOKEN=<YOUR_GITHUB_TOKEN> in your .zprofile
# see also: https://docs.github.com/en/packages/working-with-a-github-packages-registry/working-with-the-container-registry
@@ -31,7 +31,7 @@ fi
echo version: $VERSION build-date: $BUILD_DATE image: $IMAGE
if [[ $1 == debug ]];then
docker build --build-arg "VERSION=${VERSION}" --build-arg environment=dev --build-arg "LOG_LVL=DEBUG" --label "org.opencontainers.image.created=${BUILD_DATE}" --label "org.opencontainers.image.version=${VERSION}" --label "org.opencontainers.image.revision=${BRANCH}" -t ${IMAGE}:dev app
docker build --build-arg "VERSION=${VERSION}" --build-arg environment=dev --build-arg "LOG_LVL=DEBUG" --label "org.opencontainers.image.created=${BUILD_DATE}" --label "org.opencontainers.image.version=${VERSION}" --label "org.opencontainers.image.revision=${BRANCH}" -t ${IMAGE}:debug app
elif [[ $1 == dev ]];then
docker build --build-arg "VERSION=${VERSION}" --build-arg environment=production --label "org.opencontainers.image.created=${BUILD_DATE}" --label "org.opencontainers.image.version=${VERSION}" --label "org.opencontainers.image.revision=${BRANCH}" -t ${IMAGE}:dev app
@@ -39,16 +39,16 @@ elif [[ $1 == rc ]];then
docker build --build-arg "VERSION=${VERSION}" --build-arg environment=production --label "org.opencontainers.image.created=${BUILD_DATE}" --label "org.opencontainers.image.version=${VERSION}" --label "org.opencontainers.image.revision=${BRANCH}" -t ${IMAGE}:rc -t ${IMAGE}:${VERSION} app
echo 'login to ghcr.io'
echo $GHCR_TOKEN | docker login ghcr.io -u s-allius --password-stdin
docker push ghcr.io/s-allius/tsun-gen3-proxy:rc
docker push ghcr.io/s-allius/tsun-gen3-proxy:${VERSION}
docker push -q ghcr.io/s-allius/tsun-gen3-proxy:rc
docker push -q ghcr.io/s-allius/tsun-gen3-proxy:${VERSION}
elif [[ $1 == rel ]];then
docker build --no-cache --build-arg "VERSION=${VERSION}" --build-arg environment=production --label "org.opencontainers.image.created=${BUILD_DATE}" --label "org.opencontainers.image.version=${VERSION}" --label "org.opencontainers.image.revision=${BRANCH}" -t ${IMAGE}:latest -t ${IMAGE}:${MAJOR} -t ${IMAGE}:${VERSION} app
echo 'login to ghcr.io'
echo $GHCR_TOKEN | docker login ghcr.io -u s-allius --password-stdin
docker push ghcr.io/s-allius/tsun-gen3-proxy:latest
docker push ghcr.io/s-allius/tsun-gen3-proxy:${MAJOR}
docker push ghcr.io/s-allius/tsun-gen3-proxy:${VERSION}
docker push -q ghcr.io/s-allius/tsun-gen3-proxy:latest
docker push -q ghcr.io/s-allius/tsun-gen3-proxy:${MAJOR}
docker push -q ghcr.io/s-allius/tsun-gen3-proxy:${VERSION}
fi
echo 'check docker-compose.yaml file'

View File

@@ -5,6 +5,7 @@ user="$(id -u)"
echo "######################################################"
echo "# prepare: '$SERVICE_NAME' Version:$VERSION"
echo "# for running with UserID:$UID, GroupID:$GID"
echo "# Image built: $(cat /build-date.txt) "
echo "#"
if [ "$user" = '0' ]; then

View File

@@ -17,6 +17,5 @@ if [ "$environment" = "production" ] ; then \
-name od -o \
-name strings -o \
-name su -o \
-name wget -o \
\) -delete \
; fi

View File

@@ -1,3 +1,4 @@
aiomqtt==2.0.1
schema==0.7.5
aiocron==1.8
aiocron==1.8
aiohttp==3.9.5

View File

@@ -1,44 +1,77 @@
import asyncio
import logging
import traceback
from messages import hex_dump_memory
import time
from asyncio import StreamReader, StreamWriter
from messages import hex_dump_memory, State
from typing import Self
from itertools import count
import gc
logger = logging.getLogger('conn')
class AsyncStream():
_ids = count(0)
MAX_PROC_TIME = 2
'''maximum processing time for a received msg in sec'''
MAX_START_TIME = 400
'''maximum time without a received msg in sec'''
MAX_INV_IDLE_TIME = 90
'''maximum time without a received msg from the inverter in sec'''
MAX_CLOUD_IDLE_TIME = 360
'''maximum time without a received msg from cloud side in sec'''
def __init__(self, reader, writer, addr) -> None:
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr) -> None:
logger.debug('AsyncStream.__init__')
self.reader = reader
self.writer = writer
self.addr = addr
self.r_addr = ''
self.l_addr = ''
self.conn_no = next(self._ids)
self.proc_start = None # start processing start timestamp
self.proc_max = 0
async def server_loop(self, addr):
def __timeout(self) -> int:
if self.state == State.init:
to = self.MAX_START_TIME
else:
if self.server_side:
to = self.MAX_INV_IDLE_TIME
else:
to = self.MAX_CLOUD_IDLE_TIME
return to
async def server_loop(self, addr: str) -> None:
'''Loop for receiving messages from the inverter (server-side)'''
logging.info(f'[{self.node_id}] Accept connection from {addr}')
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'Accept connection from {addr}')
self.inc_counter('Inverter_Cnt')
await self.loop()
self.dec_counter('Inverter_Cnt')
logging.info(f'[{self.node_id}] Server loop stopped for'
f' r{self.r_addr}')
logger.info(f'[{self.node_id}:{self.conn_no}] Server loop stopped for'
f' r{self.r_addr}')
# if the server connection closes, we also have to disconnect
# the connection to the TSUN cloud
if self.remoteStream:
logging.debug("disconnect client connection")
logger.info(f'[{self.node_id}:{self.conn_no}] disc client '
f'connection: [{self.remoteStream.node_id}:'
f'{self.remoteStream.conn_no}]')
await self.remoteStream.disc()
try:
await self._async_publ_mqtt_proxy_stat('proxy')
except Exception:
pass
async def client_loop(self, addr):
async def client_loop(self, addr: str) -> None:
'''Loop for receiving messages from the TSUN cloud (client-side)'''
clientStream = await self.remoteStream.loop()
logging.info(f'[{self.node_id}] Client loop stopped for'
f' l{clientStream.l_addr}')
logger.info(f'[{clientStream.node_id}:{clientStream.conn_no}] '
'Client loop stopped for'
f' l{clientStream.l_addr}')
# if the client connection closes, we don't touch the server
# connection. Instead we erase the client connection stream,
@@ -54,28 +87,45 @@ class AsyncStream():
# than erase client connection
self.remoteStream = None
async def loop(self):
async def loop(self) -> Self:
"""Async loop handler for precessing all received messages"""
self.r_addr = self.writer.get_extra_info('peername')
self.l_addr = self.writer.get_extra_info('sockname')
self.proc_start = time.time()
while True:
try:
await self.__async_read()
proc = time.time() - self.proc_start
if proc > self.proc_max:
self.proc_max = proc
self.proc_start = None
dead_conn_to = self.__timeout()
await asyncio.wait_for(self.__async_read(),
dead_conn_to)
if self.unique_id:
await self.async_write()
await self.__async_forward()
await self.async_publ_mqtt()
except asyncio.TimeoutError:
logger.warning(f'[{self.node_id}:{self.conn_no}] Dead '
f'connection timeout ({dead_conn_to}s) '
f'for {self.l_addr}')
await self.disc()
self.close()
return self
except OSError as error:
logger.error(f'[{self.node_id}] {error} for l{self.l_addr} | '
logger.error(f'[{self.node_id}:{self.conn_no}] '
f'{error} for l{self.l_addr} | '
f'r{self.r_addr}')
await self.disc()
self.close()
return self
except RuntimeError as error:
logger.info(f"[{self.node_id}] {error} for {self.l_addr}")
logger.info(f'[{self.node_id}:{self.conn_no}] '
f'{error} for {self.l_addr}')
await self.disc()
self.close()
return self
@@ -86,31 +136,8 @@ class AsyncStream():
f"Exception for {self.addr}:\n"
f"{traceback.format_exc()}")
async def disc(self) -> None:
if self.writer.is_closing():
return
logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
self.writer.close()
await self.writer.wait_closed()
def close(self):
if self.writer.is_closing():
return
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
self.writer.close()
'''
Our private methods
'''
async def __async_read(self) -> None:
data = await self.reader.read(4096)
if data:
self._recv_buffer += data
self.read() # call read in parent class
else:
raise RuntimeError("Peer closed.")
async def async_write(self, headline='Transmit to ') -> None:
async def async_write(self, headline: str = 'Transmit to ') -> None:
"""Async write handler to transmit the send_buffer"""
if self._send_buffer:
hex_dump_memory(logging.INFO, f'{headline}{self.addr}:',
self._send_buffer, len(self._send_buffer))
@@ -118,8 +145,57 @@ class AsyncStream():
await self.writer.drain()
self._send_buffer = bytearray(0) # self._send_buffer[sent:]
async def disc(self) -> None:
"""Async disc handler for graceful disconnect"""
if self.writer.is_closing():
return
logger.debug(f'AsyncStream.disc() l{self.l_addr} | r{self.r_addr}')
self.writer.close()
await self.writer.wait_closed()
def close(self) -> None:
"""close handler for a no waiting disconnect
hint: must be called before releasing the connection instance
"""
self.reader.feed_eof() # abort awaited read
if self.writer.is_closing():
return
logger.debug(f'AsyncStream.close() l{self.l_addr} | r{self.r_addr}')
self.writer.close()
def healthy(self) -> bool:
elapsed = 0
if self.proc_start is not None:
elapsed = time.time() - self.proc_start
if self.state == State.closed or elapsed > self.MAX_PROC_TIME:
logging.debug(f'[{self.node_id}:{self.conn_no}:'
f'{type(self).__name__}]'
f' act:{round(1000*elapsed)}ms'
f' max:{round(1000*self.proc_max)}ms')
logging.debug(f'Healthy()) refs: {gc.get_referrers(self)}')
return elapsed < 5
'''
Our private methods
'''
async def __async_read(self) -> None:
"""Async read handler to read received data from TCP stream"""
data = await self.reader.read(4096)
if data:
self.proc_start = time.time()
self._recv_buffer += data
wait = self.read() # call read in parent class
if wait > 0:
await asyncio.sleep(wait)
else:
raise RuntimeError("Peer closed.")
async def __async_forward(self) -> None:
if self._forward_buffer:
"""forward handler transmits data over the remote connection"""
if not self._forward_buffer:
return
try:
if not self.remoteStream:
await self.async_create_remote()
if self.remoteStream:
@@ -136,6 +212,30 @@ class AsyncStream():
await self.remoteStream.writer.drain()
self._forward_buffer = bytearray(0)
except OSError as error:
if self.remoteStream:
rmt = self.remoteStream
self.remoteStream = None
logger.error(f'[{rmt.node_id}:{rmt.conn_no}] Fwd: {error} for '
f'l{rmt.l_addr} | r{rmt.r_addr}')
await rmt.disc()
rmt.close()
except RuntimeError as error:
if self.remoteStream:
rmt = self.remoteStream
self.remoteStream = None
logger.info(f'[{rmt.node_id}:{rmt.conn_no}] '
f'Fwd: {error} for {rmt.l_addr}')
await rmt.disc()
rmt.close()
except Exception:
self.inc_counter('SW_Exception')
logger.error(
f"Fwd Exception for {self.addr}:\n"
f"{traceback.format_exc()}")
def __del__(self):
logger.debug(
f"AsyncStream.__del__ l{self.l_addr} | r{self.r_addr}")

View File

@@ -84,7 +84,7 @@ class Config():
)
@classmethod
def class_init(cls): # pragma: no cover
def class_init(cls) -> None | str: # pragma: no cover
try:
# make the default config transparent by copying it
# in the config.example file
@@ -94,7 +94,9 @@ class Config():
"config/config.example.toml")
except Exception:
pass
cls.read()
err_str = cls.read()
del cls.conf_schema
return err_str
@classmethod
def _read_config_file(cls) -> dict: # pragma: no cover

View File

@@ -1,5 +1,6 @@
import logging
# import gc
from asyncio import StreamReader, StreamWriter
from async_stream import AsyncStream
from gen3.talent import Talent
@@ -8,12 +9,13 @@ logger = logging.getLogger('conn')
class ConnectionG3(AsyncStream, Talent):
def __init__(self, reader, writer, addr, remote_stream, server_side: bool,
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, remote_stream: 'ConnectionG3', server_side: bool,
id_str=b'') -> None:
AsyncStream.__init__(self, reader, writer, addr)
Talent.__init__(self, server_side, id_str)
self.remoteStream = remote_stream
self.remoteStream: 'ConnectionG3' = remote_stream
'''
Our public methods
@@ -29,6 +31,10 @@ class ConnectionG3(AsyncStream, Talent):
async def async_publ_mqtt(self) -> None:
pass
def healthy(self) -> bool:
logger.debug('ConnectionG3 healthy()')
return AsyncStream.healthy(self)
'''
Our private methods
'''

View File

@@ -132,11 +132,24 @@ class InfosG3(Infos):
errors='replace')
ind += str_len+1
elif data_type == 0x00: # 'Nul' -> end
i = elms # abort the loop
elif data_type == 0x41: # 'A' -> Nop ??
# result = struct.unpack_from('!l', buf, ind)[0]
ind += 0
i += 1
continue
elif data_type == 0x42: # 'B' -> byte, int8
result = struct.unpack_from('!B', buf, ind)[0]
ind += 1
elif data_type == 0x49: # 'I' -> int32
result = struct.unpack_from('!l', buf, ind)[0]
ind += 4
elif data_type == 0x53: # 'S' -> short
elif data_type == 0x53: # 'S' -> short, int16
result = struct.unpack_from('!h', buf, ind)[0]
ind += 2
@@ -144,13 +157,14 @@ class InfosG3(Infos):
result = round(struct.unpack_from('!f', buf, ind)[0], 2)
ind += 4
elif data_type == 0x4c: # 'L' -> int64
elif data_type == 0x4c: # 'L' -> long, int64
result = struct.unpack_from('!q', buf, ind)[0]
ind += 8
else:
self.inc_counter('Invalid_Data_Type')
logging.error(f"Infos.parse: data_type: {data_type}"
f" @0x{addr:04x} No:{i}"
" not supported")
return

View File

@@ -1,7 +1,8 @@
import asyncio
import logging
import traceback
import json
import asyncio
from asyncio import StreamReader, StreamWriter
from config import Config
from inverter import Inverter
from gen3.connection_g3 import ConnectionG3
@@ -44,7 +45,7 @@ class InverterG3(Inverter, ConnectionG3):
destroyed
'''
def __init__(self, reader, writer, addr):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
super().__init__(reader, writer, addr, None, True)
self.__ha_restarts = -1
@@ -56,11 +57,14 @@ class InverterG3(Inverter, ConnectionG3):
addr = (host, port)
try:
logging.info(f'[{self.node_id}] Connected to {addr}')
logging.info(f'[{self.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port)
reader, writer = await connect
self.remoteStream = ConnectionG3(reader, writer, addr, self,
False, self.id_str)
logging.info(f'[{self.remoteStream.node_id}:'
f'{self.remoteStream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error:
@@ -121,7 +125,7 @@ class InverterG3(Inverter, ConnectionG3):
def close(self) -> None:
logging.debug(f'InverterG3.close() l{self.l_addr} | r{self.r_addr}')
super().close() # call close handler in the parent class
# logger.debug (f'Inverter refs: {gc.get_referrers(self)}')
# logging.info(f'Inverter refs: {gc.get_referrers(self)}')
def __del__(self):
logging.debug("InverterG3.__del__")

View File

@@ -4,13 +4,15 @@ import time
from datetime import datetime
if __name__ == "app.src.gen3.talent":
from app.src.messages import hex_dump_memory, Message
from app.src.messages import hex_dump_memory, Message, State
from app.src.modbus import Modbus
from app.src.my_timer import Timer
from app.src.config import Config
from app.src.gen3.infos_g3 import InfosG3
else: # pragma: no cover
from messages import hex_dump_memory, Message
from messages import hex_dump_memory, Message, State
from modbus import Modbus
from my_timer import Timer
from config import Config
from gen3.infos_g3 import InfosG3
@@ -35,12 +37,16 @@ class Control:
class Talent(Message):
MB_START_TIMEOUT = 40
MB_REGULAR_TIMEOUT = 60
def __init__(self, server_side: bool, id_str=b''):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=11)
self.await_conn_resp_cnt = 0
self.id_str = id_str
self.contact_name = b''
self.contact_mail = b''
self.ts_offset = 0 # time offset between tsun cloud and local
self.db = InfosG3()
self.switch = {
0x00: self.msg_contact_info,
@@ -64,19 +70,20 @@ class Talent(Message):
}
self.modbus_elms = 0 # for unit tests
self.node_id = 'G3' # will be overwritten in __set_serial_no
# self.forwarding = Config.get('tsun')['enabled']
self.mb_timer = Timer(self.mb_timout_cb, self.node_id)
'''
Our public methods
'''
def close(self) -> None:
logging.debug('Talent.close()')
# we have refernces to methods of this class in self.switch
# we have references to methods of this class in self.switch
# so we have to erase self.switch, otherwise this instance can't be
# deallocated by the garbage collector ==> we get a memory leak
self.switch.clear()
self.log_lvl.clear()
self.state = self.STATE_CLOSED
self.state = State.closed
self.mb_timer.close()
super().close()
def __set_serial_no(self, serial_no: str):
@@ -105,7 +112,7 @@ class Talent(Message):
self.unique_id = serial_no
def read(self) -> None:
def read(self) -> float:
self._read()
if not self.header_valid:
@@ -113,6 +120,9 @@ class Talent(Message):
if self.header_valid and len(self._recv_buffer) >= (self.header_len +
self.data_len):
if self.state == State.init:
self.state = State.received # received 1st package
log_lvl = self.log_lvl.get(self.msg_id, logging.WARNING)
if callable(log_lvl):
log_lvl = log_lvl()
@@ -123,7 +133,7 @@ class Talent(Message):
self.__set_serial_no(self.id_str.decode("utf-8"))
self.__dispatch_msg()
self.__flush_recv_msg()
return
return 0.5 # wait 500ms before sending a response
def forward(self, buffer, buflen) -> None:
tsun = Config.get('tsun')
@@ -140,9 +150,9 @@ class Talent(Message):
return
def send_modbus_cb(self, modbus_pdu: bytearray, log_lvl: int, state: str):
if self.state != self.STATE_UP:
logger.warn(f'[{self.node_id}] ignore MODBUS cmd,'
' cause the state is not UP anymore')
if self.state != State.up:
logger.warning(f'[{self.node_id}] ignore MODBUS cmd,'
' cause the state is not UP anymore')
return
self.__build_header(0x70, 0x77)
@@ -156,13 +166,25 @@ class Talent(Message):
self.writer.write(self._send_buffer)
self._send_buffer = bytearray(0) # self._send_buffer[sent:]
async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
if self.state != self.STATE_UP:
def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
if self.state != State.up:
logger.log(log_lvl, f'[{self.node_id}] ignore MODBUS cmd,'
' as the state is not UP')
return
self.mb.build_msg(Modbus.INV_ADDR, func, addr, val, log_lvl)
async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
self._send_modbus_cmd(func, addr, val, log_lvl)
def mb_timout_cb(self, exp_cnt):
self.mb_timer.start(self.MB_REGULAR_TIMEOUT)
if 0 == (exp_cnt % 30):
# logging.info("Regular Modbus Status request")
self._send_modbus_cmd(Modbus.READ_REGS, 0x2007, 2, logging.DEBUG)
else:
self._send_modbus_cmd(Modbus.READ_REGS, 0x3008, 21, logging.DEBUG)
def _init_new_client_conn(self) -> bool:
contact_name = self.contact_name
contact_mail = self.contact_mail
@@ -204,6 +226,24 @@ class Talent(Message):
ts = (datetime.now() - datetime(1970, 1, 1)).total_seconds()
return round(ts*1000)
def _update_header(self, _forward_buffer):
'''update header for message before forwarding,
add time offset to timestamp'''
_len = len(_forward_buffer)
result = struct.unpack_from('!lB', _forward_buffer, 0)
id_len = result[1] # len of variable id string
if _len < 2*id_len + 21:
return
result = struct.unpack_from('!B', _forward_buffer, id_len+6)
msg_code = result[0]
if msg_code == 0x71 or msg_code == 0x04:
result = struct.unpack_from('!q', _forward_buffer, 13+2*id_len)
ts = result[0] + self.ts_offset
logger.debug(f'offset: {self.ts_offset:08x}'
f' proxy-time: {ts:08x}')
struct.pack_into('!q', _forward_buffer, 13+2*id_len, ts)
# check if there is a complete header in the buffer, parse it
# and set
# self.header_len
@@ -256,7 +296,8 @@ class Talent(Message):
fnc = self.switch.get(self.msg_id, self.msg_unknown)
if self.unique_id:
logger.info(self.__flow_str(self.server_side, 'rx') +
f' Ctl: {int(self.ctrl):#02x} Msg: {fnc.__name__!r}')
f' Ctl: {int(self.ctrl):#02x} ({self.state}) '
f'Msg: {fnc.__name__!r}')
fnc()
else:
logger.info(self.__flow_str(self.server_side, 'drop') +
@@ -305,39 +346,37 @@ class Talent(Message):
return True
def msg_get_time(self):
tsun = Config.get('tsun')
if tsun['enabled']:
if self.ctrl.is_ind():
if self.data_len >= 8:
ts = self._timestamp()
result = struct.unpack_from('!q', self._recv_buffer,
self.header_len)
logger.debug(f'tsun-time: {result[0]:08x}'
f' proxy-time: {ts:08x}')
else:
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
if self.ctrl.is_ind():
if self.data_len == 0:
self.state = State.pend # block MODBUS cmds
self.mb_timer.start(self.MB_START_TIMEOUT)
ts = self._timestamp()
logger.debug(f'time: {ts:08x}')
self.__build_header(0x91)
self._send_buffer += struct.pack('!q', ts)
self.__finish_send_msg()
elif self.data_len >= 8:
ts = self._timestamp()
result = struct.unpack_from('!q', self._recv_buffer,
self.header_len)
self.ts_offset = result[0]-ts
logger.debug(f'tsun-time: {int(result[0]):08x}'
f' proxy-time: {ts:08x}'
f' offset: {self.ts_offset}')
return # ignore received response
else:
if self.ctrl.is_ind():
if self.data_len == 0:
ts = self._timestamp()
logger.debug(f'time: {ts:08x}')
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.__build_header(0x91)
self._send_buffer += struct.pack('!q', ts)
self.__finish_send_msg()
else:
logger.warning('Unknown Ctrl')
self.inc_counter('Unknown_Ctrl')
self.forward(self._recv_buffer, self.header_len+self.data_len)
def parse_msg_header(self):
result = struct.unpack_from('!lB', self._recv_buffer, self.header_len)
data_id = result[0] # len of complete message
id_len = result[1] # len of variable id string
logger.debug(f'Data_ID: {data_id} id_len: {id_len}')
logger.debug(f'Data_ID: 0x{data_id:08x} id_len: {id_len}')
msg_hdr_len = 5+id_len+9
@@ -356,7 +395,6 @@ class Talent(Message):
self._send_buffer += b'\x01'
self.__finish_send_msg()
self.__process_data()
self.state = self.STATE_UP
elif self.ctrl.is_resp():
return # ignore received response
@@ -372,7 +410,7 @@ class Talent(Message):
self._send_buffer += b'\x01'
self.__finish_send_msg()
self.__process_data()
self.state = self.STATE_UP
self.state = State.up # allow MODBUS cmds
elif self.ctrl.is_resp():
return # ignore received response
@@ -432,8 +470,13 @@ class Talent(Message):
else:
self.inc_counter('Invalid_Msg_Format')
elif self.ctrl.is_ind():
# logger.debug(f'Modbus Ind MsgLen: {modbus_len}')
self.modbus_elms = 0
# logger.debug(f'Modbus Ind MsgLen: {modbus_len}')
if not self.server_side:
logger.warning('Unknown Message')
self.inc_counter('Unknown_Msg')
return
for key, update, _ in self.mb.recv_resp(self.db, data[
hdr_len:],
self.node_id):

View File

@@ -1,5 +1,6 @@
import logging
# import gc
from asyncio import StreamReader, StreamWriter
from async_stream import AsyncStream
from gen3plus.solarman_v5 import SolarmanV5
@@ -8,12 +9,13 @@ logger = logging.getLogger('conn')
class ConnectionG3P(AsyncStream, SolarmanV5):
def __init__(self, reader, writer, addr, remote_stream,
def __init__(self, reader: StreamReader, writer: StreamWriter,
addr, remote_stream: 'ConnectionG3P',
server_side: bool) -> None:
AsyncStream.__init__(self, reader, writer, addr)
SolarmanV5.__init__(self, server_side)
self.remoteStream = remote_stream
self.remoteStream: 'ConnectionG3P' = remote_stream
'''
Our public methods
@@ -29,6 +31,10 @@ class ConnectionG3P(AsyncStream, SolarmanV5):
async def async_publ_mqtt(self) -> None:
pass
def healthy(self) -> bool:
logger.debug('ConnectionG3P healthy()')
return AsyncStream.healthy(self)
'''
Our private methods
'''

View File

@@ -1,7 +1,8 @@
import asyncio
import logging
import traceback
import json
import asyncio
from asyncio import StreamReader, StreamWriter
from config import Config
from inverter import Inverter
from gen3plus.connection_g3p import ConnectionG3P
@@ -44,7 +45,7 @@ class InverterG3P(Inverter, ConnectionG3P):
destroyed
'''
def __init__(self, reader, writer, addr):
def __init__(self, reader: StreamReader, writer: StreamWriter, addr):
super().__init__(reader, writer, addr, None, True)
self.__ha_restarts = -1
@@ -56,11 +57,14 @@ class InverterG3P(Inverter, ConnectionG3P):
addr = (host, port)
try:
logging.info(f'[{self.node_id}] Connected to {addr}')
logging.info(f'[{self.node_id}] Connect to {addr}')
connect = asyncio.open_connection(host, port)
reader, writer = await connect
self.remoteStream = ConnectionG3P(reader, writer, addr, self,
False)
logging.info(f'[{self.remoteStream.node_id}:'
f'{self.remoteStream.conn_no}] '
f'Connected to {addr}')
asyncio.create_task(self.client_loop(addr))
except (ConnectionRefusedError, TimeoutError) as error:

View File

@@ -6,15 +6,17 @@ import asyncio
from datetime import datetime
if __name__ == "app.src.gen3plus.solarman_v5":
from app.src.messages import hex_dump_memory, Message
from app.src.messages import hex_dump_memory, Message, State
from app.src.modbus import Modbus
from app.src.my_timer import Timer
from app.src.config import Config
from app.src.gen3plus.infos_g3p import InfosG3P
from app.src.infos import Register
else: # pragma: no cover
from messages import hex_dump_memory, Message
from messages import hex_dump_memory, Message, State
from config import Config
from modbus import Modbus
from my_timer import Timer
from gen3plus.infos_g3p import InfosG3P
from infos import Register
# import traceback
@@ -51,6 +53,8 @@ class Sequence():
class SolarmanV5(Message):
AT_CMD = 1
MB_RTU_CMD = 2
MB_START_TIMEOUT = 40
MB_REGULAR_TIMEOUT = 60
def __init__(self, server_side: bool):
super().__init__(server_side, self.send_modbus_cb, mb_timeout=5)
@@ -123,19 +127,20 @@ class SolarmanV5(Message):
self.at_acl = g3p_cnf['at_acl']
self.node_id = 'G3P' # will be overwritten in __set_serial_no
# self.forwarding = Config.get('solarman')['enabled']
self.mb_timer = Timer(self.mb_timout_cb, self.node_id)
'''
Our public methods
'''
def close(self) -> None:
logging.debug('Solarman.close()')
# we have refernces to methods of this class in self.switch
# we have references to methods of this class in self.switch
# so we have to erase self.switch, otherwise this instance can't be
# deallocated by the garbage collector ==> we get a memory leak
self.switch.clear()
self.log_lvl.clear()
self.state = self.STATE_CLOSED
self.state = State.closed
self.mb_timer.close()
super().close()
def __set_serial_no(self, snr: int):
@@ -169,7 +174,7 @@ class SolarmanV5(Message):
self.unique_id = serial_no
def read(self) -> None:
def read(self) -> float:
self._read()
if not self.header_valid:
@@ -184,10 +189,13 @@ class SolarmanV5(Message):
self._recv_buffer, self.header_len+self.data_len+2)
if self.__trailer_is_ok(self._recv_buffer, self.header_len
+ self.data_len + 2):
if self.state == State.init:
self.state = State.received
self.__set_serial_no(self.snr)
self.__dispatch_msg()
self.__flush_recv_msg()
return
return 0 # wait 0s before sending a response
def forward(self, buffer, buflen) -> None:
tsun = Config.get('solarman')
@@ -253,6 +261,10 @@ class SolarmanV5(Message):
self.snr = result[4]
if start != 0xA5:
hex_dump_memory(logging.ERROR,
'Drop packet w invalid start byte from'
f' {self.addr}:', buf, buf_len)
self.inc_counter('Invalid_Msg_Format')
# erase broken recv buffer
self._recv_buffer = bytearray()
@@ -264,6 +276,9 @@ class SolarmanV5(Message):
crc = buf[self.data_len+11]
stop = buf[self.data_len+12]
if stop != 0x15:
hex_dump_memory(logging.ERROR,
'Drop packet w invalid stop byte from '
f'{self.addr}:', buf, buf_len)
self.inc_counter('Invalid_Msg_Format')
if len(self._recv_buffer) > (self.data_len+13):
next_start = buf[self.data_len+13]
@@ -338,9 +353,9 @@ class SolarmanV5(Message):
self.__finish_send_msg()
def send_modbus_cb(self, pdu: bytearray, log_lvl: int, state: str):
if self.state != self.STATE_UP:
logger.warn(f'[{self.node_id}] ignore MODBUS cmd,'
' cause the state is not UP anymore')
if self.state != State.up:
logger.warning(f'[{self.node_id}] ignore MODBUS cmd,'
' cause the state is not UP anymore')
return
self.__build_header(0x4510)
self._send_buffer += struct.pack('<BHLLL', self.MB_RTU_CMD,
@@ -352,21 +367,33 @@ class SolarmanV5(Message):
self.writer.write(self._send_buffer)
self._send_buffer = bytearray(0) # self._send_buffer[sent:]
async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
if self.state != self.STATE_UP:
def _send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
if self.state != State.up:
logger.log(log_lvl, f'[{self.node_id}] ignore MODBUS cmd,'
' as the state is not UP')
return
self.mb.build_msg(Modbus.INV_ADDR, func, addr, val, log_lvl)
async def send_modbus_cmd(self, func, addr, val, log_lvl) -> None:
self._send_modbus_cmd(func, addr, val, log_lvl)
def mb_timout_cb(self, exp_cnt):
self.mb_timer.start(self.MB_REGULAR_TIMEOUT)
self._send_modbus_cmd(Modbus.READ_REGS, 0x3008, 21, logging.DEBUG)
if 0 == (exp_cnt % 30):
# logging.info("Regular Modbus Status request")
self._send_modbus_cmd(Modbus.READ_REGS, 0x2007, 2, logging.DEBUG)
def at_cmd_forbidden(self, cmd: str, connection: str) -> bool:
return not cmd.startswith(tuple(self.at_acl[connection]['allow'])) or \
cmd.startswith(tuple(self.at_acl[connection]['block']))
async def send_at_cmd(self, AT_cmd: str) -> None:
if self.state != self.STATE_UP:
logger.warn(f'[{self.node_id}] ignore AT+ cmd,'
' as the state is not UP')
if self.state != State.up:
logger.warning(f'[{self.node_id}] ignore AT+ cmd,'
' as the state is not UP')
return
AT_cmd = AT_cmd.strip()
@@ -375,8 +402,7 @@ class SolarmanV5(Message):
node_id = self.node_id
key = 'at_resp'
logger.info(f'{key}: {data_json}')
asyncio.ensure_future(
self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json)) # noqa: E501
await self.mqtt.publish(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
return
self.forward_at_cmd_resp = False
@@ -465,7 +491,9 @@ class SolarmanV5(Message):
self.__process_data(ftype)
self.__forward_msg()
self.__send_ack_rsp(0x1210, ftype)
self.state = self.STATE_UP
if self.state is not State.up:
self.state = State.up
self.mb_timer.start(self.MB_START_TIMEOUT)
def msg_sync_start(self):
data = self._recv_buffer[self.header_len:]
@@ -499,13 +527,15 @@ class SolarmanV5(Message):
__forward_msg):
self.inc_counter('Modbus_Command')
else:
logger.error('Invalid Modbus Msg')
self.inc_counter('Invalid_Msg_Format')
return
self.__forward_msg()
async def publish_mqtt(self, key, data):
await self.mqtt.publish(key, data) # pragma: no cover
def publish_mqtt(self, key, data):
asyncio.ensure_future(
self.mqtt.publish(key, data))
def get_cmd_rsp_log_lvl(self) -> int:
ftype = self._recv_buffer[self.header_len]
@@ -529,8 +559,7 @@ class SolarmanV5(Message):
node_id = self.node_id
key = 'at_resp'
logger.info(f'{key}: {data_json}')
asyncio.ensure_future(
self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json)) # noqa: E501
self.publish_mqtt(f'{self.entity_prfx}{node_id}{key}', data_json) # noqa: E501
return
elif ftype == self.MB_RTU_CMD:
valid = data[1]
@@ -561,7 +590,9 @@ class SolarmanV5(Message):
self.__forward_msg()
self.__send_ack_rsp(0x1710, ftype)
self.state = self.STATE_UP
if self.state is not State.up:
self.state = State.up
self.mb_timer.start(self.MB_START_TIMEOUT)
def msg_sync_end(self):
data = self._recv_buffer[self.header_len:]

View File

@@ -343,7 +343,7 @@ class Infos:
dict[counter] -= 1
def ha_proxy_confs(self, ha_prfx: str, node_id: str, snr: str) \
-> Generator[tuple[dict, str], None, None]:
-> Generator[tuple[str, str, str, str], None, None]:
'''Generator function yields json register struct for home-assistant
auto configuration and the unique entity string, for all proxy
registers

View File

@@ -1,5 +1,5 @@
[loggers]
keys=root,tracer,mesg,conn,data,mqtt
keys=root,tracer,mesg,conn,data,mqtt,asyncio
[handlers]
keys=console_handler,file_handler_name1,file_handler_name2
@@ -24,6 +24,12 @@ handlers=console_handler,file_handler_name1
propagate=0
qualname=mqtt
[logger_asyncio]
level=INFO
handlers=console_handler,file_handler_name1
propagate=0
qualname=asyncio
[logger_data]
level=DEBUG
handlers=file_handler_name1

View File

@@ -1,6 +1,7 @@
import logging
import weakref
from typing import Callable
from typing import Callable, Generator
from enum import Enum
if __name__ == "app.src.messages":
@@ -45,21 +46,32 @@ def hex_dump_memory(level, info, data, num):
class IterRegistry(type):
def __iter__(cls):
def __iter__(cls) -> Generator['Message', None, None]:
for ref in cls._registry:
obj = ref()
if obj is not None:
yield obj
class State(Enum):
'''state of the logical connection'''
init = 0
'''just created'''
received = 1
'''at least one packet received'''
up = 2
'''at least one cmd-rsp transaction'''
pend = 3
'''inverter transaction pending, don't send MODBUS cmds'''
closed = 4
'''connection closed'''
class Message(metaclass=IterRegistry):
_registry = []
STATE_INIT = 0
STATE_UP = 2
STATE_CLOSED = 3
def __init__(self, server_side: bool, send_modbus_cb:
Callable[[bytes, int, str], None], mb_timeout):
Callable[[bytes, int, str], None], mb_timeout: int):
self._registry.append(weakref.ref(self))
self.server_side = server_side
@@ -78,7 +90,7 @@ class Message(metaclass=IterRegistry):
self._send_buffer = bytearray(0)
self._forward_buffer = bytearray(0)
self.new_data = {}
self.state = self.STATE_INIT
self.state = State.init
'''
Empty methods, that have to be implemented in any child class which
@@ -97,7 +109,7 @@ class Message(metaclass=IterRegistry):
'''
def close(self) -> None:
if self.mb:
del self.mb
self.mb.close()
self.mb = None
pass # pragma: no cover

View File

@@ -105,7 +105,17 @@ class Modbus():
self.req_pend = False
self.tim = None
def close(self):
"""free the queue and erase the callback handlers"""
logging.debug('Modbus close:')
self.__stop_timer()
self.rsp_handler = None
self.snd_handler = None
while not self.que.empty:
self.que.get_nowait()
def __del__(self):
"""log statistics on the deleting of a MODBUS instance"""
logging.debug(f'Modbus __del__:\n {self.counter}')
def build_msg(self, addr: int, func: int, reg: int, val: int,
@@ -243,6 +253,7 @@ class Modbus():
# logging.debug(f'Modbus stop timer {self}')
if self.tim:
self.tim.cancel()
self.tim = None
def __timeout_cb(self) -> None:
'''Response timeout handler: retransmit pdu or send next pdu'''

35
app/src/my_timer.py Normal file
View File

@@ -0,0 +1,35 @@
import asyncio
import logging
from itertools import count
class Timer:
def __init__(self, cb, id_str: str = ''):
self.__timeout_cb = cb
self.loop = asyncio.get_event_loop()
self.tim = None
self.id_str = id_str
self.exp_count = count(0)
def start(self, timeout: float) -> None:
'''Start timer with timeout seconds'''
if self.tim:
self.tim.cancel()
self.tim = self.loop.call_later(timeout, self.__timeout)
logging.debug(f'[{self.id_str}]Start timer')
def stop(self) -> None:
'''Stop timer'''
logging.debug(f'[{self.id_str}]Stop timer')
if self.tim:
self.tim.cancel()
self.tim = None
def __timeout(self) -> None:
'''timer expired handler'''
logging.debug(f'[{self.id_str}]Timer expired')
self.__timeout_cb(next(self.exp_count))
def close(self) -> None:
self.stop()
self.__timeout_cb = None
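A small usage sketch for the new Timer class above (assuming my_timer is importable, e.g. with app/src on the path; the callback and timeouts here are illustrative):

import asyncio
from my_timer import Timer

def on_expire(exp_cnt: int) -> None:
    print(f'timer expired, count={exp_cnt}')

async def main() -> None:
    tim = Timer(on_expire, id_str='demo')
    tim.start(1.0)            # schedule the callback in 1 second
    await asyncio.sleep(1.5)  # let it expire once (exp_cnt == 0)
    tim.start(1.0)            # start() cancels any pending timer first
    tim.stop()                # stop() cancels it without firing
    tim.close()               # drop the callback reference before release

asyncio.run(main())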

View File

@@ -3,8 +3,6 @@ import json
from mqtt import Mqtt
from aiocron import crontab
from infos import ClrAtMidnight
from modbus import Modbus
from messages import Message
logger_mqtt = logging.getLogger('mqtt')
@@ -21,9 +19,6 @@ class Schedule:
crontab('0 0 * * *', func=cls.atmidnight, start=True)
# every minute
crontab('* * * * *', func=cls.regular_modbus_cmds, start=True)
@classmethod
async def atmidnight(cls) -> None:
'''Clear daily counters at midnight'''
@@ -33,15 +28,3 @@ class Schedule:
logger_mqtt.debug(f'{key}: {data}')
data_json = json.dumps(data)
await cls.mqtt.publish(f"{key}", data_json)
@classmethod
async def regular_modbus_cmds(cls):
for m in Message:
if m.server_side:
fnc = getattr(m, "send_modbus_cmd", None)
if callable(fnc):
await fnc(Modbus.READ_REGS, 0x3008, 21, logging.DEBUG)
# if 0 == (cls.count % 30):
# # logging.info("Regular Modbus Status request")
# await fnc(Modbus.READ_REGS, 0x2007, 2, logging.DEBUG)
cls.count += 1

View File

@@ -2,6 +2,8 @@ import logging
import asyncio
import signal
import os
from asyncio import StreamReader, StreamWriter
from aiohttp import web
from logging import config # noqa F401
from messages import Message
from inverter import Inverter
@@ -10,25 +12,83 @@ from gen3plus.inverter_g3p import InverterG3P
from scheduler import Schedule
from config import Config
routes = web.RouteTableDef()
proxy_is_up = False
async def handle_client(reader, writer):
@routes.get('/')
async def hello(request):
return web.Response(text="Hello, world")
@routes.get('/-/ready')
async def ready(request):
if proxy_is_up:
status = 200
text = 'Is ready'
else:
status = 503
text = 'Not ready'
return web.Response(status=status, text=text)
@routes.get('/-/healthy')
async def healthy(request):
if proxy_is_up:
# logging.info('web request healthy()')
for stream in Message:
try:
res = stream.healthy()
if not res:
return web.Response(status=503, text="I have a problem")
except Exception as err:
logging.info(f'Exception:{err}')
return web.Response(status=200, text="I'm fine")
async def webserver(addr, port):
'''coro running our webserver'''
app = web.Application()
app.add_routes(routes)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, addr, port)
await site.start()
logging.info(f'HTTP server listen on port: {port}')
try:
# Normal interaction with aiohttp
while True:
await asyncio.sleep(3600) # sleep forever
except asyncio.CancelledError:
logging.info('HTTP server cancelled')
await runner.cleanup()
logging.debug('HTTP cleanup done')
async def handle_client(reader: StreamReader, writer: StreamWriter):
'''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername')
await InverterG3(reader, writer, addr).server_loop(addr)
async def handle_client_v2(reader, writer):
async def handle_client_v2(reader: StreamReader, writer: StreamWriter):
'''Handles a new incoming connection and starts an async loop'''
addr = writer.get_extra_info('peername')
await InverterG3P(reader, writer, addr).server_loop(addr)
async def handle_shutdown(loop):
async def handle_shutdown(web_task):
'''Close all TCP connections and stop the event loop'''
logging.info('Shutdown due to SIGTERM')
global proxy_is_up
proxy_is_up = False
#
# first, disc all open TCP connections gracefully
@@ -38,7 +98,7 @@ async def handle_shutdown(loop):
await asyncio.wait_for(stream.disc(), 2)
except Exception:
pass
logging.info('Disconnecting done')
logging.info('Proxy disconnecting done')
#
# second, close all open TCP connections
@@ -46,12 +106,20 @@ async def handle_shutdown(loop):
for stream in Message:
stream.close()
#
# at last, we stop the loop
#
loop.stop()
await asyncio.sleep(0.1) # give time for closing
logging.info('Proxy closing done')
logging.info('Shutdown complete')
#
# third, cancel the web server
#
web_task.cancel()
await web_task
#
# at last, start a coro for stopping the loop
#
logging.debug("Stop event loop")
loop.stop()
def get_log_level() -> int:
@@ -84,17 +152,28 @@ if __name__ == "__main__":
logging.getLogger('conn').setLevel(log_level)
logging.getLogger('data').setLevel(log_level)
logging.getLogger('tracer').setLevel(log_level)
logging.getLogger('asyncio').setLevel(log_level)
# logging.getLogger('mqtt').setLevel(log_level)
# read config file
Config.class_init()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# read config file
ConfigErr = Config.class_init()
if ConfigErr is not None:
logging.info(f'ConfigErr: {ConfigErr}')
Inverter.class_init()
Schedule.start()
#
# Create tasks for our listening servers. These must be tasks! If we call
# start_server directly out of our main task, the eventloop will be blocked
# and we can't receive and handle the UNIX signals!
#
loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005))
loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000))
web_task = loop.create_task(webserver('0.0.0.0', 8127))
#
# Register some UNIX Signal handler for a gracefully server shutdown
# on Docker restart and stop
@@ -102,22 +181,18 @@ if __name__ == "__main__":
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
lambda loop=loop: asyncio.create_task(
handle_shutdown(loop)))
#
# Create taska for our listening servera. These must be tasks! If we call
# start_server directly out of our main task, the eventloop will be blocked
# and we can't receive and handle the UNIX signals!
#
loop.create_task(asyncio.start_server(handle_client, '0.0.0.0', 5005))
loop.create_task(asyncio.start_server(handle_client_v2, '0.0.0.0', 10000))
handle_shutdown(web_task)))
loop.set_debug(log_level == logging.DEBUG)
try:
if ConfigErr is None:
proxy_is_up = True
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
logging.info("Event loop is stopped")
Inverter.class_close(loop)
logging.info('Close event loop')
logging.debug('Close event loop')
loop.close()
logging.info(f'Finally, exit Server "{serv_name}"')
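A hypothetical probe (not part of this diff) for the readiness and liveness endpoints that the webserver() coroutine above serves; host and port 8127 must match your deployment:

from urllib.request import urlopen
from urllib.error import HTTPError, URLError

BASE = 'http://localhost:8127'

for path in ('/-/ready', '/-/healthy'):
    try:
        with urlopen(BASE + path, timeout=2) as rsp:
            print(path, rsp.status, rsp.read().decode())   # 200 -> OK
    except HTTPError as err:        # 503 -> not ready / unhealthy
        print(path, err.code, err.reason)
    except URLError as err:         # proxy not reachable at all
        print(path, 'unreachable:', err.reason)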

View File

@@ -140,6 +140,82 @@ def InvDataSeq2(): # Data indication from the controller
msg += b'\x53\x00\x00'
return msg
@pytest.fixture
def InvDataNew(): # Data indication from DSP V5.0.17
msg = b'\x00\x00\x00\xa3\x00\x00\x00\x00\x53\x00\x00'
msg += b'\x00\x00\x00\x80\x53\x00\x00\x00\x00\x01\x04\x53\x00\x00\x00\x00'
msg += b'\x01\x90\x41\x00\x00\x01\x91\x53\x00\x00\x00\x00\x01\x90\x53\x00'
msg += b'\x00\x00\x00\x01\x91\x53\x00\x00\x00\x00\x01\x90\x53\x00\x00\x00'
msg += b'\x00\x01\x91\x53\x00\x00\x00\x00\x01\x94\x53\x00\x00\x00\x00\x01'
msg += b'\x95\x53\x00\x00\x00\x00\x01\x98\x53\x00\x00\x00\x00\x01\x99\x53'
msg += b'\x00\x00\x00\x00\x01\x80\x53\x00\x00\x00\x00\x01\x90\x41\x00\x00'
msg += b'\x01\x94\x53\x00\x00\x00\x00\x01\x94\x53\x00\x00\x00\x00\x01\x96'
msg += b'\x53\x00\x00\x00\x00\x01\x98\x53\x00\x00\x00\x00\x01\xa0\x53\x00'
msg += b'\x00\x00\x00\x01\xf0\x41\x00\x00\x01\xf1\x53\x00\x00\x00\x00\x01'
msg += b'\xf4\x53\x00\x00\x00\x00\x01\xf5\x53\x00\x00\x00\x00\x01\xf8\x53'
msg += b'\x00\x00\x00\x00\x01\xf9\x53\x00\x00\x00\x00\x00\x00\x53\x00\x00'
msg += b'\x00\x00\x00\x01\x53\x00\x00\x00\x00\x00\x00\x53\x00\x00\x00\x00'
msg += b'\x00\x01\x53\x00\x00\x00\x00\x00\x04\x53\x00\x00\x00\x00\x00\x58'
msg += b'\x41\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00'
msg += b'\x00\x02\x02\x53\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02'
msg += b'\x04\x53\x00\x00\x00\x00\x02\x58\x41\x00\x00\x02\x59\x53\x00\x00'
msg += b'\x00\x00\x02\x40\x53\x00\x00\x00\x00\x02\x41\x53\x00\x00\x00\x00'
msg += b'\x02\x40\x53\x00\x00\x00\x00\x02\x41\x53\x00\x00\x00\x00\x02\x44'
msg += b'\x53\x00\x00\x00\x00\x02\x45\x53\x00\x00\x00\x00\x02\x60\x53\x00'
msg += b'\x00\x00\x00\x02\x61\x53\x00\x00\x00\x00\x02\x60\x53\x00\x00\x00'
msg += b'\x00\x02\x20\x41\x00\x00\x02\x24\x53\x00\x00\x00\x00\x02\x24\x53'
msg += b'\x00\x00\x00\x00\x02\x26\x53\x00\x00\x00\x00\x02\x40\x53\x00\x00'
msg += b'\x00\x00\x02\x40\x53\x00\x00\x00\x00\x02\x80\x41\x00\x00\x02\x81'
msg += b'\x53\x00\x00\x00\x00\x02\x84\x53\x00\x00\x00\x00\x02\x85\x53\x00'
msg += b'\x00\x00\x00\x02\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00'
msg += b'\x00\x02\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00\x00\x02'
msg += b'\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00\x00\x02\xc4\x53'
msg += b'\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02\x80\x53\x00\x00'
msg += b'\x00\x00\x02\xc8\x42\x00\x00\x00\x00\x48\x42\x00\x00\x00\x00\x80'
msg += b'\x42\x00\x00\x00\x00\x04\x53\x00\x00\x00\x00\x01\x20\x53\x00\x00'
msg += b'\x00\x00\x01\x84\x53\x00\x10\x00\x00\x02\x40\x46\x00\x00\x00\x00'
msg += b'\x00\x00\x04\x04\x46\x02\x00\x46\x02\x00\x00\x04\x00\x46\x00\x00'
msg += b'\x00\x00\x00\x00\x05\x04\x42\x00\x00\x00\x05\x50\x42\x00\x00\x00'
msg += b'\x00\x14\x42\x00\x00\x00\x00\x00\x46\x00\x00\x00\x00\x00\x00\x00'
msg += b'\xa4\x46\x00\x00\x00\x00\x00\x00\x01\x00\x46\x00\x00\x00\x00\x00'
msg += b'\x00\x01\x44\x46\x00\x00\x00\x00\x00\x00\x02\x00\x46\x00\x00\x00'
msg += b'\x00\x00\x00\x08\x04\x46\x00\x00\x00\x00\x00\x00\x08\x90\x46\x00'
msg += b'\x00\x00\x00\x00\x00\x08\x54\x46\x00\x00\x00\x00\x00\x00\x09\x20'
msg += b'\x46\x00\x00\x00\x00\x00\x00\x08\x04\x46\x00\x00\x00\x00\x00\x00'
msg += b'\x08\x00\x46\x00\x00\x00\x00\x00\x00\x08\x84\x46\x00\x00\x00\x00'
msg += b'\x00\x00\x08\x40\x46\x00\x00\x00\x00\x00\x00\x09\x04\x46\x00\x00'
msg += b'\x00\x00\x00\x00\x0a\x10\x46\x00\x00\x00\x00\x00\x00\x0c\x14\x46'
msg += b'\x00\x00\x00\x00\x00\x00\x0c\x80\x46\x00\x00\x00\x00\x00\x00\x0c'
msg += b'\x24\x42\x00\x00\x00\x0d\x00\x42\x00\x00\x00\x00\x04\x42\x00\x00'
msg += b'\x00\x00\x00\x42\x00\x00\x00\x00\x44\x42\x00\x00\x00\x00\x10\x42'
msg += b'\x00\x00\x00\x01\x14\x53\x00\x00\x00\x00\x01\xa0\x53\x00\x00\x00'
msg += b'\x00\x10\x04\x53\x55\xaa\x00\x00\x10\x40\x53\x00\x00\x00\x00\x10'
msg += b'\x04\x53\x00\x00\x00\x00\x11\x00\x53\x00\x00\x00\x00\x11\x84\x53'
msg += b'\x00\x00\x00\x00\x10\x50\x53\xff\xff\x00\x00\x10\x14\x53\x03\x20'
msg += b'\x00\x00\x10\x00\x53\x00\x00\x00\x00\x11\x24\x53\x00\x00\x00\x00'
msg += b'\x03\x00\x53\x00\x00\x00\x00\x03\x64\x53\x00\x00\x00\x00\x04\x50'
msg += b'\x53\x00\x00\x00\x00\x00\x34\x53\x00\x00\x00\x00\x00\x00\x42\x02'
msg += b'\x00\x00\x01\x04\x42\x00\x00\x00\x21\x00\x42\x00\x00\x00\x21\x44'
msg += b'\x42\x00\x00\x00\x22\x10\x53\x00\x00\x00\x00\x28\x14\x42\x01\x00'
msg += b'\x00\x28\xa0\x46\x42\x48\x00\x00\x00\x00\x29\x04\x42\x00\x00\x00'
msg += b'\x29\x40\x42\x00\x00\x00\x28\x04\x46\x42\x10\x00\x00\x00\x00\x28'
msg += b'\x00\x42\x00\x00\x00\x28\x84\x42\x00\x00\x00\x28\x50\x42\x00\x00'
msg += b'\x00\x29\x14\x42\x00\x00\x00\x2a\x00\x42\x00\x00\x00\x2c\x24\x46'
msg += b'\x42\x10\x00\x00\x00\x00\x2c\x80\x42\x00\x00\x00\x2c\x44\x53\x00'
msg += b'\x02\x00\x00\x2d\x00\x42\x00\x00\x00\x20\x04\x46\x42\x4d\x00\x00'
msg += b'\x00\x00\x20\x10\x42\x00\x00\x00\x20\x54\x42\x00\x00\x00\x20\x20'
msg += b'\x42\x00\x00\x00\x21\x04\x53\x00\x01\x00\x00\x22\x00\x42\x00\x00'
msg += b'\x00\x30\x04\x42\x00\x00\x00\x30\x40\x53\x00\x00\x00\x00\x30\x04'
msg += b'\x53\x00\x00\x00\x00\x31\x10\x42\x00\x00\x00\x31\x94\x53\x00\x04'
msg += b'\x00\x00\x30\x00\x53\x00\x00\x00\x00\x30\x24\x53\x00\x00\x00\x00'
msg += b'\x30\x00\x53\x00\x00\x00\x00\x31\x04\x53\x00\x00\x00\x00\x31\x80'
msg += b'\x53\x00\x00\x00\x00\x32\x44\x53\x00\x00\x00\x00\x30\x00\x53\x00'
msg += b'\x00\x00\x00\x30\x80\x53\x00\x00\x00\x00\x30\x00\x53\x00\x00\x00'
msg += b'\x00\x30\x80\x53\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x03\x00'
msg += b'\x00\x00\x00\x00'
return msg
@pytest.fixture
def InvDataSeq2_Zero(): # Data indication from the controller
msg = b'\x00\x00\x00\xa3\x00\x00\x00\x64\x53\x00\x01\x00\x00\x00\xc8\x53\x00\x02\x00\x00\x01\x2c\x53\x00\x00\x00\x00\x01\x90\x49\x00\x00\x00\x00\x00\x00\x01\x91\x53\x00\x00'
@@ -391,6 +467,25 @@ def test_must_incr_total2(InvDataSeq2, InvDataSeq2_Zero):
assert json.dumps(i.db['total']) == json.dumps({'Daily_Generation': 1.7, 'Total_Generation': 17.36})
assert json.dumps(i.db['input']) == json.dumps({"pv1": {"Voltage": 33.6, "Current": 1.91, "Power": 64.5, "Daily_Generation": 1.08, "Total_Generation": 9.74}, "pv2": {"Voltage": 33.5, "Current": 1.36, "Power": 45.7, "Daily_Generation": 0.62, "Total_Generation": 7.62}, "pv3": {"Voltage": 0.0, "Current": 0.0, "Power": 0.0}, "pv4": {"Voltage": 0.0, "Current": 0.0, "Power": 0.0}})
def test_new_data_types(InvDataNew):
i = InfosG3()
tests = 0
for key, update in i.parse(InvDataNew):
if key == 'events':
tests += 1
elif key == 'inverter':
assert update == True
tests += 1
elif key == 'input':
assert update == False
tests += 1
else:
assert False
assert tests == 15
assert json.dumps(i.db['inverter']) == json.dumps({"Manufacturer": 0})
assert json.dumps(i.db['input']) == json.dumps({"pv1": {}})
assert json.dumps(i.db['events']) == json.dumps({"401_": 0, "404_": 0, "405_": 0, "408_": 0, "409_No_Utility": 0, "406_": 0, "416_": 0})
def test_invalid_data_type(InvalidDataSeq):
i = InfosG3()

View File

@@ -5,9 +5,9 @@ from app.src.modbus import Modbus
from app.src.infos import Infos, Register
pytest_plugins = ('pytest_asyncio',)
pytestmark = pytest.mark.asyncio(scope="module")
# pytestmark = pytest.mark.asyncio(scope="module")
class TestHelper(Modbus):
class ModbusTestHelper(Modbus):
def __init__(self):
super().__init__(self.send_cb)
self.db = Infos()
@@ -35,7 +35,7 @@ def test_modbus_crc():
def test_build_modbus_pdu():
'''Check building and sending a MODBUS RTU PDU'''
mb = TestHelper()
mb = ModbusTestHelper()
mb.build_msg(1,6,0x2000,0x12)
assert mb.pdu == b'\x01\x06\x20\x00\x00\x12\x02\x07'
assert mb._Modbus__check_crc(mb.pdu)
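The trailing bytes \x02\x07 in the asserted PDU are the MODBUS RTU CRC-16, appended low byte first. A minimal sketch of the standard CRC-16/MODBUS algorithm reproduces them; this is independent of the proxy's private __check_crc() method, whose implementation is not shown in this diff:

def modbus_crc(frame: bytes) -> bytes:
    '''CRC-16/MODBUS: init 0xffff, reflected polynomial 0xa001, low byte first'''
    crc = 0xffff
    for byte in frame:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ 0xa001 if crc & 1 else crc >> 1
    return crc.to_bytes(2, 'little')

assert modbus_crc(b'\x01\x06\x20\x00\x00\x12') == b'\x02\x07'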
@@ -47,7 +47,7 @@ def test_build_modbus_pdu():
def test_recv_req():
'''Receive a valid request, which must be transmitted'''
mb = TestHelper()
mb = ModbusTestHelper()
assert mb.recv_req(b'\x01\x06\x20\x00\x00\x12\x02\x07')
assert mb.last_fcode == 6
assert mb.last_reg == 0x2000
@@ -56,7 +56,7 @@ def test_recv_req():
def test_recv_req_crc_err():
'''Receive a request with invalid CRC, which must be dropped'''
mb = TestHelper()
mb = ModbusTestHelper()
assert not mb.recv_req(b'\x01\x06\x20\x00\x00\x12\x02\x08')
assert mb.pdu == None
assert mb.last_fcode == 0
@@ -66,7 +66,7 @@ def test_recv_req_crc_err():
def test_recv_resp_crc_err():
'''Receive a response with invalid CRC, which must be dropped'''
mb = TestHelper()
mb = ModbusTestHelper()
# simulate a transmitted request
mb.req_pend = True
mb.last_addr = 1
@@ -86,7 +86,7 @@ def test_recv_resp_crc_err():
def test_recv_resp_invalid_addr():
'''Receive a response with wrong server addr, which must be dropped'''
mb = TestHelper()
mb = ModbusTestHelper()
mb.req_pend = True
# simulate a transmitted request
mb.last_addr = 1
@@ -109,7 +109,7 @@ def test_recv_resp_invalid_addr():
def test_recv_recv_fcode():
'''Receive a response with wrong function code, which must be dropped'''
mb = TestHelper()
mb = ModbusTestHelper()
mb.build_msg(1,4,0x300e,2)
assert mb.que.qsize() == 0
assert mb.req_pend
@@ -130,7 +130,7 @@ def test_recv_recv_fcode():
def test_recv_resp_len():
'''Receive a response with wrong data length, which must be dropped'''
mb = TestHelper()
mb = ModbusTestHelper()
mb.build_msg(1,3,0x300e,3)
assert mb.que.qsize() == 0
assert mb.req_pend
@@ -152,7 +152,7 @@ def test_recv_resp_len():
def test_recv_unexpect_resp():
'''Receive a response when we haven't sent a request'''
mb = TestHelper()
mb = ModbusTestHelper()
assert not mb.req_pend
# check unexpected response, which must be dropped
@@ -167,7 +167,7 @@ def test_recv_unexpect_resp():
def test_parse_resp():
'''Receive matching response and parse the values'''
mb = TestHelper()
mb = ModbusTestHelper()
mb.build_msg(1,3,0x3007,6)
assert mb.que.qsize() == 0
assert mb.req_pend
@@ -191,7 +191,7 @@ def test_parse_resp():
assert not mb.req_pend
def test_queue():
mb = TestHelper()
mb = ModbusTestHelper()
mb.build_msg(1,3,0x3022,4)
assert mb.que.qsize() == 0
assert mb.req_pend
@@ -210,7 +210,7 @@ def test_queue():
def test_queue2():
'''Check queue handling for build_msg() calls'''
mb = TestHelper()
mb = ModbusTestHelper()
mb.build_msg(1,3,0x3007,6)
mb.build_msg(1,6,0x2008,4)
assert mb.que.qsize() == 1
@@ -258,7 +258,7 @@ def test_queue2():
def test_queue3():
'''Check queue handling for recv_req() calls'''
mb = TestHelper()
mb = ModbusTestHelper()
assert mb.recv_req(b'\x01\x03\x30\x07\x00\x06{\t', mb.resp_handler)
assert mb.recv_req(b'\x01\x06\x20\x08\x00\x04\x02\x0b', mb.resp_handler)
assert mb.que.qsize() == 1
@@ -315,7 +315,7 @@ def test_queue3():
async def test_timeout():
'''Test MODBUS response timeout and RTU retransmitting'''
assert asyncio.get_running_loop()
mb = TestHelper()
mb = ModbusTestHelper()
mb.max_retries = 2
mb.timeout = 0.1 # 100 ms timeout for fast testing; assumes a timer resolution of at least 10 ms
assert asyncio.get_running_loop() == mb.loop
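test_timeout exercises the response timeout and the RTU retransmission logic. A rough sketch of how such a retry timer can be built on asyncio; the names send_pdu, rsp_event and max_retries are illustrative and not the proxy's actual API:

import asyncio

async def send_with_retries(send_pdu, pdu: bytes, rsp_event: asyncio.Event,
                            timeout: float = 0.1, max_retries: int = 2) -> bool:
    '''send a PDU and retransmit it until a response arrives or retries are exhausted'''
    for _ in range(max_retries + 1):
        send_pdu(pdu)                      # transmit (or retransmit) the request
        try:
            await asyncio.wait_for(rsp_event.wait(), timeout)
            return True                    # response received in time
        except asyncio.TimeoutError:
            continue                       # timeout: retransmit
    return False                           # give up after the last retransmission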
@@ -363,7 +363,7 @@ async def test_timeout():
def test_recv_unknown_data():
'''Receive a response with an unknown register'''
mb = TestHelper()
mb = ModbusTestHelper()
assert 0x9000 not in mb.map
mb.map[0x9000] = {'reg': Register.TEST_REG1, 'fmt': '!H', 'ratio': 1}
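The map entry added here ties a raw register address to a Register key, a struct format string and a scaling ratio. A hedged sketch of how such an entry could be decoded from response data; the format and ratio come from the entry above, the surrounding parsing code is not shown in this diff:

import struct

def decode_reg(entry: dict, data: bytes, offset: int = 0):
    '''unpack one register value using its struct format and apply the ratio'''
    (raw,) = struct.unpack_from(entry['fmt'], data, offset)
    return raw * entry['ratio']

# e.g. the TEST_REG1 entry above: '!H' is an unsigned 16-bit big-endian value
value = decode_reg({'fmt': '!H', 'ratio': 1}, b'\x00\x2a')   # -> 42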

View File

@@ -7,6 +7,7 @@ from app.src.gen3plus.solarman_v5 import SolarmanV5
from app.src.config import Config
from app.src.infos import Infos, Register
from app.src.modbus import Modbus
from app.src.messages import State
pytest_plugins = ('pytest_asyncio',)
@@ -24,12 +25,24 @@ class Writer():
def write(self, pdu: bytearray):
self.sent_pdu = pdu
class Mqtt():
def __init__(self):
self.key = ''
self.data = ''
async def publish(self, key, data):
self.key = key
self.data = data
class MemoryStream(SolarmanV5):
def __init__(self, msg, chunks = (0,), server_side: bool = True):
super().__init__(server_side)
if server_side:
self.mb.timeout = 1 # overwrite for faster testing
self.writer = Writer()
self.mqtt = Mqtt()
self.__msg = msg
self.__msg_len = len(msg)
self.__chunks = chunks
@@ -43,6 +56,8 @@ class MemoryStream(SolarmanV5):
self.test_exception_async_write = False
self.entity_prfx = ''
self.at_acl = {'mqtt': {'allow': ['AT+'], 'block': ['AT+WEBU']}, 'tsun': {'allow': ['AT+Z', 'AT+UPURL', 'AT+SUPDATE', 'AT+TIME'], 'block': ['AT+WEBU']}}
self.key = ''
self.data = ''
def _timestamp(self):
return timestamp
@@ -54,6 +69,10 @@ class MemoryStream(SolarmanV5):
self.__msg += msg
self.__msg_len += len(msg)
def publish_mqtt(self, key, data):
self.key = key
self.data = data
def _read(self) -> int:
copied_bytes = 0
try:
@@ -481,9 +500,10 @@ def AtCommandIndMsgBlock(): # 0x4510
@pytest.fixture
def AtCommandRspMsg(): # 0x1510
msg = b'\xa5\x0a\x00\x10\x15\x03\x03' +get_sn() +b'\x01\x01'
msg = b'\xa5\x11\x00\x10\x15\x03\x03' +get_sn() +b'\x01\x01'
msg += total()
msg += hb()
msg += b'\x00\x00\x00\x00+ok'
msg += correct_checksum(msg)
msg += b'\x15'
return msg
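The fixture grows from a 10-byte to a 17-byte payload ('+ok' plus the counters), so the little-endian length field changes from \x0a\x00 to \x11\x00 and the trailer has to be recomputed. correct_checksum() is a test helper whose code is not part of this diff; the commonly documented Solarman V5 trailer is a simple modulo-256 sum over everything after the 0xa5 start byte, which a sketch like this would compute (assumed to be equivalent to the helper):

def solarman_checksum(frame: bytes) -> bytes:
    '''sum of all bytes after the 0xa5 start byte, truncated to one byte'''
    return bytes([sum(frame[1:]) & 0xff])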
@@ -832,8 +852,7 @@ def test_read_message_in_chunks2(ConfigTsunInv1, DeviceIndMsg):
assert m.data_len == 0xd4
assert m.msg_count == 1
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
while m.read(): # read rest of message
pass
m.read() # read rest of message
assert m.msg_count == 1
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.db.stat['proxy']['Invalid_Msg_Format'] == 0
@@ -1255,48 +1274,64 @@ async def test_msg_build_modbus_req(ConfigTsunInv1, DeviceIndMsg, DeviceRspMsg,
m.close()
@pytest.mark.asyncio
async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, AtCommandIndMsg):
async def test_AT_cmd(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, InverterIndMsg, InverterRspMsg, AtCommandIndMsg, AtCommandRspMsg):
ConfigTsunAllowAll
m = MemoryStream(DeviceIndMsg, (0,), True)
m.append_msg(InverterIndMsg)
m.read()
m.append_msg(AtCommandRspMsg)
m.read() # read device ind
assert m.control == 0x4110
assert str(m.seq) == '01:01'
assert m._recv_buffer==InverterIndMsg # unhandled next message
assert m._recv_buffer==InverterIndMsg + AtCommandRspMsg # unhandled next message
assert m._send_buffer==DeviceRspMsg
assert m._forward_buffer==DeviceIndMsg
m._send_buffer = bytearray(0) # clear send buffer for next test
m._forward_buffer = bytearray(0) # clear forward buffer for next test
await m.send_at_cmd('AT+TIME=214028,1,60,120')
assert m._recv_buffer==InverterIndMsg # unhandled next message
assert m._recv_buffer==InverterIndMsg + AtCommandRspMsg # unhandled next message
assert m._send_buffer==b''
assert m._forward_buffer==b''
assert str(m.seq) == '01:01'
assert m.mqtt.key == ''
assert m.mqtt.data == ""
m.read()
m.read() # read inverter ind
assert m.control == 0x4210
assert str(m.seq) == '02:02'
assert m._recv_buffer==b''
assert m._recv_buffer==AtCommandRspMsg # unhandled next message
assert m._send_buffer==InverterRspMsg
assert m._forward_buffer==InverterIndMsg
m._send_buffer = bytearray(0) # clear send buffer for next test
m._forward_buffer = bytearray(0) # clear forward buffer for next test
await m.send_at_cmd('AT+TIME=214028,1,60,120')
assert m._recv_buffer==b''
assert m._recv_buffer==AtCommandRspMsg # unhandled next message
assert m._send_buffer==AtCommandIndMsg
assert m._forward_buffer==b''
assert str(m.seq) == '02:03'
assert m.mqtt.key == ''
assert m.mqtt.data == ""
m._send_buffer = bytearray(0) # clear send buffer for next test
m.read() # read at resp
assert m.control == 0x1510
assert str(m.seq) == '03:03'
assert m._recv_buffer==b''
assert m._send_buffer==b''
assert m._forward_buffer==b''
assert m.key == 'at_resp'
assert m.data == "+ok"
m.test_exception_async_write = True
await m.send_at_cmd('AT+TIME=214028,1,60,120')
assert m._recv_buffer==b''
assert m._send_buffer==b''
assert m._forward_buffer==b''
assert str(m.seq) == '02:04'
assert str(m.seq) == '03:04'
assert m.forward_at_cmd_resp == False
assert m.mqtt.key == ''
assert m.mqtt.data == ""
m.close()
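The extended assertions check that a locally issued AT command ends up as an MQTT publish under the key 'at_resp', with the inverter's answer ('+ok') or the block reason as payload. The Mqtt stub defined above only records the last call, which is exactly what these assertions read back; a minimal usage example of that stub:

import asyncio

mqtt = Mqtt()                               # test double from this file, records the last publish
asyncio.run(mqtt.publish('at_resp', '+ok'))
assert mqtt.key == 'at_resp' and mqtt.data == '+ok'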
@pytest.mark.asyncio
@@ -1318,6 +1353,8 @@ async def test_AT_cmd_blocked(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, In
assert m._send_buffer==b''
assert m._forward_buffer==b''
assert str(m.seq) == '01:01'
assert m.mqtt.key == ''
assert m.mqtt.data == ""
m.read()
assert m.control == 0x4210
@@ -1334,6 +1371,8 @@ async def test_AT_cmd_blocked(ConfigTsunAllowAll, DeviceIndMsg, DeviceRspMsg, In
assert m._forward_buffer==b''
assert str(m.seq) == '02:02'
assert m.forward_at_cmd_resp == False
assert m.mqtt.key == 'at_resp'
assert m.mqtt.data == "'AT+WEBU' is forbidden"
m.close()
def test_AT_cmd_ind(ConfigTsunInv1, AtCommandIndMsg):
@@ -1398,7 +1437,7 @@ def test_msg_at_command_rsp1(ConfigTsunInv1, AtCommandRspMsg):
assert m.control == 0x1510
assert str(m.seq) == '03:03'
assert m.header_len==11
assert m.data_len==10
assert m.data_len==17
assert m._forward_buffer==AtCommandRspMsg
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
@@ -1417,7 +1456,7 @@ def test_msg_at_command_rsp2(ConfigTsunInv1, AtCommandRspMsg):
assert m.control == 0x1510
assert str(m.seq) == '03:03'
assert m.header_len==11
assert m.data_len==10
assert m.data_len==17
assert m._forward_buffer==b''
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
@@ -1428,7 +1467,7 @@ def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd, MsgModbusCmdFwd):
ConfigTsunInv1
m = MemoryStream(b'')
m.snr = get_sn_int()
m.state = m.STATE_UP
m.state = State.up
c = m.createClientStream(MsgModbusCmd)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -1455,7 +1494,7 @@ def test_msg_modbus_req2(ConfigTsunInv1, MsgModbusCmdCrcErr):
ConfigTsunInv1
m = MemoryStream(b'')
m.snr = get_sn_int()
m.state = m.STATE_UP
m.state = State.up
c = m.createClientStream(MsgModbusCmdCrcErr)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -1661,21 +1700,21 @@ def test_zombie_conn(ConfigTsunInv1, MsgInverterInd):
m1 = MemoryStream(MsgInverterInd, (0,))
m2 = MemoryStream(MsgInverterInd, (0,))
m3 = MemoryStream(MsgInverterInd, (0,))
assert m1.state == m1.STATE_INIT
assert m2.state == m2.STATE_INIT
assert m3.state == m3.STATE_INIT
assert m1.state == m1.State.init
assert m2.state == m2.State.init
assert m3.state == m3.State.init
m1.read() # read complete msg, and set unique_id
assert m1.state == m1.STATE_INIT
assert m2.state == m2.STATE_INIT
assert m3.state == m3.STATE_INIT
assert m1.state == m1.State.init
assert m2.state == m2.State.init
assert m3.state == m3.State.init
m2.read() # read complete msg, and set unique_id
assert m1.state == m1.STATE_CLOSED
assert m2.state == m2.STATE_INIT
assert m3.state == m3.STATE_INIT
assert m1.state == m1.State.closed
assert m2.state == m2.State.init
assert m3.state == m3.State.init
m3.read() # read complete msg, and set unique_id
assert m1.state == m1.STATE_CLOSED
assert m2.state == m2.STATE_CLOSED
assert m3.state == m3.STATE_INIT
assert m1.state == m1.State.closed
assert m2.state == m2.State.closed
assert m3.state == m3.State.init
m1.close()
m2.close()
m3.close()
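These tests now compare against a shared State enum imported from app.src.messages instead of per-class STATE_* constants. The enum itself is not visible in this diff; a sketch of the shape it would have, where the member names init/up/closed are the ones used by the tests and the numeric values are placeholders:

from enum import Enum

class State(Enum):
    '''connection state of a proxy stream (values are placeholders, not from the source)'''
    init = 0
    up = 1
    closed = 2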

View File

@@ -4,6 +4,7 @@ from app.src.gen3.talent import Talent, Control
from app.src.config import Config
from app.src.infos import Infos, Register
from app.src.modbus import Modbus
from app.src.messages import State
pytest_plugins = ('pytest_asyncio',)
@@ -60,7 +61,8 @@ class MemoryStream(Talent):
return copied_bytes
def _timestamp(self):
return 1700260990000
# return 1700260990000
return 1691246944000
def createClientStream(self, msg, chunks = (0,)):
c = MemoryStream(msg, chunks, False)
@@ -113,6 +115,10 @@ def MsgGetTime(): # Get Time Request message
def MsgTimeResp(): # Get Time Response message
return b'\x00\x00\x00\x1b\x10R170000000000001\x91\x22\x00\x00\x01\x89\xc6\x63\x4d\x80'
@pytest.fixture
def MsgTimeRespInv(): # invalid Get Time Response message (truncated timestamp)
return b'\x00\x00\x00\x17\x10R170000000000001\x91\x22\x00\x00\x01\x89'
@pytest.fixture
def MsgTimeInvalid(): # Get Time message with unknown ctrl byte
return b'\x00\x00\x00\x13\x10R170000000000001\x94\x22'
@@ -129,6 +135,18 @@ def MsgControllerInd(): # Data indication from the controller
msg += b'\x49\x00\x00\x00\x02\x00\x0d\x04\x08\x49\x00\x00\x00\x00\x00\x07\xa1\x84\x49\x00\x00\x00\x01\x00\x0c\x50\x59\x49\x00\x00\x00\x4c\x00\x0d\x1f\x60\x49\x00\x00\x00\x00'
return msg
@pytest.fixture
def MsgControllerIndTsOffs(): # Data indication from the controller - offset 0x1000
msg = b'\x00\x00\x01\x2f\x10R170000000000001\x91\x71\x0e\x10\x00\x00\x10R170000000000001'
msg += b'\x01\x00\x00\x01\x89\xc6\x63\x45\x50'
msg += b'\x00\x00\x00\x15\x00\x09\x2b\xa8\x54\x10\x52\x53\x57\x5f\x34\x30\x30\x5f\x56\x31\x2e\x30\x30\x2e\x30\x36\x00\x09\x27\xc0\x54\x06\x52\x61\x79\x6d\x6f'
msg += b'\x6e\x00\x09\x2f\x90\x54\x0b\x52\x53\x57\x2d\x31\x2d\x31\x30\x30\x30\x31\x00\x09\x5a\x88\x54\x0f\x74\x2e\x72\x61\x79\x6d\x6f\x6e\x69\x6f\x74\x2e\x63\x6f\x6d\x00\x09\x5a\xec\x54'
msg += b'\x1c\x6c\x6f\x67\x67\x65\x72\x2e\x74\x61\x6c\x65\x6e\x74\x2d\x6d\x6f\x6e\x69\x74\x6f\x72\x69\x6e\x67\x2e\x63\x6f\x6d\x00\x0d\x00\x20\x49\x00\x00\x00\x01\x00\x0c\x35\x00\x49\x00'
msg += b'\x00\x00\x64\x00\x0c\x96\xa8\x49\x00\x00\x00\x1d\x00\x0c\x7f\x38\x49\x00\x00\x00\x01\x00\x0c\xfc\x38\x49\x00\x00\x00\x01\x00\x0c\xf8\x50\x49\x00\x00\x01\x2c\x00\x0c\x63\xe0\x49'
msg += b'\x00\x00\x00\x00\x00\x0c\x67\xc8\x49\x00\x00\x00\x00\x00\x0c\x50\x58\x49\x00\x00\x00\x01\x00\x09\x5e\x70\x49\x00\x00\x13\x8d\x00\x09\x5e\xd4\x49\x00\x00\x13\x8d\x00\x09\x5b\x50'
msg += b'\x49\x00\x00\x00\x02\x00\x0d\x04\x08\x49\x00\x00\x00\x00\x00\x07\xa1\x84\x49\x00\x00\x00\x01\x00\x0c\x50\x59\x49\x00\x00\x00\x4c\x00\x0d\x1f\x60\x49\x00\x00\x00\x00'
return msg
@pytest.fixture
def MsgControllerAck(): # Ack for controller data indication
return b'\x00\x00\x00\x14\x10R170000000000001\x99\x71\x01'
@@ -145,6 +163,92 @@ def MsgInverterInd(): # Data indication from the controller
msg += b'\x54\x10T170000000000001\x00\x00\x00\x32\x54\x0a\x54\x53\x4f\x4c\x2d\x4d\x53\x36\x30\x30\x00\x00\x00\x3c\x54\x05\x41\x2c\x42\x2c\x43'
return msg
@pytest.fixture
def MsgInverterIndTsOffs(): # Data indication from the inverter + offset 256
msg = b'\x00\x00\x00\x8b\x10R170000000000001\x91\x04\x01\x90\x00\x01\x10R170000000000001'
msg += b'\x01\x00\x00\x01\x89\xc6\x63\x62\x08'
msg += b'\x00\x00\x00\x06\x00\x00\x00\x0a\x54\x08\x4d\x69\x63\x72\x6f\x69\x6e\x76\x00\x00\x00\x14\x54\x04\x54\x53\x55\x4e\x00\x00\x00\x1E\x54\x07\x56\x35\x2e\x30\x2e\x31\x31\x00\x00\x00\x28'
msg += b'\x54\x10T170000000000001\x00\x00\x00\x32\x54\x0a\x54\x53\x4f\x4c\x2d\x4d\x53\x36\x30\x30\x00\x00\x00\x3c\x54\x05\x41\x2c\x42\x2c\x43'
return msg
@pytest.fixture
def MsgInverterIndNew(): # Data indication from DSP V5.0.17
msg = b'\x00\x00\x04\xa0\x10R170000000000001\x91\x04\x01\x90\x00\x01\x10R170000000000001'
msg += b'\x01\x00\x00\x01'
msg += b'\x90\x31\x4d\x68\x78\x00\x00\x00\xa3\x00\x00\x00\x00\x53\x00\x00'
msg += b'\x00\x00\x00\x80\x53\x00\x00\x00\x00\x01\x04\x53\x00\x00\x00\x00'
msg += b'\x01\x90\x41\x00\x00\x01\x91\x53\x00\x00\x00\x00\x01\x90\x53\x00'
msg += b'\x00\x00\x00\x01\x91\x53\x00\x00\x00\x00\x01\x90\x53\x00\x00\x00'
msg += b'\x00\x01\x91\x53\x00\x00\x00\x00\x01\x94\x53\x00\x00\x00\x00\x01'
msg += b'\x95\x53\x00\x00\x00\x00\x01\x98\x53\x00\x00\x00\x00\x01\x99\x53'
msg += b'\x00\x00\x00\x00\x01\x80\x53\x00\x00\x00\x00\x01\x90\x41\x00\x00'
msg += b'\x01\x94\x53\x00\x00\x00\x00\x01\x94\x53\x00\x00\x00\x00\x01\x96'
msg += b'\x53\x00\x00\x00\x00\x01\x98\x53\x00\x00\x00\x00\x01\xa0\x53\x00'
msg += b'\x00\x00\x00\x01\xf0\x41\x00\x00\x01\xf1\x53\x00\x00\x00\x00\x01'
msg += b'\xf4\x53\x00\x00\x00\x00\x01\xf5\x53\x00\x00\x00\x00\x01\xf8\x53'
msg += b'\x00\x00\x00\x00\x01\xf9\x53\x00\x00\x00\x00\x00\x00\x53\x00\x00'
msg += b'\x00\x00\x00\x01\x53\x00\x00\x00\x00\x00\x00\x53\x00\x00\x00\x00'
msg += b'\x00\x01\x53\x00\x00\x00\x00\x00\x04\x53\x00\x00\x00\x00\x00\x58'
msg += b'\x41\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00'
msg += b'\x00\x02\x02\x53\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02'
msg += b'\x04\x53\x00\x00\x00\x00\x02\x58\x41\x00\x00\x02\x59\x53\x00\x00'
msg += b'\x00\x00\x02\x40\x53\x00\x00\x00\x00\x02\x41\x53\x00\x00\x00\x00'
msg += b'\x02\x40\x53\x00\x00\x00\x00\x02\x41\x53\x00\x00\x00\x00\x02\x44'
msg += b'\x53\x00\x00\x00\x00\x02\x45\x53\x00\x00\x00\x00\x02\x60\x53\x00'
msg += b'\x00\x00\x00\x02\x61\x53\x00\x00\x00\x00\x02\x60\x53\x00\x00\x00'
msg += b'\x00\x02\x20\x41\x00\x00\x02\x24\x53\x00\x00\x00\x00\x02\x24\x53'
msg += b'\x00\x00\x00\x00\x02\x26\x53\x00\x00\x00\x00\x02\x40\x53\x00\x00'
msg += b'\x00\x00\x02\x40\x53\x00\x00\x00\x00\x02\x80\x41\x00\x00\x02\x81'
msg += b'\x53\x00\x00\x00\x00\x02\x84\x53\x00\x00\x00\x00\x02\x85\x53\x00'
msg += b'\x00\x00\x00\x02\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00'
msg += b'\x00\x02\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00\x00\x02'
msg += b'\xc0\x53\x00\x00\x00\x00\x02\xc1\x53\x00\x00\x00\x00\x02\xc4\x53'
msg += b'\x00\x00\x00\x00\x02\x00\x53\x00\x00\x00\x00\x02\x80\x53\x00\x00'
msg += b'\x00\x00\x02\xc8\x42\x00\x00\x00\x00\x48\x42\x00\x00\x00\x00\x80'
msg += b'\x42\x00\x00\x00\x00\x04\x53\x00\x00\x00\x00\x01\x20\x53\x00\x00'
msg += b'\x00\x00\x01\x84\x53\x00\x10\x00\x00\x02\x40\x46\x00\x00\x00\x00'
msg += b'\x00\x00\x04\x04\x46\x02\x00\x46\x02\x00\x00\x04\x00\x46\x00\x00'
msg += b'\x00\x00\x00\x00\x05\x04\x42\x00\x00\x00\x05\x50\x42\x00\x00\x00'
msg += b'\x00\x14\x42\x00\x00\x00\x00\x00\x46\x00\x00\x00\x00\x00\x00\x00'
msg += b'\xa4\x46\x00\x00\x00\x00\x00\x00\x01\x00\x46\x00\x00\x00\x00\x00'
msg += b'\x00\x01\x44\x46\x00\x00\x00\x00\x00\x00\x02\x00\x46\x00\x00\x00'
msg += b'\x00\x00\x00\x08\x04\x46\x00\x00\x00\x00\x00\x00\x08\x90\x46\x00'
msg += b'\x00\x00\x00\x00\x00\x08\x54\x46\x00\x00\x00\x00\x00\x00\x09\x20'
msg += b'\x46\x00\x00\x00\x00\x00\x00\x08\x04\x46\x00\x00\x00\x00\x00\x00'
msg += b'\x08\x00\x46\x00\x00\x00\x00\x00\x00\x08\x84\x46\x00\x00\x00\x00'
msg += b'\x00\x00\x08\x40\x46\x00\x00\x00\x00\x00\x00\x09\x04\x46\x00\x00'
msg += b'\x00\x00\x00\x00\x0a\x10\x46\x00\x00\x00\x00\x00\x00\x0c\x14\x46'
msg += b'\x00\x00\x00\x00\x00\x00\x0c\x80\x46\x00\x00\x00\x00\x00\x00\x0c'
msg += b'\x24\x42\x00\x00\x00\x0d\x00\x42\x00\x00\x00\x00\x04\x42\x00\x00'
msg += b'\x00\x00\x00\x42\x00\x00\x00\x00\x44\x42\x00\x00\x00\x00\x10\x42'
msg += b'\x00\x00\x00\x01\x14\x53\x00\x00\x00\x00\x01\xa0\x53\x00\x00\x00'
msg += b'\x00\x10\x04\x53\x55\xaa\x00\x00\x10\x40\x53\x00\x00\x00\x00\x10'
msg += b'\x04\x53\x00\x00\x00\x00\x11\x00\x53\x00\x00\x00\x00\x11\x84\x53'
msg += b'\x00\x00\x00\x00\x10\x50\x53\xff\xff\x00\x00\x10\x14\x53\x03\x20'
msg += b'\x00\x00\x10\x00\x53\x00\x00\x00\x00\x11\x24\x53\x00\x00\x00\x00'
msg += b'\x03\x00\x53\x00\x00\x00\x00\x03\x64\x53\x00\x00\x00\x00\x04\x50'
msg += b'\x53\x00\x00\x00\x00\x00\x34\x53\x00\x00\x00\x00\x00\x00\x42\x02'
msg += b'\x00\x00\x01\x04\x42\x00\x00\x00\x21\x00\x42\x00\x00\x00\x21\x44'
msg += b'\x42\x00\x00\x00\x22\x10\x53\x00\x00\x00\x00\x28\x14\x42\x01\x00'
msg += b'\x00\x28\xa0\x46\x42\x48\x00\x00\x00\x00\x29\x04\x42\x00\x00\x00'
msg += b'\x29\x40\x42\x00\x00\x00\x28\x04\x46\x42\x10\x00\x00\x00\x00\x28'
msg += b'\x00\x42\x00\x00\x00\x28\x84\x42\x00\x00\x00\x28\x50\x42\x00\x00'
msg += b'\x00\x29\x14\x42\x00\x00\x00\x2a\x00\x42\x00\x00\x00\x2c\x24\x46'
msg += b'\x42\x10\x00\x00\x00\x00\x2c\x80\x42\x00\x00\x00\x2c\x44\x53\x00'
msg += b'\x02\x00\x00\x2d\x00\x42\x00\x00\x00\x20\x04\x46\x42\x4d\x00\x00'
msg += b'\x00\x00\x20\x10\x42\x00\x00\x00\x20\x54\x42\x00\x00\x00\x20\x20'
msg += b'\x42\x00\x00\x00\x21\x04\x53\x00\x01\x00\x00\x22\x00\x42\x00\x00'
msg += b'\x00\x30\x04\x42\x00\x00\x00\x30\x40\x53\x00\x00\x00\x00\x30\x04'
msg += b'\x53\x00\x00\x00\x00\x31\x10\x42\x00\x00\x00\x31\x94\x53\x00\x04'
msg += b'\x00\x00\x30\x00\x53\x00\x00\x00\x00\x30\x24\x53\x00\x00\x00\x00'
msg += b'\x30\x00\x53\x00\x00\x00\x00\x31\x04\x53\x00\x00\x00\x00\x31\x80'
msg += b'\x53\x00\x00\x00\x00\x32\x44\x53\x00\x00\x00\x00\x30\x00\x53\x00'
msg += b'\x00\x00\x00\x30\x80\x53\x00\x00\x00\x00\x30\x00\x53\x00\x00\x00'
msg += b'\x00\x30\x80\x53\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x03\x00'
msg += b'\x00\x00\x00\x00'
return msg
@pytest.fixture
def MsgInverterAck(): # Ack for inverter data indication
return b'\x00\x00\x00\x14\x10R170000000000001\x99\x04\x01'
@@ -331,8 +435,7 @@ def test_read_message_in_chunks2(MsgContactInfo):
assert int(m.ctrl)==145
assert m.msg_id==0
assert m.msg_count == 1
while m.read(): # read rest of message
pass
m.read() # read rest of message
assert m.msg_count == 1
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
m.close()
@@ -471,9 +574,10 @@ def test_msg_get_time(ConfigTsunInv1, MsgGetTime):
assert int(m.ctrl)==145
assert m.msg_id==34
assert m.header_len==23
assert m.ts_offset==0
assert m.data_len==0
assert m._forward_buffer==MsgGetTime
assert m._send_buffer==b''
assert m._send_buffer==b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00'
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
m.close()
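With this change the proxy answers the getTimeReq itself, using its own clock (_timestamp() returns 1691246944000 ms in these tests), while still forwarding the request. The expected _send_buffer follows directly from the fixture layout: a 4-byte length that counts everything after itself, start byte 0x10, the 16-byte id, ctrl 0x91, msg_id 0x22, then the timestamp as an 8-byte big-endian value. A sketch of that frame; build_time_rsp is an illustrative helper, not the Talent class's real method:

import struct

def build_time_rsp(id_str: bytes, ts_ms: int) -> bytes:
    '''assemble a Get Time response: start byte, id, ctrl, msg_id, 8-byte timestamp'''
    body = b'\x10' + id_str + b'\x91\x22' + struct.pack('!q', ts_ms)
    return struct.pack('!l', len(body)) + body

assert build_time_rsp(b'R170000000000001', 1691246944000) == \
    b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00'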
@@ -489,9 +593,10 @@ def test_msg_get_time_autark(ConfigNoTsunInv1, MsgGetTime):
assert int(m.ctrl)==145
assert m.msg_id==34
assert m.header_len==23
assert m.ts_offset==0
assert m.data_len==0
assert m._forward_buffer==b''
assert m._send_buffer==b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x8b\xdfs\xcc0'
assert m._send_buffer==bytearray(b'\x00\x00\x00\x1b\x10R170000000000001\x91"\x00\x00\x01\x89\xc6,_\x00')
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
m.close()
@@ -507,8 +612,9 @@ def test_msg_time_resp(ConfigTsunInv1, MsgTimeResp):
assert int(m.ctrl)==145
assert m.msg_id==34
assert m.header_len==23
assert m.ts_offset==3600000
assert m.data_len==8
assert m._forward_buffer==MsgTimeResp
assert m._forward_buffer==b''
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
m.close()
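The new ts_offset value follows directly from the fixture: MsgTimeResp carries the cloud timestamp 0x0189c6634d80, the proxy's own clock (_timestamp()) is 1691246944000 = 0x0189c62c5f00, and the difference is the offset the proxy has to apply to forwarded timestamps:

cloud_ts = 0x0189c6634d80                  # timestamp from MsgTimeResp
proxy_ts = 1691246944000                   # value returned by _timestamp()
assert cloud_ts - proxy_ts == 3_600_000    # exactly one hour in milliseconds, the asserted ts_offset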
@@ -525,12 +631,32 @@ def test_msg_time_resp_autark(ConfigNoTsunInv1, MsgTimeResp):
assert int(m.ctrl)==145
assert m.msg_id==34
assert m.header_len==23
assert m.ts_offset==3600000
assert m.data_len==8
assert m._forward_buffer==b''
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
m.close()
def test_msg_time_inv_resp(ConfigTsunInv1, MsgTimeRespInv):
ConfigTsunInv1
m = MemoryStream(MsgTimeRespInv, (0,), False)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.id_str == b"R170000000000001"
assert m.unique_id == 'R170000000000001'
assert int(m.ctrl)==145
assert m.msg_id==34
assert m.header_len==23
assert m.ts_offset==0
assert m.data_len==4
assert m._forward_buffer==MsgTimeRespInv
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
m.close()
def test_msg_time_invalid(ConfigTsunInv1, MsgTimeInvalid):
ConfigTsunInv1
m = MemoryStream(MsgTimeInvalid, (0,), False)
@@ -543,6 +669,7 @@ def test_msg_time_invalid(ConfigTsunInv1, MsgTimeInvalid):
assert int(m.ctrl)==148
assert m.msg_id==34
assert m.header_len==23
assert m.ts_offset==0
assert m.data_len==0
assert m._forward_buffer==MsgTimeInvalid
assert m._send_buffer==b''
@@ -560,6 +687,7 @@ def test_msg_time_invalid_autark(ConfigNoTsunInv1, MsgTimeInvalid):
assert m.unique_id == 'R170000000000001'
assert int(m.ctrl)==148
assert m.msg_id==34
assert m.ts_offset==0
assert m.header_len==23
assert m.data_len==0
assert m._forward_buffer==b''
@@ -567,7 +695,7 @@ def test_msg_time_invalid_autark(ConfigNoTsunInv1, MsgTimeInvalid):
assert m.db.stat['proxy']['Unknown_Ctrl'] == 1
m.close()
def test_msg_cntrl_ind(ConfigTsunInv1, MsgControllerInd, MsgControllerAck):
def test_msg_cntrl_ind(ConfigTsunInv1, MsgControllerInd, MsgControllerIndTsOffs, MsgControllerAck):
ConfigTsunInv1
m = MemoryStream(MsgControllerInd, (0,))
m.db.stat['proxy']['Unknown_Ctrl'] = 0
@@ -580,7 +708,12 @@ def test_msg_cntrl_ind(ConfigTsunInv1, MsgControllerInd, MsgControllerAck):
assert m.msg_id==113
assert m.header_len==23
assert m.data_len==284
m.ts_offset = 0
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgControllerInd
m.ts_offset = -4096
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgControllerIndTsOffs
assert m._send_buffer==MsgControllerAck
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
m.close()
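These tests drive _update_header() with different ts_offset values and compare the forward buffer against fixtures whose only difference is the embedded 8-byte timestamp ("- offset 0x1000" for the controller message, "+ offset 256" for the inverter message). A sketch of the kind of in-place patch this implies; the real _update_header() is not shown here, and where the timestamp sits depends on the message type, so ts_pos is a parameter:

import struct

def patch_timestamp(buf: bytearray, ts_pos: int, ts_offset: int) -> None:
    '''add ts_offset (ms) to the big-endian 8-byte timestamp at ts_pos, in place'''
    (ts,) = struct.unpack_from('!q', buf, ts_pos)
    struct.pack_into('!q', buf, ts_pos, ts + ts_offset)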
@@ -616,12 +749,17 @@ def test_msg_cntrl_invalid(ConfigTsunInv1, MsgControllerInvalid):
assert m.msg_id==113
assert m.header_len==23
assert m.data_len==1
m.ts_offset = 0
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgControllerInvalid
m.ts_offset = -4096
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgControllerInvalid
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 1
m.close()
def test_msg_inv_ind(ConfigTsunInv1, MsgInverterInd, MsgInverterAck):
def test_msg_inv_ind(ConfigTsunInv1, MsgInverterInd, MsgInverterIndTsOffs, MsgInverterAck):
ConfigTsunInv1
tracer.setLevel(logging.DEBUG)
m = MemoryStream(MsgInverterInd, (0,))
@@ -635,11 +773,62 @@ def test_msg_inv_ind(ConfigTsunInv1, MsgInverterInd, MsgInverterAck):
assert m.msg_id==4
assert m.header_len==23
assert m.data_len==120
m.ts_offset = 0
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgInverterInd
m.ts_offset = +256
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgInverterIndTsOffs
assert m._send_buffer==MsgInverterAck
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
m.close()
def test_msg_inv_ind2(ConfigTsunInv1, MsgInverterIndNew, MsgInverterIndTsOffs, MsgInverterAck):
ConfigTsunInv1
tracer.setLevel(logging.DEBUG)
m = MemoryStream(MsgInverterIndNew, (0,))
m.db.stat['proxy']['Unknown_Ctrl'] = 0
m.db.stat['proxy']['Invalid_Data_Type'] = 0
m.read() # read complete msg, and dispatch msg
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
assert m.db.stat['proxy']['Invalid_Data_Type'] == 0
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.id_str == b"R170000000000001"
assert m.unique_id == 'R170000000000001'
assert int(m.ctrl)==145
assert m.msg_id==4
assert m.header_len==23
assert m.data_len==1165
m.ts_offset = 0
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgInverterIndNew
assert m._send_buffer==MsgInverterAck
m.close()
def test_msg_inv_ack(ConfigTsunInv1, MsgInverterAck):
ConfigTsunInv1
tracer.setLevel(logging.ERROR)
@@ -673,6 +862,11 @@ def test_msg_inv_invalid(ConfigTsunInv1, MsgInverterInvalid):
assert m.msg_id==4
assert m.header_len==23
assert m.data_len==1
m.ts_offset = 0
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgInverterInvalid
m.ts_offset = 256
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgInverterInvalid
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 1
@@ -692,6 +886,11 @@ def test_msg_ota_req(ConfigTsunInv1, MsgOtaReq):
assert m.msg_id==19
assert m.header_len==23
assert m.data_len==259
m.ts_offset = 0
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgOtaReq
m.ts_offset = 4096
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgOtaReq
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
@@ -714,6 +913,11 @@ def test_msg_ota_ack(ConfigTsunInv1, MsgOtaAck):
assert m.msg_id==19
assert m.header_len==23
assert m.data_len==1
m.ts_offset = 0
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgOtaAck
m.ts_offset = 256
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgOtaAck
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
@@ -734,7 +938,12 @@ def test_msg_ota_invalid(ConfigTsunInv1, MsgOtaInvalid):
assert m.msg_id==19
assert m.header_len==23
assert m.data_len==1
m.ts_offset = 0
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgOtaInvalid
m.ts_offset = 4096
m._update_header(m._forward_buffer)
assert m._forward_buffer==MsgOtaInvalid
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Ctrl'] == 1
assert m.db.stat['proxy']['OTA_Start_Msg'] == 0
@@ -847,7 +1056,7 @@ def test_msg_modbus_req(ConfigTsunInv1, MsgModbusCmd):
ConfigTsunInv1
m = MemoryStream(b'')
m.id_str = b"R170000000000001"
m.state = m.STATE_UP
m.state = State.up
c = m.createClientStream(MsgModbusCmd)
@@ -953,6 +1162,29 @@ def test_msg_modbus_rsp1(ConfigTsunInv1, MsgModbusRsp):
assert m.db.stat['proxy']['Modbus_Command'] == 0
m.close()
def test_msg_modbus_cloud_rsp(ConfigTsunInv1, MsgModbusRsp):
'''Modbus response from TSUN without a valid Modbus request must be dropped'''
ConfigTsunInv1
m = MemoryStream(MsgModbusRsp, (0,), False)
m.db.stat['proxy']['Unknown_Ctrl'] = 0
m.db.stat['proxy']['Unknown_Msg'] = 0
m.db.stat['proxy']['Modbus_Command'] = 0
m.read() # read complete msg, and dispatch msg
assert not m.header_valid # must be invalid, since msg was handled and buffer flushed
assert m.msg_count == 1
assert m.id_str == b"R170000000000001"
assert m.unique_id == 'R170000000000001'
assert int(m.ctrl)==145
assert m.msg_id==119
assert m.header_len==23
assert m.data_len==13
assert m._forward_buffer==b''
assert m._send_buffer==b''
assert m.db.stat['proxy']['Unknown_Msg'] == 1
assert m.db.stat['proxy']['Unknown_Ctrl'] == 0
assert m.db.stat['proxy']['Modbus_Command'] == 0
m.close()
def test_msg_modbus_rsp2(ConfigTsunInv1, MsgModbusResp20):
'''Modbus response with a valid Modbus request must be forwarded'''
ConfigTsunInv1
@@ -1097,7 +1329,7 @@ async def test_msg_build_modbus_req(ConfigTsunInv1, MsgModbusCmd):
assert m._send_buffer == b''
assert m.writer.sent_pdu == b''
m.state = m.STATE_UP
m.state = State.up
await m.send_modbus_cmd(Modbus.WRITE_SINGLE_REG, 0x2008, 0, logging.DEBUG)
assert 0 == m.send_msg_ofs
assert m._forward_buffer == b''
@@ -1127,21 +1359,21 @@ def test_zombie_conn(ConfigTsunInv1, MsgInverterInd):
m3 = MemoryStream(MsgInverterInd, (0,))
assert MemoryStream._RefNo == 3 + start_val
assert m3.RefNo == 3 + start_val
assert m1.state == m1.STATE_INIT
assert m2.state == m2.STATE_INIT
assert m3.state == m3.STATE_INIT
assert m1.state == m1.State.init
assert m2.state == m2.State.init
assert m3.state == m3.State.init
m1.read() # read complete msg, and set unique_id
assert m1.state == m1.STATE_UP
assert m2.state == m2.STATE_INIT
assert m3.state == m3.STATE_INIT
assert m1.state == m1.State.up
assert m2.state == m2.State.init
assert m3.state == m3.State.init
m2.read() # read complete msg, and set unique_id
assert m1.state == m1.STATE_CLOSED
assert m2.state == m2.STATE_UP
assert m3.state == m3.STATE_INIT
assert m1.state == m1.State.closed
assert m2.state == m2.State.up
assert m3.state == m3.State.init
m3.read() # read complete msg, and set unique_id
assert m1.state == m1.STATE_CLOSED
assert m2.state == m2.STATE_CLOSED
assert m3.state == m3.STATE_UP
assert m1.state == m1.State.closed
assert m2.state == m2.State.closed
assert m3.state == m3.State.up
m1.close()
m2.close()
m3.close()

File diff suppressed because one or more lines are too long

Image diff (new version: 8.4 KiB)

View File

@@ -0,0 +1,26 @@
// {type:sequence}
// {generate:true}
[Inverter]ContactInd>[Proxy]
[Proxy]-[note: store Contact Info in proxy{bg:cornsilk}]
[Proxy]ContactRsp (Ok).>[Inverter]
[Inverter]getTimeReq>[Proxy]
[Proxy]ContactInd>[Cloud]
[Cloud]ContactRsp (Ok).>[Proxy]
[Proxy]getTimeReq>[Cloud]
[Cloud]TimeRsp (time).>[Proxy]
[Proxy]TimeRsp (time).>[Inverter]
[Inverter]-[note: set clock in inverter{bg:cornsilk}]
[Inverter]DataInd (ts:=time)>[Proxy]
[Proxy]DataRsp>[Inverter]
[Proxy]DataInd (ts)>>[Cloud]
[Proxy]DataInd>>[MQTT-Broker]
[Cloud]DataRsp>>[Proxy]
[Inverter]DataInd (ts:=time)>[Proxy]
[Proxy]DataRsp>[Inverter]
[Proxy]DataInd (ts)>>[Cloud]
[Proxy]DataInd>>[MQTT-Broker]
[Cloud]DataRsp>>[Proxy]

View File

@@ -77,10 +77,15 @@ services:
- ${DNS2:-4.4.4.4}
ports:
- 5005:5005
- 8127:8127
- 10000:10000
volumes:
- ${PROJECT_DIR:-./}tsun-proxy/log:/home/tsun-proxy/log
- ${PROJECT_DIR:-./}tsun-proxy/config:/home/tsun-proxy/config
healthcheck:
test: wget --no-verbose --tries=1 --spider http://localhost:8127/-/healthy || exit 1
interval: 10s
timeout: 3s
networks:
- outside
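The compose file now probes http://localhost:8127/-/healthy every 10 s. A minimal sketch of a matching endpoint, assuming an aiohttp-based web server; the library choice and handler names are assumptions, only the path and port come from the compose file:

from aiohttp import web

async def healthy(request: web.Request) -> web.Response:
    # liveness probe target for the compose healthcheck
    return web.Response(text='OK')

app = web.Application()
app.add_routes([web.get('/-/healthy', healthy)])

if __name__ == '__main__':
    web.run_app(app, port=8127)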

View File

@@ -89,6 +89,24 @@ def MsgDataResp(): # Contact Response message
return msg
@pytest.fixture
def MsgInvalidInfo(): # Contact Info message wrong start byte
msg = b'\x47\xd4\x00\x10\x41\x00\x01' +get_sn() +b'\x02\xba\xd2\x00\x00'
msg += b'\x19\x00\x00\x00\x00\x00\x00\x00\x05\x3c\x78\x01\x64\x01\x4c\x53'
msg += b'\x57\x35\x42\x4c\x45\x5f\x31\x37\x5f\x30\x32\x42\x30\x5f\x31\x2e'
msg += b'\x30\x35\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x40\x2a\x8f\x4f\x51\x54\x31\x39\x32\x2e'
msg += b'\x31\x36\x38\x2e\x38\x30\x2e\x34\x39\x00\x00\x00\x0f\x00\x01\xb0'
msg += b'\x02\x0f\x00\xff\x56\x31\x2e\x31\x2e\x30\x30\x2e\x30\x42\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xfe\xfe\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x41\x6c\x6c\x69\x75\x73\x2d\x48\x6f'
msg += b'\x6d\x65\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
msg += b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x3c'
msg += b'\x15'
return msg
@pytest.fixture(scope="session")
@@ -155,4 +173,22 @@ def test_data_ind(ClientConnection,MsgDataInd, MsgDataResp):
# time.sleep(2.5)
checkResponse(data, MsgDataResp)
def test_inavlid_msg(ClientConnection,MsgInvalidInfo,MsgContactInfo, MsgContactResp):
s = ClientConnection
try:
s.sendall(MsgInvalidInfo)
# time.sleep(2.5)
data = s.recv(1024)
except TimeoutError:
pass
# time.sleep(2.5)
try:
s.sendall(MsgContactInfo)
# time.sleep(2.5)
data = s.recv(1024)
except TimeoutError:
pass
# time.sleep(2.5)
checkResponse(data, MsgContactResp)