diff --git a/.gitignore b/.gitignore index 15f20011..3d7b0ace 100644 --- a/.gitignore +++ b/.gitignore @@ -6,5 +6,6 @@ /pkg/ /spec/reports/ /tmp/ +/vendor/ Gemfile.lock /.rspec_status diff --git a/README.md b/README.md index 3c81da2e..ac58ed39 100644 --- a/README.md +++ b/README.md @@ -24,10 +24,11 @@ A toolkit for working with Custom Shopify Apps built on Rails. - [ ] GraphQL Admin API client with built-in caching - [ ] GraphQL Admin API client with built-in error handling - [ ] GraphQL Admin API client with built-in logging -- [ ] Bulk Operations - - [ ] Interface for uploading and getting results for query / mutation - - [ ] Error handling and Logging - - [ ] Callbacks +- [x] Bulk Operations + - [x] Interface for uploading and getting results for query / mutation + - [x] Error handling and Logging + - [x] Callbacks + - [x] CLI commands ## Installation @@ -137,6 +138,210 @@ shopify-toolkit analyze products-result.csv --force-import => 116103 ``` +### Working with Bulk Operations + +Bulk Operations allow you to asynchronously run large GraphQL queries and mutations against the Shopify Admin API without worrying about rate limits or managing pagination manually. + +#### Ruby API + +Include the `ShopifyToolkit::BulkOperations` module in your class to access bulk operations functionality: + +```ruby +class MyService + include ShopifyToolkit::BulkOperations + + def export_all_products + query = <<~GRAPHQL + { + products { + edges { + node { + id + title + handle + productType + vendor + createdAt + variants { + edges { + node { + id + title + price + inventoryQuantity + } + } + } + } + } + } + } + GRAPHQL + + # Submit the bulk query + operation = run_bulk_query(query) + operation_id = operation.dig("bulkOperation", "id") + + # Poll until completion + completed = poll_until_complete(operation_id) do |status| + puts "Status: #{status["status"]}, Objects: #{status["objectCount"]}" + end + + # Download and parse results + if completed["status"] == "COMPLETED" + results = download_results(completed) + puts "Downloaded #{results.size} products" + return results + end + end + + def bulk_create_products(products_data) + mutation = <<~GRAPHQL + mutation createProduct($input: ProductInput!) 
{ + productCreate(input: $input) { + product { + id + title + handle + } + userErrors { + field + message + } + } + } + GRAPHQL + + # Prepare variables for each product + variables = products_data.map { |product| { input: product } } + + # Submit bulk mutation + operation = run_bulk_mutation(mutation, variables) + operation_id = operation.dig("bulkOperation", "id") + + # Wait for completion + completed = poll_until_complete(operation_id) + + if completed["status"] == "COMPLETED" + results = download_results(completed) + puts "Created #{results.size} products" + return results + end + end +end +``` + +#### CLI Commands + +The gem provides several CLI commands for working with bulk operations: + +##### Bulk Query + +Submit a bulk GraphQL query: + +```bash +# Submit a query and get the operation ID +shopify-toolkit bulk_query examples/bulk_query_products.graphql + +# Submit and poll until completion, then download results +shopify-toolkit bulk_query examples/bulk_query_products.graphql --poll --output results.json + +# Submit with object grouping enabled +shopify-toolkit bulk_query examples/bulk_query_products.graphql --group-objects +``` + +##### Bulk Mutation + +Submit a bulk GraphQL mutation with variables: + +```bash +# Submit a mutation with JSON variables file +shopify-toolkit bulk_mutation examples/bulk_mutation_products.graphql examples/bulk_mutation_variables.json + +# Submit with JSONL variables file and poll for completion +shopify-toolkit bulk_mutation examples/bulk_mutation_products.graphql examples/bulk_mutation_variables.jsonl --poll + +# Submit with a client identifier for tracking +shopify-toolkit bulk_mutation examples/bulk_mutation_products.graphql examples/bulk_mutation_variables.json --client-identifier "my-import-job" +``` + +##### Check Status + +Check the status of a bulk operation: + +```bash +# Check current bulk operation status +shopify-toolkit bulk_status + +# Check status of specific operation +shopify-toolkit bulk_status gid://shopify/BulkOperation/123456 + +# Filter by operation type +shopify-toolkit bulk_status --type QUERY +``` + +##### Cancel Operation + +Cancel a running bulk operation: + +```bash +shopify-toolkit bulk_cancel gid://shopify/BulkOperation/123456 +``` + +##### Download Results + +Download and display results from a completed operation: + +```bash +# Download results by operation ID +shopify-toolkit bulk_results gid://shopify/BulkOperation/123456 --output results.json + +# Download results by direct URL +shopify-toolkit bulk_results "https://storage.googleapis.com/shopify/results.jsonl" + +# Download raw JSONL without parsing +shopify-toolkit bulk_results gid://shopify/BulkOperation/123456 --raw --output results.jsonl +``` + +#### Example Files + +The gem includes example files in the `examples/` directory: + +- `bulk_query_products.graphql` - Query to fetch all products with variants +- `bulk_mutation_products.graphql` - Mutation to create products +- `bulk_mutation_variables.json` - JSON format variables for mutations +- `bulk_mutation_variables.jsonl` - JSONL format variables for mutations + +#### Error Handling + +The module provides specific error classes: + +- `ShopifyToolkit::BulkOperations::BulkOperationError` - General bulk operation errors +- `ShopifyToolkit::BulkOperations::OperationInProgressError` - Thrown when trying to start an operation while another is running + +```ruby +begin + operation = run_bulk_query(query) +rescue ShopifyToolkit::BulkOperations::OperationInProgressError + puts "Another bulk operation is already running" 
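+  # One way to recover (illustrative, not required by the gem): wait for the
+  # in-flight operation to finish, then retry the submission. Both helpers are
+  # provided by ShopifyToolkit::BulkOperations.
+  current = current_bulk_operation
+  poll_until_complete(current["id"]) if current
+  retry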
+rescue ShopifyToolkit::BulkOperations::BulkOperationError => e + puts "Bulk operation failed: #{e.message}" + puts "Error code: #{e.error_code}" if e.error_code + puts "User errors: #{e.user_errors}" if e.user_errors.any? +end +``` + +#### Features + +- **Automatic staged file uploads** for bulk mutations +- **JSONL parsing and streaming** to handle large result files efficiently +- **Comprehensive error handling** with specific error types +- **Progress polling** with customizable intervals and timeouts +- **Result downloading** with parsing options +- **Operation cancellation** support +- **CLI integration** for all bulk operations +- **Logging** for debugging and monitoring + ## Development After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake test` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment. diff --git a/examples/bulk_mutation_products.graphql b/examples/bulk_mutation_products.graphql new file mode 100644 index 00000000..bfa73f76 --- /dev/null +++ b/examples/bulk_mutation_products.graphql @@ -0,0 +1,13 @@ +mutation createProduct($input: ProductInput!) { + productCreate(input: $input) { + product { + id + title + handle + } + userErrors { + field + message + } + } +} \ No newline at end of file diff --git a/examples/bulk_mutation_variables.json b/examples/bulk_mutation_variables.json new file mode 100644 index 00000000..2f2b67ad --- /dev/null +++ b/examples/bulk_mutation_variables.json @@ -0,0 +1,26 @@ +[ + { + "input": { + "title": "Awesome T-Shirt", + "productType": "Apparel", + "vendor": "Acme Corp", + "tags": ["clothing", "casual"] + } + }, + { + "input": { + "title": "Cool Sneakers", + "productType": "Footwear", + "vendor": "Shoe Co", + "tags": ["shoes", "athletic"] + } + }, + { + "input": { + "title": "Stylish Hat", + "productType": "Accessories", + "vendor": "Fashion Inc", + "tags": ["accessories", "fashion"] + } + } +] \ No newline at end of file diff --git a/examples/bulk_mutation_variables.jsonl b/examples/bulk_mutation_variables.jsonl new file mode 100644 index 00000000..d1ae8eaa --- /dev/null +++ b/examples/bulk_mutation_variables.jsonl @@ -0,0 +1,3 @@ +{"input": {"title": "Product 1", "productType": "Apparel", "vendor": "Brand A"}} +{"input": {"title": "Product 2", "productType": "Electronics", "vendor": "Brand B"}} +{"input": {"title": "Product 3", "productType": "Home & Garden", "vendor": "Brand C"}} \ No newline at end of file diff --git a/examples/bulk_query_products.graphql b/examples/bulk_query_products.graphql new file mode 100644 index 00000000..598d0445 --- /dev/null +++ b/examples/bulk_query_products.graphql @@ -0,0 +1,25 @@ +{ + products { + edges { + node { + id + title + handle + productType + vendor + createdAt + updatedAt + variants { + edges { + node { + id + title + price + inventoryQuantity + } + } + } + } + } + } +} \ No newline at end of file diff --git a/lib/shopify_toolkit/bulk_operations.rb b/lib/shopify_toolkit/bulk_operations.rb new file mode 100644 index 00000000..f4e66215 --- /dev/null +++ b/lib/shopify_toolkit/bulk_operations.rb @@ -0,0 +1,535 @@ +# frozen_string_literal: true + +require "net/http" +require "json" +require "tempfile" + +module ShopifyToolkit + # Bulk Operations module provides methods to submit, track, and retrieve results + # from Shopify Admin GraphQL API bulk operations including bulk queries and mutations. 
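+  #
+  # A minimal end-to-end sketch (the including class and the query are illustrative):
+  #
+  #   class ProductExport
+  #     include ShopifyToolkit::BulkOperations
+  #
+  #     def call
+  #       operation = run_bulk_query("{ products { edges { node { id title } } } }")
+  #       completed = poll_until_complete(operation.dig("bulkOperation", "id"))
+  #       download_results(completed) if completed["status"] == "COMPLETED"
+  #     end
+  #   end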
+ # + # This module handles: + # - Bulk query operations (bulkOperationRunQuery) + # - Bulk mutation operations with staged uploads (bulkOperationRunMutation) + # - Status tracking and polling (currentBulkOperation) + # - Result retrieval and JSONL parsing + # - Operation cancellation (bulkOperationCancel) + # - Error handling and logging + module BulkOperations + include AdminClient + + # Error raised when a bulk operation fails + class BulkOperationError < StandardError + attr_reader :error_code, :user_errors + + def initialize(message, error_code: nil, user_errors: []) + @error_code = error_code + @user_errors = user_errors + super(message) + end + end + + # Error raised when attempting to run a bulk operation when one is already in progress + class OperationInProgressError < BulkOperationError; end + + # Submits a bulk query operation to retrieve large volumes of data asynchronously + # + # @param query [String] The GraphQL query to run in bulk + # @param group_objects [Boolean] Whether to group objects by type (default: false) + # @return [Hash] The bulk operation response containing id and status + # + # @example + # bulk_ops = BulkOperations.new + # query = <<~GRAPHQL + # { + # products { + # edges { + # node { + # id + # title + # handle + # } + # } + # } + # } + # GRAPHQL + # + # operation = bulk_ops.run_bulk_query(query) + # puts operation.dig("bulkOperation", "id") + def run_bulk_query(query, group_objects: false) + mutation = <<~GRAPHQL + mutation bulkOperationRunQuery($query: String!, $groupObjects: Boolean!) { + bulkOperationRunQuery(query: $query, groupObjects: $groupObjects) { + bulkOperation { + id + status + query + createdAt + completedAt + objectCount + fileSize + url + partialDataUrl + errorCode + type + } + userErrors { + field + message + code + } + } + } + GRAPHQL + + variables = { query: query, groupObjects: group_objects } + + response = query_admin_api(mutation, variables) + handle_bulk_operation_response(response, "bulkOperationRunQuery") + end + + # Submits a bulk mutation operation to import large volumes of data asynchronously + # + # @param mutation [String] The GraphQL mutation to run in bulk + # @param variables_data [Array] Array of variable objects for each mutation call + # @param group_objects [Boolean] Whether to group objects by type (default: false) + # @param client_identifier [String] Optional client identifier for tracking + # @return [Hash] The bulk operation response containing id and status + # + # @example + # bulk_ops = BulkOperations.new + # mutation_query = <<~GRAPHQL + # mutation createProduct($input: ProductInput!) { + # productCreate(input: $input) { + # product { + # id + # title + # } + # userErrors { + # field + # message + # } + # } + # } + # GRAPHQL + # + # variables = [ + # { input: { title: "Product 1", productType: "Apparel" } }, + # { input: { title: "Product 2", productType: "Apparel" } } + # ] + # + # operation = bulk_ops.run_bulk_mutation(mutation_query, variables) + # puts operation.dig("bulkOperation", "id") + def run_bulk_mutation(mutation, variables_data, group_objects: false, client_identifier: nil) + # First, create staged upload for variables + staged_upload_path = create_staged_upload(variables_data) + + mutation_query = <<~GRAPHQL + mutation bulkOperationRunMutation( + $mutation: String! + $stagedUploadPath: String! + $groupObjects: Boolean! 
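+          # The clientIdentifier variable below is optional; the Ruby variables hash
+          # is compacted, so the key is omitted entirely when no identifier is passed.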
+ $clientIdentifier: String + ) { + bulkOperationRunMutation( + mutation: $mutation + stagedUploadPath: $stagedUploadPath + groupObjects: $groupObjects + clientIdentifier: $clientIdentifier + ) { + bulkOperation { + id + status + query + createdAt + completedAt + objectCount + fileSize + url + partialDataUrl + errorCode + type + } + userErrors { + field + message + code + } + } + } + GRAPHQL + + variables = { + mutation: mutation, + stagedUploadPath: staged_upload_path, + groupObjects: group_objects, + clientIdentifier: client_identifier + }.compact + + response = query_admin_api(mutation_query, variables) + handle_bulk_operation_response(response, "bulkOperationRunMutation") + end + + # Retrieves the current bulk operation status and details + # + # @param type [String] Type of operation to check ("QUERY" or "MUTATION") + # @return [Hash, nil] The current bulk operation or nil if none exists + # + # @example + # bulk_ops = BulkOperations.new + # status = bulk_ops.current_bulk_operation + # puts status["status"] if status + def current_bulk_operation(type: nil) + query = if type + <<~GRAPHQL + query currentBulkOperation($type: BulkOperationType!) { + currentBulkOperation(type: $type) { + id + status + query + createdAt + completedAt + objectCount + fileSize + url + partialDataUrl + errorCode + type + } + } + GRAPHQL + else + <<~GRAPHQL + query currentBulkOperation { + currentBulkOperation { + id + status + query + createdAt + completedAt + objectCount + fileSize + url + partialDataUrl + errorCode + type + } + } + GRAPHQL + end + + variables = type ? { type: type } : {} + response = query_admin_api(query, variables) + response.dig("currentBulkOperation") + end + + # Cancels a running bulk operation + # + # @param operation_id [String] The ID of the bulk operation to cancel + # @return [Hash] The cancelled bulk operation details + # + # @example + # bulk_ops = BulkOperations.new + # result = bulk_ops.cancel_bulk_operation("gid://shopify/BulkOperation/1234") + # puts result.dig("bulkOperation", "status") + def cancel_bulk_operation(operation_id) + mutation = <<~GRAPHQL + mutation bulkOperationCancel($id: ID!) { + bulkOperationCancel(id: $id) { + bulkOperation { + id + status + errorCode + } + userErrors { + field + message + } + } + } + GRAPHQL + + variables = { id: operation_id } + response = query_admin_api(mutation, variables) + + if response.dig("bulkOperationCancel", "userErrors")&.any? + raise BulkOperationError.new( + "Failed to cancel bulk operation: #{response.dig("bulkOperationCancel", "userErrors").map { |e| e["message"] }.join(", ")}", + user_errors: response.dig("bulkOperationCancel", "userErrors") + ) + end + + response.dig("bulkOperationCancel") + end + + # Downloads and parses the JSONL results from a completed bulk operation + # + # @param operation_or_url [Hash, String] Either a bulk operation hash with 'url' key or direct URL string + # @param parse_results [Boolean] Whether to parse JSON lines (default: true) + # @return [Array, String] Parsed results array or raw JSONL string + # + # @example + # bulk_ops = BulkOperations.new + # operation = bulk_ops.current_bulk_operation + # if operation && operation["status"] == "COMPLETED" + # results = bulk_ops.download_results(operation) + # puts "Downloaded #{results.size} results" + # end + def download_results(operation_or_url, parse_results: true) + url = operation_or_url.is_a?(Hash) ? 
operation_or_url["url"] : operation_or_url + + return nil unless url + + logger.info "Downloading bulk operation results from: #{url}" + + uri = URI.parse(url) + response = Net::HTTP.get_response(uri) + + unless response.is_a?(Net::HTTPSuccess) + raise BulkOperationError, "Failed to download results: #{response.code} #{response.message}" + end + + content = response.body + logger.info "Downloaded #{content.bytesize} bytes of results" + + if parse_results + parse_jsonl(content) + else + content + end + end + + # Polls a bulk operation until completion, yielding status updates + # + # @param operation_id [String] The bulk operation ID to monitor + # @param poll_interval [Integer] Seconds between status checks (default: 5) + # @param timeout [Integer] Maximum seconds to wait (default: 1800 - 30 minutes) + # @param &block [Proc] Optional block to call with status updates + # @return [Hash] The completed bulk operation details + # + # @example + # bulk_ops = BulkOperations.new + # operation = bulk_ops.run_bulk_query(query) + # + # completed = bulk_ops.poll_until_complete(operation["bulkOperation"]["id"]) do |status| + # puts "Status: #{status["status"]}, Objects: #{status["objectCount"]}" + # end + # + # if completed["status"] == "COMPLETED" + # results = bulk_ops.download_results(completed) + # end + def poll_until_complete(operation_id, poll_interval: 5, timeout: 1800, &block) + start_time = Time.now + + loop do + operation = get_bulk_operation_by_id(operation_id) + + yield operation if block_given? + + case operation["status"] + when "COMPLETED", "FAILED", "CANCELED", "EXPIRED" + return operation + end + + if Time.now - start_time > timeout + raise BulkOperationError, "Polling timeout exceeded (#{timeout}s)" + end + + sleep poll_interval + end + end + + # Logger instance + def logger + @logger ||= Logger.new($stdout) + end + + private + + # Queries the Admin API with error handling + def query_admin_api(query, variables = {}) + response = query(query, **variables) + response + rescue => e + logger.error "GraphQL query failed: #{e.message}" + raise BulkOperationError, "GraphQL query failed: #{e.message}" + end + + # Creates a staged upload for bulk mutation variables + def create_staged_upload(variables_data) + # Convert variables to JSONL format + jsonl_content = variables_data.map { |vars| JSON.generate(vars) }.join("\n") + + # Create staged upload + mutation = <<~GRAPHQL + mutation stagedUploadsCreate($input: [StagedUploadInput!]!) { + stagedUploadsCreate(input: $input) { + stagedTargets { + url + resourceUrl + parameters { + name + value + } + } + userErrors { + field + message + } + } + } + GRAPHQL + + variables = { + input: [{ + resource: "BULK_MUTATION_VARIABLES", + filename: "bulk_mutation_variables.jsonl", + mimeType: "text/jsonl", + httpMethod: "POST" + }] + } + + response = query_admin_api(mutation, variables) + + if response.dig("stagedUploadsCreate", "userErrors")&.any? 
+ raise BulkOperationError.new( + "Failed to create staged upload: #{response.dig("stagedUploadsCreate", "userErrors").map { |e| e["message"] }.join(", ")}", + user_errors: response.dig("stagedUploadsCreate", "userErrors") + ) + end + + staged_target = response.dig("stagedUploadsCreate", "stagedTargets", 0) + upload_url = staged_target["url"] + upload_params = staged_target["parameters"] + + # Upload the JSONL content + upload_to_staged_target(upload_url, upload_params, jsonl_content) + + # Extract the path from parameters (typically the 'key' parameter) + key_param = upload_params.find { |p| p["name"] == "key" } + key_param ? key_param["value"] : nil + end + + # Uploads content to a staged upload target + def upload_to_staged_target(url, parameters, content) + require "net/http/post/multipart" rescue nil + + uri = URI(url) + + # Create multipart form data + form_data = {} + parameters.each { |param| form_data[param["name"]] = param["value"] } + form_data["file"] = content + + # Use basic form encoding since we don't have multipart gem + boundary = "----formdata-shopify-#{Time.now.to_i}" + post_body = [] + + form_data.each do |key, value| + if key == "file" + post_body << "--#{boundary}\r\n" + post_body << "Content-Disposition: form-data; name=\"file\"; filename=\"bulk_mutation_variables.jsonl\"\r\n" + post_body << "Content-Type: text/jsonl\r\n\r\n" + post_body << value + post_body << "\r\n" + else + post_body << "--#{boundary}\r\n" + post_body << "Content-Disposition: form-data; name=\"#{key}\"\r\n\r\n" + post_body << value.to_s + post_body << "\r\n" + end + end + post_body << "--#{boundary}--\r\n" + + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = uri.scheme == 'https' + + request = Net::HTTP::Post.new(uri) + request.body = post_body.join + request["Content-Type"] = "multipart/form-data; boundary=#{boundary}" + + response = http.request(request) + + unless response.is_a?(Net::HTTPSuccess) || response.code == "201" + raise BulkOperationError, "Failed to upload staged file: #{response.code} #{response.message}" + end + + logger.info "Successfully uploaded #{content.bytesize} bytes to staged upload" + end + + # Handles bulk operation response and error checking + def handle_bulk_operation_response(response, operation_name) + operation_data = response.dig(operation_name) + + if operation_data.nil? + raise BulkOperationError, "No #{operation_name} data in response" + end + + user_errors = operation_data["userErrors"] || [] + + if user_errors.any? + # Check for operation in progress error + if user_errors.any? { |e| e["code"] == "OPERATION_IN_PROGRESS" } + raise OperationInProgressError.new( + "Another bulk operation is already in progress", + user_errors: user_errors + ) + end + + error_messages = user_errors.map { |e| e["message"] }.join(", ") + raise BulkOperationError.new( + "Bulk operation failed: #{error_messages}", + user_errors: user_errors + ) + end + + operation_data + end + + # Gets a bulk operation by ID using the node query + def get_bulk_operation_by_id(operation_id) + query = <<~GRAPHQL + query getBulkOperation($id: ID!) { + node(id: $id) { + ... 
on BulkOperation { + id + status + query + createdAt + completedAt + objectCount + fileSize + url + partialDataUrl + errorCode + type + } + } + } + GRAPHQL + + variables = { id: operation_id } + response = query_admin_api(query, variables) + response.dig("node") + end + + # Parses JSONL content into an array of objects + def parse_jsonl(content) + results = [] + content.each_line do |line| + line = line.strip + next if line.empty? + + begin + parsed = JSON.parse(line) + results << parsed + rescue JSON::ParserError => e + logger.warn "Failed to parse JSONL line: #{line[0..100]}... Error: #{e.message}" + end + end + + logger.info "Parsed #{results.size} JSONL records" + results + end + end +end \ No newline at end of file diff --git a/lib/shopify_toolkit/command_line.rb b/lib/shopify_toolkit/command_line.rb index 7bb03b95..26f97eb2 100644 --- a/lib/shopify_toolkit/command_line.rb +++ b/lib/shopify_toolkit/command_line.rb @@ -3,6 +3,7 @@ require 'thor' require 'thor/actions' require 'tmpdir' +require 'json' class ShopifyToolkit::CommandLine < Thor include Thor::Actions @@ -111,6 +112,269 @@ def down end end + # Bulk Operations Commands + + desc "bulk_query QUERY_FILE", "Submit a bulk GraphQL query" + method_option :group_objects, type: :boolean, default: false, desc: "Group objects by type in results" + method_option :poll, type: :boolean, default: false, desc: "Poll until completion and download results" + method_option :timeout, type: :numeric, default: 1800, desc: "Polling timeout in seconds" + method_option :output, type: :string, desc: "Output file for results (defaults to stdout)" + def bulk_query(query_file) + require "./config/environment" if File.exist?("./config/environment.rb") + + unless File.exist?(query_file) + puts "Error: Query file '#{query_file}' not found" + exit 1 + end + + query = File.read(query_file) + + ::Shop.sole.with_shopify_session do + bulk_ops = Class.new { include ShopifyToolkit::BulkOperations }.new + + puts "Submitting bulk query from #{query_file}..." + operation = bulk_ops.run_bulk_query(query, group_objects: options[:group_objects]) + + operation_id = operation.dig("bulkOperation", "id") + status = operation.dig("bulkOperation", "status") + + puts "Bulk operation submitted: #{operation_id}" + puts "Status: #{status}" + + if options[:poll] + puts "Polling for completion (timeout: #{options[:timeout]}s)..." + + completed = bulk_ops.poll_until_complete(operation_id, timeout: options[:timeout]) do |current| + puts "Status: #{current["status"]}, Objects: #{current["objectCount"]}, Elapsed: #{Time.now - Time.parse(current["createdAt"])}s" + end + + if completed["status"] == "COMPLETED" + puts "Operation completed successfully!" 
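+            # A COMPLETED operation can still come back without a results URL
+            # (for example when the query matched nothing); the else branch below
+            # reports that case instead of attempting a download.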
+ if completed["url"] + results = bulk_ops.download_results(completed) + output_results(results, options[:output]) + else + puts "No results URL available (query may have returned no data)" + end + else + puts "Operation finished with status: #{completed["status"]}" + puts "Error code: #{completed["errorCode"]}" if completed["errorCode"] + end + else + puts "Use 'shopify-toolkit bulk_status #{operation_id}' to check status" + end + end + rescue ShopifyToolkit::BulkOperations::BulkOperationError => e + puts "Bulk operation error: #{e.message}" + exit 1 + end + + desc "bulk_mutation MUTATION_FILE VARIABLES_FILE", "Submit a bulk GraphQL mutation" + method_option :group_objects, type: :boolean, default: false, desc: "Group objects by type in results" + method_option :poll, type: :boolean, default: false, desc: "Poll until completion and download results" + method_option :timeout, type: :numeric, default: 1800, desc: "Polling timeout in seconds" + method_option :output, type: :string, desc: "Output file for results (defaults to stdout)" + method_option :client_identifier, type: :string, desc: "Client identifier for tracking" + def bulk_mutation(mutation_file, variables_file) + require "./config/environment" if File.exist?("./config/environment.rb") + + unless File.exist?(mutation_file) + puts "Error: Mutation file '#{mutation_file}' not found" + exit 1 + end + + unless File.exist?(variables_file) + puts "Error: Variables file '#{variables_file}' not found" + exit 1 + end + + mutation = File.read(mutation_file) + variables_content = File.read(variables_file) + + # Parse variables file (supports JSON array or JSONL) + variables_data = begin + if variables_file.end_with?('.jsonl') + variables_content.lines.map { |line| JSON.parse(line.strip) } + else + JSON.parse(variables_content) + end + rescue JSON::ParserError => e + puts "Error parsing variables file: #{e.message}" + exit 1 + end + + ::Shop.sole.with_shopify_session do + bulk_ops = Class.new { include ShopifyToolkit::BulkOperations }.new + + puts "Submitting bulk mutation from #{mutation_file} with #{variables_data.size} operations..." + operation = bulk_ops.run_bulk_mutation( + mutation, + variables_data, + group_objects: options[:group_objects], + client_identifier: options[:client_identifier] + ) + + operation_id = operation.dig("bulkOperation", "id") + status = operation.dig("bulkOperation", "status") + + puts "Bulk operation submitted: #{operation_id}" + puts "Status: #{status}" + + if options[:poll] + puts "Polling for completion (timeout: #{options[:timeout]}s)..." + + completed = bulk_ops.poll_until_complete(operation_id, timeout: options[:timeout]) do |current| + puts "Status: #{current["status"]}, Objects: #{current["objectCount"]}, Elapsed: #{Time.now - Time.parse(current["createdAt"])}s" + end + + if completed["status"] == "COMPLETED" + puts "Operation completed successfully!" 
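+            # For bulk mutations the downloaded JSONL has one line per executed
+            # mutation, including any per-record userErrors, so check the results
+            # rather than assuming every input row succeeded.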
+ if completed["url"] + results = bulk_ops.download_results(completed) + output_results(results, options[:output]) + else + puts "No results URL available" + end + else + puts "Operation finished with status: #{completed["status"]}" + puts "Error code: #{completed["errorCode"]}" if completed["errorCode"] + end + else + puts "Use 'shopify-toolkit bulk_status #{operation_id}' to check status" + end + end + rescue ShopifyToolkit::BulkOperations::BulkOperationError => e + puts "Bulk operation error: #{e.message}" + exit 1 + end + + desc "bulk_status [OPERATION_ID]", "Check the status of a bulk operation" + method_option :type, type: :string, desc: "Operation type filter (QUERY or MUTATION)" + def bulk_status(operation_id = nil) + require "./config/environment" if File.exist?("./config/environment.rb") + + ::Shop.sole.with_shopify_session do + bulk_ops = Class.new { include ShopifyToolkit::BulkOperations }.new + + if operation_id + # Get specific operation by ID + operation = bulk_ops.send(:get_bulk_operation_by_id, operation_id) + unless operation + puts "Operation not found: #{operation_id}" + exit 1 + end + + display_operation_status(operation) + else + # Get current operation + operation = bulk_ops.current_bulk_operation(type: options[:type]) + if operation + display_operation_status(operation) + else + puts "No current bulk operation found" + if options[:type] + puts "(filtered by type: #{options[:type]})" + end + end + end + end + end + + desc "bulk_cancel OPERATION_ID", "Cancel a running bulk operation" + def bulk_cancel(operation_id) + require "./config/environment" if File.exist?("./config/environment.rb") + + ::Shop.sole.with_shopify_session do + bulk_ops = Class.new { include ShopifyToolkit::BulkOperations }.new + + puts "Canceling bulk operation: #{operation_id}" + result = bulk_ops.cancel_bulk_operation(operation_id) + + puts "Operation canceled successfully" + puts "Final status: #{result.dig("bulkOperation", "status")}" + end + rescue ShopifyToolkit::BulkOperations::BulkOperationError => e + puts "Cancel failed: #{e.message}" + exit 1 + end + + desc "bulk_results OPERATION_ID_OR_URL", "Download and display results from a completed bulk operation" + method_option :output, type: :string, desc: "Output file for results (defaults to stdout)" + method_option :raw, type: :boolean, default: false, desc: "Output raw JSONL without parsing" + def bulk_results(operation_id_or_url) + require "./config/environment" if File.exist?("./config/environment.rb") + + ::Shop.sole.with_shopify_session do + bulk_ops = Class.new { include ShopifyToolkit::BulkOperations }.new + + # Determine if input is operation ID or direct URL + if operation_id_or_url.start_with?('http') + url = operation_id_or_url + else + # Get operation and extract URL + operation = bulk_ops.send(:get_bulk_operation_by_id, operation_id_or_url) + unless operation + puts "Operation not found: #{operation_id_or_url}" + exit 1 + end + + unless operation["status"] == "COMPLETED" + puts "Operation is not completed (status: #{operation["status"]})" + exit 1 + end + + url = operation["url"] || operation["partialDataUrl"] + unless url + puts "No results URL available for operation" + exit 1 + end + end + + puts "Downloading results from: #{url}" + results = bulk_ops.download_results(url, parse_results: !options[:raw]) + + if options[:raw] + output_content(results, options[:output]) + else + output_results(results, options[:output]) + end + + puts "Downloaded #{options[:raw] ? results.bytesize : results.size} #{options[:raw] ? 
'bytes' : 'records'}" + end + rescue ShopifyToolkit::BulkOperations::BulkOperationError => e + puts "Download failed: #{e.message}" + exit 1 + end + + private + + def display_operation_status(operation) + puts "Operation ID: #{operation["id"]}" + puts "Type: #{operation["type"]}" + puts "Status: #{operation["status"]}" + puts "Created: #{operation["createdAt"]}" + puts "Completed: #{operation["completedAt"]}" if operation["completedAt"] + puts "Objects: #{operation["objectCount"]}" + puts "File Size: #{operation["fileSize"]} bytes" if operation["fileSize"] + puts "Error Code: #{operation["errorCode"]}" if operation["errorCode"] + puts "Results URL: #{operation["url"]}" if operation["url"] + puts "Partial Results URL: #{operation["partialDataUrl"]}" if operation["partialDataUrl"] + end + + def output_results(results, output_file = nil) + content = results.map { |result| JSON.pretty_generate(result) }.join("\n") + output_content(content, output_file) + end + + def output_content(content, output_file = nil) + if output_file + File.write(output_file, content) + puts "Results written to: #{output_file}" + else + puts content + end + end + def self.exit_on_failure? true end diff --git a/spec/shopify_toolkit/bulk_operations_spec.rb b/spec/shopify_toolkit/bulk_operations_spec.rb new file mode 100644 index 00000000..13d2faee --- /dev/null +++ b/spec/shopify_toolkit/bulk_operations_spec.rb @@ -0,0 +1,529 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe ShopifyToolkit::BulkOperations do + let(:bulk_operations) { Class.new { include ShopifyToolkit::BulkOperations }.new } + let(:sample_query) do + <<~GRAPHQL + { + products { + edges { + node { + id + title + handle + } + } + } + } + GRAPHQL + end + let(:sample_mutation) do + <<~GRAPHQL + mutation createProduct($input: ProductInput!) 
{ + productCreate(input: $input) { + product { + id + title + } + userErrors { + field + message + } + } + } + GRAPHQL + end + let(:sample_variables) do + [ + { input: { title: "Product 1", productType: "Apparel" } }, + { input: { title: "Product 2", productType: "Apparel" } } + ] + end + + before do + # Mock logger to avoid output during tests + allow(bulk_operations).to receive(:logger).and_return(double("Logger", info: nil, warn: nil, error: nil)) + end + + describe "#run_bulk_query" do + let(:successful_response) do + { + "bulkOperationRunQuery" => { + "bulkOperation" => { + "id" => "gid://shopify/BulkOperation/123456", + "status" => "CREATED", + "query" => sample_query.strip, + "createdAt" => "2024-01-15T10:00:00Z", + "completedAt" => nil, + "objectCount" => "0", + "fileSize" => nil, + "url" => nil, + "partialDataUrl" => nil, + "errorCode" => nil, + "type" => "QUERY" + }, + "userErrors" => [] + } + } + end + + let(:error_response) do + { + "bulkOperationRunQuery" => { + "bulkOperation" => nil, + "userErrors" => [ + { + "field" => ["query"], + "message" => "Query is invalid", + "code" => "INVALID" + } + ] + } + } + end + + let(:operation_in_progress_response) do + { + "bulkOperationRunQuery" => { + "bulkOperation" => nil, + "userErrors" => [ + { + "field" => [], + "message" => "A bulk operation is already running", + "code" => "OPERATION_IN_PROGRESS" + } + ] + } + } + end + + it "submits a bulk query successfully" do + expect(bulk_operations).to receive(:query).with( + anything, + query: sample_query, + groupObjects: false + ).and_return(successful_response) + + result = bulk_operations.run_bulk_query(sample_query) + + expect(result).to eq(successful_response["bulkOperationRunQuery"]) + expect(result["bulkOperation"]["id"]).to eq("gid://shopify/BulkOperation/123456") + expect(result["bulkOperation"]["status"]).to eq("CREATED") + end + + it "submits a bulk query with group_objects option" do + expect(bulk_operations).to receive(:query).with( + anything, + query: sample_query, + groupObjects: true + ).and_return(successful_response) + + bulk_operations.run_bulk_query(sample_query, group_objects: true) + end + + it "raises BulkOperationError on query validation error" do + expect(bulk_operations).to receive(:query).and_return(error_response) + + expect { + bulk_operations.run_bulk_query(sample_query) + }.to raise_error(ShopifyToolkit::BulkOperations::BulkOperationError, /Query is invalid/) + end + + it "raises OperationInProgressError when another operation is running" do + expect(bulk_operations).to receive(:query).and_return(operation_in_progress_response) + + expect { + bulk_operations.run_bulk_query(sample_query) + }.to raise_error(ShopifyToolkit::BulkOperations::OperationInProgressError, /already in progress/) + end + + it "raises BulkOperationError on GraphQL query failure" do + expect(bulk_operations).to receive(:query).and_raise(StandardError, "Network error") + + expect { + bulk_operations.run_bulk_query(sample_query) + }.to raise_error(ShopifyToolkit::BulkOperations::BulkOperationError, /GraphQL query failed.*Network error/) + end + end + + describe "#run_bulk_mutation" do + let(:successful_staged_upload_response) do + { + "stagedUploadsCreate" => { + "stagedTargets" => [ + { + "url" => "https://example.com/upload", + "resourceUrl" => nil, + "parameters" => [ + { "name" => "key", "value" => "tmp/12345/bulk_vars" }, + { "name" => "Content-Type", "value" => "text/jsonl" } + ] + } + ], + "userErrors" => [] + } + } + end + + let(:successful_mutation_response) do + { + 
"bulkOperationRunMutation" => { + "bulkOperation" => { + "id" => "gid://shopify/BulkOperation/789012", + "status" => "CREATED", + "query" => sample_mutation, + "createdAt" => "2024-01-15T10:00:00Z", + "completedAt" => nil, + "objectCount" => "0", + "fileSize" => nil, + "url" => nil, + "partialDataUrl" => nil, + "errorCode" => nil, + "type" => "MUTATION" + }, + "userErrors" => [] + } + } + end + + it "submits a bulk mutation successfully" do + # Mock staged upload creation + expect(bulk_operations).to receive(:query).with( + anything, + input: [{ + resource: "BULK_MUTATION_VARIABLES", + filename: "bulk_mutation_variables.jsonl", + mimeType: "text/jsonl", + httpMethod: "POST" + }] + ).and_return(successful_staged_upload_response) + + # Mock file upload + expect(bulk_operations).to receive(:upload_to_staged_target).with( + "https://example.com/upload", + [ + { "name" => "key", "value" => "tmp/12345/bulk_vars" }, + { "name" => "Content-Type", "value" => "text/jsonl" } + ], + sample_variables.map { |vars| JSON.generate(vars) }.join("\n") + ) + + # Mock bulk mutation submission + expect(bulk_operations).to receive(:query).with( + anything, + mutation: sample_mutation, + stagedUploadPath: "tmp/12345/bulk_vars", + groupObjects: false + ).and_return(successful_mutation_response) + + result = bulk_operations.run_bulk_mutation(sample_mutation, sample_variables) + + expect(result).to eq(successful_mutation_response["bulkOperationRunMutation"]) + expect(result["bulkOperation"]["id"]).to eq("gid://shopify/BulkOperation/789012") + end + + it "raises BulkOperationError on staged upload failure" do + failed_upload_response = { + "stagedUploadsCreate" => { + "stagedTargets" => [], + "userErrors" => [ + { + "field" => ["input"], + "message" => "File size exceeds limit" + } + ] + } + } + + expect(bulk_operations).to receive(:query).and_return(failed_upload_response) + + expect { + bulk_operations.run_bulk_mutation(sample_mutation, sample_variables) + }.to raise_error(ShopifyToolkit::BulkOperations::BulkOperationError, /Failed to create staged upload/) + end + end + + describe "#current_bulk_operation" do + let(:current_operation_response) do + { + "currentBulkOperation" => { + "id" => "gid://shopify/BulkOperation/555555", + "status" => "RUNNING", + "query" => sample_query, + "createdAt" => "2024-01-15T10:00:00Z", + "completedAt" => nil, + "objectCount" => "150", + "fileSize" => nil, + "url" => nil, + "partialDataUrl" => nil, + "errorCode" => nil, + "type" => "QUERY" + } + } + end + + let(:no_operation_response) do + { "currentBulkOperation" => nil } + end + + it "returns current bulk operation details" do + expect(bulk_operations).to receive(:query).with(anything).and_return(current_operation_response) + + result = bulk_operations.current_bulk_operation + + expect(result).to eq(current_operation_response["currentBulkOperation"]) + expect(result["id"]).to eq("gid://shopify/BulkOperation/555555") + expect(result["status"]).to eq("RUNNING") + end + + it "returns nil when no operation exists" do + expect(bulk_operations).to receive(:query).and_return(no_operation_response) + + result = bulk_operations.current_bulk_operation + + expect(result).to be_nil + end + + it "filters by operation type" do + expect(bulk_operations).to receive(:query).with(anything, type: "QUERY").and_return(current_operation_response) + + result = bulk_operations.current_bulk_operation(type: "QUERY") + + expect(result).to eq(current_operation_response["currentBulkOperation"]) + end + end + + describe "#cancel_bulk_operation" do + 
let(:operation_id) { "gid://shopify/BulkOperation/123456" } + let(:successful_cancel_response) do + { + "bulkOperationCancel" => { + "bulkOperation" => { + "id" => operation_id, + "status" => "CANCELED", + "errorCode" => nil + }, + "userErrors" => [] + } + } + end + + let(:failed_cancel_response) do + { + "bulkOperationCancel" => { + "bulkOperation" => nil, + "userErrors" => [ + { + "field" => ["id"], + "message" => "Operation not found" + } + ] + } + } + end + + it "cancels a bulk operation successfully" do + expect(bulk_operations).to receive(:query).with( + anything, + id: operation_id + ).and_return(successful_cancel_response) + + result = bulk_operations.cancel_bulk_operation(operation_id) + + expect(result).to eq(successful_cancel_response["bulkOperationCancel"]) + expect(result["bulkOperation"]["status"]).to eq("CANCELED") + end + + it "raises BulkOperationError on cancellation failure" do + expect(bulk_operations).to receive(:query).and_return(failed_cancel_response) + + expect { + bulk_operations.cancel_bulk_operation(operation_id) + }.to raise_error(ShopifyToolkit::BulkOperations::BulkOperationError, /Failed to cancel.*Operation not found/) + end + end + + describe "#download_results" do + let(:sample_jsonl) do + <<~JSONL.strip + {"id":"gid://shopify/Product/1","title":"Product 1","handle":"product-1"} + {"id":"gid://shopify/Product/2","title":"Product 2","handle":"product-2"} + JSONL + end + let(:results_url) { "https://storage.googleapis.com/shopify/results.jsonl" } + let(:operation_with_url) { { "url" => results_url } } + + it "downloads and parses JSONL results from URL" do + uri_double = double("URI") + allow(URI).to receive(:parse).with(results_url).and_return(uri_double) + mock_response = double("HTTPResponse", is_a?: true, body: sample_jsonl) + expect(Net::HTTP).to receive(:get_response).with(uri_double).and_return(mock_response) + + results = bulk_operations.download_results(results_url) + + expect(results).to be_an(Array) + expect(results.size).to eq(2) + expect(results[0]["title"]).to eq("Product 1") + expect(results[1]["title"]).to eq("Product 2") + end + + it "downloads and parses JSONL results from operation hash" do + uri_double = double("URI") + allow(URI).to receive(:parse).with(results_url).and_return(uri_double) + mock_response = double("HTTPResponse", is_a?: true, body: sample_jsonl) + expect(Net::HTTP).to receive(:get_response).with(uri_double).and_return(mock_response) + + results = bulk_operations.download_results(operation_with_url) + + expect(results).to be_an(Array) + expect(results.size).to eq(2) + end + + it "returns raw JSONL when parse_results is false" do + uri_double = double("URI") + allow(URI).to receive(:parse).with(results_url).and_return(uri_double) + mock_response = double("HTTPResponse", is_a?: true, body: sample_jsonl) + expect(Net::HTTP).to receive(:get_response).with(uri_double).and_return(mock_response) + + result = bulk_operations.download_results(results_url, parse_results: false) + + expect(result).to be_a(String) + expect(result).to eq(sample_jsonl) + end + + it "returns nil when no URL is provided" do + result = bulk_operations.download_results({ "url" => nil }) + expect(result).to be_nil + + result = bulk_operations.download_results(nil) + expect(result).to be_nil + end + + it "raises BulkOperationError on download failure" do + uri_double = double("URI") + allow(URI).to receive(:parse).with(results_url).and_return(uri_double) + mock_response = double("HTTPResponse", is_a?: false, code: "404", message: "Not Found") + 
expect(Net::HTTP).to receive(:get_response).with(uri_double).and_return(mock_response) + + expect { + bulk_operations.download_results(results_url) + }.to raise_error(ShopifyToolkit::BulkOperations::BulkOperationError, /Failed to download results: 404 Not Found/) + end + end + + describe "#poll_until_complete" do + let(:operation_id) { "gid://shopify/BulkOperation/123456" } + let(:running_operation) do + { + "id" => operation_id, + "status" => "RUNNING", + "createdAt" => "2024-01-15T10:00:00Z", + "objectCount" => "50" + } + end + let(:completed_operation) do + running_operation.merge("status" => "COMPLETED", "completedAt" => "2024-01-15T10:05:00Z") + end + + it "polls until operation completes" do + expect(bulk_operations).to receive(:get_bulk_operation_by_id).with(operation_id).and_return(running_operation, completed_operation) + + yielded_statuses = [] + result = bulk_operations.poll_until_complete(operation_id, poll_interval: 0.1) do |status| + yielded_statuses << status["status"] + end + + expect(yielded_statuses).to eq(["RUNNING", "COMPLETED"]) + expect(result).to eq(completed_operation) + end + + it "raises timeout error when operation takes too long" do + expect(bulk_operations).to receive(:get_bulk_operation_by_id).with(operation_id).and_return(running_operation).at_least(:twice) + + expect { + bulk_operations.poll_until_complete(operation_id, poll_interval: 0.1, timeout: 0.2) + }.to raise_error(ShopifyToolkit::BulkOperations::BulkOperationError, /Polling timeout exceeded/) + end + + it "returns immediately for failed operations" do + failed_operation = running_operation.merge("status" => "FAILED", "errorCode" => "TIMEOUT") + expect(bulk_operations).to receive(:get_bulk_operation_by_id).with(operation_id).and_return(failed_operation) + + result = bulk_operations.poll_until_complete(operation_id) + + expect(result).to eq(failed_operation) + end + end + + describe "private methods" do + describe "#parse_jsonl" do + let(:valid_jsonl) do + <<~JSONL.strip + {"id": 1, "name": "Product 1"} + {"id": 2, "name": "Product 2"} + JSONL + end + + let(:invalid_jsonl) do + <<~JSONL.strip + {"id": 1, "name": "Product 1"} + {invalid json line} + {"id": 2, "name": "Product 2"} + JSONL + end + + it "parses valid JSONL content" do + results = bulk_operations.send(:parse_jsonl, valid_jsonl) + + expect(results).to be_an(Array) + expect(results.size).to eq(2) + expect(results[0]).to eq({ "id" => 1, "name" => "Product 1" }) + expect(results[1]).to eq({ "id" => 2, "name" => "Product 2" }) + end + + it "skips invalid JSON lines and logs warnings" do + expect(bulk_operations.logger).to receive(:warn).with(/Failed to parse JSONL line/) + + results = bulk_operations.send(:parse_jsonl, invalid_jsonl) + + expect(results).to be_an(Array) + expect(results.size).to eq(2) + expect(results.map { |r| r["id"] }).to eq([1, 2]) + end + + it "handles empty lines" do + jsonl_with_empty_lines = "#{valid_jsonl}\n\n\n" + + results = bulk_operations.send(:parse_jsonl, jsonl_with_empty_lines) + + expect(results.size).to eq(2) + end + end + end + + describe "error classes" do + describe ShopifyToolkit::BulkOperations::BulkOperationError do + it "stores error code and user errors" do + user_errors = [{ "field" => ["query"], "message" => "Invalid query" }] + error = ShopifyToolkit::BulkOperations::BulkOperationError.new( + "Test error", + error_code: "INVALID", + user_errors: user_errors + ) + + expect(error.message).to eq("Test error") + expect(error.error_code).to eq("INVALID") + expect(error.user_errors).to eq(user_errors) + end 
+ end + + describe ShopifyToolkit::BulkOperations::OperationInProgressError do + it "inherits from BulkOperationError" do + error = ShopifyToolkit::BulkOperations::OperationInProgressError.new("Operation in progress") + expect(error).to be_a(ShopifyToolkit::BulkOperations::BulkOperationError) + end + end + end +end \ No newline at end of file