aboutsummaryrefslogtreecommitdiff
path: root/packet_reader/packet_reader.ha
diff options
context:
space:
mode:
Diffstat (limited to 'packet_reader/packet_reader.ha')
-rw-r--r--packet_reader/packet_reader.ha94
1 files changed, 66 insertions, 28 deletions
diff --git a/packet_reader/packet_reader.ha b/packet_reader/packet_reader.ha
index d541a11..ae38be7 100644
--- a/packet_reader/packet_reader.ha
+++ b/packet_reader/packet_reader.ha
@@ -4,7 +4,7 @@ use endian;
use drawing;
use drawing::{pos,CHUNKSIZE};
-export def VERSION: u8 = 2;
+export def VERSION: u8 = 3;
export type error = !str;
@@ -13,12 +13,14 @@ export type packet_reader = struct {
good: []u8,
wbuf: []u8,
wgood: []u8,
+ sock: io::file,
};
-export fn new() packet_reader = {
+export fn new(sock: io::file) packet_reader = {
let pr = packet_reader {
buf = alloc([0...],512*512*4*2), // ehhh
wbuf = alloc([0...], 512*512*4*2), // ehhhh
+ sock = sock,
...
};
pr.good = pr.buf[0..0];
@@ -26,6 +28,11 @@ export fn new() packet_reader = {
return pr;
};
+export fn finish(pr: *packet_reader) void = {
+ free(pr.buf);
+ free(pr.wbuf);
+};
+
fn cast_u32s_to_u8s(in: []u32) []u8 =
(in: *[*]u32: *[*]u8)[..len(in)*4];
fn cast_u8s_to_u32s(in: []u8) []u32 =
@@ -52,24 +59,51 @@ export type packet = (
| packet_position
| packet_reqchunk);
-// call when input is ready. could block otherwise
-export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF) = {
- const remaining_amt = len(pr.good);
- const read_pos = if (remaining_amt > 0) {
- // still some unconsumed content in the buffer
- // move unconsumed stuff to start of buffer
- // fmt::printfln("moving {} remaining bytes",remaining_amt)!;
- pr.buf[0..remaining_amt] = pr.good;
- yield remaining_amt;
- } else 0z;
- const nread = match(io::read(sock, pr.buf[read_pos..])?) {
- case io::EOF => return io::EOF;
- case let n: size => yield n;
+// call when input ready, so it doesn't block
+export fn service(
+ pr: *packet_reader,
+ should_read: bool,
+ should_write: bool,
+) (void | io::error | io::EOF) = {
+ const sock = pr.sock;
+
+ if (should_read) {
+ // reading
+ const remaining_amt = len(pr.good);
+ const read_pos = if (remaining_amt > 0) {
+ // still some unconsumed content in the buffer
+ // move unconsumed stuff to start of buffer
+ // fmt::printfln("moving {} remaining bytes",remaining_amt)!;
+ pr.buf[0..remaining_amt] = pr.good;
+ yield remaining_amt;
+ } else 0z;
+ const nread = match(io::read(sock, pr.buf[read_pos..])?) {
+ case io::EOF => return io::EOF;
+ case let n: size => yield n;
+ };
+ // fmt::printfln("read {} bytes",nread)!;
+ const total_amt = read_pos + nread;
+ pr.good = pr.buf[0..total_amt];
+ // fmt::printfln("now {} bytes in buffer",total_amt)!;
+ };
+
+
+ if (should_write) {
+ // writing
+ const amt_to_write = len(pr.wgood);
+ if (amt_to_write == 0) return;
+ const nwritten = io::write(sock, pr.wgood)?;
+ // fmt::printfln("written {} of {} bytes", nwritten, amt_to_write)!;
+ if (nwritten < amt_to_write) {
+ // fmt::println("moving buffer back")!;
+ const remaining_len = amt_to_write - nwritten;
+ pr.wbuf[0..remaining_len] = pr.wgood[nwritten..];
+ pr.wgood = pr.wbuf[0..remaining_len];
+ } else {
+ pr.wgood = pr.wbuf[0..0];
+ };
+ // fmt::printfln(" {} to write next time", len(pr.wgood))!;
};
- // fmt::printfln("read {} bytes",nread)!;
- const total_amt = read_pos + nread;
- pr.good = pr.buf[0..total_amt];
- // fmt::printfln("now {} bytes in buffer",total_amt)!;
};
// packet format:
@@ -78,7 +112,7 @@ export fn read(pr: *packet_reader, sock: io::handle) (void | io::error | io::EOF
// [size-8]u8 data...
// size includes size of header (size and type fields)
-export fn next(pr: *packet_reader) (packet | done | error) = {
+export fn read_next(pr: *packet_reader) (packet | done | error) = {
// either parse a full packet out of the front of good,
// move good along that many bytes,
// and return the packet,
@@ -136,24 +170,28 @@ export fn next(pr: *packet_reader) (packet | done | error) = {
};
};
-export fn send_raw(sock: io::file, ty: packet_type, datas: []u8...) (void | io::error) = {
+fn append_wbuf(pr: *packet_reader, ty: packet_type, datas: []u8...) (void | error) = {
// ehh
const header: [8]u8 = [0...];
let total_len = 8u32;
for (const data .. datas) total_len += len(data): u32;
+
+ const remaining_space = len(pr.wbuf) - len(pr.wgood);
+ if (remaining_space < total_len) return "wbuff full": error;
+
endian::leputu32(header[0..4], total_len);
endian::leputu32(header[4..8], ty);
- io::writeall(sock, header)?;
+ static append(pr.wgood, header...);
for (const data .. datas)
- io::writeall(sock, data)?;
+ static append(pr.wgood, data...);
};
-export fn send(sock: io::file, packet: packet) (void | io::error) = {
+export fn write(pr: *packet_reader, packet: packet) (void | error) = {
match (packet) {
case let op: packet_drawop =>
const ser_op = drawing::ser_op(op);
defer free(ser_op);
- send_raw(sock, packet_type::DRAW_OP, ser_op)?;
+ append_wbuf(pr, packet_type::DRAW_OP, ser_op)?;
case let packet: packet_sendchunk =>
const pos_buf: [8]u8 = [0...];
endian::leputu32(pos_buf[0..4],packet.world_pos.0: u32);
@@ -161,16 +199,16 @@ export fn send(sock: io::file, packet: packet) (void | io::error) = {
const chunk_data_compressed = rle_encode(packet.chunk_data);
defer free(chunk_data_compressed);
const compressed_data_bytes = cast_u32s_to_u8s(chunk_data_compressed);
- send_raw(sock, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?;
+ append_wbuf(pr, packet_type::SEND_CHUNK, pos_buf, compressed_data_bytes)?;
case let pos: packet_position =>
const pos_buf: [8]u8 = [0...];
endian::leputu32(pos_buf[0..4], pos.0: u32);
endian::leputu32(pos_buf[4..8], pos.1: u32);
- send_raw(sock, packet_type::POSITION, pos_buf)?;
+ append_wbuf(pr, packet_type::POSITION, pos_buf)?;
case let world_pos: packet_reqchunk =>
const pos_buf: [8]u8 = [0...];
endian::leputu32(pos_buf[0..4], world_pos.0: u32);
endian::leputu32(pos_buf[4..8], world_pos.1: u32);
- send_raw(sock, packet_type::REQ_CHUNK, pos_buf)?;
+ append_wbuf(pr, packet_type::REQ_CHUNK, pos_buf)?;
};
};