aboutsummaryrefslogtreecommitdiff
path: root/packet_reader/packet_reader.ha
blob: ae38be7df74a67a60980e684bd153912cf005c0e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
use io;
use fmt;
use endian;
use drawing;
use drawing::{pos,CHUNKSIZE};

// NOTE(review): not referenced anywhere in this file; presumably the wire
// protocol version that peers compare -- confirm against the callers.
export def VERSION: u8 = 3;

// Error type of this module: a string describing what went wrong. The
// functions in this file produce it by casting a string literal.
export type error = !str;

// Buffered packet reader/writer bound to one socket. Create with [[new]] and
// release with [[finish]].
export type packet_reader = struct {
	// backing storage for received bytes (allocated in new, freed in finish)
	buf: []u8,
	// the part of buf holding received bytes not yet consumed by read_next
	good: []u8,
	// backing storage for outgoing bytes (allocated in new, freed in finish)
	wbuf: []u8,
	// the part of wbuf holding queued bytes not yet written by service
	wgood: []u8,
	// the file that service reads from and writes to
	sock: io::file,
};

// Creates a packet_reader for sock. Both the receive and the send buffer are
// heap-allocated with a fixed size of 512*512*4*2 bytes and start out empty.
// Pass the result to [[finish]] to release the buffers.
export fn new(sock: io::file) packet_reader = {
	const bufsz: size = 512 * 512 * 4 * 2;
	const rbuf: []u8 = alloc([0...], bufsz);
	const wbuf: []u8 = alloc([0...], bufsz);
	return packet_reader {
		buf = rbuf,
		// nothing received yet
		good = rbuf[..0],
		wbuf = wbuf,
		// nothing queued yet
		wgood = wbuf[..0],
		sock = sock,
	};
};

// Frees the receive and send buffers owned by pr. The socket itself is not
// touched here.
export fn finish(pr: *packet_reader) void = {
	free(pr.wbuf);
	free(pr.buf);
};

// Reinterprets a slice of u32 as the bytes of its in-memory representation.
// No copy is made: the result aliases the storage of the input and is four
// times as long.
fn cast_u32s_to_u8s(in: []u32) []u8 = {
	const nbytes = len(in) * 4;
	const bytes = in: *[*]u32: *[*]u8;
	return bytes[..nbytes];
};
// Reinterprets a slice of bytes as a slice of u32 in the in-memory
// representation. No copy is made: the result aliases the storage of the
// input. Trailing bytes that do not make up a whole u32 are left out.
fn cast_u8s_to_u32s(in: []u8) []u32 = {
	const nwords = len(in) / 4;
	const words = in: *[*]u8: *[*]u32;
	return words[..nwords];
};

// Identifies the kind of a packet. The value is what append_wbuf stores in
// the type field of the packet header and what read_next switches on.
export type packet_type = enum u8 {
	// payload is a serialized drawing::op
	DRAW_OP,
	// payload is a position followed by run-length-encoded chunk data
	SEND_CHUNK,
	// payload is a position
	POSITION,
	// payload is the position of the requested chunk
	REQ_CHUNK
};

// A drawing operation, (de)serialized by drawing::ser_op / drawing::deser_op.
export type packet_drawop = drawing::op;
// The contents of one chunk. read_next only returns one of these when
// chunk_data holds exactly CHUNKSIZE*CHUNKSIZE elements.
export type packet_sendchunk = struct {
	world_pos: pos,
	chunk_data: []u32,
};
// A position; sent as two little-endian 32-bit values.
export type packet_position = pos;
// A request for the chunk at the given position. read_next rejects requests
// whose coordinates are not multiples of CHUNKSIZE.
export type packet_reqchunk = pos;

// Any packet that [[read_next]] can return and [[write]] can queue.
export type packet = (
	packet_drawop
 	| packet_sendchunk
	| packet_position
	| packet_reqchunk);

// Performs the socket I/O for pr: at most one read into the receive buffer
// (if should_read) followed by at most one write from the send buffer (if
// should_write). Call it when the socket is ready for the requested
// direction(s), so that it doesn't block.
//
// Returns io::EOF if the read hits end of file; in that case no write is
// attempted. I/O errors are returned as-is.
export fn service(
	pr: *packet_reader,
	should_read: bool,
	should_write: bool,
) (void | io::error | io::EOF) = {
	if (should_read) {
		// Slide whatever read_next has not consumed yet down to the
		// start of the buffer, then read in behind it.
		const carried = len(pr.good);
		if (carried > 0) {
			pr.buf[..carried] = pr.good;
		};
		const got = match (io::read(pr.sock, pr.buf[carried..])?) {
		case io::EOF =>
			return io::EOF;
		case let n: size =>
			yield n;
		};
		pr.good = pr.buf[..carried + got];
	};

	if (!should_write) return;

	const pending = len(pr.wgood);
	if (pending == 0) return;
	const sent = io::write(pr.sock, pr.wgood)?;
	if (sent >= pending) {
		// everything went out; the send buffer is empty again
		pr.wgood = pr.wbuf[..0];
		return;
	};
	// Short write: keep the unsent tail at the front of the send buffer
	// for the next call.
	const left = pending - sent;
	pr.wbuf[..left] = pr.wgood[sent..];
	pr.wgood = pr.wbuf[..left];
};

// packet format:
// u32 size
// u32 type
// [size-8]u8 data...
// size includes size of header (size and type fields)

