diff --git a/pyproject.toml b/pyproject.toml index 0b73886..661e582 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,5 @@ requires-python = ">=3.13" dependencies = [ "fastapi[standard]>=0.126.0", "python-dotenv>=1.2.1", - "httpx>=0.25.0", + "rbx-upload", ] - - diff --git a/src/main.py b/src/main.py index 0c03083..4ece546 100644 --- a/src/main.py +++ b/src/main.py @@ -4,77 +4,104 @@ import os from dotenv import load_dotenv from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import APIKeyHeader +from rbx_upload import ClothingAsset, RateLimitError, RbxAssetType, RobloxClient -import models -from utils import roblox_service, hashing, discord import database +from utils import discord, hashing load_dotenv() TARGET = os.getenv("TARGET_ID") VALID_API_KEY = os.getenv("VALID_API_KEY") +ROBLOSECURITY = os.getenv("ROBLOSECURITY_TOKEN") +PUBLISHER_USER_ID = os.getenv("PUBLISHER_USER_ID") +ROBLOX_PROXY = os.getenv("ROBLOX_PROXY") if TARGET is None: raise EnvironmentError("TARGET_ID is missing from environment.") if not VALID_API_KEY: raise EnvironmentError("VALID_API_KEY is missing from environment.") +if not ROBLOSECURITY: + raise EnvironmentError("ROBLOSECURITY_TOKEN is missing from environment.") +if not PUBLISHER_USER_ID: + raise EnvironmentError("PUBLISHER_USER_ID is missing from environment.") from contextlib import asynccontextmanager -@asynccontextmanager -async def lifespan(app: FastAPI): - # Start background task - task = asyncio.create_task(process_onsale_queue()) - yield - # Cleanup - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - -app = FastAPI(lifespan=lifespan) - # Retry Configuration RETRY_INTERVAL = int(os.getenv("RETRY_INTERVAL_SECONDS", 60)) RETRY_DELAY = int(os.getenv("RETRY_DELAY_SECONDS", 300)) +roblox: RobloxClient + + +@asynccontextmanager +async def lifespan(app: FastAPI): + global roblox + async with RobloxClient( + roblosecurity=ROBLOSECURITY, + publisher_user_id=int(PUBLISHER_USER_ID), + proxy=ROBLOX_PROXY, + ) as client: + roblox = client + task = asyncio.create_task(process_onsale_queue()) + yield + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + +app = FastAPI(lifespan=lifespan) + + async def process_onsale_queue(): """Background task to retry failed onsale attempts.""" while True: try: pending_items = database.get_pending_onsale_items() for item in pending_items: - print(f"Retrying onsale for asset {item['asset_id']} (Attempt {item['retry_count'] + 1})") + print( + f"Retrying onsale for asset {item['asset_id']} (Attempt {item['retry_count'] + 1})" + ) try: - await roblox_service.onsale_asset( + await roblox.onsale_asset( item["asset_id"], item["name"], item["description"], item["group_id"], ) database.remove_from_onsale_queue(item["id"]) - print(f"Successfully put asset {item['asset_id']} on sale via queue.") - - # Send Discord notification on successful retry - await discord.send_upload_webhook( - item["name"], - item["original_asset_id"], - item["asset_id"], - item["asset_type"] + print( + f"Successfully put asset {item['asset_id']} on sale via queue." + ) + + await discord.send_upload_webhook( + item["name"], + item["original_asset_id"], + item["asset_id"], + item["asset_type"], + ) + except RateLimitError: + print( + f"Rate limit hit again for asset {item['asset_id']}, backing off." + ) + database.increment_retry_onsale( + item["id"], delay_seconds=RETRY_DELAY ) - except roblox_service.RateLimitError: - print(f"Rate limit hit again for asset {item['asset_id']}, backing off.") - database.increment_retry_onsale(item["id"], delay_seconds=RETRY_DELAY) except Exception as e: print(f"Unexpected error retrying asset {item['asset_id']}: {e}") - database.increment_retry_onsale(item["id"], delay_seconds=RETRY_DELAY * 2) - + database.increment_retry_onsale( + item["id"], delay_seconds=RETRY_DELAY * 2 + ) + await asyncio.sleep(RETRY_INTERVAL) except Exception as e: print(f"Error in background queue task: {e}") await asyncio.sleep(RETRY_INTERVAL) + api_key_header = APIKeyHeader(name="x-api-key") @@ -88,87 +115,101 @@ async def verify_api_key(api_key: str = Depends(api_key_header)): @app.get("/asset/{asset_id}") async def get_asset_info(asset_id: int, _: str = Depends(verify_api_key)): - asset = await roblox_service.asset_from_id(asset_id) - final_dict = {asset} - if isinstance(asset, models.ClothingAsset): + asset = await roblox.asset_from_id(asset_id) + if isinstance(asset, ClothingAsset): print("clothing asset found") - image = await roblox_service.fetch_clothing_image(asset) + image = await roblox.fetch_clothing_image(asset) with open(f"tests/{asset.asset_id}.png", "wb") as output: output.write(image) - return final_dict + return { + "asset_id": asset.asset_id, + "name": asset.name, + "asset_type": asset.asset_type, + } # Global dictionary to store locks per image hash upload_locks: dict[str, asyncio.Lock] = {} + @app.post("/create/") async def reupload_asset(asset_id: int, _: str = Depends(verify_api_key)): - asset = await roblox_service.asset_from_id(asset_id) - if isinstance(asset, models.ClothingAsset): + asset = await roblox.asset_from_id(asset_id) + if isinstance(asset, ClothingAsset): print(f"Clothing asset found: {asset.name}") - image = await roblox_service.fetch_clothing_image(asset) - + image = await roblox.fetch_clothing_image(asset) + # Check for duplicates using hash image_hash = hashing.get_image_hash(image) - + # Get or create a lock for this specific hash if image_hash not in upload_locks: upload_locks[image_hash] = asyncio.Lock() - + async with upload_locks[image_hash]: # Double-check database inside the lock existing_new_id = database.get_uploaded_asset(image_hash, asset.asset_type) - + if existing_new_id: - print(f"Asset already uploaded (hash and type match): {existing_new_id}") + print( + f"Asset already uploaded (hash and type match): {existing_new_id}" + ) return {"uploaded": {"asset_id": existing_new_id}} # Prepare description with original URL original_url = f"https://www.roblox.com/catalog/{asset_id}" new_description = f"{asset.description}\n\nOriginal: {original_url}" - uploaded = await roblox_service.upload_clothing_image( + uploaded = await roblox.upload_clothing_image( image, asset.name, new_description, asset.asset_type, - models.RbxCreator(int(TARGET), "Upload_Group", "Group"), + int(TARGET), ) new_asset_id = uploaded.get("asset_id") if new_asset_id: # Save to database - database.save_uploaded_asset(image_hash, asset.asset_type, asset_id, new_asset_id) - + database.save_uploaded_asset( + image_hash, asset.asset_type, asset_id, new_asset_id + ) + try: - onsale = await roblox_service.onsale_asset( + await roblox.onsale_asset( new_asset_id, asset.name, new_description, int(TARGET), ) - except roblox_service.RateLimitError: - print(f"Rate limit hit for asset {new_asset_id}, adding to retry queue.") - asset_type_name = "Shirt" if asset.asset_type == models.RbxAssetType.SHIRT else "Pants" + except RateLimitError: + print( + f"Rate limit hit for asset {new_asset_id}, adding to retry queue." + ) + asset_type_name = ( + "Shirt" if asset.asset_type == RbxAssetType.SHIRT else "Pants" + ) database.add_to_onsale_queue( new_asset_id, - asset_id, # original + asset_id, asset.name, new_description, int(TARGET), - asset_type_name + asset_type_name, ) return { "uploaded": uploaded, "onsale": "queued", - "info": "Rate limit hit, item will be put on sale automatically later." + "info": "Rate limit hit, item will be put on sale automatically later.", } - + # Send Discord notification (only for successful initial onsale) - asset_type_name = "Shirt" if asset.asset_type == models.RbxAssetType.SHIRT else "Pants" + asset_type_name = ( + "Shirt" if asset.asset_type == RbxAssetType.SHIRT else "Pants" + ) await discord.send_upload_webhook( asset.name, asset_id, new_asset_id, asset_type_name ) - + return {"uploaded": uploaded} return uploaded raise HTTPException(status_code=400, detail="Asset is not a clothing asset.") diff --git a/src/models.py b/src/models.py deleted file mode 100644 index 193bbac..0000000 --- a/src/models.py +++ /dev/null @@ -1,73 +0,0 @@ -from enum import IntEnum -from typing import Literal - -# Enums - - -class RbxAssetType(IntEnum): - IMAGE = 1 - SHIRT = 11 - PANTS = 12 - - -# Types - -ClothingAssetType = Literal[ - RbxAssetType.SHIRT, - RbxAssetType.PANTS, -] -CreatorType = Literal["User", "Group"] - -# Roblox Creator class (for creator info - mainly useful for returning data + TUI) - - -class RbxCreator: - def __init__(self, creator_id: int, username: str, creator_type: CreatorType): - self.creator_id = creator_id - self.username = username - self.creator_type = creator_type - - -# Asset base class - - -class RbxAsset: - def __init__( - self, - asset_id: int, - creator: RbxCreator, - name: str, - description: str, - asset_type: RbxAssetType, - ) -> None: - self.asset_id = asset_id - self.name = name - self.description = description - self.creator = creator - self.asset_type = asset_type - - -# Clothing asset class - - -class ClothingAsset(RbxAsset): - def __init__( - self, - asset_id: int, - creator: RbxCreator, - name: str, - description: str, - asset_type: ClothingAssetType, - ) -> None: - super().__init__( - asset_id=asset_id, - creator=creator, - name=name, - description=description, - asset_type=asset_type, - ) - - async def get_image(self) -> bytes: - from utils import roblox_service - - return await roblox_service.fetch_clothing_image(self) diff --git a/src/utils/roblox_service.py b/src/utils/roblox_service.py deleted file mode 100644 index 485f4cd..0000000 --- a/src/utils/roblox_service.py +++ /dev/null @@ -1,298 +0,0 @@ -import asyncio -import json -import os -import uuid -import xml.etree.ElementTree - -import httpx -from dotenv import load_dotenv - -import models - -load_dotenv() - -# Constants and State - -ROBLOSECURITY = os.getenv("ROBLOSECURITY_TOKEN") - -if not ROBLOSECURITY: - raise EnvironmentError("ROBLOSECURITY_TOKEN is missing from environment.") - -# Must match the user ID of the account that owns the ROBLOSECURITY_TOKEN -PUBLISHER_USER_ID = os.getenv("PUBLISHER_USER_ID") - -if not PUBLISHER_USER_ID: - raise EnvironmentError("PUBLISHER_USER_ID is missing from environment.") - -FETCH_HEADERS = { - "Cookie": f".ROBLOSECURITY={ROBLOSECURITY}", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", -} - -CSRF_HEADERS = { - "X-CSRF-TOKEN": "", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:145.0) Gecko/20100101 Firefox/145.0", - "Referer": "https://create.roblox.com/", - "Origin": "https://create.roblox.com", -} - -CSRF_COOKIES = {".ROBLOSECURITY": ROBLOSECURITY} - -# Proxy configuration -ROBLOX_PROXY = os.getenv("ROBLOX_PROXY") - - -def _proxy_url(url: str) -> str: - """Redirect roblox.com URLs to a proxy if configured.""" - if not ROBLOX_PROXY: - return url - return url.replace("roblox.com", ROBLOX_PROXY) - - -CSRF_URL = _proxy_url("https://apis.roblox.com/assets/user-auth/v1/assets") - -ASSET_DELIVERY_BASE_URL = _proxy_url( - "https://assetdelivery.roblox.com/v1/asset/?id={asset_id}" -) - -ECONOMY_BASE_URL = _proxy_url("https://economy.roblox.com/v2/assets/{asset_id}/details") - -UPLOAD_URL = _proxy_url("https://apis.roblox.com/assets/user-auth/v1/assets") - -# Custom Exceptions - - -class RateLimitError(Exception): - """Raised when hitting Roblox rate limits (HTTP 429).""" - - pass - - -# Client instance for connection pooling -_client = None - - -def _get_client() -> httpx.AsyncClient: - """Get or create the shared HTTP client for connection pooling.""" - global _client - if _client is None: - _client = httpx.AsyncClient() - return _client - - -# Internal methods - - -async def _economy_request(asset_id: int) -> httpx.Response: - """Fetch asset details from the Roblox economy API.""" - client = _get_client() - return await client.get( - ECONOMY_BASE_URL.format(asset_id=asset_id), headers=FETCH_HEADERS - ) - - -async def _asset_delivery_request(asset_id: int) -> httpx.Response: - """Fetch an asset from the Roblox asset delivery API.""" - client = _get_client() - return await client.get( - ASSET_DELIVERY_BASE_URL.format(asset_id=asset_id), - headers=FETCH_HEADERS, - follow_redirects=True, - ) - - -async def _get_asset_xml(asset: models.RbxAsset) -> xml.etree.ElementTree.Element: - """Fetch and parse asset XML content.""" - response = await _asset_delivery_request(asset.asset_id) - response.raise_for_status() - content = response.content.decode("utf-8") - xml_root = xml.etree.ElementTree.fromstring(content) - return xml_root - - -def _get_shirt_template_id_from_xml(root: xml.etree.ElementTree.Element) -> int: - """Extract shirt template ID from asset XML.""" - url_element = root.find(".//url") - if url_element is None: - raise ValueError("XML did not contain a tag.") - url = url_element.text - if not url: - raise ValueError(" tag did not contain any text.") - template_id = url.split("id=")[1] - return int(template_id) - - -async def _get_csrf_token(url: str = CSRF_URL) -> str: - """Retrieve CSRF token from the Roblox API.""" - client = _get_client() - response = await client.post(url, cookies=CSRF_COOKIES, headers=CSRF_HEADERS) - csrf = response.headers.get("X-CSRF-TOKEN") - if not csrf: - raise httpx.HTTPStatusError( - "Failed to retrieve X-CSRF-TOKEN.", - request=response.request, - response=response, - ) - return csrf - - -# External methods - - -async def asset_from_id(id: int) -> models.RbxAsset: - """Fetch asset information from Roblox by asset ID.""" - response = await _economy_request(id) - response.raise_for_status() - asset_info = json.loads(response.content) - creator_info = asset_info["Creator"] - asset_creator = models.RbxCreator( - creator_id=creator_info["Id"], - username=creator_info["Name"], - creator_type=creator_info["CreatorType"], - ) - asset_type_id = asset_info["AssetTypeId"] - if ( - asset_type_id == models.RbxAssetType.SHIRT - or asset_type_id == models.RbxAssetType.PANTS - ): - return models.ClothingAsset( - asset_id=asset_info["AssetId"], - creator=asset_creator, - name=asset_info["Name"], - description=asset_info["Description"], - asset_type=asset_info["AssetTypeId"], - ) - return models.RbxAsset( - asset_id=asset_info["AssetId"], - creator=asset_creator, - name=asset_info["Name"], - description=asset_info["Description"], - asset_type=asset_info["AssetTypeId"], - ) - - -async def fetch_clothing_image(asset: models.ClothingAsset) -> bytes: - """Fetch the image data for a clothing asset.""" - try: - xml = await _get_asset_xml(asset) - template_id = _get_shirt_template_id_from_xml(xml) - image = await _asset_delivery_request(template_id) - image.raise_for_status() - return image.content - except Exception: - raise # TODO add logging - - -async def upload_clothing_image( - image: bytes, - name: str, - description: str, - asset_type: models.RbxAssetType, - target: models.RbxCreator, -) -> dict: - """Upload a clothing image to Roblox and return the asset ID.""" - csrf = await _get_csrf_token() - meta = { - "displayName": name, - "description": description, - "assetType": asset_type, - # TODO add support for user creation context - "creationContext": { - "creator": {"groupId": target.creator_id}, - "expectedPrice": 10, - }, - } - client = _get_client() - response = await client.post( - UPLOAD_URL, - files={ - "request": (None, json.dumps(meta), "application/json"), - "fileContent": ("clothing_upload", image, "image/png"), - }, - headers={ - "X-CSRF-TOKEN": csrf, - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:145.0) Gecko/20100101 Firefox/145.0", - "Accept": "*/*", - "Accept-Language": "en-US,en;q=0.5", - "Referer": "https://create.roblox.com/", - "Origin": "https://create.roblox.com", - "Sec-Fetch-Dest": "empty", - "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Site": "same-site", - }, - cookies=CSRF_COOKIES, - ) - - if response.status_code == 429: - raise RateLimitError("Rate limit hit during upload") - - response.raise_for_status() - data = response.json() - - operation_id = data.get("operationId") - if operation_id: - max_tries = 10 - wait_time = 1 - for attempt in range(max_tries): - await asyncio.sleep(wait_time) - op_response = await client.get( - _proxy_url(f"https://apis.roblox.com/assets/user-auth/v1/operations/{operation_id}"), - headers={"X-CSRF-TOKEN": csrf}, - cookies=CSRF_COOKIES, - ) - op_response.raise_for_status() - op_data = op_response.json() - - if op_data.get("done"): - if op_data.get("response") and op_data["response"].get("assetId"): - return {"asset_id": op_data["response"]["assetId"]} - return op_data - - return data - - -async def onsale_asset( - asset_id: int, - name: str, - description: str, - group_id: int, - price: int = 5, -): - """Put an asset on sale.""" - csrf = await _get_csrf_token() - data = { - "saleLocationConfiguration": {"saleLocationType": 1, "places": []}, - "targetId": asset_id, - "priceInRobux": price, - "publishingType": 2, - "idempotencyToken": str(uuid.uuid4()), - "publisherUserId": PUBLISHER_USER_ID, - "creatorGroupId": group_id, - "name": name, - "description": description, - "isFree": False, - "agreedPublishingFee": 0, - "priceOffset": 0, - "quantity": 0, - "quantityLimitPerUser": 0, - "resaleRestriction": 2, - "targetType": 0, - } - client = _get_client() - response = await client.post( - _proxy_url("https://itemconfiguration.roblox.com/v1/collectibles"), - json=data, - headers={ - "X-CSRF-TOKEN": csrf, - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:145.0) Gecko/20100101 Firefox/145.0", - "Referer": "https://create.roblox.com/", - "Origin": "https://create.roblox.com", - }, - cookies=CSRF_COOKIES, - ) - - if response.status_code == 429: - raise RateLimitError("Rate limit hit during onsale") - - response.raise_for_status() - return response.json()