Upload Multiple

Upload multiple documents to DocuPanda and retrieve the results

This guide demonstrates how to upload multiple documents to DocuPanda for parsing, processing them in batches of 4 at a time. Batches are submitted sequentially, and we wait for every document in a batch to finish before moving on to the next.

Prerequisites

Before you begin, make sure you have:

  1. A DocuPanda API key
  2. Python 3 installed, along with the requests library (pip install requests)

Authentication

Every request to DocuPanda must include your API key in the X-API-Key header. You can obtain your API key by signing up and going to your account settings page.
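For example, assuming you have already uploaded a document (the document ID below is hypothetical), a request with the key in the X-API-Key header looks like this:

import requests

API_KEY = "YOUR_API_KEY"

# Hypothetical document ID; every DocuPanda endpoint expects the key
# in the X-API-Key header.
response = requests.get(
    "https://app.docupanda.io/document/some-document-id",
    headers={"accept": "application/json", "X-API-Key": API_KEY},
)
print(response.status_code)  # 200 when the key is valid and the document exists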

Step 1: Define Document Paths and API Configuration

First, specify the list of documents you want to upload, along with your API key and the DocuPanda endpoint. The documents can be any supported file type: PDF, images, HTML, and so on.

import time
import base64
import requests

API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"
DOC_PATHS = [
    "/path/to/doc1.pdf", "/path/to/doc2.pdf", "/path/to/doc3.jpg", "/path/to/doc4.png",
    "/path/to/doc5.html", "/path/to/doc6.jpeg", "/path/to/doc7.webp", "/path/to/doc8.pdf"
]
DATASET_NAME = "YOUR_DATASET_NAME"
HEADERS = {"accept": "application/json", "content-type": "application/json", "X-API-Key": API_KEY}
BATCH_SIZE = 4  # Process 4 documents at a time

Replace YOUR_API_KEY with your actual API key and DOC_PATHS with your actual document file paths.

Step 2: Upload a Document

Each document is uploaded by encoding it in Base64 and sending a POST request; alternatively, you can point DocuPanda at a hosted file URL (see the sketch after the function below). The function below accepts a document file path, uploads the file for parsing, and returns the job's metadata.

def post_doc(doc_path):
    """Upload a single document to DocuPanda and return its job metadata."""
    url = f"{APP_URL}/document"
    # Read the file and encode it as Base64 for the JSON payload
    with open(doc_path, "rb") as f:
        contents = base64.b64encode(f.read()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                "filename": doc_path.split("/")[-1]
            },
        },
        "dataset": DATASET_NAME
    }
    response = requests.post(url, json=payload, headers=HEADERS)
    response.raise_for_status()  # Fail loudly if the upload was rejected
    res_json = response.json()
    return {"job_id": res_json["jobId"], "doc_id": res_json["documentId"], "filename": doc_path}

Step 3: Check Job Status

Since DocuPanda processes documents asynchronously, we need to track multiple job IDs at once and wait for all of them to finish. (Instead of polling, you could register a webhook and react the moment each result becomes available; a sketch follows the function below.) We define a function that accepts multiple job IDs and returns once every job has finished or a timeout elapses.

def wait_for_batch(job_ids):
    """Poll until every job in the list is completed or failed, or the
    timeout elapses. Returns a dict mapping each job ID to its last
    known status."""
    url = f"{APP_URL}/job"
    statuses = {job_id: "processing" for job_id in job_ids}

    for _ in range(60):  # Poll for up to 3 minutes (60 polls * 3 sec)
        for job_id, status in statuses.items():
            if status == "processing":
                response = requests.get(f"{url}/{job_id}", headers=HEADERS)
                response.raise_for_status()
                statuses[job_id] = response.json()["status"]

        if all(status != "processing" for status in statuses.values()):
            break  # Exit early once every job is done

        time.sleep(3)  # Wait before the next round of polling

    return statuses
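As mentioned above, a webhook spares you the polling loop entirely: DocuPanda calls your endpoint when a job finishes. Below is a minimal receiver sketch using Flask; the payload field names (jobId, status) are assumptions, so consult the webhooks documentation for the exact schema and any signature-verification requirements:

from flask import Flask, request

app = Flask(__name__)

@app.route("/docupanda-webhook", methods=["POST"])
def handle_webhook():
    # Hypothetical payload shape; check the DocuPanda webhook docs
    event = request.get_json()
    print(f"Job {event.get('jobId')} finished with status: {event.get('status')}")
    return "", 200

if __name__ == "__main__":
    app.run(port=8000)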

Step 4: Retrieve Parsing Results

Once a batch of documents has been processed, we retrieve their parsed results.

def get_doc(doc_id):
    """Retrieve parsed document results from DocuPanda."""
    url = f"{APP_URL}/document/{doc_id}"
    response = requests.get(url, headers=HEADERS)
    response.raise_for_status()  # Fail loudly if the document can't be fetched
    return response.json()
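For example, with a document ID returned by post_doc (the ID below is hypothetical), you can pull out the parsed text; the result.text path is the same one used in Step 5:

doc_data = get_doc("your-document-id")  # hypothetical ID from post_doc
print(doc_data["result"]["text"][:500])  # first 500 characters of the parsed text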

Step 5: Process Documents in Batches of 4

To handle multiple documents efficiently, we upload the files in a batch one after another, wait for all of their jobs to finish, and then retrieve the results. (The uploads themselves run sequentially; a parallel-upload variant is sketched after the function.)

def process_batch(batch):
    """Upload a batch of documents, wait for processing, and retrieve results."""
    uploaded_docs = [post_doc(doc_path) for doc_path in batch]
    job_ids = [doc["job_id"] for doc in uploaded_docs]

    # Wait for every job in the batch to finish (or time out)
    results = wait_for_batch(job_ids)

    for doc in uploaded_docs:
        status = results[doc["job_id"]]
        if status == "completed":
            doc_data = get_doc(doc["doc_id"])
            print(f"✅ Document '{doc['filename']}' processed successfully")
            print(f"Full text:\n{doc_data['result']['text']}")
        else:
            print(f"❌ Failed to process '{doc['filename']}' (Status: {status})")

Step 6: Run the Full Process

Now, we iterate through the document list in batches of 4, ensuring each batch finishes before moving to the next.

def main():
    """Processes all documents in batches of 4."""
    for i in range(0, len(DOC_PATHS), BATCH_SIZE):
        batch = DOC_PATHS[i:i + BATCH_SIZE]
        print(f"\n🚀 Processing batch: {batch}\n")
        process_batch(batch)
        print("✅ Batch completed.\n")

if __name__ == '__main__':
    main()

Complete Example

Here’s the full working implementation:

import time
import base64
import requests

API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"
DOC_PATHS = [
    "/path/to/doc1.pdf", "/path/to/doc2.pdf", "/path/to/doc3.jpg", "/path/to/doc4.png",
    "/path/to/doc5.html", "/path/to/doc6.jpeg", "/path/to/doc7.webp", "/path/to/doc8.pdf"
]
DATASET_NAME = "YOUR_DATASET_NAME"
HEADERS = {"accept": "application/json", "content-type": "application/json", "X-API-Key": API_KEY}
BATCH_SIZE = 4  # Process 4 documents at a time

def post_doc(doc_path):
    """Upload a single document to DocuPanda and return its job metadata."""
    url = f"{APP_URL}/document"
    # Read the file and encode it as Base64 for the JSON payload
    with open(doc_path, "rb") as f:
        contents = base64.b64encode(f.read()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                "filename": doc_path.split("/")[-1]
            },
        },
        "dataset": DATASET_NAME
    }
    response = requests.post(url, json=payload, headers=HEADERS)
    response.raise_for_status()  # Fail loudly if the upload was rejected
    res_json = response.json()
    return {"job_id": res_json["jobId"], "doc_id": res_json["documentId"], "filename": doc_path}

def wait_for_batch(job_ids):
    """Poll until every job in the list is completed or failed, or the
    timeout elapses. Returns a dict mapping each job ID to its last
    known status."""
    url = f"{APP_URL}/job"
    statuses = {job_id: "processing" for job_id in job_ids}

    for _ in range(60):  # Poll for up to 3 minutes (60 polls * 3 sec)
        for job_id, status in statuses.items():
            if status == "processing":
                response = requests.get(f"{url}/{job_id}", headers=HEADERS)
                response.raise_for_status()
                statuses[job_id] = response.json()["status"]

        if all(status != "processing" for status in statuses.values()):
            break  # Exit early once every job is done

        time.sleep(3)  # Wait before the next round of polling

    return statuses
  
def get_doc(doc_id):
    """Retrieve parsed document results from DocuPanda."""
    url = f"{APP_URL}/document/{doc_id}"
    response = requests.get(url, headers=HEADERS)
    response.raise_for_status()  # Fail loudly if the document can't be fetched
    return response.json()

def process_batch(batch):
    """Upload a batch of documents, wait for processing, and retrieve results."""
    uploaded_docs = [post_doc(doc_path) for doc_path in batch]
    job_ids = [doc["job_id"] for doc in uploaded_docs]

    # Wait for every job in the batch to finish (or time out)
    results = wait_for_batch(job_ids)

    for doc in uploaded_docs:
        status = results[doc["job_id"]]
        if status == "completed":
            doc_data = get_doc(doc["doc_id"])
            print(f"✅ Document '{doc['filename']}' processed successfully")
            print(f"Full text:\n{doc_data['result']['text']}")
        else:
            print(f"❌ Failed to process '{doc['filename']}' (Status: {status})")

def main():
    """Processes all documents in batches of 4."""
    for i in range(0, len(DOC_PATHS), BATCH_SIZE):
        batch = DOC_PATHS[i:i + BATCH_SIZE]
        print(f"\n🚀 Processing batch: {batch}\n")
        process_batch(batch)
        print("✅ Batch completed.\n")

if __name__ == '__main__':
    main()

Summary

This guide walks through:

  • Uploading multiple documents to DocuPanda
  • Processing them in batches of 4 at a time
  • Waiting for all jobs in a batch to complete before moving on
  • Retrieving and displaying results only for successfully processed documents
  • Handling failures gracefully and reporting status per document

Batching keeps the number of in-flight jobs bounded while still letting DocuPanda process each batch concurrently, so the pipeline stays predictable as your document volume grows. 🚀