hat.mariner.server.server
1import logging 2 3from hat import aio 4from hat import json 5from hat.drivers import tcp 6import hat.event.common 7import hat.event.eventer 8 9from hat.mariner import transport 10 11 12mlog: logging.Logger = logging.getLogger(__name__) 13 14 15async def create_server(conf: json.Data) -> 'Server': 16 srv = Server() 17 srv._name = conf['name'] 18 srv._eventer_conf = conf['eventer'] 19 srv._client_confs = {client_conf['name']: client_conf 20 for client_conf in conf['clients']} 21 22 srv._srv = await transport.listen( 23 connection_cb=srv._on_connection, 24 addr=tcp.Address(host=conf['mariner']['host'], 25 port=conf['mariner']['port']), 26 bind_connections=True) 27 28 return srv 29 30 31class Server(aio.Resource): 32 33 @property 34 def async_group(self) -> aio.Group: 35 return self._srv.async_group 36 37 async def _on_connection(self, mariner_conn): 38 try: 39 client = await _create_client(component_name=self._name, 40 eventer_conf=self._eventer_conf, 41 client_confs=self._client_confs, 42 mariner_conn=mariner_conn) 43 44 except Exception as e: 45 mlog.error("error initializing client: %s", e, exc_info=e) 46 47 await aio.uncancellable(mariner_conn.async_close()) 48 return 49 50 try: 51 await client.wait_closing() 52 53 finally: 54 await aio.uncancellable(client.async_close()) 55 56 57async def _create_client(component_name, eventer_conf, client_confs, 58 mariner_conn): 59 init_req = await mariner_conn.receive() 60 if not isinstance(init_req, transport.InitReqMsg): 61 raise Exception('invalid init request') 62 63 send_queue = aio.Queue(1024) 64 65 async def on_status(eventer_client, status): 66 msg = transport.StatusMsg(status=status) 67 await send_queue.put(msg) 68 69 async def on_events(eventer_client, events): 70 msg = transport.EventsMsg(events=events) 71 await send_queue.put(msg) 72 73 client_conf = client_confs.get(init_req.client_name) 74 75 try: 76 if not client_conf: 77 raise Exception('invalid client name') 78 79 if client_conf['token'] != init_req.client_token: 80 raise Exception('invalid client token') 81 82 conf_subscription = hat.event.common.create_subscription( 83 client_conf['subscriptions']) 84 mariner_subscription = hat.event.common.create_subscription( 85 init_req.subscriptions) 86 87 subscription = conf_subscription.intersection(mariner_subscription) 88 89 except Exception as e: 90 init_res = transport.InitResMsg(success=False, 91 status=None, 92 error=str(e)) 93 await mariner_conn.send(init_res) 94 raise 95 96 try: 97 eventer_client = await hat.event.eventer.connect( 98 addr=tcp.Address(host=eventer_conf['host'], 99 port=eventer_conf['port']), 100 client_name=f'mariner/{component_name}/{init_req.client_name}', 101 client_token=eventer_conf['token'], 102 subscriptions=subscription.get_query_types(), 103 server_id=init_req.server_id, 104 persisted=init_req.persisted, 105 status_cb=on_status, 106 events_cb=on_events) 107 108 try: 109 eventer_client.async_group.spawn(aio.call_on_cancel, 110 mariner_conn.close) 111 mariner_conn.async_group.spawn(aio.call_on_cancel, 112 eventer_client.async_close) 113 114 except Exception: 115 await aio.uncancellable(eventer_client.async_close()) 116 raise 117 118 except Exception: 119 init_res = transport.InitResMsg( 120 success=False, 121 status=None, 122 error='error connecting to eventer server') 123 await mariner_conn.send(init_res) 124 raise 125 126 init_res = transport.InitResMsg(success=True, 127 status=eventer_client.status, 128 error=None) 129 await mariner_conn.send(init_res) 130 131 client = _Client() 132 client._conf = client_conf 133 client._send_queue = send_queue 134 client._query_queue = aio.Queue(1024) 135 client._mariner_conn = mariner_conn 136 client._eventer_client = eventer_client 137 138 client.async_group.spawn(client._receive_loop) 139 client.async_group.spawn(client._send_loop) 140 client.async_group.spawn(client._query_loop) 141 142 return client 143 144 145class _Client(aio.Resource): 146 147 @property 148 def async_group(self) -> aio.Group: 149 return self._mariner_conn.async_group 150 151 async def _receive_loop(self): 152 try: 153 while True: 154 req = await self._mariner_conn.receive() 155 156 if isinstance(req, transport.RegisterReqMsg): 157 res = transport.RegisterResMsg(register_id=req.register_id, 158 success=False, 159 events=None) 160 await self._send_queue.put(res) 161 162 elif isinstance(req, transport.QueryReqMsg): 163 await self._query_queue.put(req) 164 165 elif isinstance(req, transport.PingReqMsg): 166 res = transport.PingResMsg(ping_id=req.ping_id) 167 await self._send_queue.put(res) 168 169 else: 170 raise Exception('invalid message type') 171 172 except ConnectionError: 173 pass 174 175 except Exception as e: 176 mlog.error('receive loop error: %s', e, exc_info=e) 177 178 finally: 179 self.close() 180 181 async def _send_loop(self): 182 try: 183 while True: 184 msg = await self._send_queue.get() 185 186 await self._mariner_conn.send(msg) 187 188 except ConnectionError: 189 pass 190 191 except Exception as e: 192 mlog.error('send loop error: %s', e, exc_info=e) 193 194 finally: 195 self.close() 196 197 async def _query_loop(self): 198 try: 199 while True: 200 req = await self._query_queue.get() 201 202 result = await self._eventer_client.query(req.params) 203 204 res = transport.QueryResMsg(query_id=req.query_id, 205 result=result) 206 await self._send_queue.put(res) 207 208 except ConnectionError: 209 pass 210 211 except Exception as e: 212 mlog.error('query loop error: %s', e, exc_info=e) 213 214 finally: 215 self.close()
async def
create_server( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> Server:
16async def create_server(conf: json.Data) -> 'Server': 17 srv = Server() 18 srv._name = conf['name'] 19 srv._eventer_conf = conf['eventer'] 20 srv._client_confs = {client_conf['name']: client_conf 21 for client_conf in conf['clients']} 22 23 srv._srv = await transport.listen( 24 connection_cb=srv._on_connection, 25 addr=tcp.Address(host=conf['mariner']['host'], 26 port=conf['mariner']['port']), 27 bind_connections=True) 28 29 return srv
class
Server(hat.aio.group.Resource):
32class Server(aio.Resource): 33 34 @property 35 def async_group(self) -> aio.Group: 36 return self._srv.async_group 37 38 async def _on_connection(self, mariner_conn): 39 try: 40 client = await _create_client(component_name=self._name, 41 eventer_conf=self._eventer_conf, 42 client_confs=self._client_confs, 43 mariner_conn=mariner_conn) 44 45 except Exception as e: 46 mlog.error("error initializing client: %s", e, exc_info=e) 47 48 await aio.uncancellable(mariner_conn.async_close()) 49 return 50 51 try: 52 await client.wait_closing() 53 54 finally: 55 await aio.uncancellable(client.async_close())
Resource with lifetime control based on Group
.