hat.mariner.server.server

  1import logging
  2
  3from hat import aio
  4from hat import json
  5from hat.drivers import tcp
  6import hat.event.common
  7import hat.event.eventer
  8
  9from hat.mariner import transport
 10
 11
# Module-level logger, named after this module via ``__name__``.
mlog: logging.Logger = logging.getLogger(__name__)
 13
 14
 15async def create_server(conf: json.Data) -> 'Server':
 16    srv = Server()
 17    srv._name = conf['name']
 18    srv._eventer_conf = conf['eventer']
 19    srv._client_confs = {client_conf['name']: client_conf
 20                         for client_conf in conf['clients']}
 21
 22    srv._srv = await transport.listen(
 23        connection_cb=srv._on_connection,
 24        addr=tcp.Address(host=conf['mariner']['host'],
 25                         port=conf['mariner']['port']),
 26        bind_connections=True)
 27
 28    return srv
 29
 30
 31class Server(aio.Resource):
 32
 33    @property
 34    def async_group(self) -> aio.Group:
 35        return self._srv.async_group
 36
 37    async def _on_connection(self, mariner_conn):
 38        try:
 39            client = await _create_client(component_name=self._name,
 40                                          eventer_conf=self._eventer_conf,
 41                                          client_confs=self._client_confs,
 42                                          mariner_conn=mariner_conn)
 43
 44        except Exception as e:
 45            mlog.error("error initializing client: %s", e, exc_info=e)
 46
 47            await aio.uncancellable(mariner_conn.async_close())
 48            return
 49
 50        try:
 51            await client.wait_closing()
 52
 53        finally:
 54            await aio.uncancellable(client.async_close())
 55
 56
async def _create_client(component_name, eventer_conf, client_confs,
                         mariner_conn):
    """Perform the init handshake on `mariner_conn` and create a client.

    The first message received on the connection must be an init request.
    The requesting client is looked up by name in `client_confs` and its
    token is compared with the configured one. The effective subscription
    is the intersection of the configured and the requested subscriptions.
    An eventer connection is then opened and the init response is sent.

    Args:
        component_name: used as part of the eventer client name
        eventer_conf: eventer connection configuration (``host``, ``port``
            and ``token`` are read)
        client_confs: client configurations keyed by client name
        mariner_conn: mariner connection on which the handshake is done

    Returns:
        `_Client` instance with receive, send and query loops spawned.

    Raises:
        Exception: if the first message is not an init request (no
            response is sent in this case), if the client name or token
            is invalid, or if connecting to the eventer server fails
            (a failure response is sent before re-raising).

    """
    init_req = await mariner_conn.receive()
    if not isinstance(init_req, transport.InitReqMsg):
        raise Exception('invalid init request')

    # queue shared by the eventer callbacks below and the client's send loop
    # NOTE(review): 1024 is presumably the queue's maximum size - confirm
    # against hat.aio.Queue
    send_queue = aio.Queue(1024)

    # eventer callbacks: wrap notifications in mariner messages and enqueue
    # them for sending
    async def on_status(eventer_client, status):
        msg = transport.StatusMsg(status=status)
        await send_queue.put(msg)

    async def on_events(eventer_client, events):
        msg = transport.EventsMsg(events=events)
        await send_queue.put(msg)

    client_conf = client_confs.get(init_req.client_name)

    try:
        # a missing (or empty) configuration is treated as unknown client
        if not client_conf:
            raise Exception('invalid client name')

        if client_conf['token'] != init_req.client_token:
            raise Exception('invalid client token')

        conf_subscription = hat.event.common.create_subscription(
            client_conf['subscriptions'])
        mariner_subscription = hat.event.common.create_subscription(
            init_req.subscriptions)

        # client can only receive what is both configured and requested
        subscription = conf_subscription.intersection(mariner_subscription)

    except Exception as e:
        # report the validation error text to the remote client, then
        # propagate it to the caller
        init_res = transport.InitResMsg(success=False,
                                        status=None,
                                        error=str(e))
        await mariner_conn.send(init_res)
        raise

    try:
        eventer_client = await hat.event.eventer.connect(
            addr=tcp.Address(host=eventer_conf['host'],
                             port=eventer_conf['port']),
            client_name=f'mariner/{component_name}/{init_req.client_name}',
            client_token=eventer_conf['token'],
            subscriptions=subscription.get_query_types(),
            server_id=init_req.server_id,
            persisted=init_req.persisted,
            status_cb=on_status,
            events_cb=on_events)

        try:
            # NOTE(review): presumably each spawned task runs its callback
            # once the owning group is cancelled, so that closing either
            # side closes the other - confirm against hat.aio.call_on_cancel
            eventer_client.async_group.spawn(aio.call_on_cancel,
                                             mariner_conn.close)
            mariner_conn.async_group.spawn(aio.call_on_cancel,
                                           eventer_client.async_close)

        except Exception:
            # binding failed - do not leave the eventer connection open
            await aio.uncancellable(eventer_client.async_close())
            raise

    except Exception:
        # remote client receives only a fixed error text; the underlying
        # exception is propagated to the caller
        init_res = transport.InitResMsg(
            success=False,
            status=None,
            error='error connecting to eventer server')
        await mariner_conn.send(init_res)
        raise

    # success response is sent directly on the connection, before the send
    # loop is spawned, so it precedes any message waiting in send_queue
    init_res = transport.InitResMsg(success=True,
                                    status=eventer_client.status,
                                    error=None)
    await mariner_conn.send(init_res)

    client = _Client()
    client._conf = client_conf
    client._send_queue = send_queue
    client._query_queue = aio.Queue(1024)
    client._mariner_conn = mariner_conn
    client._eventer_client = eventer_client

    client.async_group.spawn(client._receive_loop)
    client.async_group.spawn(client._send_loop)
    client.async_group.spawn(client._query_loop)

    return client
