tethys.core.pipes.pipe_zero
Module Contents
-
class tethys.core.pipes.pipe_zero.ZeroPipe(node_a: Union[str, 'ZeroNode'], node_b: Union[str, 'ZeroNode'], filters: Iterable['FilterBase'] = None, filters_threshold: float = 0.5, transport: Optional[Union[str, Callable, 'ZeroTransport']] = ..., **kwargs)
Bases: tethys.core.regobjs.regobj_zero.ZeroRegistrableObject, tethys.core.pipes.pipe_base.PipeBase
The Pipe entity class of the Zero generation. The ZeroPipe connects two nodes and allows adding filters and transports.
- Example:
    from tethys.core.pipes.pipe_zero import ZeroPipe

    # CustomTransport1, CustomTransport2, CustomFilter, node1 and node2
    # are user-defined objects that are assumed to exist already.

    def transport_factory_1(pipe, **kwargs):
        return CustomTransport1()

    # set the default transport within the context
    with ZeroPipe.transport_factory_context(transport_factory_1):
        pipe1 = ZeroPipe(
            node_a="node1",  # ZeroPipe will load "node1" from a repository
            node_b="node2",  # ZeroPipe will load "node2" from a repository
            filters=[
                CustomFilter(return_value=0.5)  # the filter will return a score of 0.5
            ],
            filters_threshold=0.4999,  # the pipe sends data if filters_score > filters_threshold
            # transport=transport_factory_1()  # specified by the transport_factory_context
        )

        pipe2 = ZeroPipe(
            node_a=node2,  # ZeroPipe will use the existing node2 object
            node_b=node1,  # ZeroPipe will use the existing node1 object
            # specify a custom transport instead of transport_factory_1
            transport=CustomTransport2()
        )
- Parameters
filters (Iterable[FilterBase]) – iterable of filter instances (each inheriting from the abstract class FilterBase)
filters_threshold (float) – threshold that each filter's score in filters must exceed for a data packet to pass.
transport (Union[str, Callable, ZeroTransport]) – the default transport for every stream of the pipe. If the value is ... (Ellipsis, the default), transport_factory is executed in __init__. If the value is None, transport_factory is executed in get_stream().
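The difference between the two special values of transport can be illustrated with a minimal sketch. It does not use tethys itself: SketchPipe, its factory argument and get_stream_transport() are hypothetical stand-ins, and the only behaviour taken from the description above is that ... triggers the factory at construction time while None defers it.

```python
class SketchPipe:
    """Hypothetical stand-in for a pipe; not the tethys implementation."""

    def __init__(self, transport=..., factory=None):
        self._factory = factory
        if transport is ...:
            # Ellipsis (the default): resolve the transport eagerly, right now.
            transport = factory(self)
        # An explicit transport is stored as-is; None is kept for lazy resolution.
        self.transport = transport

    def get_stream_transport(self):
        # None means "call the factory when a stream is actually requested".
        if self.transport is None:
            return self._factory(self)
        return self.transport


calls = []


def counting_factory(pipe):
    # Records every invocation so the timing of the calls is visible.
    calls.append(pipe)
    return f"transport-{len(calls)}"


eager = SketchPipe(factory=counting_factory)                  # factory runs here
lazy = SketchPipe(transport=None, factory=counting_factory)   # factory deferred
explicit = SketchPipe(transport="custom", factory=counting_factory)

calls_after_init = len(calls)           # only the eager pipe has called the factory
resolved = lazy.get_stream_transport()  # the deferred call happens here
```

Using Ellipsis as the default leaves None free to carry its own meaning, which is why the signature distinguishes the two.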
-
classmethod transport_factory_context(cls, transport_factory: Union[Callable, 'ZeroTransport'])
Context manager for the set_transport_factory() method.
-
classmethod set_transport_factory(cls, transport_factory: Union[Callable, 'ZeroTransport'])
Set the default transport factory. You can provide a ZeroTransport instance instead of a callable object.
- Parameters
transport_factory (Union[Callable, ZeroTransport]) – any ZeroTransport instance or any callable object that returns such an instance
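How a class-wide default factory and its context manager can fit together is sketched below. This is not the tethys source: SketchPipe and FakeTransport are hypothetical, the callable() check is a simplification, and restoring the previous factory on exit is an assumption about what a context manager for this setter would typically do.

```python
from contextlib import contextmanager


class FakeTransport:
    """Hypothetical transport; exists only to make the sketch runnable."""

    def __init__(self, name):
        self.name = name


class SketchPipe:
    """Hypothetical stand-in that mirrors the two documented method names."""

    _transport_factory = None  # class-wide default, shared by all instances

    @classmethod
    def set_transport_factory(cls, transport_factory):
        if callable(transport_factory):
            cls._transport_factory = transport_factory
        else:
            # A ready transport instance is wrapped so it behaves like a factory.
            cls._transport_factory = lambda pipe, **kwargs: transport_factory

    @classmethod
    @contextmanager
    def transport_factory_context(cls, transport_factory):
        previous = cls._transport_factory
        cls.set_transport_factory(transport_factory)
        try:
            yield cls
        finally:
            # Assumption: the earlier default comes back when the block ends.
            cls._transport_factory = previous

    def __init__(self):
        factory = type(self)._transport_factory
        self.transport = factory(self) if factory else None


with SketchPipe.transport_factory_context(
    lambda pipe, **kwargs: FakeTransport("from-factory")
):
    inside = SketchPipe()   # picks up the temporary default

outside = SketchPipe()      # created after the block, so no default is set

SketchPipe.set_transport_factory(FakeTransport("shared"))
shared = SketchPipe()       # an instance was given, so it is reused as-is
```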
-
filter_data_packet(self, data_packet: Any, session: ZeroSession = None) → bool
Check all filters in the pipe. Return True if every filter returns a score greater than filters_threshold.
- Parameters
data_packet – any data object
session (ZeroSession) – optional param that extends context information
- Returns
True or False, depending on the scores returned by the pipe's filters.
- Return type
bool
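The rule "every score must be greater than the threshold" can be sketched in a few lines. The helper below is illustrative only: passes_filters is a hypothetical name, and plain callables that return a score stand in for FilterBase instances, whose actual interface is not shown in this section.

```python
def passes_filters(data_packet, filters, threshold=0.5):
    # Strict comparison: a score equal to the threshold does not pass.
    # all() over an empty iterable is True, so in this sketch a pipe
    # without filters lets every packet through (an assumption).
    return all(score_fn(data_packet) > threshold for score_fn in filters)


def half(packet):
    # Mirrors CustomFilter(return_value=0.5) from the class example.
    return 0.5


just_below = passes_filters("packet", [half], threshold=0.4999)  # 0.5 > 0.4999
at_default = passes_filters("packet", [half], threshold=0.5)     # 0.5 > 0.5 fails
no_filters = passes_filters("packet", [], threshold=0.5)
```

This is why the class example sets filters_threshold to 0.4999: with the default of 0.5, a filter that always scores 0.5 would block every packet.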
-
get_stream(self, session: ZeroSession) → 'ZeroStream'
Load or create the ZeroStream instance that corresponds to the given pipe and session.
- Parameters
session (ZeroSession) – ZeroSession instance
- Returns
ZeroStream instance that is attached to the pipe
- Return type
ZeroStream
-
pull(self, session: ZeroSession, **kwargs) → Generator
Get data_packets from the pipe's stream as a Python generator (executes the ZeroStream.read() method).
- Parameters
session (ZeroSession) – ZeroSession instance
- Returns
data_packets generator
- Return type
Generator
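A generator that delegates to a stream's read() method can be sketched as follows. FakeStream is hypothetical, and treating read() as a generator that accepts keyword arguments is an assumption; the real ZeroStream.read() signature is not given in this section.

```python
class FakeStream:
    """Hypothetical stream whose read() yields queued packets one at a time."""

    def __init__(self, packets):
        self._packets = list(packets)

    def read(self, **kwargs):
        while self._packets:
            yield self._packets.pop(0)


def pull(stream, **kwargs):
    # Delegation keeps the call lazy: nothing is read until iteration starts.
    yield from stream.read(**kwargs)


stream = FakeStream(["a", "b", "c"])
packets = pull(stream)    # no packet has been consumed yet
first = next(packets)     # reads exactly one packet
rest = list(packets)      # drains whatever is left
```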
-
push(self, data_packet: Any, session: ZeroSession, many: bool = False, **kwargs) → bool
Send data_packet into the pipe's stream.
- Parameters
data_packet – any data object or list of the data objects (with many=True)
session (ZeroSession) – ZeroSession instance
many (bool) – if many=True then data_packet’s elements will be sent one-by-one.
- Returns
True if at least one data_packet has been sent, otherwise False.
- Return type
bool
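The effect of many and the meaning of the return value can be sketched with a standalone function. The names push, send and accept are hypothetical here, and checking each packet against a filter before sending is an assumption drawn from the class example's comment that the pipe sends data when the filter score exceeds the threshold.

```python
def push(data_packet, send, accept=lambda packet: True, many=False):
    # With many=True the argument is treated as a collection of packets;
    # otherwise it is a single packet, even if it happens to be a list.
    packets = data_packet if many else [data_packet]
    sent_any = False
    for packet in packets:
        if accept(packet):   # stand-in for the pipe's filter check
            send(packet)
            sent_any = True
    return sent_any


def is_positive(packet):
    return packet > 0


sent = []
batch_result = push([1, -2, 3], sent.append, accept=is_positive, many=True)
rejected_result = push(-1, sent.append, accept=is_positive)

whole = []
single_result = push([1, 2], whole.append)   # the list travels as one packet
```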