feat: connect to the broadcast port instead of default + improve performance

feat: the daemon now will not spawn tens of processes to handle requests, but will handle everything within one process (the implementation can still improve, but this is a big step)
This commit is contained in:
Pato05
2025-10-14 01:52:23 +02:00
committed by Matt Hensman
parent 381a9afada
commit 3e7d455f2b
12 changed files with 1510 additions and 2066 deletions

View File

@@ -4,7 +4,6 @@
"requires": true,
"packages": {
"": {
"name": "bridge",
"dependencies": {
"bplist-creator": "^0.1.0",
"bplist-parser": "^0.3.1",

View File

@@ -1,6 +1,6 @@
import type { Channel } from "castv2";
import messaging from "../../messaging";
import type { Messenger } from "../../messaging";
import type { ReceiverDevice } from "../../messagingTypes";
import type { ReceiverMessage } from "./types";
@@ -70,7 +70,7 @@ export default class Session extends CastClient {
this.establishAppConnection(this.transportId);
this.onSessionCreated?.(this.sessionId);
messaging.sendMessage({
this.messaging.sendMessage({
subject: "main:castSessionCreated",
data: {
sessionId: this.sessionId,
@@ -100,7 +100,7 @@ export default class Session extends CastClient {
break;
}
messaging.sendMessage({
this.messaging.sendMessage({
subject: "main:castSessionUpdated",
data: {
sessionId: this.sessionId,
@@ -137,7 +137,7 @@ export default class Session extends CastClient {
messageData = JSON.stringify(messageData);
messaging.sendMessage({
this.messaging.sendMessage({
subject: "cast:sessionMessageReceived",
data: {
sessionId: this.sessionId,
@@ -156,12 +156,14 @@ export default class Session extends CastClient {
constructor(
private appId: string,
private receiverDevice: ReceiverDevice,
private messaging: Messenger,
private onSessionCreated?: OnSessionCreatedCallback
) {
super();
super
.connect(receiverDevice.host, {
port: receiverDevice.port,
onHeartbeat: () => {
// Include transport heartbeat with platform heartbeat
if (this.transportHeartbeat) {

View File

@@ -77,6 +77,9 @@ export default class CastClient {
this.client.connect(
{
host,
// !! This was the reason it wasn't working for me (aka it was always tried to connect
// to the default port, disregarding what is broadcast in mDNS)
// - @pato05
port: options?.port ?? DEFAULT_PORT
},
// On connection callback

View File

@@ -1,19 +1,24 @@
import messaging, { Message } from "../../messaging";
import type { Messenger, Message } from "../../messaging";
import Session from "./Session";
import CastClient from "./client";
const sessions = new Map<string, Session>();
export function handleCastMessage(message: Message) {
export function handleCastMessage(messaging: Messenger, message: Message) {
switch (message.subject) {
case "bridge:createCastSession": {
const { appId, receiverDevice } = message.data;
// Connect and store with returned ID
const session = new Session(appId, receiverDevice, sessionId => {
const session = new Session(
appId,
receiverDevice,
messaging,
sessionId => {
sessions.set(sessionId, session);
});
}
);
break;
}
@@ -110,7 +115,9 @@ export function handleCastMessage(message: Message) {
const { receiverDevice } = message.data;
const client = new CastClient();
client.connect(receiverDevice.host).then(() => {
client
.connect(receiverDevice.host, { port: receiverDevice.port })
.then(() => {
(client.sendReceiverMessage as any)({ type: "STOP" });
});

View File

@@ -15,6 +15,7 @@ interface CastRemoteOptions {
onApplicationClose?: () => void;
onReceiverStatusUpdate?: (status: ReceiverStatus) => void;
onMediaStatusUpdate?: (status?: MediaStatus) => void;
port?: number;
}
/**
@@ -27,6 +28,7 @@ export default class Remote extends CastClient {
super();
super
.connect(host, {
port: options?.port,
onReceiverMessage: message => {
this.onReceiverMessage(message);
}
@@ -76,7 +78,9 @@ export default class Remote extends CastClient {
message => this.onMediaMessage(message)
);
this.transportClient.connect(this.host).then(() => {
this.transportClient
.connect(this.host, { port: this.options?.port })
.then(() => {
this.transportClient?.sendMediaMessage({
type: "GET_STATUS",
requestId: 0

View File

@@ -6,12 +6,16 @@ import stream from "stream";
import mime from "mime-types";
import messaging from "../messaging";
import type { Messenger } from "../messaging";
import { convertSrtToVtt } from "../lib/subtitles";
export let mediaServer: http.Server | undefined;
export async function startMediaServer(filePath: string, port: number) {
export async function startMediaServer(
messaging: Messenger,
filePath: string,
port: number
) {
if (mediaServer?.listening) {
await stopMediaServer();
}

View File

@@ -1,4 +1,4 @@
import messaging, { Message } from "./messaging";
import type { Messenger, Message } from "./messaging";
import { handleCastMessage } from "./components/cast";
import Discovery from "./components/cast/discovery";
@@ -29,7 +29,8 @@ const remotes = new Map<string, Remote>();
* Initializes the counterpart objects and is responsible
* for managing existing ones.
*/
messaging.on("message", (message: Message) => {
export function run(messaging: Messenger) {
messaging.on("message", (message: Message) => {
switch (message.subject) {
case "bridge:getInfo":
case "bridge:/getInfo": {
@@ -54,6 +55,7 @@ messaging.on("message", (message: Message) => {
remotes.set(
device.id,
new Remote(device.host, {
port: device.port,
// RECEIVER_STATUS
onReceiverStatusUpdate(status) {
messaging.sendMessage({
@@ -116,7 +118,7 @@ messaging.on("message", (message: Message) => {
// Media server
case "bridge:startMediaServer": {
const { filePath, port } = message.data;
startMediaServer(filePath, port);
startMediaServer(messaging, filePath, port);
break;
}
case "bridge:stopMediaServer": {
@@ -125,7 +127,8 @@ messaging.on("message", (message: Message) => {
}
default: {
handleCastMessage(message);
handleCastMessage(messaging, message);
}
}
});
});
}

View File

@@ -14,6 +14,7 @@ import type {
CastSessionCreatedDetails,
CastSessionUpdatedDetails
} from "./messagingTypes";
import type { WebSocket } from "ws";
/**
* IMPORTANT:
@@ -218,7 +219,15 @@ interface MessengerEvents {
message: (message: Message) => void;
}
class Messenger extends TypedEmitter<MessengerEvents> {
export abstract class Messenger extends TypedEmitter<MessengerEvents> {
abstract sendMessage(message: Message): void;
abstract send(data: unknown): void;
}
export class StdioMessenger
extends TypedEmitter<MessengerEvents>
implements Messenger
{
// Native messaging transforms
private decodeTransform = new DecodeTransform();
private encodeTransform = new EncodeTransform();
@@ -244,7 +253,7 @@ class Messenger extends TypedEmitter<MessengerEvents> {
/** Sends a message to the extension. */
sendMessage(message: Message) {
this.encodeTransform.write(message);
this.send(message);
}
send(data: unknown) {
@@ -252,4 +261,33 @@ class Messenger extends TypedEmitter<MessengerEvents> {
}
}
export default new Messenger();
export class WebsocketMessenger
extends TypedEmitter<MessengerEvents>
implements Messenger
{
private socket: WebSocket;
constructor(socket: WebSocket) {
super();
this.socket = socket;
socket.on("message", (message: string) => {
try {
const parsed = JSON.parse(message) as Message;
this.emit("message", parsed);
} catch (err) {
// Catch parse errors and close socket
socket.close();
}
});
}
/** Sends a message to the extension. */
sendMessage(message: Message) {
this.send(message);
}
send(data: unknown) {
this.socket.send(JSON.stringify(data));
}
}

View File

@@ -1,20 +1,17 @@
import http from "http";
import https from "https";
import { ChildProcess, spawn } from "child_process";
import { Readable } from "stream";
import { Readable, Writable } from "stream";
import * as bridge from "./bridge";
import chalk from "chalk";
import WebSocket from "ws";
import { DecodeTransform, EncodeTransform } from "./transforms.js";
const bridgeInstances = new Set<ChildProcess>();
import { WebsocketMessenger } from "./bridge/messaging";
// Ensure child processes are killed on exit
process.on("SIGTERM", async () => {
for (const bridge of bridgeInstances) {
bridge.kill();
}
process.exit(1);
});
@@ -38,45 +35,7 @@ export function init(opts: DaemonOpts) {
const wss = new WebSocket.Server({ noServer: true });
wss.on("connection", socket => {
// Stream for incoming WebSocket messages
const messageStream = new Readable({ objectMode: true });
// eslint-disable-next-line @typescript-eslint/no-empty-function
messageStream._read = () => {};
socket.on("message", (message: string) => {
try {
messageStream.push(JSON.parse(message));
} catch (err) {
// Catch parse errors and close socket
socket.close();
}
});
/**
* Daemon and bridge are the same binary, so spawn a new
* version of self in bridge mode.
*/
const bridge = spawn(process.execPath, [process.argv[1]]);
bridgeInstances.add(bridge);
// socket -> bridge.stdin
messageStream.pipe(new EncodeTransform()).pipe(bridge.stdin);
// bridge.stdout -> socket
bridge.stdout.pipe(new DecodeTransform()).on("data", data => {
if (socket.readyState !== WebSocket.OPEN) {
return;
}
socket.send(JSON.stringify(data));
});
// Handle termination
socket.on("close", () => bridge.kill());
bridge.on("exit", () => {
socket.close();
bridgeInstances.delete(bridge);
});
bridge.run(new WebsocketMessenger(socket));
});
/**

View File

@@ -6,6 +6,7 @@ import yargs from "yargs";
import type { DaemonOpts } from "./daemon";
import { applicationName, applicationVersion } from "../config.json";
import { Messenger, StdioMessenger } from "./bridge/messaging";
const argv = yargs()
.scriptName(applicationName)
@@ -103,8 +104,9 @@ function parseConfig(configPath: string) {
return config;
}
if (argv.daemon) {
import("./daemon").then(daemon => {
async function main() {
if (argv.daemon) {
const daemon = await import("./daemon");
const daemonOpts: DaemonOpts = {
host: argv.host,
port: argv.port,
@@ -117,7 +119,10 @@ if (argv.daemon) {
}
daemon.init(daemonOpts);
});
} else {
import("./bridge");
} else {
const bridge = await import("./bridge");
bridge.run(new StdioMessenger());
}
}
main();

View File

@@ -4,7 +4,6 @@
"requires": true,
"packages": {
"": {
"name": "extension",
"devDependencies": {
"@types/firefox-webext-browser": "^94.0.1",
"@types/semver": "^7.3.9",

3227
package-lock.json generated

File diff suppressed because it is too large Load Diff