import asyncio
import json
import uuid
import xml.etree.ElementTree

import httpx

from .models import (
    AssetNotFoundError,
    AuthError,
    BatchResult,
    BatchUploadItem,
    ClothingAsset,
    RateLimitError,
    RbxAsset,
    RbxAssetType,
    RbxCreator,
    UploadError,
)


class RobloxClient:
    """Async client for a handful of Roblox web endpoints.

    Wraps a single ``httpx.AsyncClient`` and offers asset lookup, clothing
    image download/upload, and collectible publishing. The instance can be
    used as an async context manager; leaving the context closes the
    underlying HTTP client.
    """

    # User-Agent sent with the CSRF, upload and collectible requests.
    _CREATOR_USER_AGENT = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:145.0) "
        "Gecko/20100101 Firefox/145.0"
    )

    def __init__(
        self,
        roblosecurity: str,
        publisher_user_id: int,
        proxy: str | None = None,
    ):
        """Store credentials and create the shared HTTP client.

        Args:
            roblosecurity: Value of the ``.ROBLOSECURITY`` cookie.
            publisher_user_id: Sent as ``publisherUserId`` when publishing
                a collectible.
            proxy: Optional host that replaces ``roblox.com`` in URLs built
                through ``_proxy_url``.
        """
        self._roblosecurity = roblosecurity
        self._publisher_user_id = publisher_user_id
        self._proxy = proxy
        self._http = httpx.AsyncClient()
        # Headers for the read-only economy / asset-delivery requests; the
        # cookie is passed as a raw header here.
        self._fetch_headers = {
            "Cookie": f".ROBLOSECURITY={roblosecurity}",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        }
        # Headers for the CSRF-protected requests; the cookie travels via
        # ``_csrf_cookies`` instead.
        self._csrf_headers = {
            "User-Agent": self._CREATOR_USER_AGENT,
            "Referer": "https://create.roblox.com/",
            "Origin": "https://create.roblox.com",
        }
        self._csrf_cookies = {".ROBLOSECURITY": roblosecurity}

    def _proxy_url(self, url: str, force_direct: bool = False) -> str:
        """Return *url* with ``roblox.com`` swapped for the configured proxy.

        The URL is returned unchanged when no proxy is configured or when
        *force_direct* is true.
        """
        if not self._proxy or force_direct:
            return url
        return url.replace("roblox.com", self._proxy)

    def _csrf_request_headers(self, csrf: str) -> dict[str, str]:
        """Return the CSRF token header followed by the shared CSRF headers."""
        return {"X-CSRF-TOKEN": csrf, **self._csrf_headers}

    @staticmethod
    def _raise_for_write_status(
        response: httpx.Response,
        rate_limit_message: str,
        auth_message: str,
    ) -> None:
        """Map 429 and 401/403 to domain errors, then defer to httpx.

        Raises:
            RateLimitError: On HTTP 429.
            AuthError: On HTTP 401 or 403.
            httpx.HTTPStatusError: On any other error status.
        """
        if response.status_code == 429:
            raise RateLimitError(rate_limit_message)
        if response.status_code in (401, 403):
            raise AuthError(auth_message)
        response.raise_for_status()

    async def _get_csrf_token(self) -> str:
        """POST to the assets endpoint and return its ``X-CSRF-TOKEN`` header.

        Raises:
            AuthError: If the response carries no token header. The message
                distinguishes a 401/403 response from any other status.
        """
        url = self._proxy_url("https://apis.roblox.com/assets/user-auth/v1/assets")
        response = await self._http.post(
            url, cookies=self._csrf_cookies, headers=self._csrf_headers
        )
        csrf = response.headers.get("X-CSRF-TOKEN")
        if not csrf:
            if response.status_code in (401, 403):
                raise AuthError("Invalid or expired ROBLOSECURITY token.")
            raise AuthError("Failed to retrieve X-CSRF-TOKEN.")
        return csrf

    async def _economy_request(self, asset_id: int) -> httpx.Response:
        """GET the economy details endpoint for *asset_id*."""
        url = self._proxy_url(
            f"https://economy.roblox.com/v2/assets/{asset_id}/details"
        )
        return await self._http.get(url, headers=self._fetch_headers)

    async def _asset_delivery_request(self, asset_id: int) -> httpx.Response:
        """GET the asset-delivery endpoint for *asset_id*, following redirects."""
        url = self._proxy_url(
            f"https://assetdelivery.roblox.com/v1/asset/?id={asset_id}"
        )
        return await self._http.get(
            url, headers=self._fetch_headers, follow_redirects=True
        )

    async def _get_asset_xml(self, asset: RbxAsset) -> xml.etree.ElementTree.Element:
        """Download *asset* and parse the response body as XML.

        Raises:
            httpx.HTTPStatusError: If the download returns an error status.
            xml.etree.ElementTree.ParseError: If the body is not valid XML.
        """
        response = await self._asset_delivery_request(asset.asset_id)
        response.raise_for_status()
        # Hand the parser raw bytes so it can honour a BOM or an encoding
        # declaration instead of assuming UTF-8 up front.
        return xml.etree.ElementTree.fromstring(response.content)

    @staticmethod
    def _get_shirt_template_id_from_xml(root: xml.etree.ElementTree.Element) -> int:
        """Extract the numeric id that follows ``id=`` in the first ``<url>``.

        Raises:
            ValueError: If there is no ``<url>`` element, it has no text,
                the text has no ``id=`` marker, or what follows the marker
                is not an integer.
        """
        url_element = root.find(".//url")
        if url_element is None:
            raise ValueError("XML did not contain a <url> tag.")
        url = url_element.text
        if not url:
            raise ValueError("<url> tag did not contain any text.")
        _, marker, tail = url.partition("id=")
        if not marker:
            # Previously surfaced as a bare IndexError from split()[1].
            raise ValueError(f"<url> tag did not contain an asset id: {url!r}")
        try:
            return int(tail)
        except ValueError:
            raise ValueError(
                f"<url> tag did not contain a numeric asset id: {url!r}"
            ) from None

    async def asset_from_id(self, asset_id: int) -> RbxAsset:
        """Fetch asset information from Roblox by asset ID.

        Returns a ``ClothingAsset`` when the reported asset type is SHIRT or
        PANTS, otherwise a plain ``RbxAsset``.

        Raises:
            AssetNotFoundError: On HTTP 404.
            AuthError: On HTTP 401 or 403.
            httpx.HTTPStatusError: On any other error status.
        """
        # NOTE(review): unlike the write endpoints, a 429 here surfaces as
        # httpx.HTTPStatusError rather than RateLimitError - confirm intended.
        response = await self._economy_request(asset_id)
        if response.status_code == 404:
            raise AssetNotFoundError(f"Asset {asset_id} not found.")
        if response.status_code in (401, 403):
            raise AuthError("Not authorized to fetch this asset.")
        response.raise_for_status()

        asset_info = response.json()
        creator_info = asset_info["Creator"]
        creator = RbxCreator(
            creator_id=creator_info["Id"],
            username=creator_info["Name"],
            creator_type=creator_info["CreatorType"],
        )

        asset_type_id = asset_info["AssetTypeId"]
        is_clothing = asset_type_id in (RbxAssetType.SHIRT, RbxAssetType.PANTS)
        asset_cls = ClothingAsset if is_clothing else RbxAsset
        return asset_cls(
            asset_id=asset_info["AssetId"],
            creator=creator,
            name=asset_info["Name"],
            description=asset_info["Description"],
            asset_type=asset_type_id,
        )

    async def fetch_clothing_image(self, asset: ClothingAsset) -> bytes:
        """Fetch the image data for a clothing asset.

        Downloads the asset XML, reads the template id from it, then
        downloads that template and returns its raw bytes.
        """
        xml_root = await self._get_asset_xml(asset)
        template_id = self._get_shirt_template_id_from_xml(xml_root)
        image = await self._asset_delivery_request(template_id)
        image.raise_for_status()
        return image.content

    async def upload_clothing_image(
        self,
        image: bytes,
        name: str,
        description: str,
        asset_type: RbxAssetType,
        group_id: int,
        max_attempts: int = 10,
        poll_interval: float = 1.0,
    ) -> dict:
        """Upload a clothing image to Roblox and return the operation result.

        Args:
            image: Raw PNG bytes of the clothing image.
            name: Display name for the asset.
            description: Description for the asset.
            asset_type: RbxAssetType.SHIRT or RbxAssetType.PANTS.
            group_id: ID of the group to upload the asset to.
            max_attempts: Number of times to poll the operation status.
                Defaults to 10.
            poll_interval: Seconds to wait between polls. Defaults to 1.0.

        Returns:
            ``{"asset_id": ...}`` when the finished operation reports an
            asset id; the raw operation JSON when it is done without one;
            the raw upload JSON when no ``operationId`` was returned.

        Raises:
            RateLimitError: If the upload request returns HTTP 429.
            AuthError: If the upload request returns HTTP 401 or 403.
            UploadError: If the operation is not done after *max_attempts*
                polls.
            httpx.HTTPStatusError: On any other error status.
        """
        csrf = await self._get_csrf_token()
        upload_url = self._proxy_url(
            "https://apis.roblox.com/assets/user-auth/v1/assets"
        )
        meta = {
            "displayName": name,
            "description": description,
            "assetType": asset_type,
            "creationContext": {
                "creator": {"groupId": group_id},
                "expectedPrice": 10,
            },
        }
        upload_headers = {
            "X-CSRF-TOKEN": csrf,
            "User-Agent": self._CREATOR_USER_AGENT,
            "Accept": "*/*",
            "Accept-Language": "en-US,en;q=0.5",
            "Referer": "https://create.roblox.com/",
            "Origin": "https://create.roblox.com",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-site",
        }
        response = await self._http.post(
            upload_url,
            files={
                "request": (None, json.dumps(meta), "application/json"),
                "fileContent": ("clothing_upload", image, "image/png"),
            },
            headers=upload_headers,
            cookies=self._csrf_cookies,
        )
        self._raise_for_write_status(
            response,
            "Rate limit hit during upload.",
            "Not authorized to upload assets.",
        )

        data = response.json()
        operation_id = data.get("operationId")
        if not operation_id:
            return data

        operation_url = self._proxy_url(
            f"https://apis.roblox.com/assets/user-auth/v1/operations/{operation_id}"
        )
        for _ in range(max_attempts):
            await asyncio.sleep(poll_interval)
            op_response = await self._http.get(
                operation_url,
                headers={"X-CSRF-TOKEN": csrf},
                cookies=self._csrf_cookies,
            )
            op_response.raise_for_status()
            op_data = op_response.json()
            if not op_data.get("done"):
                continue
            # "response" may be present but null; treat that like a missing
            # key instead of failing with AttributeError.
            asset_id = (op_data.get("response") or {}).get("assetId")
            if asset_id:
                return {"asset_id": asset_id}
            return op_data

        raise UploadError(
            f"Upload operation did not complete after {max_attempts} attempts."
        )

    async def batch_upload(
        self,
        items: list[BatchUploadItem],
        max_attempts: int = 10,
        poll_interval: float = 1.0,
    ) -> BatchResult:
        """Upload multiple clothing images with limited concurrency.

        Processes items 2 at a time. Continues on failure and reports all
        failures in the returned BatchResult.

        Args:
            items: List of BatchUploadItem to upload.
            max_attempts: Passed to each upload_clothing_image call.
            poll_interval: Passed to each upload_clothing_image call.
        """
        result = BatchResult()
        semaphore = asyncio.Semaphore(2)

        async def _upload_one(item: BatchUploadItem):
            async with semaphore:
                try:
                    upload_result = await self.upload_clothing_image(
                        image=item.image,
                        name=item.name,
                        description=item.description,
                        asset_type=item.asset_type,
                        group_id=item.group_id,
                        max_attempts=max_attempts,
                        poll_interval=poll_interval,
                    )
                    result.succeeded.append((item, upload_result))
                except Exception as e:
                    # Deliberately broad: one failed item must not abort the
                    # batch; the exception is recorded next to its item.
                    result.failed.append((item, e))

        await asyncio.gather(*[_upload_one(item) for item in items])
        return result

    async def publish_collectible(
        self,
        asset_id: int,
        group_id: int,
        name: str,
        description: str,
        price: int = 5,
    ) -> str:
        """Publish an asset as a Limited collectible.

        Returns the collectibleItemId.

        Raises:
            RateLimitError: On HTTP 429.
            AuthError: On HTTP 401 or 403.
            UploadError: If the response has no ``collectibleItemId``.
            httpx.HTTPStatusError: On any other error status.
        """
        csrf = await self._get_csrf_token()
        # NOTE(review): this URL is not passed through _proxy_url, so the
        # request goes direct even when a proxy is set - confirm intended.
        response = await self._http.post(
            "https://itemconfiguration.roblox.com/v1/collectibles",
            json={
                "isRentalOptIn": False,
                # Fresh token per call.
                "idempotencyToken": str(uuid.uuid4()),
                "targetId": asset_id,
                "targetType": 0,
                "publishingType": 2,
                "agreedPublishingFee": 10,
                "creatorGroupId": group_id,
                "publisherUserId": self._publisher_user_id,
                "quantity": 0,
                "quantityLimitPerUser": 0,
                "resaleRestriction": 2,
                "priceInRobux": price,
                "priceOffset": 0,
                "optOutFromRegionalPricing": False,
                "isFree": False,
                "saleLocationConfiguration": {"saleLocationType": 1, "places": []},
                "name": name,
                "description": description,
            },
            headers=self._csrf_request_headers(csrf),
            cookies=self._csrf_cookies,
        )
        self._raise_for_write_status(
            response,
            "Rate limit hit during collectible publish.",
            "Not authorized to publish this collectible.",
        )

        data = response.json()
        collectible_item_id = data.get("collectibleItemId")
        if not collectible_item_id:
            raise UploadError(
                f"publish_collectible did not return a collectibleItemId: {data}"
            )
        return collectible_item_id

    async def onsale_asset(
        self,
        collectible_item_id: str,
        price: int = 5,
    ) -> dict:
        """Put an asset on sale.

        Returns the decoded JSON response, or an empty dict when the
        response body is empty.

        Raises:
            RateLimitError: On HTTP 429.
            AuthError: On HTTP 401 or 403.
            httpx.HTTPStatusError: On any other error status.
        """
        csrf = await self._get_csrf_token()
        # NOTE(review): sent direct, not through _proxy_url - confirm intended.
        response = await self._http.patch(
            f"https://itemconfiguration.roblox.com/v1/collectibles/{collectible_item_id}",
            json={
                "saleLocationConfiguration": {"saleLocationType": 1, "places": []},
                "saleStatus": 0,
                "quantityLimitPerUser": 0,
                "resaleRestriction": 2,
                "priceInRobux": price,
                "priceOffset": 0,
                "optOutFromRegionalPricing": False,
                "isFree": False,
            },
            headers=self._csrf_request_headers(csrf),
            cookies=self._csrf_cookies,
        )
        self._raise_for_write_status(
            response,
            "Rate limit hit during onsale.",
            "Not authorized to put this asset on sale.",
        )
        return response.json() if response.text else {}

    async def get_collectible_item_id(
        self,
        asset_id: int,
        max_attempts: int = 10,
        poll_interval: float = 3.0,
    ) -> str:
        """Look up the collectible item ID (UUID) for a given asset ID.

        Retries up to *max_attempts* times, waiting *poll_interval* seconds
        between attempts, until the response contains a
        ``collectibleItemId``.

        Raises:
            UploadError: If no id is returned within *max_attempts* attempts.
            httpx.HTTPStatusError: If any attempt returns an error status
                (such a response is not retried).
        """
        # NOTE(review): sent direct, not through _proxy_url - confirm intended.
        url = f"https://itemconfiguration.roblox.com/v1/collectibles/0/{asset_id}"
        for attempt in range(max_attempts):
            if attempt:
                # Wait only between attempts; sleeping after the final one
                # would just delay the UploadError below.
                await asyncio.sleep(poll_interval)
            response = await self._http.get(url, cookies=self._csrf_cookies)
            response.raise_for_status()
            collectible_item_id = response.json().get("collectibleItemId")
            if collectible_item_id:
                return collectible_item_id
        raise UploadError(
            f"collectibleItemId not available for asset {asset_id} after {max_attempts} attempts."
        )

    async def close(self):
        """Close the underlying HTTP client."""
        await self._http.aclose()

    async def __aenter__(self):
        """Return the client itself for use in ``async with``."""
        return self

    async def __aexit__(self, *args):
        """Close the HTTP client when the ``async with`` block exits."""
        await self.close()