aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorubq323 <ubq323@ubq323.website>2024-04-27 21:27:12 +0100
committerubq323 <ubq323@ubq323.website>2024-04-27 21:27:17 +0100
commit907cda4c172936c6d5b7ea2ac1a3a0986c875853 (patch)
tree46020134dd1b8c3434fde07ab6cf9346391bf70b
parent4c7156d0b20447e29f354679acdcd28480df91bc (diff)
use poll to check writing as well as reading; packet_reader handles both now
-rw-r--r--.gitignore2
-rw-r--r--client/main.ha20
-rw-r--r--client/pictures.ha4
-rw-r--r--packet_reader/packet_reader.ha94
-rw-r--r--server/main.ha47
5 files changed, 112 insertions, 55 deletions
diff --git a/.gitignore b/.gitignore
index de34bb1..464d774 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
*.ppm
*.png
e/*
+garden
+garden_server
diff --git a/client/main.ha b/client/main.ha
index 081c422..30a8341 100644
--- a/client/main.ha
+++ b/client/main.ha
@@ -47,9 +47,11 @@ export fn main() void = {
};
const pollfd: [1]poll::pollfd = [ poll::pollfd {
- fd = conn, events=poll::event::POLLIN, revents = 0
+ fd = conn,
+ events=poll::event::POLLIN | poll::event::POLLOUT,
+ ...
}];
- const packet_reader = packet_reader::new();
+ const packet_reader = packet_reader::new(conn);
// paintui state
let pstate = paintui::state { size_idx = 4, ... };
@@ -64,7 +66,7 @@ export fn main() void = {
let mouse_down = false;
- process_chunk_loadedness(&pmgr, conn, camera_pos);
+ process_chunk_loadedness(&pmgr, &packet_reader, camera_pos);
const win_pic = picture_from_surface(wsurf, (9,9));
for (!quit) {
@@ -108,18 +110,20 @@ export fn main() void = {
case void => yield;
case let op: drawing::op =>
perform_drawop(&pmgr, op);
- packet_reader::send(conn, op: packet_reader::packet_drawop)!;
+ packet_reader::write(&packet_reader, op: packet_reader::packet_drawop)!;
};
if (did_move) {
- packet_reader::send(conn, camera_pos: packet_reader::packet_position)!;
- process_chunk_loadedness(&pmgr, conn, camera_pos);
+ packet_reader::write(&packet_reader, camera_pos: packet_reader::packet_position)!;
+ process_chunk_loadedness(&pmgr, &packet_reader, camera_pos);
};
const n = poll::poll(pollfd, poll::NONBLOCK)!;
if (n > 0) {
- packet_reader::read(&packet_reader, conn)!;
- for (true) match (packet_reader::next(&packet_reader)) {
+ const should_read = 0 != pollfd[0].revents & poll::event::POLLIN;
+ const should_write = 0 != pollfd[0].revents & poll::event::POLLOUT;
+ packet_reader::service(&packet_reader, should_read, should_write)!;
+ for (true) match (packet_reader::read_next(&packet_reader)) {
case done => break;
case let e: packet_reader::error =>
fmt::fatalf("death: packet_reader: {}", e);
diff --git a/client/pictures.ha b/client/pictures.ha
index ce6c600..0ae17d6 100644
--- a/client/pictures.ha
+++ b/client/pictures.ha
@@ -92,7 +92,7 @@ fn is_request_inflight(pmgr: *picture_mgr, world_pos: pos) bool = {
// request it.
fn process_chunk_loadedness(
pmgr: *picture_mgr,
- conn: net::socket,
+ pr: *packet_reader::packet_reader,
camera_pos: pos,
) void = {
@@ -133,7 +133,7 @@ fn process_chunk_loadedness(
};
fmt::printfln("!! requesting {},{}", world_pos.0, world_pos.1)!;
- packet_reader::send(conn, world_pos: packet_reader::packet_reqchunk)!;
+ packet_reader::write(pr, world_pos: packet_reader::packet_reqchunk)!;
append(pmgr.requests, world_pos);
};
};
diff --git a/packet_reader/packet_reader.ha b/packet_reader/packet_reader.ha
index d541a11..ae38be7 100644
--- a/packet_reader/packet_reader.ha
+++ b/packet_reader/packet_reader.ha
@@ -4,7 +4,7 @@ use endian;
use drawing;
use drawing::{pos,CHUNKSIZE};
-export def VERSION: u8 = 2;
+export def VERSION: u8 = 3;
export type error = !str;
@@ -13,12 +13,14 @@ export type packet_reader = struct {
good: []u8,
wbuf: []u8,
wgood: []u8,
+ sock: io::file,
};
-export fn new() packet_reader = {
+export fn new(sock: io::file) packet_reader = {
let pr = packet_reader {
buf = alloc([0...],512*512*4*2), // ehhh
wbuf = alloc([0...], 512*512*4*2), // ehhhh
+ sock = sock,
...
};
pr.good = pr.buf[0..0];
@@ -26,6 +28,11 @@ export fn new() packet_reader = {
return pr;
};
+export fn finish(pr: *packet_reader) void = {
+ free(pr.buf);
+ free(pr.wbuf);
+};
+
fn cast_u32s_to_u8s(in: []u32) []u8 =
(in: *[*]u32: *[*]u8)[..len(in)*4];
fn cast_u8s_to_u32s(in: []u8) []u32 =
@@ -52,24 +59,51 @@ export type packet = (
| packet_position
| packet_reqchunk);
-// call when input is ready. could block otherwise
-export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF) = {
- const remaining_amt = len(pr.good);
- const read_pos = if (remaining_amt > 0) {
- // still some unconsumed content in the buffer
- // move unconsumed stuff to start of buffer
- // fmt::printfln("moving {} remaining bytes",remaining_amt)!;
- pr.buf[0..remaining_amt] = pr.good;
- yield remaining_amt;
- } else 0z;
- const nread = match(io::read(sock, pr.buf[read_pos..])?) {
- case io::EOF => return io::EOF;
- case let n: size => yield n;
+// call when input ready, so it doesn't block
+export fn service(
+ pr: *packet_reader,
+ should_read: bool,
+ should_write: bool,
+) (void | io::error | io::EOF) = {
+ const sock = pr.sock;
+
+ if (should_read) {
+ // reading
+ const remaining_amt = len(pr.good);
+ const read_pos = if (remaining_amt > 0) {
+ // still some unconsumed content in the buffer
+ // move unconsumed stuff to start of buffer
+ // fmt::printfln("moving {} remaining bytes",remaining_amt)!;
+ pr.buf[0..remaining_amt] = pr.good;
+ yield remaining_amt;
+ } else 0z;
+ const nread = match(io::read(sock, pr.buf[read_pos..])?) {
+ case io::EOF => return io::EOF;
+ case let n: size => yield n;
+ };
+ // fmt::printfln("read {} bytes",nread)!;
+ const total_amt = read_pos + nread;
+ pr.good = pr.buf[0..total_amt];
+ // fmt::printfln("now {} bytes in buffer",total_amt)!;
+ };
+
+
+ if (should_write) {
+ // writing
+ const amt_to_write = len(pr.wgood);
+ if (amt_to_write == 0) return;
+ const nwritten = io::write(sock, pr.wgood)?;
+ // fmt::printfln("written {} of {} bytes", nwritten, amt_to_write)!;
+ if (nwritten < amt_to_write) {
+ // fmt::println("moving buffer back")!;
+ const remaining_len = amt_to_write - nwritten;
+ pr.wbuf[0..remaining_len] = pr.wgood[nwritten..];
+ pr.wgood = pr.wbuf[0..remaining_len];
+ } else {
+ pr.wgood = pr.wbuf[0..0];
+ };
+ // fmt::printfln(" {} to write next time", len(pr.wgood))!;
};
- // fmt::printfln("read {} bytes",nread)!;
- const total_amt = read_pos + nread;
- pr.good = pr.buf[0..total_amt];
- // fmt::printfln("now {} bytes in buffer",total_amt)!;
};
// packet format:
@@ -78,7 +112,7 @@ export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF
// [size-8]u8 data...
// size includes size of header (size and type fields)
-export fn next(pr: *packet_reader) (packet | done | error) = {
+export fn read_next(pr: *packet_reader) (packet | done | error) = {
// either parse a full packet out of the front of good,
// move good along that many bytes,
// and return the packet,
@@ -136,24 +170,28 @@ export fn next(pr: *packet_reader) (packet | done | error) = {
};
};
-export fn send_raw(sock: io::file, ty: packet_type, datas: []u8...) (void | io::error) = {
+fn append_wbuf(pr: *packet_reader, ty: packet_type, datas: []u8...) (void | error) = {
// ehh
const header: [8]u8 = [0...];
let total_len = 8u32;
for (const data .. datas) total_len += len(data): u32;
+
+ const remaining_space = len(pr.wbuf) - len(pr.wgood);
+ if (remaining_space < total_len) return "wbuff full": error;
+
endian::leputu32(header[0..4], total_len);
endian::leputu32(header[4..8], ty);
- io::writeall(sock, header)?;
+ static append(pr.wgood, header...);
for (const data .. datas)
- io::writeall(sock, data)?;
+ static append(pr.wgood, data...);
};
-export fn send(sock: io::file, packet: packet) (void | io::error) = {
+export fn write(pr: *packet_reader, packet: packet) (void | error) = {
match (packet) {
case let op: packet_drawop =>
const ser_op = drawing::ser_op(op);
defer free(ser_op);
- send_raw(sock, packet_type::DRAW_OP, ser_op)?;
+ append_wbuf(pr, packet_type::DRAW_OP, ser_op)?;
case let packet: packet_sendchunk =>
const pos_buf: [8]u8 = [0...];
endian::leputu32(pos_buf[0..4],packet.world_pos.0: u32);
@@ -161,16 +199,16 @@ export fn send(sock: io::file, packet: packet) (void | io::error) = {
const chunk_data_compressed = rle_encode(packet.chunk_data);
defer free(chunk_data_compressed);
const compressed_data_bytes = cast_u32s_to_u8s(chunk_data_compressed);
- send_raw(sock, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?;
+ append_wbuf(pr, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?;
case let pos: packet_position =>
const pos_buf: [8]u8 = [0...];
endian::leputu32(pos_buf[0..4], pos.0: u32);
endian::leputu32(pos_buf[4..8], pos.1: u32);
- send_raw(sock, packet_type::POSITION, pos_buf)?;
+ append_wbuf(pr, packet_type::POSITION, pos_buf)?;
case let world_pos: packet_reqchunk =>
const pos_buf: [8]u8 = [0...];
endian::leputu32(pos_buf[0..4], world_pos.0: u32);
endian::leputu32(pos_buf[4..8], world_pos.1: u32);
- send_raw(sock, packet_type::REQ_CHUNK, pos_buf)?;
+ append_wbuf(pr, packet_type::REQ_CHUNK, pos_buf)?;
};
};
diff --git a/server/main.ha b/server/main.ha
index 475d19a..f78bf0c 100644
--- a/server/main.ha
+++ b/server/main.ha
@@ -32,7 +32,6 @@ type server_state = struct {
type connection = struct {
pr: packet_reader::packet_reader,
- sock: net::socket,
pos: pos,
should_delete: bool,
};
@@ -90,13 +89,17 @@ fn loop(state: *server_state, timeout: time::duration) void = {
let pollfds: []poll::pollfd = [];
append(pollfds, poll::pollfd {
fd = state.listener,
- events = poll::event::POLLIN,
+ events = poll::event::POLLIN | poll::event::POLLOUT,
...
});
for (const conn &.. state.connections) {
+ const events = if (len(conn.pr.wgood) > 0)
+ poll::event::POLLIN | poll::event::POLLOUT
+ else
+ poll::event::POLLIN;
append(pollfds, poll::pollfd {
- fd = conn.sock,
- events = poll::event::POLLIN,
+ fd = conn.pr.sock,
+ events = events,
...
});
};
@@ -107,8 +110,7 @@ fn loop(state: *server_state, timeout: time::duration) void = {
if (0 != pollfds[0].revents & poll::event::POLLIN) {
const new_conn = tcp::accept(pollfds[0].fd)!;
append(state.connections, connection {
- sock = new_conn,
- pr = packet_reader::new(),
+ pr = packet_reader::new(new_conn),
pos = (0,0),
should_delete = false,
});
@@ -130,8 +132,12 @@ fn loop(state: *server_state, timeout: time::duration) void = {
for (let pollfd_idx = 1z: size; pollfd_idx < len(pollfds); pollfd_idx += 1) {
const conn_idx = pollfd_idx - 1;
if (state.connections[conn_idx].should_delete) continue;
- if (0 != pollfds[conn_idx+1].revents & poll::event::POLLIN) {
- read_from_connection(state, conn_idx);
+
+ const revents = pollfds[conn_idx+1].revents;
+ const should_read = 0 != revents & poll::event::POLLIN;
+ const should_write = 0 != revents & poll::event::POLLOUT;
+ if (should_read || should_write) {
+ read_from_connection(state, conn_idx, should_read, should_write);
};
};
@@ -146,6 +152,8 @@ fn perform_deletions(state: *server_state) void = {
const conn = state.connections[i];
if (conn.should_delete) {
fmt::printfln("deleting conn {}",i)!;
+ packet_reader::finish(&conn.pr);
+ io::close(conn.pr.sock): void;
delete(state.connections[i]);
};
};
@@ -154,14 +162,19 @@ fn perform_deletions(state: *server_state) void = {
fn greet_connection(state: *server_state, conn_idx: size)
(void|io::error) = {
const conn = state.connections[conn_idx];
- io::write(conn.sock, [VERSION])?;
+ io::write(conn.pr.sock, [VERSION])?;
};
-fn read_from_connection(state: *server_state, conn_idx: size) void = {
+fn read_from_connection(
+ state: *server_state,
+ conn_idx: size,
+ should_read: bool,
+ should_write: bool,
+) void = {
const conn = &state.connections[conn_idx];
- match (packet_reader::read(&conn.pr, conn.sock)) {
+ match (packet_reader::service(&conn.pr, should_read, should_write)) {
case let err: io::error =>
fmt::printfln("#{} error: {}", conn_idx, io::strerror(err))!;
conn.should_delete = true;
@@ -169,7 +182,7 @@ fn read_from_connection(state: *server_state, conn_idx: size) void = {
fmt::printfln("#{} disconnect", conn_idx)!;
conn.should_delete = true;
case void =>
- for (true) match (packet_reader::next(&conn.pr)) {
+ for (true) match (packet_reader::read_next(&conn.pr)) {
// xxx foreach loop seems to break match exhaustivity here
// investigate that at some point
case done => break;
@@ -192,7 +205,7 @@ fn handle_packet(
state: *server_state,
conn_idx: size,
packet: packet_reader::packet
-) (void|io::error) = {
+) (void|io::error|packet_reader::error) = {
const conn = &state.connections[conn_idx];
match (packet) {
case let op: packet_reader::packet_drawop =>
@@ -207,11 +220,11 @@ fn handle_packet(
const other_conn = &state.connections[other_idx];
if (other_conn.should_delete) continue;
// fmt::printfln("\t -> #{}",other_idx)!;
- match (packet_reader::send(other_conn.sock, packet)) {
+ match (packet_reader::write(&other_conn.pr, packet)) {
case void => yield;
- case let e: io::error =>
+ case let e: packet_reader::error =>
fmt::printfln("couldn't send to #{}: {}",
- other_idx, io::strerror(e))!;
+ other_idx, e: str)!;
other_conn.should_delete = true;
};
};
@@ -225,7 +238,7 @@ fn handle_packet(
conn_idx, world_pos.0, world_pos.1)!;
const pic = ensure_chunk(state, world_pos);
- packet_reader::send(conn.sock, packet_reader::packet_sendchunk {
+ packet_reader::write(&conn.pr, packet_reader::packet_sendchunk {
world_pos = world_pos,
chunk_data = pic.d[..pic.w*pic.h],
})?;