From 907cda4c172936c6d5b7ea2ac1a3a0986c875853 Mon Sep 17 00:00:00 2001
From: ubq323 <ubq323@ubq323.website>
Date: Sat, 27 Apr 2024 21:27:12 +0100
Subject: use poll to check writing as well as reading; packet_reader handles
 both now

---
 packet_reader/packet_reader.ha | 94 +++++++++++++++++++++++++++++-------------
 1 file changed, 66 insertions(+), 28 deletions(-)

(limited to 'packet_reader')

diff --git a/packet_reader/packet_reader.ha b/packet_reader/packet_reader.ha
index d541a11..ae38be7 100644
--- a/packet_reader/packet_reader.ha
+++ b/packet_reader/packet_reader.ha
@@ -4,7 +4,7 @@ use endian;
 use drawing;
 use drawing::{pos,CHUNKSIZE};
 
-export def VERSION: u8 = 2;
+export def VERSION: u8 = 3;
 
 export type error = !str;
 
@@ -13,12 +13,14 @@ export type packet_reader = struct {
 	good: []u8,
 	wbuf: []u8,
 	wgood: []u8,
+	sock: io::file,
 };
 
-export fn new() packet_reader = {
+export fn new(sock: io::file) packet_reader = {
 	let pr = packet_reader {
 		buf = alloc([0...],512*512*4*2), // ehhh
 		wbuf = alloc([0...], 512*512*4*2), // ehhhh
+		sock = sock,
 		...
 	};
 	pr.good = pr.buf[0..0];
@@ -26,6 +28,11 @@ export fn new() packet_reader = {
 	return pr;
 };
 
+export fn finish(pr: *packet_reader) void = {
+	free(pr.buf);
+	free(pr.wbuf);
+};
+
 fn cast_u32s_to_u8s(in: []u32) []u8 =
 	(in: *[*]u32: *[*]u8)[..len(in)*4];
 fn cast_u8s_to_u32s(in: []u8) []u32 =
@@ -52,24 +59,51 @@ export type packet = (
 	| packet_position
 	| packet_reqchunk);
 
-// call when input is ready. could block otherwise
-export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF) = {
-	const remaining_amt = len(pr.good);
-	const read_pos = if (remaining_amt > 0) {
-		// still some unconsumed content in the buffer
-		// move unconsumed stuff to start of buffer
-		// fmt::printfln("moving {} remaining bytes",remaining_amt)!;
-		pr.buf[0..remaining_amt] = pr.good;
-		yield remaining_amt;
-	} else 0z;
-	const nread = match(io::read(sock, pr.buf[read_pos..])?) {
-		case io::EOF => return io::EOF;
-		case let n: size => yield n;
+// call when input ready, so it doesn't block
+export fn service(
+	pr: *packet_reader,
+	should_read: bool,
+	should_write: bool,
+) (void | io::error | io::EOF) = {	
+	const sock = pr.sock;
+
+	if (should_read) {
+		// reading
+		const remaining_amt = len(pr.good);
+		const read_pos = if (remaining_amt > 0) {
+			// still some unconsumed content in the buffer
+			// move unconsumed stuff to start of buffer
+			// fmt::printfln("moving {} remaining bytes",remaining_amt)!;
+			pr.buf[0..remaining_amt] = pr.good;
+			yield remaining_amt;
+		} else 0z;
+		const nread = match(io::read(sock, pr.buf[read_pos..])?) {
+			case io::EOF => return io::EOF;
+			case let n: size => yield n;
+		};
+		// fmt::printfln("read {} bytes",nread)!;
+		const total_amt = read_pos + nread;
+		pr.good = pr.buf[0..total_amt];
+		// fmt::printfln("now {} bytes in buffer",total_amt)!;
+	};
+
+
+	if (should_write) {
+		// writing
+		const amt_to_write = len(pr.wgood);
+		if (amt_to_write == 0) return;
+		const nwritten = io::write(sock, pr.wgood)?;
+		// fmt::printfln("written {} of {} bytes", nwritten, amt_to_write)!;
+		if (nwritten < amt_to_write) {
+			// fmt::println("moving buffer back")!;
+			const remaining_len = amt_to_write - nwritten;
+			pr.wbuf[0..remaining_len] = pr.wgood[nwritten..];
+			pr.wgood = pr.wbuf[0..remaining_len];
+		} else {
+			pr.wgood = pr.wbuf[0..0];
+		};
+		// fmt::printfln("  {} to write next time", len(pr.wgood))!;
 	};
-	// fmt::printfln("read {} bytes",nread)!;
-	const total_amt = read_pos + nread;
-	pr.good = pr.buf[0..total_amt];
-	// fmt::printfln("now {} bytes in buffer",total_amt)!;
 };
 
 // packet format:
@@ -78,7 +112,7 @@ export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF
 // [size-8]u8 data...
 // size includes size of header (size and type fields)
 
-export fn next(pr: *packet_reader) (packet | done | error) = {
+export fn read_next(pr: *packet_reader) (packet | done | error) = {
 	// either parse a full packet out of the front of good,
 	// move good along that many bytes,
 	// and return the packet,
@@ -136,24 +170,28 @@ export fn next(pr: *packet_reader) (packet | done | error) = {
 	};
 };
 
-export fn send_raw(sock: io::file, ty: packet_type, datas: []u8...) (void | io::error) = {
+fn append_wbuf(pr: *packet_reader, ty: packet_type, datas: []u8...) (void | error) = {
 	// ehh
 	const header: [8]u8 = [0...];
 	let total_len = 8u32;
 	for (const data .. datas) total_len += len(data): u32;
+
+	const remaining_space = len(pr.wbuf) - len(pr.wgood);
+	if (remaining_space < total_len) return "wbuff full": error;
+
 	endian::leputu32(header[0..4], total_len);
 	endian::leputu32(header[4..8], ty);
-	io::writeall(sock, header)?;
+	static append(pr.wgood, header...);
 	for (const data .. datas)
-		io::writeall(sock, data)?;
+		static append(pr.wgood, data...);
 };
 
-export fn send(sock: io::file, packet: packet) (void | io::error) = {
+export fn write(pr: *packet_reader, packet: packet) (void | error) = {
 	match (packet) {
 	case let op: packet_drawop =>
 		const ser_op = drawing::ser_op(op);
 		defer free(ser_op);
-		send_raw(sock, packet_type::DRAW_OP, ser_op)?;
+		append_wbuf(pr, packet_type::DRAW_OP, ser_op)?;
 	case let packet: packet_sendchunk =>
 		const pos_buf: [8]u8 = [0...];
 		endian::leputu32(pos_buf[0..4],packet.world_pos.0: u32);
@@ -161,16 +199,16 @@ export fn send(sock: io::file, packet: packet) (void | io::error) = {
 		const chunk_data_compressed = rle_encode(packet.chunk_data);
 		defer free(chunk_data_compressed);
 		const compressed_data_bytes = cast_u32s_to_u8s(chunk_data_compressed);
-		send_raw(sock, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?;
+		append_wbuf(pr, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?;
 	case let pos: packet_position =>
 		const pos_buf: [8]u8 = [0...];
 		endian::leputu32(pos_buf[0..4], pos.0: u32);
 		endian::leputu32(pos_buf[4..8], pos.1: u32);
-		send_raw(sock, packet_type::POSITION, pos_buf)?;
+		append_wbuf(pr, packet_type::POSITION, pos_buf)?;
 	case let world_pos: packet_reqchunk =>
 		const pos_buf: [8]u8 = [0...];
 		endian::leputu32(pos_buf[0..4], world_pos.0: u32);
 		endian::leputu32(pos_buf[4..8], world_pos.1: u32);
-		send_raw(sock, packet_type::REQ_CHUNK, pos_buf)?;
+		append_wbuf(pr, packet_type::REQ_CHUNK, pos_buf)?;
 	};
 };
-- 
cgit v1.2.3