Files
threads/src/main.py
T

242 lines
8.5 KiB
Python

import asyncio
import os
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import APIKeyHeader
from rbx_upload import ClothingAsset, RateLimitError, RbxAssetType, RobloxClient
import database
from utils import discord, hashing
load_dotenv()
TARGET = os.getenv("TARGET_ID")
VALID_API_KEY = os.getenv("VALID_API_KEY")
ROBLOSECURITY = os.getenv("ROBLOSECURITY_TOKEN")
PUBLISHER_USER_ID = os.getenv("PUBLISHER_USER_ID")
ROBLOX_PROXY = os.getenv("ROBLOX_PROXY")
# Optional separate account for onsaling (falls back to upload account)
ONSALE_ROBLOSECURITY = os.getenv("ONSALE_ROBLOSECURITY_TOKEN", ROBLOSECURITY)
ONSALE_PUBLISHER_USER_ID = os.getenv("ONSALE_PUBLISHER_USER_ID", PUBLISHER_USER_ID)
if TARGET is None:
raise EnvironmentError("TARGET_ID is missing from environment.")
if not VALID_API_KEY:
raise EnvironmentError("VALID_API_KEY is missing from environment.")
if not ROBLOSECURITY:
raise EnvironmentError("ROBLOSECURITY_TOKEN is missing from environment.")
if not PUBLISHER_USER_ID:
raise EnvironmentError("PUBLISHER_USER_ID is missing from environment.")
from contextlib import asynccontextmanager
# Retry Configuration
RETRY_INTERVAL = int(os.getenv("RETRY_INTERVAL_SECONDS", 60))
RETRY_DELAY = int(os.getenv("RETRY_DELAY_SECONDS", 300))
roblox: RobloxClient
roblox_onsale: RobloxClient
_use_separate_onsale = (
ONSALE_ROBLOSECURITY != ROBLOSECURITY
or ONSALE_PUBLISHER_USER_ID != PUBLISHER_USER_ID
)
@asynccontextmanager
async def _create_clients():
global roblox, roblox_onsale
if _use_separate_onsale:
async with RobloxClient(
roblosecurity=ROBLOSECURITY,
publisher_user_id=int(PUBLISHER_USER_ID),
proxy=ROBLOX_PROXY,
) as upload_client, RobloxClient(
roblosecurity=ONSALE_ROBLOSECURITY,
publisher_user_id=int(ONSALE_PUBLISHER_USER_ID),
proxy=ROBLOX_PROXY,
) as onsale_client:
roblox = upload_client
roblox_onsale = onsale_client
yield
else:
async with RobloxClient(
roblosecurity=ROBLOSECURITY,
publisher_user_id=int(PUBLISHER_USER_ID),
proxy=ROBLOX_PROXY,
) as client:
roblox = client
roblox_onsale = client
yield
@asynccontextmanager
async def lifespan(app: FastAPI):
async with _create_clients():
task = asyncio.create_task(process_onsale_queue())
yield
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
app = FastAPI(lifespan=lifespan)
async def process_onsale_queue():
"""Background task to retry failed onsale attempts."""
while True:
try:
pending_items = database.get_pending_onsale_items()
for item in pending_items:
print(
f"Retrying onsale for asset {item['asset_id']} (Attempt {item['retry_count'] + 1})"
)
try:
collectible_item_id = item["collectible_item_id"] or await roblox_onsale.publish_collectible(
item["asset_id"], item["group_id"], item["name"], item["description"], price=5
)
await roblox_onsale.onsale_asset(collectible_item_id)
database.remove_from_onsale_queue(item["id"])
print(
f"Successfully put asset {item['asset_id']} on sale via queue."
)
await discord.send_upload_webhook(
item["name"],
item["original_asset_id"],
item["asset_id"],
item["asset_type"],
)
except RateLimitError:
print(
f"Rate limit hit again for asset {item['asset_id']}, backing off."
)
database.increment_retry_onsale(
item["id"], delay_seconds=RETRY_DELAY
)
except Exception as e:
print(f"Unexpected error retrying asset {item['asset_id']}: {e}")
database.increment_retry_onsale(
item["id"], delay_seconds=RETRY_DELAY * 2
)
await asyncio.sleep(RETRY_INTERVAL)
except Exception as e:
print(f"Error in background queue task: {e}")
await asyncio.sleep(RETRY_INTERVAL)
api_key_header = APIKeyHeader(name="x-api-key")
async def verify_api_key(api_key: str = Depends(api_key_header)):
if api_key != VALID_API_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN, detail="Invalid API key"
)
return api_key
@app.get("/asset/{asset_id}")
async def get_asset_info(asset_id: int, _: str = Depends(verify_api_key)):
asset = await roblox.asset_from_id(asset_id)
if isinstance(asset, ClothingAsset):
print("clothing asset found")
image = await roblox.fetch_clothing_image(asset)
with open(f"tests/{asset.asset_id}.png", "wb") as output:
output.write(image)
return {
"asset_id": asset.asset_id,
"name": asset.name,
"asset_type": asset.asset_type,
}
# Global dictionary to store locks per image hash
upload_locks: dict[str, asyncio.Lock] = {}
@app.post("/create/")
async def reupload_asset(asset_id: int, _: str = Depends(verify_api_key)):
asset = await roblox.asset_from_id(asset_id)
if isinstance(asset, ClothingAsset):
print(f"Clothing asset found: {asset.name}")
image = await roblox.fetch_clothing_image(asset)
# Check for duplicates using hash
image_hash = hashing.get_image_hash(image)
# Get or create a lock for this specific hash
if image_hash not in upload_locks:
upload_locks[image_hash] = asyncio.Lock()
async with upload_locks[image_hash]:
# Double-check database inside the lock
existing_new_id = database.get_uploaded_asset(image_hash, asset.asset_type)
if existing_new_id:
print(
f"Asset already uploaded (hash and type match): {existing_new_id}"
)
return {"uploaded": {"asset_id": existing_new_id}}
# Prepare description with original URL
original_url = f"https://www.roblox.com/catalog/{asset_id}"
new_description = f"{asset.description}\n\nOriginal: {original_url}"
uploaded = await roblox.upload_clothing_image(
image,
asset.name,
new_description,
asset.asset_type,
int(TARGET),
)
new_asset_id = uploaded.get("asset_id")
if new_asset_id:
# Save to database
database.save_uploaded_asset(
image_hash, asset.asset_type, asset_id, new_asset_id
)
await asyncio.sleep(5)
asset_type_name = (
"Shirt" if asset.asset_type == RbxAssetType.SHIRT else "Pants"
)
try:
collectible_item_id = await roblox_onsale.publish_collectible(
new_asset_id, int(TARGET), asset.name, new_description
)
await roblox_onsale.onsale_asset(collectible_item_id)
except (RateLimitError, Exception) as e:
print(
f"Publish/onsale failed for asset {new_asset_id}, adding to retry queue: {e}"
)
database.add_to_onsale_queue(
new_asset_id,
asset_id,
asset.name,
new_description,
int(TARGET),
asset_type_name,
None,
)
return {
"uploaded": uploaded,
"onsale": "queued",
"info": f"Publish/onsale failed, will retry automatically: {e}",
}
# Send Discord notification (only for successful initial onsale)
await discord.send_upload_webhook(
asset.name, asset_id, new_asset_id, asset_type_name
)
return {"uploaded": uploaded}
return uploaded
raise HTTPException(status_code=400, detail="Asset is not a clothing asset.")