hat.mariner.transport

 1from hat.mariner.transport.common import (InitReqMsg,
 2                                          InitResMsg,
 3                                          StatusMsg,
 4                                          EventsMsg,
 5                                          RegisterReqMsg,
 6                                          RegisterResMsg,
 7                                          QueryReqMsg,
 8                                          QueryResMsg,
 9                                          PingReqMsg,
10                                          PingResMsg,
11                                          Msg)
12from hat.mariner.transport.connection import (ConnectionCb,
13                                              connect,
14                                              listen,
15                                              Connection)
16
17
18__all__ = ['InitReqMsg',
19           'InitResMsg',
20           'StatusMsg',
21           'EventsMsg',
22           'RegisterReqMsg',
23           'RegisterResMsg',
24           'QueryReqMsg',
25           'QueryResMsg',
26           'PingReqMsg',
27           'PingResMsg',
28           'Msg',
29           'ConnectionCb',
30           'connect',
31           'listen',
32           'Connection']
class InitReqMsg(typing.NamedTuple):
10class InitReqMsg(typing.NamedTuple):
11    client_name: str
12    client_token: str | None
13    subscriptions: Collection[hat.event.common.EventType]
14    server_id: hat.event.common.ServerId | None
15    persisted: bool

InitReqMsg(client_name, client_token, subscriptions, server_id, persisted)

InitReqMsg( client_name: str, client_token: str | None, subscriptions: Collection[tuple[str, ...]], server_id: int | None, persisted: bool)

Create new instance of InitReqMsg(client_name, client_token, subscriptions, server_id, persisted)

client_name: str

Alias for field number 0

client_token: str | None

Alias for field number 1

subscriptions: Collection[tuple[str, ...]]

Alias for field number 2

server_id: int | None

Alias for field number 3

persisted: bool

Alias for field number 4

class InitResMsg(typing.NamedTuple):
18class InitResMsg(typing.NamedTuple):
19    success: bool
20    status: hat.event.common.Status | None
21    error: str | None

InitResMsg(success, status, error)

InitResMsg( success: bool, status: hat.event.common.common.Status | None, error: str | None)

Create new instance of InitResMsg(success, status, error)

success: bool

Alias for field number 0

status: hat.event.common.common.Status | None

Alias for field number 1

error: str | None

Alias for field number 2

class StatusMsg(typing.NamedTuple):
24class StatusMsg(typing.NamedTuple):
25    status: hat.event.common.Status

StatusMsg(status,)

StatusMsg(status: hat.event.common.common.Status)

Create new instance of StatusMsg(status,)

status: hat.event.common.common.Status

Alias for field number 0

class EventsMsg(typing.NamedTuple):
28class EventsMsg(typing.NamedTuple):
29    events: Collection[hat.event.common.Event]

EventsMsg(events,)

EventsMsg(events: Collection[hat.event.common.common.Event])

Create new instance of EventsMsg(events,)

events: Collection[hat.event.common.common.Event]

Alias for field number 0

class RegisterReqMsg(typing.NamedTuple):
32class RegisterReqMsg(typing.NamedTuple):
33    register_id: int
34    register_events: Collection[hat.event.common.RegisterEvent]

RegisterReqMsg(register_id, register_events)

RegisterReqMsg( register_id: int, register_events: Collection[hat.event.common.common.RegisterEvent])

Create new instance of RegisterReqMsg(register_id, register_events)

register_id: int

Alias for field number 0

register_events: Collection[hat.event.common.common.RegisterEvent]

Alias for field number 1

class RegisterResMsg(typing.NamedTuple):
37class RegisterResMsg(typing.NamedTuple):
38    register_id: int
39    success: bool
40    events: Collection[hat.event.common.Event] | None

RegisterResMsg(register_id, success, events)

RegisterResMsg( register_id: int, success: bool, events: Collection[hat.event.common.common.Event] | None)

Create new instance of RegisterResMsg(register_id, success, events)

register_id: int

Alias for field number 0

success: bool

Alias for field number 1

events: Collection[hat.event.common.common.Event] | None

Alias for field number 2

class QueryReqMsg(typing.NamedTuple):
43class QueryReqMsg(typing.NamedTuple):
44    query_id: int
45    params: hat.event.common.QueryParams

QueryReqMsg(query_id, params)

QueryReqMsg( query_id: int, params: hat.event.common.common.QueryLatestParams | hat.event.common.common.QueryTimeseriesParams | hat.event.common.common.QueryServerParams)

Create new instance of QueryReqMsg(query_id, params)

query_id: int

Alias for field number 0

params: hat.event.common.common.QueryLatestParams | hat.event.common.common.QueryTimeseriesParams | hat.event.common.common.QueryServerParams

Alias for field number 1

class QueryResMsg(typing.NamedTuple):
48class QueryResMsg(typing.NamedTuple):
49    query_id: int
50    result: hat.event.common.QueryResult

QueryResMsg(query_id, result)

