Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/activity-observers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@tanstack/ai': minor
---

Add an activity-agnostic observability hook for non-chat activities (#720). The media activities — `generateImage`, `generateVideo`, `generateAudio`, `generateSpeech`, and `generateTranscription` — now accept an `observers` option taking lightweight `ActivityObserver`s (`onStart` / `onFinish` / `onError`, payload discriminated by `activity`). Observers are awaited in order and strictly non-fatal — a throwing observer is logged and skipped, never breaking the activity.

Ships `otelObserver()` on the new `@tanstack/ai/observability` subpath: it emits one `gen_ai.*` span per activity call, tagged with the correct `gen_ai.operation.name` (`image_generation`, `video_generation`, `audio_generation`, `text_to_speech`, `transcription`), and reuses the same `gen_ai.usage.*` attribute set as `otelMiddleware` — now including `tanstack.ai.usage.units_billed` for unit-billed media. With a `Meter` it also records the `gen_ai.client.operation.duration` histogram per activity. The `ActivityObserver` types are exported from the package root, while the `otelObserver` value lives on the subpath so importing `@tanstack/ai` never requires the optional `@opentelemetry/api` peer.
5 changes: 5 additions & 0 deletions .changeset/otel-full-usage-emission.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@tanstack/ai': minor
---

`otelMiddleware` now emits the rest of the reported `TokenUsage` on spans instead of only input/output tokens (#721). When the provider reports them, spans carry `gen_ai.usage.total_tokens`, `gen_ai.usage.cost` (provider-reported cost — cache discounts and gateway markup included, so backends like PostHog no longer re-derive cost from price tables), the official semconv cache/reasoning breakdowns (`gen_ai.usage.cache_read.input_tokens`, `gen_ai.usage.cache_creation.input_tokens`, `gen_ai.usage.reasoning.output_tokens`), and TanStack-namespaced attributes for duration-based billing (`tanstack.ai.usage.duration_seconds`) and the upstream cost split (`tanstack.ai.usage.upstream_cost` / `upstream_input_cost` / `upstream_output_cost`). All attributes are guarded — spans stay unchanged when a provider doesn't report a field. Media-oriented fields (`unitsBilled`, per-modality token breakdowns) and the provider-shaped `providerUsageDetails` bag are intentionally not emitted; media-activity observability is tracked in #720.
47 changes: 47 additions & 0 deletions docs/advanced/otel.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ Iteration spans are numbered (`#0`, `#1`, ...) so distinct iterations of the sam
| iteration | `gen_ai.request.max_tokens` | from config |
| iteration | `gen_ai.usage.input_tokens` | per iteration |
| iteration | `gen_ai.usage.output_tokens` | per iteration |
| root / iteration | `gen_ai.usage.total_tokens` | provider-reported total |
| root / iteration | `gen_ai.usage.cost` | provider-reported cost, when available |
| root / iteration | `gen_ai.usage.cache_read.input_tokens` | cached prompt tokens, when reported |
| root / iteration | `gen_ai.usage.cache_creation.input_tokens` | cache-write prompt tokens, when reported |
| root / iteration | `gen_ai.usage.reasoning.output_tokens` | reasoning/thinking tokens, when reported |
| root / iteration | `tanstack.ai.usage.duration_seconds` | duration-based billing (e.g. transcription), when reported |
| root / iteration | `tanstack.ai.usage.upstream_cost` | gateway upstream cost (e.g. OpenRouter), when reported |
| root / iteration | `tanstack.ai.usage.upstream_input_cost` | upstream input cost split, when reported |
| root / iteration | `tanstack.ai.usage.upstream_output_cost` | upstream output cost split, when reported |
| iteration | `gen_ai.response.finish_reasons` | `[stop]`, `[tool_calls]`, ... |
| root | `gen_ai.usage.input_tokens` | rolled up |
| root | `gen_ai.usage.output_tokens` | rolled up |
Expand All @@ -81,6 +90,8 @@ Iteration spans are numbered (`#0`, `#1`, ...) so distinct iterations of the sam
| tool | `gen_ai.tool.type` | `function` |
| tool | `tanstack.ai.tool.outcome` | `success` / `error` |

Usage attributes beyond input/output tokens are emitted only when the provider reports them, so spans stay clean otherwise. Cache and reasoning breakdowns use the official GenAI semconv names; `gen_ai.usage.cost` and `gen_ai.usage.total_tokens` are de-facto extensions consumed directly by backends like PostHog — without them, backends re-derive cost from their own price tables and lose cache discounts and gateway markup. Fields with no established convention (duration-based billing, the upstream cost split) are TanStack-namespaced.

### Metrics

Two GenAI-standard histograms:
Expand Down Expand Up @@ -164,6 +175,42 @@ otelMiddleware({
})
```

## Beyond chat: media activities

`otelMiddleware` covers `chat()`. The media activities — `generateImage`, `generateVideo`, `generateAudio`, `generateSpeech`, and `generateTranscription` — are single request → response (or submit → poll for video), so instead of the chat middleware pipeline they take a lighter **observer**. Pass `otelObserver()` (from the `@tanstack/ai/observability` subpath) on the activity's `observers` option to emit one span per call:

```ts
import { generateImage } from '@tanstack/ai'
import { otelObserver } from '@tanstack/ai/observability'
import { openaiImage } from '@tanstack/ai-openai'
import { trace, metrics } from '@opentelemetry/api'

const observer = otelObserver({
tracer: trace.getTracer('my-app'),
meter: metrics.getMeter('my-app'),
})

const result = await generateImage({
adapter: openaiImage('gpt-image-2'),
prompt: 'A serene mountain landscape at sunset',
observers: [observer],
})
```

Each call produces one `CLIENT` span tagged with the activity's `gen_ai.operation.name`:

| Activity | `gen_ai.operation.name` |
| --- | --- |
| `generateImage` | `image_generation` |
| `generateVideo` | `video_generation` |
| `generateAudio` | `audio_generation` |
| `generateSpeech` | `text_to_speech` |
| `generateTranscription` | `transcription` |

The span carries `gen_ai.system` and `gen_ai.request.model` at start and, on finish, the same `gen_ai.usage.*` / `tanstack.ai.usage.*` attributes documented above — including `tanstack.ai.usage.units_billed` for unit-billed media. When a `Meter` is supplied it records the `gen_ai.client.operation.duration` histogram, tagged per activity. For streaming video the span covers the full create → poll → complete lifecycle; for non-streaming `generateVideo` it covers job submission.

`otelObserver` supports the same `spanNameFormatter` and `attributeEnricher` extension points. For a custom backend, implement the `ActivityObserver` contract (`onStart` / `onFinish` / `onError`) directly — its event payload is discriminated by `activity`. The observer types are exported from the package root; the `otelObserver` value lives on the `@tanstack/ai/observability` subpath so importing `@tanstack/ai` never requires the optional `@opentelemetry/api` peer.

## Related

- [Middleware](./middleware) — the lifecycle this middleware hooks into
Expand Down
3 changes: 2 additions & 1 deletion docs/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@
{
"label": "OpenTelemetry",
"to": "advanced/otel",
"addedAt": "2026-05-08"
"addedAt": "2026-05-08",
"updatedAt": "2026-06-15"
}
]
},
Expand Down
4 changes: 4 additions & 0 deletions packages/ai/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
"types": "./dist/esm/middlewares/otel.d.ts",
"import": "./dist/esm/middlewares/otel.js"
},
"./observability": {
"types": "./dist/esm/observability/index.d.ts",
"import": "./dist/esm/observability/index.js"
},
"./adapter-internals": {
"types": "./dist/esm/adapter-internals.d.ts",
"import": "./dist/esm/adapter-internals.js"
Expand Down
51 changes: 50 additions & 1 deletion packages/ai/src/activities/generateAudio/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
import { aiEventClient } from '@tanstack/ai-event-client'
import { streamGenerationResult } from '../stream-generation-result.js'
import { resolveDebugOption } from '../../logger/resolve'
import {
notifyObserverError,
notifyObserverFinish,
notifyObserverStart,
} from '../../observability/notify'
import type { InternalLogger } from '../../logger/internal-logger'
import type { DebugOption } from '../../logger/types'
import type { ActivityObserver } from '../../observability/types'
import type { AudioAdapter } from './adapter'
import type { AudioGenerationResult, StreamChunk } from '../../types'

Expand Down Expand Up @@ -70,6 +76,12 @@ export interface AudioActivityOptions<
* control and/or a custom `Logger`.
*/
debug?: DebugOption
/**
* Observability hooks notified on start, success, and error. Pass
* `otelObserver()` to emit OpenTelemetry spans, or implement the
* `ActivityObserver` contract for a custom backend.
*/
observers?: Array<ActivityObserver>
}

// ===========================
Expand Down Expand Up @@ -135,7 +147,7 @@ async function runGenerateAudio<
>(
options: AudioActivityOptions<TAdapter, boolean>,
): Promise<AudioGenerationResult> {
const { adapter, stream: _stream, debug: _debug, ...rest } = options
const { adapter, stream: _stream, debug: _debug, observers, ...rest } = options
const model = adapter.model
const requestId = createId('audio')
const startTime = Date.now()
Expand All @@ -145,6 +157,18 @@ async function runGenerateAudio<
(adapter as { name?: string }).name ??
'unknown'

await notifyObserverStart(
observers,
{
activity: 'audio',
requestId,
provider: adapter.name,
model,
modelOptions: rest.modelOptions,
},
logger,
)

aiEventClient.emit('audio:request:started', {
requestId,
provider: adapter.name,
Expand Down Expand Up @@ -189,6 +213,19 @@ async function runGenerateAudio<
audioDuration: result.audio.duration,
})

await notifyObserverFinish(
observers,
{
activity: 'audio',
requestId,
provider: adapter.name,
model,
durationMs: elapsedMs,
usage: result.usage,
},
logger,
)

return result
} catch (error) {
const elapsedMs = Date.now() - startTime
Expand All @@ -202,6 +239,18 @@ async function runGenerateAudio<
modelOptions: rest.modelOptions as Record<string, unknown> | undefined,
timestamp: Date.now(),
})
await notifyObserverError(
observers,
{
activity: 'audio',
requestId,
provider: adapter.name,
model,
durationMs: elapsedMs,
error,
},
logger,
)
logger.errors('generateAudio activity failed', {
error,
source: 'generateAudio',
Expand Down
51 changes: 50 additions & 1 deletion packages/ai/src/activities/generateImage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
import { aiEventClient } from '@tanstack/ai-event-client'
import { streamGenerationResult } from '../stream-generation-result.js'
import { resolveDebugOption } from '../../logger/resolve'
import {
notifyObserverError,
notifyObserverFinish,
notifyObserverStart,
} from '../../observability/notify'
import type { InternalLogger } from '../../logger/internal-logger'
import type { DebugOption } from '../../logger/types'
import type { ActivityObserver } from '../../observability/types'
import type { ImageAdapter } from './adapter'
import type { ImageGenerationResult, StreamChunk } from '../../types'

Expand Down Expand Up @@ -92,6 +98,12 @@ export type ImageActivityOptions<
* control and/or a custom `Logger`.
*/
debug?: DebugOption
/**
* Observability hooks notified on start, success, and error. Pass
* `otelObserver()` to emit OpenTelemetry spans, or implement the
* `ActivityObserver` contract for a custom backend.
*/
observers?: Array<ActivityObserver>
} & ({} extends ImageProviderOptionsForModel<TAdapter, TAdapter['model']>
? {
/** Provider-specific options for image generation */ modelOptions?: ImageProviderOptionsForModel<
Expand Down Expand Up @@ -197,12 +209,24 @@ async function runGenerateImage<
>(
options: ImageActivityOptions<TAdapter, boolean>,
): Promise<ImageGenerationResult> {
const { adapter, stream: _stream, debug: _debug, ...rest } = options
const { adapter, stream: _stream, debug: _debug, observers, ...rest } = options
const model = adapter.model
const requestId = createId('image')
const startTime = Date.now()
const logger: InternalLogger = resolveDebugOption(options.debug)

await notifyObserverStart(
observers,
{
activity: 'image',
requestId,
provider: adapter.name,
model,
modelOptions: rest.modelOptions,
},
logger,
)

aiEventClient.emit('image:request:started', {
requestId,
provider: adapter.name,
Expand Down Expand Up @@ -255,8 +279,33 @@ async function runGenerateImage<
count: result.images.length,
})

await notifyObserverFinish(
observers,
{
activity: 'image',
requestId,
provider: adapter.name,
model,
durationMs: duration,
usage: result.usage,
},
logger,
)

return result
} catch (error) {
await notifyObserverError(
observers,
{
activity: 'image',
requestId,
provider: adapter.name,
model,
durationMs: Date.now() - startTime,
error,
},
logger,
)
logger.errors('generateImage activity failed', {
error,
source: 'generateImage',
Expand Down
51 changes: 50 additions & 1 deletion packages/ai/src/activities/generateSpeech/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
import { aiEventClient } from '@tanstack/ai-event-client'
import { streamGenerationResult } from '../stream-generation-result.js'
import { resolveDebugOption } from '../../logger/resolve'
import {
notifyObserverError,
notifyObserverFinish,
notifyObserverStart,
} from '../../observability/notify'
import type { InternalLogger } from '../../logger/internal-logger'
import type { DebugOption } from '../../logger/types'
import type { ActivityObserver } from '../../observability/types'
import type { TTSAdapter } from './adapter'
import type { StreamChunk, TTSResult } from '../../types'

Expand Down Expand Up @@ -73,6 +79,12 @@ export interface TTSActivityOptions<
* control and/or a custom `Logger`.
*/
debug?: DebugOption
/**
* Observability hooks notified on start, success, and error. Pass
* `otelObserver()` to emit OpenTelemetry spans, or implement the
* `ActivityObserver` contract for a custom backend.
*/
observers?: Array<ActivityObserver>
}

// ===========================
Expand Down Expand Up @@ -143,7 +155,7 @@ export function generateSpeech<
async function runGenerateSpeech<
TAdapter extends TTSAdapter<string, TTSProviderOptions<TAdapter>>,
>(options: TTSActivityOptions<TAdapter, boolean>): Promise<TTSResult> {
const { adapter, stream: _stream, debug: _debug, ...rest } = options
const { adapter, stream: _stream, debug: _debug, observers, ...rest } = options
const model = adapter.model
const requestId = createId('speech')
const startTime = Date.now()
Expand All @@ -153,6 +165,18 @@ async function runGenerateSpeech<
(adapter as { name?: string }).name ??
'unknown'

await notifyObserverStart(
observers,
{
activity: 'speech',
requestId,
provider: adapter.name,
model,
modelOptions: rest.modelOptions,
},
logger,
)

aiEventClient.emit('speech:request:started', {
requestId,
provider: adapter.name,
Expand Down Expand Up @@ -202,6 +226,19 @@ async function runGenerateSpeech<
contentType: result.contentType,
})

await notifyObserverFinish(
observers,
{
activity: 'speech',
requestId,
provider: adapter.name,
model,
durationMs: duration,
usage: result.usage,
},
logger,
)

return result
} catch (error) {
const duration = Date.now() - startTime
Expand All @@ -215,6 +252,18 @@ async function runGenerateSpeech<
modelOptions: rest.modelOptions as Record<string, unknown> | undefined,
timestamp: Date.now(),
})
await notifyObserverError(
observers,
{
activity: 'speech',
requestId,
provider: adapter.name,
model,
durationMs: duration,
error,
},
logger,
)
logger.errors('generateSpeech activity failed', {
error,
source: 'generateSpeech',
Expand Down
Loading