diff options
| author | ubq323 <ubq323@ubq323.website> | 2024-04-27 21:27:12 +0100 | 
|---|---|---|
| committer | ubq323 <ubq323@ubq323.website> | 2024-04-27 21:27:17 +0100 | 
| commit | 907cda4c172936c6d5b7ea2ac1a3a0986c875853 (patch) | |
| tree | 46020134dd1b8c3434fde07ab6cf9346391bf70b | |
| parent | 4c7156d0b20447e29f354679acdcd28480df91bc (diff) | |
use poll to check writing as well as reading; packet_reader handles both now
| -rw-r--r-- | .gitignore | 2 | ||||
| -rw-r--r-- | client/main.ha | 20 | ||||
| -rw-r--r-- | client/pictures.ha | 4 | ||||
| -rw-r--r-- | packet_reader/packet_reader.ha | 94 | ||||
| -rw-r--r-- | server/main.ha | 47 | 
5 files changed, 112 insertions, 55 deletions
| @@ -1,3 +1,5 @@  *.ppm  *.png  e/* +garden +garden_server diff --git a/client/main.ha b/client/main.ha index 081c422..30a8341 100644 --- a/client/main.ha +++ b/client/main.ha @@ -47,9 +47,11 @@ export fn main() void = {  	};  	const pollfd: [1]poll::pollfd = [ poll::pollfd { -		fd = conn, events=poll::event::POLLIN, revents = 0 +		fd = conn, +		events=poll::event::POLLIN | poll::event::POLLOUT, +		...  	}]; -	const packet_reader = packet_reader::new(); +	const packet_reader = packet_reader::new(conn);  	// paintui state  	let pstate = paintui::state { size_idx = 4, ... }; @@ -64,7 +66,7 @@ export fn main() void = {  	let mouse_down = false; -	process_chunk_loadedness(&pmgr, conn, camera_pos); +	process_chunk_loadedness(&pmgr, &packet_reader, camera_pos);  	const win_pic = picture_from_surface(wsurf, (9,9));  	for (!quit) { @@ -108,18 +110,20 @@ export fn main() void = {  		case void => yield;  		case let op: drawing::op =>  			perform_drawop(&pmgr, op); -			packet_reader::send(conn, op: packet_reader::packet_drawop)!; +			packet_reader::write(&packet_reader, op: packet_reader::packet_drawop)!;  		};  		if (did_move) { -			packet_reader::send(conn, camera_pos: packet_reader::packet_position)!; -			process_chunk_loadedness(&pmgr, conn, camera_pos); +			packet_reader::write(&packet_reader, camera_pos: packet_reader::packet_position)!; +			process_chunk_loadedness(&pmgr, &packet_reader, camera_pos);  		};  		const n = poll::poll(pollfd, poll::NONBLOCK)!;  		if (n > 0) { -			packet_reader::read(&packet_reader, conn)!; -			for (true) match (packet_reader::next(&packet_reader)) { +			const should_read = 0 != pollfd[0].revents & poll::event::POLLIN; +			const should_write = 0 != pollfd[0].revents & poll::event::POLLOUT; +			packet_reader::service(&packet_reader, should_read, should_write)!; +			for (true) match (packet_reader::read_next(&packet_reader)) {  			case done => break;  			case let e: packet_reader::error =>  				fmt::fatalf("death: packet_reader: {}", e); diff --git a/client/pictures.ha b/client/pictures.ha index ce6c600..0ae17d6 100644 --- a/client/pictures.ha +++ b/client/pictures.ha @@ -92,7 +92,7 @@ fn is_request_inflight(pmgr: *picture_mgr, world_pos: pos) bool = {  // 	request it.  fn process_chunk_loadedness(  	pmgr: *picture_mgr, -	conn: net::socket, +	pr: *packet_reader::packet_reader,  	camera_pos: pos,  ) void = { @@ -133,7 +133,7 @@ fn process_chunk_loadedness(  			};  			fmt::printfln("!! requesting {},{}", world_pos.0, world_pos.1)!; -			packet_reader::send(conn, world_pos: packet_reader::packet_reqchunk)!; +			packet_reader::write(pr, world_pos: packet_reader::packet_reqchunk)!;  			append(pmgr.requests, world_pos);  		};  	}; diff --git a/packet_reader/packet_reader.ha b/packet_reader/packet_reader.ha index d541a11..ae38be7 100644 --- a/packet_reader/packet_reader.ha +++ b/packet_reader/packet_reader.ha @@ -4,7 +4,7 @@ use endian;  use drawing;  use drawing::{pos,CHUNKSIZE}; -export def VERSION: u8 = 2; +export def VERSION: u8 = 3;  export type error = !str; @@ -13,12 +13,14 @@ export type packet_reader = struct {  	good: []u8,  	wbuf: []u8,  	wgood: []u8, +	sock: io::file,  }; -export fn new() packet_reader = { +export fn new(sock: io::file) packet_reader = {  	let pr = packet_reader {  		buf = alloc([0...],512*512*4*2), // ehhh  		wbuf = alloc([0...], 512*512*4*2), // ehhhh +		sock = sock,  		...  	};  	pr.good = pr.buf[0..0]; @@ -26,6 +28,11 @@ export fn new() packet_reader = {  	return pr;  }; +export fn finish(pr: *packet_reader) void = { +	free(pr.buf); +	free(pr.wbuf); +}; +  fn cast_u32s_to_u8s(in: []u32) []u8 =  	(in: *[*]u32: *[*]u8)[..len(in)*4];  fn cast_u8s_to_u32s(in: []u8) []u32 = @@ -52,24 +59,51 @@ export type packet = (  	| packet_position  	| packet_reqchunk); -// call when input is ready. could block otherwise -export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF) = { -	const remaining_amt = len(pr.good); -	const read_pos = if (remaining_amt > 0) { -		// still some unconsumed content in the buffer -		// move unconsumed stuff to start of buffer -		// fmt::printfln("moving {} remaining bytes",remaining_amt)!; -		pr.buf[0..remaining_amt] = pr.good; -		yield remaining_amt; -	} else 0z; -	const nread = match(io::read(sock, pr.buf[read_pos..])?) { -		case io::EOF => return io::EOF; -		case let n: size => yield n; +// call when input ready, so it doesn't block +export fn service( +	pr: *packet_reader, +	should_read: bool, +	should_write: bool, +) (void | io::error | io::EOF) = {	 +	const sock = pr.sock; + +	if (should_read) { +		// reading +		const remaining_amt = len(pr.good); +		const read_pos = if (remaining_amt > 0) { +			// still some unconsumed content in the buffer +			// move unconsumed stuff to start of buffer +			// fmt::printfln("moving {} remaining bytes",remaining_amt)!; +			pr.buf[0..remaining_amt] = pr.good; +			yield remaining_amt; +		} else 0z; +		const nread = match(io::read(sock, pr.buf[read_pos..])?) { +			case io::EOF => return io::EOF; +			case let n: size => yield n; +		}; +		// fmt::printfln("read {} bytes",nread)!; +		const total_amt = read_pos + nread; +		pr.good = pr.buf[0..total_amt]; +		// fmt::printfln("now {} bytes in buffer",total_amt)!; +	}; + + +	if (should_write) { +		// writing +		const amt_to_write = len(pr.wgood); +		if (amt_to_write == 0) return; +		const nwritten = io::write(sock, pr.wgood)?; +		// fmt::printfln("written {} of {} bytes", nwritten, amt_to_write)!; +		if (nwritten < amt_to_write) { +			// fmt::println("moving buffer back")!; +			const remaining_len = amt_to_write - nwritten; +			pr.wbuf[0..remaining_len] = pr.wgood[nwritten..]; +			pr.wgood = pr.wbuf[0..remaining_len]; +		} else { +			pr.wgood = pr.wbuf[0..0]; +		}; +		// fmt::printfln("  {} to write next time", len(pr.wgood))!;  	}; -	// fmt::printfln("read {} bytes",nread)!; -	const total_amt = read_pos + nread; -	pr.good = pr.buf[0..total_amt]; -	// fmt::printfln("now {} bytes in buffer",total_amt)!;  };  // packet format: @@ -78,7 +112,7 @@ export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF  // [size-8]u8 data...  // size includes size of header (size and type fields) -export fn next(pr: *packet_reader) (packet | done | error) = { +export fn read_next(pr: *packet_reader) (packet | done | error) = {  	// either parse a full packet out of the front of good,  	// move good along that many bytes,  	// and return the packet, @@ -136,24 +170,28 @@ export fn next(pr: *packet_reader) (packet | done | error) = {  	};  }; -export fn send_raw(sock: io::file, ty: packet_type, datas: []u8...) (void | io::error) = { +fn append_wbuf(pr: *packet_reader, ty: packet_type, datas: []u8...) (void | error) = {  	// ehh  	const header: [8]u8 = [0...];  	let total_len = 8u32;  	for (const data .. datas) total_len += len(data): u32; + +	const remaining_space = len(pr.wbuf) - len(pr.wgood); +	if (remaining_space < total_len) return "wbuff full": error; +  	endian::leputu32(header[0..4], total_len);  	endian::leputu32(header[4..8], ty); -	io::writeall(sock, header)?; +	static append(pr.wgood, header...);  	for (const data .. datas) -		io::writeall(sock, data)?; +		static append(pr.wgood, data...);  }; -export fn send(sock: io::file, packet: packet) (void | io::error) = { +export fn write(pr: *packet_reader, packet: packet) (void | error) = {  	match (packet) {  	case let op: packet_drawop =>  		const ser_op = drawing::ser_op(op);  		defer free(ser_op); -		send_raw(sock, packet_type::DRAW_OP, ser_op)?; +		append_wbuf(pr, packet_type::DRAW_OP, ser_op)?;  	case let packet: packet_sendchunk =>  		const pos_buf: [8]u8 = [0...];  		endian::leputu32(pos_buf[0..4],packet.world_pos.0: u32); @@ -161,16 +199,16 @@ export fn send(sock: io::file, packet: packet) (void | io::error) = {  		const chunk_data_compressed = rle_encode(packet.chunk_data);  		defer free(chunk_data_compressed);  		const compressed_data_bytes = cast_u32s_to_u8s(chunk_data_compressed); -		send_raw(sock, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?; +		append_wbuf(pr, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?;  	case let pos: packet_position =>  		const pos_buf: [8]u8 = [0...];  		endian::leputu32(pos_buf[0..4], pos.0: u32);  		endian::leputu32(pos_buf[4..8], pos.1: u32); -		send_raw(sock, packet_type::POSITION, pos_buf)?; +		append_wbuf(pr, packet_type::POSITION, pos_buf)?;  	case let world_pos: packet_reqchunk =>  		const pos_buf: [8]u8 = [0...];  		endian::leputu32(pos_buf[0..4], world_pos.0: u32);  		endian::leputu32(pos_buf[4..8], world_pos.1: u32); -		send_raw(sock, packet_type::REQ_CHUNK, pos_buf)?; +		append_wbuf(pr, packet_type::REQ_CHUNK, pos_buf)?;  	};  }; diff --git a/server/main.ha b/server/main.ha index 475d19a..f78bf0c 100644 --- a/server/main.ha +++ b/server/main.ha @@ -32,7 +32,6 @@ type server_state = struct {  type connection = struct {  	pr: packet_reader::packet_reader, -	sock: net::socket,  	pos: pos,  	should_delete: bool,  }; @@ -90,13 +89,17 @@ fn loop(state: *server_state, timeout: time::duration) void = {  	let pollfds: []poll::pollfd = [];  	append(pollfds, poll::pollfd {  		fd = state.listener, -		events = poll::event::POLLIN, +		events = poll::event::POLLIN | poll::event::POLLOUT,  		...  	});  	for (const conn &.. state.connections) { +		const events = if (len(conn.pr.wgood) > 0) +			poll::event::POLLIN | poll::event::POLLOUT +		else +			poll::event::POLLIN;  		append(pollfds, poll::pollfd { -			fd = conn.sock, -			events = poll::event::POLLIN, +			fd = conn.pr.sock, +			events = events,  			...  		});  	}; @@ -107,8 +110,7 @@ fn loop(state: *server_state, timeout: time::duration) void = {  	if (0 != pollfds[0].revents & poll::event::POLLIN) {  		const new_conn = tcp::accept(pollfds[0].fd)!;  		append(state.connections, connection { -			sock = new_conn, -			pr = packet_reader::new(), +			pr = packet_reader::new(new_conn),  			pos = (0,0),  			should_delete = false,  		}); @@ -130,8 +132,12 @@ fn loop(state: *server_state, timeout: time::duration) void = {  	for (let pollfd_idx = 1z: size; pollfd_idx < len(pollfds); pollfd_idx += 1) {  		const conn_idx = pollfd_idx - 1;  		if (state.connections[conn_idx].should_delete) continue; -		if (0 != pollfds[conn_idx+1].revents & poll::event::POLLIN) { -			read_from_connection(state, conn_idx); + +		const revents = pollfds[conn_idx+1].revents; +		const should_read = 0 != revents & poll::event::POLLIN; +		const should_write = 0 != revents & poll::event::POLLOUT; +		if (should_read || should_write) { +			read_from_connection(state, conn_idx, should_read, should_write);  		};  	}; @@ -146,6 +152,8 @@ fn perform_deletions(state: *server_state) void = {  		const conn = state.connections[i];  		if (conn.should_delete) {  			fmt::printfln("deleting conn {}",i)!; +			packet_reader::finish(&conn.pr); +			io::close(conn.pr.sock): void;  			delete(state.connections[i]);  		};  	}; @@ -154,14 +162,19 @@ fn perform_deletions(state: *server_state) void = {  fn greet_connection(state: *server_state, conn_idx: size)  (void|io::error) = {  	const conn = state.connections[conn_idx]; -	io::write(conn.sock, [VERSION])?; +	io::write(conn.pr.sock, [VERSION])?;  }; -fn read_from_connection(state: *server_state, conn_idx: size) void = { +fn read_from_connection( +	state: *server_state, +	conn_idx: size, +	should_read: bool, +	should_write: bool, +) void = {  	const conn = &state.connections[conn_idx]; -	match (packet_reader::read(&conn.pr, conn.sock)) { +	match (packet_reader::service(&conn.pr, should_read, should_write)) {  	case let err: io::error =>  		fmt::printfln("#{} error: {}", conn_idx, io::strerror(err))!;  		conn.should_delete = true; @@ -169,7 +182,7 @@ fn read_from_connection(state: *server_state, conn_idx: size) void = {  		fmt::printfln("#{} disconnect", conn_idx)!;  		conn.should_delete = true;  	case void => -		for (true) match (packet_reader::next(&conn.pr)) { +		for (true) match (packet_reader::read_next(&conn.pr)) {  			// xxx foreach loop seems to break match exhaustivity here  			// investigate that at some point  			case done => break; @@ -192,7 +205,7 @@ fn handle_packet(  	state: *server_state,  	conn_idx: size,  	packet: packet_reader::packet -) (void|io::error) = { +) (void|io::error|packet_reader::error) = {  	const conn = &state.connections[conn_idx];  	match (packet) {  	case let op: packet_reader::packet_drawop => @@ -207,11 +220,11 @@ fn handle_packet(  			const other_conn = &state.connections[other_idx];  			if (other_conn.should_delete) continue;  			// fmt::printfln("\t -> #{}",other_idx)!; -			match (packet_reader::send(other_conn.sock, packet)) { +			match (packet_reader::write(&other_conn.pr, packet)) {  			case void => yield; -			case let e: io::error => +			case let e: packet_reader::error =>  				fmt::printfln("couldn't send to #{}: {}", -					other_idx, io::strerror(e))!; +					other_idx, e: str)!;  				other_conn.should_delete = true;  			};  		}; @@ -225,7 +238,7 @@ fn handle_packet(  			conn_idx, world_pos.0, world_pos.1)!;  		const pic = ensure_chunk(state, world_pos); -		packet_reader::send(conn.sock, packet_reader::packet_sendchunk { +		packet_reader::write(&conn.pr, packet_reader::packet_sendchunk {  				world_pos = world_pos,  				chunk_data = pic.d[..pic.w*pic.h],  		})?; | 
