feat: add batch upload, CLI, and exception hierarchy

- Add RbxError base class with AuthError, RateLimitError, UploadError,
  and AssetNotFoundError subclasses
- Add BatchUploadItem and BatchResult dataclasses
- Add batch_upload method with semaphore-limited concurrency (2 at a time)
- Add configurable max_attempts and poll_interval to upload_clothing_image
- Add CLI with upload and onsale commands (rbx-upload[cli] extra)
- Improve auth error handling across all client methods
- Bump version to 0.2.0
This commit is contained in:
2026-02-18 15:45:17 -05:00
parent 2a3ce02df1
commit 44b4e40c40
6 changed files with 330 additions and 23 deletions
+81 -17
View File
@@ -5,13 +5,18 @@ import xml.etree.ElementTree
import httpx
from .models import ClothingAsset, RbxAsset, RbxAssetType, RbxCreator
class RateLimitError(Exception):
"""Raised when hitting Roblox rate limits (HTTP 429)."""
pass
from .models import (
AssetNotFoundError,
AuthError,
BatchResult,
BatchUploadItem,
ClothingAsset,
RateLimitError,
RbxAsset,
RbxAssetType,
RbxCreator,
UploadError,
)
class RobloxClient:
@@ -49,11 +54,9 @@ class RobloxClient:
)
csrf = response.headers.get("X-CSRF-TOKEN")
if not csrf:
raise httpx.HTTPStatusError(
"Failed to retrieve X-CSRF-TOKEN.",
request=response.request,
response=response,
)
if response.status_code in (401, 403):
raise AuthError("Invalid or expired ROBLOSECURITY token.")
raise AuthError("Failed to retrieve X-CSRF-TOKEN.")
return csrf
async def _economy_request(self, asset_id: int) -> httpx.Response:
@@ -89,6 +92,10 @@ class RobloxClient:
async def asset_from_id(self, asset_id: int) -> RbxAsset:
"""Fetch asset information from Roblox by asset ID."""
response = await self._economy_request(asset_id)
if response.status_code == 404:
raise AssetNotFoundError(f"Asset {asset_id} not found.")
if response.status_code in (401, 403):
raise AuthError("Not authorized to fetch this asset.")
response.raise_for_status()
asset_info = response.json()
creator_info = asset_info["Creator"]
@@ -129,8 +136,20 @@ class RobloxClient:
description: str,
asset_type: RbxAssetType,
group_id: int,
max_attempts: int = 10,
poll_interval: float = 1.0,
) -> dict:
"""Upload a clothing image to Roblox and return the operation result."""
"""Upload a clothing image to Roblox and return the operation result.
Args:
image: Raw PNG bytes of the clothing image.
name: Display name for the asset.
description: Description for the asset.
asset_type: RbxAssetType.SHIRT or RbxAssetType.PANTS.
group_id: ID of the group to upload the asset to.
max_attempts: Number of times to poll the operation status. Defaults to 10.
poll_interval: Seconds to wait between polls. Defaults to 1.0.
"""
csrf = await self._get_csrf_token()
upload_url = self._proxy_url(
"https://apis.roblox.com/assets/user-auth/v1/assets"
@@ -166,15 +185,17 @@ class RobloxClient:
)
if response.status_code == 429:
raise RateLimitError("Rate limit hit during upload")
raise RateLimitError("Rate limit hit during upload.")
if response.status_code in (401, 403):
raise AuthError("Not authorized to upload assets.")
response.raise_for_status()
data = response.json()
operation_id = data.get("operationId")
if operation_id:
for _ in range(10):
await asyncio.sleep(1)
for _ in range(max_attempts):
await asyncio.sleep(poll_interval)
op_response = await self._http.get(
self._proxy_url(
f"https://apis.roblox.com/assets/user-auth/v1/operations/{operation_id}"
@@ -188,9 +209,50 @@ class RobloxClient:
if op_data.get("response", {}).get("assetId"):
return {"asset_id": op_data["response"]["assetId"]}
return op_data
raise UploadError(
f"Upload operation did not complete after {max_attempts} attempts."
)
return data
async def batch_upload(
self,
items: list[BatchUploadItem],
max_attempts: int = 10,
poll_interval: float = 1.0,
) -> BatchResult:
"""Upload multiple clothing images with limited concurrency.
Processes items 2 at a time. Continues on failure and reports all
failures in the returned BatchResult.
Args:
items: List of BatchUploadItem to upload.
max_attempts: Passed to each upload_clothing_image call.
poll_interval: Passed to each upload_clothing_image call.
"""
result = BatchResult()
semaphore = asyncio.Semaphore(2)
async def _upload_one(item: BatchUploadItem):
async with semaphore:
try:
upload_result = await self.upload_clothing_image(
image=item.image,
name=item.name,
description=item.description,
asset_type=item.asset_type,
group_id=item.group_id,
max_attempts=max_attempts,
poll_interval=poll_interval,
)
result.succeeded.append((item, upload_result))
except Exception as e:
result.failed.append((item, e))
await asyncio.gather(*[_upload_one(item) for item in items])
return result
async def onsale_asset(
self,
asset_id: int,
@@ -232,7 +294,9 @@ class RobloxClient:
)
if response.status_code == 429:
raise RateLimitError("Rate limit hit during onsale")
raise RateLimitError("Rate limit hit during onsale.")
if response.status_code in (401, 403):
raise AuthError("Not authorized to put this asset on sale.")
response.raise_for_status()
return response.json()