Data Streams and Connections¶
There are several objects that realize the main idea of the project. And some of them implement data transfer interfaces: Pipe, Stream, Transport.
Pipe objects describe logical data paths; these objects are also responsible for obtaining streams. Stream objects store information about Transport objects and the state of the data flows. Transport objects are responsible for managing connections.
An example to illustrate the concept of creating pipes and streams:
from tethys.core.exceptions import TethysROFieldValidationError
from tethys.core.pipes import ZeroPipe
from tethys.core.streams import ZeroStream
from tethys.core.transports import ZeroTransport
from tethys.core.transports.connectors import DummyConnector
from tethys.core.networks import ZeroNetwork
from tethys.core.sessions import ZeroSession
from tethys.core.nodes import ZeroNode

node1 = ZeroNode()
node2 = ZeroNode()
network = ZeroNetwork()
session = ZeroSession(network)

connector = DummyConnector()
custom_transport = ZeroTransport(connector)


# will execute in ZeroStream.__init__
def transport_func(stream):
    return custom_transport


pipe1 = ZeroPipe(node1, node2)
pipe2 = ZeroPipe(node2, node1, transport=transport_func)
pipe3 = ZeroPipe(node2, node1, transport=custom_transport)

assert pipe1.transport != pipe2.transport != pipe3.transport

# transport factories

# set the transport for all pipes in the context
with ZeroPipe.transport_factory_context(custom_transport):
    pipe4 = ZeroPipe(node2, node1)

# set the result of the function for all pipes in the context
# lambda will execute in ZeroPipe.__init__ method
with ZeroPipe.transport_factory_context(lambda pipe: custom_transport):
    pipe5 = ZeroPipe(node2, node1)

assert pipe3.transport == pipe4.transport == pipe5.transport

with ZeroPipe.transport_factory_context(lambda pipe: custom_transport):
    # lambda will execute in ZeroPipe.get_stream() method
    pipe6 = ZeroPipe(node2, node1, transport=None)

ZeroPipe.set_transport_factory(custom_transport)
pipe7 = ZeroPipe(node2, node1, transport=None)

assert pipe5.transport != pipe6.transport
assert pipe6.transport is None and pipe7.transport is None

# streams

stream1 = pipe1.get_stream(session)  # this get_stream will create the stream
stream2 = pipe2.get_stream(session)
stream3 = pipe3.get_stream(session)
stream4 = pipe4.get_stream(session)
stream5 = pipe5.get_stream(session)
stream6 = pipe6.get_stream(session)
stream7 = pipe7.get_stream(session)

assert stream7 == pipe7.get_stream(session)  # and this get_stream will load the stream

# first pipe and first stream using default transport
assert stream1.transport != stream2.transport

# other streams have identical transport objects
assert all(
    stream.transport == custom_transport
    for stream in [stream2, stream3, stream4, stream5, stream6, stream7]
)

custom_stream = ZeroStream(pipe1, session, custom_transport)

try:
    custom_stream2 = ZeroStream(pipe1, session, None)
except TethysROFieldValidationError:
    custom_stream2 = None

assert not custom_stream2
Each object described above uses special methods to transfer data. The following examples show how these methods work.
In most cases, you will not use the pipe’s interface to send data, but the next example will help you to understand how data are transferred and filtered.
from tethys.core.pipes import ZeroPipe
from tethys.core.pipes.filters import RegexMatchFilter, FNFilter
from tethys.core.networks import ZeroNetwork
from tethys.core.sessions import ZeroSession
from tethys.core.nodes import ZeroNode

from difflib import SequenceMatcher

node1 = ZeroNode()
node2 = ZeroNode()
network = ZeroNetwork()
session = ZeroSession(network)

# simple pipe
pipe1 = ZeroPipe(node1, node2)

# push method will get/create stream by self.get_stream(session)
# and execute 'write' method of the stream.
pipe1.push("some_data_packet", session)

# pull method will receive data from the stream.
data_packets_generator = pipe1.pull(session)
assert next(data_packets_generator) == "some_data_packet"


# a distinctive feature of the pipe's methods is filtering.
# you can filter the data before sending using special filters.
def similarity(data_packet, **kwargs):
    if not isinstance(data_packet, str):
        return 0.0
    return SequenceMatcher(None, data_packet, "some_data_packet").ratio()


pipe2 = ZeroPipe(
    node1,
    node2,
    filters=[
        RegexMatchFilter("^some_.*"),  # return 1 or 0
        FNFilter(similarity),  # return similarity result
    ],
    filters_threshold=0.5,  # filters score > 0.5
)

# also you can send a list of data packets by adding many=True
pipe2.push(["some_data_packet", "data_packet", "some_", None, {}], session, many=True)

data_packets_generator = pipe2.pull(session, wait_timeout=0)
assert list(data_packets_generator) == ["some_data_packet"]
Pipe methods are proxies for the stream methods with some sugar, just as stream methods are proxies for the transport methods.
The next example shows how the methods work together:
import json

