HTTP Module
The HTTP module provides the networking layer for tif1, including connection pooling, async parallel fetching, and rate limiting.

HTTP Session

get_session

Get the global HTTP session instance with connection pooling.
def get_session() -> Any
Returns a niquests.Session instance configured with:
  • Connection pooling for reuse (minimum 256 connections, 512 max pool size)
  • DoH (DNS over HTTPS) fallback support (Cloudflare, Google)
  • HTTP/3 and multiplexing support (configurable)
  • Keep-alive connections with configurable timeout
  • Custom User-Agent header
Returns:
  • Shared niquests.Session instance (thread-safe)
Example:
from tif1.http_session import get_session

session = get_session()
response = session.get("https://example.com/data.json")
data = response.json()
The HTTP session is automatically managed and thread-safe. You rarely need to interact with it directly.

close_session

Close the global HTTP session and release connections.
def close_session() -> None
Example:
from tif1.http_session import close_session
import atexit

# Close session on program exit
atexit.register(close_session)
Session cleanup is automatically registered with atexit, so manual cleanup is usually unnecessary.

get_connection_stats

Get current connection pool statistics for monitoring and debugging.
def get_connection_stats() -> dict[str, Any]
Returns: Dictionary with connection metrics:
  • total_requests: Total number of requests made
  • connections_reused: Number of requests that reused connections
  • connections_created: Number of connection pools created
  • reuse_rate: Percentage of requests that reused connections (0-100)
Example:
from tif1.http_session import get_connection_stats

stats = get_connection_stats()
print(f"Reuse rate: {stats['reuse_rate']:.1f}%")
print(f"Total requests: {stats['total_requests']}")

Async Fetching

The async fetch module provides high-performance parallel data loading using niquests with thread pool executors.

fetch_json_async

Asynchronously fetch and parse JSON data with caching, retry logic, and CDN fallback.
async def fetch_json_async(
    year: int,
    gp: str,
    session: str,
    path: str,
    max_retries: int | None = None,
    timeout: int | None = None,
    *,
    use_cache: bool = True,
    write_cache: bool = True,
    validate_payload: bool = True
) -> dict[str, Any]
Parameters:
  • year: Season year
  • gp: Grand Prix name
  • session: Session name (e.g., "Race", "Qualifying")
  • path: Path to JSON file (e.g., "drivers.json", "laps_VER.json")
  • max_retries: Maximum retry attempts (defaults to config value, typically 3)
  • timeout: Request timeout in seconds (defaults to config value, typically 30)
  • use_cache: If True, read from cache before network fetch
  • write_cache: If True, persist successful network responses to cache
  • validate_payload: If True, run Pydantic validation before returning data
Returns:
  • Parsed JSON data as dictionary (never None, raises on error)
Raises:
  • NetworkError: If network request fails after retries
  • DataNotFoundError: If data doesn't exist (404)
  • InvalidDataError: If JSON parsing or validation fails
Example:
import asyncio
from tif1.async_fetch import fetch_json_async

async def load_data():
    data = await fetch_json_async(2021, "Belgian Grand Prix", "Race", "drivers.json")
    print(f"Loaded {len(data)} drivers")

asyncio.run(load_data())

fetch_multiple_async

Fetch multiple JSON files in parallel with optimized batch size and graceful error handling.
async def fetch_multiple_async(
    requests: list[tuple[int, str, str, str]],
    *,
    use_cache: bool = True,
    write_cache: bool = True,
    validate_payload: bool = True,
    max_retries: int | None = None,
    timeout: int | None = None,
    max_concurrent_requests: int | None = None
) -> list[dict[str, Any] | None]
Parameters:
  • requests: List of (year, gp, session, path) tuples
  • use_cache: If True, read from cache before network fetch
  • write_cache: If True, persist successful network responses to cache
  • validate_payload: If True, run payload validation before returning data
  • max_retries: Maximum retry attempts per request
  • timeout: Request timeout in seconds
  • max_concurrent_requests: Maximum concurrent requests (defaults to config value, typically 20)
