hat.mariner.server.server
1import logging 2 3from hat import aio 4from hat import json 5from hat.drivers import tcp 6import hat.event.common 7import hat.event.eventer 8 9from hat.mariner import transport 10 11 12mlog: logging.Logger = logging.getLogger(__name__) 13 14 15async def create_server(conf: json.Data) -> 'Server': 16 srv = Server() 17 srv._name = conf['name'] 18 srv._eventer_conf = conf['eventer'] 19 srv._client_confs = {client_conf['name']: client_conf 20 for client_conf in conf['clients']} 21 srv._validator = json.DefaultSchemaValidator( 22 json.create_schema_repository(*conf.get('schemas', []))) 23 24 srv._srv = await transport.listen( 25 connection_cb=srv._on_connection, 26 addr=tcp.Address(host=conf['mariner']['host'], 27 port=conf['mariner']['port']), 28 bind_connections=True) 29 30 return srv 31 32 33class Server(aio.Resource): 34 35 @property 36 def async_group(self) -> aio.Group: 37 return self._srv.async_group 38 39 async def _on_connection(self, mariner_conn): 40 try: 41 client = await _create_client(component_name=self._name, 42 eventer_conf=self._eventer_conf, 43 client_confs=self._client_confs, 44 validator=self._validator, 45 mariner_conn=mariner_conn) 46 47 except Exception as e: 48 mlog.error("error initializing client: %s", e, exc_info=e) 49 50 await aio.uncancellable(mariner_conn.async_close()) 51 return 52 53 try: 54 await client.wait_closing() 55 56 finally: 57 await aio.uncancellable(client.async_close()) 58 59 60async def _create_client(component_name, eventer_conf, client_confs, validator, 61 mariner_conn): 62 init_req = await mariner_conn.receive() 63 if not isinstance(init_req, transport.InitReqMsg): 64 raise Exception('invalid init request') 65 66 send_queue = aio.Queue(1024) 67 68 async def on_status(eventer_client, status): 69 msg = transport.StatusMsg(status=status) 70 await send_queue.put(msg) 71 72 async def on_events(eventer_client, events): 73 msg = transport.EventsMsg(events=events) 74 await send_queue.put(msg) 75 76 client_conf = client_confs.get(init_req.client_name) 77 78 try: 79 if not client_conf: 80 raise Exception('invalid client name') 81 82 if client_conf['token'] != init_req.client_token: 83 raise Exception('invalid client token') 84 85 conf_subscription = hat.event.common.create_subscription( 86 client_conf['subscriptions']) 87 mariner_subscription = hat.event.common.create_subscription( 88 init_req.subscriptions) 89 90 subscription = conf_subscription.intersection(mariner_subscription) 91 92 register_event_schema_ids = hat.event.common.create_event_type_collection( # NOQA 93 (hat.event.common.create_subscription(i['subscriptions']), 94 i.get('schema_id')) 95 for i in client_conf['register_events']) 96 97 except Exception as e: 98 init_res = transport.InitResMsg(success=False, 99 status=None, 100 error=str(e)) 101 await mariner_conn.send(init_res) 102 raise 103 104 try: 105 eventer_client = await hat.event.eventer.connect( 106 addr=tcp.Address(host=eventer_conf['host'], 107 port=eventer_conf['port']), 108 client_name=f'mariner/{component_name}/{init_req.client_name}', 109 client_token=eventer_conf['token'], 110 subscriptions=subscription.get_query_types(), 111 server_id=init_req.server_id, 112 persisted=init_req.persisted, 113 status_cb=on_status, 114 events_cb=on_events) 115 116 try: 117 eventer_client.async_group.spawn(aio.call_on_cancel, 118 mariner_conn.close) 119 mariner_conn.async_group.spawn(aio.call_on_cancel, 120 eventer_client.async_close) 121 122 except Exception: 123 await aio.uncancellable(eventer_client.async_close()) 124 raise 125 126 except Exception: 127 init_res = transport.InitResMsg( 128 success=False, 129 status=None, 130 error='error connecting to eventer server') 131 await mariner_conn.send(init_res) 132 raise 133 134 init_res = transport.InitResMsg(success=True, 135 status=eventer_client.status, 136 error=None) 137 await mariner_conn.send(init_res) 138 139 client = _Client() 140 client._conf = client_conf 141 client._validator = validator 142 client._send_queue = send_queue 143 client._register_event_schema_ids = register_event_schema_ids 144 client._query_queue = aio.Queue(1024) 145 client._register_queue = aio.Queue(1024) 146 client._mariner_conn = mariner_conn 147 client._eventer_client = eventer_client 148 149 client.async_group.spawn(client._receive_loop) 150 client.async_group.spawn(client._send_loop) 151 client.async_group.spawn(client._query_loop) 152 client.async_group.spawn(client._register_loop) 153 154 return client 155 156 157class _Client(aio.Resource): 158 159 @property 160 def async_group(self) -> aio.Group: 161 return self._mariner_conn.async_group 162 163 async def _receive_loop(self): 164 try: 165 while True: 166 req = await self._mariner_conn.receive() 167 168 if isinstance(req, transport.RegisterReqMsg): 169 await self._register_queue.put(req) 170 171 elif isinstance(req, transport.QueryReqMsg): 172 await self._query_queue.put(req) 173 174 elif isinstance(req, transport.PingReqMsg): 175 res = transport.PingResMsg(ping_id=req.ping_id) 176 await self._send_queue.put(res) 177 178 else: 179 raise Exception('invalid message type') 180 181 except ConnectionError: 182 pass 183 184 except Exception as e: 185 mlog.error('receive loop error: %s', e, exc_info=e) 186 187 finally: 188 self.close() 189 190 async def _send_loop(self): 191 try: 192 while True: 193 msg = await self._send_queue.get() 194 195 await self._mariner_conn.send(msg) 196 197 except ConnectionError: 198 pass 199 200 except Exception as e: 201 mlog.error('send loop error: %s', e, exc_info=e) 202 203 finally: 204 self.close() 205 206 async def _query_loop(self): 207 try: 208 while True: 209 req = await self._query_queue.get() 210 211 result = await self._eventer_client.query(req.params) 212 213 res = transport.QueryResMsg(query_id=req.query_id, 214 result=result) 215 await self._send_queue.put(res) 216 217 except ConnectionError: 218 pass 219 220 except Exception as e: 221 mlog.error('query loop error: %s', e, exc_info=e) 222 223 finally: 224 self.close() 225 226 async def _register_loop(self): 227 try: 228 while True: 229 req = await self._register_queue.get() 230 231 if all(self._is_register_event_valid(register_event) 232 for register_event in req.register_events): 233 events = await self._eventer_client.register( 234 req.register_events, True) 235 236 else: 237 events = None 238 239 res = transport.RegisterResMsg(register_id=req.register_id, 240 success=(events is not None), 241 events=events) 242 243 await self._send_queue.put(res) 244 245 except ConnectionError: 246 pass 247 248 except Exception as e: 249 mlog.error('register loop error: %s', e, exc_info=e) 250 251 finally: 252 self.close() 253 254 def _is_register_event_valid(self, register_event): 255 schema_ids = self._register_event_schema_ids.get(register_event.type) 256 if not isinstance(schema_ids, set): 257 schema_ids = set(schema_ids) 258 259 if not schema_ids: 260 return False 261 262 for schema_id in schema_ids: 263 if schema_id is None: 264 continue 265 266 if not isinstance(register_event.payload, 267 hat.event.common.EventPayloadJson): 268 return False 269 270 try: 271 self._validator.validate(schema_id, 272 register_event.payload.data) 273 274 except Exception: 275 return False 276 277 return True
async def
create_server( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> Server:
16async def create_server(conf: json.Data) -> 'Server': 17 srv = Server() 18 srv._name = conf['name'] 19 srv._eventer_conf = conf['eventer'] 20 srv._client_confs = {client_conf['name']: client_conf 21 for client_conf in conf['clients']} 22 srv._validator = json.DefaultSchemaValidator( 23 json.create_schema_repository(*conf.get('schemas', []))) 24 25 srv._srv = await transport.listen( 26 connection_cb=srv._on_connection, 27 addr=tcp.Address(host=conf['mariner']['host'], 28 port=conf['mariner']['port']), 29 bind_connections=True) 30 31 return srv
class
Server(hat.aio.group.Resource):
34class Server(aio.Resource): 35 36 @property 37 def async_group(self) -> aio.Group: 38 return self._srv.async_group 39 40 async def _on_connection(self, mariner_conn): 41 try: 42 client = await _create_client(component_name=self._name, 43 eventer_conf=self._eventer_conf, 44 client_confs=self._client_confs, 45 validator=self._validator, 46 mariner_conn=mariner_conn) 47 48 except Exception as e: 49 mlog.error("error initializing client: %s", e, exc_info=e) 50 51 await aio.uncancellable(mariner_conn.async_close()) 52 return 53 54 try: 55 await client.wait_closing() 56 57 finally: 58 await aio.uncancellable(client.async_close())
Resource with lifetime control based on Group
.