- refactoring
- remove _forward_buffer - make async_write private
This commit is contained in:
@@ -8,10 +8,10 @@ from itertools import count
|
|||||||
|
|
||||||
if __name__ == "app.src.async_stream":
|
if __name__ == "app.src.async_stream":
|
||||||
from app.src.async_ifc import AsyncIfc
|
from app.src.async_ifc import AsyncIfc
|
||||||
from app.src.messages import hex_dump_memory, State
|
from app.src.messages import State
|
||||||
else: # pragma: no cover
|
else: # pragma: no cover
|
||||||
from async_ifc import AsyncIfc
|
from async_ifc import AsyncIfc
|
||||||
from messages import hex_dump_memory, State
|
from messages import State
|
||||||
|
|
||||||
|
|
||||||
import gc
|
import gc
|
||||||
@@ -122,7 +122,7 @@ class AsyncStream():
|
|||||||
dead_conn_to)
|
dead_conn_to)
|
||||||
|
|
||||||
if self.unique_id:
|
if self.unique_id:
|
||||||
await self.async_write()
|
await self.__async_write()
|
||||||
await self.__async_forward()
|
await self.__async_forward()
|
||||||
await self.async_publ_mqtt()
|
await self.async_publ_mqtt()
|
||||||
|
|
||||||
@@ -156,7 +156,7 @@ class AsyncStream():
|
|||||||
f"{traceback.format_exc()}")
|
f"{traceback.format_exc()}")
|
||||||
await asyncio.sleep(0) # be cooperative to other task
|
await asyncio.sleep(0) # be cooperative to other task
|
||||||
|
|
||||||
async def async_write(self, headline: str = 'Transmit to ') -> None:
|
async def __async_write(self, headline: str = 'Transmit to ') -> None:
|
||||||
"""Async write handler to transmit the send_buffer"""
|
"""Async write handler to transmit the send_buffer"""
|
||||||
if len(self.ifc.write) > 0:
|
if len(self.ifc.write) > 0:
|
||||||
self.ifc.write.logging(logging.INFO, f'{headline}{self.addr}:')
|
self.ifc.write.logging(logging.INFO, f'{headline}{self.addr}:')
|
||||||
@@ -212,24 +212,21 @@ class AsyncStream():
|
|||||||
|
|
||||||
async def __async_forward(self) -> None:
|
async def __async_forward(self) -> None:
|
||||||
"""forward handler transmits data over the remote connection"""
|
"""forward handler transmits data over the remote connection"""
|
||||||
if not self._forward_buffer:
|
if len(self.ifc.forward) == 0:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
if not self.remote_stream:
|
if not self.remote_stream:
|
||||||
await self.async_create_remote()
|
await self.async_create_remote()
|
||||||
if self.remote_stream:
|
if self.remote_stream:
|
||||||
if self.remote_stream._init_new_client_conn():
|
if self.remote_stream._init_new_client_conn():
|
||||||
await self.remote_stream.async_write()
|
await self.remote_stream.__async_write()
|
||||||
|
|
||||||
if self.remote_stream:
|
if self.remote_stream:
|
||||||
self.remote_stream._update_header(self._forward_buffer)
|
self.remote_stream._update_header(self.ifc.forward.peek())
|
||||||
hex_dump_memory(logging.INFO,
|
self.ifc.forward.logging(logging.INFO, 'Forward to '
|
||||||
f'Forward to {self.remote_stream.addr}:',
|
f'{self.remote_stream.addr}:')
|
||||||
self._forward_buffer,
|
self.remote_stream._writer.write(self.ifc.forward.get())
|
||||||
len(self._forward_buffer))
|
|
||||||
self.remote_stream._writer.write(self._forward_buffer)
|
|
||||||
await self.remote_stream._writer.drain()
|
await self.remote_stream._writer.drain()
|
||||||
self._forward_buffer = bytearray(0)
|
|
||||||
|
|
||||||
except OSError as error:
|
except OSError as error:
|
||||||
if self.remote_stream:
|
if self.remote_stream:
|
||||||
|
|||||||
Reference in New Issue
Block a user