// Tries to parse one complete packet from the front of the received bytes
// that have not been consumed yet (pr.good), consuming it on success.
//
// Returns done when no complete packet is buffered yet. Returns an error when
// the packet is malformed: size field smaller than the header or larger than
// the receive buffer, unknown type, payload of the wrong size, payload that
// fails to deserialize, or a chunk request at an invalid position. Except for
// the two size-field errors, the offending packet has already been consumed
// when the error is returned.
//
// The chunk_data of a returned packet_sendchunk must be freed by the caller.
export fn read_next(pr: *packet_reader) (packet | done | error) = {
	if (len(pr.good) < size(u32)) return done;
	const packet_len = endian::legetu32(pr.good[0..4]);
	if (packet_len < 8) return "packet size field too small": error;
	// A packet that does not fit in the receive buffer can never become
	// complete; report it instead of answering done forever.
	if (packet_len > len(pr.buf)) return "packet size field too large": error;
	if (len(pr.good) < packet_len) return done;

	const packet_bytes = pr.good[..packet_len];
	pr.good = pr.good[packet_len..];

	// Validate the type field as a u32 before narrowing it to the enum, so
	// that values sent by the peer which name no packet type are rejected
	// rather than truncated or fed to the switch below.
	const raw_ty = endian::legetu32(packet_bytes[4..8]);
	if (raw_ty > (packet_type::REQ_CHUNK: u32))
		return "unknown packet type": error;
	const ty = raw_ty: packet_type;
	const payload = packet_bytes[8..];
	switch (ty) {
	case packet_type::DRAW_OP =>
		match (drawing::deser_op(payload)) {
		case let o: drawing::op => return o;
		case drawing::deser_fail => return "deser fail": error;
		};
	case packet_type::SEND_CHUNK =>
		// return value must be FREED by CALLER
		// (todo: the copy is not really needed here; it is made
		// wherever this packet ends up being used)
		//
		// The payload must at least hold the 8-byte position; without
		// this check a short packet would trip the slice bounds check
		// below.
		if (len(payload) < 8)
			return "sendchunk packet too small": error;
		const pos_bytes = payload[0..8];
		const compressed_data_bytes = payload[8..];
		const pos = (
			endian::legetu32(pos_bytes[0..4]): i32,
			endian::legetu32(pos_bytes[4..8]): i32
		): pos;
		const chunk_data_compressed = cast_u8s_to_u32s(compressed_data_bytes);
		const chunk_data = rle_decode(chunk_data_compressed);
		if (len(chunk_data) != CHUNKSIZE*CHUNKSIZE) {
			// NOTE(review): assumes rle_decode returns a freshly
			// allocated slice (as the ownership note above implies);
			// nobody else can free it on this path.
			free(chunk_data);
			return "wrong chunk size??": error;
		};
		return packet_sendchunk { world_pos = pos, chunk_data = chunk_data };
	case packet_type::POSITION =>
		if (len(payload) != 8)
			return "position packet wrong size": error;
		const pos = (
			endian::legetu32(payload[0..4]): i32,
			endian::legetu32(payload[4..8]): i32
		): pos;
		return pos: packet_position;
	case packet_type::REQ_CHUNK =>
		if (len(payload) != 8)
			return "reqchunk packet wrong size": error;
		const world_pos = (
			endian::legetu32(payload[0..4]): i32,
			endian::legetu32(payload[4..8]): i32
		): pos;
		// chunk requests must name a chunk-aligned position
		if (world_pos.0 % CHUNKSIZE != 0 || world_pos.1 % CHUNKSIZE != 0) {
			return "invalid world_pos": error;
		};
		return world_pos: packet_reqchunk;
	};
};

// Queues one packet of type ty in the send buffer: an 8-byte header (total
// length including the header, then the type, both as little-endian u32)
// followed by every slice of datas in order. Nothing is queued and an error
// is returned if the packet does not fit in the free space of the buffer.
fn append_wbuf(pr: *packet_reader, ty: packet_type, datas: []u8...) (void | error) = {
	let packet_len: u32 = 8;
	for (let i = 0z; i < len(datas); i += 1) {
		packet_len += len(datas[i]): u32;
	};

	if (len(pr.wbuf) - len(pr.wgood) < packet_len) {
		return "wbuff full": error;
	};

	const hdr: [8]u8 = [0...];
	endian::leputu32(hdr[..4], packet_len);
	endian::leputu32(hdr[4..], ty);

	// wgood is a slice of wbuf, so these appends fill wbuf in place
	static append(pr.wgood, hdr...);
	for (let i = 0z; i < len(datas); i += 1) {
		static append(pr.wgood, datas[i]...);
	};
};

// Serializes packet and queues it in the send buffer of pr; the bytes go out
// on the socket in later calls to [[service]]. Returns an error if the send
// buffer has no room for the packet.
export fn write(pr: *packet_reader, packet: packet) (void | error) = {
	match (packet) {
	case let op: packet_drawop =>
		const encoded = drawing::ser_op(op);
		defer free(encoded);
		append_wbuf(pr, packet_type::DRAW_OP, encoded)?;
	case let chunk: packet_sendchunk =>
		// chunk data is run-length encoded before it is sent
		const compressed = rle_encode(chunk.chunk_data);
		defer free(compressed);
		const coords: [8]u8 = [0...];
		endian::leputu32(coords[..4], chunk.world_pos.0: u32);
		endian::leputu32(coords[4..], chunk.world_pos.1: u32);
		append_wbuf(pr, packet_type::SEND_CHUNK,
			coords, cast_u32s_to_u8s(compressed))?;
	case let where: packet_position =>
		const coords: [8]u8 = [0...];
		endian::leputu32(coords[..4], where.0: u32);
		endian::leputu32(coords[4..], where.1: u32);
		append_wbuf(pr, packet_type::POSITION, coords)?;
	case let wanted: packet_reqchunk =>
		const coords: [8]u8 = [0...];
		endian::leputu32(coords[..4], wanted.0: u32);
		endian::leputu32(coords[4..], wanted.1: u32);
		append_wbuf(pr, packet_type::REQ_CHUNK, coords)?;
	};
};