User Guide
Batch Workflow
Set the relevant provider key in your environment before calling hosted APIs.
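For OpenAI, for example (the variable name differs per provider — ANTHROPIC_API_KEY for Anthropic, and so on):

```shell
# Set the provider key in the shell that will run your script or the CLI
export OPENAI_API_KEY="sk-your-key-here"
```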
The main workflow is "run a batch, keep the good results, inspect the failures, and retry only what failed."
```python
from infermesh import LMClient

prompts = [
    "Summarize section 1 in two bullet points.",
    "Summarize section 2 in two bullet points.",
    "Summarize section 3 in two bullet points.",
]

with LMClient(
    model="openai/gpt-4.1-mini",
    max_parallel_requests=32,
    rpm=500,
    tpm=100_000,
) as client:
    batch = client.generate_batch(prompts)

retry_prompts: list[str] = []
# batch.errors may be None when nothing failed, so substitute a
# parallel list of Nones rather than an empty list (which would
# make zip() skip every item).
for prompt, result, error in zip(
    prompts, batch.results, batch.errors or [None] * len(prompts)
):
    if error is not None:
        print(f"FAILED: {prompt}\n  {error}")
        retry_prompts.append(prompt)
        continue
    print(result.output_text)
    if result.token_usage is not None:
        print("tokens:", result.token_usage.total_tokens)

if retry_prompts:
    with LMClient(model="openai/gpt-4.1-mini") as retry_client:
        retry_batch = retry_client.generate_batch(retry_prompts)
```
By default, one failing request does not abort the whole batch. Failed items are
stored as None in batch.results, and the corresponding exception is stored
in batch.errors[i].
For large Python batches, set max_parallel_requests explicitly. generate_batch
and transcribe_batch both use a bounded in-flight window when it is set; when it
is unset, they start one coroutine per item up front, which can cause memory pressure
for very large inputs. embed_batch is always micro-batched regardless of
max_parallel_requests — pass micro_batch_size to tune chunk size instead.
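Conceptually, micro-batching just splits the input list into consecutive fixed-size chunks. A stdlib sketch of the idea (not infermesh's internal code):

```python
def micro_batches(items: list, size: int) -> list[list]:
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]

chunks = micro_batches(["a", "b", "c", "d", "e"], size=2)
print(chunks)  # [['a', 'b'], ['c', 'd'], ['e']]
```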
Crash-Resilient Batches with on_result
For large batches, you may want to write results to disk as each request completes rather than waiting for the whole batch to finish. This way a process crash or interruption only loses the in-flight requests, not everything already completed.
generate_batch, embed_batch, and transcribe_batch all support the same
on_result(index, result, error) contract.
Pass an on_result callback to generate_batch (or agenerate_batch):
```python
import json
from pathlib import Path

from infermesh import LMClient

prompts = [...]  # large list

with open("results.jsonl", "w") as out, \
        LMClient(model="openai/gpt-4.1-mini", max_parallel_requests=32) as client:

    def save(index: int, result, error) -> None:
        row = {"index": index}
        if error is not None:
            row["error"] = str(error)
        else:
            row["output_text"] = result.output_text
        out.write(json.dumps(row) + "\n")
        out.flush()

    client.generate_batch(prompts, on_result=save)
```
The callback receives:
| Argument | Type | Notes |
|---|---|---|
| index | int | Position in input_batch (global item index, not micro-batch index) |
| result | GenerationResult \| EmbeddingResult \| TranscriptionResult \| None | None on failure |
| error | BaseException \| None | None on success |
The same contract applies to embed_batch and transcribe_batch. For
embed_batch, index is always the position in the original input list, even
when the provider call was part of a micro-batch. Per-item error callbacks are
guaranteed when return_exceptions=True; with return_exceptions=False, a failed
embedding micro-batch may raise before on_result is called for the affected
indices.
To resume after a crash, read back results.jsonl to find settled indices, then
run only the pending prompts (reusing the imports and prompts from above):

```python
done = set()
output_path = Path("results.jsonl")
if output_path.exists():
    for line in output_path.read_text().splitlines():
        row = json.loads(line)
        if "index" in row:
            done.add(row["index"])

pending = [(i, p) for i, p in enumerate(prompts) if i not in done]

with open(output_path, "a") as out, \
        LMClient(model="openai/gpt-4.1-mini", max_parallel_requests=32) as client:

    def save(batch_idx: int, result, error) -> None:
        # Map the position within the retry batch back to the original index
        orig_idx = pending[batch_idx][0]
        row = {"index": orig_idx}
        if error is not None:
            row["error"] = str(error)
        else:
            row["output_text"] = result.output_text
        out.write(json.dumps(row) + "\n")
        out.flush()

    client.generate_batch([p for _, p in pending], on_result=save)
```
The CLI --resume flag automates this pattern end-to-end.
Pass return_exceptions=False if you want the first failure to cancel the rest
of the batch and raise immediately instead.
This code works in Jupyter notebooks without any asyncio setup. The sync API
runs a background event loop so you do not have to.
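Under the hood, a sync facade over an async API can run a dedicated event loop in a daemon thread and submit coroutines to it from synchronous code. A stdlib sketch of that pattern (not infermesh's actual implementation):

```python
import asyncio
import threading

# Dedicated event loop running in a background daemon thread
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def work() -> int:
    await asyncio.sleep(0)
    return 42

# Submit from sync code; .result() blocks until the coroutine finishes
future = asyncio.run_coroutine_threadsafe(work(), loop)
print(future.result())  # 42
```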
The model string uses LiteLLM's provider/model-name format. See the
LiteLLM model list for all supported
providers and model names:
| Provider | Example |
|---|---|
| OpenAI | "openai/gpt-4.1-mini" |
| Anthropic | "anthropic/claude-3-5-sonnet-20241022" |
| Local vLLM | "hosted_vllm/meta-llama/Meta-Llama-3-8B-Instruct" |
api_base is optional for hosted providers — LiteLLM already knows their
endpoints. Set it explicitly for local servers or custom deployments.
CLI Batch From JSONL
For dataset-driven jobs, the CLI can read one JSON object per line.
prompts.jsonl:
```jsonl
{"prompt": "Summarize abstract A in one sentence."}
{"prompt": "Summarize abstract B in one sentence."}
{"prompt": "Summarize abstract C in one sentence."}
```
Run the batch:
```bash
infermesh generate \
  --model openai/gpt-4.1-mini \
  --api-base https://api.openai.com/v1 \
  --input-jsonl prompts.jsonl \
  --output-jsonl results.jsonl
```
Each output line contains a result or an error, plus an _index field that
records the row's position in the input file:
```jsonl
{"_index": 0, "output_text": "Abstract A is about...", "error": null}
{"_index": 1, "output_text": null, "error": "RateLimitError(...)"}
{"_index": 2, "output_text": "Abstract C is about...", "error": null}
```
Input rows for infermesh generate may contain any of the following fields:
- prompt: a plain string prompt
- messages: a pre-built chat conversation
- responses_input: an OpenAI Responses-style input payload
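For example, a messages row might look like this (the message shape follows the standard OpenAI chat format):

```jsonl
{"messages": [{"role": "system", "content": "Answer in one sentence."}, {"role": "user", "content": "Summarize abstract A."}]}
```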
Resuming an Interrupted Run
Every file-backed run writes a checkpoint file alongside the output:
```
results.jsonl               ← your output (human-readable)
results.checkpoint.sqlite   ← checkpoint file (resume state)
```
By default the checkpoint stays beside the output for portability and
discoverability. If you want the checkpoint on local scratch instead, pass
--checkpoint-dir DIR or set INFERMESH_CHECKPOINT_DIR=DIR before the run.
When you resume later, reuse the same checkpoint-dir setting.
If a long batch is interrupted (Ctrl-C, OOM, network loss), re-run with
--resume to skip settled items and append only the remaining rows:
```bash
# First attempt — interrupted partway through
infermesh generate \
  --model openai/gpt-4.1-mini \
  --api-base https://api.openai.com/v1 \
  --input-jsonl prompts.jsonl \
  --output-jsonl results.jsonl

# Resume — reads results.checkpoint.sqlite, skips settled items, appends the rest
infermesh generate \
  --model openai/gpt-4.1-mini \
  --api-base https://api.openai.com/v1 \
  --input-jsonl prompts.jsonl \
  --output-jsonl results.jsonl \
  --resume
```
Each source row is tracked by its content fingerprint plus its occurrence
count, so duplicate rows are resumed independently. Re-ordering the input file
before resuming is safe, and resumed rows keep the original _index values
from the first run. Removing rows, adding rows, or deduplicating the input
before resuming is not supported. Results are written to disk one row at a
time as each request completes, so a crash only loses the requests that were
in-flight at that moment.
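The fingerprint-plus-occurrence idea can be sketched with the stdlib. This is illustrative only — the actual checkpoint key format is internal to infermesh:

```python
import hashlib
import json
from collections import Counter

def row_key(row: dict, seen: Counter) -> tuple[str, int]:
    """Key a source row by content fingerprint plus occurrence count,
    so duplicate rows get distinct, order-independent resume keys."""
    digest = hashlib.sha256(
        json.dumps(row, sort_keys=True).encode()
    ).hexdigest()
    seen[digest] += 1
    return digest, seen[digest]

seen = Counter()
rows = [{"prompt": "A"}, {"prompt": "B"}, {"prompt": "A"}]
keys = [row_key(r, seen) for r in rows]
# The two identical rows share a fingerprint but get occurrence
# counts 1 and 2, so reordering the file never conflates them.
```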
The workflow keeps a rolling in-flight window, so each settled row immediately
admits the next pending row until the source is exhausted. Output rows are
written in completion order, not input order.
Row-level generation failures become per-item error rows and do not abort
their siblings, but setup and workflow failures still stop the command.
Use the _index field to re-sort after the run if needed.
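For example, to restore input order after a run (row shape as in the output example above):

```python
import json

# Output rows arrive in completion order, not input order
lines = [
    '{"_index": 2, "output_text": "C", "error": null}',
    '{"_index": 0, "output_text": "A", "error": null}',
    '{"_index": 1, "output_text": null, "error": "RateLimitError(...)"}',
]
rows = sorted((json.loads(line) for line in lines), key=lambda r: r["_index"])
print([r["_index"] for r in rows])  # [0, 1, 2]
```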
--resume requires --output-jsonl and the matching checkpoint file from a
previous file-backed run. If the checkpoint is missing,
if the input and output paths are the same file, if the output file is missing
any settled _index rows recorded in the checkpoint, or if the current input
does not match the original row occurrences, infermesh fails fast instead of
guessing.
Custom Input Mapping with --mapper
Use --mapper to transform raw source records before they are sent to the
model. This lets you drive generation from any record format without
preprocessing the source file.
```bash
infermesh generate \
  --model openai/gpt-4.1-mini \
  --input-jsonl dataset.jsonl \
  --output-jsonl results.jsonl \
  --mapper mypackage.prompts:build_prompt
```
The mapper is imported as package.module:function. The function receives
each raw source record as a dict and must return a dict with at least an
"input" key:
```python
# mypackage/prompts.py
def build_prompt(record: dict) -> dict:
    return {
        "input": f"Classify the following text:\n\n{record['body']}",
        "metadata": {"doc_id": record["id"]},
    }
```
| Return key | Required | Notes |
|---|---|---|
| "input" | Yes | Passed directly to the generation endpoint |
| "metadata" | No | Copied into the output row under "metadata" when it is a JSON-serializable dict |
Extra keys beyond "input" and "metadata" are ignored. Mapper failures
become per-item error rows — they do not abort the run. If you later resume a
file-backed run, infermesh requires the same mapper implementation that wrote
the original checkpoint file.
Generate Text
```python
result = client.generate("Say hello in one sentence.")

print(result.output_text)    # generated text
print(result.token_usage)    # prompt / completion / total token counts
print(result.finish_reason)  # "stop", "length", …
print(result.request_id)     # provider-assigned ID for debugging
```
Structured Output
Pass a Pydantic model as response_format and the output is parsed
automatically:
```python
from pydantic import BaseModel

class Answer(BaseModel):
    value: int
    confidence: float

result = client.generate(
    "What is 2 + 2? Respond in JSON.",
    response_format=Answer,
)
print(result.output_text)    # raw JSON string
print(result.output_parsed)  # Answer(value=4, confidence=0.99)
```
A plain dict (JSON Schema) is also accepted in place of a Pydantic model; the
output is then returned as a plain Python object validated against the schema.
If parsing fails, a warning is logged and output_parsed is None.
Timeout And Per-Request Overrides
Set a default timeout for every request at construction time:
```python
client = LMClient(
    model="openai/gpt-4.1-mini",
    api_base="https://api.openai.com/v1",
    timeout=30.0,
)
```
Any LiteLLM keyword argument passed to a call overrides the default for that request.
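The precedence is ordinary dict-merge semantics, with per-call keys winning. A conceptual sketch (not infermesh internals):

```python
default_request_kwargs = {"max_tokens": 256, "temperature": 0.7}
per_call_kwargs = {"temperature": 0.0}

# Later keys win, so the per-call temperature overrides the default
merged = {**default_request_kwargs, **per_call_kwargs}
print(merged)  # {'max_tokens': 256, 'temperature': 0.0}
```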
Use default_request_kwargs to set persistent request defaults:
```python
client = LMClient(
    model="openai/gpt-4.1-mini",
    api_base="https://api.openai.com/v1",
    default_request_kwargs={"max_tokens": 256, "temperature": 0.7},
)
```
Create Embeddings
```python
# Single string -> EmbeddingResult
result = client.embed("The quick brown fox")
print(result.embedding)

# Multiple strings -> processed in resilient micro-batches by default
batch = client.embed_batch(
    ["sentence one", "sentence two", "sentence three"],
    micro_batch_size=32,
)
vectors = [r.embedding for r in batch if r is not None]
```
Transcribe Audio
```python
result = client.transcribe("recording.wav")
print(result.text)
print(result.language)
print(result.duration_s)

batch = client.transcribe_batch(["recording-a.wav", "recording-b.wav"])
texts = [r.text if r is not None else None for r in batch]
```
transcribe_batch supports the same on_result and on_progress callbacks as
generate_batch. Use on_result to stream results to disk as each file completes
rather than waiting for the whole batch:
```python
import json

from infermesh import LMClient

audio_paths = ["recording-a.wav", "recording-b.wav"]  # your audio files

with open("transcripts.jsonl", "w") as out, \
        LMClient(model="whisper-1", max_parallel_requests=4) as client:

    def save(index: int, result, error) -> None:
        row = {"index": index}
        if error is not None:
            row["error"] = str(error)
        else:
            row["text"] = result.text
        out.write(json.dumps(row) + "\n")
        out.flush()

    client.transcribe_batch(audio_paths, on_result=save)
```
Set max_parallel_requests to bound how many audio files are in-flight at once.
When it is unset, transcribe_batch starts all requests up front.
Audio inputs larger than 25 MB are rejected by default. Pass
max_transcription_bytes=None only in trusted environments where the server is
expected to accept larger uploads. Disabling the guard means the client may
read and send very large audio files in full. Pass a smaller integer to
tighten the limit.
Multimodal / VLM
For URL-based images, pass the OpenAI content-block dict directly:
```python
result = client.generate([{
    "role": "user",
    "content": [
        {"type": "text", "text": "What's in this image?"},
        {"type": "image_url", "image_url": {"url": "https://example.com/photo.jpg"}},
    ],
}])
```
For local files or raw bytes, use image_block() to
handle base64 encoding:
```python
from pathlib import Path

from infermesh import LMClient, image_block

raw_bytes = Path("photo.jpg").read_bytes()  # or image bytes from any source

with LMClient(model="openai/gpt-4o", api_base="https://api.openai.com/v1") as client:
    result = client.generate([{
        "role": "user",
        "content": [
            {"type": "text", "text": "Describe this diagram in one sentence."},
            image_block(Path("diagram.png")),
            image_block(Path("photo.jpg"), detail="high"),
            image_block(raw_bytes, mime_type="image/jpeg"),
        ],
    }])
    print(result.output_text)
```
Handling API Keys
Never pass secrets on the command line. Instead, export provider environment
variables or use --env-file to load a .env file:
```bash
infermesh generate \
  --model openai/gpt-4.1-mini \
  --api-base https://api.openai.com/v1 \
  --env-file .env \
  --prompt "Hello"
```
Add .env to .gitignore so secrets are never committed.
Rate Limiting
Pass any combination of rpm / tpm / rpd / tpd to activate the built-in
rate limiter. The client queues requests automatically and respects all four
limits simultaneously. Find your tier's limits in the provider dashboard: for
OpenAI check Settings → Limits; for Anthropic check Console → Settings →
Limits.
```python
client = LMClient(
    model="openai/gpt-4.1-mini",
    api_base="https://api.openai.com/v1",
    rpm=500,
    tpm=100_000,
)
```
Use max_request_burst / max_token_burst to allow short bursts above the
steady-state rate. Use default_output_tokens to pre-reserve output tokens for
rate-limit accounting when you do not set max_tokens per request.
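Conceptually, a rate limit with a burst allowance behaves like a token bucket: tokens refill steadily at the configured rate, and the bucket's capacity is the burst size. A minimal sketch of the idea (not infermesh's implementation):

```python
import time

class TokenBucket:
    """Steady refill at `rate` tokens/second, capacity `burst`."""

    def __init__(self, rate: float, burst: float):
        self.rate, self.burst = rate, burst
        self.tokens = burst
        self.last = time.monotonic()

    def try_acquire(self, n: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at burst capacity
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

bucket = TokenBucket(rate=500 / 60, burst=10)  # ~500 rpm with a burst of 10
granted = sum(bucket.try_acquire() for _ in range(20))
# roughly the first `burst` grants succeed immediately; the rest wait for refill
```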
Provider rate-limit headers (x-ratelimit-*) are read automatically after each
response to keep the client's internal counters in sync with the server's view.
Use header_bucket_scope to control whether headers are routed to the
per-minute or per-day buckets.
CLI flags: --rpm, --tpm, --rpd, --tpd, --max-request-burst,
--max-token-burst.
Multi-Replica Routing
When you run multiple inference servers for the same model, pass a
deployments dict to spread load across them. model is the logical model
name the router exposes; each DeploymentConfig.model is the backend string
sent to that replica.
```python
from infermesh import DeploymentConfig, LMClient

client = LMClient(
    model="llama-3-8b",
    deployments={
        "gpu-0": DeploymentConfig(
            model="hosted_vllm/meta-llama/Meta-Llama-3-8B-Instruct",
            api_base="http://host1:8000/v1",
        ),
        "gpu-1": DeploymentConfig(
            model="hosted_vllm/meta-llama/Meta-Llama-3-8B-Instruct",
            api_base="http://host2:8000/v1",
        ),
        "gpu-2": DeploymentConfig(
            model="hosted_vllm/meta-llama/Meta-Llama-3-8B-Instruct",
            api_base="http://host3:8000/v1",
        ),
    },
    routing_strategy="least-busy",  # or "simple-shuffle" (default), "latency-based-routing"
)

result = client.generate("Summarize this paper in one paragraph.")
print(result.metrics.deployment)  # which replica served the request
```
CLI With Repeated --api-base
```bash
infermesh generate \
  --model llama-3-8b \
  --api-base http://host1:8000/v1 \
  --api-base http://host2:8000/v1 \
  --api-base http://host3:8000/v1 \
  --prompt "Hello"
```
CLI With TOML
```toml
# deployments.toml
[deployments.gpu-0]
model = "hosted_vllm/meta-llama/Meta-Llama-3-8B-Instruct"
api_base = "http://host1:8000/v1"

[deployments.gpu-1]
model = "hosted_vllm/meta-llama/Meta-Llama-3-8B-Instruct"
api_base = "http://host2:8000/v1"
```
Keep API keys out of TOML files. Use environment variables or --env-file
instead.
Automatic Retries
By default, LMClient retries transient provider errors up to 3 times with
exponential backoff. This covers the failure modes you'd otherwise have to handle
yourself: rate-limit spikes, momentary server unavailability, network drops, and
request timeouts.
What gets retried:
| Status | Exception |
|---|---|
| 429 Too Many Requests | RateLimitError |
| 503 Service Unavailable | ServiceUnavailableError |
| 500 Internal Server Error | InternalServerError |
| Network failure | APIConnectionError |
| Request timeout | Timeout |
What is not retried (propagates immediately): BadRequestError,
AuthenticationError, PermissionDeniedError, NotFoundError,
ContextWindowExceededError, and other non-transient errors.
Backoff formula: min(2 ** attempt, 60) seconds plus up to 0.5 s of random
jitter. If the provider returns a Retry-After header the client sleeps for that
duration instead (capped at 60 s).
The backoff sleep happens outside the semaphore, so other in-flight requests are not blocked while one request is waiting to retry.
Use result.metrics.retries to see how many attempts were needed:
```python
result = client.generate("Hello")
if result.metrics.retries > 0:
    print(f"Succeeded after {result.metrics.retries} retries")
```
For batch jobs, retries are per-item and transparent — the result you get back already reflects the final successful response.
CLI flag: --max-retries.
Async API
All methods have async counterparts prefixed with a. The sync methods work in
notebooks and scripts by running a background event loop thread, so you do not
need to manage the loop yourself.
```python
import asyncio

from infermesh import LMClient

async def main():
    async with LMClient(model="openai/gpt-4.1-mini", api_base="https://api.openai.com/v1") as client:
        result = await client.agenerate("Hello")
        batch = await client.agenerate_batch(["prompt A", "prompt B", "prompt C"])
        embedding = await client.aembed("The quick brown fox")
        embedding_batch = await client.aembed_batch(["text a", "text b"])
        transcription_batch = await client.atranscribe_batch(["a.wav", "b.wav"])
        print(
            result.output_text,
            len(batch),
            len(embedding.embedding),
            len(embedding_batch),
            len(transcription_batch),
        )

asyncio.run(main())
```
async with calls close() automatically. For sync code, use with or call
client.close() when done.
Benchmarking
infermesh bench measures client-side throughput across a sweep. It is
intentionally a client benchmark: it helps you choose a good
max_parallel_requests or embedding batch size for your workload, not the
server's absolute maximum capacity.
```bash
infermesh bench generate \
  --model openai/gpt-4.1-mini \
  --api-base https://api.openai.com/v1 \
  --prompt "Write a haiku." \
  --warmup 5 \
  --requests 50 \
  --output-json bench.json
```
Output:
```
c=1 rps=3.14 p50=0.401s p95=0.412s p99=0.420s svc_p95=0.410s q_p95=0.001s err=0/50 elapsed=15.9s
c=2 rps=5.81 p50=0.470s p95=0.487s p99=0.501s svc_p95=0.480s q_p95=0.002s err=0/50 elapsed=8.6s
recommended_max_parallel_requests=8
```
c is the concurrency level. p50/p95/p99 are end-to-end latency
percentiles as seen by the caller. svc_p95 is the P95 of net provider
response time (excluding queue wait). q_p95 is the P95 of time a request
spent waiting in the client queue before being sent. High q_p95 relative to
svc_p95 means the client is the bottleneck, not the server.
Use --input-jsonl to benchmark with a real prompt distribution instead of one
repeated prompt. An embedding benchmark is available as infermesh bench embed.