Skip to main content

Error Handling

Best practices for handling errors and implementing retry strategies with the SanctionsWise API.


Error Response Format

All errors return a JSON response. Depending on the error type, the body contains an "error" field, a "message" field, or both:

{
"error": "Error type",
"message": "Detailed error message"
}

Error Types

400 Bad Request

Invalid request parameters:

{
"error": "Field \"name\" is required and must be a string"
}

Common causes:

  • Missing required fields
  • Invalid field types
  • Value out of range (e.g., threshold < 0.50)

Solution: Validate input before sending.

401 Unauthorized

Missing API key:

{
"message": "Unauthorized"
}

Solution: Include the x-api-key header in every request.

403 Forbidden

Invalid or revoked API key:

{
"message": "Forbidden - Invalid API key"
}

Solution: Check your API key at app.orchestraprime.ai.

429 Too Many Requests

Rate limit exceeded:

{
"message": "Rate limit exceeded",
"retry_after_seconds": 60
}

Solution: Wait at least retry_after_seconds before retrying, and implement exponential backoff in case the limit is hit again.

500 Internal Server Error

Server-side error:

{
"error": "Internal server error",
"message": "An error occurred processing your request"
}

Solution: Retry with exponential backoff.


Retry Strategy

Exponential Backoff

import time
from functools import wraps

import requests

def retry_with_backoff(max_retries=3, base_delay=1):
    """Decorator for retry with exponential backoff.

    The wrapped function is called up to ``max_retries`` times inside the
    retry loop, followed by one final attempt whose exception (if any)
    propagates to the caller -- ``max_retries + 1`` calls at most.

    Retry rules:
      * HTTP 429: sleep for the ``Retry-After`` header when it is a whole
        number of seconds, otherwise fall back to exponential backoff.
      * HTTP 5xx and network errors: exponential backoff.
      * Any other HTTP error (4xx): re-raised immediately, never retried.

    Args:
        max_retries: Number of attempts that are followed by a sleep and
            another try.
        base_delay: Base delay in seconds; attempt ``n`` (0-based) waits
            ``base_delay * 2 ** n``.

    Returns:
        A decorator that adds the retry behaviour to a function.
    """

    def backoff_delay(attempt):
        # Not truncated to int, so fractional base delays are honoured.
        return base_delay * (2 ** attempt)

    def rate_limit_delay(response, attempt):
        raw_value = response.headers.get("Retry-After")
        try:
            seconds = int(raw_value)
        except (TypeError, ValueError):
            # Header missing, or sent in its HTTP-date form: use the
            # regular backoff instead of crashing inside the handler.
            return backoff_delay(attempt)
        # time.sleep() rejects negative values.
        return max(seconds, 0)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.HTTPError as e:
                    status = e.response.status_code
                    if status == 429:
                        # Rate limited - prefer the server-provided delay
                        delay = rate_limit_delay(e.response, attempt)
                    elif status >= 500:
                        # Server error - exponential backoff
                        delay = backoff_delay(attempt)
                    else:
                        # Client error - don't retry
                        raise
                except requests.exceptions.RequestException:
                    # Network error - exponential backoff
                    delay = backoff_delay(attempt)
                time.sleep(delay)

            # Final attempt: let any exception reach the caller
            return func(*args, **kwargs)

        return wrapper

    return decorator

@retry_with_backoff(max_retries=3)
def screen_entity_with_retry(name, entity_type, threshold):
    """Screen one entity via POST /screen/entity, retried by the decorator.

    Args:
        name: Value sent as the ``name`` field of the request body.
        entity_type: Value sent as the ``entity_type`` field.
        threshold: Value sent as the ``threshold`` field.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: For 4xx/5xx responses, raised by
            ``raise_for_status()``.
    """
    endpoint = f"{BASE_URL}/screen/entity"
    request_headers = {"x-api-key": API_KEY, "Content-Type": "application/json"}
    payload = {"name": name, "entity_type": entity_type, "threshold": threshold}

    reply = requests.post(endpoint, headers=request_headers, json=payload, timeout=30)
    # Turn 4xx/5xx responses into HTTPError so the retry decorator sees them.
    reply.raise_for_status()
    return reply.json()

Circuit Breaker Pattern

Prevent cascading failures:

from datetime import datetime, timedelta

