A lightweight, type-safe data transformation framework for Python.
Composing pipes with the `>>` operator:

from rivusio import AsyncBasePipe
from typing import Dict, List
class FilterPipe(AsyncBasePipe[Dict, Dict]):
    """Pipe that keeps only the dictionary entries whose value exceeds 100."""

    async def process(self, data: Dict) -> Dict:
        """Return a new dict with the items of ``data`` whose value is > 100.

        Args:
            data: Mapping whose values are compared against 100.

        Returns:
            A dict holding only the key/value pairs that pass the comparison.
        """
        return {k: v for k, v in data.items() if v > 100}
class TransformPipe(AsyncBasePipe[Dict, List]):
    """Pipe that turns a dictionary into a list of its values."""

    async def process(self, data: Dict) -> List:
        """Return the values of ``data`` as a list.

        Args:
            data: Mapping whose values are collected.

        Returns:
            A list of the mapping's values, in the dict's iteration order.
        """
        return list(data.values())
async def main():
    """Build a two-stage pipeline, run it on sample data, and print the result."""
    # Create pipeline using >> operator
    pipeline = FilterPipe() >> TransformPipe()

    # Process data
    data = {"a": 150, "b": 50, "c": 200}
    result = await pipeline(data)  # [150, 200]
    print(result)
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
Install with:

pip install rivusio