Upload and Standardize Multiple Documents

Upload multiple documents to DocuPanda, standardize them, and retrieve the results.

This guide demonstrates how to upload multiple documents to DocuPanda for parsing in batches of 4. Batches are submitted sequentially: we upload every document in a batch, wait for all of them to finish parsing, submit the whole batch for standardization, wait for that job to complete, and print the results before moving on to the next batch. Because only one batch is in flight at a time, the same loop can process any number of files.

Prerequisites

Before you begin, make sure you have:

  1. A DocuPanda API key
  2. Python 3 installed

Authentication

Every request to DocuPanda needs to include an API key. You can obtain your API key by signing up and going to your account settings page.
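As a small sketch of keeping the key out of source code, the snippet below reads it from an environment variable and builds the request headers. The variable name DOCUPANDA_API_KEY and the helper build_headers are illustrative assumptions, not part of the DocuPanda API; the header names are the ones used throughout this guide.

```python
import os


def build_headers(api_key):
    """Build the request headers used throughout this guide."""
    if not api_key:
        raise ValueError("API key is missing")
    return {
        "accept": "application/json",
        "content-type": "application/json",
        "X-API-Key": api_key,
    }


# DOCUPANDA_API_KEY is a hypothetical variable name chosen for this example;
# the placeholder is used when the variable is not set.
API_KEY = os.environ.get("DOCUPANDA_API_KEY", "YOUR_API_KEY")
HEADERS = build_headers(API_KEY)
```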

Step 1: Define Document Paths and API Configuration

First, specify the list of documents you want to upload, along with your API key and the DocuPanda endpoint. The documents can be any supported filetype: PDF, images, HTML, etc.

import time
import base64
import requests

API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"
DOC_PATHS = [
    "/path/to/doc1.pdf", "/path/to/doc2.pdf", "/path/to/doc3.jpg", "/path/to/doc4.png",
    "/path/to/doc5.html", "/path/to/doc6.jpeg", "/path/to/doc7.webp", "/path/to/doc8.pdf"
]
SCHEMA_ID = "YOUR_SCHEMA_ID"
DATASET_NAME = "YOUR_DATASET_NAME"
HEADERS = {"accept": "application/json", "content-type": "application/json", "X-API-Key": API_KEY}
BATCH_SIZE = 4  # Process 4 documents at a time

Replace YOUR_API_KEY with your actual API key and DOC_PATHS with your actual document file paths. You can also optionally set the dataset name and schema ID. Note that if the schema ID is set to None or left out, the AI will improvise a different schema for each document.
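Before uploading anything, it can help to confirm that every configured path points to a real file, so a typo does not surface halfway through a batch. The helper below is a hypothetical addition for illustration (find_missing_paths is not part of DocuPanda) and uses only the standard library.

```python
import os


def find_missing_paths(doc_paths):
    """Return the paths that do not point to an existing file."""
    return [path for path in doc_paths if not os.path.isfile(path)]


# Placeholder paths from this guide; replace them with real files
missing = find_missing_paths(["/path/to/doc1.pdf", "/path/to/doc2.pdf"])
if missing:
    print(f"Missing files: {missing}")
```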

Step 2: Upload a Document

Each document is uploaded by encoding it in Base64 and sending a POST request. You can also optionally use a file URL instead. We define a function below that accepts a document file path and uploads it to DocuPanda for parsing, returning the metadata of the job.

def post_doc(doc_path):
    """Upload one document for parsing and return its job and document IDs."""
    url = f"{APP_URL}/document"
    # Read inside a context manager so the file handle is closed promptly
    with open(doc_path, "rb") as f:
        contents = base64.b64encode(f.read()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                "filename": doc_path.split("/")[-1]
            },
        },
        "dataset": DATASET_NAME
    }
    response = requests.post(url, json=payload, headers=HEADERS)
    assert response.status_code == 200
    res_json = response.json()
    return {"job_id": res_json["jobId"], "doc_id": res_json["documentId"], "filename": doc_path}
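To make the request body easier to inspect without sending anything, the payload construction can be pulled into its own function. This is a sketch under assumptions: build_upload_payload is a hypothetical helper, the payload shape is copied from post_doc above, and the dataset key is omitted when no dataset is given, on the basis that the guide describes the dataset name as optional.

```python
import base64
from pathlib import Path


def build_upload_payload(doc_path, dataset=None):
    """Build the JSON body for uploading a local file."""
    path = Path(doc_path)
    contents = base64.b64encode(path.read_bytes()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                # Path.name follows the path rules of the platform it runs on
                "filename": path.name,
            },
        },
    }
    if dataset is not None:
        payload["dataset"] = dataset
    return payload
```

The returned dictionary can be passed as the json argument of requests.post in place of the inline payload.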

Step 3: Check Job Status

Since DocuPanda processes documents asynchronously, we need to track multiple job IDs at once and wait for all of them to finish. Instead of polling for job status, you could use webhooks to react as soon as results become available. We define a function that accepts multiple job IDs and returns their statuses once every job has left the "processing" state, or after roughly 3 minutes of polling, whichever comes first.

def is_batch_done(job_ids):
    """Check if all jobs in the list are completed or failed."""
    url = f"{APP_URL}/job"
    output = {job_id: "processing" for job_id in job_ids}

    for _ in range(60):  # Max 3 minutes (60 * 3 sec)
        for job_id, status in output.items():
            if status == "processing":
                response = requests.get(f"{url}/{job_id}", headers=HEADERS)
                assert response.status_code == 200
                output[job_id] = response.json()["status"]

        if all(status != "processing" for status in output.values()):
            break  # Exit early if all jobs are done
        
        time.sleep(3)  # Wait before next check
    
    return output
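The same polling idea can be written against a wall-clock deadline and with the status lookup passed in as a callable, which makes the loop testable without network access. This is a sketch, not DocuPanda's own client: wait_for_jobs and its parameters are hypothetical names, and fetch_status stands in for the GET request to the job endpoint shown above.

```python
import time


def wait_for_jobs(job_ids, fetch_status, timeout=180.0, interval=3.0, sleep=time.sleep):
    """Poll until no job is "processing" or the timeout elapses.

    fetch_status is any callable that maps a job ID to its status string.
    """
    statuses = {job_id: "processing" for job_id in job_ids}
    deadline = time.monotonic() + timeout
    while True:
        # Only re-query jobs that have not reached a final state yet
        for job_id in job_ids:
            if statuses[job_id] == "processing":
                statuses[job_id] = fetch_status(job_id)
        pending = [j for j, s in statuses.items() if s == "processing"]
        if not pending or time.monotonic() >= deadline:
            return statuses
        sleep(interval)
```

Jobs still marked "processing" in the returned dictionary are the ones that did not finish before the deadline.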

Step 4: Standardize the batch

Once a batch of documents has been parsed, we can send the entire batch to be standardized in a single request to the /v2/standardize/batch endpoint:

def standardize_batch(doc_ids):
    """Standardize a batch of documents."""
    url = f"{APP_URL}/v2/standardize/batch"
    payload = {"schemaId": SCHEMA_ID, "documentIds": doc_ids}
    response = requests.post(url, json=payload, headers=HEADERS)
    assert response.status_code == 200
    res_json = response.json()
    return {"jobId": res_json["jobId"], "standardizationIds": res_json["standardizationIds"]}
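Since the schema ID may be left out, one way to handle that case is to build the request body conditionally. The helper below is a hypothetical sketch: the key names come from standardize_batch above, and the empty-list check is a local precaution, since this guide does not say how the endpoint treats an empty batch.

```python
def build_standardize_payload(doc_ids, schema_id=None):
    """Build the JSON body for a batch standardization request."""
    if not doc_ids:
        raise ValueError("No document IDs to standardize")
    payload = {"documentIds": list(doc_ids)}
    # Leave schemaId out entirely when no schema was chosen
    if schema_id is not None:
        payload["schemaId"] = schema_id
    return payload
```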

Step 5: Wait for completion

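We can now poll the job endpoint until the batch job completes. As stated before, you can also use webhooks for this. The function below returns True only when the job reports "completed"; it returns False if the job reports "error" or if it is still running after 60 checks spaced 5 seconds apart.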

def is_job_done(job_id):
    """Check if a job is completed or failed."""
    url = f"{APP_URL}/job/{job_id}"
    for _ in range(60):
        response = requests.get(url, headers=HEADERS)
        assert response.status_code == 200
        res_json = response.json()
        status = res_json["status"]
        if status == "completed":
            return True
        if status == "error":
            return False
        time.sleep(5)
    return False
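Because is_job_done returns False for both a failed job and a timeout, callers cannot tell the two apart. A variant that returns the outcome as a string is sketched below; wait_for_job is a hypothetical name, and fetch_status stands in for the request to the job endpoint so the logic can be exercised offline.

```python
import time


def wait_for_job(fetch_status, attempts=60, interval=5.0, sleep=time.sleep):
    """Return "completed", "error", or "timeout" for a single job."""
    for _ in range(attempts):
        status = fetch_status()
        if status in ("completed", "error"):
            return status
        sleep(interval)
    # The job never reached a final state within the allowed attempts
    return "timeout"
```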

Step 6: Retrieve Standardization Results

Once the batch has been standardized, we can retrieve each individual standardization as follows:

def get_std(std_id):
    """Retrieve standardized document results from DocuPanda."""
    url = f"{APP_URL}/standardization/{std_id}"
    response = requests.get(url, headers=HEADERS)
    if response.status_code == 200:
        return response.json()
    return None
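Once the individual results are fetched, they can be collected into a single JSON document keyed by document ID. The sketch below assumes each result carries the documentId and data fields used elsewhere in this guide; results_to_json is a hypothetical helper, and entries that could not be retrieved (None) are skipped.

```python
import json


def results_to_json(std_results):
    """Serialize retrieved standardizations as a documentId -> data mapping."""
    by_doc = {r["documentId"]: r["data"] for r in std_results if r is not None}
    return json.dumps(by_doc, indent=2, sort_keys=True)
```

The resulting string can be written to a file or passed to another system.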

Step 7: Process in Batches

To efficiently handle multiple documents, we upload 4 documents at a time, wait for them to finish parsing, standardize them in a single request, wait for that job to finish, and then fetch the results.

def process_batch(batch):
    """Upload a batch of documents, wait for processing, and retrieve results."""
    uploaded_docs = [post_doc(doc_path) for doc_path in batch]
    parse_job_ids = [doc["job_id"] for doc in uploaded_docs]

    # Wait for all documents in the batch to complete parsing
    parse_results = is_batch_done(parse_job_ids)
    doc_ids = [doc["doc_id"] for doc in uploaded_docs if parse_results[doc["job_id"]] == "completed"]
    if not doc_ids:
        # Nothing parsed successfully, so there is nothing to standardize
        print("❌ No documents in this batch finished parsing")
        return

    # Standardize the documents and wait for the job to complete
    batch_results = standardize_batch(doc_ids)
    done = is_job_done(batch_results["jobId"])
    if not done:
        print("❌ Standardization job failed to complete")
        return

    # Retrieve and print results for each standardized document
    for std_id in batch_results["standardizationIds"]:
        std_data = get_std(std_id)
        if std_data is None:
            print(f"❌ Standardization '{std_id}' could not be retrieved")
        else:
            print(f"✅ Document '{std_data['documentId']}' standardized successfully")
            print(f"Standardized data: {std_data['data']}")
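Documents that fail to parse are dropped from the batch before standardization. To report them rather than lose track of them, the upload metadata can be split by parse status. The helper below is a hypothetical sketch that works on the dictionaries returned by post_doc and is_batch_done; a job missing from the status mapping is treated as not completed.

```python
def split_by_parse_status(uploaded_docs, parse_results):
    """Split uploaded documents into (parsed, failed) lists."""
    parsed, failed = [], []
    for doc in uploaded_docs:
        status = parse_results.get(doc["job_id"], "processing")
        if status == "completed":
            parsed.append(doc)
        else:
            # Covers errors as well as jobs still processing at timeout
            failed.append(doc)
    return parsed, failed
```

The filename field of each failed entry identifies which input file needs attention.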

Step 8: Run the Full Process

Now, we iterate through the document list in batches of 4, ensuring each batch finishes before moving to the next.

def main():
    """Processes all documents in batches of 4."""
    for i in range(0, len(DOC_PATHS), BATCH_SIZE):
        batch = DOC_PATHS[i:i + BATCH_SIZE]
        print(f"\n🚀 Processing batch: {batch}\n")
        process_batch(batch)
        print("✅ Batch completed.\n")


if __name__ == '__main__':
    main()
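The slicing in main() can be isolated into a small generator to see how a list is divided, including a final batch that is smaller than the batch size. iter_batches is a hypothetical helper written for this illustration.

```python
def iter_batches(items, batch_size):
    """Yield consecutive slices of items with at most batch_size elements."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Ten items in batches of 4 give two full batches and one partial batch
sizes = [len(batch) for batch in iter_batches(list(range(10)), 4)]
print(sizes)  # [4, 4, 2]
```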

Complete Example

Here’s the full working implementation:

"""Upload and standardize multiple documents with DocuPanda and retrieve results in batches.
"""
import time
import base64
import requests

API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"
DOC_PATHS = [
    "/path/to/doc1.pdf", "/path/to/doc2.pdf", "/path/to/doc3.jpg", "/path/to/doc4.png",
    "/path/to/doc5.html", "/path/to/doc6.jpeg", "/path/to/doc7.webp", "/path/to/doc8.pdf"
]
SCHEMA_ID = "YOUR_SCHEMA_ID"
DATASET_NAME = "YOUR_DATASET_NAME"
HEADERS = {"accept": "application/json", "content-type": "application/json", "X-API-Key": API_KEY}
BATCH_SIZE = 4  # Process 4 documents at a time


def post_doc(doc_path):
    """Upload one document for parsing and return its job and document IDs."""
    url = f"{APP_URL}/document"
    # Read inside a context manager so the file handle is closed promptly
    with open(doc_path, "rb") as f:
        contents = base64.b64encode(f.read()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                "filename": doc_path.split("/")[-1]
            },
        },
        "dataset": DATASET_NAME
    }
    response = requests.post(url, json=payload, headers=HEADERS)
    assert response.status_code == 200
    res_json = response.json()
    return {"job_id": res_json["jobId"], "doc_id": res_json["documentId"], "filename": doc_path}


def is_batch_done(job_ids):
    """Check if all jobs in the list are completed or failed."""
    url = f"{APP_URL}/job"
    output = {job_id: "processing" for job_id in job_ids}

    for _ in range(60):  # Max 3 minutes (60 * 3 sec)
        for job_id, status in output.items():
            if status == "processing":
                response = requests.get(f"{url}/{job_id}", headers=HEADERS)
                assert response.status_code == 200
                output[job_id] = response.json()["status"]

        if all(status != "processing" for status in output.values()):
            break  # Exit early if all jobs are done

        time.sleep(3)  # Wait before next check

    return output


def standardize_batch(doc_ids):
    """Standardize a batch of documents."""
    url = f"{APP_URL}/v2/standardize/batch"
    payload = {"schemaId": SCHEMA_ID, "documentIds": doc_ids}
    response = requests.post(url, json=payload, headers=HEADERS)
    assert response.status_code == 200
    res_json = response.json()
    return {"jobId": res_json["jobId"], "standardizationIds": res_json["standardizationIds"]}


def is_job_done(job_id):
    """Check if a job is completed or failed."""
    url = f"{APP_URL}/job/{job_id}"
    for _ in range(60):
        response = requests.get(url, headers=HEADERS)
        assert response.status_code == 200
        res_json = response.json()
        status = res_json["status"]
        if status == "completed":
            return True
        if status == "error":
            return False
        time.sleep(5)
    return False


def get_std(std_id):
    """Retrieve standardized document results from DocuPanda."""
    url = f"{APP_URL}/standardization/{std_id}"
    response = requests.get(url, headers=HEADERS)
    if response.status_code == 200:
        return response.json()
    return None


def process_batch(batch):
    """Upload a batch of documents, wait for processing, and retrieve results."""
    uploaded_docs = [post_doc(doc_path) for doc_path in batch]
    parse_job_ids = [doc["job_id"] for doc in uploaded_docs]

    # Wait for all documents in the batch to complete parsing
    parse_results = is_batch_done(parse_job_ids)
    doc_ids = [doc["doc_id"] for doc in uploaded_docs if parse_results[doc["job_id"]] == "completed"]
    if not doc_ids:
        # Nothing parsed successfully, so there is nothing to standardize
        print("❌ No documents in this batch finished parsing")
        return

    # Standardize the documents and wait for the job to complete
    batch_results = standardize_batch(doc_ids)
    done = is_job_done(batch_results["jobId"])
    if not done:
        print("❌ Standardization job failed to complete")
        return

    # Retrieve and print results for each standardized document
    for std_id in batch_results["standardizationIds"]:
        std_data = get_std(std_id)
        if std_data is None:
            print(f"❌ Standardization '{std_id}' could not be retrieved")
        else:
            print(f"✅ Document '{std_data['documentId']}' standardized successfully")
            print(f"Standardized data: {std_data['data']}")


def main():
    """Processes all documents in batches of 4."""
    for i in range(0, len(DOC_PATHS), BATCH_SIZE):
        batch = DOC_PATHS[i:i + BATCH_SIZE]
        print(f"\n🚀 Processing batch: {batch}\n")
        process_batch(batch)
        print("✅ Batch completed.\n")


if __name__ == '__main__':
    main()

Summary

This guide walks through:

  • Uploading multiple documents to DocuPanda and then standardizing them
  • Processing them in batches of 4 at a time
  • Standardizing the entire batch at once
  • Waiting for all jobs to complete before moving on
  • Retrieving and displaying results only for successfully standardized documents
  • Handling failures gracefully

Because only one batch of 4 documents is in flight at a time, this approach keeps the number of concurrent jobs bounded and scales to any number of documents. 🚀