Upload multiple documents to DocuPanda and retrieve the results
This guide demonstrates how to upload multiple documents to DocuPanda for parsing, processing them in batches of 4. Each batch is submitted sequentially, and we wait for every document in a batch to complete before moving on to the next.
Prerequisites
Before you begin, make sure you have:
- A DocuPanda API key
- Python 3 installed
Authentication
Every request to DocuPanda needs to include an API key. You can obtain your API key by signing up and going to your account settings page.
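For reference, the key travels in the X-API-Key header on every call. A minimal sketch (this is the same headers dictionary built in Step 1 below):

import requests

HEADERS = {
    "accept": "application/json",
    "content-type": "application/json",
    "X-API-Key": "YOUR_API_KEY",  # the key from your account settings page
}
# Example: requests.get("https://app.docupanda.io/job/<job_id>", headers=HEADERS)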
Step 1: Define Document Paths and API Configuration
First, specify the list of documents you want to upload, along with your API key and the DocuPanda endpoint. The documents can be any supported file type: PDF, images, HTML, and so on.
import time
import base64
import requests
API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"
DOC_PATHS = [
"/path/to/doc1.pdf", "/path/to/doc2.pdf", "/path/to/doc3.jpg", "/path/to/doc4.png",
"/path/to/doc5.html", "/path/to/doc6.jpeg", "/path/to/doc7.webp", "/path/to/doc8.pdf"
]
DATASET_NAME = "YOUR_DATASET_NAME"
HEADERS = {"accept": "application/json", "content-type": "application/json", "X-API-Key": API_KEY}
BATCH_SIZE = 4 # Process 4 documents at a time
Replace YOUR_API_KEY with your actual API key and DOC_PATHS with your actual document file paths.
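Tip: if your documents all live in one folder, you can build DOC_PATHS programmatically instead of listing each file by hand. A small sketch using only the standard library (the folder path and extension set here are placeholders):

from pathlib import Path

# Collect every supported file in a folder, in a stable order
SUPPORTED = {".pdf", ".png", ".jpg", ".jpeg", ".webp", ".html"}
DOC_PATHS = [str(p) for p in sorted(Path("/path/to/docs").iterdir())
             if p.suffix.lower() in SUPPORTED]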
Step 2: Upload a Document
Each document is uploaded by encoding it in Base64 and sending a POST request; you can also reference a hosted file by URL instead (a sketch of that variant follows the function below). We define a function that accepts a document file path, uploads it to DocuPanda for parsing, and returns the job metadata.
def post_doc(doc_path):
    """Upload a single document and return its job ID, document ID, and filename."""
    url = f"{APP_URL}/document"
    # Read the file and Base64-encode its contents for the JSON payload
    with open(doc_path, "rb") as f:
        contents = base64.b64encode(f.read()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                "filename": doc_path.split("/")[-1]
            },
        },
        "dataset": DATASET_NAME
    }
    response = requests.post(url, json=payload, headers=HEADERS)
    assert response.status_code == 200
    res_json = response.json()
    return {"job_id": res_json["jobId"], "doc_id": res_json["documentId"], "filename": doc_path}
Step 3: Check Job Status
Since DocuPanda processes documents asynchronously, we need to track multiple job IDs at once and wait for all of them to finish. Instead of polling, you could also use webhooks to react as soon as results become available (a sketch appears after the polling function below). We define a function that accepts multiple job IDs and returns once every job has either finished or the polling timeout is reached.
def is_batch_done(job_ids):
"""Check if all jobs in the list are completed or failed."""
url = f"{APP_URL}/job"
output = {job_id: "processing" for job_id in job_ids}
for _ in range(60): # Max 3 minutes (60 * 3 sec)
for job_id, status in output.items():
if status == "processing":
response = requests.get(f"{url}/{job_id}", headers=HEADERS)
assert response.status_code == 200
output[job_id] = response.json()["status"]
if all(status != "processing" for status in output.values()):
break # Exit early if all jobs are done
time.sleep(3) # Wait before next check
return output
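If you would rather not poll at all, a webhook endpoint lets DocuPanda call you when a job finishes. A minimal Flask sketch, assuming the callback payload carries a job ID and status (the actual field names come from DocuPanda's webhook documentation):

from flask import Flask, request

app = Flask(__name__)

@app.route("/docupanda-webhook", methods=["POST"])
def docupanda_webhook():
    event = request.get_json()
    # Hypothetical payload shape; verify the real fields in the webhook docs
    print(f"Job {event.get('jobId')} finished with status {event.get('status')}")
    return "", 200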
Step 4: Retrieve Parsing Results
Once a batch of documents has been processed, we retrieve their parsed results.
def get_doc(doc_id):
"""Retrieve parsed document results from DocuPanda."""
url = f"{APP_URL}/document/{doc_id}"
response = requests.get(url, headers=HEADERS)
assert response.status_code == 200
return response.json()
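As a usage example, once a document has completed you can persist its parsed text to disk; the result['text'] field below is the same one printed in Step 5:

doc_data = get_doc("YOUR_DOCUMENT_ID")  # a documentId returned by post_doc
with open("parsed_output.txt", "w", encoding="utf-8") as f:
    f.write(doc_data["result"]["text"])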
Step 5: Process Documents in Batches of 4
To efficiently handle multiple documents, we upload 4 documents at a time, wait for them to finish, and then retrieve their results.
def process_batch(batch):
"""Upload a batch of documents, wait for processing, and retrieve results."""
uploaded_docs = [post_doc(doc_path) for doc_path in batch]
job_ids = [doc["job_id"] for doc in uploaded_docs]
# Wait for all documents in the batch to complete
results = is_batch_done(job_ids)
for doc in uploaded_docs:
status = results[doc["job_id"]]
if status == "completed":
doc_data = get_doc(doc["doc_id"])
print(f"✅ Document '{doc['filename']}' processed successfully")
print(f"Full text:\n{doc_data['result']['text']}")
else:
print(f"❌ Failed to process '{doc['filename']}' (Status: {status})")
Step 6: Run the Full Process
Now, we iterate through the document list in batches of 4, ensuring each batch finishes before moving to the next.
def main():
"""Processes all documents in batches of 4."""
for i in range(0, len(DOC_PATHS), BATCH_SIZE):
batch = DOC_PATHS[i:i + BATCH_SIZE]
print(f"\n🚀 Processing batch: {batch}\n")
process_batch(batch)
print("✅ Batch completed.\n")
if __name__ == '__main__':
main()
Complete Example
Here’s the full working implementation:
import time
import base64
import requests
API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"
DOC_PATHS = [
"/path/to/doc1.pdf", "/path/to/doc2.pdf", "/path/to/doc3.jpg", "/path/to/doc4.png",
"/path/to/doc5.html", "/path/to/doc6.jpeg", "/path/to/doc7.webp", "/path/to/doc8.pdf"
]
DATASET_NAME = "YOUR_DATASET_NAME"
HEADERS = {"accept": "application/json", "content-type": "application/json", "X-API-Key": API_KEY}
BATCH_SIZE = 4 # Process 4 documents at a time
def post_doc(doc_path):
    """Upload a single document and return its job ID, document ID, and filename."""
    url = f"{APP_URL}/document"
    # Read the file and Base64-encode its contents for the JSON payload
    with open(doc_path, "rb") as f:
        contents = base64.b64encode(f.read()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                "filename": doc_path.split("/")[-1]
            },
        },
        "dataset": DATASET_NAME
    }
    response = requests.post(url, json=payload, headers=HEADERS)
    assert response.status_code == 200
    res_json = response.json()
    return {"job_id": res_json["jobId"], "doc_id": res_json["documentId"], "filename": doc_path}
def is_batch_done(job_ids):
"""Check if all jobs in the list are completed or failed."""
url = f"{APP_URL}/job"
output = {job_id: "processing" for job_id in job_ids}
for _ in range(60): # Max 3 minutes (60 * 3 sec)
for job_id, status in output.items():
if status == "processing":
response = requests.get(f"{url}/{job_id}", headers=HEADERS)
assert response.status_code == 200
output[job_id] = response.json()["status"]
if all(status != "processing" for status in output.values()):
break # Exit early if all jobs are done
time.sleep(3) # Wait before next check
return output
def get_doc(doc_id):
"""Retrieve parsed document results from DocuPanda."""
url = f"{APP_URL}/document/{doc_id}"
response = requests.get(url, headers=HEADERS)
assert response.status_code == 200
return response.json()
def process_batch(batch):
"""Upload a batch of documents, wait for processing, and retrieve results."""
uploaded_docs = [post_doc(doc_path) for doc_path in batch]
job_ids = [doc["job_id"] for doc in uploaded_docs]
# Wait for all documents in the batch to complete
results = is_batch_done(job_ids)
for doc in uploaded_docs:
status = results[doc["job_id"]]
if status == "completed":
doc_data = get_doc(doc["doc_id"])
print(f"✅ Document '{doc['filename']}' processed successfully")
print(f"Full text:\n{doc_data['result']['text']}")
else:
print(f"❌ Failed to process '{doc['filename']}' (Status: {status})")
def main():
"""Processes all documents in batches of 4."""
for i in range(0, len(DOC_PATHS), BATCH_SIZE):
batch = DOC_PATHS[i:i + BATCH_SIZE]
print(f"\n🚀 Processing batch: {batch}\n")
process_batch(batch)
print("✅ Batch completed.\n")
if __name__ == '__main__':
main()
Summary
This guide walks through:
- Uploading multiple documents to DocuPanda
- Processing them in batches of 4
- Waiting for all jobs in a batch to complete before moving on
- Retrieving and displaying results only for successfully processed documents
- Handling failures gracefully and reporting status per document
This approach keeps your API usage efficient and predictable, and scales cleanly to larger document sets. 🚀