Large file uploads with kluster.ai API¶
When working with large datasets for AI model training or batch inference, you may need to upload files several gigabytes in size. For these scenarios, kluster.ai provides a multipart upload API that allows you to split your large files into smaller chunks and upload them efficiently.
This tutorial demonstrates how to:
- Split a large file into multiple parts
- Upload each part using the kluster.ai uploads API
- Complete the upload process to create a usable File object
- Use the uploaded file for a batch inference job
The example uses a large JSONL file for batch inference with a language model, but this approach can be adapted for any large file upload scenario.
Prerequisites¶
Before getting started, ensure you have the following:
- A kluster.ai account - sign up on the kluster.ai platform if you don't have one
- A kluster.ai API key - after signing in, go to the API Keys section and create a new key
Setup¶
Let's start by installing the necessary libraries and setting up our environment. We'll use the OpenAI Python library to interact with the kluster.ai API, since the kluster.ai API is OpenAI-compatible.
%pip install -q openai tqdm
Note: you may need to restart the kernel to use updated packages.
In this notebook, we'll use Python's getpass module to enter the API key securely. When prompted, paste your kluster.ai API key (with no leading or trailing spaces).
from getpass import getpass
api_key = getpass("Enter your kluster.ai API key: ")
Now, let's import all the necessary libraries that we'll use throughout this tutorial:
from openai import OpenAI
import os
import json
import time
import math
import pandas as pd
import hashlib
from tqdm import tqdm
from IPython.display import clear_output, display
Initialize the OpenAI client by pointing it to the kluster.ai endpoint and passing your API key:
# Set up the client
client = OpenAI(
base_url="https://api.kluster.ai/v1",
api_key=api_key,
)
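As an optional sanity check, you can list the models exposed by the endpoint to confirm the key and base URL are configured correctly. This is a small sketch that assumes the OpenAI-compatible /v1/models route is available on kluster.ai:
# Optional sanity check: list available models to confirm the client is configured
# correctly (assumes the endpoint exposes the OpenAI-compatible /v1/models route).
models = client.models.list()
print(f"Connected - {len(models.data)} models available, e.g. {models.data[0].id}")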
Sample data creation (optional)¶
If you don't already have a large JSONL file for testing, you can create one using the code below. This will generate a synthetic dataset of text prompts that we'll use for batch inference later.
def create_sample_jsonl(filename, num_samples=1000):
"""Create a sample JSONL file with text prompts for batch inference."""
os.makedirs(os.path.dirname(filename), exist_ok=True)
topics = [
"climate change", "renewable energy", "space exploration", "quantum computing",
"artificial intelligence", "biodiversity", "ocean conservation", "sustainable agriculture",
"future of transportation", "advanced materials"
]
with open(filename, 'w') as f:
for i in range(num_samples):
topic = topics[i % len(topics)]
request = {
"custom_id": f"sample-{i}",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": "klusterai/Meta-Llama-3.1-8B-Instruct-Turbo",
"temperature": 0.7,
"messages": [
{"role": "system", "content": "You are a helpful assistant that provides concise information."},
{"role": "user", "content": f"Explain {topic} in one short paragraph"}
],
}
}
f.write(json.dumps(request) + '\n')
file_size = os.path.getsize(filename)
print(f"Created sample file '{filename}' with {num_samples} prompts ({file_size/1024/1024:.2f} MB)")
return filename, file_size
# Create a sample JSONL file with 1000 prompts
sample_file, file_size = create_sample_jsonl('data/sample_large_file.jsonl', 1000)
Created sample file 'data/sample_large_file.jsonl' with 1000 prompts (0.34 MB)
Multipart upload process¶
Now, let's implement the multipart upload process. We'll break this down into several steps:
- Create the Upload object
- Split the file into chunks and upload each chunk as a part
- Complete the upload process
Create the upload object¶
First, we need to create an Upload object that will serve as a container for all the parts we're about to upload.
def create_upload(client, filename, file_size, purpose="batch", mime_type="application/jsonl"):
"""Create an Upload object to which we can add parts."""
upload = client.uploads.create(
purpose=purpose,
filename=os.path.basename(filename),
bytes=file_size,
mime_type=mime_type
)
print(f"Created upload with ID: {upload.id}")
print(f"Upload will expire at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(upload.expires_at))}")
return upload
# Get the file you want to upload
file_to_upload = sample_file
file_size = os.path.getsize(file_to_upload)
# Create the upload object
upload = create_upload(client, file_to_upload, file_size)
Created upload with ID: 6806aa95647c3680875b6339
Upload will expire at: 2025-04-21 16:29:09
Split the file and upload parts¶
Now, we'll split the file into chunks and upload each chunk as a part. According to the documentation, each part can be at most 64 MB, so we'll make sure our chunks don't exceed that limit.
def split_and_upload_parts(client, upload_id, file_path, num_parts=2):
"""Split a file into a specific number of chunks and upload each chunk as a part."""
file_size = os.path.getsize(file_path)
part_size = math.ceil(file_size / num_parts)
print(f"Uploading file in {num_parts} parts (part size: {part_size/1024/1024:.2f} MB)")
parts = []
with open(file_path, 'rb') as f:
for i in tqdm(range(num_parts), desc="Uploading parts"):
# Read a chunk of the file
chunk = f.read(part_size)
# Create a temporary file for the chunk
temp_filename = f"part_{i}.tmp"
with open(temp_filename, 'wb') as temp_f:
temp_f.write(chunk)
# Upload the part
with open(temp_filename, 'rb') as temp_f:
part = client.uploads.parts.create(
upload_id=upload_id,
data=temp_f
)
parts.append(part)
# Log the part ID
print(f"Part {i+1}/{num_parts} uploaded with ID: {part.id}")
# Clean up the temporary file
os.remove(temp_filename)
return parts
# For demonstration purposes, we'll split the small sample file into just two parts
parts = split_and_upload_parts(client, upload.id, file_to_upload, num_parts=2)
Uploading file in 2 parts (part size: 0.17 MB)
Uploading parts: 50%|██████████████ | 1/2 [00:00<00:00, 2.00it/s]
Part 1/2 uploaded with ID: 6806aa968967dabeabba9ebd
Uploading parts: 100%|████████████████████████████| 2/2 [00:00<00:00, 2.20it/s]
Part 2/2 uploaded with ID: 6806aa966f520bb3f2023acc
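If you'd rather derive the number of parts from the documented 64 MB per-part limit instead of hard-coding it, a minimal helper (parts_needed is an illustrative name, not part of the API) could look like this:
# Derive the number of parts from a maximum part size instead of hard-coding it.
# MAX_PART_SIZE reflects the 64 MB per-part limit mentioned above.
MAX_PART_SIZE = 64 * 1024 * 1024  # 64 MB in bytes

def parts_needed(file_path, max_part_size=MAX_PART_SIZE):
    """Return the minimum number of parts that keeps each part within the limit."""
    return max(1, math.ceil(os.path.getsize(file_path) / max_part_size))

# Example usage:
# parts = split_and_upload_parts(client, upload.id, file_to_upload, num_parts=parts_needed(file_to_upload))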
Before completing the upload, let's calculate the file's MD5 checksum, which can be used to verify the integrity of the assembled file:
def calculate_md5(file_path):
"""Calculate the MD5 checksum of a file."""
md5_hash = hashlib.md5()
with open(file_path, "rb") as f:
# Read the file in chunks to avoid loading large files into memory
for chunk in iter(lambda: f.read(4096), b""):
md5_hash.update(chunk)
return md5_hash.hexdigest()
file_md5 = calculate_md5(file_to_upload)
print(f"File MD5 checksum: {file_md5}")
File MD5 checksum: 69888e6132fa493024089b877d812c89
Complete the upload¶
Finally, we'll complete the upload process by providing the ordered list of part IDs.
def complete_upload(client, upload_id, parts):
"""Complete the upload process with the ordered list of part IDs."""
part_ids = [part.id for part in parts]
params = {
"upload_id": upload_id,
"part_ids": part_ids
}
try:
completed_upload = client.uploads.complete(**params)
print(f"Upload completed successfully!")
print(f"Status: {completed_upload.status}")
print(f"File ID: {completed_upload.file.id}")
return completed_upload
except Exception as e:
print(f"Error completing upload: {e}")
raise
# Complete the upload
completed_upload = complete_upload(client, upload.id, parts)
Upload completed successfully!
Status: completed
File ID: 6806aa97b2cfbed2561aecf0
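The checksum we computed earlier wasn't passed in, but the OpenAI Python SDK's uploads.complete method also accepts an optional md5 argument for server-side integrity verification; whether kluster.ai validates it is an assumption to confirm against the docs. A hedged sketch:
# Optional: pass the checksum so the server can verify the assembled file.
# The md5 parameter exists in the OpenAI Python SDK; support on the kluster.ai
# side is an assumption - check the API docs before relying on it.
# completed_upload = client.uploads.complete(
#     upload_id=upload.id,
#     part_ids=[part.id for part in parts],
#     md5=file_md5,
# )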
Use the uploaded file for batch inference¶
Now that we've successfully uploaded our large file, let's use it to create a batch inference job.
def create_batch_job(client, file_id):
"""Create a batch job using the uploaded file."""
batch_job = client.batches.create(
input_file_id=file_id,
endpoint="/v1/chat/completions",
completion_window="24h"
)
print(f"Batch job created with ID: {batch_job.id}")
print(f"Status: {batch_job.status}")
return batch_job
# Create a batch job with the file we just uploaded
batch_job = create_batch_job(client, completed_upload.file.id)
Batch job created with ID: 6806aa976cdee70bad145125
Status: pre_schedule
Monitor batch job progress¶
Let's monitor the progress of our batch job. We'll check the status every 10 seconds until the job is completed.
def monitor_batch_job(client, batch_job_id, check_interval=10):
"""Monitor the progress of a batch job until it's completed."""
all_completed = False
while not all_completed:
all_completed = True
output_lines = []
updated_job = client.batches.retrieve(batch_job_id)
if updated_job.status != "completed":
all_completed = False
completed = updated_job.request_counts.completed
total = updated_job.request_counts.total
output_lines.append(f"Job status: {updated_job.status} - Progress: {completed}/{total}")
else:
output_lines.append(f"Job completed!")
output_lines.append(f"Output file ID: {updated_job.output_file_id}")
# Clear the output and display updated status
clear_output(wait=True)
for line in output_lines:
display(line)
if not all_completed:
time.sleep(check_interval)
return updated_job
# Monitor the batch job progress
completed_job = monitor_batch_job(client, batch_job.id)
'Job completed!'
'Output file ID: 6806aadc694596f597839fe1'
Retrieve and process results¶
Once the batch job is completed, we can retrieve and process the results. Specifically, we’ll download the output file returned by the API and parse each line‑delimited JSON record into Python objects, making it easy to inspect, validate, and visualize every response produced by the batch run.
def retrieve_batch_results(client, job):
"""Retrieve and parse the results of a completed batch job."""
if job.status != "completed":
print(f"Job is not completed yet. Current status: {job.status}")
return None
result_file_id = job.output_file_id
result = client.files.content(result_file_id).content
# Parse JSON results
if isinstance(result, bytes):
result = result.decode('utf-8')
json_strings = result.strip().split('\n')
json_objects = []
for json_str in json_strings:
try:
json_obj = json.loads(json_str)
json_objects.append(json_obj)
except json.JSONDecodeError as e:
print(f"Error parsing JSON: {e}")
return json_objects
# Retrieve and parse the batch job results
results = retrieve_batch_results(client, completed_job)
# Display a sample of the results
if results:
print(f"Retrieved {len(results)} results")
# Display the first few results
print("\nSample of results:")
for i, result in enumerate(results[:5]):
try:
# Extract the response content and custom ID from each record
content = result["response"]["body"]["choices"][0]["message"]["content"]
custom_id = result.get("custom_id", f"unknown-{i}")
print(f"\nResult {i+1} (Custom ID: {custom_id}):\n{content}")
except (KeyError, IndexError, TypeError) as e:
print(f"Error processing result {i+1}: {e}")
# Print only the keys to understand structure, not all content
print(f"Result keys: {list(result.keys())}")
if "response" in result:
print(f"Response keys: {list(result['response'].keys())}")
Retrieved 1000 results

Sample of results:

Result 1 (Custom ID: sample-0):
Climate change is a global phenomenon caused by the increasing levels of greenhouse gases, such as carbon dioxide and methane, in the Earth's atmosphere. These gases trap heat from the sun, leading to a rise in global temperatures, changes in weather patterns, and more frequent extreme events like heatwaves, droughts, and storms. This is primarily driven by human activities like burning fossil fuels, deforestation, and industrial processes, which release large amounts of greenhouse gases into the atmosphere.

Result 2 (Custom ID: sample-1):
Renewable energy is derived from natural resources that can be replenished over time, such as sunlight, wind, water, and geothermal heat. Unlike fossil fuels, renewable energy sources are sustainable and non-polluting, producing electricity or heat with minimal environmental impact. Examples of renewable energy include solar power, wind power, hydroelectric power, and biomass energy, which can be harnessed to power homes, industries, and transportation systems.

Result 3 (Custom ID: sample-2):
Space exploration is the ongoing effort to explore the universe beyond Earth's atmosphere, seeking to understand the mysteries of space and the planets within it. Through various missions, space agencies and private companies have successfully landed rovers on Mars, sent probes to distant planets and asteroids, and even established temporary human settlements in space, such as the International Space Station. The goal of space exploration is to expand our knowledge of the cosmos, unlock new technologies, and potentially one day establish a human presence beyond Earth.

Result 4 (Custom ID: sample-3):
Quantum computing is a revolutionary technology that uses the principles of quantum mechanics to perform calculations and operations on data. Unlike traditional computers, which use bits to represent 0s and 1s, quantum computers use quantum bits or qubits, which can exist in multiple states simultaneously, allowing them to process vast amounts of information exponentially faster and more efficiently.

Result 5 (Custom ID: sample-4):
Artificial intelligence (AI) refers to the development of computer systems that can perform tasks typically requiring human intelligence, such as learning, problem-solving, decision-making, and perception. AI systems use algorithms and data to analyze and understand complex information, enabling them to make predictions, classify objects, and automate tasks, ultimately simulating human-like behavior and intelligence.
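Because pandas was imported at the start of the tutorial, you can also flatten the parsed records into a DataFrame for easier filtering and inspection. A minimal sketch, assuming every record follows the response structure shown above:
# Flatten the parsed results into a DataFrame (assumes every record has the
# response structure shown above; add guards if some requests may have failed).
rows = [
    {
        "custom_id": r.get("custom_id"),
        "content": r["response"]["body"]["choices"][0]["message"]["content"],
    }
    for r in results
]
df = pd.DataFrame(rows)
print(df.head())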
Handle failed uploads¶
Sometimes, uploads may fail for various reasons. In such cases, you may want to cancel the upload and start over. Here's how to cancel an upload:
def cancel_upload(client, upload_id):
"""Cancel an upload that's in progress or has failed."""
cancelled_upload = client.uploads.cancel(upload_id=upload_id)
print(f"Upload cancelled successfully!")
print(f"Status: {cancelled_upload.status}")
return cancelled_upload
# This is just for demonstration - we won't actually cancel our upload
# cancel_upload(client, upload.id)
Best practices and tips¶
Here are some best practices and tips for using the kluster.ai Uploads API effectively:
- Optimal part size - choose an appropriate part size based on your network conditions. While the API allows parts up to 64 MB, you might want to use smaller parts if your network is unstable
- Parallel uploads - for very large files, consider uploading parts in parallel to speed up the process
- Error handling - implement proper error handling and retries for failed part uploads (a minimal retry sketch follows this list)
- Upload expiration - remember that uploads expire after an hour, so make sure to complete the upload within that time frame
- Cleanup - always clean up temporary files created during the upload process
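As a starting point for the error-handling recommendation above, here's a minimal retry sketch, assuming transient network failures are worth retrying with exponential backoff (upload_part_with_retry and its parameters are illustrative, not part of the kluster.ai API):
# A minimal retry wrapper for a single part upload. Retries with exponential
# backoff on any exception; tune max_retries and base_delay for your network.
def upload_part_with_retry(client, upload_id, chunk_path, max_retries=3, base_delay=2):
    """Upload one part, retrying with exponential backoff on failure."""
    for attempt in range(1, max_retries + 1):
        try:
            with open(chunk_path, 'rb') as f:
                return client.uploads.parts.create(upload_id=upload_id, data=f)
        except Exception as e:
            if attempt == max_retries:
                raise
            delay = base_delay * (2 ** (attempt - 1))
            print(f"Part upload failed (attempt {attempt}/{max_retries}): {e}. Retrying in {delay}s...")
            time.sleep(delay)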
Advanced: implement parallel uploads¶
For very large files, uploading parts sequentially might take a long time. You can implement a version that uploads parts in parallel using Python's concurrent.futures module. This approach creates a thread pool that allows multiple parts to be uploaded simultaneously, significantly reducing the total upload time for large files over high-bandwidth connections.
The implementation uses the ThreadPoolExecutor to manage a pool of worker threads that process uploads concurrently. Each chunk is written to a temporary file, uploaded to the server, and then the temporary file is cleaned up. The code carefully tracks each part's original position to ensure they're properly ordered when completing the upload, regardless of which part finishes uploading first.
import concurrent.futures
import tempfile
def upload_part(client, upload_id, part_data, part_index):
"""Upload a single part and return the part object."""
# Create a temporary file for the chunk
with tempfile.NamedTemporaryFile(delete=False) as temp_f:
temp_filename = temp_f.name
temp_f.write(part_data)
try:
# Upload the part
with open(temp_filename, 'rb') as temp_f:
part = client.uploads.parts.create(
upload_id=upload_id,
data=temp_f
)
return part, part_index
finally:
# Clean up the temporary file
os.remove(temp_filename)
def parallel_upload_parts(client, upload_id, file_path, num_parts=2, max_workers=2):
"""Split a file into a specific number of chunks and upload parts in parallel."""
file_size = os.path.getsize(file_path)
part_size = math.ceil(file_size / num_parts)
print(f"Uploading file in {num_parts} parts using up to {max_workers} parallel workers")
# Read all chunks from the file
chunks = []
with open(file_path, 'rb') as f:
for i in range(num_parts):
chunk = f.read(part_size)
if chunk: # Ensure we don't add empty chunks
chunks.append(chunk)
# Upload parts in parallel
parts = [None] * len(chunks) # Pre-allocate list to maintain order
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all upload tasks
future_to_index = {}
for i, chunk in enumerate(chunks):
future = executor.submit(upload_part, client, upload_id, chunk, i)
future_to_index[future] = i
# Process results as they complete
for i, future in enumerate(concurrent.futures.as_completed(future_to_index.keys())):
try:
part, index = future.result()
parts[index] = part
print(f"Part {index+1}/{len(chunks)} uploaded with ID: {part.id}")
except Exception as e:
print(f"Error uploading part: {e}")
# Ensure all parts were uploaded successfully
if None in parts:
raise Exception("Some parts failed to upload")
return parts
# To use parallel uploads, replace the call to split_and_upload_parts with:
# parts = parallel_upload_parts(client, upload.id, file_to_upload, num_parts=2, max_workers=2)
# Advanced: Demonstrating Parallel Uploads
print("\n\n## PARALLEL UPLOAD DEMONSTRATION ##\n")
# First, create a new upload object for our parallel demo
print("Creating a new upload for parallel demonstration...")
parallel_upload = create_upload(client, file_to_upload, file_size)
# Now use the parallel upload function instead of the sequential one
print("\nUploading parts in parallel...")
parallel_parts = parallel_upload_parts(client, parallel_upload.id, file_to_upload, num_parts=2, max_workers=2)
# Complete the parallel upload
print("\nCompleting the parallel upload...")
parallel_completed_upload = complete_upload(client, parallel_upload.id, parallel_parts)
print("\nParallel upload demonstration completed!")
print(f"Parallel uploaded file ID: {parallel_completed_upload.file.id}")
# We won't use this file for further processing, it was just for demonstration
print("Note: This file was uploaded just for demonstration purposes.")
## PARALLEL UPLOAD DEMONSTRATION ##

Creating a new upload for parallel demonstration...
Created upload with ID: 6806aadfb2cfbed2561aeea9
Upload will expire at: 2025-04-21 16:30:23

Uploading parts in parallel...
Uploading file in 2 parts using up to 2 parallel workers
Part 1/2 uploaded with ID: 6806aae0fd88d31a6f21d200
Part 2/2 uploaded with ID: 6806aae06cdee70bad1452ac

Completing the parallel upload...
Upload completed successfully!
Status: completed
File ID: 6806aae0d6646988da98c092

Parallel upload demonstration completed!
Parallel uploaded file ID: 6806aae0d6646988da98c092
Note: This file was uploaded just for demonstration purposes.
Conclusion¶
In this tutorial, we've learned how to:
- Create an Upload object using the kluster.ai API
- Split a large file into smaller parts and upload each part
- Complete the upload process to create a usable File object
- Use the uploaded file for a batch inference job
- Monitor the batch job progress and retrieve results
This approach allows you to efficiently upload and process large datasets with kluster.ai, making it an invaluable tool for your AI workflows.