feat: implement websocket realtime data streaming
This commit is contained in:
@@ -33,4 +33,47 @@ document.addEventListener("DOMContentLoaded", () => {
|
||||
updateUptime();
|
||||
// Update every second
|
||||
setInterval(updateUptime, 1000);
|
||||
|
||||
// WebSocket Connection
|
||||
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
|
||||
const wsUrl = `${protocol}//${window.location.host}/ws`;
|
||||
|
||||
function connectWs() {
|
||||
const ws = new WebSocket(wsUrl);
|
||||
const statusIndicator = document.querySelector(".status-indicator");
|
||||
|
||||
ws.onopen = () => {
|
||||
console.log("WS Connected");
|
||||
if (statusIndicator) statusIndicator.classList.add("online");
|
||||
};
|
||||
|
||||
ws.onmessage = (event) => {
|
||||
try {
|
||||
const msg = JSON.parse(event.data);
|
||||
if (msg.type === "HEARTBEAT") {
|
||||
console.log("Heartbeat:", msg.data);
|
||||
// Sync uptime?
|
||||
// We can optionally verify if client clock is drifting, but let's keep it simple.
|
||||
} else if (msg.type === "WELCOME") {
|
||||
console.log(msg.message);
|
||||
}
|
||||
} catch (e) {
|
||||
console.error("WS Parse Error", e);
|
||||
}
|
||||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
console.log("WS Disconnected");
|
||||
if (statusIndicator) statusIndicator.classList.remove("online");
|
||||
// Retry in 5s
|
||||
setTimeout(connectWs, 5000);
|
||||
};
|
||||
|
||||
ws.onerror = (err) => {
|
||||
console.error("WS Error", err);
|
||||
ws.close();
|
||||
};
|
||||
}
|
||||
|
||||
connectWs();
|
||||
});
|
||||
|
||||
@@ -4,17 +4,61 @@ import type { Server } from "bun";
|
||||
|
||||
export class WebServer {
|
||||
private static server: Server<unknown> | null = null;
|
||||
private static heartbeatInterval: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
public static start() {
|
||||
public static start(port?: number) {
|
||||
this.server = Bun.serve({
|
||||
port: env.PORT || 3000,
|
||||
fetch: router,
|
||||
port: port ?? (typeof env.PORT === "string" ? parseInt(env.PORT) : 3000),
|
||||
fetch: (req, server) => {
|
||||
const url = new URL(req.url);
|
||||
if (url.pathname === "/ws") {
|
||||
// Upgrade the request to a WebSocket
|
||||
// We pass dummy data for now
|
||||
if (server.upgrade(req, { data: undefined })) {
|
||||
return undefined;
|
||||
}
|
||||
return new Response("WebSocket upgrade failed", { status: 500 });
|
||||
}
|
||||
return router(req);
|
||||
},
|
||||
websocket: {
|
||||
open(ws) {
|
||||
// console.log("ws: client connected");
|
||||
ws.subscribe("status-updates");
|
||||
ws.send(JSON.stringify({ type: "WELCOME", message: "Connected to Aurora WebSocket" }));
|
||||
},
|
||||
message(ws, message) {
|
||||
// Handle incoming messages if needed
|
||||
},
|
||||
close(ws) {
|
||||
// console.log("ws: client disconnected");
|
||||
ws.unsubscribe("status-updates");
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
console.log(`🌐 Web server listening on http://localhost:${this.server.port}`);
|
||||
|
||||
// Start a heartbeat loop
|
||||
this.heartbeatInterval = setInterval(() => {
|
||||
if (this.server) {
|
||||
const uptime = process.uptime();
|
||||
this.server.publish("status-updates", JSON.stringify({
|
||||
type: "HEARTBEAT",
|
||||
data: {
|
||||
uptime,
|
||||
timestamp: Date.now()
|
||||
}
|
||||
}));
|
||||
}
|
||||
}, 5000);
|
||||
}
|
||||
|
||||
public static stop() {
|
||||
if (this.heartbeatInterval) {
|
||||
clearInterval(this.heartbeatInterval);
|
||||
this.heartbeatInterval = null;
|
||||
}
|
||||
if (this.server) {
|
||||
this.server.stop();
|
||||
console.log("🛑 Web server stopped");
|
||||
|
||||
54
src/web/websocket.test.ts
Normal file
54
src/web/websocket.test.ts
Normal file
@@ -0,0 +1,54 @@
|
||||
import { describe, expect, it, afterAll, beforeAll } from "bun:test";
|
||||
import { WebServer } from "./server";
|
||||
|
||||
describe("WebSocket Server", () => {
|
||||
// Start server on a random port
|
||||
const port = 0;
|
||||
|
||||
beforeAll(() => {
|
||||
WebServer.start(port);
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
WebServer.stop();
|
||||
});
|
||||
|
||||
it("should accept websocket connection and send welcome message", async () => {
|
||||
// We need to know the actual port assigned by Bun if we passed 0.
|
||||
// But WebServer stores it in private static server.
|
||||
// We can't access it easily unless we expose it or use a known port.
|
||||
// Let's rely on the fact that if we pass 0, we can't easily know the port without exposing it.
|
||||
// So for this test, let's pick a random high port to avoid conflict: 40000 + Math.floor(Math.random() * 1000)
|
||||
|
||||
// Actually, let's restart with a known port 8081 for testing
|
||||
WebServer.stop();
|
||||
const testPort = 8081;
|
||||
WebServer.start(testPort);
|
||||
|
||||
const ws = new WebSocket(`ws://localhost:${testPort}/ws`);
|
||||
|
||||
const messagePromise = new Promise<any>((resolve) => {
|
||||
ws.onmessage = (event) => {
|
||||
resolve(JSON.parse(event.data as string));
|
||||
};
|
||||
});
|
||||
|
||||
const msg = await messagePromise;
|
||||
expect(msg.type).toBe("WELCOME");
|
||||
expect(msg.message).toContain("Connected");
|
||||
|
||||
ws.close();
|
||||
});
|
||||
|
||||
it("should reject non-ws upgrade requests on /ws endpoint via http", async () => {
|
||||
const testPort = 8081;
|
||||
// Just a normal fetch to /ws should fail with 426 Upgrade Required usually,
|
||||
// but our implementation returns "WebSocket upgrade failed" 500 or undefined -> 101 Switching Protocols if valid.
|
||||
// If we send a normal GET request to /ws without Upgrade headers, server.upgrade(req) returns false.
|
||||
// So it returns status 500 "WebSocket upgrade failed" based on our code.
|
||||
|
||||
const res = await fetch(`http://localhost:${testPort}/ws`);
|
||||
expect(res.status).toBe(500);
|
||||
expect(await res.text()).toBe("WebSocket upgrade failed");
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user