Data processing

There are some constraints regarding processing big data. One of them is that the system can’t calculate big data like other data streaming frameworks. It is created for another purpose. It helps manage any data stream. In most cases, you can use the system to combine several streaming frameworks using one interface to do complex data processing tasks.

Tethys can process your data in Nodes using Operators. Operators can use python to process and route the data, but it is simple usage. If you need speed or cluster processing, you can use any integrations.

How to use nodes and operator you can learn below:
from tethys.core.networks import ZeroNetwork
from tethys.core.nodes import ZeroNode
from tethys.core.nodes.operators import PythonFunctionOperator
from tethys.core.pipes import ZeroPipe
from tethys.core.sessions import ZeroSession
from tethys.core.stations import ZeroStation


class MyZeroSession(ZeroSession):
    FIELDS_SCHEMA = dict(
        **ZeroSession.FIELDS_SCHEMA,
        counter={
            "type": "integer",
            "default": 0
        }
    )


def operator(data_packet, session):
    if isinstance(data_packet, str):
        with session.lock_context():
            session.counter += 1
            session.save()


node = ZeroNode(
    PythonFunctionOperator(operator)
)
pipe = ZeroPipe(ZeroNode.IN, node)
network = ZeroNetwork([pipe])

# the session will be opened and closed (<safe> mode)
with MyZeroSession(network) as session:
    session.send(["1", "2", None, {"some": "data"}, "3"], many=True)


ZeroStation(
    sessions=[session],
    monitor_checks_delay=0.1,
    update_min_delay=0.1,
    stream_waiting_timeout=0,
).start()

assert session.counter == 3  # isinstance(data_packet, str)

List of the operators you can find in the API Reference.