"""FastAPI service that re-uploads Roblox clothing assets and puts them on sale.

Endpoints are protected by an ``x-api-key`` header. Onsale attempts that hit
a rate limit are stored in a database-backed queue and retried by a
background task started in the application lifespan.
"""

import asyncio
import os
import secrets
from contextlib import asynccontextmanager

from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import APIKeyHeader
from rbx_upload import ClothingAsset, RateLimitError, RbxAssetType, RobloxClient

import database
from utils import discord, hashing

load_dotenv()

TARGET = os.getenv("TARGET_ID")
VALID_API_KEY = os.getenv("VALID_API_KEY")
ROBLOSECURITY = os.getenv("ROBLOSECURITY_TOKEN")
PUBLISHER_USER_ID = os.getenv("PUBLISHER_USER_ID")
ROBLOX_PROXY = os.getenv("ROBLOX_PROXY")

# Optional separate account for onsaling (falls back to upload account).
# `or` is used instead of a getenv default so that a variable that is set but
# empty (e.g. `ONSALE_ROBLOSECURITY_TOKEN=` in a .env file) also falls back.
ONSALE_ROBLOSECURITY = os.getenv("ONSALE_ROBLOSECURITY_TOKEN") or ROBLOSECURITY
ONSALE_PUBLISHER_USER_ID = (
    os.getenv("ONSALE_PUBLISHER_USER_ID") or PUBLISHER_USER_ID
)

# Fail fast at import time when a required setting is missing or empty.
if not TARGET:
    raise EnvironmentError("TARGET_ID is missing from environment.")
if not VALID_API_KEY:
    raise EnvironmentError("VALID_API_KEY is missing from environment.")
if not ROBLOSECURITY:
    raise EnvironmentError("ROBLOSECURITY_TOKEN is missing from environment.")
if not PUBLISHER_USER_ID:
    raise EnvironmentError("PUBLISHER_USER_ID is missing from environment.")

# Retry Configuration
# RETRY_INTERVAL: seconds the background task sleeps between queue passes.
# RETRY_DELAY: base delay handed to the database when an item is rescheduled.
RETRY_INTERVAL = int(os.getenv("RETRY_INTERVAL_SECONDS", "60"))
RETRY_DELAY = int(os.getenv("RETRY_DELAY_SECONDS", "300"))

# Assigned by `_create_clients()` while the application lifespan is active.
roblox: RobloxClient
roblox_onsale: RobloxClient

# True when the onsale account differs from the upload account in any way.
_use_separate_onsale = (
    ONSALE_ROBLOSECURITY != ROBLOSECURITY
    or ONSALE_PUBLISHER_USER_ID != PUBLISHER_USER_ID
)


