hat.mariner.server.server

  1import logging
  2
  3from hat import aio
  4from hat import json
  5from hat.drivers import tcp
  6import hat.event.common
  7import hat.event.eventer
  8
  9from hat.mariner import transport
 10
 11
 12mlog: logging.Logger = logging.getLogger(__name__)  # module-level logger, named after this module via __name__
 13
 14
 15async def create_server(conf: json.Data) -> 'Server':
 16    srv = Server()
 17    srv._name = conf['name']
 18    srv._eventer_conf = conf['eventer']
 19    srv._client_confs = {client_conf['name']: client_conf
 20                         for client_conf in conf['clients']}
 21    srv._validator = json.DefaultSchemaValidator(
 22        json.create_schema_repository(*conf.get('schemas', [])))
 23
 24    srv._srv = await transport.listen(
 25        connection_cb=srv._on_connection,
 26        addr=tcp.Address(host=conf['mariner']['host'],
 27                         port=conf['mariner']['port']),
 28        bind_connections=True)
 29
 30    return srv
 31
 32
 33class Server(aio.Resource):
 34
 35    @property
 36    def async_group(self) -> aio.Group:
 37        return self._srv.async_group
 38
 39    async def _on_connection(self, mariner_conn):
 40        try:
 41            client = await _create_client(component_name=self._name,
 42                                          eventer_conf=self._eventer_conf,
 43                                          client_confs=self._client_confs,
 44                                          validator=self._validator,
 45                                          mariner_conn=mariner_conn)
 46
 47        except Exception as e:
 48            mlog.error("error initializing client: %s", e, exc_info=e)
 49
 50            await aio.uncancellable(mariner_conn.async_close())
 51            return
 52
 53        try:
 54            await client.wait_closing()
 55
 56        finally:
 57            await aio.uncancellable(client.async_close())
 58
 59
 60async def _create_client(component_name, eventer_conf, client_confs, validator,
 61                         mariner_conn):
 62    init_req = await mariner_conn.receive()
 63    if not isinstance(init_req, transport.InitReqMsg):
 64        raise Exception('invalid init request')
 65
 66    send_queue = aio.Queue(1024)
 67
 68    async def on_status(eventer_client, status):
 69        msg = transport.StatusMsg(status=status)
 70        await send_queue.put(msg)
 71
 72    async def on_events(eventer_client, events):
 73        msg = transport.EventsMsg(events=events)
 74        await send_queue.put(msg)
 75
 76    client_conf = client_confs.get(init_req.client_name)
 77
 78    try:
 79        if not client_conf:
 80            raise Exception('invalid client name')
 81
 82        if client_conf['token'] != init_req.client_token:
 83            raise Exception('invalid client token')
 84
 85        conf_subscription = hat.event.common.create_subscription(
 86            client_conf['subscriptions'])
 87        mariner_subscription = hat.event.common.create_subscription(
 88            init_req.subscriptions)
 89
 90        subscription = conf_subscription.intersection(mariner_subscription)
 91
 92        register_event_schema_ids = hat.event.common.create_event_type_collection(  # NOQA
 93            (hat.event.common.create_subscription(i['subscriptions']),
 94             i.get('schema_id'))
 95            for i in client_conf['register_events'])
 96
 97    except Exception as e:
 98        init_res = transport.InitResMsg(success=False,
 99                                        status=None,
100                                        error=str(e))
101        await mariner_conn.send(init_res)
102        raise
103
104    try:
105        eventer_client = await hat.event.eventer.connect(
106            addr=tcp.Address(host=eventer_conf['host'],
107                             port=eventer_conf['port']),
108            client_name=f'mariner/{component_name}/{init_req.client_name}',
109            client_token=eventer_conf['token'],
110            subscriptions=subscription.get_query_types(),
111            server_id=init_req.server_id,
112            persisted=init_req.persisted,
113            status_cb=on_status,
114            events_cb=on_events)
115
116        try:
117            eventer_client.async_group.spawn(aio.call_on_cancel,
118                                             mariner_conn.close)
119            mariner_conn.async_group.spawn(aio.call_on_cancel,
120                                           eventer_client.async_close)
121
122        except Exception:
123            await aio.uncancellable(eventer_client.async_close())
124            raise
125
126    except Exception:
127        init_res = transport.InitResMsg(
128            success=False,
129            status=None,
130            error='error connecting to eventer server')
131        await mariner_conn.send(init_res)
132        raise
133
134    init_res = transport.InitResMsg(success=True,
135                                    status=eventer_client.status,
136                                    error=None)
137    await mariner_conn.send(init_res)
138
139    client = _Client()
140    client._conf = client_conf
141    client._validator = validator
142    client._send_queue = send_queue
143    client._register_event_schema_ids = register_event_schema_ids
144    client._query_queue = aio.Queue(1024)
145    client._register_queue = aio.Queue(1024)
146    client._mariner_conn = mariner_conn
147    client._eventer_client = eventer_client
148
149    client.async_group.spawn(client._receive_loop)
150    client.async_group.spawn(client._send_loop)
151    client.async_group.spawn(client._query_loop)
152    client.async_group.spawn(client._register_loop)
153
154    return client
155
156
class _Client(aio.Resource):
    """Single mariner client session

    Instances are populated by `_create_client`, which sets all private
    attributes and spawns the loops defined here. Each loop closes the
    client when it terminates.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the mariner connection"""
        conn = self._mariner_conn
        return conn.async_group

    async def _receive_loop(self):
        """Receive mariner requests and dispatch them by message type"""
        try:
            while True:
                msg = await self._mariner_conn.receive()

                if isinstance(msg, transport.RegisterReqMsg):
                    await self._register_queue.put(msg)
                    continue

                if isinstance(msg, transport.QueryReqMsg):
                    await self._query_queue.put(msg)
                    continue

                if isinstance(msg, transport.PingReqMsg):
                    # ping is answered directly with the same ping id
                    await self._send_queue.put(
                        transport.PingResMsg(ping_id=msg.ping_id))
                    continue

                raise Exception('invalid message type')

        except ConnectionError:
            pass

        except Exception as exc:
            mlog.error('receive loop error: %s', exc, exc_info=exc)

        finally:
            self.close()

    async def _send_loop(self):
        """Send queued messages to the mariner connection"""
        try:
            while True:
                outgoing = await self._send_queue.get()

                await self._mariner_conn.send(outgoing)

        except ConnectionError:
            pass

        except Exception as exc:
            mlog.error('send loop error: %s', exc, exc_info=exc)

        finally:
            self.close()

    async def _query_loop(self):
        """Run queued queries on the eventer client and queue responses"""
        try:
            while True:
                query_req = await self._query_queue.get()

                query_result = await self._eventer_client.query(
                    query_req.params)

                await self._send_queue.put(
                    transport.QueryResMsg(query_id=query_req.query_id,
                                          result=query_result))

        except ConnectionError:
            pass

        except Exception as exc:
            mlog.error('query loop error: %s', exc, exc_info=exc)

        finally:
            self.close()

    async def _register_loop(self):
        """Register queued events on the eventer client and queue responses

        A request is forwarded only if every one of its events passes
        `_is_register_event_valid`; otherwise the response reports failure.

        """
        try:
            while True:
                register_req = await self._register_queue.get()

                events = None
                if all(self._is_register_event_valid(event)
                       for event in register_req.register_events):
                    # NOTE(review): second positional argument presumably
                    # requests the registered events in response - confirm
                    events = await self._eventer_client.register(
                        register_req.register_events, True)

                register_res = transport.RegisterResMsg(
                    register_id=register_req.register_id,
                    success=(events is not None),
                    events=events)

                await self._send_queue.put(register_res)

        except ConnectionError:
            pass

        except Exception as exc:
            mlog.error('register loop error: %s', exc, exc_info=exc)

        finally:
            self.close()

    def _is_register_event_valid(self, register_event):
        """Check whether this client may register `register_event`

        Returns ``False`` if no schema id entry matches the event type, or
        if any matching non-``None`` schema id fails validation (which
        includes payloads that are not `EventPayloadJson`). An entry of
        ``None`` imposes no payload validation.

        """
        matched = self._register_event_schema_ids.get(register_event.type)
        schema_ids = matched if isinstance(matched, set) else set(matched)

        if not schema_ids:
            return False

        required_ids = [schema_id for schema_id in schema_ids
                        if schema_id is not None]
        if not required_ids:
            return True

        payload = register_event.payload
        if not isinstance(payload, hat.event.common.EventPayloadJson):
            return False

        for schema_id in required_ids:
            try:
                self._validator.validate(schema_id, payload.data)

            except Exception:
                return False

        return True
