I’ve tried adjusting the formatting to work but I keep running into the same error regardless of the changes. I’m new to GraphQL and Shopify so I’m a little stumped.
The goal I’m trying to achieve is automating uploading some where around 2000 photos to a shopify store, and doing so based on a SKU/ProductID combo to prevent the photos from going to the wrong item. I already have a fairly functional version of this using REST API instead but I wanted to make things more efficient and faster by using GraphQL.
import os
import re
import time
import base64
import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import tkinter as tk
from tkinter import filedialog
from shopify_config import SHOPIFY_STORE_URL, API_KEY, PASSWORD, API_VERSION
class ShopifyConnection:
def __init__(self):
self.base_url = f"{SHOPIFY_STORE_URL}/admin/api/{API_VERSION}/graphql.json"
self.headers = {
'Content-Type': 'application/json',
'X-Shopify-Access-Token': PASSWORD
}
self.session = self._setup_session()
def _setup_session(self):
"""Sets up a session with retry capabilities."""
session = requests.Session()
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
session.mount('https://', HTTPAdapter(max_retries=retries))
return session
def make_request(self, query):
"""Makes a GraphQL request to the Shopify API."""
return self.session.post(self.base_url, headers=self.headers, json={'query': query})
##Used to paginate through shopify inventory and handle rate limits
class ShopifyPaginator:
def __init__(self, connection):
self.connection = connection
def _handle_rate_limits(self, response):
"""Handle rate limits based on the response headers."""
# Extract the rate limit headers
query_cost = int(response.headers.get('X-Shopify-Storefront-Graphql-Call-Cost', 0))
remaining_cost = int(response.headers.get('X-Shopify-Storefront-Graphql-Remaining-Cost', 1000))
# Calculate the time to sleep based on the query cost and remaining bucket size
if remaining_cost < query_cost:
sleep_time = (query_cost - remaining_cost) / 50 # 50 points refill per second
print("---------------------------------------------------------")
print(f"Rate limit usage high, sleeping for {sleep_time} seconds")
print("---------------------------------------------------------")
time.sleep(sleep_time)
def get_all_items(self, query, item_key):
"""Fetch all items from a paginated Shopify GraphQL endpoint."""
items = []
cursor = None
while True:
# Format the query with the cursor if it exists
formatted_query = query.format(cursor=f'"{cursor}"' if cursor else 'null')
response = self.connection.make_request(formatted_query)
self._handle_rate_limits(response) # Handle rate limits
data = response.json()
items.extend(data.get('data', {}).get(item_key, {}).get('edges', []))
# Check for pagination
if items and 'cursor' in items[-1]:
cursor = items[-1]['cursor']
else:
break
return items
##Filters for shopify
class ShopifyProductFilter:
def __init__(self, paginator, namespace, key):
self.paginator = paginator
self.namespace = namespace
self.key = key
def get_ProductIDandSKUsWithPhotoFalse(self):
"""Fetch a list of product IDs and their respective SKUs where the metafield value is FALSE."""
query = """
{{
products(first: 250, after: {cursor}) {{
edges {{
cursor
node {{
id
variants(first: 5) {{
edges {{
node {{
sku
}}
}}
}}
metafields(first: 5, namespace: "{namespace}", key: "{key}") {{
edges {{
node {{
value
}}
}}
}}
}}
}}
}}
}}
"""
formatted_query = query.format(namespace=self.namespace, key=self.key, cursor="{cursor}")
products = self.paginator.get_all_items(formatted_query, 'products')
SKUandProductID = []
for product in products:
if any(metafield['node']['value'] == 'FALSE' for metafield in product['node']['metafields']['edges']):
product_id = product['node']['id']
for variant in product['node']['variants']['edges']:
sku = variant['node']['sku']
SKUandProductID.append({'product_id': product_id, 'sku': sku})
return SKUandProductID
class ShopifyPhotoUploader:
def __init__(self, connection):
self.connection = connection
self.missing_photos = []
self.successfully_uploaded = []
def select_folder(self):
root = tk.Tk()
root.withdraw()
folder_selected = filedialog.askdirectory()
return folder_selected
def upload_images_from_folder(self, folder_path, SKUandProductID):
# Iterate through files in the folder and subfolders
for root, _, files in os.walk(folder_path):
for file in files:
# Check if the file matches the naming scheme "GCE#20001_1.jpg"
match = re.match(r'GCE#(d+)_(d+).jpg', file)
if match:
sku, position = match.groups()
product_id = next((item['product_id'] for item in SKUandProductID if item['sku'] == sku), None)
if product_id:
full_image_path = os.path.join(root, file)
if self.upload_image(product_id, full_image_path, int(position), sku):
self.successfully_uploaded.append({'product_id': product_id, 'sku': sku})
else:
self.missing_photos.append(sku)
# Create a log file for SKUs without photos
if self.missing_photos:
with open('missing_photos.log', 'w') as log_file:
log_file.write("n".join(self.missing_photos))
# Update metafields for successfully uploaded images
for item in self.successfully_uploaded:
self.update_metafield(item['product_id'], "custom", "photo_shoot_", "True")
def upload_image(self, product_id, image_path, position, sku):
"""Upload an image to Shopify using GraphQL."""
# Encode the image to base64
with open(image_path, "rb") as image_file:
encoded_image = base64.b64encode(image_file.read()).decode('utf-8')
# GraphQL mutation for uploading the image
mutation = """
mutation productImageCreate($productId: ID!, $image: ImageInput!) {
productImageCreate(productId: $productId, image: $image) {
image {
id
src
}
userErrors {
field
message
}
}
}
"""
# Variables for the mutation
variables = {
"productId": product_id,
"image": {
"attachment": encoded_image,
"filename": f"{sku}_{position}.jpg",
"position": position
}
}
# Send the mutation to Shopify
response = self.connection.make_request(mutation, variables)
data = response.json()
# Check for errors
user_errors = data.get('data', {}).get('productImageCreate', {}).get('userErrors', [])
if user_errors:
for error in user_errors:
field = error.get('field', 'Unknown field')
message = error.get('message', 'Unknown error')
print(f"Error uploading image for SKU {sku} at position {position}: {field} - {message}")
return False
else:
image_id = data.get('data', {}).get('productImageCreate', {}).get('image', {}).get('id')
image_src = data.get('data', {}).get('productImageCreate', {}).get('image', {}).get('src')
print(f"Successfully uploaded image for SKU {sku} at position {position}. Image ID: {image_id}, Image URL: {image_src}")
return True
def update_metafield(self, product_id, namespace, key, value):
"""Update a product's metafield using GraphQL."""
mutation = """
mutation productUpdate($input: ProductInput!) {
productUpdate(input: $input) {
product {
id
}
userErrors {
field
message
}
}
}
"""
# Variables for the mutation
variables = {
"input": {
"id": product_id,
"metafields": [{
"namespace": namespace,
"key": key,
"value": value,
"valueType": "STRING"
}]
}
}
# Send the mutation to Shopify
response = self.connection.make_request(mutation, variables)
data = response.json()
# Check for errors
user_errors = data.get('data', {}).get('productUpdate', {}).get('userErrors', [])
if user_errors:
for error in user_errors:
field = error.get('field', 'Unknown field')
message = error.get('message', 'Unknown error')
print(f"Error updating metafield for product {product_id}: {field} - {message}")
else:
print(f"Successfully updated metafield for product {product_id}.")
if __name__ == '__main__':
NAMESPACE = 'custom'
KEY = 'photo_shoot_'
connection = ShopifyConnection()
paginator = ShopifyPaginator(connection)
filter = ShopifyProductFilter(paginator, NAMESPACE, KEY)
SKUandProductID = filter.get_ProductIDandSKUsWithPhotoFalse()
uploader = ShopifyPhotoUploader(connection)
folder_path = uploader.select_folder()
if not folder_path:
print("No folder selected. Exiting")
exit()
uploader.upload_images_from_folder(folder_path, SKUandProductID)
print("Image upload process complete!")
Here is the error
File "c:Users**DesktopShopifyAutomationRefactoredCodeuploadTool.py", line 272, in <module>
SKUandProductID = filter.get_ProductIDandSKUsWithPhotoFalse()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:Users**DesktopShopifyAutomationRefactoredCodeuploadTool.py", line 117, in get_ProductIDandSKUsWithPhotoFalse
products = self.paginator.get_all_items(formatted_query, 'products')
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:Users**DesktopShopifyAutomationRefactoredCodeuploadTool.py", line 60, in get_all_items
formatted_query = query.format(cursor=f'"{cursor}"' if cursor else 'null')
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyError: 'n products(first'
1
Please trim your code to make it easier to find your problem. Follow these guidelines to create a minimal reproducible example.
Bot
2 hours ago