I’m rewriting a script to upload large batches of images to Shopify using the GraphQL API. I keep running into two issues, the first is images that upload successfully do not process properly. The second issue is I hit the rate limit and the script breaks out of the loop despite having exponential rate limiting. I’ve tried rate limiting and using different queries in GQL to facilitate the upload and I’ve identified the appropriate one. I have everything broken down into classes > definitions to make things easier to work with. Outside of this class is where I get the necessary data for the GQL queries.
Apologies if this is too much code to work with, I’m not sure if its all necessary but it is all used when executing this portion of the script. f
class ShopifyPhotoUploader:
def __init__(self, connection):
self.connection = connection
self.missing_photos = []
self.successfully_uploaded = []
def select_folder(self):
root = tk.Tk()
root.withdraw()
folder_selected = filedialog.askdirectory()
return folder_selected
def generate_staged_uploads(self, image_path):
original_filename = os.path.basename(image_path)
file_size = os.path.getsize(image_path)
# Define backoff parameters
max_retries = 5
base_delay = 5 # in seconds
max_delay = 60 # in seconds
# GraphQL mutation for generating staged uploads
mutation = """
mutation generateStagedUploads($input: [StagedUploadInput!]!) {
stagedUploadsCreate(input: $input) {
stagedTargets {
url
resourceUrl
parameters {
name
value
}
}
userErrors {
field
message
}
}
}
"""
# Variables for the mutation
variables = {
"input": [
{
"filename": original_filename,
"mimeType": "image/jpg",
"resource": "IMAGE",
"fileSize": str(file_size)
}
]
}
for attempt in range(max_retries):
try:
# Send the mutation to Shopify
response = self.connection.make_request(mutation, variables)
if not response:
raise Exception("No response received from Shopify.")
# Handle the response
if 'data' in response and 'stagedUploadsCreate' in response['data']:
staged_targets = response['data']['stagedUploadsCreate']['stagedTargets']
if staged_targets:
url = staged_targets[0]['url']
# If successful, break out of the loop
if url:
print(url) # This will print the URL for your reference
return url
else:
raise Exception("Failed to generate staged uploads")
except Exception as e:
# Handle exceptions that you expect, like rate limiting
if "Throttled" in str(e) and attempt < max_retries - 1:
delay = min(base_delay * (2 ** attempt), max_delay)
print(f"Rate limited. Retrying in {delay} seconds...")
time.sleep(delay)
else:
# If it's an unexpected exception or you've reached max retries, raise the exception
raise
raise Exception("Max retries reached for generate_staged_uploads")
def handle_staged_upload_response(self, response, original_filename):
"""Handle the response from the generate_staged_uploads function and extract the URL."""
if not response:
print(f"Error: No response received for {original_filename}.")
return None, None
# Check if the response contains the necessary data
if 'data' in response and 'stagedUploadsCreate' in response['data']:
staged_targets = response['data']['stagedUploadsCreate']['stagedTargets']
# Check if stagedTargets contains any data
if staged_targets:
url = staged_targets[0]['url']
resource_url = staged_targets[0]['resourceUrl']
return url, resource_url
else:
print(f"Error: No staged targets found in the response for {original_filename}.")
return None, None
else:
print(f"Error: Invalid response format for {original_filename}.")
return None, None
def upload_images_from_folder(self, folder_path, SKUandProductID):
print(f"Starting upload process from folder: {folder_path}")
print("SKUandProductID List:", SKUandProductID)
# Iterate through files in the folder and subfolders
for root, _, files in os.walk(folder_path):
for file in files:
print(f"Checking file: {file}")
# Check if the file matches the naming scheme "GCE#20001_1.jpg"
match = re.match(r'GCE#(d+)_(d+).jpg', file)
if match:
sku, position = match.groups()
print(f"Checking file: {file}")
print(f"Extracted SKU: {sku}")
##product_id = next((item['product_id'] for item in SKUandProductID if item['sku'] == sku), None)
product_id = next((item[1] for item in SKUandProductID if item[0] == sku), None)
if product_id:
full_image_path = os.path.join(root, file)
print(f"Attempting to upload image: {full_image_path} for SKU: {sku}")
if self.upload_image(product_id, full_image_path, int(position), sku):
self.successfully_uploaded.append({'product_id': product_id, 'sku': sku})
else:
print(f"Missing product ID for SKU: {sku}")
self.missing_photos.append(sku)
else:
print(f"File {file} does not match the expected naming scheme.")
# Create a log file for SKUs without photos
if self.missing_photos:
print(f"Logging SKUs without photos to 'missing_photos.log'")
with open('missing_photos.log', 'w') as log_file:
log_file.write("n".join(self.missing_photos))
# Update metafields for successfully uploaded images
for item in self.successfully_uploaded:
print(f"Updating metafield for product ID: {item['product_id']}")
self.update_metafield(item['product_id'], "custom", "photo_shoot_", "True")
def upload_image(self, product_id, image_path, position, sku):
"""Upload an image to Shopify using GraphQL."""
# Encode the image to base64
##with open(image_path, "rb") as image_file:
## encoded_image = base64.b64encode(image_file.read()).decode('utf-8')
staged_url = self.generate_staged_uploads(image_path)
# GraphQL mutation for uploading the image
mutation = """
mutation productCreateMedia($media: [CreateMediaInput!]!, $productId: ID!) {
productCreateMedia(media: $media, productId: $productId) {
media {
alt
mediaContentType
status
}
mediaUserErrors {
field
message
}
product {
id
title
}
}
}
"""
# Variables for the mutation
variables = {
"media": [
{
"alt": f"{sku}",
"mediaContentType": "IMAGE",
"originalSource": f"{staged_url}"
}
],
"productId": f"{product_id}"
}
# Send the mutation to Shopify
response = self.connection.make_request(mutation, variables)
if response:
data = response
else:
print(f"Error: No response received when uploading image for SKU:{sku}_{position}.")
return False
# Check for errors
user_errors = data.get('data', {}).get('productCreateMedia', {}).get('mediaUserErrors', [])
if user_errors:
for error in user_errors:
field = error.get('field', 'Unknown field')
message = error.get('message', 'Unknown error')
print(f"Error uploading image for SKU {sku} at position {position}: {field} - {message}")
return False
else:
image_id = data.get('data', {}).get('productCreateMedia', {}).get('media', [{}])[0].get('id')
image_src = data.get('data', {}).get('productCreateMedia', {}).get('media', [{}])[0].get('src')
print(f"Successfully uploaded image for SKU {sku} at position {position}. Image ID: {image_id}, Image URL: {image_src}")
return True