Rivusio provides comprehensive error handling capabilities to help you build robust data processing pipelines.
Base error type for pipe-specific errors:
from rivusio.exceptions import PipeError
try:
result = await pipe.process(data)
except PipeError as e:
print(f"Error in pipe {e.pipe}: {e.error}")
Error type for pipeline-level errors:
from rivusio.exceptions import PipelineError
try:
result = await pipeline.process(data)
except PipelineError as e:
print(f"Pipeline error: {e}")
print(f"Failed pipe: {e.pipe}")
print(f"Original error: {e.original_error}")
Configure retries for transient errors:
from rivusio import RetryConfig
config = RetryConfig(
max_retries=3,
retry_delay=5,
backoff_factor=2,
exceptions=[TransientError]
)
pipe = MyPipe(config=config)
Implement custom error recovery:
class MyPipe(ConfigurablePipe[Dict, Dict]):
async def process(self, data: Dict) -> Dict:
try:
return await self._process_data(data)
except ValueError as e:
# Handle value error
return self._handle_value_error(data, e)
except KeyError as e:
# Handle key error
return self._handle_key_error(data, e)
Handle unprocessable messages:
class DeadLetterConfig(PipeConfig):
queue_url: str
class ProcessingPipe(ConfigurablePipe[Dict, Dict]):
def __init__(self, config: PipeConfig, dlq_config: DeadLetterConfig):
super().__init__(config)
self.dlq = DeadLetterQueue(dlq_config)
async def process(self, data: Dict) -> Dict:
try:
return await self._process_data(data)
except UnprocessableError as e:
await self.dlq.send(data, error=e)
return None
Monitor errors in your pipeline:
from rivusio.monitoring import PipelineMonitor
monitor = PipelineMonitor()
pipeline.set_error_monitor(monitor)
# Process data
await pipeline.process(data)
# Get error metrics
error_metrics = monitor.get_metrics()
print(f"Total errors: {error_metrics['error_count']}")
print(f"Error rate: {error_metrics['error_rate']}")