@asynccontextmanager
async def _create_clients():
    """Open the Roblox client(s) and bind them to the module-level globals.

    When a separate onsale account is configured, two clients are opened and
    bound to ``roblox`` and ``roblox_onsale`` respectively; otherwise a single
    client is shared by both names. The clients are closed when the context
    exits.
    """
    global roblox, roblox_onsale

    if _use_separate_onsale:
        async with RobloxClient(
            roblosecurity=ROBLOSECURITY,
            publisher_user_id=int(PUBLISHER_USER_ID),
            proxy=ROBLOX_PROXY,
        ) as upload_client, RobloxClient(
            roblosecurity=ONSALE_ROBLOSECURITY,
            publisher_user_id=int(ONSALE_PUBLISHER_USER_ID),
            proxy=ROBLOX_PROXY,
        ) as onsale_client:
            roblox = upload_client
            roblox_onsale = onsale_client
            yield
    else:
        async with RobloxClient(
            roblosecurity=ROBLOSECURITY,
            publisher_user_id=int(PUBLISHER_USER_ID),
            proxy=ROBLOX_PROXY,
        ) as client:
            roblox = client
            roblox_onsale = client
            yield


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: open clients and run the onsale retry task.

    The background task is cancelled and awaited in a ``finally`` block so it
    is stopped before the clients close, including when an exception is
    thrown into the context at the ``yield``.
    """
    async with _create_clients():
        task = asyncio.create_task(process_onsale_queue())
        try:
            yield
        finally:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass


app = FastAPI(lifespan=lifespan)


async def _retry_onsale_item(item):
    """Retry putting a single queued item on sale.

    On success the item is removed from the queue and a Discord webhook is
    sent. On failure the item's retry is rescheduled: ``RETRY_DELAY`` seconds
    for a rate limit, twice that for any other error.

    Args:
        item: A queue row supporting ``item[key]`` access for the keys read
            below (``id``, ``asset_id``, ``collectible_item_id``, ...).
    """
    print(
        f"Retrying onsale for asset {item['asset_id']} (Attempt {item['retry_count'] + 1})"
    )
    try:
        # Publish only when the queue row carries no collectible id yet.
        # NOTE(review): a freshly published id is not written back to the
        # queue, so if `onsale_asset` fails below the next retry publishes
        # again — confirm whether the database layer should persist it.
        collectible_item_id = item[
            "collectible_item_id"
        ] or await roblox_onsale.publish_collectible(
            item["asset_id"],
            item["group_id"],
            item["name"],
            item["description"],
            price=5,
        )
        await roblox_onsale.onsale_asset(collectible_item_id)
        database.remove_from_onsale_queue(item["id"])
        print(f"Successfully put asset {item['asset_id']} on sale via queue.")
        await discord.send_upload_webhook(
            item["name"],
            item["original_asset_id"],
            item["asset_id"],
            item["asset_type"],
        )
    except RateLimitError:
        print(f"Rate limit hit again for asset {item['asset_id']}, backing off.")
        database.increment_retry_onsale(item["id"], delay_seconds=RETRY_DELAY)
    except Exception as e:
        print(f"Unexpected error retrying asset {item['asset_id']}: {e}")
        database.increment_retry_onsale(
            item["id"], delay_seconds=RETRY_DELAY * 2
        )


async def process_onsale_queue():
    """Background task to retry failed onsale attempts.

    Loops forever: fetches the pending queue items, retries each one, then
    sleeps ``RETRY_INTERVAL`` seconds. Errors raised while fetching the queue
    are printed and the loop continues after the same sleep. Cancellation
    propagates because ``asyncio.CancelledError`` is not an ``Exception``.
    """
    while True:
        try:
            for item in database.get_pending_onsale_items():
                await _retry_onsale_item(item)
        except Exception as e:
            print(f"Error in background queue task: {e}")
        await asyncio.sleep(RETRY_INTERVAL)


api_key_header = APIKeyHeader(name="x-api-key")


async def verify_api_key(api_key: str = Depends(api_key_header)):
    """FastAPI dependency that validates the ``x-api-key`` header.

    Returns:
        The supplied API key when it matches ``VALID_API_KEY``.

    Raises:
        HTTPException: 403 when the key does not match.
    """
    # Constant-time comparison avoids leaking the key through response timing.
    # Comparing bytes keeps `compare_digest` from raising on non-ASCII input.
    supplied = api_key.encode("utf-8")
    expected = VALID_API_KEY.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Invalid API key"
        )
    return api_key


@app.get("/asset/{asset_id}")
async def get_asset_info(asset_id: int, _: str = Depends(verify_api_key)):
    """Return basic information about an asset.

    For clothing assets the image is also fetched and written to
    ``tests/<asset_id>.png``.

    Returns:
        A dict with ``asset_id``, ``name`` and ``asset_type``.
    """
    asset = await roblox.asset_from_id(asset_id)

    if isinstance(asset, ClothingAsset):
        print("clothing asset found")
        image = await roblox.fetch_clothing_image(asset)
        # Make sure the output directory exists so the dump cannot fail with
        # FileNotFoundError on a fresh checkout.
        os.makedirs("tests", exist_ok=True)
        with open(f"tests/{asset.asset_id}.png", "wb") as output:
            output.write(image)

    return {
        "asset_id": asset.asset_id,
        "name": asset.name,
        "asset_type": asset.asset_type,
    }


# Global dictionary to store locks per image hash
# NOTE(review): entries are never removed, so this grows with the number of
# distinct image hashes seen during the process lifetime.
upload_locks: dict[str, asyncio.Lock] = {}


@app.post("/create/")
async def reupload_asset(asset_id: int, _: str = Depends(verify_api_key)):
    """Re-upload a clothing asset to the target group and put it on sale.

    The image is hashed and a per-hash lock serialises concurrent requests
    for the same image; an image already recorded in the database for the
    same asset type is not uploaded again.

    Returns:
        ``{"uploaded": {"asset_id": ...}}`` for an already-uploaded image;
        ``{"uploaded": <upload result>}`` after a successful upload and
        onsale; the same plus ``"onsale": "queued"`` and ``"info"`` when a
        rate limit deferred the onsale; or the raw upload result when it
        carries no ``asset_id``.

    Raises:
        HTTPException: 400 when the asset is not a clothing asset.
    """
    asset = await roblox.asset_from_id(asset_id)

    if not isinstance(asset, ClothingAsset):
        raise HTTPException(
            status_code=400, detail="Asset is not a clothing asset."
        )

    print(f"Clothing asset found: {asset.name}")
    image = await roblox.fetch_clothing_image(asset)

    # Check for duplicates using hash
    image_hash = hashing.get_image_hash(image)

    # Get or create a lock for this specific hash. There is no await between
    # lookup and insert, so this is atomic with respect to other coroutines.
    lock = upload_locks.setdefault(image_hash, asyncio.Lock())

    async with lock:
        # Re-check the database inside the lock: a request that held the lock
        # before us may have just uploaded the same image.
        existing_new_id = database.get_uploaded_asset(
            image_hash, asset.asset_type
        )
        if existing_new_id:
            print(
                f"Asset already uploaded (hash and type match): {existing_new_id}"
            )
            return {"uploaded": {"asset_id": existing_new_id}}

        # Prepare description with original URL
        original_url = f"https://www.roblox.com/catalog/{asset_id}"
        new_description = f"{asset.description}\n\nOriginal: {original_url}"
        target_id = int(TARGET)

        uploaded = await roblox.upload_clothing_image(
            image,
            asset.name,
            new_description,
            asset.asset_type,
            target_id,
        )

        new_asset_id = uploaded.get("asset_id")
        if not new_asset_id:
            return uploaded

        # Save to database
        database.save_uploaded_asset(
            image_hash, asset.asset_type, asset_id, new_asset_id
        )

        asset_type_name = (
            "Shirt" if asset.asset_type == RbxAssetType.SHIRT else "Pants"
        )

        # Presumably gives Roblox time to process the new asset before it is
        # published — TODO confirm.
        await asyncio.sleep(5)

        # Both publish and onsale sit inside the try: a rate limit on either
        # step queues the item instead of surfacing as a 500 after the upload
        # has already been recorded. When publish itself was rate limited the
        # id stays None and the queue worker publishes on retry.
        # NOTE(review): assumes `add_to_onsale_queue` accepts None for the
        # collectible id — confirm against the database layer.
        collectible_item_id = None
        try:
            collectible_item_id = await roblox_onsale.publish_collectible(
                new_asset_id, target_id, asset.name, new_description
            )
            await roblox_onsale.onsale_asset(collectible_item_id)
        except RateLimitError:
            print(
                f"Rate limit hit for asset {new_asset_id}, adding to retry queue."
            )
            database.add_to_onsale_queue(
                new_asset_id,
                asset_id,
                asset.name,
                new_description,
                target_id,
                asset_type_name,
                collectible_item_id,
            )
            return {
                "uploaded": uploaded,
                "onsale": "queued",
                "info": "Rate limit hit, item will be put on sale automatically later.",
            }

        # Send Discord notification (only for successful initial onsale)
        await discord.send_upload_webhook(
            asset.name, asset_id, new_asset_id, asset_type_name
        )
        return {"uploaded": uploaded}