Upload Multiple

Upload multiple documents to DocuPanda and retrieve the results

This guide demonstrates how to upload multiple documents to DocuPanda for parsing, processing them in batches of 4 at a time. Each batch is submitted sequentially, and we wait for all documents in the batch to complete before moving on to the next.

Prerequisites

Before you begin, make sure you have:

  1. A DocuPanda API key
  2. Python 3 installed

Authentication

Every request to DocuPanda needs to include an API key. You can obtain your API key by signing up and going to your account settings page.
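Concretely, the key travels in an X-API-Key header on every call, alongside standard JSON content-negotiation headers. A minimal sketch of the header layout used throughout this guide (YOUR_API_KEY is a placeholder):

```python
API_KEY = "YOUR_API_KEY"  # placeholder: copy the real key from your account settings

# Every DocuPanda request sends the key in the X-API-Key header.
HEADERS = {
    "accept": "application/json",
    "content-type": "application/json",
    "X-API-Key": API_KEY,
}
```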

Step 1: Define Document Paths and API Configuration

First, specify the list of documents you want to upload, along with your API key and the DocuPanda endpoint. The documents can be any supported filetype: PDF, images, HTML, etc.

import time
import base64
import requests

API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"

DOC_PATHS = [
    "/path/to/doc1.pdf",
    "/path/to/doc2.pdf",
    "/path/to/doc3.jpg",
    "/path/to/doc4.png",
    "/path/to/doc5.html",
    "/path/to/doc6.jpeg",
    "/path/to/doc7.webp",
    "/path/to/doc8.pdf",
]

DATASET_NAME = "YOUR_DATASET_NAME"

HEADERS = {
    "accept": "application/json",
    "content-type": "application/json",
    "X-API-Key": API_KEY,
}

BATCH_SIZE = 4  # Process 4 documents at a time

Replace YOUR_API_KEY with your actual API key and DOC_PATHS with your actual document file paths.
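Before uploading, it can help to verify that every entry in DOC_PATHS actually exists on disk, so you fail fast before making any network calls. A small local helper (not part of the DocuPanda API, just a validation sketch):

```python
import os

def find_missing(paths):
    """Return the subset of paths that do not point to an existing file."""
    return [p for p in paths if not os.path.isfile(p)]

# Example: abort before any uploads if an input file is missing
# missing = find_missing(DOC_PATHS)
# if missing:
#     raise FileNotFoundError(f"Missing input files: {missing}")
```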

Step 2: Upload a Document

Each document is uploaded by encoding it in Base64 and sending a POST request. Alternatively, you can provide a file URL instead of the Base64 contents. The function below accepts a document file path, uploads it to DocuPanda for parsing, and returns the metadata of the job.

def post_doc(doc_path):
    """Upload a single document to DocuPanda and return its job metadata."""
    url = f"{APP_URL}/document"
    with open(doc_path, "rb") as f:  # context manager ensures the file handle is closed
        contents = base64.b64encode(f.read()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                "filename": doc_path.split("/")[-1],
            },
        },
        "dataset": DATASET_NAME,
    }
    response = requests.post(url, json=payload, headers=HEADERS)
    assert response.status_code == 200
    res_json = response.json()
    return {"job_id": res_json["jobId"], "doc_id": res_json["documentId"], "filename": doc_path}
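The encoding step inside post_doc can be isolated for clarity: the payload carries the raw file bytes as a Base64 ASCII string. A minimal sketch of just that transformation:

```python
import base64

def encode_file_contents(data: bytes) -> str:
    """Base64-encode raw file bytes into the ASCII string used in the upload payload."""
    return base64.b64encode(data).decode()

# encode_file_contents(b"hello") → "aGVsbG8="
```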

Step 3: Check Job Status

Since DocuPanda processes documents asynchronously, we need to track multiple job IDs at once and wait for all of them to finish. (Alternatively, you could use Webhooks to react as soon as results become available and avoid polling entirely.) We define a function that accepts multiple job IDs and returns once all processing is done.

def is_batch_done(job_ids):
    """Check if all jobs in the list are completed or failed."""
    url = f"{APP_URL}/job"
    output = {job_id: "processing" for job_id in job_ids}
    for _ in range(60):  # Max 3 minutes (60 polls * 3 sec)
        for job_id, status in output.items():
            if status == "processing":
                response = requests.get(f"{url}/{job_id}", headers=HEADERS)
                assert response.status_code == 200
                output[job_id] = response.json()["status"]
        if all(status != "processing" for status in output.values()):
            break  # Exit early if all jobs are done
        time.sleep(3)  # Wait before the next check
    return output
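The same polling pattern can be written with the status lookup injected as a function, which makes the loop testable without network access. Here fetch_status is a hypothetical stand-in for the GET {APP_URL}/job/{job_id} call above:

```python
import time

def wait_for_jobs(job_ids, fetch_status, poll_interval=3.0, max_polls=60):
    """Poll fetch_status(job_id) until no job reports 'processing', or give up after max_polls."""
    statuses = {jid: "processing" for jid in job_ids}
    for _ in range(max_polls):
        for jid, status in statuses.items():
            if status == "processing":
                statuses[jid] = fetch_status(jid)
        if all(s != "processing" for s in statuses.values()):
            break  # all jobs finished (completed or failed)
        time.sleep(poll_interval)
    return statuses
```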

Step 4: Retrieve Parsing Results

Once a batch of documents has been processed, we retrieve their parsed results.

def get_doc(doc_id):
    """Retrieve parsed document results from DocuPanda."""
    url = f"{APP_URL}/document/{doc_id}"
    response = requests.get(url, headers=HEADERS)
    assert response.status_code == 200
    return response.json()

Step 5: Process Documents in Batches of 4

To efficiently handle multiple documents, we upload 4 documents at a time, wait for them to finish, and then retrieve their results.

def process_batch(batch):
    """Upload a batch of documents, wait for processing, and retrieve results."""
    uploaded_docs = [post_doc(doc_path) for doc_path in batch]
    job_ids = [doc["job_id"] for doc in uploaded_docs]

    # Wait for all documents in the batch to complete
    results = is_batch_done(job_ids)

    for doc in uploaded_docs:
        status = results[doc["job_id"]]
        if status == "completed":
            doc_data = get_doc(doc["doc_id"])
            print(f"✅ Document '{doc['filename']}' processed successfully")
            print(f"Full text:\n{doc_data['result']['text']}")
        else:
            print(f"❌ Failed to process '{doc['filename']}' (Status: {status})")
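Note that process_batch uploads documents one at a time. Since each upload is an independent POST, a batch could also be submitted concurrently. A sketch using the standard library's ThreadPoolExecutor, with upload_fn standing in for post_doc:

```python
from concurrent.futures import ThreadPoolExecutor

def upload_batch_concurrently(batch, upload_fn, max_workers=4):
    """Run upload_fn over the batch in parallel threads, preserving input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(upload_fn, batch))
```

pool.map returns results in the same order as the inputs, so the rest of process_batch would work unchanged.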

Step 6: Run the Full Process

Now, we iterate through the document list in batches of 4, ensuring each batch finishes before moving to the next.

def main():
    """Process all documents in batches of 4."""
    for i in range(0, len(DOC_PATHS), BATCH_SIZE):
        batch = DOC_PATHS[i:i + BATCH_SIZE]
        print(f"\n🚀 Processing batch: {batch}\n")
        process_batch(batch)
        print("✅ Batch completed.\n")

if __name__ == '__main__':
    main()
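The slicing loop in main() is a standard chunking idiom; in isolation it can be written as a small generator:

```python
def chunked(items, size):
    """Yield successive slices of at most `size` items (the range/slice idiom used in main)."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

# list(chunked([1, 2, 3, 4, 5], 2)) → [[1, 2], [3, 4], [5]]
```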

Complete Example

Here’s the full working implementation:

import time
import base64
import requests

API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"

DOC_PATHS = [
    "/path/to/doc1.pdf",
    "/path/to/doc2.pdf",
    "/path/to/doc3.jpg",
    "/path/to/doc4.png",
    "/path/to/doc5.html",
    "/path/to/doc6.jpeg",
    "/path/to/doc7.webp",
    "/path/to/doc8.pdf",
]

DATASET_NAME = "YOUR_DATASET_NAME"

HEADERS = {
    "accept": "application/json",
    "content-type": "application/json",
    "X-API-Key": API_KEY,
}

BATCH_SIZE = 4  # Process 4 documents at a time


def post_doc(doc_path):
    """Upload a single document to DocuPanda and return its job metadata."""
    url = f"{APP_URL}/document"
    with open(doc_path, "rb") as f:
        contents = base64.b64encode(f.read()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                "filename": doc_path.split("/")[-1],
            },
        },
        "dataset": DATASET_NAME,
    }
    response = requests.post(url, json=payload, headers=HEADERS)
    assert response.status_code == 200
    res_json = response.json()
    return {"job_id": res_json["jobId"], "doc_id": res_json["documentId"], "filename": doc_path}


def is_batch_done(job_ids):
    """Check if all jobs in the list are completed or failed."""
    url = f"{APP_URL}/job"
    output = {job_id: "processing" for job_id in job_ids}
    for _ in range(60):  # Max 3 minutes (60 polls * 3 sec)
        for job_id, status in output.items():
            if status == "processing":
                response = requests.get(f"{url}/{job_id}", headers=HEADERS)
                assert response.status_code == 200
                output[job_id] = response.json()["status"]
        if all(status != "processing" for status in output.values()):
            break  # Exit early if all jobs are done
        time.sleep(3)  # Wait before the next check
    return output


def get_doc(doc_id):
    """Retrieve parsed document results from DocuPanda."""
    url = f"{APP_URL}/document/{doc_id}"
    response = requests.get(url, headers=HEADERS)
    assert response.status_code == 200
    return response.json()


def process_batch(batch):
    """Upload a batch of documents, wait for processing, and retrieve results."""
    uploaded_docs = [post_doc(doc_path) for doc_path in batch]
    job_ids = [doc["job_id"] for doc in uploaded_docs]

    # Wait for all documents in the batch to complete
    results = is_batch_done(job_ids)

    for doc in uploaded_docs:
        status = results[doc["job_id"]]
        if status == "completed":
            doc_data = get_doc(doc["doc_id"])
            print(f"✅ Document '{doc['filename']}' processed successfully")
            print(f"Full text:\n{doc_data['result']['text']}")
        else:
            print(f"❌ Failed to process '{doc['filename']}' (Status: {status})")


def main():
    """Process all documents in batches of 4."""
    for i in range(0, len(DOC_PATHS), BATCH_SIZE):
        batch = DOC_PATHS[i:i + BATCH_SIZE]
        print(f"\n🚀 Processing batch: {batch}\n")
        process_batch(batch)
        print("✅ Batch completed.\n")


if __name__ == '__main__':
    main()

Summary

This guide walks through:

  • Uploading multiple documents to DocuPanda
  • Processing them in batches of 4 at a time
  • Waiting for all jobs in a batch to complete before moving on
  • Retrieving and displaying results only for successfully processed documents
  • Handling failures gracefully and reporting status per document

This method ensures your API usage is efficient, reliable, and scalable. 🚀