Bubbles, 3y ago

Sharing streams

So I have a small project where I want to use streams. But now I need two different transform streams on the same input:
const file = await Deno.open(path);

// First pass: Deno.FsFile exposes its byte stream as `readable`
const processingStream1 = file.readable.pipeThrough(transform1);
for await (const line of processingStream1) console.log(line);

// Second pass over the same file
const processingStream2 = file.readable.pipeThrough(transform2);
for await (const line of processingStream2) console.log(line);
9 Replies
Bubbles (OP), 3y ago
What I want to try and do is let processingStream2 continue from where processingStream1 left off
Andreu Botella (they/them)
a transform can only stop in the middle if it throws an error, otherwise it will consume all of its input
Bubbles (OP), 3y ago
Even if .cancel() is used on the controller?
Andreu Botella (they/them)
oh I'm not actually sure about that
Bubbles (OP), 3y ago
Cuz right now that is done, but the readable stream that feeds into the TransformStream is still locked, which means that I cannot reuse it :/
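The locking described here can be reproduced with a minimal sketch, assuming a runtime that provides the standard Web Streams globals (Deno or a recent Node.js). The `input` stream and its single chunk are made up for illustration; they stand in for the file's readable stream.

```typescript
// Hypothetical in-memory source standing in for the file's readable stream.
const input = new ReadableStream<string>({
  start(controller) {
    controller.enqueue("some data");
    controller.close();
  },
});

// Piping acquires a reader on `input`, which locks it for the duration of the pipe.
input.pipeThrough(new TransformStream<string, string>());
const lockedAfterPipe = input.locked;

// A second pipeThrough on a locked stream throws a TypeError synchronously.
let secondPipeFailed = false;
try {
  input.pipeThrough(new TransformStream<string, string>());
} catch {
  secondPipeFailed = true;
}
```

Both `lockedAfterPipe` and `secondPipeFailed` end up `true`: while the first pipe is active, nothing else can read from the source.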
Andreu Botella (they/them)
I don't think you can do that. But I don't think that's a bad thing.
Bubbles (OP), 3y ago
Shit.... That really sucks for data that needs different processing...
Andreu Botella (they/them)
To explain why, consider this implementation of TextLineStream:
new TransformStream({
  lineSoFar: "",
  transform(chunk, controller) {
    chunk = this.lineSoFar + chunk;
    const lines = chunk.split("\n");
    this.lineSoFar = lines.pop();
    for (const line of lines) {
      controller.enqueue(line + "\n");
    }
  },
  flush(controller) {
    if (this.lineSoFar !== "") {
      controller.enqueue(this.lineSoFar);
    }
  }
});
There's data coming from previous chunks that hasn't been enqueued yet, so if you could get back to the original stream, you'd lose that data.
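A small sketch of that buffering, using the same splitting logic pulled out of the TransformStream so the internal state can be inspected. `LineSplitter` is a hypothetical helper written for this illustration, not part of Deno or the Streams API, and the input text is made up.

```typescript
// Hypothetical helper: same line-splitting logic as the transformer above,
// kept outside a TransformStream so its buffer is visible.
class LineSplitter {
  lineSoFar = "";

  // Returns the complete lines in this chunk and buffers the trailing partial line.
  push(chunk: string): string[] {
    const lines = (this.lineSoFar + chunk).split("\n");
    this.lineSoFar = lines.pop() ?? "";
    return lines.map((line) => line + "\n");
  }
}

const splitter = new LineSplitter();
const emitted = splitter.push("first line\nsecond li");
// Only "first line\n" has been emitted. "second li" has already been read
// from the source, but it exists only in splitter.lineSoFar.
```

If a second consumer took over the source at this point, it would start after `second li`, and that partial line would be lost.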
Bubbles (OP), 3y ago
Hmmm. I guess I'll just use a reader and loop with that.
It's just so annoying that a stream can't be reused or cloned from a certain point.
I guess I'll figure out another solution... Cuz 1.5m no-op fn calls are quite expensive...
Well, thanks for the insight and help @Andreu Botella (he/they). I'm going to bed thinking of how to do this.
Is it also not possible to do this sequentially, where one stream finishes first and then the other one can go?
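One way the reader-and-loop idea could look, as a rough sketch rather than a definitive answer. It assumes the source already yields string chunks (for a file, for example after decoding the bytes with `TextDecoderStream`) and that the switch between the two kinds of processing can be detected from a line's content. `readLines`, `processInPhases`, and the boundary predicate are hypothetical names invented for this example.

```typescript
// Hypothetical helper: yields lines from a stream of string chunks using a
// single reader, so the source is locked once and no buffered data is lost.
async function* readLines(source: ReadableStream<string>): AsyncGenerator<string> {
  const reader = source.getReader();
  let lineSoFar = "";
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      const lines = (lineSoFar + value).split("\n");
      lineSoFar = lines.pop() ?? "";
      yield* lines;
    }
    // Emit whatever is left after the last newline.
    if (lineSoFar !== "") yield lineSoFar;
  } finally {
    reader.releaseLock();
  }
}

// Hypothetical driver: phase1 handles lines until `isBoundary` matches,
// then phase2 handles every line after the boundary line.
async function processInPhases(
  source: ReadableStream<string>,
  isBoundary: (line: string) => boolean,
  phase1: (line: string) => void,
  phase2: (line: string) => void,
): Promise<void> {
  let inPhase2 = false;
  for await (const line of readLines(source)) {
    if (!inPhase2 && isBoundary(line)) {
      inPhase2 = true;
      continue;
    }
    (inPhase2 ? phase2 : phase1)(line);
  }
}
```

Because both phases share one reader and one line buffer, the second phase picks up exactly where the first one stopped, even when the boundary falls in the middle of a chunk.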
