raaymax, 4mo ago

Event loop resolving prematurely

It looks like the event loop clears on the await it.return?.(); call, which is really strange. I can't figure out what's happening here or where the problem is. Can anyone help with that?
import { assert } from 'jsr:@std/assert@^1.0.0';

async function* abortable(
  p: ReadableStream<Uint8Array>,
  signal: AbortSignal,
): AsyncGenerator<Uint8Array> {
  signal.throwIfAborted();
  const { promise, reject } = Promise.withResolvers<never>();
  const abort = () => reject(signal.reason);
  signal.addEventListener("abort", abort, { once: true });

  const it = p[Symbol.asyncIterator]();
  try {
    while (true) {
      const race = Promise.race([promise, it.next()]);
      race.catch(() => {
        signal.removeEventListener("abort", abort);
      });
      const { done, value } = await race;
      if (done) {
        signal.removeEventListener("abort", abort);
        return;
      }
      yield value;
    }
  } catch (e) {
    await it.return?.();
    throw e;
  }
}

Deno.test('aborting streams', async () => {
  let streamClosed = false;
  const stream = new ReadableStream<Uint8Array>({
    cancel() {
      streamClosed = true;
      console.log('Stream closed');
    }
  });
  console.log('stream created');
  const signalController = new AbortController();
  setTimeout(() => signalController.abort(), 1000);
  try {
    for await (const _ of abortable(stream, signalController.signal)) {
      console.log('Got data');
    }
  } catch (e) {
    console.error(e);
  }
  console.log('Stream closed');
  assert(streamClosed);
});
Execution result
❯ deno test -A strange.test.ts
running 1 test from ./oko.test.ts
aborting streams ...
------- output -------
stream created
----- output end -----

ok | 0 passed | 0 failed (1s)

error: Promise resolution is still pending but the event loop has already resolved.
3 Replies
raaymax (OP), 4mo ago
I was finally able to narrow this issue to just:
Deno.test('stream iterator fail', async () => {
  const stream = new ReadableStream<Uint8Array>();
  const it = stream[Symbol.asyncIterator]();
  await it.next();
  await it.return?.();
  console.log('Stream closed');
});
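For context on the narrowed test: a ReadableStream constructed without an underlying source never enqueues a chunk and never closes, so the first read stays pending, and nothing else (no timer, no I/O) is left to keep the event loop alive. The sketch below probes that with a reader rather than the async iterator. The names settlesWithin and probe are hypothetical helpers, and the 50 ms timeout is an arbitrary choice.

```typescript
// Hypothetical helper: reports whether `p` settles within `ms` milliseconds.
async function settlesWithin(p: Promise<unknown>, ms: number): Promise<boolean> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<false>((resolve) => {
    timer = setTimeout(() => resolve(false), ms);
  });
  try {
    // Map both fulfilment and rejection of `p` to `true` ("it settled").
    return await Promise.race([p.then(() => true, () => true), timeout]);
  } finally {
    clearTimeout(timer);
  }
}

// Hypothetical probe: reads from a stream that has no data source.
async function probe(): Promise<{ readSettled: boolean; cancelled: boolean }> {
  let cancelled = false;
  const stream = new ReadableStream<Uint8Array>({
    cancel() {
      cancelled = true;
    },
  });
  const reader = stream.getReader();
  // Nothing ever enqueues or closes, so this read is still pending after 50 ms.
  const readSettled = await settlesWithin(reader.read(), 50);
  // Cancelling through the reader closes the stream and runs the source's cancel().
  await reader.cancel();
  return { readSettled, cancelled };
}
```

In the probe, the timer is what keeps the process alive while the read is pending. The narrowed test has no such timer, which would be consistent with the "event loop has already resolved" error.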
Deno AI Helper, 4mo ago
I wasn't able to find the answer to your question in the official Deno documentation. However, looking at your message and the provided code snippet, I can offer some observations that might help you troubleshoot the issue. Please note that this advice is not based on the Deno documentation, and you should double-check the accuracy of the advice given.

In your abortable function, you are using Promise.race to wait for either the promise to be rejected due to an abort signal or the next value from the stream iterator. If the abort signal is triggered, you correctly remove the event listener, but you also need to ensure that the iterator is properly closed. The await it.return?.(); call is intended to close the iterator, but it seems like you are experiencing unexpected behavior when this is called.

One potential issue could be related to the use of optional chaining (?.) with it.return. If it.return is not defined, the optional chaining will result in the entire expression evaluating to undefined, and no action will be taken. This could potentially leave the iterator in an unresolved state. However, this would not be the case if it.return is indeed a function as it should be for a well-formed async iterator.

Another thing to consider is the error message you're seeing: "Promise resolution is still pending but the event loop has already resolved." This suggests that there might be a promise that has not been settled (either resolved or rejected) by the time the event loop is empty. It's possible that there's an unresolved promise in your code that's causing this issue.

Since I cannot provide a definitive answer based on the Deno documentation, I recommend carefully reviewing the logic related to promise handling and event listeners in your abortable
Deno AI Helper, 4mo ago
function. Also, consider adding more logging to track the state of promises and the iterator to help identify where the event loop might be getting cleared prematurely. Remember to test any changes thoroughly and consult the Deno documentation or community resources if you need more specific guidance on the behavior of async iterators and event loops in Deno.
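A sketch of one possible workaround, offered under an assumption that this thread does not confirm: that it.return() is queued behind the still-pending it.next() and therefore never settles. Holding a reader directly and calling reader.cancel(reason) on abort avoids that wait, because cancelling settles the pending read and invokes the source's cancel(). The names abortableRead and demo are hypothetical, and the 20 ms delay is arbitrary.

```typescript
// Hypothetical alternative to `abortable`: uses a reader instead of the async iterator.
async function* abortableRead(
  stream: ReadableStream<Uint8Array>,
  signal: AbortSignal,
): AsyncGenerator<Uint8Array> {
  signal.throwIfAborted();
  const reader = stream.getReader();
  // On abort, cancel the reader: the pending read() resolves with done: true
  // and the underlying source's cancel() is called with the abort reason.
  const onAbort = () => {
    reader.cancel(signal.reason).catch(() => {});
  };
  signal.addEventListener("abort", onAbort, { once: true });
  try {
    while (true) {
      const result = await reader.read();
      signal.throwIfAborted(); // surface the abort reason to the consumer
      if (result.done) return;
      yield result.value;
    }
  } finally {
    signal.removeEventListener("abort", onAbort);
    // Also covers a consumer that breaks out of the loop early.
    await reader.cancel().catch(() => {});
    reader.releaseLock();
  }
}

// Usage: mirrors the original test, with a shorter delay.
async function demo(): Promise<{ closed: boolean; reason: unknown }> {
  let closed = false;
  const stream = new ReadableStream<Uint8Array>({
    cancel() {
      closed = true;
    },
  });
  const controller = new AbortController();
  setTimeout(() => controller.abort(), 20);
  let reason: unknown;
  try {
    for await (const _ of abortableRead(stream, controller.signal)) {
      // No chunks are expected: the stream never enqueues anything.
    }
  } catch (e) {
    reason = e;
  }
  return { closed, reason };
}
```

With this shape the listener cleanup lives in one finally block instead of being repeated on each exit path, and no extra rejecting promise or Promise.race is needed.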