# Copyright 2020 Konstruktor, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from contextlib import contextmanager
from typing import Union, Callable, TYPE_CHECKING, Generator, List, Any, Tuple
from tethys.core.nodes.node_zero import ZeroNode
from tethys.core.regobjs.regobj_zero import ZeroRegistrableObject
from tethys.core.sessions.sess_zero import ZeroSession
from tethys.core.streams.stream_base import StreamBase
from tethys.core.transports.transport_zero import ZeroTransport
if TYPE_CHECKING:
from tethys.core.stations.station_zero import ZeroStation # noqa: F401
from tethys.core.pipes.pipe_zero import ZeroPipe # noqa: F401
[docs]log = logging.getLogger(__name__)
[docs]class ZeroStream(ZeroRegistrableObject, StreamBase):
"""
The Stream entity class of the Zero generation.
The ZeroStream is an entity that defines the physical path of the data.
"""
[docs] DEFAULT_HEARTBEAT_FAIL_DELAY = 10
[docs] CLASS_PATH = "/streams/"
":class:`ZeroStream` instances collection path in the repository"
[docs] FIELDS_SCHEMA = {
"pipe": {"type": ["string", "ZeroPipe"], "required": True},
"transport": {"type": ["string", "ZeroTransport"], "required": True},
"session": {"type": ["string", "ZeroSession"], "required": True},
"station": {
"type": ["string", "ZeroStation"],
"nullable": True,
"required": False,
"default": None,
},
"heartbeat_ts": {"type": "number", "default": 0, "required": False},
"closed": {"type": "boolean", "nullable": False, "default": False},
}
":class:`ZeroStream` fields validators"
def __new__(cls, *args, _id=None, **kwargs):
if _id is None and len(args) >= 2:
pipe, session, *_ = args
_id = "{}_{}".format(session.id, pipe.id)
return super().__new__(cls, *args, _id=_id, **kwargs)
def __init__(
self,
pipe: "ZeroPipe",
session: "ZeroSession",
transport: Union["ZeroTransport", Callable],
**kwargs
):
"""
:param pipe: Pipe instance that defines the logical path between nodes.
:type pipe: ZeroPipe
:param session: The session allows getting the stream for the pipe.
:type session: ZeroSession
:param transport:
"""
self.pipe = pipe
self.session = session
self.station = None
self.closed = False
self.heartbeat_ts = 0
if transport and not isinstance(transport, ZeroTransport):
transport = transport(self)
self.transport = transport
@contextmanager
[docs] def connection_context(self):
if not self.transport.is_connected(self):
self.transport.connect(self)
disconnect_after_read = True
else:
disconnect_after_read = False
yield
if disconnect_after_read:
self.transport.disconnect(self)
@property
[docs] def heartbeat_fail_delay(self):
station = self.station
if station:
heartbeat_fail_delay = getattr(station, "heartbeat_fail_delay", 0)
if not heartbeat_fail_delay or heartbeat_fail_delay < 0:
heartbeat_fail_delay = self.DEFAULT_HEARTBEAT_FAIL_DELAY
return heartbeat_fail_delay
return self.DEFAULT_HEARTBEAT_FAIL_DELAY
@property
[docs] def is_busy(self) -> bool:
"""
Is some station using the stream?
:rtype: bool
"""
self.refresh()
return (
self.heartbeat_fail_delay
and time.time() - self.heartbeat_ts < self.heartbeat_fail_delay
)
[docs] def open(self, save=True, **kwargs) -> "ZeroStream":
"""
Open stream
:param save: to save the stream
:type save: bool
:return: self instance
:rtype: StreamBase
"""
self.closed = False
if save:
self.save(save_dependency=False)
log.info("Stream %s opened%s", self.id, ". State was saved" if save else "")
return self
[docs] def close(self, save=True, **kwargs) -> "ZeroStream":
"""
Close stream
:param save: to save the stream
:type save: bool
:return: self instance
:rtype: StreamBase
"""
self.closed = True
if save:
self.save(save_dependency=False)
log.info("Stream %s closed%s", self.id, ". State was saved" if save else "")
return self
[docs] def write(self, data_packet: Union[List[Any], Any], many: bool = False, **kwargs):
"""
Write the data_packet to the stream
:param data_packet: any data object or list of the data objects (with many=True)
:param many: if many=True then data_packet's elements will be sent one-by-one.
:type many: bool
"""
if not self.closed or self.pipe.node_b == ZeroNode.OUT:
with self.connection_context():
if many:
data_packets = data_packet
for data_packet in data_packets:
self.transport.send(self, data_packet, **kwargs)
else:
self.transport.send(self, data_packet, **kwargs)
[docs] def read(
self, count: int = None, wait_timeout: float = None, **kwargs
) -> Generator[Tuple[str, Any], None, None]:
"""
Read the data_packets from the stream. Return Generator.
:param count: count of the data_packets
:type count: int
:param wait_timeout: waiting time (seconds)
:type wait_timeout: float
"""
counter = count or -1
initial_station = self.station
with self.connection_context():
while counter and not self.closed and initial_station == self.station:
data_packet = None
counter -= 1
if wait_timeout is None:
while (
data_packet is None
and not self.closed
and initial_station == self.station
):
data_packet = self.transport.recv(
self, wait_timeout=wait_timeout, **kwargs
)
if data_packet is None:
self.refresh()
if self.session.closed or self.session.closing_mode:
return
time.sleep(self.READ_DELAY)
else:
data_packet = self.transport.recv(
self, wait_timeout=wait_timeout, **kwargs
)
if data_packet is None:
break
yield data_packet
[docs] def ack(self, message_key: str, **kwargs):
"""
Acknowledge message
:param message_key: message key for the acknowledgement
:type message_key: str
"""
if not self.closed:
with self.connection_context():
self.transport.ack(self, message_key, **kwargs)
[docs] def redirect_to(self, station: "ZeroStation", **kwargs):
"""
Redirect stream processing to the station
:param station: Station where the stream can be processed in the near time
:type station: ZeroStation
"""
if not self.station or self.station.id != station.id:
log.info(
"Stream %s will redirect %sto [%s] station",
self.id,
"from [{}] station ".format(self.station.id) if self.station else "",
station.id,
)
station.save(save_dependency=False)
self.station = station
self.heartbeat_ts = time.time()
self.save(save_dependency=False)
log.info(
"Stream %s was redirected to [%s] station", self.id, self.station.id,
)
[docs] def heartbeat(self, **kwargs):
"""
Health heartbeat
"""
self.heartbeat_ts = time.time()
self.save(save_dependency=False)
log.debug("Stream [%s] heartbeat", self.id)
[docs] def __enter__(self):
return self.open(save=False)
[docs] def __exit__(self, exc_type, exc_val, exc_tb):
self.close(save=False)