Overview
Bifrost supports AWS Bedrock’s Files and Batch APIs with cross-provider routing: you can use boto3 to manage files and batch jobs across multiple providers, including Bedrock, OpenAI, and Gemini. With the Bedrock SDK:
- Files are managed through an S3-compatible API
- Batches are managed through the Bedrock service API
- Provider routing is done via the x-model-provider header
Anthropic Limitation: Anthropic does not support file upload via the S3-compatible API. For Anthropic batch operations, use the Anthropic SDK with inline requests instead.
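For reference, a minimal sketch of an inline Anthropic batch using the Anthropic Python SDK. The base_url pointing at a Bifrost Anthropic-compatible endpoint is an assumption; adjust it to your deployment, or omit it to call the Anthropic API directly.
import anthropic

# Assumption: Bifrost exposes an Anthropic-compatible endpoint at /anthropic;
# omit base_url to call the Anthropic API directly.
# The API key is read from the ANTHROPIC_API_KEY environment variable.
client = anthropic.Anthropic(base_url="http://localhost:8080/anthropic")

# Inline batch: requests are embedded in the call, so no file upload is needed.
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": "request-1",
            "params": {
                "model": "claude-3-5-sonnet-20241022",
                "max_tokens": 100,
                "messages": [{"role": "user", "content": "Hello! Say hi back briefly."}],
            },
        }
    ]
)
print(f"Created Anthropic batch: {batch.id}")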
Client Setup
Default Bedrock Client
import boto3
# S3 client for file operations
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
# Bedrock client for batch operations
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
Cross-Provider Client Setup
To route requests to different providers, add the x-model-provider header using boto3 events:
Bedrock Provider (Default)
import boto3
def add_bedrock_header(request, **kwargs):
request.headers["x-model-provider"] = "bedrock"
# S3 client for Bedrock files
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_bedrock_header)
# Bedrock client for batches
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_bedrock_header)
OpenAI Provider
import boto3
def add_openai_header(request, **kwargs):
request.headers["x-model-provider"] = "openai"
# S3 client for OpenAI files
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_openai_header)
# Bedrock client for OpenAI batches
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_openai_header)
Anthropic Provider
Anthropic does not support S3-based file uploads. For Anthropic operations, use the Anthropic SDK with inline batch requests instead.
import boto3
def add_anthropic_header(request, **kwargs):
request.headers["x-model-provider"] = "anthropic"
# Note: File operations are NOT supported for Anthropic
# Use Anthropic SDK with inline requests instead
# Bedrock client for Anthropic (limited to non-batch operations)
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_anthropic_header)
Gemini Provider
import boto3
def add_gemini_header(request, **kwargs):
request.headers["x-model-provider"] = "gemini"
# S3 client for Gemini files
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_gemini_header)
# Bedrock client for Gemini batches
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_gemini_header)
Helper Function for Provider-Specific Clients
import boto3
def create_provider_header_handler(provider: str):
"""Create a header handler function for a specific provider"""
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = provider
return add_provider_header
def get_provider_s3_client(provider: str):
"""Create S3 client with x-model-provider header"""
client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
client.meta.events.register("before-send", create_provider_header_handler(provider))
return client
def get_provider_bedrock_client(provider: str):
"""Create Bedrock batch client with x-model-provider header"""
client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
client.meta.events.register("before-send", create_provider_header_handler(provider))
return client
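Usage is then a single call per provider:
# Example: provider-scoped clients built with the helpers above
openai_s3 = get_provider_s3_client("openai")
openai_batches = get_provider_bedrock_client("openai")
gemini_s3 = get_provider_s3_client("gemini")
gemini_batches = get_provider_bedrock_client("gemini")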
Files API (S3-Compatible)
Files are managed through Bifrost’s S3-compatible endpoint.
Upload a File
Bedrock Provider
import boto3
import json
import time
def add_bedrock_header(request, **kwargs):
request.headers["x-model-provider"] = "bedrock"
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_bedrock_header)
# Create JSONL content for Bedrock batch format
# Note: model_id is unused here; for Bedrock, the model is set on the job, not per record
def create_bedrock_batch_jsonl(model_id: str, num_requests: int = 2) -> str:
lines = []
for i in range(num_requests):
record = {
"recordId": f"request-{i+1}",
"modelInput": {
"messages": [
{
"role": "user",
"content": [
{"text": f"Hello, this is test message {i+1}. Say hi back briefly."}
],
}
],
"inferenceConfig": {"maxTokens": 100},
},
}
lines.append(json.dumps(record))
return "\n".join(lines)
# Create content
jsonl_content = create_bedrock_batch_jsonl("anthropic.claude-3-sonnet-20240229-v1:0")
# Upload to S3
s3_bucket = "your-s3-bucket"
s3_key = f"bifrost-batch-input/batch_input_{int(time.time())}.jsonl"
response = s3_client.put_object(
Bucket=s3_bucket,
Key=s3_key,
Body=jsonl_content.encode(),
ContentType="application/jsonl",
)
# Extract file ID from ETag header
file_id = response.get("ETag", "").strip('"')
print(f"Uploaded file ID: {file_id}")
print(f"S3 URI: s3://{s3_bucket}/{s3_key}")
OpenAI Provider
No S3 configuration required. Files are stored in OpenAI’s native storage. The bucket/key values are identifiers used by Bifrost for routing.
import boto3
import json
import time
def add_openai_header(request, **kwargs):
request.headers["x-model-provider"] = "openai"
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_openai_header)
# Create JSONL content for OpenAI batch format
def create_openai_batch_jsonl(model_id: str, num_requests: int = 2) -> str:
lines = []
for i in range(num_requests):
record = {
"custom_id": f"request-{i+1}",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": model_id,
"messages": [
{"role": "user", "content": f"Hello, this is test message {i+1}. Say hi back briefly."}
],
"max_tokens": 100,
},
}
lines.append(json.dumps(record))
return "\n".join(lines)
# Create content
jsonl_content = create_openai_batch_jsonl("gpt-4o-mini")
# Upload file (bucket/key are routing identifiers, not actual S3 paths)
response = s3_client.put_object(
Bucket="openai-files",
Key=f"batch_input_{int(time.time())}.jsonl",
Body=jsonl_content.encode(),
ContentType="application/jsonl",
)
# Extract file ID from ETag header
file_id = response.get("ETag", "").strip('"')
print(f"Uploaded file ID: {file_id}")
Gemini Provider
No S3 configuration required. Files are stored in Google Cloud Storage. The bucket/key values are identifiers used by Bifrost for routing.
import boto3
import json
import time
def add_gemini_header(request, **kwargs):
request.headers["x-model-provider"] = "gemini"
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_gemini_header)
# Create JSONL content for Gemini batch format
# Note: model_id is unused here; for Gemini, the model is set when the batch job is created
def create_gemini_batch_jsonl(model_id: str, num_requests: int = 2) -> str:
lines = []
for i in range(num_requests):
record = {
"request": {
"contents": [
{
"role": "user",
"parts": [
{"text": f"Hello, this is test message {i+1}. Say hi back briefly."}
],
}
],
"generationConfig": {"maxOutputTokens": 100},
},
"metadata": {"key": f"request-{i+1}"},
}
lines.append(json.dumps(record))
return "\n".join(lines)
# Create content
jsonl_content = create_gemini_batch_jsonl("gemini-1.5-flash")
# Upload file (bucket/key are routing identifiers, not actual S3 paths)
response = s3_client.put_object(
Bucket="gemini-files",
Key=f"batch_input_{int(time.time())}.jsonl",
Body=jsonl_content.encode(),
ContentType="application/jsonl",
)
file_id = response.get("ETag", "").strip('"')
print(f"Uploaded file ID: {file_id}")
List Files
For OpenAI and Gemini, use any bucket name as an identifier—files are stored in the provider’s native storage and listed by file ID.
import boto3
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = "bedrock"
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)
# List files (S3 bucket required for Bedrock only)
s3_bucket = "your-s3-bucket"
response = s3_client.list_objects_v2(
Bucket=s3_bucket,
Prefix="bifrost-batch-input/"
)
if "Contents" in response:
for obj in response["Contents"]:
print(f"Key: {obj['Key']}")
print(f"Size: {obj['Size']} bytes")
print(f"Last Modified: {obj['LastModified']}")
print("---")
Retrieve File Metadata
import boto3
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = "bedrock"
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)
# Retrieve file metadata (HEAD request)
# For OpenAI/Gemini: use any bucket name, file_id from upload
s3_bucket = "your-s3-bucket"
s3_key = "bifrost-batch-input/batch_input.jsonl"
file_id = "abc123" # ETag from upload
response = s3_client.head_object(
Bucket=s3_bucket,
Key=s3_key,
IfMatch=file_id
)
print(f"Content Length: {response['ContentLength']} bytes")
print(f"Content Type: {response['ContentType']}")
print(f"ETag (File ID): {response['ETag']}")
Delete a File
import boto3
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = "bedrock"
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)
# Delete file
# For OpenAI/Gemini: use any bucket name, file_id from upload
s3_bucket = "your-s3-bucket"
s3_key = "bifrost-batch-input/batch_input.jsonl"
file_id = "abc123"
s3_client.delete_object(
Bucket=s3_bucket,
Key=s3_key,
IfMatch=file_id
)
print(f"Deleted file: {s3_key}")
Download File Content
File content download is only supported for Bedrock. For OpenAI and Gemini, use their native SDKs to download file content.
import boto3
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = "bedrock"
s3_client = boto3.client(
"s3",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)
# Download file content (Bedrock only)
s3_bucket = "your-s3-bucket"
s3_key = "bifrost-batch-input/batch_input.jsonl"
file_id = "abc123"
response = s3_client.get_object(
Bucket=s3_bucket,
Key=s3_key,
IfMatch=file_id
)
content = response["Body"].read().decode("utf-8")
print(f"File content:\n{content}")
Batch API
The Bedrock Batch API uses create_model_invocation_job and related methods.
Create a Batch Job
Bedrock Provider
import boto3
import time
def add_bedrock_header(request, **kwargs):
request.headers["x-model-provider"] = "bedrock"
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_bedrock_header)
# Configuration
s3_bucket = "your-s3-bucket"
role_arn = "arn:aws:iam::123456789:role/BedrockBatchRole"
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
# Input/output URIs (file should already be uploaded)
input_uri = f"s3://{s3_bucket}/bifrost-batch-input/batch_input.jsonl"
output_uri = f"s3://{s3_bucket}/bifrost-batch-output/"
# Create batch job
response = bedrock_client.create_model_invocation_job(
jobName=f"bifrost-batch-{int(time.time())}",
modelId=model_id,
roleArn=role_arn,
inputDataConfig={
"s3InputDataConfig": {
"s3Uri": input_uri,
"s3InputFormat": "JSONL"
}
},
outputDataConfig={
"s3OutputDataConfig": {
"s3Uri": output_uri
}
},
tags=[
{"key": "endpoint", "value": "/v1/chat/completions"},
{"key": "source", "value": "bifrost-docs"},
],
)
job_arn = response["jobArn"]
print(f"Created batch job: {job_arn}")
OpenAI Provider
No S3 or IAM configuration required. Files are stored in OpenAI’s native storage. The S3 URIs are routing identifiers used by Bifrost.
import boto3
import time
def add_openai_header(request, **kwargs):
request.headers["x-model-provider"] = "openai"
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_openai_header)
# Configuration (no S3 bucket or IAM role needed)
model_id = "gpt-4o-mini"
file_id = "file-abc123" # File ID from upload step
# Create batch job
response = bedrock_client.create_model_invocation_job(
jobName=f"openai-batch-{int(time.time())}",
modelId=model_id,
roleArn="not-required-for-openai",
inputDataConfig={
"s3InputDataConfig": {
"s3Uri": f"s3://openai-files/{file_id}", # Routing identifier
"s3InputFormat": "JSONL"
}
},
outputDataConfig={
"s3OutputDataConfig": {
"s3Uri": "s3://openai-output/"
}
},
tags=[
{"key": "endpoint", "value": "/v1/chat/completions"},
{"key": "file_id", "value": file_id},
],
)
job_arn = response["jobArn"]
print(f"Created OpenAI batch job: {job_arn}")
Gemini Provider
No S3 or IAM configuration required. Files are stored in Google Cloud Storage. The S3 URIs are routing identifiers used by Bifrost.
import boto3
import time
def add_gemini_header(request, **kwargs):
request.headers["x-model-provider"] = "gemini"
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_gemini_header)
# Configuration (no S3 bucket or IAM role needed)
model_id = "gemini-1.5-flash"
file_id = "file-xyz789" # File ID from upload step
# Create batch job
response = bedrock_client.create_model_invocation_job(
jobName=f"gemini-batch-{int(time.time())}",
modelId=model_id,
roleArn="not-required-for-gemini",
inputDataConfig={
"s3InputDataConfig": {
"s3Uri": f"s3://gemini-files/{file_id}", # Routing identifier
"s3InputFormat": "JSONL"
}
},
outputDataConfig={
"s3OutputDataConfig": {
"s3Uri": "s3://gemini-output/"
}
},
)
job_arn = response["jobArn"]
print(f"Created Gemini batch job: {job_arn}")
Anthropic Note: Anthropic prefers inline batch requests rather than file-based batching. When targeting Anthropic from the Bedrock SDK, consider using the Anthropic SDK directly for better compatibility.
List Batch Jobs
import boto3
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = "bedrock" # or "gemini"
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)
# List batch jobs
response = bedrock_client.list_model_invocation_jobs(maxResults=10)
if "invocationJobSummaries" in response:
for job in response["invocationJobSummaries"]:
print(f"Job ARN: {job['jobArn']}")
print(f"Job Name: {job['jobName']}")
print(f"Status: {job['status']}")
print(f"Model ID: {job.get('modelId', 'N/A')}")
print("---")
Retrieve Batch Job Status
import boto3
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = "bedrock"
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)
# Get batch job status
job_arn = "arn:aws:bedrock:us-west-2:123456789:model-invocation-job/abc123"
response = bedrock_client.get_model_invocation_job(jobIdentifier=job_arn)
print(f"Job ARN: {response['jobArn']}")
print(f"Job Name: {response['jobName']}")
print(f"Status: {response['status']}")
print(f"Model ID: {response['modelId']}")
if response["status"] == "Completed" and "statistics" in response:
stats = response["statistics"]
print(f"Total Records: {stats.get('totalRecordCount', 'N/A')}")
print(f"Successful: {stats.get('successfulRecordCount', 'N/A')}")
print(f"Failed: {stats.get('failedRecordCount', 'N/A')}")
Stop a Batch Job
import boto3
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = "bedrock"
bedrock_client = boto3.client(
"bedrock",
region_name="us-west-2",
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)
# Stop batch job
job_arn = "arn:aws:bedrock:us-west-2:123456789:model-invocation-job/abc123"
bedrock_client.stop_model_invocation_job(jobIdentifier=job_arn)
print(f"Stopped job: {job_arn}")
End-to-End Batch Workflow
Bedrock Provider
import boto3
import json
import time
# Configuration
region = "us-west-2"
s3_bucket = "your-s3-bucket"
role_arn = "arn:aws:iam::123456789:role/BedrockBatchRole"
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
provider = "bedrock"
# Provider header handler
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = provider
# Setup clients
s3_client = boto3.client(
"s3",
region_name=region,
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)
bedrock_client = boto3.client(
"bedrock",
region_name=region,
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)
# Step 1: Create JSONL content
print("Step 1: Creating batch input file...")
def create_batch_jsonl(num_requests: int) -> str:
lines = []
for i in range(num_requests):
record = {
"recordId": f"request-{i+1}",
"modelInput": {
"messages": [
{
"role": "user",
"content": [{"text": f"What is {i+1} + {i+1}? Answer briefly."}],
}
],
"inferenceConfig": {"maxTokens": 100},
},
}
lines.append(json.dumps(record))
return "\n".join(lines)
jsonl_content = create_batch_jsonl(num_requests=3)
# Step 2: Upload input file to S3
print("Step 2: Uploading input file to S3...")
timestamp = int(time.time())
s3_key = f"bifrost-batch-input/batch_{timestamp}.jsonl"
upload_response = s3_client.put_object(
Bucket=s3_bucket,
Key=s3_key,
Body=jsonl_content.encode(),
ContentType="application/jsonl",
)
file_id = upload_response.get("ETag", "").strip('"')
input_uri = f"s3://{s3_bucket}/{s3_key}"
print(f" Uploaded: {input_uri}")
# Step 3: Create batch job
print("Step 3: Creating batch job...")
output_uri = f"s3://{s3_bucket}/bifrost-batch-output/"
job_response = bedrock_client.create_model_invocation_job(
jobName=f"bifrost-e2e-{timestamp}",
modelId=model_id,
roleArn=role_arn,
inputDataConfig={
"s3InputDataConfig": {"s3Uri": input_uri, "s3InputFormat": "JSONL"}
},
outputDataConfig={
"s3OutputDataConfig": {"s3Uri": output_uri}
},
tags=[
{"key": "endpoint", "value": "/v1/chat/completions"},
{"key": "file_id", "value": file_id},
],
)
job_arn = job_response["jobArn"]
print(f" Created job: {job_arn}")
# Step 4: Poll for completion
print("Step 4: Polling job status...")
for i in range(20):
status_response = bedrock_client.get_model_invocation_job(jobIdentifier=job_arn)
status = status_response["status"]
print(f" Poll {i+1}: status = {status}")
if status in ["Completed", "Failed", "Stopped"]:
print(f" Job reached terminal state: {status}")
if status == "Completed" and "statistics" in status_response:
stats = status_response["statistics"]
print(f" Total: {stats.get('totalRecordCount')}")
print(f" Successful: {stats.get('successfulRecordCount')}")
print(f" Failed: {stats.get('failedRecordCount')}")
break
time.sleep(10)
# Step 5: Verify job is in list
print("Step 5: Verifying job in list...")
list_response = bedrock_client.list_model_invocation_jobs(maxResults=20)
job_arns = [job["jobArn"] for job in list_response.get("invocationJobSummaries", [])]
assert job_arn in job_arns, f"Job {job_arn} should be in list"
print(f" Verified job is in list")
print(f"\nSuccess! Batch workflow completed for job {job_arn}")
OpenAI Provider
No S3 configuration required. Files are stored in OpenAI’s native storage. The bucket/key values are routing identifiers used by Bifrost.
import boto3
import json
import time
# Configuration (no S3 bucket needed for OpenAI)
region = "us-west-2"
model_id = "gpt-4o-mini"
provider = "openai"
# Provider header handler
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = provider
# Setup clients
s3_client = boto3.client(
"s3",
region_name=region,
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)
bedrock_client = boto3.client(
"bedrock",
region_name=region,
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)
# Step 1: Create OpenAI JSONL content
print("Step 1: Creating OpenAI batch input file...")
def create_openai_jsonl(num_requests: int) -> str:
lines = []
for i in range(num_requests):
record = {
"custom_id": f"request-{i+1}",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": model_id,
"messages": [
{"role": "user", "content": f"What is {i+1} + {i+1}? Answer briefly."}
],
"max_tokens": 100,
},
}
lines.append(json.dumps(record))
return "\n".join(lines)
jsonl_content = create_openai_jsonl(num_requests=3)
# Step 2: Upload input file (bucket/key are routing identifiers)
print("Step 2: Uploading input file...")
timestamp = int(time.time())
upload_response = s3_client.put_object(
Bucket="openai-files", # Routing identifier, not actual S3
Key=f"batch_{timestamp}.jsonl",
Body=jsonl_content.encode(),
ContentType="application/jsonl",
)
file_id = upload_response.get("ETag", "").strip('"')
print(f" Uploaded file ID: {file_id}")
# Step 3: Create batch job
print("Step 3: Creating OpenAI batch job...")
job_response = bedrock_client.create_model_invocation_job(
jobName=f"openai-e2e-{timestamp}",
modelId=model_id,
roleArn="not-required-for-openai", # Not used for OpenAI
inputDataConfig={
"s3InputDataConfig": {"s3Uri": f"s3://openai-files/{file_id}", "s3InputFormat": "JSONL"}
},
outputDataConfig={
"s3OutputDataConfig": {"s3Uri": "s3://openai-output/"}
},
tags=[
{"key": "endpoint", "value": "/v1/chat/completions"},
{"key": "file_id", "value": file_id},
],
)
job_arn = job_response["jobArn"]
print(f" Created job: {job_arn}")
# Step 4: Poll for completion
print("Step 4: Polling job status...")
for i in range(20):
status_response = bedrock_client.get_model_invocation_job(jobIdentifier=job_arn)
status = status_response["status"]
print(f" Poll {i+1}: status = {status}")
if status in ["Completed", "Failed", "Stopped"]:
print(f" Job reached terminal state: {status}")
break
time.sleep(10)
print(f"\nSuccess! OpenAI batch workflow completed for job {job_arn}")
Gemini Provider
No S3 configuration required. Files are stored in Google Cloud Storage. The bucket/key values are routing identifiers used by Bifrost.
import boto3
import json
import time
# Configuration (no S3 bucket needed for Gemini)
region = "us-west-2"
model_id = "gemini-1.5-flash"
provider = "gemini"
# Provider header handler
def add_provider_header(request, **kwargs):
request.headers["x-model-provider"] = provider
# Setup clients
s3_client = boto3.client(
"s3",
region_name=region,
endpoint_url="http://localhost:8080/bedrock/files",
)
s3_client.meta.events.register("before-send", add_provider_header)
bedrock_client = boto3.client(
"bedrock",
region_name=region,
endpoint_url="http://localhost:8080/bedrock",
)
bedrock_client.meta.events.register("before-send", add_provider_header)
# Step 1: Create Gemini JSONL content
print("Step 1: Creating Gemini batch input file...")
def create_gemini_jsonl(num_requests: int) -> str:
lines = []
for i in range(num_requests):
record = {
"request": {
"contents": [
{
"role": "user",
"parts": [{"text": f"What is {i+1} + {i+1}? Answer briefly."}],
}
],
"generationConfig": {"maxOutputTokens": 100},
},
"metadata": {"key": f"request-{i+1}"},
}
lines.append(json.dumps(record))
return "\n".join(lines)
jsonl_content = create_gemini_jsonl(num_requests=3)
# Step 2: Upload input file (bucket/key are routing identifiers)
print("Step 2: Uploading input file...")
timestamp = int(time.time())
upload_response = s3_client.put_object(
Bucket="gemini-files", # Routing identifier, not actual S3
Key=f"batch_{timestamp}.jsonl",
Body=jsonl_content.encode(),
ContentType="application/jsonl",
)
file_id = upload_response.get("ETag", "").strip('"')
print(f" Uploaded file ID: {file_id}")
# Step 3: Create batch job
print("Step 3: Creating Gemini batch job...")
job_response = bedrock_client.create_model_invocation_job(
jobName=f"gemini-e2e-{timestamp}",
modelId=model_id,
roleArn="not-required-for-gemini", # Not used for Gemini
inputDataConfig={
"s3InputDataConfig": {"s3Uri": f"s3://gemini-files/{file_id}", "s3InputFormat": "JSONL"}
},
outputDataConfig={
"s3OutputDataConfig": {"s3Uri": "s3://gemini-output/"}
},
)
job_arn = job_response["jobArn"]
print(f" Created job: {job_arn}")
# Step 4: Poll for completion (same as Bedrock)
# ... (same polling logic as above)
print(f"\nSuccess! Gemini batch workflow completed.")
JSONL Format Reference
Bedrock Format
{"recordId": "request-1", "modelInput": {"messages": [{"role": "user", "content": [{"text": "Hello!"}]}], "inferenceConfig": {"maxTokens": 100}}}
OpenAI Format
{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-4o-mini", "messages": [{"role": "user", "content": "Hello!"}], "max_tokens": 100}}
Gemini Format
{"request": {"contents": [{"role": "user", "parts": [{"text": "Hello!"}]}], "generationConfig": {"maxOutputTokens": 100}}, "metadata": {"key": "request-1"}}
Provider-Specific Notes
| Provider | Header Value | File Storage | S3 Config Required | IAM Role Required |
|---|---|---|---|---|
| Bedrock | bedrock | AWS S3 | ✅ Yes | ✅ Yes |
| OpenAI | openai | OpenAI storage | ❌ No | ❌ No |
| Gemini | gemini | Google Cloud Storage | ❌ No | ❌ No |
| Anthropic | anthropic | N/A | N/A | N/A |
Bedrock Provider: Requires S3 bucket and IAM role configuration. You can use Anthropic models deployed on Bedrock with the Files and Batch APIs.
OpenAI & Gemini Providers: No AWS infrastructure needed. Files are stored in the provider’s native storage. The S3 bucket/key values in the examples are routing identifiers used by Bifrost.
Anthropic Provider: Does not support S3-based file uploads. Use the Anthropic SDK with inline batch requests instead.
IAM Role Requirements (Bedrock Only)
For Bedrock provider batch jobs, you need an IAM role with these permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
"Resource": ["arn:aws:s3:::your-bucket", "arn:aws:s3:::your-bucket/*"]
},
{
"Effect": "Allow",
"Action": ["bedrock:InvokeModel"],
"Resource": "*"
}
]
}
The role must also have a trust policy that allows the Bedrock service to assume it:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "bedrock.amazonaws.com"},
"Action": "sts:AssumeRole"
}
]
}
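If you prefer to create this role programmatically, a sketch using boto3’s IAM client (the role, policy, and bucket names are placeholders):
import json
import boto3

iam = boto3.client("iam")

trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "bedrock.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}
permissions_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
            "Resource": ["arn:aws:s3:::your-bucket", "arn:aws:s3:::your-bucket/*"],
        },
        {"Effect": "Allow", "Action": ["bedrock:InvokeModel"], "Resource": "*"},
    ],
}

role = iam.create_role(
    RoleName="BedrockBatchRole",  # placeholder
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
iam.put_role_policy(
    RoleName="BedrockBatchRole",
    PolicyName="BedrockBatchPolicy",  # placeholder
    PolicyDocument=json.dumps(permissions_policy),
)
print(f"Role ARN: {role['Role']['Arn']}")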
Next Steps
- Overview - Bedrock SDK integration basics
- Configuration - Bifrost setup and configuration
- Core Features - Governance, semantic caching, and more