class CircuitBreaker:
    """Simple circuit breaker for API calls.

    States:
      * ``closed``    - calls pass through; consecutive failures are counted.
      * ``open``      - calls are rejected without running until
                        ``reset_timeout`` seconds have passed since the last
                        failure.
      * ``half-open`` - one trial call is let through; success closes the
                        circuit, failure opens it again.

    Args:
        failure_threshold: Consecutive failures that open the circuit.
        reset_timeout: Seconds to stay open before allowing a trial call.
    """

    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever ``func`` returns.

        Raises:
            RuntimeError: If the circuit is open and the reset timeout has
                not elapsed yet; ``func`` is not called in that case.
            Exception: Anything raised by ``func`` is recorded as a failure
                and re-raised unchanged.
        """
        if self.state == "open":
            since_failure = datetime.now() - self.last_failure_time
            if since_failure > timedelta(seconds=self.reset_timeout):
                self.state = "half-open"
            else:
                raise RuntimeError("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            self.last_failure_time = datetime.now()

            # A failed trial call re-opens the circuit immediately.
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.state = "open"

            # Bare raise keeps the original traceback intact.
            raise

        # Any success ends the failure streak, so only consecutive failures
        # count towards the threshold.
        self.failures = 0
        self.state = "closed"
        return result

# Usage: route the retrying screening call through one shared breaker.
breaker = CircuitBreaker(
    reset_timeout=60,
    failure_threshold=5,
)
result = breaker.call(
    screen_entity_with_retry,
    "Vladimir Putin",
    "individual",
    0.60,
)

Connection Pooling

Improve performance with connection reuse:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session():
    """Build a requests session with pooling, retries and default headers.

    The session mounts, for ``https://`` URLs, an adapter configured with
    10 pool connections, a pool size of 10 and a retry policy (3 retries,
    backoff factor 1) for POST and GET requests that end in status 429,
    500, 502, 503 or 504. The API key and JSON content type are set as
    default headers.

    Returns:
        The configured ``requests.Session``.
    """
    retry_policy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["POST", "GET"],
    )
    pooled_adapter = HTTPAdapter(
        pool_connections=10,
        pool_maxsize=10,
        max_retries=retry_policy,
    )
    default_headers = {
        "x-api-key": API_KEY,
        "Content-Type": "application/json",
    }

    http = requests.Session()
    http.mount("https://", pooled_adapter)
    http.headers.update(default_headers)
    return http

# Reuse session for all requests so pooled connections are shared.
# requests applies no timeout unless one is passed, so set it on every call.
session = create_session()
response = session.post(
    f"{BASE_URL}/screen/entity",
    json={"name": "John Smith"},
    timeout=30,  # seconds
)

Error Logging

Log errors for debugging:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def screen_with_logging(name):
    """Screen ``name`` and log the outcome.

    Args:
        name: Passed to ``screen_entity`` and included in error log lines.

    Returns:
        The result returned by ``screen_entity``.

    Raises:
        requests.exceptions.HTTPError: Logged with status code and response
            body, then re-raised.
        requests.exceptions.RequestException: Any other request failure,
            logged and re-raised.
    """
    try:
        result = screen_entity(name)
    except requests.exceptions.HTTPError as e:
        # HTTPError subclasses RequestException, so it must be handled first.
        logger.error(
            "HTTP error screening %s: %s - %s",
            name,
            e.response.status_code,
            e.response.text,
        )
        raise
    except requests.exceptions.RequestException as e:
        logger.error("Network error screening %s: %s", name, e)
        raise

    # %-style arguments defer string formatting until a handler emits the record.
    logger.info(
        "Screening complete: %s - %s",
        result["screening_id"],
        result["status"],
    )
    return result

Timeout Handling

Always set reasonable timeouts:

# Single entity - 30 second timeout
# The x-api-key header is included because requests without it get 401.
response = requests.post(
    f"{BASE_URL}/screen/entity",
    headers={"x-api-key": API_KEY, "Content-Type": "application/json"},
    json={"name": name},
    timeout=30,  # seconds
)

# Batch - longer timeout
response = requests.post(
    f"{BASE_URL}/screen/batch",
    headers={"x-api-key": API_KEY, "Content-Type": "application/json"},
    json={"entities": entities},
    timeout=120,  # 2 minutes for large batches
)