TangJieHao
TangJieHao5mo ago

command.spawn() stdin and stdout

I am not sure if I am handling the writeable stream and readable stream correctly. I assigned getWrite and getReader to writer and reader in my class. I am slightly confused on what ready, releaseLock is as it's not well documented (from what sourced I can find)
private async stdin(query: Query): Promise<void> {
const data = this.encoder.encode(JSON.stringify(query) + "\n");
await this.writer.ready;
await this.writer.write(data);
this.writer.releaseLock();
}

private async stdout(): Promise<AIObject> {
const result = await this.reader.read();
if (result.done) {
throw new Error("Stream finished unexpectedly.");
}
const responseStr = this.decoder.decode(result.value);
return JSON.parse(responseStr) as AIObject;
}
private async stdin(query: Query): Promise<void> {
const data = this.encoder.encode(JSON.stringify(query) + "\n");
await this.writer.ready;
await this.writer.write(data);
this.writer.releaseLock();
}

private async stdout(): Promise<AIObject> {
const result = await this.reader.read();
if (result.done) {
throw new Error("Stream finished unexpectedly.");
}
const responseStr = this.decoder.decode(result.value);
return JSON.parse(responseStr) as AIObject;
}
6 Replies
Deno AI Helper
Deno AI Helper5mo ago
In your TypeScript code, you're interacting with the writer and reader objects, which are presumably obtained from a WritableStream and a ReadableStream, respectively. Let's go through the methods ready, write, and releaseLock to clarify their usage based on the documentation. 1. writer.ready: This is a promise that resolves when the writer's desired size is positive, indicating that it can accept more chunks to write. In other words, it's a signal that the writer is ready to write data. By awaiting this.writer.ready, you're ensuring that the writer is ready before attempting to write data to it. 2. writer.write(data): This method is used to write data to the stream's internal queue. It returns a promise that resolves when the data has been handled (e.g., written to the underlying sink). 3. writer.releaseLock(): This method releases the writer's lock on the corresponding stream, allowing other writers to acquire a lock and write to the stream. It's important to release the lock when you're done writing so that other parts of your code (or other code entirely) can write to the stream without being blocked. In the stdin method, you're correctly awaiting writer.ready before writing data to the stream. However, you should be cautious with writer.releaseLock(). Typically, you would only release the lock if you're completely done with writing and you want to allow other writers to start writing to the stream. If you plan to write more data later, you should not release the lock after each write, as it could lead to unnecessary overhead and potential race conditions with other writers. For the stdout method, you're reading from the reader, which is correct. The read() method returns a promise that resolves to an object with two
Deno AI Helper
Deno AI Helper5mo ago
properties: done and value. If done is true, it means the stream has been fully read or has been closed. If done is false, value will contain the chunk of data that was read. Here's a slightly modified version of your stdin method that keeps the lock until you're completely done with writing:
private async stdin(query: Query): Promise<void> {
const data = this.encoder.encode(JSON.stringify(query) + "\n");
await this.writer.ready;
await this.writer.write(data);
// Only release the lock if you're done writing to the stream for good.
// this.writer.releaseLock();
}

// Make sure to eventually release the lock when you're completely done with the writer.
private async stdin(query: Query): Promise<void> {
const data = this.encoder.encode(JSON.stringify(query) + "\n");
await this.writer.ready;
await this.writer.write(data);
// Only release the lock if you're done writing to the stream for good.
// this.writer.releaseLock();
}

// Make sure to eventually release the lock when you're completely done with the writer.
Remember to handle the case where you're done with the writer and want to release the lock. You might want to implement a separate method to close the writer properly, which includes releasing the lock. The documentation does not provide specific examples for the releaseLock method, but it does show usage of streams and
TangJieHao
TangJieHaoOP5mo ago
What about continuous stream? I can't seem to get any output when I want to keep the writer open.
Leokuma
Leokuma5mo ago
Can you provide a reproducible example? If you want to consume the stream in a continuous way, I believe you'll have to either call pipeTo() or use an async iterator somewhere
TangJieHao
TangJieHaoOP5mo ago
this is what I have set up as a test. and recreating the writer seems to work.
async function SubProcess() {
console.log("Starting subprocess...");
const EXE: string = "./katago/katago.exe";
const MODEL: string = "./katago/model.bin.gz";
const CONFIG: string = "./katago/config.cfg";
const command = new Deno.Command(EXE, {
args: ["analysis", "-model", MODEL, "-config", CONFIG],
stdin: "piped",
stdout: "piped",
stderr: "piped",
});

const process = command.spawn();
console.log("Subprocess spawned.");
return process;
}

async function sendQuery(process: Deno.ChildProcess, query: Query) {
console.log(`Sending query with id: ${query.id}`);
const writer = process.stdin.getWriter();
const data = new TextEncoder().encode(JSON.stringify(query) + "\n");
await writer.write(data);
writer.releaseLock();
console.log(`Query ${query.id} sent.`);
}

async function readOutput(process: Deno.ChildProcess) {
console.log("Reading output from subprocess...");
const decoder = new TextDecoder();
const reader = process.stdout.getReader();
let output = "";

try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
output += decoder.decode(value, { stream: true });
// Assuming that each response from Katago ends with a newline
if (output.includes("\n")) {
break;
}
}
} catch (error) {
console.error("Error reading subprocess output: ", error);
} finally {
reader.releaseLock();
}
const AIData: AIObject = JSON.parse(output) as AIObject
return AIData;
}
async function SubProcess() {
console.log("Starting subprocess...");
const EXE: string = "./katago/katago.exe";
const MODEL: string = "./katago/model.bin.gz";
const CONFIG: string = "./katago/config.cfg";
const command = new Deno.Command(EXE, {
args: ["analysis", "-model", MODEL, "-config", CONFIG],
stdin: "piped",
stdout: "piped",
stderr: "piped",
});

const process = command.spawn();
console.log("Subprocess spawned.");
return process;
}

async function sendQuery(process: Deno.ChildProcess, query: Query) {
console.log(`Sending query with id: ${query.id}`);
const writer = process.stdin.getWriter();
const data = new TextEncoder().encode(JSON.stringify(query) + "\n");
await writer.write(data);
writer.releaseLock();
console.log(`Query ${query.id} sent.`);
}

async function readOutput(process: Deno.ChildProcess) {
console.log("Reading output from subprocess...");
const decoder = new TextDecoder();
const reader = process.stdout.getReader();
let output = "";

try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
output += decoder.decode(value, { stream: true });
// Assuming that each response from Katago ends with a newline
if (output.includes("\n")) {
break;
}
}
} catch (error) {
console.error("Error reading subprocess output: ", error);
} finally {
reader.releaseLock();
}
const AIData: AIObject = JSON.parse(output) as AIObject
return AIData;
}
as for why I am concatenating the output lines is because katago returns a giant JSON text.
TangJieHao
TangJieHaoOP5mo ago
Gist
Deno Spawn Outputs
Deno Spawn Outputs. GitHub Gist: instantly share code, notes, and snippets.