The HTTP module provides the networking layer for tif1, including connection pooling, async parallel fetching, and rate limiting.
HTTP Session
get_session
Get the global HTTP session instance with connection pooling.
Returns a niquests.Session instance configured with:
- Connection pooling for reuse (minimum 256 connections, 512 max pool size)
- DoH (DNS over HTTPS) fallback support (Cloudflare, Google)
- HTTP/3 and multiplexing support (configurable)
- Keep-alive connections with configurable timeout
- Custom User-Agent header
Returns:
- Shared niquests.Session instance (thread-safe)
Example:
from tif1.http_session import get_session
session = get_session()
response = session.get("https://example.com/data.json")
data = response.json()
The HTTP session is automatically managed and thread-safe. You rarely need to interact with it directly.
close_session
Close the global HTTP session and release connections.
def close_session() -> None
Example:
from tif1.http_session import close_session
import atexit
# Close session on program exit
atexit.register(close_session)
Session cleanup is automatically registered with atexit, so manual cleanup is usually unnecessary.
get_connection_stats
Get current connection pool statistics for monitoring and debugging.
def get_connection_stats() -> dict[str, Any]
Returns:
Dictionary with connection metrics:
total_requests: Total number of requests made
connections_reused: Number of requests that reused connections
connections_created: Number of connection pools created
reuse_rate: Percentage of requests that reused connections (0-100)
Example:
from tif1.http_session import get_connection_stats
stats = get_connection_stats()
print(f"Reuse rate: {stats['reuse_rate']:.1f}%")
print(f"Total requests: {stats['total_requests']}")
Async Fetching
The async fetch module provides high-performance parallel data loading using niquests with thread pool executors.
fetch_json_async
Asynchronously fetch and parse JSON data with caching, retry logic, and CDN fallback.
async def fetch_json_async(
year: int,
gp: str,
session: str,
path: str,
max_retries: int | None = None,
timeout: int | None = None,
*,
use_cache: bool = True,
write_cache: bool = True,
validate_payload: bool = True
) -> dict[str, Any]
Parameters:
year: Season year
gp: Grand Prix name
session: Session name (e.g., "Race", "Qualifying")
path: Path to JSON file (e.g., "drivers.json", "laps_VER.json")
max_retries: Maximum retry attempts (defaults to config value, typically 3)
timeout: Request timeout in seconds (defaults to config value, typically 30)
use_cache: If True, read from cache before network fetch
write_cache: If True, persist successful network responses to cache
validate_payload: If True, run Pydantic validation before returning data
Returns:
- Parsed JSON data as dictionary (never None, raises on error)
Raises:
NetworkError: If network request fails after retries
DataNotFoundError: If data doesn't exist (404)
InvalidDataError: If JSON parsing or validation fails
Example:
import asyncio
from tif1.async_fetch import fetch_json_async
async def load_data():
data = await fetch_json_async(2021, "Belgian Grand Prix", "Race", "drivers.json")
print(f"Loaded {len(data)} drivers")
asyncio.run(load_data())
fetch_multiple_async
Fetch multiple JSON files in parallel with optimized batch size and graceful error handling.
async def fetch_multiple_async(
requests: list[tuple[int, str, str, str]],
*,
use_cache: bool = True,
write_cache: bool = True,
validate_payload: bool = True,
max_retries: int | None = None,
timeout: int | None = None,
max_concurrent_requests: int | None = None
) -> list[dict[str, Any] | None]
Parameters:
requests: List of (year, gp, session, path) tuples
use_cache: If True, read from cache before network fetch
write_cache: If True, persist successful network responses to cache
validate_payload: If True, run payload validation before returning data
max_retries: Maximum retry attempts per request
timeout: Request timeout in seconds
max_concurrent_requests: Maximum concurrent requests (defaults to config value, typically 20)
Returns:
- List of fetched data dictionaries (None for failed requests). Exceptions are logged and converted to None for graceful degradation. DataNotFoundError (404) is silently converted to None.
Example:
import asyncio
from tif1.async_fetch import fetch_multiple_async
async def load_multiple_drivers():
requests = [
(2021, "Belgian Grand Prix", "Race", "laps_VER.json"),
(2021, "Belgian Grand Prix", "Race", "laps_HAM.json"),
(2021, "Belgian Grand Prix", "Race", "laps_LEC.json"),
]
results = await fetch_multiple_async(requests)
for i, data in enumerate(results):
if data:
print(f"Driver {i+1}: {len(data['LapNumber'])} laps")
asyncio.run(load_multiple_drivers())
fetch_with_rate_limit
Execute async function with rate limiting using semaphore.
async def fetch_with_rate_limit(
coro_func,
*args,
semaphore: asyncio.Semaphore | None = None,
**kwargs
)
Parameters:
coro_func: Async function to execute
*args: Positional arguments for coro_func
semaphore: Optional semaphore for rate limiting. If None, creates one based on max_concurrent_requests config
**kwargs: Keyword arguments for coro_func
Returns:
- Result from coro_func execution
Example:
import asyncio
from tif1.async_fetch import fetch_with_rate_limit, fetch_json_async
async def fetch_with_custom_limit():
semaphore = asyncio.Semaphore(5)
result = await fetch_with_rate_limit(
fetch_json_async, 2021, "Belgian Grand Prix", "Race", "drivers.json",
semaphore=semaphore
)
print(f"Fetched {len(result)} drivers")
asyncio.run(fetch_with_custom_limit())
close_session
Close the async HTTP session and release resources.
def close_session() -> None
Example:
from tif1.async_fetch import close_session
import atexit
atexit.register(close_session)
cleanup_resources
Clean up all async resources including session and executor.
def cleanup_resources() -> None
Example:
from tif1.async_fetch import cleanup_resources
# At program exit
cleanup_resources()
Rate Limiting
The async fetch module includes automatic rate limiting to prevent CDN throttling:
- Maximum 20 concurrent requests by default (configurable via the max_concurrent_requests config value)
- Overridable per call via the max_concurrent_requests parameter of fetch_multiple_async
- Automatic backoff on rate limit errors and pool exhaustion
- Semaphore-based concurrency control
Example with custom concurrency:
import asyncio
from tif1.async_fetch import fetch_multiple_async
async def load_all_telemetry():
requests = [...] # 100+ (year, gp, session, path) tuples
# Limit to 5 concurrent requests
results = await fetch_multiple_async(requests, max_concurrent_requests=5)
return results
Connection Pooling
The HTTP session uses aggressive connection pooling for maximum performance:
- Dynamic pool sizing based on concurrency (minimum 256 connections, 512 max pool size)
- Reuses TCP connections across requests
- Keep-alive connections with configurable timeout (default: 120s, 1000 max requests)
- Automatic connection cleanup
- Thread-safe for concurrent use
- Connection reuse metrics tracking
Benefits:
- 30-50% faster for multiple requests
- Lower CPU usage
- Reduced network overhead
- Optimized for high-concurrency telemetry fetching (128+ concurrent requests)
Configuration:
from tif1 import get_config
config = get_config()
config.set("pool_connections", 512) # Explicit pool size
config.set("pool_maxsize", 2048) # Max pool size for bursts
config.set("keepalive_timeout", 120) # Keep-alive timeout in seconds
Retry Logic
All HTTP requests include automatic retry with exponential backoff and CDN fallback:
- Default: 3 retries (configurable via the max_retries config value)
- Backoff: 2^attempt seconds with jitter (configurable)
- Max delay: 60 seconds (configurable via max_retry_delay)
- Retries on: Connection errors, timeouts, 5xx errors, pool exhaustion
- No retry on: 404 (data not found), 4xx client errors
- CDN fallback: Tries all configured CDN sources before failing
- Zero-retry mode: Ultra-fast cold start optimization (tries all CDNs once, no delays)
Example retry behavior:
Attempt 1: Immediate (CDN 1)
Attempt 1 failed: Try CDN 2, CDN 3...
Attempt 2: Wait ~2 seconds, retry all CDNs
Attempt 3: Wait ~4 seconds, retry all CDNs
Attempt 4: Fail with NetworkError
Pool exhaustion handling:
When connection pool is exhausted, automatic backoff is applied:
- Base backoff: 0.5 seconds (configurable via pool_exhaustion_backoff_base)
- Max backoff: 5 seconds (configurable via pool_exhaustion_backoff_max)
- Jitter: 0.5 seconds (configurable via pool_exhaustion_backoff_jitter)
Circuit Breaker Integration
HTTP requests are protected by a circuit breaker to prevent cascading failures:
- Opens after 5 consecutive failures (configurable via circuit_breaker_threshold)
- Blocks requests for 60 seconds when open (configurable via circuit_breaker_timeout)
- Automatically tests recovery in half-open state
- Closes on successful request
- Thread-safe with atomic state transitions
- Uses monotonic time for accurate timeout tracking
States:
closed: Normal operation, requests allowed
open: Too many failures, requests blocked
half-open: Testing recovery, single request allowed
See Retry & Reliability for details.
Complete Examples
Parallel session loading
import asyncio
from tif1.async_fetch import fetch_multiple_async
async def load_session_parallel():
"""Load all driver lap data in parallel."""
drivers = ["VER", "HAM", "LEC", "SAI", "NOR"]
# Build requests for all drivers
requests = [
(2021, "Belgian Grand Prix", "Race", f"laps_{driver}.json")
for driver in drivers
]
# Fetch all in parallel
results = await fetch_multiple_async(requests, max_concurrent_requests=10)
for driver, data in zip(drivers, results):
if data:
print(f"{driver}: {len(data['LapNumber'])} laps")
asyncio.run(load_session_parallel())
Custom timeout handling
import asyncio
from tif1.async_fetch import fetch_json_async
from tif1.exceptions import NetworkError, DataNotFoundError
async def fetch_with_custom_timeout():
"""Fetch with custom timeout and error handling."""
try:
# Use 60 second timeout for slow connections
data = await fetch_json_async(
2021, "Belgian Grand Prix", "Race", "laps_VER.json",
timeout=60
)
print(f"Success: {len(data['LapNumber'])} laps")
except DataNotFoundError as e:
print(f"Data not found: {e.event} {e.session}")
except NetworkError as e:
print(f"Network error: {e.url}")
asyncio.run(fetch_with_custom_timeout())
Batch processing with rate limiting
import asyncio
from tif1.async_fetch import fetch_multiple_async
async def batch_fetch_telemetry():
"""Fetch telemetry for multiple driver/lap combinations."""
drivers = ["VER", "HAM", "LEC"]
laps = [1, 2, 3, 4, 5]
# Build requests for all combinations
requests = [
(2021, "Belgian Grand Prix", "Race", f"telemetry_{driver}_{lap}.json")
for driver in drivers
for lap in laps
]
# Fetch in batches of 20
batch_size = 20
all_results = []
for i in range(0, len(requests), batch_size):
batch = requests[i:i+batch_size]
results = await fetch_multiple_async(batch, max_concurrent_requests=10)
all_results.extend(results)
print(f"Processed batch {i//batch_size + 1}")
return all_results
asyncio.run(batch_fetch_telemetry())
Resource cleanup
import asyncio
import atexit
from tif1.async_fetch import cleanup_resources, fetch_json_async
# Register cleanup on exit (automatic, but shown for clarity)
atexit.register(cleanup_resources)
async def main():
"""Main application with automatic cleanup."""
data = await fetch_json_async(
2021, "Belgian Grand Prix", "Race", "drivers.json"
)
print(f"Loaded {len(data)} drivers")
asyncio.run(main())
# cleanup_resources() called automatically on exit
Zero-retry mode for ultra-fast cold starts
import asyncio
from tif1 import get_config
from tif1.async_fetch import fetch_json_async
async def ultra_fast_fetch():
"""Disable retries for fastest possible cold start."""
config = get_config()
config.set("max_retries", 0) # Try each CDN once, no delays
data = await fetch_json_async(
2021, "Belgian Grand Prix", "Race", "drivers.json"
)
print(f"Loaded {len(data)} drivers")
asyncio.run(ultra_fast_fetch())
- Use async methods for multiple requests: 5-10x faster than sequential fetching.
- Tune max_concurrent_requests for your network: Higher values (50-128) for fast connections, lower (5-10) for slow.
- Disable validation in production: Saves 10-15% processing time.
data = await fetch_json_async(
2021, "Belgian Grand Prix", "Race", "drivers.json",
validate_payload=False
)
- Use zero-retry mode for cold starts: Fastest possible startup, tries all CDNs once.
from tif1 import get_config
config = get_config()
config.set("max_retries", 0)
- Monitor connection pool stats: Track reuse rate for optimization.
from tif1.http_session import get_connection_stats
stats = get_connection_stats()
print(f"Reuse rate: {stats['reuse_rate']:.1f}%")
- Monitor circuit breaker: Check state if experiencing network issues.
from tif1.retry import get_circuit_breaker
cb = get_circuit_breaker()
print(f"Circuit breaker state: {cb.state}")
- Clean up resources: Automatic via atexit, but can call manually if needed.
from tif1.async_fetch import cleanup_resources
cleanup_resources()
- Use process pool for JSON parsing: For large payloads, enable multi-process parsing.
from tif1 import get_config
config = get_config()
config.set("json_parse_workers", 4) # Use 4 worker processes
Troubleshooting
Slow Requests
import logging
from tif1 import get_config
# Enable debug logging to see request timing
logging.basicConfig(level=logging.DEBUG)
# Check timeout setting
config = get_config()
print(f"Timeout: {config.get('timeout')}s")
# Increase if needed
config.set("timeout", 60)
Connection Errors
from tif1.retry import get_circuit_breaker, reset_circuit_breaker
# Check circuit breaker
cb = get_circuit_breaker()
if cb.state == "open":
print("Circuit breaker is open, waiting...")
import time
time.sleep(60)
reset_circuit_breaker()
Rate Limiting
import asyncio
from tif1.async_fetch import fetch_multiple_async
# Reduce concurrent requests (await is only valid inside an async function)
async def fetch_throttled():
    requests = [(2021, "Belgian Grand Prix", "Race", "drivers.json")]
    return await fetch_multiple_async(requests, max_concurrent_requests=5)
results = asyncio.run(fetch_throttled())
Memory Issues
import asyncio
from tif1.async_fetch import fetch_multiple_async
# Process in smaller batches (await is only valid inside an async function)
async def fetch_in_batches(all_requests, batch_size=10):
    for i in range(0, len(all_requests), batch_size):
        batch = all_requests[i:i+batch_size]
        results = await fetch_multiple_async(batch)
        # Process results
        del results  # Free memory
Pool Exhaustion
from tif1 import get_config
from tif1.http_session import get_connection_stats
# Check pool configuration
config = get_config()
print(f"Pool connections: {config.get('pool_connections')}")
print(f"Pool maxsize: {config.get('pool_maxsize')}")
# Check connection stats
stats = get_connection_stats()
print(f"Reuse rate: {stats['reuse_rate']:.1f}%")
# Increase pool size if needed
config.set("pool_connections", 512)
config.set("pool_maxsize", 2048)