Returns:
  • List of fetched data dictionaries (None for failed requests). Exceptions are logged and converted to None for graceful degradation. DataNotFoundError (404) is silently converted to None.
Example:
import asyncio
from tif1.async_fetch import fetch_multiple_async

async def load_multiple_drivers():
    requests = [
        (2021, "Belgian Grand Prix", "Race", "laps_VER.json"),
        (2021, "Belgian Grand Prix", "Race", "laps_HAM.json"),
        (2021, "Belgian Grand Prix", "Race", "laps_LEC.json"),
    ]

    results = await fetch_multiple_async(requests)

    for i, data in enumerate(results):
        if data:
            print(f"Driver {i+1}: {len(data['LapNumber'])} laps")

asyncio.run(load_multiple_drivers())

fetch_with_rate_limit

Execute async function with rate limiting using semaphore.
async def fetch_with_rate_limit(
    coro_func,
    *args,
    semaphore: asyncio.Semaphore | None = None,
    **kwargs
)
Parameters:
  • coro_func: Async function to execute
  • *args: Positional arguments for coro_func
  • semaphore: Optional semaphore for rate limiting. If None, creates one based on max_concurrent_requests config
  • **kwargs: Keyword arguments for coro_func
Returns:
  • Result from coro_func execution
Example:
import asyncio
from tif1.async_fetch import fetch_with_rate_limit, fetch_json_async

async def fetch_with_custom_limit():
    semaphore = asyncio.Semaphore(5)
    result = await fetch_with_rate_limit(
        fetch_json_async, 2021, "Belgian Grand Prix", "Race", "drivers.json",
        semaphore=semaphore
    )
    print(f"Fetched {len(result)} drivers")

asyncio.run(fetch_with_custom_limit())

close_session

Close the async HTTP session and release resources.
def close_session() -> None
Example:
from tif1.async_fetch import close_session
import atexit

atexit.register(close_session)

cleanup_resources

Clean up all async resources including session and executor.
def cleanup_resources() -> None
Example:
from tif1.async_fetch import cleanup_resources

# At program exit
cleanup_resources()

Rate Limiting

The async fetch module includes automatic rate limiting to prevent CDN throttling:
  • Maximum 20 concurrent requests by default (configurable via max_concurrent_requests config)
  • Configurable via max_concurrent_requests parameter in fetch_multiple_async
  • Automatic backoff on rate limit errors and pool exhaustion
  • Semaphore-based concurrency control
Example with custom concurrency:
import asyncio
from tif1.async_fetch import fetch_multiple_async

async def load_all_telemetry():
    requests = [...]  # 100+ (year, gp, session, path) tuples

    # Limit to 5 concurrent requests
    results = await fetch_multiple_async(requests, max_concurrent_requests=5)
    return results

Connection Pooling

The HTTP session uses aggressive connection pooling for maximum performance:
  • Dynamic pool sizing based on concurrency (minimum 256 connections, 512 max pool size)
  • Reuses TCP connections across requests
  • Keep-alive connections with configurable timeout (default: 120s, 1000 max requests)
  • Automatic connection cleanup
  • Thread-safe for concurrent use
  • Connection reuse metrics tracking
Benefits:
  • 30-50% faster for multiple requests
  • Lower CPU usage
  • Reduced network overhead
  • Optimized for high-concurrency telemetry fetching (128+ concurrent requests)
Configuration:
from tif1 import get_config

config = get_config()
config.set("pool_connections", 512)  # Explicit pool size
config.set("pool_maxsize", 2048)     # Max pool size for bursts
config.set("keepalive_timeout", 120) # Keep-alive timeout in seconds

Retry Logic

All HTTP requests include automatic retry with exponential backoff and CDN fallback:
  • Default: 3 retries (configurable via max_retries config)
  • Backoff: 2^attempt seconds with jitter (configurable)
  • Max delay: 60 seconds (configurable via max_retry_delay)
  • Retries on: Connection errors, timeouts, 5xx errors, pool exhaustion
  • No retry on: 404 (data not found), 4xx client errors
  • CDN fallback: Tries all configured CDN sources before failing
  • Zero-retry mode: Ultra-fast cold start optimization (tries all CDNs once, no delays)
