LFCavalcanti
LFCavalcantiā€¢5mo ago

Is there a way to read big files using something like .seek but with ending position as well?

So, I'm doing a version of the billion line challenge in JS. I've already implemented a version with Node basically calculating offsets to "slice" the file in a amount og segments matching the available parallelism in the system. To achieve that I'm calling Node's "fs" library like so:
const fileReadStream = fs.createReadStream(workerData.filePath, {
start: workerData.start,
end: workerData.end,
});
const fileReadStream = fs.createReadStream(workerData.filePath, {
start: workerData.start,
end: workerData.end,
});
I'm now trying to do the same in Deno, both for the challenge and learning Deno. In the docs there's a way to use the file.seek(offset, whence) that allows for a offset that can be either from the end, start or from current. I'm not sure how to use the same logic having different threads/workers reading from the same files in different byte ranges.
19 Replies
Deno AI Helper
Deno AI Helperā€¢5mo ago
In Deno, you can achieve similar functionality to Node's fs.createReadStream with specific start and end positions by using a combination of Deno.open, file.seek, and file.read. The documentation provides examples of how to read files in chunks and seek to specific positions within a file. Here's a step-by-step approach to read a specific range of a file in Deno: 1. Open the file using Deno.open to get a Deno.FsFile object. 2. Seek to the starting position using file.seek(startPosition, Deno.SeekMode.Start). 3. Read the desired amount of bytes from the file. However, the documentation does not provide a direct example of reading up to an end position. To read a specific range, you would need to calculate the number of bytes you want to read based on the start and end positions and then read that amount of bytes from the file. Here's an example based on the documentation:
const startPosition = workerData.start; // Your calculated start position
const endPosition = workerData.end; // Your calculated end position
const readLength = endPosition - startPosition + 1; // Calculate the number of bytes to read

const file = await Deno.open(workerData.filePath);
await file.seek(startPosition, Deno.SeekMode.Start); // Seek to the start position

const buffer = new Uint8Array(readLength);
const bytesRead = await file.read(buffer); // Read the calculated length
console.log(`Read ${bytesRead} bytes`);

file.close(); // Make sure to close the file when you are done
const startPosition = workerData.start; // Your calculated start position
const endPosition = workerData.end; // Your calculated end position
const readLength = endPosition - startPosition + 1; // Calculate the number of bytes to read

const file = await Deno.open(workerData.filePath);
await file.seek(startPosition, Deno.SeekMode.Start); // Seek to the start position

const buffer = new Uint8Array(readLength);
const bytesRead = await file.read(buffer); // Read the calculated length
console.log(`Read ${bytesRead} bytes`);

file.close(); // Make sure to close the file when you are done
Remember that you need to handle the case where the actual number of bytes read (bytesRead) might be less than the requested readLength, especially if the end of the file is reached before reading the full range. For parallel processing, you can use Deno's worker threads to read
Deno AI Helper
Deno AI Helperā€¢5mo ago
different parts of the file simultaneously. Each worker would perform the steps above for their respective range. Please note that you need to handle permissions with --allow-read and also consider the file system's limitations and the potential for race conditions when multiple workers are reading from the same file concurrently. šŸ¦• If you need more granular control over file operations or if you're dealing with very large files, you might want to look into streaming file operations as described in the documentation, which allows for more efficient memory usage when processing large files.
raunioroo
rauniorooā€¢5mo ago
This is one approach which has worked for me to reply to HTTP range requests. Struggled a bit to arrive at this, was a lot of trial and error. That is to say, I don't have the most intimate knowledge on these stream stuffs and whether this is the neatest or most performant approach, but it does appear to work reliably in true streaming manner. Here start and end is the byte range. Can't remember whether that's inclusive range as in, is it everything between start and end, or does it include the bytes at start and end position, too. Probably however the range is defined in HTTP spec/browsers, as that was what I was testing this against.
const file = await Deno.open(localpath, { read: true });
await file.seek(start, Deno.SeekMode.Start);
const slicestart = 0;
const sliceend = end - start + 1;
const slice = file.readable.pipeThrough(
new ByteSliceStream(slicestart, sliceend),
);
return slice;

const file = await Deno.open(localpath, { read: true });
await file.seek(start, Deno.SeekMode.Start);
const slicestart = 0;
const sliceend = end - start + 1;
const slice = file.readable.pipeThrough(
new ByteSliceStream(slicestart, sliceend),
);
return slice;

