Upload multiple documents to DocuPanda and standardize them, then retrieve the results
This guide demonstrates how to upload multiple documents to DocuPanda for parsing, processing them in batches of 4 at a time. Each batch is submitted sequentially, and we wait for every document in the batch to finish parsing before submitting the whole batch for standardization. Once standardization completes, we retrieve and print the results. In this way, we can process any number of files efficiently.
Prerequisites
Before you begin, make sure you have:
- A DocuPanda API key
- Python 3 installed
Authentication
Every request to DocuPanda needs to include an API key. You can obtain your API key by signing up and going to your account settings page.
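Concretely, the key is passed in an X-API-Key header on every request. Here is a minimal sketch of an authenticated call; the job ID below is just a placeholder for illustration.
import requests

API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"

# Every request in this guide sends the API key via the X-API-Key header.
HEADERS = {"accept": "application/json", "content-type": "application/json", "X-API-Key": API_KEY}

# Example: look up the status of an existing job (YOUR_JOB_ID is a placeholder).
response = requests.get(f"{APP_URL}/job/YOUR_JOB_ID", headers=HEADERS)
print(response.status_code, response.json())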
Step 1: Define Document Paths and API Configuration
First, specify the list of documents you want to upload, along with your API key and the DocuPanda endpoint. The documents can be any supported filetype: PDF, images, HTML, etc.
import time
import base64
import requests
API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"
DOC_PATHS = [
"/path/to/doc1.pdf", "/path/to/doc2.pdf", "/path/to/doc3.jpg", "/path/to/doc4.png",
"/path/to/doc5.html", "/path/to/doc6.jpeg", "/path/to/doc7.webp", "/path/to/doc8.pdf"
]
SCHEMA_ID = "YOUR_SCHEMA_ID"
DATASET_NAME = "YOUR_DATASET_NAME"
HEADERS = {"accept": "application/json", "content-type": "application/json", "X-API-Key": API_KEY}
BATCH_SIZE = 4 # Process 4 documents at a time
Replace YOUR_API_KEY with your actual API key and DOC_PATHS with your actual document file paths. You can also optionally set the dataset name and schema ID. Note that if SCHEMA_ID is set to None or left out, the AI will improvise a different schema for each document.
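For instance, to let DocuPanda improvise a schema for each document, you can simply set:
SCHEMA_ID = None  # no fixed schema; DocuPanda improvises one per document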
Step 2: Upload a Document
Each document is uploaded by encoding it in Base64 and sending a POST request; you can optionally supply a file URL instead. The function below accepts a document file path, uploads it to DocuPanda for parsing, and returns the job metadata.
def post_doc(doc_path):
    """Upload a single document to DocuPanda and return its job metadata."""
    url = f"{APP_URL}/document"
    # Read the file and encode its contents as Base64.
    with open(doc_path, "rb") as f:
        contents = base64.b64encode(f.read()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                "filename": doc_path.split("/")[-1]
            },
        },
        "dataset": DATASET_NAME
    }
    response = requests.post(url, json=payload, headers=HEADERS)
    assert response.status_code == 200
    res_json = response.json()
    return {"job_id": res_json["jobId"], "doc_id": res_json["documentId"], "filename": doc_path}
Step 3: Check Job Status
Since DocuPanda processes documents asynchronously, we need to track multiple job IDs at once and wait for all of them to finish. Note that instead of polling, you could use webhooks to react immediately as results become available. We define a function that accepts multiple job IDs and returns once every job has finished processing.
def is_batch_done(job_ids):
"""Check if all jobs in the list are completed or failed."""
url = f"{APP_URL}/job"
output = {job_id: "processing" for job_id in job_ids}
for _ in range(60): # Max 3 minutes (60 * 3 sec)
for job_id, status in output.items():
if status == "processing":
response = requests.get(f"{url}/{job_id}", headers=HEADERS)
assert response.status_code == 200
output[job_id] = response.json()["status"]
if all(status != "processing" for status in output.values()):
break # Exit early if all jobs are done
time.sleep(3) # Wait before next check
return output
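If you prefer the webhook route mentioned above, a minimal receiver could look like the sketch below. It assumes DocuPanda POSTs a JSON payload containing the job ID and status to a URL you register; the exact payload fields and the registration step are assumptions here, so check the DocuPanda webhook documentation before relying on them.
from flask import Flask, request

app = Flask(__name__)

@app.route("/docupanda-webhook", methods=["POST"])
def docupanda_webhook():
    # Assumed payload shape -- the real field names may differ.
    event = request.get_json(force=True)
    job_id = event.get("jobId")
    status = event.get("status")
    print(f"Job {job_id} finished with status: {status}")
    return "", 200

if __name__ == "__main__":
    app.run(port=8000)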
Step 4: Standardize the Batch
Once a batch of documents has been processed, we can send the entire batch to be standardized with the /batch endpoint:
def standardize_batch(doc_ids):
"""Standardize a batch of documents."""
url = f"{APP_URL}/v2/standardize/batch"
payload = {"schemaId": SCHEMA_ID, "documentIds": doc_ids}
response = requests.post(url, json=payload, headers=HEADERS)
assert response.status_code == 200
res_json = response.json()
return {"jobId": res_json["jobId"], "standardizationIds": res_json["standardizationIds"]}
Step 5: Wait for Completion
We can now poll the job endpoint to wait for the batch standardization job to complete. As noted above, you can also use webhooks for this.
def is_job_done(job_id):
"""Check if a job is completed or failed."""
url = f"{APP_URL}/job/{job_id}"
for _ in range(60):
response = requests.get(url, headers=HEADERS)
assert response.status_code == 200
res_json = response.json()
status = res_json["status"]
if status == "completed":
return True
if status == "error":
return False
time.sleep(5)
return False
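Putting Steps 4 and 5 together, a batch of already-parsed documents can be standardized and awaited like this (the document IDs are placeholders):
# Standardize two parsed documents and wait for the batch job to finish.
batch_results = standardize_batch(["documentId1", "documentId2"])
if is_job_done(batch_results["jobId"]):
    print("Standardization IDs:", batch_results["standardizationIds"])
else:
    print("Standardization job did not complete")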
Step 6: Retrieve Standardization Results
Once the batch has been standardized, we can retrieve each individual standardization result as follows:
def get_std(std_id):
"""Retrieve standardized document results from DocuPanda."""
url = f"{APP_URL}/standardization/{std_id}"
response = requests.get(url, headers=HEADERS)
if response.status_code == 200:
return response.json()
return None
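For example, once a standardization ID is known, its result can be fetched and printed (the ID is a placeholder):
# Fetch one standardization result and print its structured data.
std_data = get_std("standardizationId1")
if std_data is not None:
    print(f"Document {std_data['documentId']}:", std_data["data"])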
Step 7: Process in Batches
To handle multiple documents efficiently, we upload 4 documents at a time, wait for them to finish parsing, standardize the whole batch at once, wait for that to finish, and then fetch the results:
def process_batch(batch):
"""Upload a batch of documents, wait for processing, and retrieve results."""
uploaded_docs = [post_doc(doc_path) for doc_path in batch]
parse_job_ids = [doc["job_id"] for doc in uploaded_docs]
# Wait for all documents in the batch to complete parsing
parse_results = is_batch_done(parse_job_ids)
doc_ids = [doc["doc_id"] for doc in uploaded_docs if parse_results[doc["job_id"]] == "completed"]
# Standardize the documents and wait for the job to complete
batch_results = standardize_batch(doc_ids)
done = is_job_done(batch_results["jobId"])
if not done:
print("❌ Standardization job failed to complete")
return
# Retrieve and print results for each standardized document
for std_id in batch_results["standardizationIds"]:
std_data = get_std(std_id)
if std_data is None:
print(f"❌ Document '{std_id}' failed to standardize")
else:
print(f"✅ Document '{std_data['documentId']}' standardized successfully")
print(f"Standardized data: {std_data['data']}")
Step 8: Run the Full Process
Now, we iterate through the document list in batches of 4, ensuring each batch finishes before moving to the next.
def main():
"""Processes all documents in batches of 4."""
for i in range(0, len(DOC_PATHS), BATCH_SIZE):
batch = DOC_PATHS[i:i + BATCH_SIZE]
print(f"\n🚀 Processing batch: {batch}\n")
process_batch(batch)
print("✅ Batch completed.\n")
if __name__ == '__main__':
main()
Complete Example
Here’s the full working implementation:
"""Upload and standardize multiple documents with DocuPanda and retrieve results in batches.
"""
import time
import base64
import requests
API_KEY = "YOUR_API_KEY"
APP_URL = "https://app.docupanda.io"
DOC_PATHS = [
"/path/to/doc1.pdf", "/path/to/doc2.pdf", "/path/to/doc3.jpg", "/path/to/doc4.png",
"/path/to/doc5.html", "/path/to/doc6.jpeg", "/path/to/doc7.webp", "/path/to/doc8.pdf"
]
SCHEMA_ID = "YOUR_SCHEMA_ID"
DATASET_NAME = "YOUR_DATASET_NAME"
HEADERS = {"accept": "application/json", "content-type": "application/json", "X-API-Key": API_KEY}
BATCH_SIZE = 4 # Process 4 documents at a time
def post_doc(doc_path):
    """Upload a single document to DocuPanda and return its job metadata."""
    url = f"{APP_URL}/document"
    # Read the file and encode its contents as Base64.
    with open(doc_path, "rb") as f:
        contents = base64.b64encode(f.read()).decode()
    payload = {
        "document": {
            "file": {
                "contents": contents,
                "filename": doc_path.split("/")[-1]
            },
        },
        "dataset": DATASET_NAME
    }
    response = requests.post(url, json=payload, headers=HEADERS)
    assert response.status_code == 200
    res_json = response.json()
    return {"job_id": res_json["jobId"], "doc_id": res_json["documentId"], "filename": doc_path}
def is_batch_done(job_ids):
"""Check if all jobs in the list are completed or failed."""
url = f"{APP_URL}/job"
output = {job_id: "processing" for job_id in job_ids}
for _ in range(60): # Max 3 minutes (60 * 3 sec)
for job_id, status in output.items():
if status == "processing":
response = requests.get(f"{url}/{job_id}", headers=HEADERS)
assert response.status_code == 200
output[job_id] = response.json()["status"]
if all(status != "processing" for status in output.values()):
break # Exit early if all jobs are done
time.sleep(3) # Wait before next check
return output
def standardize_batch(doc_ids):
"""Standardize a batch of documents."""
url = f"{APP_URL}/v2/standardize/batch"
payload = {"schemaId": SCHEMA_ID, "documentIds": doc_ids}
response = requests.post(url, json=payload, headers=HEADERS)
assert response.status_code == 200
res_json = response.json()
return {"jobId": res_json["jobId"], "standardizationIds": res_json["standardizationIds"]}
def is_job_done(job_id):
"""Check if a job is completed or failed."""
url = f"{APP_URL}/job/{job_id}"
for _ in range(60):
response = requests.get(url, headers=HEADERS)
assert response.status_code == 200
res_json = response.json()
status = res_json["status"]
if status == "completed":
return True
if status == "error":
return False
time.sleep(5)
return False
def get_std(std_id):
"""Retrieve standardized document results from DocuPanda."""
url = f"{APP_URL}/standardization/{std_id}"
response = requests.get(url, headers=HEADERS)
if response.status_code == 200:
return response.json()
return None
def process_batch(batch):
"""Upload a batch of documents, wait for processing, and retrieve results."""
uploaded_docs = [post_doc(doc_path) for doc_path in batch]
parse_job_ids = [doc["job_id"] for doc in uploaded_docs]
# Wait for all documents in the batch to complete parsing
parse_results = is_batch_done(parse_job_ids)
doc_ids = [doc["doc_id"] for doc in uploaded_docs if parse_results[doc["job_id"]] == "completed"]
# Standardize the documents and wait for the job to complete
batch_results = standardize_batch(doc_ids)
done = is_job_done(batch_results["jobId"])
if not done:
print("❌ Standardization job failed to complete")
return
# Retrieve and print results for each standardized document
for std_id in batch_results["standardizationIds"]:
std_data = get_std(std_id)
if std_data is None:
print(f"❌ Document '{std_id}' failed to standardize")
else:
print(f"✅ Document '{std_data['documentId']}' standardized successfully")
print(f"Standardized data: {std_data['data']}")
def main():
"""Processes all documents in batches of 4."""
for i in range(0, len(DOC_PATHS), BATCH_SIZE):
batch = DOC_PATHS[i:i + BATCH_SIZE]
print(f"\n🚀 Processing batch: {batch}\n")
process_batch(batch)
print("✅ Batch completed.\n")
if __name__ == '__main__':
main()
Summary
This guide walks through:
- Uploading multiple documents to DocuPanda and then standardizing them
- Processing them in batches of 4 at a time
- Standardizing the entire batch at once
- Waiting for all jobs to complete before moving on
- Retrieving and displaying results only for successfully standardized documents
- Handling failures gracefully
This approach keeps your API usage efficient, reliable, and scalable. 🚀