Example retry behavior:
Attempt 1: Immediate (CDN 1)
Attempt 1 failed: Try CDN 2, CDN 3...
Attempt 2: Wait ~2 seconds, retry all CDNs
Attempt 3: Wait ~4 seconds, retry all CDNs
Attempt 4: Fail with NetworkError
Pool exhaustion handling: When connection pool is exhausted, automatic backoff is applied:
  • Base backoff: 0.5 seconds (configurable via pool_exhaustion_backoff_base)
  • Max backoff: 5 seconds (configurable via pool_exhaustion_backoff_max)
  • Jitter: 0.5 seconds (configurable via pool_exhaustion_backoff_jitter)

Circuit Breaker Integration

HTTP requests are protected by a circuit breaker to prevent cascading failures:
  • Opens after 5 consecutive failures (configurable via circuit_breaker_threshold)
  • Blocks requests for 60 seconds when open (configurable via circuit_breaker_timeout)
  • Automatically tests recovery in half-open state
  • Closes on successful request
  • Thread-safe with atomic state transitions
  • Uses monotonic time for accurate timeout tracking
States:
  • closed: Normal operation, requests allowed
  • open: Too many failures, requests blocked
  • half-open: Testing recovery, single request allowed
See Retry & Reliability for details.

Complete Examples

Parallel session loading

import asyncio
from tif1.async_fetch import fetch_multiple_async

async def load_session_parallel():
    """Load all driver lap data in parallel."""
    drivers = ["VER", "HAM", "LEC", "SAI", "NOR"]

    # Build requests for all drivers
    requests = [
        (2021, "Belgian Grand Prix", "Race", f"laps_{driver}.json")
        for driver in drivers
    ]

    # Fetch all in parallel
    results = await fetch_multiple_async(requests, max_concurrent_requests=10)

    for driver, data in zip(drivers, results):
        if data:
            print(f"{driver}: {len(data['LapNumber'])} laps")

asyncio.run(load_session_parallel())

Custom timeout handling

import asyncio
from tif1.async_fetch import fetch_json_async
from tif1.exceptions import NetworkError, DataNotFoundError

async def fetch_with_custom_timeout():
    """Fetch with custom timeout and error handling."""
    try:
        # Use 60 second timeout for slow connections
        data = await fetch_json_async(
            2021, "Belgian Grand Prix", "Race", "laps_VER.json",
            timeout=60
        )
        print(f"Success: {len(data['LapNumber'])} laps")
    except DataNotFoundError as e:
        print(f"Data not found: {e.event} {e.session}")
    except NetworkError as e:
        print(f"Network error: {e.url}")

asyncio.run(fetch_with_custom_timeout())

Batch processing with rate limiting

import asyncio
from tif1.async_fetch import fetch_multiple_async

async def batch_fetch_telemetry():
    """Fetch telemetry for multiple driver/lap combinations."""
    drivers = ["VER", "HAM", "LEC"]
    laps = [1, 2, 3, 4, 5]

    # Build requests for all combinations
    requests = [
        (2021, "Belgian Grand Prix", "Race", f"telemetry_{driver}_{lap}.json")
        for driver in drivers
        for lap in laps
    ]

    # Fetch in batches of 20
    batch_size = 20
    all_results = []

    for i in range(0, len(requests), batch_size):
        batch = requests[i:i+batch_size]
        results = await fetch_multiple_async(batch, max_concurrent_requests=10)
        all_results.extend(results)
        print(f"Processed batch {i//batch_size + 1}")

    return all_results

asyncio.run(batch_fetch_telemetry())

Resource cleanup

import asyncio
import atexit
from tif1.async_fetch import cleanup_resources, fetch_json_async

# Register cleanup on exit (automatic, but shown for clarity)
atexit.register(cleanup_resources)

async def main():
    """Main application with automatic cleanup."""
    data = await fetch_json_async(
        2021, "Belgian Grand Prix", "Race", "drivers.json"
    )
    print(f"Loaded {len(data)} drivers")

