import WaitQueue from "wait-queue";
import { GlobalInfoStore } from "stores";
import { handlers } from "./handlers";

/**
 * Creates a message worker: incoming WebSocket message events are pushed
 * onto `queue` and, once `start()` has been called, taken off one at a time
 * and dispatched to the matching entry in `handlers`.
 *
 * @returns {{ queue: WaitQueue, start: () => void }} The queue to push
 *   message events onto and a function that starts the processing loop.
 */
function Worker() {
  const queue = new WaitQueue();

  /**
   * Parses a single message event and dispatches it to its handler.
   * Never throws for malformed messages or failing handlers; problems are
   * logged so the processing loop keeps running.
   *
   * @param {{ data: string }} event - Message event whose `data` is a JSON
   *   string of the form `{ event, payload }`.
   */
  const messageRouter = (event) => {
    console.debug("[ws][worker] New message", event.data);

    let data;
    try {
      data = JSON.parse(event.data);
    } catch (err) {
      console.error("[ws][worker] Could not parse message.", err);
      return;
    }

    // `JSON.parse("null")` yields null, so guard before reading `.event`.
    if (!data || !data.event) {
      console.error("[ws][worker] Missing `event` field");
      return;
    }

    // Only dispatch to handlers registered directly on `handlers`; the event
    // name comes from the network and must not resolve inherited properties
    // such as `constructor` or `toString`.
    const isRegistered = Object.prototype.hasOwnProperty.call(
      handlers,
      data.event
    );
    const handlerFn = isRegistered ? handlers[data.event] : undefined;
    if (typeof handlerFn !== "function") {
      console.warn(`[ws][worker] Can't handle event '${data.event}'`);
      return;
    }

    // Handler failures are reported separately from parse failures so the
    // log points at the real cause.
    try {
      handlerFn(data.payload || {});
    } catch (err) {
      console.error(
        `[ws][worker] Handler for event '${data.event}' failed.`,
        err
      );
    }
  };

  // Takes the next event off the queue, routes it, then reschedules itself
  // through a timer so other tasks can run between messages.
  const doLoop = async () => {
    const event = await queue.shift();
    messageRouter(event);
    setTimeout(doLoop);
  };

  return {
    queue,
    start: () => {
      console.debug("[ws][worker] Start processing messages.");
      doLoop();
    },
  };
}

/**
 * Opens the WebSocket connection configured in `REACT_APP_WS_BASE_URL`,
 * mirrors the connection state into `GlobalInfoStore`, sends a periodic
 * keepalive and schedules a reconnect one second after the socket closes.
 *
 * @param {Object} options
 * @param {(self: { ws: WebSocket, worker: Object }) => (Promise<void>|void)} [options.onConnect]
 *   Optional callback invoked once the socket is open (also after every
 *   reconnect). The returned promise is awaited before `connect` resolves.
 * @returns {Promise<{ ws: WebSocket, worker: Object }>} Resolves with the
 *   socket and its worker once connected (and `onConnect` has completed);
 *   rejects if the socket reports an error or `onConnect` fails.
 */
export const connect = ({ onConnect }) => {
  return new Promise((resolve, reject) => {
    const worker = Worker();

    GlobalInfoStore.update((state) => {
      state.connectionState = "connecting";
    });

    const ws = new WebSocket(process.env.REACT_APP_WS_BASE_URL);
    let keepAliveInterval;
    console.log("[ws] Connecting ...");

    ws.onopen = () => {
      GlobalInfoStore.update((state) => {
        state.connectionState = "connected";
      });
      console.log("[ws] Connected.");

      ws.send("CONNECT");
      keepAliveInterval = setInterval(() => {
        ws.send("KEEPALIVE");
        console.debug("[ws] Sending keepalive.");
      }, 30 * 1000);

      const self = { ws, worker };
      if (!onConnect) {
        resolve(self);
        return;
      }

      // `onConnect` may be synchronous or asynchronous; wrap its result so
      // both work, and forward failures instead of leaving the promise
      // pending forever.
      try {
        Promise.resolve(onConnect(self)).then(() => resolve(self), reject);
      } catch (err) {
        reject(err);
      }
    };

    // Messages are only queued here; they are processed by the worker loop.
    ws.onmessage = worker.queue.push.bind(worker.queue);

    ws.onclose = (event) => {
      GlobalInfoStore.update((state) => {
        state.connectionState = "offline";
      });
      console.log(
        "[ws] Socket is closed. Reconnect will be attempted in 1 second.",
        event.reason
      );
      clearInterval(keepAliveInterval);

      setTimeout(() => {
        // A failed attempt closes its own socket, whose `onclose` schedules
        // the next retry; the catch only prevents an unhandled rejection.
        connect({ onConnect }).catch((err) => {
          console.error("[ws] Reconnect attempt failed.", err);
        });
      }, 1000);
    };

    ws.onerror = (err) => {
      console.error(
        "[ws] Socket encountered error: ",
        err.message,
        "Closing socket"
      );
      ws.close();
      reject(err);
    };
  });
};