Claude vs Qwen for Coding

Claude edges out Qwen for pure coding performance, scoring 79.6% to Qwen's 76.4% on SWE-bench Verified, and offers Claude Code (CLI) for seamless development workflows. However, Qwen is a genuinely capable alternative that closes the gap significantly while costing roughly 87% less on input tokens, making it the smart choice if budget matters more than marginal performance gains.

Head-to-Head for Coding

Software Engineering (SWE-bench). Winner: Claude. Claude leads with 79.6%; Qwen is strong at 76.4%.

Large Codebase Context. Winner: Qwen. Claude offers 128K tokens, sufficient for most projects; Qwen offers 256K tokens, double the capacity.

Algorithm Problem-Solving. Winner: Claude. Claude scores an excellent 95.6% on AIME 2025; Qwen a strong 91.3%.

Developer Cost-Effectiveness. Winner: Qwen. Claude costs $3/$15 per 1M input/output tokens; Qwen costs $0.40/$2.40.

Complex Problem Reasoning. Tie. Both support extended thinking; Claude's thinking budget is adjustable.

See Them Answer a Coding Question

We asked both models the same coding-focused prompt.

Build a Python async web scraper that handles rate limiting, retries with exponential backoff, and extracts structured data from paginated API responses into typed dataclasses.

Claude 4.6 Sonnet


Async Web Scraper with Rate Limiting, Retries, and Structured Data Extraction

Project Structure

scraper/
├── models.py          # Typed dataclasses
├── rate_limiter.py    # Token bucket rate limiter
├── retry.py           # Exponential backoff logic
├── scraper.py         # Core async scraper
├── extractors.py      # Data extraction & parsing
└── main.py            # Example usage

models.py — Typed Dataclasses

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Any, Dict
from enum import Enum


class ScrapingStatus(Enum):
    SUCCESS = "success"
    FAILED = "failed"
    RATE_LIMITED = "rate_limited"
    PARTIAL = "partial"


@dataclass
class RateLimitConfig:
    """Token bucket configuration."""
    requests_per_second: float = 2.0
    burst_size: int = 5
    retry_after_header: str = "Retry-After"


@dataclass
class RetryConfig:
    """Exponential backoff configuration."""
    max_attempts: int = 5
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    retryable_status_codes: tuple = (429, 500, 502, 503, 504)


@dataclass
class PaginationConfig:
    """Pagination strategy configuration."""
    page_param: str = "page"
    per_page_param: str = "per_page"
    per_page: int = 100
    max_pages: Optional[int] = None
    # For cursor-based pagination
    cursor_param: Optional[str] = None
    next_cursor_path: Optional[str] = None  # JSONPath-like: "meta.next_cursor"


@dataclass
class ScrapingResult:
    """Container for scraping results."""
    url: str
    status: ScrapingStatus
    total_pages: int = 0
    total_records: int = 0
    errors: List[str] = field(default_factory=list)
    started_at: datetime = field(default_factory=datetime.utcnow)
    completed_at: Optional[datetime] = None
    duration_seconds: float = 0.0


# --- Domain Models ---

@dataclass
class User:
    id: int
    username: str
    email: str
    created_at: datetime
    is_active: bool = True
    full_name: Optional[str] = None
    avatar_url: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if isinstance(self.created_at, str):
            self.created_at = datetime.fromisoformat(
                self.created_at.replace("Z", "+00:00")
            )


@dataclass
class Post:
    id: int
    title: str
    body: str
    user_id: int
    created_at: datetime
    tags: List[str] = field(default_factory=list)
    likes: int = 0
    is_published: bool = False

    def __post_init__(self):
        if isinstance(self.created_at, str):
            self.created_at = datetime.fromisoformat(
                self.created_at.replace("Z", "+00:00")
            )
        if isinstance(self.tags, str):
            self.tags = [t.strip() for t in self.tags.split(",") if t.strip()]


@dataclass
class Comment:
    id: int
    post_id: int
    user_id: int
    body: str
    created_at: datetime
    parent_id: Optional[int] = None

    def __post_init__(self):
        if isinstance(self.created_at, str):
            self.created_at = datetime.fromisoformat(
                self.created_at.replace("Z", "+00:00")
            )

rate_limiter.py — Token Bucket Implementation

import asyncio
import time
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class AsyncTokenBucket:
    """
    Async token bucket rate limiter (coroutine-safe via an asyncio lock;
    not thread-safe across event loops).

    Supports burst capacity and dynamic rate adjustment
    based on server Retry-After headers.
    """

    def __init__(
        self,
        rate: float,
        burst: int,
        domain: str = "default"
    ):
        self.rate = rate          # tokens per second
        self.burst = burst        # max burst size
        self.domain = domain
        self._tokens = float(burst)
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()
        self._backpressure_until: float = 0.0

    def _refill(self) -> None:
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        elapsed = now - self._last_refill
        tokens_to_add = elapsed * self.rate
        self._tokens = min(self.burst, self._tokens + tokens_to_add)
        self._last_refill = now

    async def acquire(self, tokens: int = 1) -> float:
        """
        Acquire tokens, waiting if necessary.
        Returns the wait time in seconds.
        """
        async with self._lock:
            # Honor server-side backpressure
            now = time.monotonic()
            if now < self._backpressure_until:
                wait = self._backpressure_until - now
                logger.debug(
                    f"[{self.domain}] Backpressure: waiting {wait:.2f}s"
                )
                await asyncio.sleep(wait)

            self._refill()

            if self._tokens >= tokens:
                self._tokens -= tokens
                return 0.0

            # Calculate wait time for enough tokens
            deficit = tokens - self._tokens
            wait_time = deficit / self.rate
            logger.debug(
                f"[{self.domain}] Rate limit hit, waiting {wait_time:.2f}s "
                f"(tokens: {self._tokens:.2f})"
            )
            await asyncio.sleep(wait_time)
            self._refill()
            self._tokens -= tokens
            return wait_time

    async def notify_rate_limited(self, retry_after: Optional[float] = None) -> None:
        """Called when server returns 429 to apply backpressure."""
        async with self._lock:
            if retry_after:
                self._backpressure_until = time.monotonic() + retry_after
                logger.warning(
                    f"[{self.domain}] Server rate limited us. "
                    f"Backing off for {retry_after:.1f}s"
                )
            # Also drain tokens to slow down
            self._tokens = 0

    def get_stats(self) -> dict:
        return {
            "domain": self.domain,
            "rate": self.rate,
            "burst": self.burst,
            "current_tokens": round(self._tokens, 2),
        }


class RateLimiterRegistry:
    """Manages per-domain rate limiters."""

    def __init__(self):
        self._limiters: dict[str, AsyncTokenBucket] = {}
        self._default_rate = 2.0
        self._default_burst = 5

    def get_or_create(
        self,
        domain: str,
        rate: Optional[float] = None,
        burst: Optional[int] = None
    ) -> AsyncTokenBucket:
        if domain not in self._limiters:
            self._limiters[domain] = AsyncTokenBucket(
                rate=rate or self._default_rate,
                burst=burst or self._default_burst,
                domain=domain
            )
        return self._limiters[domain]

    def get_all_stats(self) -> list[dict]:
        return [limiter.get_stats() for limiter in self._limiters.values()]

retry.py — Exponential Backoff

import asyncio
import random
import logging
import functools
from typing import TypeVar, Callable, Optional, Tuple, Type
from dataclasses import dataclass

logger = logging.getLogger(__name__)

T = TypeVar("T")


@dataclass
class RetryState:
    attempt: int = 0
    total_delay: float = 0.0
    last_exception: Optional[Exception] = None


class RetryExhaustedError(Exception):
    """Raised when all retry attempts are exhausted."""
    
    def __init__(self, message: str, state: RetryState):
        super().__init__(message)
        self.state = state


def calculate_backoff(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    jitter: bool = True
) -> float:
    """
    Calculate backoff delay with optional jitter.
    
    Formula: min(base * exp_base^attempt, max_delay) + jitter
    """
    delay = min(base_delay * (exponential_base ** attempt), max_delay)
    
    if jitter:
        # Full jitter: random between 0 and calculated delay
        # Prevents thundering herd problem
        delay = random.uniform(0, delay)
    
    return delay


class AsyncRetry:
    """
    Async retry decorator with exponential backoff.
    
    Usage:
        @AsyncRetry(max_attempts=3, retryable_exceptions=(aiohttp.ClientError,))
        async def fetch_data():
            ...
    """

    def __init__(
        self,
        max_attempts: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True,
        retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,),
        on_retry: Optional[Callable] = None,
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter
        self.retryable_exceptions = retryable_exceptions
        self.on_retry = on_retry

    def __call__(self, func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            return await self.execute(func, *args, **kwargs)
        return wrapper

    async def execute(
        self,
        func: Callable,
        *args,
        **kwargs
    ):
        state = RetryState()

        for attempt in range(self.max_attempts):
            state.attempt = attempt
            try:
                result = await func(*args, **kwargs)
                if attempt > 0:
                    logger.info(
                        f"[{func.__name__}] Succeeded after {attempt + 1} attempts "
                        f"(total delay: {state.total_delay:.2f}s)"
                    )
                return result

            except self.retryable_exceptions as e:
                state.last_exception = e
                
                if attempt == self.max_attempts - 1:
                    raise RetryExhaustedError(
                        f"[{func.__name__}] Failed after {self.max_attempts} attempts. "
                        f"Last error: {e}",
                        state=state
                    ) from e

                delay = calculate_backoff(
                    attempt=attempt,
                    base_delay=self.base_delay,
                    max_delay=self.max_delay,
                    exponential_base=self.exponential_base,
                    jitter=self.jitter,
                )
                state.total_delay += delay

                logger.warning(
                    f"[{func.__name__}] Attempt {attempt + 1}/{self.max_attempts} "
                    f"failed: {type(e).__name__}: {e}. "
                    f"Retrying in {delay:.2f}s..."
                )

                if self.on_retry:
                    await self.on_retry(attempt, delay, e)

                await asyncio.sleep(delay)

        raise RetryExhaustedError(
            f"Exhausted {self.max_attempts} attempts",
            state=state
        )

extractors.py — Data Extraction & Parsing

import logging
from typing import Any, Dict, List, Optional, Tuple, Type, TypeVar, get_type_hints
from dataclasses import fields, MISSING
import dataclasses

logger = logging.getLogger(__name__)

T = TypeVar("T")


class ExtractionError(Exception):
    """Raised when data extraction fails."""
    pass


def get_nested(data: Dict, path: str, default: Any = None) -> Any:
    """
    Get nested dict value using dot notation.
    
    Example: get_nested(data, "meta.pagination.next_cursor")
    """
    keys = path.split(".")
    current = data
    for key in keys:
        if isinstance(current, dict):
            current = current.get(key, default)
        elif isinstance(current, list) and key.isdigit():
            idx = int(key)
            current = current[idx] if idx < len(current) else default
        else:
            return default
    return current


def coerce_value(value: Any, target_type: Type) -> Any:
    """Attempt to coerce a value to the target type."""
    if value is None:
        return None

    # Unwrap Optional[X] -> X (Optional[X] is Union[X, None])
    origin = getattr(target_type, "__origin__", None)
    if str(origin) == "typing.Union":
        args = getattr(target_type, "__args__", ())
        non_none_args = [a for a in args if a is not type(None)]
        if non_none_args:
            target_type = non_none_args[0]

    try:
        if target_type == bool and isinstance(value, str):
            return value.lower() in ("true", "1", "yes")
        return target_type(value)
    except (ValueError, TypeError):
        return value


def from_dict(cls: Type[T], data: Dict[str, Any]) -> T:
    """
    Safely construct a dataclass from a dictionary,
    handling missing fields with defaults and type coercion.
    """
    if not dataclasses.is_dataclass(cls):
        raise TypeError(f"{cls} is not a dataclass")

    field_map = {}
    type_hints = get_type_hints(cls)

    for f in fields(cls):
        value = data.get(f.name)

        if value is None:
            if f.default is not MISSING:
                field_map[f.name] = f.default
            elif f.default_factory is not MISSING:  # type: ignore
                field_map[f.name] = f.default_factory()  # type: ignore
            else:
                logger.warning(f"Missing required field '{f.name}' for {cls.__name__}")
                field_map[f.name] = None
        else:
            target_type = type_hints.get(f.name, type(value))
            field_map[f.name] = coerce_value(value, target_type)

    return cls(**field_map)


class APIResponseExtractor:
    """
    Extracts structured data from paginated API responses.
    Supports multiple API response formats.
    """

    # Common response wrapper patterns
    DATA_PATHS = ["data", "results", "items", "records", "content"]
    META_PATHS = ["meta", "pagination", "_meta"]

    def extract_items(
        self,
        response: Dict[str, Any],
        data_path: Optional[str] = None
    ) -> List[Dict]:
        """Extract list of items from API response."""
        if data_path:
            items = get_nested(response, data_path)
        else:
            # Auto-detect data location
            items = self._auto_detect_items(response)

        if items is None:
            raise ExtractionError(
                f"Could not find items in response. "
                f"Keys: {list(response.keys())}"
            )

        if not isinstance(items, list):
            raise ExtractionError(
                f"Expected list of items, got {type(items).__name__}"
            )

        return items

    def _auto_detect_items(self, response: Dict) -> Optional[List]:
        """Try common paths to find list data."""
        # Direct list response
        if isinstance(response, list):
            return response
        
        # Try common wrapper keys
        for key in self.DATA_PATHS:
            value = response.get(key)
            if isinstance(value, list):
                return value
        
        return None

    def extract_pagination_info(
        self,
        response: Dict[str, Any],
        cursor_path: Optional[str] = None
    ) -> Dict[str, Any]:
        """Extract pagination metadata from response."""
        info = {
            "total": None,
            "page": None,
            "per_page": None,
            "total_pages": None,
            "has_more": False,
            "next_cursor": None,
            "next_url": None,
        }

        # Try to find pagination data in common locations
        meta = None
        for path in self.META_PATHS:
            meta = response.get(path)
            if meta and isinstance(meta, dict):
                break

        if meta:
            info["total"] = meta.get("total") or meta.get("total_count")
            info["page"] = meta.get("page") or meta.get("current_page")
            info["per_page"] = meta.get("per_page") or meta.get("limit")
            info["total_pages"] = meta.get("total_pages") or meta.get("pages")
            info["has_more"] = bool(meta.get("has_more") or meta.get("has_next"))
            info["next_url"] = meta.get("next") or meta.get("next_url")
            info["next_cursor"] = meta.get("cursor") or meta.get("next_cursor")

        # Cursor-based pagination via custom path
        if cursor_path:
            info["next_cursor"] = get_nested(response, cursor_path)

        # Infer has_more
        if info["page"] and info["total_pages"]:
            info["has_more"] = info["page"] < info["total_pages"]

        return info

    def parse_items(
        self,
        items: List[Dict],
        model_cls: Type[T],
        strict: bool = False
    ) -> Tuple[List[T], List[Dict]]:
        """
        Parse raw dicts into typed dataclass instances.
        
        Returns (successful_items, failed_items)
        """
        successful = []
        failed = []

        for item in items:
            try:
                instance = from_dict(model_cls, item)
                successful.append(instance)
            except Exception as e:
                logger.error(f"Failed to parse {model_cls.__name__}: {e} | Data: {item}")
                failed.append({"data": item, "error": str(e)})
                if strict:
                    raise ExtractionError(f"Strict mode: {e}") from e

        logger.debug(
            f"Parsed {len(successful)}/{len(items)} {model_cls.__name__} records "
            f"({len(failed)} failures)"
        )
        return successful, failed

scraper.py — Core Async Scraper

import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import (
    AsyncIterator, Dict, Generic, List, Optional,
    Tuple, Type, TypeVar, Any
)
from urllib.parse import urlencode, urlparse

import aiohttp
from aiohttp import ClientResponseError, ClientConnectorError, ServerTimeoutError

from models import (
    RateLimitConfig, RetryConfig, PaginationConfig,
    ScrapingResult, ScrapingStatus
)
from rate_limiter import AsyncTokenBucket, RateLimiterRegistry
from retry import AsyncRetry, RetryExhaustedError, calculate_backoff
from extractors import APIResponseExtractor, ExtractionError

logger = logging.getLogger(__name__)

T = TypeVar("T")

# Exceptions we'll retry on
RETRYABLE_EXCEPTIONS = (
    ClientConnectorError,
    ServerTimeoutError,
    asyncio.TimeoutError,
    ConnectionResetError,
)


@dataclass
class FetchResult:
    """Raw result from a single HTTP request."""
    url: str
    status_code: int
    data: Optional[Dict] = None
    error: Optional[str] = None
    headers: Dict[str, str] = field(default_factory=dict)
    duration_ms: float = 0.0
    attempt: int = 1


class AsyncScraper(Generic[T]):
    """
    Production-grade async web scraper with:
    - Per-domain rate limiting (token bucket)
    - Exponential backoff retries
    - Automatic pagination handling
    - Typed data extraction
    - Connection pooling
    - Comprehensive error handling
    """

    def __init__(
        self,
        base_url: str,
        model_cls: Type[T],
        rate_config: Optional[RateLimitConfig] = None,
        retry_config: Optional[RetryConfig] = None,
        pagination_config: Optional[PaginationConfig] = None,
        default_headers: Optional[Dict[str, str]] = None,
        timeout_seconds: float = 30.0,
        max_concurrent_requests: int = 10,
    ):
        self.base_url = base_url.rstrip("/")
        self.model_cls = model_cls
        self.rate_config = rate_config or RateLimitConfig()
        self.retry_config = retry_config or RetryConfig()
        self.pagination_config = pagination_config or PaginationConfig()
        self.default_headers = default_headers or {}
        self.timeout_seconds = timeout_seconds

        # Domain for rate limiting
        self._domain = urlparse(base_url).netloc

        # Session management
        self._session: Optional[aiohttp.ClientSession] = None
        self._connector: Optional[aiohttp.TCPConnector] = None

        # Rate limiter
        self._limiter = AsyncTokenBucket(
            rate=self.rate_config.requests_per_second,
            burst=self.rate_config.burst_size,
            domain=self._domain,
        )

        # Semaphore for concurrency control
        self._semaphore = asyncio.Semaphore(max_concurrent_requests)

        # Data extractor
        self._extractor = APIResponseExtractor()

        # Stats
        self._total_requests = 0
        self._failed_requests = 0
        self._rate_limited_count = 0

    async def __aenter__(self) -> "AsyncScraper":
        await self._create_session()
        return self

    async def __aexit__(self, *args) -> None:
        await self._close_session()

    async def _create_session(self) -> None:
        """Create aiohttp session with connection pooling."""
        self._connector = aiohttp.TCPConnector(
            limit=100,           # Total connection pool size
            limit_per_host=20,   # Per-host connection limit
            ttl_dns_cache=300,   # DNS cache TTL
            enable_cleanup_closed=True,
        )
        self._session = aiohttp.ClientSession(
            connector=self._connector,
            headers={
                "User-Agent": "AsyncScraper/1.0",
                "Accept": "application/json",
                **self.default_headers,
            },
            timeout=aiohttp.ClientTimeout(total=self.timeout_seconds),
        )
        logger.info(f"Session created for domain: {self._domain}")

    async def _close_session(self) -> None:
        """Gracefully close the session."""
        if self._session:
            await self._session.close()
            # Allow SSL cleanup
            await asyncio.sleep(0.25)
        logger.info(
            f"Session closed. Stats — Requests: {self._total_requests}, "
            f"Failed: {self._failed_requests}, "
            f"Rate limited: {self._rate_limited_count}"
        )

    async def _fetch_with_retry(
        self,
        url: str,
        params: Optional[Dict] = None,
        attempt: int = 0
    ) -> FetchResult:
        """
        Fetch a URL with rate limiting and exponential backoff.
        Recursive retry implementation.
        """
        if attempt >= self.retry_config.max_attempts:
            raise RetryExhaustedError(
                f"Max retries ({self.retry_config.max_attempts}) exhausted for {url}",
                state=None
            )

        # Acquire rate limit token
        await self._limiter.acquire()

        async with self._semaphore:
            start_time = time.monotonic()
            self._total_requests += 1

            try:
                async with self._session.get(url, params=params) as response:
                    duration_ms = (time.monotonic() - start_time) * 1000
                    
                    logger.debug(
                        f"GET {url} | Status: {response.status} | "
                        f"Duration: {duration_ms:.0f}ms | Attempt: {attempt + 1}"
                    )

                    # Handle rate limiting (429)
                    if response.status == 429:
                        self._rate_limited_count += 1
                        retry_after = self._parse_retry_after(response)
                        await self._limiter.notify_rate_limited(retry_after)
                        
                        delay = retry_after or calculate_backoff(
                            attempt,
                            self.retry_config.base_delay,
                            self.retry_config.max_delay,
                            jitter=self.retry_config.jitter
                        )
                        logger.warning(
                            f"Rate limited (429) on {url}. "
                            f"Retrying after {delay:.1f}s..."
                        )
                        await asyncio.sleep(delay)
                        return await self._fetch_with_retry(url, params, attempt + 1)

                    # Handle other retryable status codes
                    if response.status in self.retry_config.retryable_status_codes:
                        self._failed_requests += 1
                        delay = calculate_backoff(
                            attempt,
                            self.retry_config.base_delay,
                            self.retry_config.max_delay,
                            jitter=self.retry_config.jitter
                        )
                        logger.warning(
                            f"Retryable status {response.status} on {url}. "
                            f"Attempt {attempt + 1}. Retrying in {delay:.1f}s..."
                        )
                        await asyncio.sleep(delay)
                        return await self._fetch_with_retry(url, params, attempt + 1)

                    # Non-retryable errors
                    if response.status >= 400:
                        error_text = await response.text()
                        return FetchResult(
                            url=url,
                            status_code=response.status,
                            error=f"HTTP {response.status}: {error_text[:200]}",
                            duration_ms=duration_ms,
                            attempt=attempt + 1,
                        )

                    # Success
                    data = await response.json(content_type=None)
                    return FetchResult(
                        url=url,
                        status_code=response.status,
                        data=data,
                        headers=dict(response.headers),
                        duration_ms=duration_ms,
                        attempt=attempt + 1,
                    )

            except RETRYABLE_EXCEPTIONS as e:
                self._failed_requests += 1
                delay = calculate_backoff(
                    attempt,
                    self.retry_config.base_delay,
                    self.retry_config.max_delay,
                    jitter=self.retry_config.jitter
                )
                logger.warning(
                    f"Connection error on {url}: {type(e).__name__}: {e}. "
                    f"Retrying in {delay:.1f}s..."
                )
                await asyncio.sleep(delay)
                return await self._fetch_with_retry(url, params, attempt + 1)

    def _parse_retry_after(self, response: aiohttp.ClientResponse) -> Optional[float]:
        """Parse Retry-After header (seconds or HTTP date)."""
        header = response.headers.get(self.rate_config.retry_after_header)
        if not header:
            return None
        try:
            return float(header)
        except ValueError:
            # Try parsing as HTTP date
            from email.utils import parsedate
            parsed = parsedate(header)
            if parsed:
                import calendar
                target = calendar.timegm(parsed)
                return max(0, target - time.time())
        return None

    def _build_url(self, endpoint: str) -> str:
        """Build full URL from base and endpoint."""
        if endpoint.startswith("http"):
            return endpoint
        return f"{self.base_url}/{endpoint.lstrip('/')}"

    async def scrape_page(
        self,
        endpoint: str,
        page: int = 1,
        extra_params: Optional[Dict] = None,
        data_path: Optional[str] = None,
    ) -> Tuple[List[T], Dict]:
        """
        Scrape a single page and return parsed items + pagination info.
        """
        url = self._build_url(endpoint)
        params = {
            self.pagination_config.page_param: page,
            self.pagination_config.per_page_param: self.pagination_config.per_page,
            **(extra_params or {}),
        }

        result = await self._fetch_with_retry(url, params=params)

        if result.error or not result.data:
            logger.error(f"Failed to fetch page {page}: {result.error}")
            return [], {}

        items_raw = self._extractor.extract_items(result.data, data_path)
        pagination = self._extractor.extract_pagination_info(
            result.data,
            cursor_path=self.pagination_config.next_cursor_path,
        )
        parsed_items, failed = self._extractor.parse_items(items_raw, self.model_cls)

        if failed:
            logger.warning(f"Page {page}: {len(failed)} items failed to parse")

        return parsed_items, pagination

    async def scrape_all_pages(
        self,
        endpoint: str,
        extra_params: Optional[Dict] = None,
        data_path: Optional[str] = None,
    ) -> AsyncIterator[List[T]]:
        """
        Async generator that yields items page by page.
        
        Usage:
            async for page_items in scraper.scrape_all_pages("/users"):
                await save_to_db(page_items)
        """
        page = 1
        total_records = 0
        max_pages = self.pagination_config.max_pages

        logger.info(f"Starting pagination scrape: {endpoint}")

        while True:
            logger.info(f"Fetching page {page}...")

            items, pagination = await self.scrape_page(
                endpoint, page, extra_params, data_path
            )

            if not items:
                logger.info(f"No items on page {page}, stopping.")
                break

            total_records += len(items)
            yield items

            logger.info(
                f"Page {page}: {len(items)} items "
                f"(total so far: {total_records}) | "
                f"Pagination: {pagination}"
            )

            # Determine if there are more pages to fetch
            if max_pages and page >= max_pages:
                logger.info(f"Reached max_pages limit ({max_pages}), stopping.")
                break
            total_pages = pagination.get("total_pages")
            if total_pages and page >= total_pages:
                logger.info(f"Reached last page ({total_pages}), stopping.")
                break
            # Otherwise advance and rely on the empty-page check to stop
            page += 1

        logger.info(f"Pagination scrape complete: {total_records} total records")
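The project structure at the top of the answer lists a `main.py` for example usage that the listing never reaches. A minimal sketch consistent with the modules shown (the base URL and `/users` endpoint are placeholders, and it requires the scraper package above on the import path) could look like:

```python
# main.py — hypothetical example usage; base URL and endpoint are placeholders
import asyncio
import logging

from models import PaginationConfig, RateLimitConfig, User
from scraper import AsyncScraper


async def main() -> None:
    logging.basicConfig(level=logging.INFO)

    scraper = AsyncScraper(
        base_url="https://api.example.com",  # placeholder
        model_cls=User,
        rate_config=RateLimitConfig(requests_per_second=2.0, burst_size=5),
        pagination_config=PaginationConfig(per_page=100, max_pages=10),
    )

    all_users: list[User] = []
    # Session is created/closed by the async context manager
    async with scraper:
        async for page_items in scraper.scrape_all_pages("/users"):
            all_users.extend(page_items)

    print(f"Scraped {len(all_users)} users")


if __name__ == "__main__":
    asyncio.run(main())
```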

Try coding tasks with both models

See Claude and Qwen answer side by side in Multichat

Try it yourself — free

Detailed Breakdown

When it comes to coding assistance, both Claude and Qwen are capable tools — but they serve different types of developers in meaningfully different ways.

Claude's strongest coding asset is precision. It follows instructions carefully, produces clean and well-structured code, and excels at multi-step tasks like refactoring legacy codebases, writing unit tests, and explaining complex logic in plain language. Its SWE-bench Verified score of 79.6% puts it ahead of Qwen's 76.4%, which matters in practice: Claude is more reliable at autonomous software engineering tasks like identifying bugs in context, suggesting correct fixes, and reasoning through edge cases. For developers building production-quality software, Claude's output tends to require less cleanup.

Claude Code, Anthropic's CLI tool, is a standout feature for serious developers. It integrates directly into your terminal and can read, write, and reason about entire codebases — not just snippets. Combined with extended thinking, Claude can work through architectural decisions and debug non-obvious issues that a simpler pass would miss. The 128K context window on Sonnet (200K on Opus) is large enough for most real-world projects, though not as expansive as Qwen's 256K.

Qwen holds its own, especially given its price point. At roughly $0.40 per million input tokens versus Claude's ~$3.00, Qwen is significantly more affordable for high-volume API use — relevant for teams running automated code review pipelines, generating boilerplate at scale, or building coding tools into products. Its 256K context window is a genuine advantage when you need to load an entire codebase or a large dependency tree into a single prompt.
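To make the pricing gap concrete, here is a quick back-of-the-envelope calculation using the listed rates. The monthly volume (50M input / 10M output tokens) is a hypothetical workload, not a figure from either vendor:

```python
# Hypothetical monthly workload, in millions of tokens
input_m, output_m = 50, 10

# Listed per-1M-token rates: Claude $3/$15, Qwen $0.40/$2.40 (input/output)
claude_cost = input_m * 3.00 + output_m * 15.00
qwen_cost = input_m * 0.40 + output_m * 2.40

print(f"Claude: ${claude_cost:.2f}/month")  # $300.00/month
print(f"Qwen:   ${qwen_cost:.2f}/month")    # $44.00/month
print(f"Savings: {1 - qwen_cost / claude_cost:.0%}")  # 85%
```

Note the blended savings land slightly below the headline 87%, which refers to input-token pricing alone ($0.40 vs $3.00).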

Where Qwen lags behind is in nuance and instruction-following for complex coding tasks. It's competitive on benchmarks, but Claude's qualitative output — particularly for documentation, code comments, and explaining tricky logic — tends to feel more polished. Qwen also has less tooling built around it in Western developer ecosystems, meaning less community support and fewer integrations.

For most individual developers and small teams, Claude is the better coding companion — particularly if you're doing exploratory debugging, code review, or working with Claude Code in a terminal. The accuracy and quality justify the higher cost.

If you're building a product that needs LLM-powered coding features at scale, or you're cost-sensitive and working with large codebases that benefit from a wider context window, Qwen is a surprisingly strong alternative that punches well above its price.

Recommendation: Claude for quality and developer experience; Qwen for cost-efficiency at scale.
