This example demonstrates how to perform basic data transformations using Rivusio.
from rivusio import AsyncBasePipe
from rivusio.config import PipeConfig
from typing import Dict, List
from pydantic import BaseModel, Field
# Define data model
class Record(BaseModel):
id: int
value: float
category: str
class TransformConfig(PipeConfig):
multiply_by: float = Field(default=1.0, gt=0)
categories: List[str] = Field(default_factory=lambda: ["A", "B", "C"])
class TransformPipe(AsyncBasePipe[Record, Record]):
def __init__(self, config: TransformConfig):
super().__init__()
self.config = config
async def process(self, data: Record) -> Record:
return Record(
id=data.id,
value=data.value * self.config.multiply_by,
category=data.category
)
async def main():
config = TransformConfig(multiply_by=2.0)
pipe = TransformPipe(config)
data = Record(id=1, value=10.0, category="A")
result = await pipe(data)
print(result) # Record(id=1, value=20.0, category='A')
if __name__ == "__main__":
import asyncio
asyncio.run(main())