hat.mariner.transport
1from hat.mariner.transport.common import (InitReqMsg, 2 InitResMsg, 3 StatusMsg, 4 EventsMsg, 5 RegisterReqMsg, 6 RegisterResMsg, 7 QueryReqMsg, 8 QueryResMsg, 9 PingReqMsg, 10 PingResMsg, 11 Msg) 12from hat.mariner.transport.connection import (ConnectionCb, 13 connect, 14 listen, 15 Connection) 16 17 18__all__ = ['InitReqMsg', 19 'InitResMsg', 20 'StatusMsg', 21 'EventsMsg', 22 'RegisterReqMsg', 23 'RegisterResMsg', 24 'QueryReqMsg', 25 'QueryResMsg', 26 'PingReqMsg', 27 'PingResMsg', 28 'Msg', 29 'ConnectionCb', 30 'connect', 31 'listen', 32 'Connection']
class
InitReqMsg(typing.NamedTuple):
class InitReqMsg(typing.NamedTuple):
    """Named tuple holding the fields of an init request message.

    Only the field names and types are visible in this definition; the
    per-field notes below that go beyond the type are assumptions drawn
    from the names and should be confirmed against the protocol handlers.
    """

    # client name as a string
    client_name: str
    # optional token string; ``None`` when no token is supplied
    client_token: str | None
    # collection of event types -- presumably the types the client
    # subscribes to (TODO confirm)
    subscriptions: Collection[hat.event.common.EventType]
    # optional server id; ``None`` when not specified
    server_id: hat.event.common.ServerId | None
    # boolean flag -- NOTE(review): exact semantics not visible here
    persisted: bool
InitReqMsg(client_name, client_token, subscriptions, server_id, persisted)
class
InitResMsg(typing.NamedTuple):
class InitResMsg(typing.NamedTuple):
    """Named tuple holding the fields of an init response message.

    Both ``status`` and ``error`` are optional; which one is populated for
    a given ``success`` value is not visible in this definition.
    """

    # outcome flag of the init request
    success: bool
    # optional status value -- presumably set when ``success`` is true
    # (TODO confirm)
    status: hat.event.common.Status | None
    # optional error text -- presumably set when ``success`` is false
    # (TODO confirm)
    error: str | None
InitResMsg(success, status, error)
class
StatusMsg(typing.NamedTuple):
StatusMsg(status,)
class
EventsMsg(typing.NamedTuple):
EventsMsg(events,)
class
RegisterReqMsg(typing.NamedTuple):
class RegisterReqMsg(typing.NamedTuple):
    """Named tuple holding the fields of a register request message."""

    # integer identifier of this request -- presumably echoed back in the
    # matching response (TODO confirm)
    register_id: int
    # collection of register events carried by the request
    register_events: Collection[hat.event.common.RegisterEvent]
RegisterReqMsg(register_id, register_events)
class
RegisterResMsg(typing.NamedTuple):
class RegisterResMsg(typing.NamedTuple):
    """Named tuple holding the fields of a register response message."""

    # integer identifier -- presumably that of the originating request
    # (TODO confirm)
    register_id: int
    # outcome flag of the register request
    success: bool
    # optional collection of events; ``None`` when absent -- presumably
    # only provided on success (TODO confirm)
    events: Collection[hat.event.common.Event] | None
