From 4ddc5c526b3d418da2b14076ce4f88594ca36d1b Mon Sep 17 00:00:00 2001 From: Zoe <62722391+juls0730@users.noreply.github.com> Date: Sun, 14 Sep 2025 20:48:02 -0500 Subject: [PATCH] file uploads and bug fixes --- bun.lock | 9 + package.json | 3 + server/websocketHandler.ts | 48 +++- src/app.html | 21 +- src/components/RTCMessage.svelte | 419 +++++++++++++++++++++++++------ src/lib/webrtc.ts | 20 +- src/routes/+layout.svelte | 6 + src/routes/+page.svelte | 6 + src/routes/[roomId]/+page.svelte | 2 +- src/stores/messageStore.ts | 6 +- src/stores/websocketStore.ts | 2 + src/types/message.ts | 50 +++- src/types/webrtc.ts | 8 +- src/types/websocket.ts | 1 - src/utils/buffer.ts | 197 +++++++++++++++ src/utils/webrtcUtil.ts | 237 +++++++++++++++-- vite.config.ts | 2 +- 17 files changed, 895 insertions(+), 142 deletions(-) create mode 100644 src/utils/buffer.ts diff --git a/bun.lock b/bun.lock index 83b42fd..5907a0b 100644 --- a/bun.lock +++ b/bun.lock @@ -9,8 +9,11 @@ "@noble/ciphers": "^1.3.0", "@noble/curves": "^1.9.0", "@sveltejs/adapter-node": "^5.3.1", + "@types/streamsaver": "^2.0.5", "@types/ws": "^8.18.1", + "i": "^0.3.7", "polka": "^0.5.2", + "streamsaver": "^2.0.6", "ts-mls": "^1.1.0", "ws": "^8.18.3", }, @@ -242,6 +245,8 @@ "@types/serve-static": ["@types/serve-static@1.15.8", "", { "dependencies": { "@types/http-errors": "*", "@types/node": "*", "@types/send": "*" } }, "sha512-roei0UY3LhpOJvjbIP6ZZFngyLKl5dskOtDhxY5THRSpO+ZI+nzJ+m5yUMzGrp89YRa7lvknKkMYjqQFGwA7Sg=="], + "@types/streamsaver": ["@types/streamsaver@2.0.5", "", {}, "sha512-93o0zjV8swEhR2YI57h/2ytbJF8bJh7sI9GNB02TLJHdM4fWDxZuChwfWhyD8vt2ub4kw4rsfZ0C0yAUX+3gcg=="], + "@types/trouter": ["@types/trouter@3.1.4", "", {}, "sha512-4YIL/2AvvZqKBWenjvEpxpblT2KGO6793ipr5QS7/6DpQ3O3SwZGgNGWezxf3pzeYZc24a2pJIrR/+Jxh/wYNQ=="], "@types/ws": ["@types/ws@8.18.1", "", { "dependencies": { "@types/node": "*" } }, "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg=="], @@ -290,6 +295,8 @@ "hasown": ["hasown@2.0.2", "", { "dependencies": { "function-bind": "^1.1.2" } }, "sha512-0hJU9SCPvmMzIBdZFqNPXWa6dqh7WdH0cII9y+CyS8rG3nL48Bclra9HmKhVVUHyPWNH5Y7xDwAB7bfgSjkUMQ=="], + "i": ["i@0.3.7", "", {}, "sha512-FYz4wlXgkQwIPqhzC5TdNMLSE5+GS1IIDJZY/1ZiEPCT2S3COUVZeT5OW4BmW4r5LHLQuOosSwsvnroG9GR59Q=="], + "is-core-module": ["is-core-module@2.16.1", "", { "dependencies": { "hasown": "^2.0.2" } }, "sha512-UfoeMA6fIJ8wTYFEUjelnaGI67v6+N7qXJEvQuIGa99l4xsCruSYOVSQ0uPANn4dAzm8lkYPaKLrrijLq7x23w=="], "is-module": ["is-module@1.0.0", "", {}, "sha512-51ypPSPCoTEIN9dy5Oy+h4pShgJmPCygKfyRCISBI+JoWT/2oJvK8QPxmwv7b/p239jXrm9M1mlQbyKJ5A152g=="], @@ -368,6 +375,8 @@ "source-map-js": ["source-map-js@1.2.1", "", {}, "sha512-UXWMKhLOwVKb728IUtQPXxfYU+usdybtUrK/8uGE8CQMvrhOpwvzDBwj0QhSL7MQc7vIsISBG8VQ8+IDQxpfQA=="], + "streamsaver": ["streamsaver@2.0.6", "", {}, "sha512-LK4e7TfCV8HzuM0PKXuVUfKyCB1FtT9L0EGxsFk5Up8njj0bXK8pJM9+Wq2Nya7/jslmCQwRK39LFm55h7NBTw=="], + "supports-preserve-symlinks-flag": ["supports-preserve-symlinks-flag@1.0.0", "", {}, "sha512-ot0WnXS9fgdkgIcePe6RHNk1WA8+muPa6cSjeR3V8K27q9BB1rTE3R1p7Hv0z1ZyAc8s6Vvv8DIyWf681MAt0w=="], "svelte": ["svelte@5.38.8", "", { "dependencies": { "@jridgewell/remapping": "^2.3.4", "@jridgewell/sourcemap-codec": "^1.5.0", "@sveltejs/acorn-typescript": "^1.0.5", "@types/estree": "^1.0.5", "acorn": "^8.12.1", "aria-query": "^5.3.1", "axobject-query": "^4.1.0", "clsx": "^2.1.1", "esm-env": "^1.2.1", "esrap": "^2.1.0", "is-reference": "^3.0.3", "locate-character": "^3.0.0", "magic-string": "^0.30.11", "zimmerframe": "^1.1.2" } }, "sha512-UDpTbM/iuZ4MaMnn4ODB3rf5JKDyPOi5oJcopP0j7YHQ9BuJtsAqsR71r2N6AnJf7ygbalTJU5y8eSWGAQZjlQ=="], diff --git a/package.json b/package.json index 1814cf1..c7b98a1 100644 --- a/package.json +++ b/package.json @@ -29,8 +29,11 @@ "@noble/ciphers": "^1.3.0", "@noble/curves": "^1.9.0", "@sveltejs/adapter-node": "^5.3.1", + "@types/streamsaver": "^2.0.5", "@types/ws": "^8.18.1", + "i": "^0.3.7", "polka": "^0.5.2", + "streamsaver": "^2.0.6", "ts-mls": "^1.1.0", "ws": "^8.18.3" }, diff --git a/server/websocketHandler.ts b/server/websocketHandler.ts index 98ce5e2..46b869d 100644 --- a/server/websocketHandler.ts +++ b/server/websocketHandler.ts @@ -75,11 +75,6 @@ async function joinRoom(roomId: string, socket: Socket): Promise { - room = rooms.get(roomId) - if (!room) { - throw new Error("Room not found"); - } - room.notifyAll({ type: WebSocketMessageType.ROOM_LEFT, roomId }); // for some reason, when you filter the array when the length is 1 it stays at 1, but we *know* that if its 1 @@ -108,6 +103,34 @@ async function joinRoom(roomId: string, socket: Socket): Promise { + if (rooms.get(roomId)?.length === 1) { + console.log("Room is empty, deleting"); + deleteRoom(roomId); + } + }, 5000) + return; + } + + room.set(room.filter(client => client !== socket)); + + socket.send({ type: WebSocketMessageType.ROOM_LEFT, roomId }); + + return room; +} + function deleteRoom(roomId: string) { rooms.delete(roomId); } @@ -165,6 +188,21 @@ export function confgiureWebsocketServer(wss: WebSocketServer) { // the client is now in the room and the peer knows about it socket.send({ type: WebSocketMessageType.ROOM_JOINED, roomId: message.roomId, participants: room.length }); + break; + case WebSocketMessageType.LEAVE_ROOM: + if (!message.roomId) { + socket.send({ type: WebSocketMessageType.ERROR, data: 'Invalid message' }); + return; + } + + if (rooms.get(message.roomId) == undefined) { + socket.send({ type: WebSocketMessageType.ERROR, data: 'Invalid roomId' }); + return; + } + + room = await leaveRoom(message.roomId, socket); + if (!room) return; + break; case WebSocketMessageType.WEBRTC_OFFER: case WebSocketMessageType.WERTC_ANSWER: diff --git a/src/app.html b/src/app.html index f273cc5..ed7fefa 100644 --- a/src/app.html +++ b/src/app.html @@ -1,11 +1,14 @@ - - - - %sveltekit.head% - - -
%sveltekit.body%
- - + + + + + %sveltekit.head% + + + +
%sveltekit.body%
+ + + \ No newline at end of file diff --git a/src/components/RTCMessage.svelte b/src/components/RTCMessage.svelte index 5140dda..b80fd0e 100644 --- a/src/components/RTCMessage.svelte +++ b/src/components/RTCMessage.svelte @@ -8,26 +8,55 @@ peer, keyExchangeDone, } from "../utils/webrtcUtil"; - import { messages } from "../stores/messageStore"; + import { + advertisedOffers, + fileRequestIds, + messages, + receivedOffers, + } from "../stores/messageStore"; import { WebRTCPacketType } from "../types/webrtc"; import { ConnectionState, type Room } from "../types/websocket"; import { MessageType } from "../types/message"; import { fade } from "svelte/transition"; + import { WebBuffer } from "../utils/buffer"; let inputMessage: Writable = writable(""); - let inputFile = writable(null); + let inputFile: Writable = writable(null); let inputFileElement: HTMLInputElement | null = $state(null); + let initialConnectionCompleteCount = writable(0); let initialConnectionComplete = derived( - [isRTCConnected, dataChannelReady, keyExchangeDone], - (values: Array) => values.every((value) => value), + initialConnectionCompleteCount, + (value) => value === 3, ); + // TODO: is this the most elegant way to do this? + isRTCConnected.subscribe((value) => { + if (value) { + $initialConnectionCompleteCount++; + } + }); + + dataChannelReady.subscribe((value) => { + if (value) { + $initialConnectionCompleteCount++; + } + }); + + keyExchangeDone.subscribe((value) => { + if (value) { + $initialConnectionCompleteCount++; + } + }); + const { room }: { room: Writable } = $props(); room.subscribe((newRoom) => { console.log("Room changed:", newRoom); if (newRoom.id !== $room?.id) { messages.set([]); + isRTCConnected.set(false); + dataChannelReady.set(false); + keyExchangeDone.set(false); } }); @@ -37,15 +66,69 @@ return; } - if (!$inputFile && !$inputMessage) { + let messageBuf: Uint8Array | undefined = undefined; + + if (!$inputFile && !$inputMessage.trim()) { return; } - // if ($inputFile != null && $inputFile[0] !== undefined) { - // $messages = [...$messages, `You: ${$inputFile[0].name}`]; - // $peer.send($inputFile[0]); - // $inputFile = null; - // } + if ($inputFile != null && $inputFile[0] !== undefined) { + // fileSize + fileNameSize + fileNameLen + id + textLen + header + let messageLen = + 8 + + $inputFile[0].name.length + + 2 + + 8 + + $inputMessage.length + + 1; + let messageBuf = new WebBuffer(new ArrayBuffer(messageLen)); + + let fileId = new WebBuffer( + crypto.getRandomValues(new Uint8Array(8)).buffer, + ).readBigInt64LE(); + $advertisedOffers.set(fileId, $inputFile[0]); + + console.log( + "Advertised file:", + fileId, + $inputFile[0].size, + $inputFile[0].name, + $inputFile[0].name.length, + ); + + messageBuf.writeInt8(MessageType.FILE_OFFER); + messageBuf.writeBigInt64LE(BigInt($inputFile[0].size)); + messageBuf.writeInt16LE($inputFile[0].name.length); + messageBuf.writeString($inputFile[0].name); + messageBuf.writeBigInt64LE(fileId); + messageBuf.writeString($inputMessage); + + console.log( + "Sending file offer", + new Uint8Array(messageBuf.buffer), + ); + + $messages = [ + ...$messages, + { + initiator: true, + type: MessageType.FILE_OFFER, + data: { + fileSize: BigInt($inputFile[0].size), + fileNameSize: $inputFile[0].name.length, + fileName: $inputFile[0].name, + id: fileId, + text: $inputMessage === "" ? null : $inputMessage, + }, + }, + ]; + + $inputFile = null; + $inputMessage = ""; + + $peer.send(messageBuf.buffer, WebRTCPacketType.MESSAGE); + return; + } if ($inputMessage) { $messages = [ @@ -56,17 +139,46 @@ data: $inputMessage, }, ]; - $peer.send( - new TextEncoder().encode( - JSON.stringify({ - type: MessageType.TEXT, - data: $inputMessage, - }), - ).buffer, - WebRTCPacketType.MESSAGE, - ); + + let newMessageBuf = new ArrayBuffer(1 + $inputMessage.length); + messageBuf = new Uint8Array(newMessageBuf); + + messageBuf[0] = MessageType.TEXT; + messageBuf.set(new TextEncoder().encode($inputMessage), 1); + $inputMessage = ""; } + + if (!messageBuf) { + return; + } + + $peer.send(messageBuf.buffer, WebRTCPacketType.MESSAGE); + } + + function downloadFile(id: bigint) { + if (!$peer) { + console.error("Peer not initialized"); + return; + } + + let file = $receivedOffers.get(id); + if (!file) { + console.error("Unknown file id:", id); + return; + } + + let requesterId = new WebBuffer( + crypto.getRandomValues(new Uint8Array(8)).buffer, + ).readBigInt64LE(); + let fileRequestBuf = new WebBuffer(new ArrayBuffer(1 + 8 + 8)); + fileRequestBuf.writeInt8(MessageType.FILE_REQUEST); + fileRequestBuf.writeBigInt64LE(id); + fileRequestBuf.writeBigInt64LE(requesterId); + + $fileRequestIds.set(requesterId, id); + + $peer.send(fileRequestBuf.buffer, WebRTCPacketType.MESSAGE); } let canCloseLoadingOverlay = writable(false); @@ -85,6 +197,29 @@ inputFileElement.click(); } + + function autogrow(node: HTMLElement) { + function resize() { + // 1. Temporarily reset height to calculate the new scrollHeight + node.style.height = "0px"; + // 2. Set the height to the scrollHeight, which represents the full content height + node.style.height = `${node.scrollHeight}px`; + } + + // Call resize initially in case the textarea already has content + resize(); + + // Add an event listener to resize on every input + node.addEventListener("input", resize); + + // Return a destroy method to clean up the event listener when the component is unmounted + return { + update: resize, + destroy() { + node.removeEventListener("input", resize); + }, + }; + }

