Source code for tethys.core.sessions.sess_zero

# Copyright 2020 Konstruktor, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Union, Any, TYPE_CHECKING

from tethys.core.exceptions import TethysInputNodesNotFound, TethysSessionClosed
from tethys.core.nodes import ZeroNode
from tethys.core.regobjs.regobj_zero import ZeroRegistrableObject
from tethys.core.sessions.sess_base import SessionBase

if TYPE_CHECKING:
    from tethys.core.networks.network_zero import ZeroNetwork  # noqa: F401

[docs]log = logging.getLogger(__name__)
[docs]class ZeroSession(ZeroRegistrableObject, SessionBase): """ The Session entity class of the Zero generation. The ZeroSession is a context for the streams. """
[docs] SOFT_CLOSING_MODE = "<soft>"
[docs] HARD_CLOSING_MODE = "<hard>"
[docs] INSTANT_CLOSING_MODE = "<instant>"
[docs] CLASS_PATH = "/sessions/"
":class:`ZeroSession` instances collection path in the repository"
[docs] FIELDS_SCHEMA = { "network": {"type": ["string", "ZeroNetwork"], "required": True}, "context": {"type": "dict", "default": {}, "required": False}, "closing_mode": { "type": "string", "nullable": True, "default": None, "allowed": [SOFT_CLOSING_MODE, HARD_CLOSING_MODE, INSTANT_CLOSING_MODE], }, "closed": {"type": "boolean", "nullable": False, "default": False}, "parent": { "type": ["string", "ZeroSession"], "nullable": True, "default": None, "required": False, }, "children": { "type": "list", "valuesrules": {"type": ["string", "ZeroSession"]},
}, } ":class:`ZeroSession` fields validators" def __init__( self, network: Union["ZeroNetwork", str], parent: "ZeroSession" = None, **kwargs ): """ :param network: The network used to run streams in the session scope. :type network: ZeroNetwork :param parent: The parent session :type parent: ZeroSession """ self.network = network self.closed = False self.closing_mode = None self.children = [] self.parent = parent
[docs] def __setattr__(self, key, value): old_parent = getattr(self, "parent", None) super().__setattr__(key, value) if key == "parent": if value and isinstance(value, ZeroSession): value.add_child(self) if old_parent: old_parent.remove_child(self)
[docs] def add_child(self, child: "ZeroSession"): self.children.append(child.path)
[docs] def remove_child(self, child: "ZeroSession"): self.children.remove(child.path)
[docs] def get_children_dict(self): c_sessions = [ZeroSession.load(c_path) for c_path in self.children] return {child.id: child for child in c_sessions}
[docs] def open(self, **kwargs) -> "ZeroSession": """ This method changes the ZeroSession station to `closed=False`. :return: self object :rtype: ZeroSession """ self.closed = False self.closing_mode = None return self.save(save_dependency=False)
def _close_related_entities(self, mode): for child in self.get_children_dict().values(): child.close(mode) if mode != self.SOFT_CLOSING_MODE: pipes = self.network.pipes.values() else: pipes = self.network.output_pipes for pipe in pipes: try: stream = pipe.get_stream(self) if not stream.closed: stream.close() except TethysSessionClosed: logging.debug("Close method tried to close nonexistent stream")
[docs] def close(self, mode: str = None) -> "ZeroSession": """ This method changes the ZeroSession station to `closing_mode=mode`. All children also will be closed. :param mode: closing mode determines the mode of closing streams (default: INSTANT_CLOSING_MODE) :type mode: str :return: self object :rtype: ZeroSession """ mode = mode or self.INSTANT_CLOSING_MODE if mode not in [ self.INSTANT_CLOSING_MODE, self.HARD_CLOSING_MODE, self.SOFT_CLOSING_MODE, ]: raise ValueError("invalid closing mode") if mode == self.INSTANT_CLOSING_MODE: self.closed = True self.closing_mode = mode self._close_related_entities(mode) return self.save(save_dependency=False)
[docs] def send(self, data_packet: Any, many: bool = False, **kwargs): """ This method sends data_packet(s) to the input nodes. :param data_packet: any data object or list of the data objects (with many=True) :param many: if many=True then data_packet's elements will be sent one-by-one. :type many: bool """ pipes_map = self.network.get_pipes_map() if ZeroNode.IN not in pipes_map: raise TethysInputNodesNotFound( "Input node not found in [{}] network".format(self.network.id) ) input_pipes_nodes = pipes_map[ZeroNode.IN] for node_pipes in input_pipes_nodes.values(): for pipe in node_pipes.values(): pipe.push(data_packet, self, many=many, **kwargs)
def _sync_closing_mode_with_parent(self): if ( self.parent and self.parent.refresh() and ( self.parent.closed or self.parent.closing_mode != self.SOFT_CLOSING_MODE ) ): self.close(self.parent.closing_mode) def _sync_closing_mode_with_stream(self): if not self.closed: for pipe in self.network.pipes.values(): try: stream = pipe.get_stream(self) except TethysSessionClosed: continue if not stream.closed: break else: self.close()
[docs] def refresh(self, **kwargs) -> Union["ZeroSession", None]: """ Reload entity from the repository (without cache). Also this method updates streams and sessions states according to the closing mode. :return: return ZeroSession instance or None (when error like NotFound) :rtype: ZeroSession or None """ if not super().refresh(**kwargs): return None self._sync_closing_mode_with_parent() self._sync_closing_mode_with_stream() return self
[docs] def __enter__(self) -> "ZeroSession": return self.open()
[docs] def __exit__(self, exc_type, exc_val, exc_tb): self.close(self.SOFT_CLOSING_MODE)