What the AI response suggests looks like it holds the whole returned range in memory, which can be RAM heavy depending your range sizes. My suggestion above should be properly streaming I think, so it should be able to handle very big files and ranges while using very little RAM. Oh, and the ByteSliceStream comes from Deno std lib: import { ByteSliceStream } from "https://deno.land/std@0.219.0/streams/byte_slice_stream.ts";
Esente
Esenteā€¢5mo ago
Just curious, why can't you use ByteSliceStream(start, end)?
raunioroo
rauniorooā€¢5mo ago
Can't remember testing it, but is based on the assumption that ByteSliceStream alone would "fast forward" the source stream by actually reading bytes and discarding the results up to the starting position. I can't imagine it to know to call file.seek first. Calling seek first on the file first makes sure it can jump right away to the correct start position with a simple fast filesystem call.
Esente
Esenteā€¢5mo ago
That's my belief too. But I didn't think that .seek would do better by skipping. TIL šŸ™‚
LFCavalcanti
LFCavalcantiā€¢5mo ago
Hi @raunioroo , thanks for the tip with the ByteSliceStream. Is the "slice" a stream that can be read into a buffer? In the sense that as parts of the file are read up to a buffer size I can parse that buffer, empty it then a new stream is read into the buffer and so on... I'm asking because the challenge in question has a file with 1 billion lines, each line can have max 105 bytes... we are talking something around 13GB of data Even if I slice that into 16 slices, it's too much to hold in memory I do have 32GB of RAM, but the goal is to use around 4GB as Node does
LFCavalcanti
LFCavalcantiā€¢5mo ago
It seems I need to but the call for the ByteSliceStream inside a loop, so If there is a range of bytes I want to read, I calculate start and end for each iteration moving start and end along with the buffer size So... 1 - Find the offsets I want to process 2 - Using those offsets calculate the segments of the file for each worker thread to process 3 - Openfile and use the ByteSliceStream updating the slice and parsing it in a loop until the sliceEnd >= workerData.end
raunioroo
rauniorooā€¢5mo ago
So, a stream is like a pipeline that you can consume (=read) at your own pace, and yes the stream holds some kind small buffer to get new more data ready as you consume it. The buffer is probably like, a couple of kilobytes or so, so very small, it won't need to keep much in memory at all as it tries to fetch the file at about the same pace as you are consuming the stream. Then you can have transform streams that take a raw byte stream, and, on-the-fly, transforms it to something more useful form. Like, you could have a transformstream that allows you to consume a stream line by line instead of in byte chunks. ByteSliceStream is one transformstream like that. It basically just lets you specify a range you want to read from another stream. Like an image crop function, but it crops a stream. ...but we don't use the start parameter of ByteSliceStream and instead use zero for start. Since calling file.seek does the same thing but more effectively. If I get this right, choosing some easy numbers for demo purposes. Llet's say you have 1Gb file, and you want to split the work of processing it between 4 workers. 1) Find the offsets for each worker, 0-250mb for worker one, 250mb-500mb for worker two etc 2) create only ONE byteslicestream for that worker's offset. For worker two, it would be 250-500mb. 3) in your actual processing loop, repeatedly call the byteslicestream read method to get new small chunk, process the chunk, ask for more by calling read again on the same stream object, etc etc until the stream is exhausted. If the data is in a newline-separated format, you can wrap the stream in yet another transformstream, like TextLineStream available in std. That will make it automatically so that every read() call returns a whole line instead of some arbitrary number of bytes. That's much easier to process
LFCavalcanti
LFCavalcantiā€¢5mo ago
ohhh so the slice object in your example has a .read method that I can pass a buffer to be read into?
raunioroo
rauniorooā€¢5mo ago
So the file is the cake. ByteSliceStream is a cake slice, you give one slice to each worker. stream.read() method is the spoon. Because you only consume and hold a spoonful at a time, the cake can be infinitely big you'll never run out of memory. Yeah something like that!
LFCavalcanti
LFCavalcantiā€¢5mo ago
oh okay... I think it's time to start coding and breaking things to understand better
raunioroo
rauniorooā€¢5mo ago
I think the read() method returns a buffer, though, that holds some smallish amount of bytes. You can use some other stream helpers that do the conversion from byte buffer to something that is even more easy to process. Like TextLineStream makes read() method return strings, one line at a time.
LFCavalcanti
LFCavalcantiā€¢5mo ago
I'll try this out tomorrow Thanks for now!
raunioroo
rauniorooā€¢5mo ago
No worries. The streaming stuffs can be hard to wrap your head around, related API's can be a bit confusing with many similarly named but different things, and since there are so many different ways to accomplish the same tasks, can be easily overwhelming to google and understand it all. I myself still struggle with that stuff. But it's worth it to learn, super useful stuff! Deno uses/offers streaming APIs for so many things it's good to take advantage of that.
LFCavalcanti
LFCavalcantiā€¢5mo ago
The concepts of streaming content from files or http connections I gasped well enough... I think... but Deno has a way of doing things that is different from Node, I'm not versed enough on Deno to give opinions yet, this challenge seemed like a good opportunity to test and learn
raunioroo
rauniorooā€¢5mo ago
Slightly annoyingly, in the past Deno used to have it's own non-standard streams API. Just like Node has it's own. Both kinda similar, but still different. But the old Deno API has been deprecated, or iirc mostly removed. Deno has now moved to a more standard-based Web Streams API (https://developer.mozilla.org/en-US/docs/Web/API/Streams_API). Standard API is nice, but it's a bit more cumbersome in some ways than the simpler, old Node or Deno API's. Standard web Streams API is kinda new and not so widely used yet in the backend, so there is not that much information and guides out there on it. That'll improve for sure in the future. But also when you try to google stuff, you now get a mix of results some using Node API, some use old Deno API, some use the new Web API. All have similar sounding methods etc which makes it a bit tedious to research
LFCavalcanti
LFCavalcantiā€¢3mo ago
So, I'm reviving this now... I had such a crazy streak of work this was in the back bench for more than a month. @raunioroo, the challenge call for use only standard APIs, that's why I'm trying my best to not use any module. What I did was finding the offsets on the file, like so:
const filePath = Deno.args[0];
const MAX_LINE_LENGTH = 106;
const file = await Deno.open(filePath);
const fileStats = await Deno.stat(filePath);
const FILE_SIZE = fileStats.size;
const MAX_WORKERS = mod.cpus().length;
const SEGMENT_SIZE = Math.floor(FILE_SIZE / MAX_WORKERS);
const offsets = [];
const bufferToFindOffsets = new Uint8Array(MAX_LINE_LENGTH);
let offset = 0;
while (true) {
offset += SEGMENT_SIZE;

if (offset >= FILE_SIZE) {
offsets.push(FILE_SIZE);
break;
}
await file.seek(offset, Deno.SeekMode.Start);
await file.read(bufferToFindOffsets);

const lineEndPos = bufferToFindOffsets.indexOf(10);
if (lineEndPos === -1) {
chunkOffsets.push(FILE_SIZE);
break;
} else {
offset += lineEndPos + 1;
offsets.push(offset);
}
}
const filePath = Deno.args[0];
const MAX_LINE_LENGTH = 106;
const file = await Deno.open(filePath);
const fileStats = await Deno.stat(filePath);
const FILE_SIZE = fileStats.size;
const MAX_WORKERS = mod.cpus().length;
const SEGMENT_SIZE = Math.floor(FILE_SIZE / MAX_WORKERS);
const offsets = [];
const bufferToFindOffsets = new Uint8Array(MAX_LINE_LENGTH);
let offset = 0;
while (true) {
offset += SEGMENT_SIZE;

if (offset >= FILE_SIZE) {
offsets.push(FILE_SIZE);
break;
}
await file.seek(offset, Deno.SeekMode.Start);
await file.read(bufferToFindOffsets);

const lineEndPos = bufferToFindOffsets.indexOf(10);
if (lineEndPos === -1) {
chunkOffsets.push(FILE_SIZE);
break;
} else {
offset += lineEndPos + 1;
offsets.push(offset);
}
}
Then, for each worker I call:
const lineWorker = new Worker(import.meta.resolve("./workerLines.js"), {
type: "module",
});
lineWorker.postMessage({
filePath,
start: workerNum === 0 ? 0 : offsets[workerNum - 1],
end: offsets[workerNum] - 1,
});
const lineWorker = new Worker(import.meta.resolve("./workerLines.js"), {
type: "module",
});
lineWorker.postMessage({
filePath,
start: workerNum === 0 ? 0 : offsets[workerNum - 1],
end: offsets[workerNum] - 1,
});
Inside the worker, I create a read strem like this:
import { ByteSliceStream } from "https://deno.land/std/streams/byte_slice_stream.ts";
import { readerFromStreamReader } from "https://deno.land/std/io/mod.ts";
let readBuffer = new Uint8Array(26500);
self.onmessage = async (messageData) => {
const file = await Deno.open(messageData.data.filePath);
await file.seek(messageData.data.start, Deno.SeekMode.Start);
const slicestart = 0;
const sliceend = messageData.data.end - messageData.data.start + 1;
const slice = file.readable.pipeThrough(
new ByteSliceStream(slicestart, sliceend)
);
const fileReaderOri = slice.getReader();

if (fileReaderOri) {
const fileReader = readerFromStreamReader(fileReaderOri);
let numberRead = 0;
do {
numberRead = (await fileReader.read(readBuffer)) || 0;
if (numberRead == 0 || !numberRead) break;
await processChunk(readBuffer, numberRead);
} while (true);
self.postMessage(processedLines);
self.close();
}
};
import { ByteSliceStream } from "https://deno.land/std/streams/byte_slice_stream.ts";
import { readerFromStreamReader } from "https://deno.land/std/io/mod.ts";
let readBuffer = new Uint8Array(26500);
self.onmessage = async (messageData) => {
const file = await Deno.open(messageData.data.filePath);
await file.seek(messageData.data.start, Deno.SeekMode.Start);
const slicestart = 0;
const sliceend = messageData.data.end - messageData.data.start + 1;
const slice = file.readable.pipeThrough(
new ByteSliceStream(slicestart, sliceend)
);
const fileReaderOri = slice.getReader();

if (fileReaderOri) {
const fileReader = readerFromStreamReader(fileReaderOri);
let numberRead = 0;
do {
numberRead = (await fileReader.read(readBuffer)) || 0;
if (numberRead == 0 || !numberRead) break;
await processChunk(readBuffer, numberRead);
} while (true);
self.postMessage(processedLines);
self.close();
}
};
The performance is not good. There's improvements I need to revise in the "processChunk" function, but watching the resource usage it seems the workers can't read in parallel. I know that at the OS level, each thread locks the file while reading, but it seems the lock remains all the way while the stream is not at the final position.