jeff.hykin
jeff.hykin2y ago

How to cancel or abort .pipeTo()?

I'd rather not ask, but after finding what I consider an MDN easter egg, I realized I'm probably not going to solve this without help from the runtime implementers. https://user-images.githubusercontent.com/17692058/200146738-da3e8747-6c7f-4ec7-adb8-fbb8c8947d33.png
The example is straightforward: call stream1.pipeTo(stream2), then, with full access to both stream1 and stream2, try to shut down/cancel/close/abort/sabotage/nuke stream1, stream2, or even just the pipe itself (copy-paste example below). The big issue is that, because of the pipeTo, the deno process never ends. I've tried everything I can think of, including inheriting from WritableStream, prototype-pollution hacks, and accessing symbol keys that probably shouldn't be accessed directly, without a hint of success.
// Don't use the deno repl for this code; use deno eval instead, because the Deno.stdin access will freeze the whole repl
const [ stream1, newStdinReadable ] = Deno.stdin.readable.tee()
const stream2 = new WritableStream({}, new CountQueuingStrategy({ highWaterMark: 1 }))
stream1.pipeTo(
    stream2,
    { preventClose: false, preventAbort: false, preventCancel: false },
)
// stream1.stopPipingTo(stream2)
try { await stream1.cancel() } catch (error) { console.log(error) }
// ^ Cannot cancel a locked ReadableStream.
try { await stream1.abort() } catch (error) { console.log(error) }
// ^ TypeError: stream1.abort is not a function
try { await stream1.close() } catch (error) { console.log(error) }
// ^ TypeError: stream1.close is not a function
try { await stream2.cancel() } catch (error) { console.log(error) }
// ^ TypeError: stream2.cancel is not a function
try { await stream2.abort() } catch (error) { console.log(error) }
// ^ TypeError: The writable stream is locked, therefore cannot be aborted.
try { await stream2.close() } catch (error) { console.log(error) }
// ^ TypeError: The writable stream is locked, therefore cannot be closed.
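One way to read those six errors, as a minimal sketch rather than a definitive diagnosis: pipeTo() locks both ends of the pipe for as long as it runs, and the calls reported as "not a function" belong to the other stream type (a ReadableStream has cancel(), while a WritableStream has abort() and close()). The helper name inspectPipeLocks and the in-memory streams are hypothetical stand-ins, not the stdin setup from the post.

```javascript
// Hypothetical helper: uses in-memory streams instead of Deno.stdin so it
// cannot hang the process.
function inspectPipeLocks() {
  // An empty underlying source never enqueues or closes, so the pipe stays
  // pending, similar to an idle stdin.
  const source = new ReadableStream({});
  const sink = new WritableStream({});

  const before = { source: source.locked, sink: sink.locked };

  // pipeTo() acquires a reader on the source and a writer on the sink.
  const piping = source.pipeTo(sink);
  piping.catch(() => {}); // the promise is intentionally left pending here

  const during = { source: source.locked, sink: sink.locked };

  return {
    before,
    during,
    // These two methods do not exist on the respective stream types.
    readableAbortType: typeof source.abort,
    writableCancelType: typeof sink.cancel,
  };
}

console.log(inspectPipeLocks());
```

While both `locked` flags are true, cancel(), abort(), and close() called directly on the streams reject, which matches the "locked" errors above.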
2 Replies
Andreu Botella (they/them)
Have you tried passing an AbortSignal as the signal option in the second argument to pipeTo?
jeff.hykin
jeff.hykin2y ago
I think I'm missing something :/ To anyone with a similar problem: the code below (without all the try/catch blocks) does seem to be the correct approach. However, when stream1 comes from Deno.stdin, the behavior is probably still not what you're looking for.
const controller = new AbortController()
stream1.pipeTo(stream2, { signal: controller.signal, preventClose: false, preventAbort: false, preventCancel: false, })
controller.abort()



try { await stream1.cancel() } catch (error) { console.log(error) }
// ^ Cannot cancel a locked ReadableStream.
try { await stream1.abort() } catch (error) { console.log(error) }
// ^ TypeError: stream1.abort is not a function
try { await stream1.close() } catch (error) { console.log(error) }
// ^ TypeError: stream1.close is not a function
try { await stream2.cancel() } catch (error) { console.log(error) }
// ^ TypeError: stream2.cancel is not a function
try { await stream2.abort() } catch (error) { console.log(error) }
// ^ TypeError: The writable stream is locked, therefore cannot be aborted.
try { await stream2.close() } catch (error) { console.log(error) }
// ^ TypeError: The writable stream is locked, therefore cannot be closed.

// (process still doesn't exit)
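For comparison, here is a minimal sketch of the same signal-based abort with in-memory streams, assuming nothing about stdin. The function name abortPipe is hypothetical. It awaits the promise returned by pipeTo() before inspecting the streams, because the pipe shuts down asynchronously after controller.abort() is called.

```javascript
// Hypothetical helper: aborts a pipe between two in-memory streams and
// reports what happened.
async function abortPipe() {
  const source = new ReadableStream({}); // never produces data
  const sink = new WritableStream({});
  const controller = new AbortController();

  const piping = source.pipeTo(sink, { signal: controller.signal });
  controller.abort();

  let errorName = null;
  try {
    await piping; // rejects once the pipe has finished shutting down
  } catch (error) {
    errorName = error.name;
  }

  return {
    errorName,
    sourceLocked: source.locked,
    sinkLocked: sink.locked,
  };
}

abortPipe().then((result) => console.log(result));
```

In this setup the locks are released by the time the pipeTo() promise rejects. In the snippet above, the try/catch calls run right after controller.abort() without waiting for that promise, so they may still see locked streams. A further possible factor, going by the Streams specification: cancelling one branch of a tee() does not complete until the other branch is cancelled too, so a pipe whose source is a tee branch may never finish shutting down while the second branch stays open.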
I took a look at MDN. I can't say I fully understand AbortControllers, but I feel like the last attempt should've worked.
Although I didn't solve the problem, I did figure out what is going on: I believe the issue is that .read() from stdin doesn't truly abort. I'll create an issue for it.
import { abortable } from "https://deno.land/std@0.161.0/async/mod.ts"
const controller = new AbortController()
setTimeout(() => controller.abort(), 100)
try {
    // abortable() rejects when the signal fires; the read() itself is left pending
    await abortable(
        Deno.stdin.readable.tee()[0].getReader().read(),
        controller.signal
    )
} catch (error) {
    console.debug(`Abort error:`, error)
}
console.log("Abort prints but program doesn't exit")