from tethys.core.pipes import ZeroPipe
from tethys.core.networks import ZeroNetwork
from tethys.core.sessions import ZeroSession
from tethys.core.nodes import ZeroNode
from tethys.core.transports import ZeroTransport

node1 = ZeroNode()
node2 = ZeroNode()
network = ZeroNetwork()
session = ZeroSession(network)


def serializer(x):
    return json.dumps(x)


def deserializer(x):
    return json.loads(x)


transport1 = ZeroTransport(serializer=serializer, deserializer=deserializer)

pipe1 = ZeroPipe(node1, node2, transport=transport1)
stream1 = pipe1.get_stream(session)
connection1 = transport1.connect(stream1)

pipe1.push("data1", session)
data_packet_key1, data_packet1 = next(stream1.read(wait_timeout=0))
stream1.write(["data2", {"v": "data3"}], many=True)
stream1.ack(data_packet_key1)

assert (data_packet_key1, data_packet1) == ("", "data1")

data_packet_key2, data_packet2 = transport1.recv(stream1, wait_timeout=0)
transport1.send(stream1, None)
transport1.ack(stream1, data_packet_key2)

assert (data_packet_key2, data_packet2) == ("", "data2")

connection1.send("data")
assert list(connection1.recv_iter()) == [
    ("", '{"v": "data3"}'),
    ("", "null"),
    ("", "data"),
]  # not deserialized

for i in range(3):
    connection1.ack("")  # "" because LocalConnection doesn't generate message_key

transport1.disconnect(stream1)
As a user, you will not use most of these features. They are part of the system and help to process your data according to your declarative instructions.
Also, there are methods for sending data in Nodes and Sessions. In most cases, you will use them to send data.
High-level example:
import json

from tethys.core.pipes import ZeroPipe
from tethys.core.networks import ZeroNetwork
from tethys.core.sessions import ZeroSession
from tethys.core.nodes import ZeroNode
from tethys.core.nodes.operators import PythonFunctionOperator
from tethys.core.transports import ZeroTransport
from tethys.core.stations import ZeroStation


def serializer(x):
    return json.dumps(x)


def deserializer(x):
    return json.loads(x)


transport1 = ZeroTransport(serializer=serializer, deserializer=deserializer)


# the first argument is the data_packet and other arguments will be recognized (by name)
# you can pass arguments in any order
# list of the available arguments you can find in the PythonFunctionOperator description
# in the Api Reference section
def op1_fn(data_packet, node, network, session):
    if isinstance(data_packet, str):
        # send data to the forward pipes (Pipe2 and Pipe3)
        node.send(data_packet, session)
    else:
        first_forward_pipe = next(network.get_forward_pipes(node))  # Pipe2
        first_forward_pipe.push(data_packet, session)


def op2_fn(data_packet, pipe, node, session):
    if isinstance(data_packet, str):
        pipe.push({"data": data_packet}, session)  # recursion
    else:
        node.send(data_packet, session)  # send data to the forward pipes


# data processing will be detailed in the next docs section.

node1 = ZeroNode(
    PythonFunctionOperator(op1_fn)
)
node2 = ZeroNode(
    op2_fn  # the function will be wrapped by PythonFunctionOperator
)
# default: dummy operator that just returns data_packet
node3 = ZeroNode()

with ZeroPipe.transport_factory_context(transport1):
    pipe1 = ZeroPipe(ZeroNode.IN, node1, _id="pipe1")
    pipe2 = ZeroPipe(node1, node2, _id="pipe2")
    pipe3 = ZeroPipe(node1, node3, _id="pipe3")
    pipe4 = ZeroPipe(node2, ZeroNode.OUT, _id="pipe4")
    pipe5 = ZeroPipe(node3, ZeroNode.OUT, _id="pipe5")

network = ZeroNetwork([pipe1, pipe2, pipe3, pipe4, pipe5])

#                      -----Pipe 2------> Node 2 -----Pipe 4------> OUT
# IN --Pipe1--> Node 1 --->
#                      -----Pipe 3------> Node 3 -----Pipe 5------> OUT

session = ZeroSession(network).open()

# will send data to the input pipe (Pipe1)
session.send(["data1", "data2", {"data": "data3"}], many=True)
session.close(ZeroSession.SOFT_CLOSING_MODE)

# start processing
ZeroStation(
    sessions=[session],
    monitor_checks_delay=0.1,
    update_min_delay=0.1,
    stream_waiting_timeout=0,
).start()

assert list(pipe4.pull(session, wait_timeout=0)) == [
    {"data": "data3"},
    {"data": "data1"},
    {"data": "data2"},
]

assert list(pipe5.pull(session, wait_timeout=0)) == [
    "data1",
    "data2",
]
How the “data processing” works will be detailed in the next sections of the docs. Also, you can find the list of the filters, operators, and connectors in the API Reference.