calc processing time for healthcheck
This commit is contained in:
@@ -2,7 +2,7 @@ import logging
|
|||||||
import traceback
|
import traceback
|
||||||
import time
|
import time
|
||||||
from asyncio import StreamReader, StreamWriter
|
from asyncio import StreamReader, StreamWriter
|
||||||
from messages import hex_dump_memory
|
from messages import hex_dump_memory, State
|
||||||
from typing import Self
|
from typing import Self
|
||||||
|
|
||||||
import gc
|
import gc
|
||||||
@@ -19,6 +19,8 @@ class AsyncStream():
|
|||||||
self.addr = addr
|
self.addr = addr
|
||||||
self.r_addr = ''
|
self.r_addr = ''
|
||||||
self.l_addr = ''
|
self.l_addr = ''
|
||||||
|
self.proc_start = None # start processing start timestamp
|
||||||
|
self.proc_max = 0
|
||||||
|
|
||||||
async def server_loop(self, addr: str) -> None:
|
async def server_loop(self, addr: str) -> None:
|
||||||
'''Loop for receiving messages from the inverter (server-side)'''
|
'''Loop for receiving messages from the inverter (server-side)'''
|
||||||
@@ -63,8 +65,14 @@ class AsyncStream():
|
|||||||
"""Async loop handler for precessing all received messages"""
|
"""Async loop handler for precessing all received messages"""
|
||||||
self.r_addr = self.writer.get_extra_info('peername')
|
self.r_addr = self.writer.get_extra_info('peername')
|
||||||
self.l_addr = self.writer.get_extra_info('sockname')
|
self.l_addr = self.writer.get_extra_info('sockname')
|
||||||
|
self.proc_start = time.time()
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
proc = time.time() - self.proc_start
|
||||||
|
if proc > self.proc_max:
|
||||||
|
self.proc_max = proc
|
||||||
|
self.proc_start = None
|
||||||
|
|
||||||
await self.__async_read()
|
await self.__async_read()
|
||||||
|
|
||||||
if self.unique_id:
|
if self.unique_id:
|
||||||
@@ -123,10 +131,11 @@ class AsyncStream():
|
|||||||
elapsed = 0
|
elapsed = 0
|
||||||
if self.proc_start is not None:
|
if self.proc_start is not None:
|
||||||
elapsed = time.time() - self.proc_start
|
elapsed = time.time() - self.proc_start
|
||||||
logging.info('async_stream healthy() elapsed: '
|
if self.state == State.closed or elapsed > 1:
|
||||||
f'{round(1000*elapsed)}ms'
|
logging.info(f'[{self.node_id}]'
|
||||||
f' max:{round(1000*self.proc_max)}ms')
|
f' act:{round(1000*elapsed)}ms'
|
||||||
logging.info(f'Healthy()) refs: {gc.get_referrers(self)}')
|
f' max:{round(1000*self.proc_max)}ms')
|
||||||
|
logging.info(f'Healthy()) refs: {gc.get_referrers(self)}')
|
||||||
return elapsed < 5
|
return elapsed < 5
|
||||||
|
|
||||||
'''
|
'''
|
||||||
@@ -136,6 +145,7 @@ class AsyncStream():
|
|||||||
"""Async read handler to read received data from TCP stream"""
|
"""Async read handler to read received data from TCP stream"""
|
||||||
data = await self.reader.read(4096)
|
data = await self.reader.read(4096)
|
||||||
if data:
|
if data:
|
||||||
|
self.proc_start = time.time()
|
||||||
self._recv_buffer += data
|
self._recv_buffer += data
|
||||||
self.read() # call read in parent class
|
self.read() # call read in parent class
|
||||||
else:
|
else:
|
||||||
|
|||||||
Reference in New Issue
Block a user