asyncio.run(main())
# cleanup_resources() called automatically on exit

Zero-retry mode for ultra-fast cold starts

import asyncio
from tif1 import get_config
from tif1.async_fetch import fetch_json_async

async def ultra_fast_fetch():
    """Disable retries for fastest possible cold start."""
    config = get_config()
    config.set("max_retries", 0)  # Try each CDN once, no delays

    data = await fetch_json_async(
        2021, "Belgian Grand Prix", "Race", "drivers.json"
    )
    print(f"Loaded {len(data)} drivers")

asyncio.run(ultra_fast_fetch())

Performance Tips

  1. Use async methods for multiple requests: 5-10x faster than sequential fetching.
  2. Tune max_concurrent_requests for your network: Higher values (50-128) for fast connections, lower (5-10) for slow.
  3. Disable validation in production: Saves 10-15% processing time.
data = await fetch_json_async(
    2021, "Belgian Grand Prix", "Race", "drivers.json",
    validate_payload=False
)
  4. Use zero-retry mode for cold starts: Fastest possible startup, tries all CDNs once.
from tif1 import get_config
config = get_config()
config.set("max_retries", 0)
  5. Monitor connection pool stats: Track reuse rate for optimization.
from tif1.http_session import get_connection_stats

stats = get_connection_stats()
print(f"Reuse rate: {stats['reuse_rate']:.1f}%")
  6. Monitor circuit breaker: Check state if experiencing network issues.
from tif1.retry import get_circuit_breaker

cb = get_circuit_breaker()
print(f"Circuit breaker state: {cb.state}")
  7. Clean up resources: Automatic via atexit, but can call manually if needed.
from tif1.async_fetch import cleanup_resources
cleanup_resources()
  8. Use process pool for JSON parsing: For large payloads, enable multi-process parsing.
from tif1 import get_config
config = get_config()
config.set("json_parse_workers", 4)  # Use 4 worker processes

Troubleshooting

Slow Requests

import logging
from tif1 import get_config

# Enable debug logging to see request timing
logging.basicConfig(level=logging.DEBUG)

# Check timeout setting
config = get_config()
print(f"Timeout: {config.get('timeout')}s")

# Increase if needed
config.set("timeout", 60)

Connection Errors

from tif1.retry import get_circuit_breaker, reset_circuit_breaker

# Check circuit breaker
cb = get_circuit_breaker()
if cb.state == "open":
    print("Circuit breaker is open, waiting...")
    import time
    time.sleep(60)
    reset_circuit_breaker()

Rate Limiting

import asyncio
from tif1.async_fetch import fetch_multiple_async

async def fetch_with_lower_concurrency():
    # Reduce concurrent requests to avoid CDN throttling
    requests = [(2021, "Belgian Grand Prix", "Race", "drivers.json")]
    return await fetch_multiple_async(requests, max_concurrent_requests=5)

results = asyncio.run(fetch_with_lower_concurrency())

Memory Issues

import asyncio
from tif1.async_fetch import fetch_multiple_async

async def process_in_batches(all_requests):
    """Process a large request list in small batches to bound memory use."""
    batch_size = 10
    for i in range(0, len(all_requests), batch_size):
        batch = all_requests[i:i+batch_size]
        results = await fetch_multiple_async(batch)
        # Process results here, then release them before the next batch
        del results  # Free memory

all_requests = [...]  # Large list of (year, gp, session, path) tuples
asyncio.run(process_in_batches(all_requests))

Pool Exhaustion

from tif1 import get_config
from tif1.http_session import get_connection_stats

# Check pool configuration
config = get_config()
print(f"Pool connections: {config.get('pool_connections')}")
print(f"Pool maxsize: {config.get('pool_maxsize')}")

# Check connection stats
stats = get_connection_stats()
print(f"Reuse rate: {stats['reuse_rate']:.1f}%")

# Increase pool size if needed
config.set("pool_connections", 512)
config.set("pool_maxsize", 2048)
Last modified on March 5, 2026