A Coding Guide to Building a Production-Ready Asynchronous Python SDK with Rate Limiting, In-Memory Caching, and Authentication

In this tutorial, we guide users through building a robust, production-ready Python SDK. We begin by installing and configuring the essential asynchronous HTTP libraries (aiohttp, nest-asyncio). We then walk through the implementation of the core components, including structured response objects, token-bucket rate limiting, in-memory caching with TTL, and a clean, dataclass-driven design. We will see how to wrap these pieces into an AdvancedSDK class that supports async context management, automatic waiting when the rate limit is hit, JSON/auth header injection, and convenient HTTP-verb methods. Along the way, a demo harness run against JSONPlaceholder shows caching efficiency, batch fetching under rate limits, and error handling, and we also show how to extend the SDK through a fluent “builder” pattern for custom configuration.

import asyncio
import aiohttp
import time
import json
from typing import Dict, List, Optional, Any, Union
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
import hashlib
import logging


!pip install aiohttp nest-asyncio

We install and configure the asynchronous runtime by importing asyncio and aiohttp, alongside utilities for timing, JSON handling, dataclass modeling, caching (via hashlib and datetime), and structured logging. The !pip install aiohttp nest-asyncio line ensures that the notebook can run an event loop inside Colab, enabling robust async HTTP requests and rate-limited workflows.

@dataclass
class APIResponse:
    """Structured response object"""
    data: Any
    status_code: int
    headers: Dict[str, str]
    timestamp: datetime
   
    def to_dict(self) -> Dict:
        return asdict(self)

The APIResponse dataclass encapsulates the details of an HTTP response, namely the payload (data), status code, headers, and the timestamp of retrieval, in a single typed object. The to_dict() helper converts the instance into a plain dictionary for easy logging, serialization, or downstream processing.
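
As a small illustration of the to_dict() helper, the sketch below builds a response by hand and serializes it to JSON. The sample values are made up, and passing default=str to json.dumps is just one simple way to handle the datetime field, which asdict() leaves as a datetime object.

```python
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict


@dataclass
class APIResponse:
    """Same fields as the APIResponse dataclass in this tutorial."""
    data: Any
    status_code: int
    headers: Dict[str, str]
    timestamp: datetime

    def to_dict(self) -> Dict:
        return asdict(self)


# Sample values are invented for illustration only.
response = APIResponse(
    data={"id": 1, "title": "hello"},
    status_code=200,
    headers={"Content-Type": "application/json"},
    timestamp=datetime(2024, 1, 1, 12, 0, 0),
)

as_dict = response.to_dict()

# The timestamp is still a datetime here, so json.dumps needs a fallback
# converter; default=str turns it into a readable string.
as_json = json.dumps(as_dict, default=str)
print(as_json)
```

If a stricter format is needed, the timestamp could be converted with isoformat() before serializing instead.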

class RateLimiter:
    """Token bucket rate limiter"""
    def __init__(self, max_calls: int = 100, time_window: int = 60):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []  # timestamps of recent calls, oldest first

    def can_proceed(self) -> bool:
        now = time.time()
        # Drop timestamps that have aged out of the rolling window
        self.calls = [call_time for call_time in self.calls if now - call_time < self.time_window]

        if len(self.calls) < self.max_calls:
            self.calls.append(now)
            return True
        return False

    def wait_time(self) -> float:
        if not self.calls:
            return 0
        # Time until the oldest recorded call leaves the window
        return max(0, self.time_window - (time.time() - self.calls[0]))

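The RateLimiter class enforces a simple token-bucket-style policy by tracking the timestamps of recent calls and allowing up to max_calls within a rolling time_window (strictly speaking, this bookkeeping is a sliding-window log rather than a refilled token counter). When the limit is reached, can_proceed() returns False, and wait_time() calculates how long to pause before the next request can be made.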
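
For comparison, a textbook token bucket keeps a token counter that refills at a fixed rate instead of a list of timestamps. The sketch below is not part of the tutorial's SDK: the TokenBucket class, its refill_per_second parameter, and the injectable clock are hypothetical names chosen for illustration, and the fake clock is there only so the behavior can be checked without sleeping.

```python
import time
from typing import Callable


class TokenBucket:
    """Hypothetical token bucket: holds up to `capacity` tokens, refilled over time."""

    def __init__(self, capacity: int, refill_per_second: float,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.last_refill = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last refill, capped at capacity.
        now = self.clock()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.last_refill = now

    def can_proceed(self) -> bool:
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1  # spend one token per request
            return True
        return False

    def wait_time(self) -> float:
        # Seconds until at least one whole token is available.
        self._refill()
        if self.tokens >= 1:
            return 0.0
        return (1 - self.tokens) / self.refill_per_second


# A fake clock (assumption for the demo) makes the behavior deterministic.
fake_now = [0.0]
bucket = TokenBucket(capacity=2, refill_per_second=1.0, clock=lambda: fake_now[0])

first = bucket.can_proceed()   # spends token 1 of 2
second = bucket.can_proceed()  # spends token 2 of 2
third = bucket.can_proceed()   # bucket is empty
wait = bucket.wait_time()      # time until one token has been refilled

fake_now[0] += 1.0             # advance the fake clock by one second
fourth = bucket.can_proceed()  # one token has been refilled

print(first, second, third, wait, fourth)
```

Because this class exposes the same can_proceed() and wait_time() methods as RateLimiter, it could in principle be swapped in, though the two differ in how they treat bursts.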

class Cache:
    """Simple in-memory cache with TTL"""
    def __init__(self, default_ttl: int = 300):
        self.cache = {}
        self.default_ttl = default_ttl
   
    def _generate_key(self, method: str, url: str, params: Dict = None) -> str:
        key_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
        return hashlib.md5(key_data.encode()).hexdigest()
   
    def get(self, method: str, url: str, params: Dict = None) -> Optional[APIResponse]:
        key = self._generate_key(method, url, params)
        if key in self.cache:
            response, expiry = self.cache[key]
            if datetime.now() < expiry:
                return response
            # Entry has expired: evict it and fall through to a miss
            del self.cache[key]
        return None

    def set(self, method: str, url: str, response: APIResponse, params: Dict = None, ttl: int = None):
        key = self._generate_key(method, url, params)
        expiry = datetime.now() + timedelta(seconds=ttl or self.default_ttl)
        self.cache[key] = (response, expiry)

The Cache class provides a lightweight in-memory TTL cache for API responses by hashing the request signature (method, URL, params) into a unique key. It returns valid cached APIResponse objects before they expire and automatically evicts stale entries once their time-to-live has passed.
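
To make the key derivation concrete, the sketch below lifts the body of _generate_key into a standalone function (the name cache_key is ours) and checks that parameter order does not affect the key, thanks to sort_keys=True, while different parameter values do.

```python
import hashlib
import json
from typing import Dict, Optional


def cache_key(method: str, url: str, params: Optional[Dict] = None) -> str:
    """Standalone copy of the key derivation used by Cache._generate_key."""
    key_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
    return hashlib.md5(key_data.encode()).hexdigest()


# Same request, parameters given in a different order.
key_a = cache_key("GET", "/posts", {"userId": 1, "page": 2})
key_b = cache_key("GET", "/posts", {"page": 2, "userId": 1})

# Different parameter value, so a different request.
key_c = cache_key("GET", "/posts", {"userId": 2, "page": 2})

print(key_a == key_b)  # True: sort_keys makes the JSON canonical
print(key_a == key_c)  # False: the parameter values differ
```

MD5 serves here only as a compact fingerprint for dictionary keys, not as a security measure. Note also that the method string is hashed as given, so "GET" and "get" would produce different keys.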

class AdvancedSDK:
    """Advanced SDK with modern Python patterns"""
   
    def __init__(self, base_url: str, api_key: str = None, rate_limit: int = 100):
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.session = None
        self.rate_limiter = RateLimiter(max_calls=rate_limit)
        self.cache = Cache()
        self.logger = self._setup_logger()
       
    def _setup_logger(self) -> logging.Logger:
        logger = logging.getLogger(f"SDK-{id(self)}")
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger
   
    async def __aenter__(self):
        """Async context manager entry"""
        self.session = aiohttp.ClientSession()
        return self
   
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()
   
    def _get_headers(self) -> Dict[str, str]:
        headers = {'Content-Type': 'application/json'}
        if self.api_key:
            # Attach the API key as a bearer token when one is configured
            headers['Authorization'] = f'Bearer {self.api_key}'
        return headers
   
    async def _make_request(self, method: str, endpoint: str, params: Dict = None,
                          data: Dict = None, use_cache: bool = True) -> APIResponse:
        """Core request method with rate limiting and caching"""
       
        if use_cache and method.upper() == 'GET':
            cached = self.cache.get(method, endpoint, params)
            if cached:
                self.logger.info(f"Cache hit for {method} {endpoint}")
                return cached
       
        if not self.rate_limiter.can_proceed():
            wait_time = self.rate_limiter.wait_time()
            self.logger.warning(f"Rate limit hit, waiting {wait_time:.2f}s")
            await asyncio.sleep(wait_time)
       
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
       
        try:
            async with self.session.request(
                method=method.upper(),
                url=url,
                params=params,
                json=data,
                headers=self._get_headers()
            ) as resp:
                response_data = await resp.json() if resp.content_type == 'application/json' else await resp.text()
               
                api_response = APIResponse(
                    data=response_data,
                    status_code=resp.status,
                    headers=dict(resp.headers),
                    timestamp=datetime.now()
                )
               
                if use_cache and method.upper() == 'GET' and 200 <= resp.status < 300:
                    self.cache.set(method, endpoint, api_response, params)
               
                self.logger.info(f"{method.upper()} {endpoint} - Status: {resp.status}")
                return api_response
               
        except Exception as e:
            self.logger.error(f"Request failed: {str(e)}")
            raise
   
    async def get(self, endpoint: str, params: Dict = None, use_cache: bool = True) -> APIResponse:
        return await self._make_request('GET', endpoint, params=params, use_cache=use_cache)
   
    async def post(self, endpoint: str, data: Dict = None) -> APIResponse:
        return await self._make_request('POST', endpoint, data=data, use_cache=False)
   
    async def put(self, endpoint: str, data: Dict = None) -> APIResponse:
        return await self._make_request('PUT', endpoint, data=data, use_cache=False)
   
    async def delete(self, endpoint: str) -> APIResponse:
        return await self._make_request('DELETE', endpoint, use_cache=False)

The AdvancedSDK class wraps everything together into a clean, async-first client: it manages the aiohttp session through an async context manager, injects JSON and auth headers, and coordinates our RateLimiter and Cache under the hood. Its _make_request method centralizes cache lookups, rate-limit waits, error logging, and response packaging into APIResponse objects, while the get/post/put/delete helpers give us ergonomic, high-level calls.
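
As written, _make_request waits when the limiter is exhausted but logs and re-raises request failures rather than retrying them. If retries are wanted, one option is a small wrapper with exponential backoff. The sketch below is an assumption-laden illustration, not part of the SDK: with_retries, max_attempts, and base_delay are hypothetical names, and flaky_call merely simulates a transient failure so the example runs without a network.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    make_call: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Hypothetical helper: retry an async call with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await make_call()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles each time: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("max_attempts must be at least 1")


attempts = {"count": 0}


async def flaky_call() -> str:
    """Simulated request that fails twice before succeeding."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


result = asyncio.run(with_retries(flaky_call, max_attempts=3, base_delay=0.01))
print(result, attempts["count"])
```

With the SDK, a call might look like `await with_retries(lambda: sdk.get("/posts/1"))` inside an `async with AdvancedSDK(...)` block. A production version would likely retry only on specific exception types and add jitter to the delay.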

async def demo_sdk():
    """Demonstrate SDK capabilities"""
    print("🚀 Advanced SDK Demo")
    print("=" * 50)
   
    async with AdvancedSDK("https://jsonplaceholder.typicode.com") as sdk:
       
        print("\n📥 Testing GET request with caching...")
        response1 = await sdk.get("/posts/1")
        print(f"First request - Status: {response1.status_code}")
        print(f"Title: {response1.data.get('title', 'N/A')}")
       
        response2 = await sdk.get("/posts/1")
        print(f"Second request (cached) - Status: {response2.status_code}")
       
        print("\n📤 Testing POST request...")
        new_post = {
            "title": "Advanced SDK Tutorial",
            "body": "This SDK demonstrates modern Python patterns",
            "userId": 1
        }
        post_response = await sdk.post("/posts", data=new_post)
        print(f"POST Status: {post_response.status_code}")
        print(f"Created post ID: {post_response.data.get('id', 'N/A')}")
       
        print("\n⚡ Testing batch requests with rate limiting...")
        tasks = []
        for i in range(1, 6):
            tasks.append(sdk.get(f"/posts/{i}"))
       
        results = await asyncio.gather(*tasks)
        print(f"Batch completed: {len(results)} requests")
        for i, result in enumerate(results, 1):
            print(f"  Post {i}: {result.data.get('title', 'N/A')[:30]}...")
       
        print("\n❌ Testing error handling...")
        try:
            error_response = await sdk.get("/posts/999999")
            print(f"Error response status: {error_response.status_code}")
        except Exception as e:
            print(f"Handled error: {type(e).__name__}")
   
    print("\n✅ Demo completed successfully!")


async def run_demo():
    """Colab-friendly demo runner"""
    await demo_sdk()

The demo_sdk coroutine walks through the SDK's core features, issuing a cached GET request, performing a POST, executing a batch of GETs under rate limiting, and handling errors against the JSONPlaceholder API, printing status codes and sample data to illustrate each capability. The run_demo helper ensures that this demo runs smoothly inside a Colab notebook's existing event loop.
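
In the batch step, asyncio.gather with its default settings propagates the first exception raised by any request. When a batch should keep its successful results even if some calls fail, return_exceptions=True is one option. The sketch below uses a stand-in coroutine, fetch_post, in place of sdk.get so that it runs without network access; the simulated failure for post 3 is an assumption made for the demonstration.

```python
import asyncio
from typing import List, Union


async def fetch_post(post_id: int) -> dict:
    """Stand-in for sdk.get(f"/posts/{post_id}"); no network involved."""
    await asyncio.sleep(0)  # yield to the event loop like a real request would
    if post_id == 3:
        raise ValueError(f"simulated failure for post {post_id}")
    return {"id": post_id, "title": f"Post {post_id}"}


async def fetch_batch(post_ids: List[int]) -> List[Union[dict, BaseException]]:
    # return_exceptions=True places raised exceptions in the result list
    # (in input order) instead of aborting the whole batch.
    return await asyncio.gather(
        *(fetch_post(pid) for pid in post_ids),
        return_exceptions=True,
    )


results = asyncio.run(fetch_batch([1, 2, 3, 4]))
succeeded = [r for r in results if not isinstance(r, BaseException)]
failed = [r for r in results if isinstance(r, BaseException)]
print(len(succeeded), len(failed))
```

Results come back in the same order as the inputs, so each failure can still be matched to the request that caused it.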

import nest_asyncio
nest_asyncio.apply()


if __name__ == "__main__":
    try:
        asyncio.run(demo_sdk())
    except RuntimeError:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(demo_sdk())


class SDKBuilder:
    """Builder pattern for SDK configuration"""
    def __init__(self, base_url: str):
        self.base_url = base_url
        self.config = {}
   
    def with_auth(self, api_key: str):
        self.config['api_key'] = api_key
        return self  # returning self enables method chaining

    def with_rate_limit(self, calls_per_minute: int):
        self.config['rate_limit'] = calls_per_minute
        return self
   
    def build(self) -> AdvancedSDK:
        return AdvancedSDK(self.base_url, **self.config)

Finally, we apply nest_asyncio to enable nested event loops in Colab, then run the demo via asyncio.run (with a fallback to manual loop execution if needed). We also introduce an SDKBuilder class that implements a fluent builder pattern for easily configuring and instantiating AdvancedSDK with custom authentication and rate-limit settings.
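
A chained builder call might look like the sketch below. To keep it runnable on its own, StubSDK stands in for AdvancedSDK with the same constructor parameters (base_url, api_key, rate_limit), and "demo-key" is a placeholder rather than a real credential. With the real class, build() would return an AdvancedSDK ready to be used in an `async with` block.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubSDK:
    """Stand-in for AdvancedSDK, mirroring its constructor parameters."""
    base_url: str
    api_key: Optional[str] = None
    rate_limit: int = 100


class SDKBuilder:
    """Same shape as the tutorial's SDKBuilder, but building the stub."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        self.config = {}

    def with_auth(self, api_key: str) -> "SDKBuilder":
        self.config["api_key"] = api_key
        return self

    def with_rate_limit(self, calls_per_minute: int) -> "SDKBuilder":
        self.config["rate_limit"] = calls_per_minute
        return self

    def build(self) -> StubSDK:
        return StubSDK(self.base_url, **self.config)


# Each with_* method returns the builder, so calls can be chained.
sdk = (
    SDKBuilder("https://jsonplaceholder.typicode.com")
    .with_auth("demo-key")  # placeholder key, not a real credential
    .with_rate_limit(50)
    .build()
)

# Options that are never set fall back to the constructor defaults.
default_sdk = SDKBuilder("https://jsonplaceholder.typicode.com").build()

print(sdk.rate_limit, default_sdk.rate_limit)
```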

In conclusion, this SDK tutorial provides a scalable foundation for any RESTful integration, combining modern Python idioms (dataclasses, async/await, context managers) with practical tooling (rate limiting, caching, structured logging). By adapting the patterns shown here, particularly the separation of concerns between request orchestration, caching, and response modeling, teams can accelerate the development of new API clients while ensuring predictability, observability, and resilience.




Sana Hassan, a consulting intern at Marktechpost and IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, Hassan brings a fresh perspective to the intersection of AI and real-life solutions.
