From 907cda4c172936c6d5b7ea2ac1a3a0986c875853 Mon Sep 17 00:00:00 2001 From: ubq323 Date: Sat, 27 Apr 2024 21:27:12 +0100 Subject: use poll to check writing as well as reading; packet_reader handles both now --- packet_reader/packet_reader.ha | 94 +++++++++++++++++++++++++++++------------- 1 file changed, 66 insertions(+), 28 deletions(-) (limited to 'packet_reader') diff --git a/packet_reader/packet_reader.ha b/packet_reader/packet_reader.ha index d541a11..ae38be7 100644 --- a/packet_reader/packet_reader.ha +++ b/packet_reader/packet_reader.ha @@ -4,7 +4,7 @@ use endian; use drawing; use drawing::{pos,CHUNKSIZE}; -export def VERSION: u8 = 2; +export def VERSION: u8 = 3; export type error = !str; @@ -13,12 +13,14 @@ export type packet_reader = struct { good: []u8, wbuf: []u8, wgood: []u8, + sock: io::file, }; -export fn new() packet_reader = { +export fn new(sock: io::file) packet_reader = { let pr = packet_reader { buf = alloc([0...],512*512*4*2), // ehhh wbuf = alloc([0...], 512*512*4*2), // ehhhh + sock = sock, ... }; pr.good = pr.buf[0..0]; @@ -26,6 +28,11 @@ export fn new() packet_reader = { return pr; }; +export fn finish(pr: *packet_reader) void = { + free(pr.buf); + free(pr.wbuf); +}; + fn cast_u32s_to_u8s(in: []u32) []u8 = (in: *[*]u32: *[*]u8)[..len(in)*4]; fn cast_u8s_to_u32s(in: []u8) []u32 = @@ -52,24 +59,51 @@ export type packet = ( | packet_position | packet_reqchunk); -// call when input is ready. could block otherwise -export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF) = { - const remaining_amt = len(pr.good); - const read_pos = if (remaining_amt > 0) { - // still some unconsumed content in the buffer - // move unconsumed stuff to start of buffer - // fmt::printfln("moving {} remaining bytes",remaining_amt)!; - pr.buf[0..remaining_amt] = pr.good; - yield remaining_amt; - } else 0z; - const nread = match(io::read(sock, pr.buf[read_pos..])?) { - case io::EOF => return io::EOF; - case let n: size => yield n; +// call when input ready, so it doesn't block +export fn service( + pr: *packet_reader, + should_read: bool, + should_write: bool, +) (void | io::error | io::EOF) = { + const sock = pr.sock; + + if (should_read) { + // reading + const remaining_amt = len(pr.good); + const read_pos = if (remaining_amt > 0) { + // still some unconsumed content in the buffer + // move unconsumed stuff to start of buffer + // fmt::printfln("moving {} remaining bytes",remaining_amt)!; + pr.buf[0..remaining_amt] = pr.good; + yield remaining_amt; + } else 0z; + const nread = match(io::read(sock, pr.buf[read_pos..])?) { + case io::EOF => return io::EOF; + case let n: size => yield n; + }; + // fmt::printfln("read {} bytes",nread)!; + const total_amt = read_pos + nread; + pr.good = pr.buf[0..total_amt]; + // fmt::printfln("now {} bytes in buffer",total_amt)!; + }; + + + if (should_write) { + // writing + const amt_to_write = len(pr.wgood); + if (amt_to_write == 0) return; + const nwritten = io::write(sock, pr.wgood)?; + // fmt::printfln("written {} of {} bytes", nwritten, amt_to_write)!; + if (nwritten < amt_to_write) { + // fmt::println("moving buffer back")!; + const remaining_len = amt_to_write - nwritten; + pr.wbuf[0..remaining_len] = pr.wgood[nwritten..]; + pr.wgood = pr.wbuf[0..remaining_len]; + } else { + pr.wgood = pr.wbuf[0..0]; + }; + // fmt::printfln(" {} to write next time", len(pr.wgood))!; }; - // fmt::printfln("read {} bytes",nread)!; - const total_amt = read_pos + nread; - pr.good = pr.buf[0..total_amt]; - // fmt::printfln("now {} bytes in buffer",total_amt)!; }; // packet format: @@ -78,7 +112,7 @@ export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF // [size-8]u8 data... // size includes size of header (size and type fields) -export fn next(pr: *packet_reader) (packet | done | error) = { +export fn read_next(pr: *packet_reader) (packet | done | error) = { // either parse a full packet out of the front of good, // move good along that many bytes, // and return the packet, @@ -136,24 +170,28 @@ export fn next(pr: *packet_reader) (packet | done | error) = { }; }; -export fn send_raw(sock: io::file, ty: packet_type, datas: []u8...) (void | io::error) = { +fn append_wbuf(pr: *packet_reader, ty: packet_type, datas: []u8...) (void | error) = { // ehh const header: [8]u8 = [0...]; let total_len = 8u32; for (const data .. datas) total_len += len(data): u32; + + const remaining_space = len(pr.wbuf) - len(pr.wgood); + if (remaining_space < total_len) return "wbuff full": error; + endian::leputu32(header[0..4], total_len); endian::leputu32(header[4..8], ty); - io::writeall(sock, header)?; + static append(pr.wgood, header...); for (const data .. datas) - io::writeall(sock, data)?; + static append(pr.wgood, data...); }; -export fn send(sock: io::file, packet: packet) (void | io::error) = { +export fn write(pr: *packet_reader, packet: packet) (void | error) = { match (packet) { case let op: packet_drawop => const ser_op = drawing::ser_op(op); defer free(ser_op); - send_raw(sock, packet_type::DRAW_OP, ser_op)?; + append_wbuf(pr, packet_type::DRAW_OP, ser_op)?; case let packet: packet_sendchunk => const pos_buf: [8]u8 = [0...]; endian::leputu32(pos_buf[0..4],packet.world_pos.0: u32); @@ -161,16 +199,16 @@ export fn send(sock: io::file, packet: packet) (void | io::error) = { const chunk_data_compressed = rle_encode(packet.chunk_data); defer free(chunk_data_compressed); const compressed_data_bytes = cast_u32s_to_u8s(chunk_data_compressed); - send_raw(sock, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?; + append_wbuf(pr, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?; case let pos: packet_position => const pos_buf: [8]u8 = [0...]; endian::leputu32(pos_buf[0..4], pos.0: u32); endian::leputu32(pos_buf[4..8], pos.1: u32); - send_raw(sock, packet_type::POSITION, pos_buf)?; + append_wbuf(pr, packet_type::POSITION, pos_buf)?; case let world_pos: packet_reqchunk => const pos_buf: [8]u8 = [0...]; endian::leputu32(pos_buf[0..4], world_pos.0: u32); endian::leputu32(pos_buf[4..8], world_pos.1: u32); - send_raw(sock, packet_type::REQ_CHUNK, pos_buf)?; + append_wbuf(pr, packet_type::REQ_CHUNK, pos_buf)?; }; }; -- cgit v1.2.3