QueryResMsg(query_id: int, result: hat.event.common.common.QueryResult)

Create new instance of QueryResMsg(query_id, result)

query_id: int

Alias for field number 0

result: hat.event.common.common.QueryResult

Alias for field number 1

class PingReqMsg(typing.NamedTuple):
53class PingReqMsg(typing.NamedTuple):
54    ping_id: int

PingReqMsg(ping_id,)

PingReqMsg(ping_id: int)

Create new instance of PingReqMsg(ping_id,)

ping_id: int

Alias for field number 0

class PingResMsg(typing.NamedTuple):
57class PingResMsg(typing.NamedTuple):
58    ping_id: int

PingResMsg(ping_id,)

PingResMsg(ping_id: int)

Create new instance of PingResMsg(ping_id,)

ping_id: int

Alias for field number 0

ConnectionCb = typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
async def connect( addr: hat.drivers.tcp.Address, **kwargs) -> Connection:
17async def connect(addr: tcp.Address,
18                  **kwargs
19                  ) -> 'Connection':
20    conn = await tcp.connect(addr, **kwargs)
21
22    return Connection(conn)
async def listen( connection_cb: Callable[[Connection], None | Awaitable[None]], addr: hat.drivers.tcp.Address, **kwargs) -> hat.drivers.tcp.Server:
25async def listen(connection_cb: ConnectionCb,
26                 addr: tcp.Address,
27                 **kwargs
28                 ) -> tcp.Server:
29
30    async def on_connection(conn):
31        await aio.call(connection_cb, Connection(conn))
32
33    srv = await tcp.listen(on_connection, addr, **kwargs)
34
35    return srv
class Connection(hat.aio.group.Resource):
38class Connection(aio.Resource):
39
40    def __init__(self, conn: tcp.Connection):
41        self._conn = conn
42
43    @property
44    def async_group(self) -> aio.Group:
45        return self._conn.async_group
46
47    @property
48    def info(self) -> tcp.ConnectionInfo:
49        return self._conn.info
50
51    async def drain(self):
52        await self._conn.drain()
53
54    async def send(self, msg: common.Msg):
55        msg_json = encoder.encode_msg(msg)
56        msg_bytes = json.encode(msg_json).encode()
57        msg_len = len(msg_bytes)
58        len_size = math.ceil(msg_len.bit_length() / 8)
59
60        if len_size < 1 or len_size > 8:
61            raise ValueError('unsupported msg size')
62
63        data = bytes(itertools.chain([len_size],
64                                     msg_len.to_bytes(len_size, 'big'),
65                                     msg_bytes))
66        await self._conn.write(data)
67
68    async def receive(self) -> common.Msg:
69        len_size_bytes = await self._conn.readexactly(1)
70        len_size = len_size_bytes[0]
71
72        if len_size < 1 or len_size > 8:
73            raise ValueError('unsupported msg size')
74
75        msg_len_bytes = await self._conn.readexactly(len_size)
76        msg_len = int.from_bytes(msg_len_bytes, 'big')
77
78        msg_bytes = await self._conn.readexactly(msg_len)
79        msg_str = str(msg_bytes, encoding='utf-8')
80        msg_json = json.decode(msg_str)
81
82        return encoder.decode_msg(msg_json)

Resource with lifetime control based on Group.

Connection(conn: hat.drivers.tcp.Connection)
40    def __init__(self, conn: tcp.Connection):
41        self._conn = conn
async_group: hat.aio.group.Group
43    @property
44    def async_group(self) -> aio.Group:
45        return self._conn.async_group

Group controlling resource's lifetime.

info: hat.drivers.tcp.ConnectionInfo
47    @property
48    def info(self) -> tcp.ConnectionInfo:
49        return self._conn.info
async def drain(self):
51    async def drain(self):
52        await self._conn.drain()
54    async def send(self, msg: common.Msg):
55        msg_json = encoder.encode_msg(msg)
56        msg_bytes = json.encode(msg_json).encode()
57        msg_len = len(msg_bytes)
58        len_size = math.ceil(msg_len.bit_length() / 8)
59
60        if len_size < 1 or len_size > 8:
61            raise ValueError('unsupported msg size')
62
63        data = bytes(itertools.chain([len_size],
64                                     msg_len.to_bytes(len_size, 'big'),
65                                     msg_bytes))
66        await self._conn.write(data)
68    async def receive(self) -> common.Msg:
69        len_size_bytes = await self._conn.readexactly(1)
70        len_size = len_size_bytes[0]
71
72        if len_size < 1 or len_size > 8:
73            raise ValueError('unsupported msg size')
74
75        msg_len_bytes = await self._conn.readexactly(len_size)
76        msg_len = int.from_bytes(msg_len_bytes, 'big')
77
78        msg_bytes = await self._conn.readexactly(msg_len)
79        msg_str = str(msg_bytes, encoding='utf-8')
80        msg_json = json.decode(msg_str)
81
82        return encoder.decode_msg(msg_json)