143
144
class _Client(aio.Resource):
    """Single mariner client session.

    Instances are expected to be built by `_create_client`, which assigns
    ``_conf``, ``_send_queue``, ``_query_queue``, ``_mariner_conn`` and
    ``_eventer_client`` and spawns the three loops defined here.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the mariner connection."""
        return self._mariner_conn.async_group

    async def _receive_loop(self):
        """Receive requests from the mariner connection and dispatch them.

        Connection errors end the loop silently, other errors are logged.
        In all cases the client is closed when the loop ends.

        """
        try:
            while True:
                request = await self._mariner_conn.receive()

                if isinstance(request, transport.RegisterReqMsg):
                    # register requests are always answered with failure
                    response = transport.RegisterResMsg(
                        register_id=request.register_id,
                        success=False,
                        events=None)
                    await self._send_queue.put(response)
                    continue

                if isinstance(request, transport.QueryReqMsg):
                    # queries are handled by the query loop
                    await self._query_queue.put(request)
                    continue

                if isinstance(request, transport.PingReqMsg):
                    response = transport.PingResMsg(ping_id=request.ping_id)
                    await self._send_queue.put(response)
                    continue

                raise Exception('invalid message type')

        except ConnectionError:
            pass

        except Exception as exc:
            mlog.error('receive loop error: %s', exc, exc_info=exc)

        finally:
            self.close()

    async def _send_loop(self):
        """Send queued messages over the mariner connection.

        Connection errors end the loop silently, other errors are logged.
        In all cases the client is closed when the loop ends.

        """
        try:
            while True:
                outgoing = await self._send_queue.get()

                await self._mariner_conn.send(outgoing)

        except ConnectionError:
            pass

        except Exception as exc:
            mlog.error('send loop error: %s', exc, exc_info=exc)

        finally:
            self.close()

    async def _query_loop(self):
        """Execute queued query requests, one at a time, on the eventer client.

        Each query result is queued for sending as a query response.
        Connection errors end the loop silently, other errors are logged.
        In all cases the client is closed when the loop ends.

        """
        try:
            while True:
                request = await self._query_queue.get()

                query_result = await self._eventer_client.query(
                    request.params)

                response = transport.QueryResMsg(query_id=request.query_id,
                                                 result=query_result)
                await self._send_queue.put(response)

        except ConnectionError:
            pass

        except Exception as exc:
            mlog.error('query loop error: %s', exc, exc_info=exc)

        finally:
            self.close()
mlog: logging.Logger = <Logger hat.mariner.server.server (WARNING)>
async def create_server( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> Server:
async def create_server(conf: json.Data) -> 'Server':
    """Create a mariner server and start listening for connections.

    Args:
        conf: server configuration; the keys read here are ``name``,
            ``eventer``, ``clients`` (iterable of client configurations,
            each with a ``name``) and ``mariner`` (``host`` and ``port``
            of the listening address).

    Returns:
        Server instance whose listening transport is already started.

    """
    server = Server()
    server._name = conf['name']
    server._eventer_conf = conf['eventer']

    # index client configurations by client name; a later entry with the
    # same name replaces an earlier one
    confs_by_name = {}
    for client_conf in conf['clients']:
        confs_by_name[client_conf['name']] = client_conf
    server._client_confs = confs_by_name

    mariner_conf = conf['mariner']
    listen_addr = tcp.Address(host=mariner_conf['host'],
                              port=mariner_conf['port'])

    server._srv = await transport.listen(connection_cb=server._on_connection,
                                         addr=listen_addr,
                                         bind_connections=True)

    return server
class Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """Mariner server.

    Instances are expected to be built by `create_server`, which assigns
    the private attributes used here (``_name``, ``_eventer_conf``,
    ``_client_confs`` and ``_srv``).

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the listening transport held in ``_srv``."""
        return self._srv.async_group

    async def _on_connection(self, mariner_conn):
        """Handle a single incoming mariner connection.

        Initializes a client for the connection and then waits until that
        client starts closing. If initialization fails, the error is logged
        and the connection is closed.

        """
        try:
            client = await _create_client(component_name=self._name,
                                          eventer_conf=self._eventer_conf,
                                          client_confs=self._client_confs,
                                          mariner_conn=mariner_conn)

        except Exception as exc:
            mlog.error("error initializing client: %s", exc, exc_info=exc)

            await aio.uncancellable(mariner_conn.async_close())

        else:
            try:
                await client.wait_closing()

            finally:
                # always close the client, even when this handler is
                # interrupted while waiting
                await aio.uncancellable(client.async_close())

Resource with lifetime control based on Group.

async_group: hat.aio.group.Group
34    @property
35    def async_group(self) -> aio.Group:
36        return self._srv.async_group

Group controlling resource's lifetime.