hopeit.server package

hopeit engine server modules

Modules Overview:

  • api: provides openapi support for endpoints defined in web module.
  • web: provides /api endpoints creation for GET and POST events, and provides /mgmt endpoints for STREAM and SERVICE events
  • engine: provides App initialization and containers
  • events: provides handlers for execution of App Events
  • streams: provides read-write support for Redis Streams
  • steps: helpers to execute event steps
  • config: helpers to load configuration files
  • imports: helpers to import App Events modules
  • logger: provides base logging for server
  • metrics: provides metrics calculation for events
  • names: helpers for events and route naming conventions
  • errors: helpers for error messages

Submodules

Open API spec creation and server helpers

hopeit.server.api.init_empty_spec(api_version: str, title: str, description: str)

Initializes internal spec and static_spec dictionaries with minimal Open API requirements: openapi, info sections and empty paths. This method can be used to create new API specs.

Parameters:
  • api_version – info.version
  • title – info.title
  • description – info.description
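
As an illustration of what a spec with "minimal Open API requirements" might look like, the sketch below builds such a dictionary with the standard library only. The function name make_empty_spec and the "3.0.3" version string are assumptions made for this example; they are not taken from hopeit.server.api.

```python
from typing import Any, Dict


def make_empty_spec(api_version: str, title: str, description: str) -> Dict[str, Any]:
    """Build a minimal OpenAPI-style document: openapi, info and empty paths."""
    return {
        "openapi": "3.0.3",  # assumed version string, not taken from hopeit
        "info": {
            "version": api_version,
            "title": title,
            "description": description,
        },
        "paths": {},  # no endpoints registered yet
    }


spec = make_empty_spec("1.0", "Sample API", "Illustrative spec")
```

The resulting dictionary contains only the three top-level sections named in the description above; paths would be filled in as apps are registered.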

hopeit.server.api.load_api_file(path: Union[str, pathlib.Path])

Loads OpenAPI spec from a json file. Spec is loaded into the module.

Parameters:path – path to json file

hopeit.server.api.save_api_file(path: Union[str, pathlib.Path], api_version: str)

Saves module Open API spec to json file.

Parameters:
  • path – path to json file
  • api_version – new api_version. If the API calculated at runtime differs from the previously loaded API file, a new api_version must be specified to allow saving the file.

hopeit.server.api.setup(**kwargs)

Setup additional options for api module. Supported options are:

Parameters:generate_mode – bool, default False: creates empty path placeholders for modules not defining __api__ specification
hopeit.server.api.clear()

Clears api configuration stored in memory. This disables api module.

hopeit.server.api.app_route_name(app: hopeit.app.config.AppDescriptor, *, event_name: str, plugin: Optional[hopeit.app.config.AppDescriptor] = None, prefix: str = 'api', override_route_name: Optional[str] = None) → str

Returns the full route name for a given app event

Parameters:
  • app – AppDescriptor, as defined in AppConfig
  • event_name – event name as defined in AppConfig
  • plugin – optional plugin if the event comes from a plugin and EventPlugMode=='OnApp'
  • prefix – route prefix, defaults to 'api'
  • override_route_name – Optional[str], provided route to be used instead of app and event name. If it starts with '/', prefix will be ignored; otherwise it is appended to prefix
Returns:

str, full route name. i.e.: /api/app-name/1x0/event-name or /api/app-name/1x0/plugin-name/1x0/event-name
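
The routing rules described for app_route_name can be sketched with plain strings. This is a simplified stand-in, not the library code: sketch_route_name is a hypothetical helper, apps and plugins are represented as (name, version) tuples instead of AppDescriptor objects, and any name normalization the real function performs is left out.

```python
from typing import Optional, Tuple

NameVersion = Tuple[str, str]  # (name, version), stand-in for AppDescriptor


def sketch_route_name(
    app: NameVersion,
    *,
    event_name: str,
    plugin: Optional[NameVersion] = None,
    prefix: str = "api",
    override_route_name: Optional[str] = None,
) -> str:
    """Compose a route following the rules listed in the parameters above."""
    if override_route_name is not None:
        if override_route_name.startswith("/"):
            return override_route_name  # absolute override: prefix is ignored
        return f"/{prefix}/{override_route_name}"  # relative override: appended to prefix
    parts = [prefix, *app]
    if plugin is not None:
        parts.extend(plugin)
    parts.append(event_name)
    return "/" + "/".join(parts)


plain = sketch_route_name(("app-name", "1x0"), event_name="event-name")
with_plugin = sketch_route_name(
    ("app-name", "1x0"), event_name="event-name", plugin=("plugin-name", "1x0")
)
```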

hopeit.server.api.register_server_config(server_config: hopeit.server.config.ServerConfig)

Register API definitions from server configuration. This consists of allowed and default authentication methods.

hopeit.server.api.register_apps(apps_config: List[hopeit.app.config.AppConfig])

Register api definition for a list of apps that conform to a single API specification.

Parameters:apps_config – list of AppConfig objects to be introspected

hopeit.server.api.enable_swagger(server_config: hopeit.server.config.ServerConfig, app: aiohttp.web_app.Application)
Enables Open API (a.k.a Swagger) on this server. This consists of:
  • All endpoints within API specification are to be handled by an Open API handler that will validate requests
  • If specified in server_config.api_docs_path, API docs site will be available at the given route.
    i.e. http://server-address:8020/api/docs
Parameters:
  • server_config – server configuration
  • app – aiohttp web Application to host routes and docs
hopeit.server.api.diff_specs() → bool

Detects differences between loaded API specification and spec calculated from server and apps.

Returns:True if differences are found, False if loaded spec matches runtime.

AsyncCollector (aliased as Collector) implementation to be used as a way to concurrently execute steps in an event

Use hopeit.app.events collector_step(…) constructor to define steps implementing AsyncCollector

hopeit.server.collector.Collector

alias of hopeit.server.collector.AsyncCollector

class hopeit.server.collector.AsyncCollector

Bases: hopeit.server.collector.AbstractCollector

Allows to define a list of steps (functions) to be executed concurrently.

Example:

AsyncCollector.input(payload).steps(('step1', step1func), ('step2', step2func)).run(context)

Gathers and runs step1func and step2func concurrently and puts their return values in the collector under the specified key ('step1' or 'step2'). If a function needs the result from another step, it can be retrieved with `await collector['step1']`. To access the payload, a function can do `payload = await collector['payload']`. This way you can specify a number of functions that can run asynchronously in no particular order (i.e. when querying multiple databases or making requests in parallel to external services), and then use the results or combine them in a different step.

CAUTION: AsyncCollector makes no guarantees that your code will not block indefinitely (i.e. if you do await collector['step1'] from step2func but step1func does await collector['step2']). This will block your application, so the only way the engine prevents this from locking your server is by using timeouts. In case of a deadlock, the event will fail to process and concurrent functions will be canceled when the timeout is reached, but no check is done on whether a deadlock is happening. Please check the sequence in which your code accesses/awaits results from the collector to avoid cycles.
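
To make the collector idea and the deadlock caveat concrete, here is a minimal sketch built on asyncio futures. MiniCollector is a toy class written for this example and is not hopeit's AsyncCollector; its constructor, the timeout argument of run and the step signatures are all assumptions.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Step = Callable[["MiniCollector"], Awaitable[Any]]


class MiniCollector:
    """Toy collector: every step result is an awaitable stored under its name."""

    def __init__(self, payload: Any) -> None:
        loop = asyncio.get_running_loop()
        self._results: Dict[str, asyncio.Future] = {"payload": loop.create_future()}
        self._results["payload"].set_result(payload)
        self._steps: List[Tuple[str, Step]] = []

    def steps(self, *funcs: Tuple[str, Step]) -> "MiniCollector":
        loop = asyncio.get_running_loop()
        for name, func in funcs:
            self._results[name] = loop.create_future()
            self._steps.append((name, func))
        return self

    def __getitem__(self, name: str) -> asyncio.Future:
        return self._results[name]  # awaitable, resolves when the step finishes

    async def _run_step(self, name: str, func: Step) -> None:
        self._results[name].set_result(await func(self))

    async def run(self, timeout: float = 1.0) -> "MiniCollector":
        # All steps start concurrently; the timeout is the only deadlock protection.
        tasks = asyncio.gather(*(self._run_step(n, f) for n, f in self._steps))
        await asyncio.wait_for(tasks, timeout=timeout)
        return self


async def step1(collector: MiniCollector) -> int:
    return (await collector["payload"]) * 2


async def step2(collector: MiniCollector) -> int:
    return (await collector["step1"]) + 1  # depends on step1's result


async def wait_for_b(collector: MiniCollector) -> Any:
    return await collector["b"]


async def wait_for_a(collector: MiniCollector) -> Any:
    return await collector["a"]


async def main() -> Tuple[int, int]:
    collector = await MiniCollector(10).steps(("step1", step1), ("step2", step2)).run()
    return await collector["step1"], await collector["step2"]


async def deadlocked() -> str:
    try:
        # "a" waits for "b" and "b" waits for "a": a cycle that never resolves.
        await MiniCollector(0).steps(("a", wait_for_b), ("b", wait_for_a)).run(timeout=0.05)
    except asyncio.TimeoutError:
        return "timeout"
    return "completed"


result = asyncio.run(main())
deadlock_outcome = asyncio.run(deadlocked())
```

In the second run the two steps await each other, so nothing completes and the only way out is the timeout, which mirrors the behavior described in the caution above.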

input(payload: Any)
run(context: hopeit.app.context.EventContext)
steps(*funcs)

Provides generic compress and decompress methods for payloads

hopeit.server.compression.compress(data: bytes, compression: hopeit.app.config.Compression) → bytes
hopeit.server.compression.decompress(data: bytes, compression: hopeit.app.config.Compression) → bytes

Server configuration utility

class hopeit.server.config.StreamsConfig(stream_manager: str = 'hopeit.streams.NoStreamManager', connection_str: str = '<<NoStreamManager>>', delay_auto_start_seconds: int = 3)

Bases: dataclasses_jsonschema.JsonSchemaMixin, hopeit.server.config.StreamsConfig, object

Field connection_str:
 str, url to connect to streams server: i.e. redis://localhost:6379
connection_str = '<<NoStreamManager>>'
delay_auto_start_seconds = 3
event_id() → str
event_ts() → Optional[datetime.datetime]
stream_manager = 'hopeit.streams.NoStreamManager'
class hopeit.server.config.LoggingConfig(log_level: str = 'INFO', log_path: str = 'logs/')

Bases: dataclasses_jsonschema.JsonSchemaMixin, hopeit.server.config.LoggingConfig, object

event_id() → str
event_ts() → Optional[datetime.datetime]
log_level = 'INFO'
log_path = 'logs/'
class hopeit.server.config.AuthType

Bases: str, enum.Enum

Supported Authorization/Authentication types

BASIC = 'Basic'
BEARER = 'Bearer'
REFRESH = 'Refresh'
UNSECURED = 'Unsecured'
class hopeit.server.config.AuthConfig(secrets_location: str, auth_passphrase: str, enabled: bool = True, create_keys: bool = False, domain: Optional[str] = None, encryption_algorithm: str = 'RS256', default_auth_methods: List[hopeit.server.config.AuthType] = <factory>)

Bases: dataclasses_jsonschema.JsonSchemaMixin, hopeit.server.config.AuthConfig, object

Server configuration to handle authorization tokens

create_keys = False
domain = None
enabled = True
encryption_algorithm = 'RS256'
event_id() → str
event_ts() → Optional[datetime.datetime]
static no_auth()
class hopeit.server.config.ServerConfig(streams: hopeit.server.config.StreamsConfig = <factory>, logging: hopeit.server.config.LoggingConfig = <factory>, auth: hopeit.server.config.AuthConfig = <factory>, api: hopeit.server.config.APIConfig = <factory>, engine_version: str = '0.6.0')

Bases: dataclasses_jsonschema.JsonSchemaMixin, hopeit.server.config.ServerConfig, object

Server configuration

engine_version = '0.6.0'
event_id() → str
event_ts() → Optional[datetime.datetime]
hopeit.server.config.parse_server_config_json(config_json: str) → hopeit.server.config.ServerConfig

Parses configuration json file contents into ServerConfig data structure. Before conversion, parameters enclosed with { } are replaced by their respective values (@see _replace_args)

hopeit.server.config.replace_env_vars(config_json: str) → str

Replaces env variables matching form ${VAR_NAME} with their string value.

Parameters:config_json – input configuration json as string
Returns:str, with replaced values
Raise:AssertionError if variables matching ${VAR_NAME} form are not replaced
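
The substitution described above can be approximated with a regular expression over the raw JSON text. The sketch below is an assumption-laden illustration: sketch_replace_env_vars, the pattern and the SKETCH_REDIS_URL variable are invented for this example, and the real function may match or report variables differently.

```python
import os
import re

_ENV_VAR = re.compile(r"\$\{([^}{]+)\}")  # matches ${VAR_NAME}


def sketch_replace_env_vars(config_json: str) -> str:
    """Replace ${VAR_NAME} with os.environ values; fail if any placeholder remains."""

    def _lookup(match: "re.Match[str]") -> str:
        # Keep the placeholder untouched when the variable is not defined
        return os.environ.get(match.group(1), match.group(0))

    result = _ENV_VAR.sub(_lookup, config_json)
    missing = _ENV_VAR.findall(result)
    assert not missing, f"Environment variables not replaced: {missing}"
    return result


os.environ["SKETCH_REDIS_URL"] = "redis://localhost:6379"
replaced = sketch_replace_env_vars('{"connection_str": "${SKETCH_REDIS_URL}"}')
```

Raising AssertionError for leftover placeholders follows the documented behavior; failing early at load time avoids starting a server with a literal ${...} string in its configuration.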

hopeit.server.config.replace_config_args(*, parsed_config: ConfigType, config_classes: tuple, auto_prefix: str = '')

Replaces {…} enclosed expressions in string values inside parsed_config. {…} expressions are paths to objects in the same parsed_config object, expressed using dot notation. Also, the special name {auto} is replaced by the object path. parsed_config is modified in place and no value is returned. Replacement is done twice so values that refer to other generated values are replaced.

Example: {app.name} will be replaced by the contents of parsed_config.app.name

{auto} will be replaced by the path of keys to the current object built using dot notation and prefixed by {app.name}.{app.version}

Example:

AppConfig(
    app=App(name="myapp", version="1x0"),
    events={"event1": EventDescriptor(
        steps={"step1": StepDescriptor(notify="{auto}" ... )}
    )}
)

{auto} will be replaced by myapp.1x0.event1.step1
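
The dot-notation replacement and the reason for the second pass can be sketched on plain nested dictionaries. This is a simplified illustration only: sketch_replace_config_args is a hypothetical helper, it works on dicts rather than config dataclasses, and it does not implement the {auto} name.

```python
import re
from typing import Any, Dict

_EXPR = re.compile(r"\{([A-Za-z_][\w.]*)\}")  # matches {dotted.path}


def _lookup(root: Dict[str, Any], dotted: str) -> Any:
    """Walk the config following a dot-separated path of keys."""
    node: Any = root
    for key in dotted.split("."):
        node = node[key]
    return node


def _replace_in(node: Dict[str, Any], root: Dict[str, Any]) -> None:
    for key, value in node.items():
        if isinstance(value, dict):
            _replace_in(value, root)
        elif isinstance(value, str):
            node[key] = _EXPR.sub(lambda m: str(_lookup(root, m.group(1))), value)


def sketch_replace_config_args(parsed_config: Dict[str, Any]) -> None:
    """Modify parsed_config in place; two passes resolve values built from other values."""
    for _ in range(2):
        _replace_in(parsed_config, parsed_config)


config = {
    "app": {"name": "myapp", "version": "1x0"},
    "settings": {
        # Refers to a value that itself contains expressions
        "backup_stream": "{settings.stream}.backup",
        "stream": "{app.name}.{app.version}.events",
    },
}
sketch_replace_config_args(config)
```

After the first pass backup_stream still contains {app.name} and {app.version}, copied from the unresolved stream value; the second pass resolves them, which is the situation the "done twice" remark refers to.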

Engine module: handle apps load, setup and serving

class hopeit.server.engine.AppEngine(*, app_config: hopeit.app.config.AppConfig, plugins: List[hopeit.app.config.AppConfig], streams_enabled: bool = True)

Bases: object

Engine that handles a Hopeit Application

execute(*, context: hopeit.app.context.EventContext, query_args: Optional[dict], payload: Union[str, int, float, bool, dict, set, list, DataObject, None]) → Union[str, int, float, bool, dict, set, list, DataObject, None]

Executes a configured Event of type GET or POST using received payload as input, considering configured timeout.

Parameters:
  • context – EventContext, info about app, event and tracking
  • query_args – dict, containing query arguments to be passed to every step of event
  • payload – EventPayload, payload to send to event handler
Returns:

EventPayload, result from executing the event

Raise:

TimeoutException in case configured timeout is exceeded before getting the result

postprocess(*, context: hopeit.app.context.EventContext, payload: Union[str, int, float, bool, dict, set, list, DataObject, None], response: hopeit.app.context.PostprocessHook) → Union[str, int, float, bool, dict, set, list, DataObject, None]
preprocess(*, context: hopeit.app.context.EventContext, query_args: Optional[Dict[str, Any]], payload: Union[str, int, float, bool, dict, set, list, DataObject, None], request: hopeit.app.context.PreprocessHook) → Union[str, int, float, bool, dict, set, list, DataObject, None]
read_stream(*, event_name: str, test_mode: bool = False) → Union[str, int, float, bool, dict, set, list, DataObject, Exception, None]

Listens to a stream specified by event of type STREAM, and executes the event handler for each received event in the stream.

When invoked, stream will be read continuously consuming events that are arriving, processing it according to configured steps. To interrupt listening for events, call stop_event(event_name)

Parameters:
  • event_name – str, an event name contained in app_config
  • test_mode – bool, set to True to immediately stop and return results for testing
Returns:

last result or exception, only intended to be used in test_mode

service_loop(*, event_name: str, test_mode: bool = False) → Union[str, int, float, bool, dict, set, list, DataObject, Exception, None]

Service loop, executes __service__ handler in event and execute event steps for each yielded payload.

Parameters:
  • event_name – str, an event name contained in app_config
  • test_mode – bool, set to True to immediately stop and return results for testing
Returns:

last result or exception, only intended to be used in test_mode

start()

Starts handlers, services and pools for this application

stop()

Stops and cleans up handlers

stop_event(event_name: str)

Sets running state to stopped for a continuous-running event. This acts as a stop signal for STREAM events.

Parameters:event_name – name of the event to signal stop
class hopeit.server.engine.Server

Bases: object

Server engine. Call start() to create an instance of the engine, then start individual apps by calling start_app(…). End apps and stop the engine with stop().

app_engine(*, app_key: str) → hopeit.server.engine.AppEngine
start(*, config: hopeit.server.config.ServerConfig)

Starts an engine instance

start_app(app_config: hopeit.app.config.AppConfig)

Starts and registers a Hopeit App into this engine instance

Parameters:app_config – AppConfig, app configuration as specified in config module
stop()

Stops every active app in the engine instance.

Server error handling convenience module

class hopeit.server.errors.ErrorInfo(msg: str, tb: List[str])

Bases: dataclasses_jsonschema.JsonSchemaMixin, hopeit.server.errors.ErrorInfo, object

Error information to be returned in failed responses

event_id() → str
event_ts() → Optional[datetime.datetime]
static from_exception(e: BaseException)
hopeit.server.errors.format_exc(e: Exception) → List[str]
hopeit.server.errors.json_exc(e: Exception) → str

Events base classes and low level handlers to execute events specified by apps

class hopeit.server.events.EventHandler(*, app_config: hopeit.app.config.AppConfig, plugins: List[hopeit.app.config.AppConfig], effective_events: Dict[str, hopeit.app.config.EventDescriptor])

Bases: object

Handles execution of Hopeit App events

handle_async_event(*, context: hopeit.app.context.EventContext, query_args: Optional[Dict[str, Any]], payload: Union[str, int, float, bool, dict, set, list, DataObject, None]) → AsyncGenerator[Union[str, int, float, bool, dict, set, list, DataObject, None], None]

Handles execution of engine defined event. Executes event handler code deployed with app.

Execution goes as follows:
  • EventDescriptor from AppConfig is used
  • an object from a class with CamelCase name same as event name is instantiated
  • find the next step to execute that accepts input with payload type
  • method with same name as step is invoked in instantiated object
  • if a step specifies write_stream, and event is not None, payload is published to a stream
  • repeats previous 3 steps executing next step that accepts current payload type
Parameters:
  • context – EventContext
  • query_args – arguments from a query context in the form of a dictionary
  • payload – EventPayload, to be sent to event implementor
load_modules(effective_events: Dict[str, hopeit.app.config.EventDescriptor])
postprocess(*, context: hopeit.app.context.EventContext, payload: Union[str, int, float, bool, dict, set, list, DataObject, None], response: hopeit.app.context.PostprocessHook) → Union[str, int, float, bool, dict, set, list, DataObject, None]

Invokes __postprocess__ method in event if defined in event configuration, allowing events to append headers, cookies and status to a response

preprocess(*, context: hopeit.app.context.EventContext, query_args: Optional[Dict[str, Any]], payload: Union[str, int, float, bool, dict, set, list, DataObject, None], request: hopeit.app.context.PreprocessHook) → Union[str, int, float, bool, dict, set, list, DataObject, None]

Invokes __preprocess__ method in event if defined in event, allowing events to process elements from requests.

Utilities to import apps modules and datatypes at runtime / server start

hopeit.server.imports.find_event_handler(*, app_config: hopeit.app.config.AppConfig, event_name: str) → module

Returns the initialized module implementing the event business logic.

Server/Engine logging module

hopeit.server.logger.extra_values(required_fields: Iterable[str], *, prefix='extra.', **kwargs) → Dict[str, str]
hopeit.server.logger.combined(*args) → Dict[str, str]
hopeit.server.logger.setup_app_logger(module, *, app_config: hopeit.app.config.AppConfig, name: str, event_info: hopeit.app.config.EventDescriptor)

Returns wrapper over python logging Logger for a given app and name. Logger name is made by combining {app.name} {app.version} {name} {host} {pid}. Standard fields to be logged are `%(asctime)s | %(levelname)s | %(name)s | %(message)s | `. Specific apps can require extra fields per event, configurable in EventDescriptor.
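
The naming and formatting convention described above can be reproduced with the standard logging module. The sketch below is only an approximation: sketch_app_logger is a hypothetical helper, joining the name parts with single spaces is an assumption, and extra fields from EventDescriptor are not covered.

```python
import io
import logging
import os
import socket
from typing import TextIO


def sketch_app_logger(app_name: str, app_version: str, name: str, stream: TextIO) -> logging.Logger:
    """Create a logger named after app, version, name, host and pid."""
    logger_name = f"{app_name} {app_version} {name} {socket.gethostname()} {os.getpid()}"
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the output on the handler configured here
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s | ")
    )
    logger.addHandler(handler)
    return logger


buffer = io.StringIO()
sketch_logger = sketch_app_logger("myapp", "1x0", "event1", buffer)
sketch_logger.info("event received")
line = buffer.getvalue()
```

The trailing " | " in the format string leaves room for extra values to be appended after the message.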

hopeit.server.logger.engine_logger() → hopeit.server.logger.EngineLoggerWrapper

Returns logger wrapper for engine modules. Allows referencing the logger as a module variable.

Use at module level in events implementation:

from hopeit.server.logger import engine_logger

logger = engine_logger()
hopeit.server.logger.extra_logger()
hopeit.server.logger.format_extra_values(values: Dict[str, Any], prefix: str = '') → str
class hopeit.server.logger.EngineLoggerWrapper

Bases: object

Wrapper around standard python Logger to be used in engine modules. Provides additional functionality to log extra info and metrics.

debug(context: Union[str, hopeit.app.context.EventContext], msg, *args, **kwargs) → None
done(context: hopeit.app.context.EventContext, *args, **kwargs) → None
engine_logger = <Logger hopeit.engine 0.6.0 openapi build-14034830-project-614051-hopeitengine 139 (DEBUG)>
error(context: Union[str, hopeit.app.context.EventContext], msg, *args, **kwargs) → None
failed(context: hopeit.app.context.EventContext, *args, **kwargs) → None
ignored(context: hopeit.app.context.EventContext, *args, **kwargs) → None
info(context: Union[str, hopeit.app.context.EventContext], msg, *args, **kwargs) → None
init_app(app_config: hopeit.app.config.AppConfig, plugins: List[hopeit.app.config.AppConfig])

Initializes logger for an app and its plugins

init_cli(name: str)
init_server(server_config: hopeit.server.config.ServerConfig)
loggers = {}
start(context: hopeit.app.context.EventContext, *args, **kwargs) → None
stats(context: hopeit.app.context.EventContext, *args, **kwargs) → None
warning(context: Union[str, hopeit.app.context.EventContext], msg, *args, **kwargs) → None

Metrics module

hopeit.server.metrics.metrics(context: hopeit.app.context.EventContext)

Returns calculated event metrics from the EventContext as a dictionary that can be used in logging

Parameters:context – EventContext
Returns:dictionary that can be passed to logging extra= parameter

hopeit.server.metrics.stream_metrics(context: hopeit.app.context.EventContext)

Returns calculated stream event metrics from the EventContext as a dictionary that can be used in logging

Parameters:context – EventContext
Returns:dictionary that can be passed to logging extra= parameter
class hopeit.server.metrics.StreamStats

Bases: object

Helper class to keep stream consuming stats

calc() → Dict[str, Union[int, float]]

Calculates stream stats to be logged

Returns:dict, with stream stats to be used as extra info for logging

ensure_start()
inc(error: bool = False)
reset_batch(now: datetime.datetime)

Convenience methods to normalize endpoint names, module names from configuration

hopeit.server.names.route_name(*args) → str
hopeit.server.names.module_name(*args) → str
hopeit.server.names.auto_path(*args) → str
hopeit.server.names.auto_path_prefixed(prefix, *args) → str

Provides generic serialize, deserialize methods to handle payloads

hopeit.server.serialization.serialize(data: Union[str, int, float, bool, dict, set, list, DataObject], serialization: hopeit.app.config.Serialization, compression: hopeit.app.config.Compression) → bytes
hopeit.server.serialization.deserialize(data: bytes, serialization: hopeit.app.config.Serialization, compression: hopeit.app.config.Compression, datatype: Type[EventPayloadType]) → Union[str, int, float, bool, dict, set, list, DataObject]

Handling sequencing and execution of steps from events

hopeit.server.steps.extract_module_steps(impl: module) → List[Tuple[str, Optional[Tuple[Callable, Type[CT_co], Type[CT_co]]]]]
hopeit.server.steps.effective_steps(event_name: str, module_steps: List[Tuple[str, Optional[Tuple[Callable, Type[CT_co], Type[CT_co]]]]]) → Dict[str, Tuple[Callable, Type[CT_co], Type[CT_co]]]

Returns the list of steps for an event, given the possibility that event steps are split into stages using the SHUFFLE keyword. In that case the event name will have the format event_name.stage_step. If event_name does not contain '.' and stage_step, the list of steps from the start up to a SHUFFLE, if found, is returned. If event_name has '.' and stage_step, the steps starting at stage_step up to the next SHUFFLE step, if present, are returned.

hopeit.server.steps.event_and_step(event_name: str) → Tuple[str, Optional[str]]
hopeit.server.steps.extract_event_stages(impl: module) → List[str]

Extract a list of step names consisting of the first step defined in __steps__ plus the subsequent step name after a SHUFFLE step if any in a given module
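
The SHUFFLE staging rules described for effective_steps and extract_event_stages can be sketched on a plain list of step names. Both helpers below are hypothetical, the SHUFFLE marker value is a placeholder chosen for this example, and real step definitions carry functions and types rather than bare names.

```python
from typing import List

SHUFFLE = "__shuffle__"  # placeholder marker, assumed for this sketch


def sketch_effective_steps(event_name: str, step_names: List[str]) -> List[str]:
    """Steps of one stage: from the start (or stage_step) up to the next SHUFFLE."""
    _, _, stage_step = event_name.partition(".")
    start = step_names.index(stage_step) if stage_step else 0
    selected: List[str] = []
    for step in step_names[start:]:
        if step == SHUFFLE:
            break
        selected.append(step)
    return selected


def sketch_event_stages(step_names: List[str]) -> List[str]:
    """First step plus every step that directly follows a SHUFFLE."""
    stages: List[str] = []
    previous = SHUFFLE  # treat the beginning as the start of a stage
    for step in step_names:
        if step != SHUFFLE and previous == SHUFFLE:
            stages.append(step)
        previous = step
    return stages


all_steps = ["parse", "validate", SHUFFLE, "enrich", "save", SHUFFLE, "notify"]
first_stage = sketch_effective_steps("my_event", all_steps)
second_stage = sketch_effective_steps("my_event.enrich", all_steps)
stages = sketch_event_stages(all_steps)
```

Each entry in stages is the stage_step that would appear after the dot in a staged event name such as my_event.enrich.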

hopeit.server.steps.extract_preprocess_handler(impl: module) → Optional[Tuple[Callable, Type[CT_co], Type[CT_co]]]
hopeit.server.steps.extract_postprocess_handler(impl: module) → Optional[Tuple[Callable, Type[CT_co], Type[CT_co]]]
hopeit.server.steps.extract_input_type(impl: module, from_step: Optional[str] = None) → Type[CT_co]
hopeit.server.steps.execute_steps(steps: Dict[str, Tuple[Callable, Type[CT_co], Type[CT_co]]], *, context: hopeit.app.context.EventContext, payload: Union[str, int, float, bool, dict, set, list, DataObject, None], **kwargs) → AsyncGenerator[Union[str, int, float, bool, dict, set, list, DataObject, None], None]

Invokes steps from an event. It will try to find the next step in configuration order that matches the input type of the payload, update the payload and invoke the next valid step.
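
A simplified sketch of this type-driven chaining is shown below. It is not the engine's implementation: sketch_execute_steps is a hypothetical coroutine that returns a single final payload, whereas the documented function is an async generator that also receives an EventContext.

```python
import asyncio
from typing import Any, Callable, Dict, Tuple

StepInfo = Tuple[Callable, type, type]  # (function, input type, output type)


async def sketch_execute_steps(steps: Dict[str, StepInfo], payload: Any) -> Any:
    """Run steps in configured order, skipping those whose input type does not match."""
    for _name, (func, input_type, _output_type) in steps.items():
        if isinstance(payload, input_type):
            payload = await func(payload)  # the result becomes the next payload
    return payload


async def parse(payload: str) -> int:
    return int(payload)


async def describe_text(payload: str) -> str:
    return f"text:{payload}"


async def double(payload: int) -> int:
    return payload * 2


steps = {
    "parse": (parse, str, int),
    "describe_text": (describe_text, str, str),  # skipped: payload is an int by then
    "double": (double, int, int),
}
final_payload = asyncio.run(sketch_execute_steps(steps, "21"))
```

Because parse turns the string into an int, describe_text no longer matches the payload type and is skipped, while double accepts the int and runs.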

hopeit.server.steps.invoke_single_step(func: Callable, *, payload: Union[str, int, float, bool, dict, set, list, DataObject, None], context: hopeit.app.context.EventContext, **kwargs) → Union[str, int, float, bool, dict, set, list, DataObject, None]
hopeit.server.steps.find_datatype_handler(*, app_config: hopeit.app.config.AppConfig, event_name: str)
hopeit.server.steps.split_event_stages(app: hopeit.app.config.AppDescriptor, event_name: str, event_info: hopeit.app.config.EventDescriptor, impl: module) → Dict[str, hopeit.app.config.EventDescriptor]

Splits an event whose steps contain a SHUFFLE step into an initial event with the same name as event_name plus sub_events with names 'event_name.step_name' for each step after a SHUFFLE. Creates intermediate auto-named write_stream and read_stream to communicate data between event and sub_events, clones event configuration from the main event to sub_events, and sets up the write_stream property of the final event to be the one specified in configuration.

class hopeit.server.steps.CollectorStepsDescriptor(input_type: Type[EventPayloadType])

Bases: object

Specification for input payload type and list of sub-steps when using a collector as a compound step in the __steps__ definition of a method. This class should be instantiated using the collector_step method from the hopeit.app.events module.

Example:

from hopeit.app.events import collector_step

__steps__ = [collector_step(payload=InputType).steps('step1', 'step2'), 'step_outside_collector']

will generate a steps definition of two steps: first a collector, which receives an InputType object and runs step1 and step2 functions concurrently. The step result will be a Collector to be used by step_outside_collector.

gather(*steps)
name() → str
setup_step_impl(module: module) → Callable[[Union[str, int, float, bool, dict, set, list, DataObject], hopeit.app.context.EventContext], Union[str, int, float, bool, dict, set, list, DataObject]]

Engine version constants. Increment on release. To ensure configuration files from example apps and plugins have the same version as the engine, environment variables HOPEIT_ENGINE_VERSION and HOPEIT_APPS_API_VERSION are set.

Webserver module based on aiohttp to handle web/api requests

hopeit.server.web.parse_args(args) → Tuple[Optional[str], Optional[int], Optional[str], bool, List[str], Optional[str]]

Parse command line arguments.

Parameters:args – in the form of --arg=value
  • --path – optional, the path of the posix socket
  • --port – optional, the TCP port number
  • --start-streams – optional, True to auto start all events of STREAM type
  • --config-files – a comma-separated list of hopeit apps config files, relative or full paths
  • --api-file – optional, path to openapi.json file with at least openapi and info sections

Example:

python web.py --port=8020 --path=/tmp/hopeit.01 --config-files=test.json
Notes:
--config-files argument is mandatory. If --port and --path are not supplied, the engine starts on port 8020 by default.
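
The documented options and the default-port rule can be sketched with argparse. This is an illustration under assumptions, not the module's parser: sketch_parse_args is a hypothetical function, it covers only the options listed above, it treats --start-streams as a plain flag, and it returns a shorter tuple than the documented signature.

```python
import argparse
from typing import List, Optional, Tuple

ParsedArgs = Tuple[Optional[int], Optional[str], bool, List[str], Optional[str]]


def sketch_parse_args(args: List[str]) -> ParsedArgs:
    """Parse --port, --path, --start-streams, --config-files and --api-file."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--port", type=int, default=None)
    parser.add_argument("--path", default=None)
    parser.add_argument("--start-streams", action="store_true")  # assumed to be a flag
    parser.add_argument("--config-files", required=True)
    parser.add_argument("--api-file", default=None)
    parsed = parser.parse_args(args)

    port = parsed.port
    if port is None and parsed.path is None:
        port = 8020  # default when neither --port nor --path is supplied
    config_files = parsed.config_files.split(",")
    return port, parsed.path, parsed.start_streams, config_files, parsed.api_file


defaults = sketch_parse_args(["--config-files=a.json,b.json"])
explicit = sketch_parse_args(
    ["--port=8020", "--path=/tmp/hopeit.01", "--config-files=test.json"]
)
```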
hopeit.server.web.main(host: Optional[str], port: Optional[int], path: Optional[str], start_streams: bool, config_files: List[str], api_file: Optional[str])
hopeit.server.web.start_server(config: hopeit.server.config.ServerConfig)

Starts the engine

hopeit.server.web.start_app(config: hopeit.app.config.AppConfig, scheduler: aiojobs._scheduler.Scheduler, start_streams: bool = False)

Start Hopeit app specified by config

Parameters:
  • config – AppConfig, configuration for the app to start
  • start_streams – if True all stream events in app will start consuming
hopeit.server.web.stop_server()