RegisterResMsg(register_id, success, events)
class
QueryReqMsg(typing.NamedTuple):
QueryReqMsg(query_id, params)
class
QueryResMsg(typing.NamedTuple):
QueryResMsg(query_id, result)
class
PingReqMsg(typing.NamedTuple):
PingReqMsg(ping_id,)
class
PingResMsg(typing.NamedTuple):
PingResMsg(ping_id,)
Msg =
InitReqMsg | InitResMsg | StatusMsg | EventsMsg | RegisterReqMsg | RegisterResMsg | QueryReqMsg | QueryResMsg | PingReqMsg | PingResMsg
ConnectionCb =
typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def
listen( connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address, **kwargs) -> hat.drivers.tcp.Server:
class
Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """Message-oriented wrapper around a ``tcp.Connection``.

    Every message travels as a single frame made of three parts:

    1. one byte holding the width (1 to 8) of the length field,
    2. the payload length as a big-endian unsigned integer of that width,
    3. the payload: the JSON text of the encoded message as bytes
       (decoded as UTF-8 on receive).

    The resource lifetime is that of the wrapped connection, because
    ``async_group`` is delegated to it.
    """

    def __init__(self, conn: tcp.Connection):
        """Wrap an already established ``tcp.Connection``."""
        self._conn = conn

    @property
    def async_group(self) -> aio.Group:
        """Async group of the wrapped connection."""
        return self._conn.async_group

    @property
    def info(self) -> tcp.ConnectionInfo:
        """Connection info reported by the wrapped connection."""
        return self._conn.info

    async def drain(self):
        """Await ``drain`` of the wrapped connection."""
        await self._conn.drain()

    async def send(self, msg: common.Msg):
        """Encode ``msg`` and write it as one frame.

        Raises:
            ValueError: if the payload length does not fit a length field
                of 1 to 8 bytes (this includes an empty payload).
        """
        # ``json`` is whatever module the file imports under that name; it
        # is used through ``encode``/``decode`` functions
        payload = json.encode(encoder.encode_msg(msg)).encode()
        payload_len = len(payload)

        # number of whole bytes needed to represent the payload length
        length_width = (payload_len.bit_length() + 7) // 8

        if not 1 <= length_width <= 8:
            raise ValueError('unsupported msg size')

        # join the byte segments directly instead of iterating over every
        # payload byte at the Python level
        frame = b''.join((bytes((length_width,)),
                          payload_len.to_bytes(length_width, 'big'),
                          payload))
        await self._conn.write(frame)

    async def receive(self) -> common.Msg:
        """Read one frame and return the decoded message.

        Raises:
            ValueError: if the received width of the length field is
                outside 1 to 8.
        """
        header = await self._conn.readexactly(1)
        length_width = header[0]

        if not 1 <= length_width <= 8:
            raise ValueError('unsupported msg size')

        # NOTE(review): the payload length comes from the peer and is not
        # bounded here; confirm that an upper limit is enforced elsewhere
        # if peers are untrusted
        payload_len = int.from_bytes(
            await self._conn.readexactly(length_width), 'big')

        payload = await self._conn.readexactly(payload_len)
        # ``str(..., encoding=...)`` accepts any bytes-like buffer
        text = str(payload, encoding='utf-8')

        return encoder.decode_msg(json.decode(text))
Resource with lifetime control based on Group.
async def
send( self, msg: InitReqMsg | InitResMsg | StatusMsg | EventsMsg | RegisterReqMsg | RegisterResMsg | QueryReqMsg | QueryResMsg | PingReqMsg | PingResMsg):
async def send(self, msg: common.Msg):
    """Encode ``msg`` and write it to the connection as one frame.

    The frame consists of one byte holding the width (1 to 8) of the
    length field, the payload length as a big-endian unsigned integer of
    that width, and the payload itself (the JSON text of the encoded
    message as bytes).

    Raises:
        ValueError: if the payload length does not fit a length field of
            1 to 8 bytes (this includes an empty payload).
    """
    payload = json.encode(encoder.encode_msg(msg)).encode()
    payload_len = len(payload)

    # number of whole bytes needed to represent the payload length
    length_width = (payload_len.bit_length() + 7) // 8

    if not 1 <= length_width <= 8:
        raise ValueError('unsupported msg size')

    # join the byte segments directly instead of iterating over every
    # payload byte at the Python level
    frame = b''.join((bytes((length_width,)),
                      payload_len.to_bytes(length_width, 'big'),
                      payload))
    await self._conn.write(frame)
async def
receive( self) -> InitReqMsg | InitResMsg | StatusMsg | EventsMsg | RegisterReqMsg | RegisterResMsg | QueryReqMsg | QueryResMsg | PingReqMsg | PingResMsg:
async def receive(self) -> common.Msg:
    """Read one frame from the connection and return the decoded message.

    The frame is read in three steps: one byte giving the width of the
    length field, the big-endian payload length of that width, and the
    payload, which is decoded as UTF-8 text and parsed as JSON.

    Raises:
        ValueError: if the received width of the length field is outside
            1 to 8.
    """
    header = await self._conn.readexactly(1)
    length_width = header[0]

    if not 1 <= length_width <= 8:
        raise ValueError('unsupported msg size')

    # NOTE(review): the payload length comes from the peer and is not
    # bounded here; confirm that an upper limit is enforced elsewhere if
    # peers are untrusted
    payload_len = int.from_bytes(
        await self._conn.readexactly(length_width), 'big')

    payload = await self._conn.readexactly(payload_len)
    # ``str(..., encoding=...)`` accepts any bytes-like buffer
    text = str(payload, encoding='utf-8')

    return encoder.decode_msg(json.decode(text))