wulfey
wulfey2y ago

Properly continuously read data from a Reader

What's the proper way to continuously read from a Deno.Reader? I'm trying to parse incoming HTTP2 frames from a Deno.TlsConn, but my current method of reading involves two nested while loops: one to continuously perform the read operation, and the other to try to read everything currently available on the stream in chunks of 16k. Naturally, the while loops consume approximately one entire core of my processor, which is far from ideal. Can this be improved in any way? Is there a way to know (wait for a promise to resolve, or such) when there is data available?
21 Replies
jeff.hykin
jeff.hykin2y ago
If you're using an await inside of your loops, it definitely won't be taking up a whole thread just waiting on data. Something like
const returnReader = stream.getReader()
let blocks = []
while (true) {
    const {value, done} = await returnReader.read()
    if (done) {
        break
    }
    blocks.push(value)
}
const string = new TextDecoder().decode(concatUint8Arrays(blocks))
wulfey
wulfey2y ago
that is precisely what is going on
jeff.hykin
jeff.hykin2y ago
then you're good. The await is exactly being triggered by effectively a 'newDataAvailable' event, and it resumes the loop every time that event fires. It loops around, hits the await, and the thread goes to work on other stuff until the next 'newDataAvailable' event
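For illustration, here is a minimal sketch of that pattern against a plain TCP connection (the hostname, port, and request string are placeholders, not from this thread): each await conn.read(...) suspends the function until the runtime has data or EOF, so the loop never busy-waits.

const conn = await Deno.connect({ hostname: "example.com", port: 80 });
await conn.write(new TextEncoder().encode(
    "GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n",
));

const buf = new Uint8Array(16384);
while (true) {
    const n = await conn.read(buf); // suspends here until data or EOF; no busy-waiting
    if (n === null) break;          // null signals EOF (connection closed)
    console.log(`received ${n} bytes`);
}
conn.close();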
wulfey
wulfey2y ago
i see, thank you
though, how can i find out what's causing all that cpu usage?
jeff.hykin
jeff.hykin2y ago
That I'm not sure of, I would take a look at deno profiling https://deno.land/manual@v1.30.0/references/contributing/profiling
wulfey
wulfey2y ago
thank you
would you mind if i asked another thing?
jeff.hykin
jeff.hykin2y ago
sure haha
wulfey
wulfey2y ago
in the case that the data received happens to be exactly the size of my buffer (currently 16384 bytes), won't the next read in that while loop technically halt until newDataAvailable? i mean, i could test that myself, but in the case that is correct, how would i work around that?
yes, that is in fact correct actually
jeff.hykin
jeff.hykin2y ago
it should hit the break condition (unless I'm misunderstanding the question)
wulfey
wulfey2y ago
oh, let me try to clarify
let eof = false;
while (true) {
    const buf = new Uint8Array(bufsize),
        read = await conn.read(buf);

    if (read === null) {
        // null means EOF: the connection was closed
        eof = true;
        break;
    }

    buffers.push(buf.slice(0, read));

    // a short read means nothing more is buffered right now, so stop draining
    if (read !== bufsize) break;
}
in the case the conn.read call returns bufsize (the read result was exactly bufsize long), but no more data than that is actually available, the while loop will continue, because it will try to check if there is more data, and that next read will halt until new data arrives
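One way around that ambiguity, sketched below under the assumption that the single-buffer read loop is kept: do only one read per iteration and hand every chunk to an accumulating parser, so there is never a second "drain" read that can block. parseFrames is a hypothetical helper standing in for the HTTP2 frame parsing, not something from this thread.

// hypothetical: consumes complete frames from `data` and returns the unconsumed tail
declare function parseFrames(data: Uint8Array): Uint8Array;

async function readLoop(conn: Deno.Conn, bufsize = 16384) {
    const buf = new Uint8Array(bufsize);
    let pending = new Uint8Array(0);

    while (true) {
        const read = await conn.read(buf);
        if (read === null) break; // EOF: connection closed

        // append this chunk to the leftover bytes from previous reads
        const merged = new Uint8Array(pending.length + read);
        merged.set(pending);
        merged.set(buf.subarray(0, read), pending.length);

        // take as many whole frames as possible; keep the remainder
        pending = parseFrames(merged);
    }
}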
jeff.hykin
jeff.hykin2y ago
I see. One way to sidestep the buffer size issue would be like this:
// simplified from: https://stackoverflow.com/questions/49129643/how-do-i-merge-an-array-of-uint8arrays
const concatUint8Arrays = (arrays) =>
    new Uint8Array(arrays.reduce((acc, curr) => (acc.push(...curr), acc), []))

const streamToString = async (stream) => {
    console.debug(`stream is:`, stream)
    const returnReader = stream.getReader()
    let blocks = []
    while (true) {
        const {value, done} = await returnReader.read()
        if (done) {
            break
        }
        blocks.push(value)
    }
    const string = new TextDecoder().decode(concatUint8Arrays(blocks))
    return string
}
wulfey
wulfey2y ago
but the next conn.read call will have no data available, so it would wait for new data to be available
do note that the Reader in question is a Deno.TlsConn
Deno.Reader.read(p: Uint8Array): Promise<number | null>
the only thing it returns is a number, or null on EOF (end of file, connection close)
though, only now i notice
jeff.hykin
jeff.hykin2y ago
I meant more that you don't have to specify a buffer size; just put all the non-null results into a regular array, then use concatUint8Arrays() to put them all together
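As an aside, the spread-based concat above can hit the engine's argument-count limit once chunks get large; here is a sketch of an equivalent version using TypedArray.set(), with the same behavior:

const concatUint8Arrays = (arrays: Uint8Array[]): Uint8Array => {
    // compute the total size first, then copy each chunk in at its offset
    const out = new Uint8Array(arrays.reduce((sum, a) => sum + a.length, 0));
    let offset = 0;
    for (const a of arrays) {
        out.set(a, offset);
        offset += a.length;
    }
    return out;
};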
wulfey
wulfey2y ago
Deno.TlsConn has a readable property
i see, that method uses the actual "getReader"
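That readable property is a ReadableStream<Uint8Array>, so in principle the connection can also be consumed with async iteration; a minimal sketch (hostname and port are placeholders, and handleFrameBytes is a hypothetical stand-in for the frame parsing):

// hypothetical entry point for the HTTP2 frame parsing
declare function handleFrameBytes(chunk: Uint8Array): void;

const conn = await Deno.connectTls({ hostname: "example.com", port: 443 });

// each awaited chunk arrives only when data is actually available,
// sized to whatever the runtime received; no buffer-size guessing
for await (const chunk of conn.readable) {
    handleFrameBytes(chunk);
}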
jeff.hykin
jeff.hykin2y ago
Yeah sorry about that, the streamToString wouldn't directly work in your case
wulfey
wulfey2y ago
it in theory should if i just do some replacing there
thank you for your help!!
jeff.hykin
jeff.hykin2y ago
no problem! I spent a long time trying to get readable streams working haha so I'm happy to help someone else. The Deno devs were nice enough to help me when I needed it.
wulfey
wulfey2y ago
thank you, have a great day :)
jeff.hykin
jeff.hykin2y ago
you too 👍