refactor: migrate to rbx-upload library

- Replace hand-rolled roblox_service.py and models.py with rbx-upload
- Initialize RobloxClient in FastAPI lifespan context manager
- Remove leaked singleton httpx.AsyncClient
- Simplify upload_clothing_image call (group_id int instead of RbxCreator)
- Fix GET /asset/{id} returning a set literal instead of a dict
- Drop httpx direct dependency (now transitive via rbx-upload)
This commit is contained in:
2026-02-18 09:41:22 -05:00
parent d2bed596ae
commit 5afbdd7d94
4 changed files with 100 additions and 432 deletions
+1 -3
View File
@@ -7,7 +7,5 @@ requires-python = ">=3.13"
dependencies = [
"fastapi[standard]>=0.126.0",
"python-dotenv>=1.2.1",
"httpx>=0.25.0",
"rbx-upload",
]
+86 -45
View File
@@ -4,77 +4,104 @@ import os
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import APIKeyHeader
from rbx_upload import ClothingAsset, RateLimitError, RbxAssetType, RobloxClient
import models
from utils import roblox_service, hashing, discord
import database
from utils import discord, hashing
load_dotenv()
TARGET = os.getenv("TARGET_ID")
VALID_API_KEY = os.getenv("VALID_API_KEY")
ROBLOSECURITY = os.getenv("ROBLOSECURITY_TOKEN")
PUBLISHER_USER_ID = os.getenv("PUBLISHER_USER_ID")
ROBLOX_PROXY = os.getenv("ROBLOX_PROXY")
if TARGET is None:
raise EnvironmentError("TARGET_ID is missing from environment.")
if not VALID_API_KEY:
raise EnvironmentError("VALID_API_KEY is missing from environment.")
if not ROBLOSECURITY:
raise EnvironmentError("ROBLOSECURITY_TOKEN is missing from environment.")
if not PUBLISHER_USER_ID:
raise EnvironmentError("PUBLISHER_USER_ID is missing from environment.")
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# Start background task
task = asyncio.create_task(process_onsale_queue())
yield
# Cleanup
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
app = FastAPI(lifespan=lifespan)
# Retry Configuration
RETRY_INTERVAL = int(os.getenv("RETRY_INTERVAL_SECONDS", 60))
RETRY_DELAY = int(os.getenv("RETRY_DELAY_SECONDS", 300))
roblox: RobloxClient
@asynccontextmanager
async def lifespan(app: FastAPI):
global roblox
async with RobloxClient(
roblosecurity=ROBLOSECURITY,
publisher_user_id=int(PUBLISHER_USER_ID),
proxy=ROBLOX_PROXY,
) as client:
roblox = client
task = asyncio.create_task(process_onsale_queue())
yield
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
app = FastAPI(lifespan=lifespan)
async def process_onsale_queue():
"""Background task to retry failed onsale attempts."""
while True:
try:
pending_items = database.get_pending_onsale_items()
for item in pending_items:
print(f"Retrying onsale for asset {item['asset_id']} (Attempt {item['retry_count'] + 1})")
print(
f"Retrying onsale for asset {item['asset_id']} (Attempt {item['retry_count'] + 1})"
)
try:
await roblox_service.onsale_asset(
await roblox.onsale_asset(
item["asset_id"],
item["name"],
item["description"],
item["group_id"],
)
database.remove_from_onsale_queue(item["id"])
print(f"Successfully put asset {item['asset_id']} on sale via queue.")
print(
f"Successfully put asset {item['asset_id']} on sale via queue."
)
# Send Discord notification on successful retry
await discord.send_upload_webhook(
item["name"],
item["original_asset_id"],
item["asset_id"],
item["asset_type"]
item["asset_type"],
)
except RateLimitError:
print(
f"Rate limit hit again for asset {item['asset_id']}, backing off."
)
database.increment_retry_onsale(
item["id"], delay_seconds=RETRY_DELAY
)
except roblox_service.RateLimitError:
print(f"Rate limit hit again for asset {item['asset_id']}, backing off.")
database.increment_retry_onsale(item["id"], delay_seconds=RETRY_DELAY)
except Exception as e:
print(f"Unexpected error retrying asset {item['asset_id']}: {e}")
database.increment_retry_onsale(item["id"], delay_seconds=RETRY_DELAY * 2)
database.increment_retry_onsale(
item["id"], delay_seconds=RETRY_DELAY * 2
)
await asyncio.sleep(RETRY_INTERVAL)
except Exception as e:
print(f"Error in background queue task: {e}")
await asyncio.sleep(RETRY_INTERVAL)
api_key_header = APIKeyHeader(name="x-api-key")
@@ -88,25 +115,29 @@ async def verify_api_key(api_key: str = Depends(api_key_header)):
@app.get("/asset/{asset_id}")
async def get_asset_info(asset_id: int, _: str = Depends(verify_api_key)):
asset = await roblox_service.asset_from_id(asset_id)
final_dict = {asset}
if isinstance(asset, models.ClothingAsset):
asset = await roblox.asset_from_id(asset_id)
if isinstance(asset, ClothingAsset):
print("clothing asset found")
image = await roblox_service.fetch_clothing_image(asset)
image = await roblox.fetch_clothing_image(asset)
with open(f"tests/{asset.asset_id}.png", "wb") as output:
output.write(image)
return final_dict
return {
"asset_id": asset.asset_id,
"name": asset.name,
"asset_type": asset.asset_type,
}
# Global dictionary to store locks per image hash
upload_locks: dict[str, asyncio.Lock] = {}
@app.post("/create/")
async def reupload_asset(asset_id: int, _: str = Depends(verify_api_key)):
asset = await roblox_service.asset_from_id(asset_id)
if isinstance(asset, models.ClothingAsset):
asset = await roblox.asset_from_id(asset_id)
if isinstance(asset, ClothingAsset):
print(f"Clothing asset found: {asset.name}")
image = await roblox_service.fetch_clothing_image(asset)
image = await roblox.fetch_clothing_image(asset)
# Check for duplicates using hash
image_hash = hashing.get_image_hash(image)
@@ -120,51 +151,61 @@ async def reupload_asset(asset_id: int, _: str = Depends(verify_api_key)):
existing_new_id = database.get_uploaded_asset(image_hash, asset.asset_type)
if existing_new_id:
print(f"Asset already uploaded (hash and type match): {existing_new_id}")
print(
f"Asset already uploaded (hash and type match): {existing_new_id}"
)
return {"uploaded": {"asset_id": existing_new_id}}
# Prepare description with original URL
original_url = f"https://www.roblox.com/catalog/{asset_id}"
new_description = f"{asset.description}\n\nOriginal: {original_url}"
uploaded = await roblox_service.upload_clothing_image(
uploaded = await roblox.upload_clothing_image(
image,
asset.name,
new_description,
asset.asset_type,
models.RbxCreator(int(TARGET), "Upload_Group", "Group"),
int(TARGET),
)
new_asset_id = uploaded.get("asset_id")
if new_asset_id:
# Save to database
database.save_uploaded_asset(image_hash, asset.asset_type, asset_id, new_asset_id)
database.save_uploaded_asset(
image_hash, asset.asset_type, asset_id, new_asset_id
)
try:
onsale = await roblox_service.onsale_asset(
await roblox.onsale_asset(
new_asset_id,
asset.name,
new_description,
int(TARGET),
)
except roblox_service.RateLimitError:
print(f"Rate limit hit for asset {new_asset_id}, adding to retry queue.")
asset_type_name = "Shirt" if asset.asset_type == models.RbxAssetType.SHIRT else "Pants"
except RateLimitError:
print(
f"Rate limit hit for asset {new_asset_id}, adding to retry queue."
)
asset_type_name = (
"Shirt" if asset.asset_type == RbxAssetType.SHIRT else "Pants"
)
database.add_to_onsale_queue(
new_asset_id,
asset_id, # original
asset_id,
asset.name,
new_description,
int(TARGET),
asset_type_name
asset_type_name,
)
return {
"uploaded": uploaded,
"onsale": "queued",
"info": "Rate limit hit, item will be put on sale automatically later."
"info": "Rate limit hit, item will be put on sale automatically later.",
}
# Send Discord notification (only for successful initial onsale)
asset_type_name = "Shirt" if asset.asset_type == models.RbxAssetType.SHIRT else "Pants"
asset_type_name = (
"Shirt" if asset.asset_type == RbxAssetType.SHIRT else "Pants"
)
await discord.send_upload_webhook(
asset.name, asset_id, new_asset_id, asset_type_name
)
-73
View File
@@ -1,73 +0,0 @@
from enum import IntEnum
from typing import Literal
# Enums
class RbxAssetType(IntEnum):
IMAGE = 1
SHIRT = 11
PANTS = 12
# Types
ClothingAssetType = Literal[
RbxAssetType.SHIRT,
RbxAssetType.PANTS,
]
CreatorType = Literal["User", "Group"]
# Roblox Creator class (for creator info - mainly useful for returning data + TUI)
class RbxCreator:
def __init__(self, creator_id: int, username: str, creator_type: CreatorType):
self.creator_id = creator_id
self.username = username
self.creator_type = creator_type
# Asset base class
class RbxAsset:
def __init__(
self,
asset_id: int,
creator: RbxCreator,
name: str,
description: str,
asset_type: RbxAssetType,
) -> None:
self.asset_id = asset_id
self.name = name
self.description = description
self.creator = creator
self.asset_type = asset_type
# Clothing asset class
class ClothingAsset(RbxAsset):
def __init__(
self,
asset_id: int,
creator: RbxCreator,
name: str,
description: str,
asset_type: ClothingAssetType,
) -> None:
super().__init__(
asset_id=asset_id,
creator=creator,
name=name,
description=description,
asset_type=asset_type,
)
async def get_image(self) -> bytes:
from utils import roblox_service
return await roblox_service.fetch_clothing_image(self)
-298
View File
@@ -1,298 +0,0 @@
import asyncio
import json
import os
import uuid
import xml.etree.ElementTree
import httpx
from dotenv import load_dotenv
import models
load_dotenv()
# Constants and State
ROBLOSECURITY = os.getenv("ROBLOSECURITY_TOKEN")
if not ROBLOSECURITY:
raise EnvironmentError("ROBLOSECURITY_TOKEN is missing from environment.")
# Must match the user ID of the account that owns the ROBLOSECURITY_TOKEN
PUBLISHER_USER_ID = os.getenv("PUBLISHER_USER_ID")
if not PUBLISHER_USER_ID:
raise EnvironmentError("PUBLISHER_USER_ID is missing from environment.")
FETCH_HEADERS = {
"Cookie": f".ROBLOSECURITY={ROBLOSECURITY}",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
}
CSRF_HEADERS = {
"X-CSRF-TOKEN": "",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:145.0) Gecko/20100101 Firefox/145.0",
"Referer": "https://create.roblox.com/",
"Origin": "https://create.roblox.com",
}
CSRF_COOKIES = {".ROBLOSECURITY": ROBLOSECURITY}
# Proxy configuration
ROBLOX_PROXY = os.getenv("ROBLOX_PROXY")
def _proxy_url(url: str) -> str:
"""Redirect roblox.com URLs to a proxy if configured."""
if not ROBLOX_PROXY:
return url
return url.replace("roblox.com", ROBLOX_PROXY)
CSRF_URL = _proxy_url("https://apis.roblox.com/assets/user-auth/v1/assets")
ASSET_DELIVERY_BASE_URL = _proxy_url(
"https://assetdelivery.roblox.com/v1/asset/?id={asset_id}"
)
ECONOMY_BASE_URL = _proxy_url("https://economy.roblox.com/v2/assets/{asset_id}/details")
UPLOAD_URL = _proxy_url("https://apis.roblox.com/assets/user-auth/v1/assets")
# Custom Exceptions
class RateLimitError(Exception):
"""Raised when hitting Roblox rate limits (HTTP 429)."""
pass
# Client instance for connection pooling
_client = None
def _get_client() -> httpx.AsyncClient:
"""Get or create the shared HTTP client for connection pooling."""
global _client
if _client is None:
_client = httpx.AsyncClient()
return _client
# Internal methods
async def _economy_request(asset_id: int) -> httpx.Response:
"""Fetch asset details from the Roblox economy API."""
client = _get_client()
return await client.get(
ECONOMY_BASE_URL.format(asset_id=asset_id), headers=FETCH_HEADERS
)
async def _asset_delivery_request(asset_id: int) -> httpx.Response:
"""Fetch an asset from the Roblox asset delivery API."""
client = _get_client()
return await client.get(
ASSET_DELIVERY_BASE_URL.format(asset_id=asset_id),
headers=FETCH_HEADERS,
follow_redirects=True,
)
async def _get_asset_xml(asset: models.RbxAsset) -> xml.etree.ElementTree.Element:
"""Fetch and parse asset XML content."""
response = await _asset_delivery_request(asset.asset_id)
response.raise_for_status()
content = response.content.decode("utf-8")
xml_root = xml.etree.ElementTree.fromstring(content)
return xml_root
def _get_shirt_template_id_from_xml(root: xml.etree.ElementTree.Element) -> int:
"""Extract shirt template ID from asset XML."""
url_element = root.find(".//url")
if url_element is None:
raise ValueError("XML did not contain a <url> tag.")
url = url_element.text
if not url:
raise ValueError("<url> tag did not contain any text.")
template_id = url.split("id=")[1]
return int(template_id)
async def _get_csrf_token(url: str = CSRF_URL) -> str:
"""Retrieve CSRF token from the Roblox API."""
client = _get_client()
response = await client.post(url, cookies=CSRF_COOKIES, headers=CSRF_HEADERS)
csrf = response.headers.get("X-CSRF-TOKEN")
if not csrf:
raise httpx.HTTPStatusError(
"Failed to retrieve X-CSRF-TOKEN.",
request=response.request,
response=response,
)
return csrf
# External methods
async def asset_from_id(id: int) -> models.RbxAsset:
"""Fetch asset information from Roblox by asset ID."""
response = await _economy_request(id)
response.raise_for_status()
asset_info = json.loads(response.content)
creator_info = asset_info["Creator"]
asset_creator = models.RbxCreator(
creator_id=creator_info["Id"],
username=creator_info["Name"],
creator_type=creator_info["CreatorType"],
)
asset_type_id = asset_info["AssetTypeId"]
if (
asset_type_id == models.RbxAssetType.SHIRT
or asset_type_id == models.RbxAssetType.PANTS
):
return models.ClothingAsset(
asset_id=asset_info["AssetId"],
creator=asset_creator,
name=asset_info["Name"],
description=asset_info["Description"],
asset_type=asset_info["AssetTypeId"],
)
return models.RbxAsset(
asset_id=asset_info["AssetId"],
creator=asset_creator,
name=asset_info["Name"],
description=asset_info["Description"],
asset_type=asset_info["AssetTypeId"],
)
async def fetch_clothing_image(asset: models.ClothingAsset) -> bytes:
"""Fetch the image data for a clothing asset."""
try:
xml = await _get_asset_xml(asset)
template_id = _get_shirt_template_id_from_xml(xml)
image = await _asset_delivery_request(template_id)
image.raise_for_status()
return image.content
except Exception:
raise # TODO add logging
async def upload_clothing_image(
image: bytes,
name: str,
description: str,
asset_type: models.RbxAssetType,
target: models.RbxCreator,
) -> dict:
"""Upload a clothing image to Roblox and return the asset ID."""
csrf = await _get_csrf_token()
meta = {
"displayName": name,
"description": description,
"assetType": asset_type,
# TODO add support for user creation context
"creationContext": {
"creator": {"groupId": target.creator_id},
"expectedPrice": 10,
},
}
client = _get_client()
response = await client.post(
UPLOAD_URL,
files={
"request": (None, json.dumps(meta), "application/json"),
"fileContent": ("clothing_upload", image, "image/png"),
},
headers={
"X-CSRF-TOKEN": csrf,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:145.0) Gecko/20100101 Firefox/145.0",
"Accept": "*/*",
"Accept-Language": "en-US,en;q=0.5",
"Referer": "https://create.roblox.com/",
"Origin": "https://create.roblox.com",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-site",
},
cookies=CSRF_COOKIES,
)
if response.status_code == 429:
raise RateLimitError("Rate limit hit during upload")
response.raise_for_status()
data = response.json()
operation_id = data.get("operationId")
if operation_id:
max_tries = 10
wait_time = 1
for attempt in range(max_tries):
await asyncio.sleep(wait_time)
op_response = await client.get(
_proxy_url(f"https://apis.roblox.com/assets/user-auth/v1/operations/{operation_id}"),
headers={"X-CSRF-TOKEN": csrf},
cookies=CSRF_COOKIES,
)
op_response.raise_for_status()
op_data = op_response.json()
if op_data.get("done"):
if op_data.get("response") and op_data["response"].get("assetId"):
return {"asset_id": op_data["response"]["assetId"]}
return op_data
return data
async def onsale_asset(
asset_id: int,
name: str,
description: str,
group_id: int,
price: int = 5,
):
"""Put an asset on sale."""
csrf = await _get_csrf_token()
data = {
"saleLocationConfiguration": {"saleLocationType": 1, "places": []},
"targetId": asset_id,
"priceInRobux": price,
"publishingType": 2,
"idempotencyToken": str(uuid.uuid4()),
"publisherUserId": PUBLISHER_USER_ID,
"creatorGroupId": group_id,
"name": name,
"description": description,
"isFree": False,
"agreedPublishingFee": 0,
"priceOffset": 0,
"quantity": 0,
"quantityLimitPerUser": 0,
"resaleRestriction": 2,
"targetType": 0,
}
client = _get_client()
response = await client.post(
_proxy_url("https://itemconfiguration.roblox.com/v1/collectibles"),
json=data,
headers={
"X-CSRF-TOKEN": csrf,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:145.0) Gecko/20100101 Firefox/145.0",
"Referer": "https://create.roblox.com/",
"Origin": "https://create.roblox.com",
},
cookies=CSRF_COOKIES,
)
if response.status_code == 429:
raise RateLimitError("Rate limit hit during onsale")
response.raise_for_status()
return response.json()