@@ -101,16 +236,16 @@ class="flex flex-col sm:max-w-4/5 lg:max-w-3/5 min-h-[calc(5/12_*_100vh)]" >

- {#if !$initialConnectionComplete || $room.connectionState === ConnectionState.RECONNECTING || $room.participants !== 2 || !$canCloseLoadingOverlay} + {#if !$initialConnectionComplete || $room.connectionState === ConnectionState.RECONNECTING || $room.participants !== 2 || $dataChannelReady === false || !$canCloseLoadingOverlay}
{#if !$isRTCConnected}

Waiting for peer to connect...

- {:else if !$dataChannelReady} + {:else if !$dataChannelReady && !$initialConnectionComplete}

Establishing data channel...

{:else if !$keyExchangeDone}

Establishing a secure connection with the peer...

@@ -118,7 +253,7 @@

Disconnect from peer, attempting to reconnecting...

- {:else if $room.participants !== 2} + {:else if $room.participants !== 2 || $dataChannelReady === false}

Peer has disconnected, waiting for other peer to reconnect... @@ -130,7 +265,7 @@

{/if}
- {#if !$keyExchangeDone || $room.participants !== 2 || $room.connectionState === ConnectionState.RECONNECTING} + {#if !$keyExchangeDone || $room.participants !== 2 || $dataChannelReady === false || $room.connectionState === ConnectionState.RECONNECTING} -

+

{#if msg.initiator} You: {:else} Peer: {/if}

-

- {#if msg.type === MessageType.TEXT} - {msg.data} - {:else} - Unknown message type: {msg.type} - {/if} -

+ {#if msg.type === MessageType.TEXT} +

{msg.data}

+ {:else if msg.type === MessageType.FILE_OFFER} +
+ {#if msg.data.text !== null} +

+ {msg.data.text} +

+ {/if} +
+

+ {msg.data.fileName} +

+

+ {msg.data.fileSize} bytes +

+ + {#if !msg.initiator} + + {/if} +
+
+ {:else} +

Unknown message type: {msg.type}

+ {/if}
{/each}
@@ -198,52 +372,143 @@ class="absolute opacity-0 -top-[9999px] -left-[9999px]" />
- e.key === "Enter" && sendMessage()} - disabled={!$isRTCConnected || - !$dataChannelReady || - !$keyExchangeDone || - $room.connectionState === ConnectionState.RECONNECTING} - placeholder="Type your message..." - class="flex-grow p-2 rounded bg-gray-700 border border-gray-600 text-gray-100 placeholder-gray-400 - focus:outline-none focus:ring-2 focus:ring-blue-500 disabled:opacity-50 disabled:cursor-not-allowed" - /> - +
+
+
+ {/if} +
- - + +
+ + +
+
+ {/if} diff --git a/src/lib/webrtc.ts b/src/lib/webrtc.ts index 3394d7c..62584b4 100644 --- a/src/lib/webrtc.ts +++ b/src/lib/webrtc.ts @@ -55,6 +55,10 @@ export class WebRTCPeer { iceServers: this.iceServers, }); + this.peer.onicecandidateerror = (event) => { + console.error("ICE candidate error:", event); + } + // 1. Initialize ICE candidates this.peer.onicecandidate = (event) => { if (event.candidate) { @@ -93,14 +97,12 @@ export class WebRTCPeer { channel.binaryType = "arraybuffer"; channel.onopen = async () => { - console.log('data channel open'); - this.callbacks.onDataChannelOpen(); - this.callbacks.onKeyExchangeDone(); - - await this.generateKeyPair(); + this.callbacks.onDataChannelStateChange(true); try { if (this.isInitiator) { + await this.generateKeyPair(); + let groupId = crypto.getRandomValues(new Uint8Array(24)); this.clientState = await createGroup(groupId, this.keyPackage!.publicPackage, this.keyPackage!.privatePackage, [], this.cipherSuite!); @@ -136,6 +138,7 @@ export class WebRTCPeer { console.log("parsed data", data, encrypted, type); if (type === WebRTCPacketType.GROUP_OPEN) { + await this.generateKeyPair(); await this.startKeyExchange(); return; } @@ -179,6 +182,7 @@ export class WebRTCPeer { this.send(encodedWelcomeBuf, WebRTCPacketType.WELCOME); this.encyptionReady = true; + this.callbacks.onKeyExchangeDone(); return; } @@ -202,6 +206,7 @@ export class WebRTCPeer { console.log("Joined group", this.clientState); this.encyptionReady = true; + this.callbacks.onKeyExchangeDone(); return; } @@ -231,11 +236,12 @@ export class WebRTCPeer { data: data.buffer, }; - this.callbacks.onMessage(message); + this.callbacks.onMessage(message, this); }; channel.onclose = () => { - console.log('data channel closed'); + this.callbacks.onDataChannelStateChange(false); + }; channel.onerror = (error) => { diff --git a/src/routes/+layout.svelte b/src/routes/+layout.svelte index 07409f8..381871f 100644 --- a/src/routes/+layout.svelte +++ b/src/routes/+layout.svelte @@ -7,6 +7,12 @@ + + {@render children?.()} diff --git a/src/routes/+page.svelte b/src/routes/+page.svelte index 8b02617..2813a68 100644 --- a/src/routes/+page.svelte +++ b/src/routes/+page.svelte @@ -38,6 +38,12 @@ type: WebSocketMessageType.LEAVE_ROOM, roomId: $room.id, }); + $peer?.close(); + peer.set(null); + room.update((room) => ({ + ...room, + connectionState: ConnectionState.DISCONNECTED, + })); } $ws.send({ type: WebSocketMessageType.CREATE_ROOM }); // send a message when the button is clicked }}>Create Room {#if $error} -

Whoops! That room doesn't exist.

+

Hm. Something went wrong: {$error.toLocaleLowerCase()}

{:else if $room.connectionState !== ConnectionState.CONNECTED && $room.connectionState !== ConnectionState.RECONNECTING}

Connecting to server...

{:else} diff --git a/src/stores/messageStore.ts b/src/stores/messageStore.ts index acdea2e..dc8acf2 100644 --- a/src/stores/messageStore.ts +++ b/src/stores/messageStore.ts @@ -1,4 +1,8 @@ import { writable, type Writable } from "svelte/store"; import type { Message } from "../types/message"; -export let messages: Writable = writable([]); \ No newline at end of file +export let messages: Writable = writable([]); +export let advertisedOffers = writable(new Map()); +export let receivedOffers = writable(new Map()); +// maps request id to received file id +export let fileRequestIds: Writable> = writable(new Map()); diff --git a/src/stores/websocketStore.ts b/src/stores/websocketStore.ts index 51356be..c95f48b 100644 --- a/src/stores/websocketStore.ts +++ b/src/stores/websocketStore.ts @@ -8,6 +8,8 @@ export const webSocketConnected = writable(false); function createSocket(): Socket { if (!browser) { + // this only occurs on the server, which we dont care about because its not a client that can actually connect to the websocket server + // @ts-ignore return null; } diff --git a/src/types/message.ts b/src/types/message.ts index a0961b2..b460769 100644 --- a/src/types/message.ts +++ b/src/types/message.ts @@ -1,13 +1,15 @@ export enum MessageType { // chat packets - TEXT = 0, + TEXT, // user offers to send a file - FILE_OFFER = 1, + FILE_OFFER, // user downloads a file offered by the peer - FILE_REQUEST = 2, + FILE_REQUEST, // file packets - FILE = 3, + FILE, + FILE_ACK, + FILE_DONE, ERROR = 255 } @@ -16,6 +18,7 @@ export type Message = | TextMessage | FileOfferMessage | FileRequestMessage + | FileAckMessage | FileMessage | ErrorMessage; @@ -32,28 +35,51 @@ export interface TextMessage extends BaseMessage { export interface FileOfferMessage extends BaseMessage { type: MessageType.FILE_OFFER; data: { + // 64 bit file size. chunked at 1024 bytes + fileSize: bigint; + + // 16 bit file name size + fileNameSize: number; fileName: string; - fileSize: number; - // randomly generated to identify the file so that multiple files with the same name can be uploaded - id: string; + // 64bit randomly generated id to identify the file so that multiple files with the same name can be uploaded + id: bigint; + text: string | null; }; } export interface FileRequestMessage extends BaseMessage { type: MessageType.FILE_REQUEST; data: { - id: string; + // 64 bit file id + id: bigint; + // 64 bit requester id + requesterId: bigint; }; } +export interface FileAckMessage extends BaseMessage { + type: MessageType.FILE_ACK; + // the request id + id: bigint; +} + // ----- file packets ----- export interface FileMessage extends BaseMessage { type: MessageType.FILE; data: { - id: string; - fileName: string; - fileSize: number; - data: ArrayBuffer; + // the request id + id: bigint; + // no file metadata is sent here, because we already know all of it from the request id + // comes down in 16MB chunks + data: Blob; + }; +} + +export interface FileDoneMessage extends BaseMessage { + type: MessageType.FILE_DONE; + data: { + // the request id + id: bigint; }; } diff --git a/src/types/webrtc.ts b/src/types/webrtc.ts index 98497f1..08fd82f 100644 --- a/src/types/webrtc.ts +++ b/src/types/webrtc.ts @@ -1,7 +1,9 @@ +import type { WebRTCPeer } from "$lib/webrtc"; + export interface WebRTCPeerCallbacks { onConnected: () => void; - onMessage: (message: { type: WebRTCPacketType, data: ArrayBuffer }) => void; - onDataChannelOpen: () => void; + onMessage: (message: { type: WebRTCPacketType, data: ArrayBuffer }, webRtcPeer: WebRTCPeer) => void; + onDataChannelStateChange: (state: boolean) => void; onKeyExchangeDone: () => void; onNegotiationNeeded: () => void; onError: (error: any) => void; @@ -17,6 +19,8 @@ export enum WebRTCPacketType { MESSAGE = 0, } +export const CHUNK_SIZE = 16 * 1024 * 1024; + export interface WebRTCPacket { encrypted: boolean; // 1 bit type: WebRTCPacketType; // 7 bits diff --git a/src/types/websocket.ts b/src/types/websocket.ts index 3350139..9090bb4 100644 --- a/src/types/websocket.ts +++ b/src/types/websocket.ts @@ -1,4 +1,3 @@ - export enum ConnectionState { CONNECTING, RECONNECTING, diff --git a/src/utils/buffer.ts b/src/utils/buffer.ts new file mode 100644 index 0000000..9a45898 --- /dev/null +++ b/src/utils/buffer.ts @@ -0,0 +1,197 @@ +// nodejs like buffer class for browser +export class WebBuffer { + private data: Uint8Array; + // the number of bytes read from the buffer, this allows for you to read the buffer without having to specify the offset every time + private count = 0; + private dataView: DataView; + + constructor(data: ArrayBuffer) { + this.data = new Uint8Array(data); + this.dataView = new DataView(data); + + return new Proxy(this, { + get(target, prop, receiver) { + // Check if the property is a string that represents a valid number (array index) + if (typeof prop === 'string' && /^\d+$/.test(prop)) { + const index = parseInt(prop, 10); + // Delegate array-like access to the underlying Uint8Array + return target.data[index]; + } + // For all other properties (methods like slice, getters like length, etc.), + // use the default property access behavior on the target object. + return Reflect.get(target, prop, receiver); + }, + set(target, prop, value, receiver) { + // Check if the property is a string that represents a valid number (array index) + if (typeof prop === 'string' && /^\d+$/.test(prop)) { + const index = parseInt(prop, 10); + // Delegate array-like assignment to the underlying Uint8Array + target.data[index] = value; + return true; // Indicate success + } + // For all other properties, use the default property assignment behavior. + return Reflect.set(target, prop, value, receiver); + } + }); + } + + [index: number]: number; + + get length(): number { + return this.data.length; + } + + get buffer(): ArrayBuffer { + return this.data.buffer; + } + + slice(start: number, end?: number): WebBuffer { + return new WebBuffer(this.data.slice(start, end).buffer); + } + + set(data: number, offset: number) { + this.dataView.setUint8(offset, data); + // this.data.set(data, offset); + } + + read(length?: number, offset?: number): Uint8Array { + if (length === undefined) { + length = this.length - this.count; + } + + if (offset === undefined) { + offset = this.count; + this.count += length; + } + + return this.data.slice(offset, offset + length); + } + + write(data: Uint8Array, offset?: number) { + if (offset === undefined) { + offset = this.count; + this.count += data.byteLength; + } + + for (let i = 0; i < data.byteLength; i++) { + this.dataView.setUint8(offset + i, data[i]); + } + } + + readInt8(offset?: number): number { + if (offset === undefined) { + offset = this.count; + this.count += 1; + } + + return this.dataView.getUint8(offset); + } + + writeInt8(value: number, offset?: number) { + if (offset === undefined) { + offset = this.count; + this.count += 1; + } + + this.dataView.setUint8(offset, value); + } + + readInt16LE(offset?: number): number { + if (offset === undefined) { + offset = this.count; + this.count += 2; + } + + return this.dataView.getInt16(offset, true); + } + + writeInt16LE(value: number, offset?: number) { + if (offset === undefined) { + offset = this.count; + this.count += 2; + } + + this.dataView.setInt16(offset, value, true); + } + + readInt32LE(offset?: number): number { + if (offset === undefined) { + offset = this.count; + this.count += 4; + } + + return this.dataView.getInt32(offset, true); + } + + writeInt32LE(value: number, offset?: number) { + if (offset === undefined) { + offset = this.count; + this.count += 4; + } + + this.dataView.setInt32(offset, value, true); + } + + readBigInt64LE(offset?: number): bigint { + if (offset === undefined) { + offset = this.count; + this.count += 8; + } + + return this.dataView.getBigInt64(offset, true); + } + + writeBigInt64LE(value: bigint, offset?: number) { + if (offset === undefined) { + offset = this.count; + this.count += 8; + } + + this.dataView.setBigInt64(offset, value, true); + } + + // if no length is specified, reads until the end of the buffer + readString(length?: number, offset?: number): string { + if (length === undefined) { + length = this.length - this.count; + } + + if (offset === undefined) { + offset = this.count; + this.count += length; + } + + let textDeccoder = new TextDecoder(); + let readTextBuf = this.data.slice(offset, offset + length); + let value = textDeccoder.decode(readTextBuf); + + return value; + } + + writeString(value: string, offset?: number) { + if (offset === undefined) { + offset = this.count; + this.count += value.length; + } + + let textEncoder = new TextEncoder(); + let textBuf = textEncoder.encode(value); + + this.data.set(textBuf, offset); + } + + // lets you peek at the next byte without advancing the read pointer + peek(): number { + return this.data[this.count]; + } + + [Symbol.iterator]() { + // Return an iterator over the values of the underlying Uint8Array + return this.data.values(); + } + + // Optional: Add Symbol.toStringTag for better console output + get [Symbol.toStringTag]() { + return 'WebBuffer'; + } +} \ No newline at end of file diff --git a/src/utils/webrtcUtil.ts b/src/utils/webrtcUtil.ts index 24ade48..9f2ce67 100644 --- a/src/utils/webrtcUtil.ts +++ b/src/utils/webrtcUtil.ts @@ -1,11 +1,12 @@ import { writable, get, type Writable } from "svelte/store"; import { WebRTCPeer } from "$lib/webrtc"; -import { WebRTCPacketType } from "../types/webrtc"; +import { CHUNK_SIZE, WebRTCPacketType } from "../types/webrtc"; import { room } from "../stores/roomStore"; import { ConnectionState, type Room } from "../types/websocket"; -import { messages } from "../stores/messageStore"; +import { advertisedOffers, fileRequestIds, messages, receivedOffers } from "../stores/messageStore"; import { MessageType, type Message } from "../types/message"; import { WebSocketMessageType, type WebSocketMessage } from "../types/websocket"; +import { WebBuffer } from "./buffer"; export const error: Writable = writable(null); export let peer: Writable = writable(null); @@ -13,41 +14,225 @@ export let isRTCConnected: Writable = writable(false); export let dataChannelReady: Writable = writable(false); export let keyExchangeDone: Writable = writable(false); +let downloadStream: WritableStream | undefined; +let downloadWriter: WritableStreamDefaultWriter> | undefined; + +let fileAck: Map> = new Map(); + +function beforeUnload(event: BeforeUnloadEvent) { + event.preventDefault(); + event.returnValue = true; +} + +function onPageHide(event: PageTransitionEvent) { + if (event.persisted) { + // page is frozen, but not closed + return; + } + if (downloadWriter && !downloadWriter.closed) { + downloadWriter.abort(); + } + if (downloadStream) { + downloadStream.getWriter().abort(); + } + downloadStream = undefined; + downloadWriter = undefined; + +} + const callbacks = { onConnected: () => { console.log("Connected to peer"); isRTCConnected.set(true); }, //! TODO: come up with a more complex room system. This is largely for testing purposes - onMessage: (message: { type: WebRTCPacketType, data: ArrayBuffer }) => { + onMessage: async (message: { type: WebRTCPacketType, data: ArrayBuffer }, webRtcPeer: WebRTCPeer) => { console.log("WebRTC Received message:", message); - // if (typeof message === 'object' && message instanceof Blob) { - // // download the file - // const url = URL.createObjectURL(message); - // const a = document.createElement('a'); - // a.href = url; - // a.download = message.name; - // document.body.appendChild(a); - // a.click(); - // setTimeout(() => { - // document.body.removeChild(a); - // window.URL.revokeObjectURL(url); - // }, 100); - // } + if (message.type !== WebRTCPacketType.MESSAGE) { + return; + } - console.log("Received message:", message); + console.log("Received message:", message.type, new Uint8Array(message.data)); - // TODO: fixup - if (message.type === WebRTCPacketType.MESSAGE) { - let textDecoder = new TextDecoder(); - let json: Message = JSON.parse(textDecoder.decode(message.data)); - json.initiator = false; - messages.set([...get(messages), json]); + let messageBuf = new WebBuffer(message.data); + console.log("manually extracted type:", messageBuf[0]); + + let messageType = messageBuf[0] as MessageType; + let messageData = messageBuf.slice(1); + let textDecoder = new TextDecoder(); + + console.log("Received message:", messageType, messageData); + + switch (messageType) { + case MessageType.TEXT: + messages.set([...get(messages), { + initiator: false, + type: messageType, + data: textDecoder.decode(messageData.buffer), + }]); + break; + case MessageType.FILE_OFFER: + let fileSize = messageData.readBigInt64LE(); + let fileNameSize = messageData.readInt16LE(); + let fileName = messageData.readString(fileNameSize); + let id = messageData.readBigInt64LE(); + + get(receivedOffers).set(id, { name: fileName, size: fileSize }); + + messages.set([...get(messages), { + initiator: false, + type: messageType, + data: { + fileSize, + fileNameSize, + fileName, + id, + text: messageData.peek() ? messageData.readString() : null, + } + }]); + break; + case MessageType.FILE_REQUEST: + // the id that coresponds to our file offer + let offerId = messageData.readBigInt64LE(); + if (!get(advertisedOffers).has(offerId)) { + console.error("Unknown file offer id:", offerId); + return; + } + + let targetFile = get(advertisedOffers).get(offerId)!; + let fileStream = targetFile.stream(); + let fileReader = fileStream.getReader(); + + let idleTimeout = setTimeout(() => { + console.error("Timed out waiting for file ack"); + fileReader.cancel(); + }, 30000); + + // the id we send the file data with + let fileRequestId = messageData.readBigInt64LE(); + let fileChunk = await fileReader.read(); + + // reactive variable to track if the peer received the chunk + fileAck.set(fileRequestId, writable(false)); + + function sendChunk() { + if (!fileChunk.value) { + clearTimeout(idleTimeout); + fileReader.cancel(); + console.error("Chunk not set"); + return; + } + + // header + id + data + let fileBuf = new WebBuffer(new Uint8Array(1 + 8 + fileChunk.value.byteLength).buffer); + + fileBuf.writeInt8(MessageType.FILE); + fileBuf.writeBigInt64LE(fileRequestId); + fileBuf.write(fileChunk.value); + webRtcPeer.send(fileBuf.buffer, WebRTCPacketType.MESSAGE); + } + + sendChunk(); + + let unsubscribe = fileAck.get(fileRequestId)!.subscribe(async (value) => { + if (!value) { + return; + } + + fileChunk = await fileReader.read(); + + if (fileChunk.done) { + // send the done message + let fileDoneBuf = new WebBuffer(new ArrayBuffer(1 + 8)); + fileDoneBuf.writeInt8(MessageType.FILE_DONE); + fileDoneBuf.writeBigInt64LE(fileRequestId); + webRtcPeer.send(fileDoneBuf.buffer, WebRTCPacketType.MESSAGE); + + // cleanup + fileReader.cancel(); + fileAck.delete(fileRequestId); + clearTimeout(idleTimeout); + unsubscribe(); + return; + } + + sendChunk(); + fileAck.get(fileRequestId)!.set(false); + clearTimeout(idleTimeout); + idleTimeout = setTimeout(() => { + console.error("Timed out waiting for file ack"); + fileReader.cancel(); + }, 30000); + }); + + + console.log("Received file request"); + break; + case MessageType.FILE: + let requestId = messageData.readBigInt64LE(); + let receivedOffserId = get(fileRequestIds).get(requestId); + if (!receivedOffserId) { + console.error("Received file message for unknown file id:", requestId); + return; + } + + let file = get(receivedOffers).get(receivedOffserId); + if (!file) { + console.error("Unknown file id:", requestId); + return; + } + + if (downloadStream === undefined) { + window.addEventListener("pagehide", onPageHide); + window.addEventListener("beforeunload", beforeUnload); + downloadStream = window.streamSaver.createWriteStream(file.name, { size: Number(file.size) }); + downloadWriter = downloadStream.getWriter(); + } + + await downloadWriter!.write(new Uint8Array(messageData.read())); + + let fileAckBuf = new WebBuffer(new ArrayBuffer(1 + 8)); + fileAckBuf.writeInt8(MessageType.FILE_ACK); + fileAckBuf.writeBigInt64LE(requestId); + webRtcPeer.send(fileAckBuf.buffer, WebRTCPacketType.MESSAGE); + + break; + case MessageType.FILE_DONE: + console.log("Received file done"); + let fileDoneId = messageData.readBigInt64LE(); + if (!get(fileRequestIds).has(fileDoneId)) { + console.error("Unknown file done id:", fileDoneId); + return; + } + + window.removeEventListener("pagehide", onPageHide); + window.removeEventListener("beforeunload", beforeUnload); + + if (downloadWriter) { + downloadWriter.close(); + downloadWriter = undefined; + downloadStream = undefined; + } + + break; + case MessageType.FILE_ACK: + console.log("Received file ack"); + let fileAckId = messageData.readBigInt64LE(); + if (!fileAck.has(fileAckId)) { + console.error("Unknown file ack id:", fileAckId); + return; + } + + fileAck.get(fileAckId)!.set(true); + break; + default: + console.warn("Unhandled message type:", messageType); + break; } }, - onDataChannelOpen: () => { - console.log("Data channel open"); - dataChannelReady.set(true); + onDataChannelStateChange: (state: boolean) => { + console.log(`Data channel ${state ? "open" : "closed"}`); + dataChannelReady.set(state); }, onKeyExchangeDone: async () => { console.log("Key exchange done"); diff --git a/vite.config.ts b/vite.config.ts index 00523a2..c9b468e 100644 --- a/vite.config.ts +++ b/vite.config.ts @@ -6,7 +6,7 @@ import { webSocketServer } from './src/websocket.ts'; export default defineConfig({ plugins: [tailwindcss(), sveltekit(), webSocketServer], server: { - allowedHosts: ['.trycloudflare.com'], + allowedHosts: true, }, ssr: { // ts-mls is problematic, make vite bundle it