-
Notifications
You must be signed in to change notification settings - Fork 1.5k
update #1788
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
update #1788
Changes from 10 commits
c07bc05
8a7ae86
e8d2c18
a18ba50
d1094c5
ec23de8
5b2c630
647b11a
b7ac3ef
fb31c76
9a46214
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,3 @@ | ||
| { | ||
| ".": "6.32.0" | ||
| ".": "6.33.0" | ||
| } |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| // File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details. | ||
|
|
||
| import * as WS from 'ws'; | ||
| import { ResponsesEmitter, buildURL } from './internal-base'; | ||
| import { ResponsesEmitter, ResponsesStreamMessage, WebSocketError, buildURL } from './internal-base'; | ||
| import * as ResponsesAPI from './responses'; | ||
| import { OpenAI } from '../../client'; | ||
|
|
||
|
|
@@ -65,6 +65,135 @@ export class ResponsesWS extends ResponsesEmitter { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns an async iterator over WebSocket lifecycle and message events, | ||
| * providing an alternative to the event-based `.on()` API. | ||
| * The iterator will exit if the socket closes but breaking out of the iterator | ||
| * does not close the socket. | ||
| * | ||
| * @example | ||
| * ```ts | ||
| * for await (const event of connection.stream()) { | ||
| * switch (event.type) { | ||
| * case 'message': | ||
| * console.log('received:', event.message); | ||
| * break; | ||
| * case 'error': | ||
| * console.error(event.error); | ||
| * break; | ||
| * case 'close': | ||
| * console.log('connection closed'); | ||
| * break; | ||
| * } | ||
| * } | ||
| * ``` | ||
| */ | ||
| stream(): AsyncIterableIterator<ResponsesStreamMessage> { | ||
| return this[Symbol.asyncIterator](); | ||
| } | ||
|
|
||
| [Symbol.asyncIterator](): AsyncIterableIterator<ResponsesStreamMessage> { | ||
| // Two-queue async iterator: `queue` buffers incoming messages, | ||
| // `resolvers` buffers waiting next() calls. A push wakes the | ||
| // oldest next(); a next() drains the oldest message. | ||
| const queue: ResponsesStreamMessage[] = []; | ||
| const resolvers: (() => void)[] = []; | ||
| let done = false; | ||
|
|
||
| const push = (msg: ResponsesStreamMessage) => { | ||
| queue.push(msg); | ||
| resolvers.shift()?.(); | ||
| }; | ||
|
Comment on lines
+103
to
+106
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add a hard cap to the queue and define an overflow behavior? Right now this is an unbounded in-memory buffer. a slow or abandoned consumer might cause unbounded growth. |
||
|
|
||
| const onEvent = (event: ResponsesAPI.ResponsesServerEvent) => { | ||
| if (event.type === 'error') return; // handled by onEmitterError | ||
| push({ type: 'message', message: event }); | ||
| }; | ||
|
|
||
| // Catches both API-level and socket-level errors via _onError → _emit('error') | ||
| const onEmitterError = (err: WebSocketError) => { | ||
| push({ type: 'error', error: err }); | ||
| }; | ||
|
|
||
| const onOpen = () => { | ||
| push({ type: 'open' }); | ||
| }; | ||
|
|
||
| const flushResolvers = () => { | ||
| for (let resolver = resolvers.shift(); resolver; resolver = resolvers.shift()) { | ||
| resolver(); | ||
| } | ||
| }; | ||
|
|
||
| const onClose = () => { | ||
| push({ type: 'close' }); | ||
| done = true; | ||
| flushResolvers(); | ||
| cleanup(); | ||
| }; | ||
|
|
||
| const cleanup = () => { | ||
| this.off('event', onEvent); | ||
| this.off('error', onEmitterError); | ||
| this.socket.off('open', onOpen); | ||
| this.socket.off('close', onClose); | ||
| }; | ||
|
|
||
| this.on('event', onEvent); | ||
| this.on('error', onEmitterError); | ||
| this.socket.on('open', onOpen); | ||
| this.socket.on('close', onClose); | ||
|
Comment on lines
+143
to
+145
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Curious, does each iterator attach its own listeners? if so, this might increase memory usage. Probably not a concern but we should either enforce its ownership or document that multi-consumer duplication is intentional. |
||
|
|
||
| switch (this.socket.readyState) { | ||
| case WS.WebSocket.CONNECTING: | ||
| push({ type: 'connecting' }); | ||
| break; | ||
| case WS.WebSocket.OPEN: | ||
| push({ type: 'open' }); | ||
| break; | ||
| case WS.WebSocket.CLOSING: | ||
| push({ type: 'closing' }); | ||
| break; | ||
| case WS.WebSocket.CLOSED: | ||
| push({ type: 'close' }); | ||
| done = true; | ||
| cleanup(); | ||
| break; | ||
| } | ||
|
Comment on lines
+147
to
+162
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks like it could be a race condition. The socket can transition after listeners are attached BUT before the |
||
|
|
||
| const resolve = (res: (value: IteratorResult<ResponsesStreamMessage>) => void) => { | ||
| if (queue.length > 0) { | ||
| res({ value: queue.shift()!, done: false }); | ||
| } else if (done) { | ||
| res({ value: undefined, done: true }); | ||
| } else { | ||
| return false; | ||
| } | ||
| return true; | ||
| }; | ||
|
|
||
| const next = (): Promise<IteratorResult<ResponsesStreamMessage>> => | ||
| new Promise((res) => { | ||
| if (resolve(res)) return; | ||
| resolvers.push(() => { | ||
| resolve(res); | ||
| }); | ||
| }); | ||
|
|
||
| return { | ||
| next, | ||
| return: (): Promise<IteratorReturnResult<undefined>> => { | ||
| done = true; | ||
| cleanup(); | ||
| flushResolvers(); | ||
| return Promise.resolve({ value: undefined, done: true }); | ||
| }, | ||
| [Symbol.asyncIterator]() { | ||
| return this; | ||
| }, | ||
| }; | ||
| } | ||
|
|
||
| private authHeaders(): Record<string, string> { | ||
| return { Authorization: `Bearer ${this.client.apiKey}` }; | ||
| return {}; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a consumer applies the new
for await/.stream()pattern to the other public websocket entry points (src/realtime/ws.ts,src/realtime/websocket.ts, and thesrc/beta/...variants), those classes still throw because they do not implementstream()orSymbol.asyncIterator. That leaves the SDK with two incompatible websocket APIs under one feature, so shared helpers over responses and realtime sockets will fail at runtime unless the sibling classes are updated as well.Useful? React with 👍 / 👎.