hat.mariner.server.main

Event server main

 1"""Event server main"""
 2
 3from pathlib import Path
 4import argparse
 5import asyncio
 6import contextlib
 7import logging.config
 8import sys
 9
10import appdirs
11
12from hat import aio
13from hat import json
14
15from hat.mariner import common
16from hat.mariner.server.server import create_server
17
18
19mlog: logging.Logger = logging.getLogger('hat.mariner.server.main')
20"""Module logger"""
21
22user_conf_dir: Path = Path(appdirs.user_config_dir('hat'))
23"""User configuration directory path"""
24
25
26def create_argument_parser() -> argparse.ArgumentParser:
27    """Create argument parser"""
28    parser = argparse.ArgumentParser()
29    parser.add_argument(
30        '--conf', metavar='PATH', type=Path, default=None,
31        help="configuration defined by hat-mariner://server.yaml "
32             "(default $XDG_CONFIG_HOME/hat/mariner.{yaml|yml|toml|json})")
33    return parser
34
35
36def main():
37    """Event Server"""
38    parser = create_argument_parser()
39    args = parser.parse_args()
40    conf = json.read_conf(args.conf, user_conf_dir / 'mariner')
41    sync_main(conf)
42
43
44def sync_main(conf: json.Data):
45    """Sync main entry point"""
46    aio.init_asyncio()
47
48    validator = json.DefaultSchemaValidator(common.json_schema_repo)
49    validator.validate('hat-mariner://server.yaml', conf)
50
51    log_conf = conf.get('log')
52    if log_conf:
53        logging.config.dictConfig(log_conf)
54
55    with contextlib.suppress(asyncio.CancelledError):
56        aio.run_asyncio(async_main(conf))
57
58
59async def async_main(conf: json.Data):
60    """Async main entry point"""
61    srv = await create_server(conf)
62
63    try:
64        await srv.wait_closing()
65
66    finally:
67        await aio.uncancellable(srv.async_close())
68
69
70if __name__ == '__main__':
71    sys.argv[0] = 'hat-mariner-server'
72    sys.exit(main())
mlog: logging.Logger = <Logger hat.mariner.server.main (WARNING)>

Module logger

user_conf_dir: pathlib._local.Path = PosixPath('/home/runner/.config/hat')

User configuration directory path

def create_argument_parser() -> argparse.ArgumentParser:
27def create_argument_parser() -> argparse.ArgumentParser:
28    """Create argument parser"""
29    parser = argparse.ArgumentParser()
30    parser.add_argument(
31        '--conf', metavar='PATH', type=Path, default=None,
32        help="configuration defined by hat-mariner://server.yaml "
33             "(default $XDG_CONFIG_HOME/hat/mariner.{yaml|yml|toml|json})")
34    return parser

Create argument parser

def main():
37def main():
38    """Event Server"""
39    parser = create_argument_parser()
40    args = parser.parse_args()
41    conf = json.read_conf(args.conf, user_conf_dir / 'mariner')
42    sync_main(conf)

Event Server

def sync_main( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]):
45def sync_main(conf: json.Data):
46    """Sync main entry point"""
47    aio.init_asyncio()
48
49    validator = json.DefaultSchemaValidator(common.json_schema_repo)
50    validator.validate('hat-mariner://server.yaml', conf)
51
52    log_conf = conf.get('log')
53    if log_conf:
54        logging.config.dictConfig(log_conf)
55
56    with contextlib.suppress(asyncio.CancelledError):
57        aio.run_asyncio(async_main(conf))

Sync main entry point

async def async_main( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]):
60async def async_main(conf: json.Data):
61    """Async main entry point"""
62    srv = await create_server(conf)
63
64    try:
65        await srv.wait_closing()
66
67    finally:
68        await aio.uncancellable(srv.async_close())

Async main entry point