The async_fetch module provides the underlying async HTTP infrastructure for parallel data fetching. It uses niquests (a modern fork of requests) with HTTP/2 multiplexing, which lets many concurrent requests share a single connection.
Most users don’t need to interact with this module directly. The Session class handles async fetching automatically through methods like laps_async() and get_fastest_laps_tels_async().

Core Functions

fetch_json_async

async def fetch_json_async(
    year: int,
    gp: str,
    session: str,
    path: str,
    max_retries: int | None = None,
    timeout: int | None = None,
    *,
    use_cache: bool = True,
    write_cache: bool = True,
    validate_payload: bool = True
) -> dict[str, Any]
Asynchronously fetch and parse JSON from a CDN with caching, retry logic, and validation. Parameters:
  • year: Season year (e.g., 2025)
  • gp: Grand Prix name (e.g., “Monaco”)
  • session: Session name (e.g., “Race”, “Qualifying”)
  • path: Path to JSON file (e.g., “laps.json”, “drivers.json”)
  • max_retries: Maximum retry attempts. If None, uses global config
  • timeout: Request timeout in seconds. If None, uses global config
  • use_cache: If True, read from cache before network fetch
  • write_cache: If True, persist successful responses to cache
  • validate_payload: If True, run payload validation before returning data
Returns:
  • Parsed JSON data as a dictionary (never None, raises on error)
Raises:
  • NetworkError: If network request fails after all retries
  • DataNotFoundError: If data doesn’t exist (404)
  • InvalidDataError: If JSON parsing or validation fails
Example:
import asyncio
from tif1.async_fetch import fetch_json_async

async def fetch_data():
    # Fetch laps data for 2025 Monaco Qualifying
    data = await fetch_json_async(
        year=2025,
        gp="Monaco",
        session="Qualifying",
        path="laps.json",
        timeout=30,
        max_retries=3
    )
    print(f"Loaded {len(data)} items")

asyncio.run(fetch_data())

fetch_multiple_async

async def fetch_multiple_async(
    requests: list[tuple[int, str, str, str]],
    *,
    use_cache: bool = True,
    write_cache: bool = True,
    validate_payload: bool = True,
    max_retries: int | None = None,
    timeout: int | None = None,
    max_concurrent_requests: int | None = None
) -> list[dict[str, Any] | None]
Fetch multiple JSON files in parallel with bounded concurrency and graceful error handling. Parameters:
  • requests: List of (year, gp, session, path) tuples to fetch
  • use_cache: If True, read from cache before network fetch
  • write_cache: If True, persist successful responses to cache
  • validate_payload: If True, run payload validation before returning data
  • max_retries: Maximum retry attempts per request. If None, uses global config
  • timeout: Request timeout in seconds per request. If None, uses global config
  • max_concurrent_requests: Maximum concurrent requests. If None, uses global config (default: 20)
Returns:
  • List of parsed JSON dictionaries or None for failed requests, in the same order as requests
Raises:
  • Does not raise exceptions. Failed requests return None and are logged as warnings
  • DataNotFoundError (404) is silently converted to None
Example:
import asyncio
from tif1.async_fetch import fetch_multiple_async

async def fetch_driver_laps():
    # Fetch laps for multiple drivers in parallel
    requests = [
        (2025, "Monaco Grand Prix", "Qualifying", "laps/VER.json"),
        (2025, "Monaco Grand Prix", "Qualifying", "laps/HAM.json"),
        (2025, "Monaco Grand Prix", "Qualifying", "laps/LEC.json"),
    ]

    results = await fetch_multiple_async(
        requests,
        max_concurrent_requests=10
    )

    for i, data in enumerate(results):
        if data is not None:
            print(f"Driver {i+1}: {len(data)} laps")
        else:
            print(f"Driver {i+1}: Failed to fetch")

asyncio.run(fetch_driver_laps())

fetch_with_rate_limit

async def fetch_with_rate_limit(
    coro_func,
    *args,
    semaphore: asyncio.Semaphore | None = None,
    **kwargs
)
Execute an async function with rate limiting using a semaphore for concurrency control. Parameters:
  • coro_func: Async function to execute
  • *args: Positional arguments for coro_func
  • semaphore: Optional semaphore for rate limiting. If None, creates one based on max_concurrent_requests config
  • **kwargs: Keyword arguments for coro_func
Returns:
  • Result from coro_func execution
Raises:
  • Any exception raised by coro_func
Example:
import asyncio
from tif1.async_fetch import fetch_with_rate_limit, fetch_json_async

async def controlled_fetch():
    # Limit to 5 concurrent requests
    semaphore = asyncio.Semaphore(5)

    result = await fetch_with_rate_limit(
        fetch_json_async,
        2025, "Monaco", "Race", "drivers.json",
        semaphore=semaphore
    )
    return result

asyncio.run(controlled_fetch())
This is a utility function for custom concurrency control. Most users should use fetch_multiple_async() which handles rate limiting automatically.

Resource Management

cleanup_resources

def cleanup_resources() -> None
Clean up all async resources, including HTTP sessions, thread pools, and process pools. Call this when shutting down your application to ensure proper cleanup. Cleanup errors are handled gracefully: failures are logged without raising. Example:
import tif1
from tif1.async_fetch import cleanup_resources

# Your application code
session = tif1.get_session(2025, "Monaco", "Race")
# ... work with session ...

# Clean up on shutdown
cleanup_resources()

close_session

def close_session() -> None
Close the shared niquests HTTP session. The session will be recreated on next use.

close_executor

def close_executor() -> None
Shut down the thread pool executor used for async operations. The executor will be recreated on next use.

Performance Characteristics

The async fetch system is optimized for:
  • HTTP/2 multiplexing: Single connection for multiple requests
  • Connection pooling: Reuses connections across requests
  • Parallel JSON parsing: Offloads JSON parsing to thread pool
  • Rate limiting: Prevents overwhelming CDN with concurrent requests
  • Automatic retries: Exponential backoff with jitter
Typical performance:
  • Single lap fetch: ~50-100ms
  • 20 driver laps in parallel: ~200-300ms (vs 1-2s sequential)
  • Full session telemetry (20 drivers × 50 laps): ~10-15s (vs 50-100s sequential)

Configuration

Async fetch behavior is controlled by global configuration:
import tif1

config = tif1.get_config()

# Set timeout for all async requests (default: 30)
config.set("timeout", 60)

# Set max retries (default: 3)
config.set("max_retries", 5)

# Set max concurrent requests (default: 20)
config.set("max_concurrent_requests", 50)

# Enable/disable validation (default: True)
config.set("validate_data", True)

# Retry backoff factor (default: 2.0)
config.set("retry_backoff_factor", 2.0)

# Max retry delay in seconds (default: 60.0)
config.set("max_retry_delay", 60.0)

# Enable retry jitter (default: True)
config.set("retry_jitter", True)

# Max jitter in seconds (default: 1.0)
config.set("retry_jitter_max", 1.0)

# JSON parse workers for process pool (default: 0, disabled)
config.set("json_parse_workers", 4)

# Enable offline mode (no network requests)
config.set("offline_mode", False)

# Enable CI mode (disable caching)
config.set("ci_mode", False)

Error Handling

The async fetch system uses a hierarchy of exceptions:
from tif1.exceptions import NetworkError, DataNotFoundError, InvalidDataError

try:
    data = await fetch_json_async(
        year=2025,
        gp="Monaco",
        session="Race",
        path="laps.json"
    )
except DataNotFoundError as e:
    # Resource doesn't exist (404)
    print(f"Data not found: {e}")
    print(f"Year: {e.context.get('year')}")
    print(f"Event: {e.context.get('event')}")
    print(f"Session: {e.context.get('session')}")
except InvalidDataError as e:
    # JSON parsing or validation failed
    print(f"Invalid data: {e}")
    print(f"Reason: {e.context.get('reason')}")
except NetworkError as e:
    # All retry attempts failed
    print(f"Network error: {e}")
    print(f"URL: {e.context.get('url')}")
    print(f"Status: {e.context.get('status_code')}")

Advanced Usage

Custom concurrency limits

# Fetch with custom concurrency
requests = [(2025, "Monaco", "Race", f"laps/driver{i}.json") for i in range(100)]
results = await fetch_multiple_async(requests, max_concurrent_requests=50)

Disable caching for fresh data

# Skip cache reads and writes
data = await fetch_json_async(
    year=2025,
    gp="Monaco",
    session="Race",
    path="laps.json",
    use_cache=False,
    write_cache=False
)

Disable validation for performance

# Skip payload validation for faster parsing
data = await fetch_json_async(
    year=2025,
    gp="Monaco",
    session="Race",
    path="laps.json",
    validate_payload=False
)

Implementation Details

The async fetch system uses niquests with HTTP/2 support, allowing multiple requests to share a single TCP connection. This dramatically reduces latency for parallel requests.
JSON parsing is offloaded to a thread pool executor (or optional process pool for non-telemetry data) to avoid blocking the async event loop. The system uses orjson for fast parsing and can parse multiple responses in parallel. Telemetry payloads use thread-based parsing to avoid cross-process IPC overhead.
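Offloading the parse to an executor so the event loop stays responsive can be sketched with run_in_executor (using stdlib json for illustration; the module itself uses orjson):

```python
import asyncio
import json

async def parse_json_offloaded(payload: bytes) -> dict:
    # Run the CPU-bound parse in the default thread pool so the
    # event loop stays free to service other in-flight requests.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, json.loads, payload)

data = asyncio.run(parse_json_offloaded(b'{"laps": [1, 2, 3]}'))
print(data)  # {'laps': [1, 2, 3]}
```

Passing None as the executor uses the loop's default thread pool; a ProcessPoolExecutor could be substituted for very large payloads, at the cost of cross-process IPC overhead.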
A semaphore-based rate limiter ensures no more than max_concurrent_requests requests are in flight simultaneously. This prevents overwhelming the CDN and triggering rate limits. The default is 20 concurrent requests, configurable via global config.
Failed requests are retried with exponential backoff and jitter. The backoff formula is: min(backoff_factor^attempt + random(0, jitter_max), max_delay) seconds, where defaults are backoff_factor=2.0, jitter_max=1.0, and max_delay=60.0. The system also includes circuit breaker logic to prevent cascading failures and special handling for connection pool exhaustion.
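The backoff formula above can be written directly as a small helper (a sketch using the documented defaults; setting jitter_max to 0 makes the delays deterministic):

```python
import random

def retry_delay(attempt: int,
                backoff_factor: float = 2.0,
                jitter_max: float = 1.0,
                max_delay: float = 60.0) -> float:
    # min(backoff_factor^attempt + random(0, jitter_max), max_delay)
    return min(backoff_factor ** attempt + random.uniform(0.0, jitter_max),
               max_delay)

# With jitter disabled, delays grow 1, 2, 4, 8, ... until capped at 60.
print([retry_delay(a, jitter_max=0.0) for a in range(7)])
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

The jitter term spreads out retries from many concurrent requests so they do not all hit the CDN at the same instant after a shared failure.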