From 907cda4c172936c6d5b7ea2ac1a3a0986c875853 Mon Sep 17 00:00:00 2001 From: ubq323 Date: Sat, 27 Apr 2024 21:27:12 +0100 Subject: use poll to check writing as well as reading; packet_reader handles both now --- .gitignore | 2 + client/main.ha | 20 +++++---- client/pictures.ha | 4 +- packet_reader/packet_reader.ha | 94 +++++++++++++++++++++++++++++------------- server/main.ha | 47 +++++++++++++-------- 5 files changed, 112 insertions(+), 55 deletions(-) diff --git a/.gitignore b/.gitignore index de34bb1..464d774 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ *.ppm *.png e/* +garden +garden_server diff --git a/client/main.ha b/client/main.ha index 081c422..30a8341 100644 --- a/client/main.ha +++ b/client/main.ha @@ -47,9 +47,11 @@ export fn main() void = { }; const pollfd: [1]poll::pollfd = [ poll::pollfd { - fd = conn, events=poll::event::POLLIN, revents = 0 + fd = conn, + events=poll::event::POLLIN | poll::event::POLLOUT, + ... }]; - const packet_reader = packet_reader::new(); + const packet_reader = packet_reader::new(conn); // paintui state let pstate = paintui::state { size_idx = 4, ... }; @@ -64,7 +66,7 @@ export fn main() void = { let mouse_down = false; - process_chunk_loadedness(&pmgr, conn, camera_pos); + process_chunk_loadedness(&pmgr, &packet_reader, camera_pos); const win_pic = picture_from_surface(wsurf, (9,9)); for (!quit) { @@ -108,18 +110,20 @@ export fn main() void = { case void => yield; case let op: drawing::op => perform_drawop(&pmgr, op); - packet_reader::send(conn, op: packet_reader::packet_drawop)!; + packet_reader::write(&packet_reader, op: packet_reader::packet_drawop)!; }; if (did_move) { - packet_reader::send(conn, camera_pos: packet_reader::packet_position)!; - process_chunk_loadedness(&pmgr, conn, camera_pos); + packet_reader::write(&packet_reader, camera_pos: packet_reader::packet_position)!; + process_chunk_loadedness(&pmgr, &packet_reader, camera_pos); }; const n = poll::poll(pollfd, poll::NONBLOCK)!; if (n > 0) { - packet_reader::read(&packet_reader, conn)!; - for (true) match (packet_reader::next(&packet_reader)) { + const should_read = 0 != pollfd[0].revents & poll::event::POLLIN; + const should_write = 0 != pollfd[0].revents & poll::event::POLLOUT; + packet_reader::service(&packet_reader, should_read, should_write)!; + for (true) match (packet_reader::read_next(&packet_reader)) { case done => break; case let e: packet_reader::error => fmt::fatalf("death: packet_reader: {}", e); diff --git a/client/pictures.ha b/client/pictures.ha index ce6c600..0ae17d6 100644 --- a/client/pictures.ha +++ b/client/pictures.ha @@ -92,7 +92,7 @@ fn is_request_inflight(pmgr: *picture_mgr, world_pos: pos) bool = { // request it. fn process_chunk_loadedness( pmgr: *picture_mgr, - conn: net::socket, + pr: *packet_reader::packet_reader, camera_pos: pos, ) void = { @@ -133,7 +133,7 @@ fn process_chunk_loadedness( }; fmt::printfln("!! requesting {},{}", world_pos.0, world_pos.1)!; - packet_reader::send(conn, world_pos: packet_reader::packet_reqchunk)!; + packet_reader::write(pr, world_pos: packet_reader::packet_reqchunk)!; append(pmgr.requests, world_pos); }; }; diff --git a/packet_reader/packet_reader.ha b/packet_reader/packet_reader.ha index d541a11..ae38be7 100644 --- a/packet_reader/packet_reader.ha +++ b/packet_reader/packet_reader.ha @@ -4,7 +4,7 @@ use endian; use drawing; use drawing::{pos,CHUNKSIZE}; -export def VERSION: u8 = 2; +export def VERSION: u8 = 3; export type error = !str; @@ -13,12 +13,14 @@ export type packet_reader = struct { good: []u8, wbuf: []u8, wgood: []u8, + sock: io::file, }; -export fn new() packet_reader = { +export fn new(sock: io::file) packet_reader = { let pr = packet_reader { buf = alloc([0...],512*512*4*2), // ehhh wbuf = alloc([0...], 512*512*4*2), // ehhhh + sock = sock, ... }; pr.good = pr.buf[0..0]; @@ -26,6 +28,11 @@ export fn new() packet_reader = { return pr; }; +export fn finish(pr: *packet_reader) void = { + free(pr.buf); + free(pr.wbuf); +}; + fn cast_u32s_to_u8s(in: []u32) []u8 = (in: *[*]u32: *[*]u8)[..len(in)*4]; fn cast_u8s_to_u32s(in: []u8) []u32 = @@ -52,24 +59,51 @@ export type packet = ( | packet_position | packet_reqchunk); -// call when input is ready. could block otherwise -export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF) = { - const remaining_amt = len(pr.good); - const read_pos = if (remaining_amt > 0) { - // still some unconsumed content in the buffer - // move unconsumed stuff to start of buffer - // fmt::printfln("moving {} remaining bytes",remaining_amt)!; - pr.buf[0..remaining_amt] = pr.good; - yield remaining_amt; - } else 0z; - const nread = match(io::read(sock, pr.buf[read_pos..])?) { - case io::EOF => return io::EOF; - case let n: size => yield n; +// call when input ready, so it doesn't block +export fn service( + pr: *packet_reader, + should_read: bool, + should_write: bool, +) (void | io::error | io::EOF) = { + const sock = pr.sock; + + if (should_read) { + // reading + const remaining_amt = len(pr.good); + const read_pos = if (remaining_amt > 0) { + // still some unconsumed content in the buffer + // move unconsumed stuff to start of buffer + // fmt::printfln("moving {} remaining bytes",remaining_amt)!; + pr.buf[0..remaining_amt] = pr.good; + yield remaining_amt; + } else 0z; + const nread = match(io::read(sock, pr.buf[read_pos..])?) { + case io::EOF => return io::EOF; + case let n: size => yield n; + }; + // fmt::printfln("read {} bytes",nread)!; + const total_amt = read_pos + nread; + pr.good = pr.buf[0..total_amt]; + // fmt::printfln("now {} bytes in buffer",total_amt)!; + }; + + + if (should_write) { + // writing + const amt_to_write = len(pr.wgood); + if (amt_to_write == 0) return; + const nwritten = io::write(sock, pr.wgood)?; + // fmt::printfln("written {} of {} bytes", nwritten, amt_to_write)!; + if (nwritten < amt_to_write) { + // fmt::println("moving buffer back")!; + const remaining_len = amt_to_write - nwritten; + pr.wbuf[0..remaining_len] = pr.wgood[nwritten..]; + pr.wgood = pr.wbuf[0..remaining_len]; + } else { + pr.wgood = pr.wbuf[0..0]; + }; + // fmt::printfln(" {} to write next time", len(pr.wgood))!; }; - // fmt::printfln("read {} bytes",nread)!; - const total_amt = read_pos + nread; - pr.good = pr.buf[0..total_amt]; - // fmt::printfln("now {} bytes in buffer",total_amt)!; }; // packet format: @@ -78,7 +112,7 @@ export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF // [size-8]u8 data... // size includes size of header (size and type fields) -export fn next(pr: *packet_reader) (packet | done | error) = { +export fn read_next(pr: *packet_reader) (packet | done | error) = { // either parse a full packet out of the front of good, // move good along that many bytes, // and return the packet, @@ -136,24 +170,28 @@ export fn next(pr: *packet_reader) (packet | done | error) = { }; }; -export fn send_raw(sock: io::file, ty: packet_type, datas: []u8...) (void | io::error) = { +fn append_wbuf(pr: *packet_reader, ty: packet_type, datas: []u8...) (void | error) = { // ehh const header: [8]u8 = [0...]; let total_len = 8u32; for (const data .. datas) total_len += len(data): u32; + + const remaining_space = len(pr.wbuf) - len(pr.wgood); + if (remaining_space < total_len) return "wbuff full": error; + endian::leputu32(header[0..4], total_len); endian::leputu32(header[4..8], ty); - io::writeall(sock, header)?; + static append(pr.wgood, header...); for (const data .. datas) - io::writeall(sock, data)?; + static append(pr.wgood, data...); }; -export fn send(sock: io::file, packet: packet) (void | io::error) = { +export fn write(pr: *packet_reader, packet: packet) (void | error) = { match (packet) { case let op: packet_drawop => const ser_op = drawing::ser_op(op); defer free(ser_op); - send_raw(sock, packet_type::DRAW_OP, ser_op)?; + append_wbuf(pr, packet_type::DRAW_OP, ser_op)?; case let packet: packet_sendchunk => const pos_buf: [8]u8 = [0...]; endian::leputu32(pos_buf[0..4],packet.world_pos.0: u32); @@ -161,16 +199,16 @@ export fn send(sock: io::file, packet: packet) (void | io::error) = { const chunk_data_compressed = rle_encode(packet.chunk_data); defer free(chunk_data_compressed); const compressed_data_bytes = cast_u32s_to_u8s(chunk_data_compressed); - send_raw(sock, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?; + append_wbuf(pr, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?; case let pos: packet_position => const pos_buf: [8]u8 = [0...]; endian::leputu32(pos_buf[0..4], pos.0: u32); endian::leputu32(pos_buf[4..8], pos.1: u32); - send_raw(sock, packet_type::POSITION, pos_buf)?; + append_wbuf(pr, packet_type::POSITION, pos_buf)?; case let world_pos: packet_reqchunk => const pos_buf: [8]u8 = [0...]; endian::leputu32(pos_buf[0..4], world_pos.0: u32); endian::leputu32(pos_buf[4..8], world_pos.1: u32); - send_raw(sock, packet_type::REQ_CHUNK, pos_buf)?; + append_wbuf(pr, packet_type::REQ_CHUNK, pos_buf)?; }; }; diff --git a/server/main.ha b/server/main.ha index 475d19a..f78bf0c 100644 --- a/server/main.ha +++ b/server/main.ha @@ -32,7 +32,6 @@ type server_state = struct { type connection = struct { pr: packet_reader::packet_reader, - sock: net::socket, pos: pos, should_delete: bool, }; @@ -90,13 +89,17 @@ fn loop(state: *server_state, timeout: time::duration) void = { let pollfds: []poll::pollfd = []; append(pollfds, poll::pollfd { fd = state.listener, - events = poll::event::POLLIN, + events = poll::event::POLLIN | poll::event::POLLOUT, ... }); for (const conn &.. state.connections) { + const events = if (len(conn.pr.wgood) > 0) + poll::event::POLLIN | poll::event::POLLOUT + else + poll::event::POLLIN; append(pollfds, poll::pollfd { - fd = conn.sock, - events = poll::event::POLLIN, + fd = conn.pr.sock, + events = events, ... }); }; @@ -107,8 +110,7 @@ fn loop(state: *server_state, timeout: time::duration) void = { if (0 != pollfds[0].revents & poll::event::POLLIN) { const new_conn = tcp::accept(pollfds[0].fd)!; append(state.connections, connection { - sock = new_conn, - pr = packet_reader::new(), + pr = packet_reader::new(new_conn), pos = (0,0), should_delete = false, }); @@ -130,8 +132,12 @@ fn loop(state: *server_state, timeout: time::duration) void = { for (let pollfd_idx = 1z: size; pollfd_idx < len(pollfds); pollfd_idx += 1) { const conn_idx = pollfd_idx - 1; if (state.connections[conn_idx].should_delete) continue; - if (0 != pollfds[conn_idx+1].revents & poll::event::POLLIN) { - read_from_connection(state, conn_idx); + + const revents = pollfds[conn_idx+1].revents; + const should_read = 0 != revents & poll::event::POLLIN; + const should_write = 0 != revents & poll::event::POLLOUT; + if (should_read || should_write) { + read_from_connection(state, conn_idx, should_read, should_write); }; }; @@ -146,6 +152,8 @@ fn perform_deletions(state: *server_state) void = { const conn = state.connections[i]; if (conn.should_delete) { fmt::printfln("deleting conn {}",i)!; + packet_reader::finish(&conn.pr); + io::close(conn.pr.sock): void; delete(state.connections[i]); }; }; @@ -154,14 +162,19 @@ fn perform_deletions(state: *server_state) void = { fn greet_connection(state: *server_state, conn_idx: size) (void|io::error) = { const conn = state.connections[conn_idx]; - io::write(conn.sock, [VERSION])?; + io::write(conn.pr.sock, [VERSION])?; }; -fn read_from_connection(state: *server_state, conn_idx: size) void = { +fn read_from_connection( + state: *server_state, + conn_idx: size, + should_read: bool, + should_write: bool, +) void = { const conn = &state.connections[conn_idx]; - match (packet_reader::read(&conn.pr, conn.sock)) { + match (packet_reader::service(&conn.pr, should_read, should_write)) { case let err: io::error => fmt::printfln("#{} error: {}", conn_idx, io::strerror(err))!; conn.should_delete = true; @@ -169,7 +182,7 @@ fn read_from_connection(state: *server_state, conn_idx: size) void = { fmt::printfln("#{} disconnect", conn_idx)!; conn.should_delete = true; case void => - for (true) match (packet_reader::next(&conn.pr)) { + for (true) match (packet_reader::read_next(&conn.pr)) { // xxx foreach loop seems to break match exhaustivity here // investigate that at some point case done => break; @@ -192,7 +205,7 @@ fn handle_packet( state: *server_state, conn_idx: size, packet: packet_reader::packet -) (void|io::error) = { +) (void|io::error|packet_reader::error) = { const conn = &state.connections[conn_idx]; match (packet) { case let op: packet_reader::packet_drawop => @@ -207,11 +220,11 @@ fn handle_packet( const other_conn = &state.connections[other_idx]; if (other_conn.should_delete) continue; // fmt::printfln("\t -> #{}",other_idx)!; - match (packet_reader::send(other_conn.sock, packet)) { + match (packet_reader::write(&other_conn.pr, packet)) { case void => yield; - case let e: io::error => + case let e: packet_reader::error => fmt::printfln("couldn't send to #{}: {}", - other_idx, io::strerror(e))!; + other_idx, e: str)!; other_conn.should_delete = true; }; }; @@ -225,7 +238,7 @@ fn handle_packet( conn_idx, world_pos.0, world_pos.1)!; const pic = ensure_chunk(state, world_pos); - packet_reader::send(conn.sock, packet_reader::packet_sendchunk { + packet_reader::write(&conn.pr, packet_reader::packet_sendchunk { world_pos = world_pos, chunk_data = pic.d[..pic.w*pic.h], })?; -- cgit v1.2.3