mlog: logging.Logger = <Logger hat.mariner.server.server (WARNING)>
async def create_server(conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> Server:
async def create_server(conf: json.Data) -> 'Server':
    """Create mariner server

    Builds a `Server` from the component configuration `conf` and calls
    `transport.listen` with the address found in ``conf['mariner']``.
    Incoming connections are handled by `Server._on_connection`.

    """
    server = Server()
    server._name = conf['name']
    server._eventer_conf = conf['eventer']

    # client configurations indexed by client name
    server._client_confs = {}
    for client_conf in conf['clients']:
        server._client_confs[client_conf['name']] = client_conf

    # validator built from the optional list of additional schemas
    schema_repo = json.create_schema_repository(*conf.get('schemas', []))
    server._validator = json.DefaultSchemaValidator(schema_repo)

    mariner_conf = conf['mariner']
    listen_addr = tcp.Address(host=mariner_conf['host'],
                              port=mariner_conf['port'])
    server._srv = await transport.listen(connection_cb=server._on_connection,
                                         addr=listen_addr,
                                         bind_connections=True)

    return server
class Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """Mariner server

    Instances are expected to be created with `create_server`, which sets
    the private attributes used by this class.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the listening transport server"""
        listener = self._srv
        return listener.async_group

    async def _on_connection(self, mariner_conn):
        """Handle a single incoming mariner connection

        Creates a client for the connection and waits until that client
        starts closing. If client creation fails, the error is logged and
        the connection is closed.

        """
        try:
            client = await _create_client(component_name=self._name,
                                          eventer_conf=self._eventer_conf,
                                          client_confs=self._client_confs,
                                          validator=self._validator,
                                          mariner_conn=mariner_conn)

        except Exception as exc:
            mlog.error("error initializing client: %s", exc, exc_info=exc)

            await aio.uncancellable(mariner_conn.async_close())

        else:
            try:
                await client.wait_closing()

            finally:
                # close the client even if this task gets cancelled
                await aio.uncancellable(client.async_close())

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
36    @property
37    def async_group(self) -> aio.Group:
38        return self._srv.async_group

Group controlling resource's lifetime.