mirror of
https://github.com/hensm/fx_cast.git
synced 2026-06-11 01:59:58 +00:00
Add snap support (#60)
* Initial app daemon implementation * Pass script path to child bridge processes * Change WebSocket server port * Fix error sending message whilst WebSocket connection is closing * Initial ext daemon connection implementation
This commit is contained in:
307
app/src/bridge/index.ts
Executable file
307
app/src/bridge/index.ts
Executable file
@@ -0,0 +1,307 @@
|
||||
import dnssd from "dnssd";
|
||||
|
||||
import child_process from "child_process";
|
||||
import events from "events";
|
||||
import fs from "fs";
|
||||
import http from "http";
|
||||
import mime from "mime-types";
|
||||
import path from "path";
|
||||
|
||||
import Media from "./Media";
|
||||
import MediaServer from "./MediaServer";
|
||||
import Session from "./Session";
|
||||
import StatusListener from "./StatusListener";
|
||||
|
||||
import { DecodeTransform
|
||||
, EncodeTransform
|
||||
, ResponseTransform } from "../transforms";
|
||||
|
||||
import { MediaStatus
|
||||
, ReceiverStatus } from "./castTypes";
|
||||
|
||||
import { Message } from "./types";
|
||||
|
||||
import { __applicationName
|
||||
, __applicationVersion } from "../../package.json";
|
||||
|
||||
|
||||
// Increase listener limit
|
||||
events.EventEmitter.defaultMaxListeners = 50;
|
||||
|
||||
|
||||
const browser = new dnssd.Browser(dnssd.tcp("googlecast"));
|
||||
|
||||
// Local media server
|
||||
let mediaServer: MediaServer;
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
if (mediaServer) {
|
||||
mediaServer.stop();
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
const decodeTransform = new DecodeTransform();
|
||||
const encodeTransform = new EncodeTransform();
|
||||
|
||||
// stdin -> stdout
|
||||
process.stdin
|
||||
.pipe(decodeTransform)
|
||||
.pipe(new ResponseTransform(handleMessage))
|
||||
.pipe(encodeTransform)
|
||||
.pipe(process.stdout);
|
||||
|
||||
/**
|
||||
* Encode and send a message to the extension.
|
||||
*/
|
||||
function sendMessage (message: object) {
|
||||
try {
|
||||
encodeTransform.write(message);
|
||||
} catch (err) {
|
||||
console.error("Failed to encode message");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
interface InitializeOptions {
|
||||
shouldWatchStatus?: boolean;
|
||||
}
|
||||
|
||||
// Existing counterpart Media/Session objects
|
||||
const existingSessions: Map<string, Session> = new Map();
|
||||
const existingMedia: Map<string, Media> = new Map();
|
||||
|
||||
let receiverSelectorApp: child_process.ChildProcess;
|
||||
|
||||
/**
|
||||
* Handle incoming messages from the extension and forward
|
||||
* them to the appropriate handlers.
|
||||
*
|
||||
* Initializes the counterpart objects and is responsible
|
||||
* for managing existing ones.
|
||||
*/
|
||||
async function handleMessage (message: Message) {
|
||||
if (message.subject.startsWith("bridge:/media/")) {
|
||||
if (existingMedia.has(message._id)) {
|
||||
// Forward message to instance message handler
|
||||
existingMedia.get(message._id).messageHandler(message);
|
||||
} else {
|
||||
if (message.subject.endsWith("/initialize")) {
|
||||
// Get Session object media belongs to
|
||||
const parentSession = existingSessions.get(
|
||||
message.data._internalSessionId);
|
||||
|
||||
// Create Media
|
||||
existingMedia.set(message._id, new Media(
|
||||
message.data.sessionId
|
||||
, message.data.mediaSessionId
|
||||
, message._id
|
||||
, parentSession
|
||||
, sendMessage));
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (message.subject.startsWith("bridge:/session/")) {
|
||||
if (existingSessions.has(message._id)) {
|
||||
// Forward message to instance message handler
|
||||
existingSessions.get(message._id).messageHandler(message);
|
||||
} else {
|
||||
if (message.subject.endsWith("/initialize")) {
|
||||
// Create Session
|
||||
existingSessions.set(message._id, new Session(
|
||||
message.data.address
|
||||
, message.data.port
|
||||
, message.data.appId
|
||||
, message.data.sessionId
|
||||
, message._id
|
||||
, sendMessage));
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
switch (message.subject) {
|
||||
case "bridge:/getInfo": {
|
||||
const extensionVersion = message.data;
|
||||
return __applicationVersion;
|
||||
}
|
||||
|
||||
case "bridge:/initialize": {
|
||||
const options: InitializeOptions = message.data;
|
||||
initialize(options);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
case "bridge:/receiverSelector/open": {
|
||||
const receiverSelectorData = message.data;
|
||||
|
||||
if (process.platform !== "darwin") {
|
||||
console.error("Invalid platform for native selector.");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (!receiverSelectorData) {
|
||||
console.error("Missing native selector data.");
|
||||
process.exit(1);
|
||||
} else {
|
||||
try {
|
||||
JSON.parse(receiverSelectorData);
|
||||
} catch (err) {
|
||||
console.error("Invalid native selector data.");
|
||||
}
|
||||
}
|
||||
|
||||
receiverSelectorApp = child_process.spawn(
|
||||
path.join(process.cwd(), "selector")
|
||||
, [ receiverSelectorData ]);
|
||||
|
||||
receiverSelectorApp.stdout.setEncoding("utf8");
|
||||
receiverSelectorApp.stdout.on("data", data => {
|
||||
sendMessage({
|
||||
subject: "main:/receiverSelector/selected"
|
||||
, data: JSON.parse(data)
|
||||
});
|
||||
});
|
||||
|
||||
receiverSelectorApp.addListener("error", err => {
|
||||
sendMessage({
|
||||
subject: "main:/receiverSelector/error"
|
||||
, data: err.message
|
||||
});
|
||||
});
|
||||
|
||||
receiverSelectorApp.on("close", () => {
|
||||
sendMessage({
|
||||
subject: "main:/receiverSelector/close"
|
||||
});
|
||||
});
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case "bridge:/receiverSelector/close": {
|
||||
receiverSelectorApp.kill();
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
case "bridge:/mediaServer/start": {
|
||||
const { filePath, port } = message.data;
|
||||
|
||||
mediaServer = new MediaServer(filePath, port);
|
||||
mediaServer.start();
|
||||
|
||||
mediaServer.on("started", () => {
|
||||
sendMessage({
|
||||
subject: "mediaCast:/mediaServer/started"
|
||||
});
|
||||
});
|
||||
|
||||
mediaServer.on("stopped", () => {
|
||||
sendMessage({
|
||||
subject: "mediaCast:/mediaServer/stopped"
|
||||
});
|
||||
});
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case "bridge:/mediaServer/stop": {
|
||||
if (mediaServer) {
|
||||
mediaServer.stop();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
function initialize (options: InitializeOptions) {
|
||||
if (options.shouldWatchStatus) {
|
||||
browser.on("serviceUp", onStatusBrowserServiceUp);
|
||||
browser.on("serviceDown", onStatusBrowserServiceDown);
|
||||
}
|
||||
|
||||
browser.on("serviceUp", onBrowserServiceUp);
|
||||
browser.on("servicedown", onBrowserServiceDown);
|
||||
browser.start();
|
||||
|
||||
|
||||
function onBrowserServiceUp (service: dnssd.Service) {
|
||||
sendMessage({
|
||||
subject: "shim:/serviceUp"
|
||||
, data: {
|
||||
host: service.addresses[0]
|
||||
, port: service.port
|
||||
, id: service.txt.id
|
||||
, friendlyName: service.txt.fn
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function onBrowserServiceDown (service: dnssd.Service) {
|
||||
sendMessage({
|
||||
subject: "shim:/serviceDown"
|
||||
, data: {
|
||||
id: service.txt.id
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// Receiver status listeners for status mode
|
||||
const statusListeners = new Map<string, StatusListener>();
|
||||
|
||||
function onStatusBrowserServiceUp (service: dnssd.Service) {
|
||||
const { id } = service.txt;
|
||||
|
||||
const listener = new StatusListener(
|
||||
service.addresses[0]
|
||||
, service.port);
|
||||
|
||||
listener.on("receiverStatus", (status: ReceiverStatus) => {
|
||||
const receiverStatusMessage: any = {
|
||||
subject: "receiverStatus"
|
||||
, data: {
|
||||
id
|
||||
, status: {
|
||||
volume: {
|
||||
level: status.volume.level
|
||||
, muted: status.volume.muted
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if ("applications" in status) {
|
||||
const application = status.applications[0];
|
||||
|
||||
receiverStatusMessage.data.status.application = {
|
||||
displayName: application.displayName
|
||||
, isIdleScreen: application.isIdleScreen
|
||||
, statusText: application.statusText
|
||||
};
|
||||
}
|
||||
|
||||
sendMessage(receiverStatusMessage);
|
||||
});
|
||||
|
||||
statusListeners.set(id, listener);
|
||||
}
|
||||
|
||||
function onStatusBrowserServiceDown (service: dnssd.Service) {
|
||||
const { id } = service.txt;
|
||||
|
||||
if (statusListeners.has(id)) {
|
||||
statusListeners.get(id).deregister();
|
||||
statusListeners.delete(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
61
app/src/daemon/index.ts
Normal file
61
app/src/daemon/index.ts
Normal file
@@ -0,0 +1,61 @@
|
||||
"use strict";
|
||||
|
||||
import { spawn } from "child_process";
|
||||
import { Readable } from "stream";
|
||||
|
||||
import path from "path";
|
||||
import WebSocket from "ws";
|
||||
|
||||
import { DecodeTransform
|
||||
, EncodeTransform } from "../transforms";
|
||||
|
||||
|
||||
const wss = new WebSocket.Server({ port: 9556 });
|
||||
|
||||
wss.on("connection", socket => {
|
||||
|
||||
/**
|
||||
* Daemon and bridge are the same binary, so spawn a new
|
||||
* version of self in bridge mode.
|
||||
*/
|
||||
const bridge = spawn(process.execPath, [ process.argv[1] ]);
|
||||
|
||||
// Stream for incoming WebSocket messages
|
||||
const messageStream = new Readable({
|
||||
objectMode: true
|
||||
});
|
||||
|
||||
// tslint:disable-next-line:no-empty
|
||||
messageStream._read = () => {};
|
||||
|
||||
/**
|
||||
* Incoming JSON messages from the extension over the
|
||||
* WebSocket connection are parsed and re-encoded to be sent
|
||||
* to bridge stdin.
|
||||
*/
|
||||
socket.on("message", (message: string) => {
|
||||
messageStream.push(JSON.parse(message));
|
||||
});
|
||||
|
||||
messageStream
|
||||
.pipe(new EncodeTransform())
|
||||
.pipe(bridge.stdin);
|
||||
|
||||
/**
|
||||
* Incoming messages from the bridge are decoded and
|
||||
* stringified and sent to the extension over the WebSocket
|
||||
* connection.
|
||||
*/
|
||||
bridge.stdout
|
||||
.pipe(new DecodeTransform())
|
||||
.on("data", data => {
|
||||
// Socket can be CLOSING here
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
socket.send(JSON.stringify(data));
|
||||
}
|
||||
});
|
||||
|
||||
// Handle termination
|
||||
socket.on("close", () => bridge.kill());
|
||||
bridge.on("exit", () => socket.close());
|
||||
});
|
||||
301
app/src/main.ts
Executable file → Normal file
301
app/src/main.ts
Executable file → Normal file
@@ -1,299 +1,18 @@
|
||||
import dnssd from "dnssd";
|
||||
"use strict";
|
||||
|
||||
import child_process from "child_process";
|
||||
import events from "events";
|
||||
import fs from "fs";
|
||||
import http from "http";
|
||||
import mime from "mime-types";
|
||||
import path from "path";
|
||||
|
||||
import Media from "./Media";
|
||||
import MediaServer from "./MediaServer";
|
||||
import Session from "./Session";
|
||||
import StatusListener from "./StatusListener";
|
||||
import * as transforms from "./transforms";
|
||||
|
||||
import { MediaStatus, ReceiverStatus } from "./castTypes";
|
||||
|
||||
import { Message } from "./types";
|
||||
|
||||
import { __applicationName
|
||||
, __applicationVersion } from "../package.json";
|
||||
import minimist from "minimist";
|
||||
|
||||
|
||||
// Increase listener limit
|
||||
events.EventEmitter.defaultMaxListeners = 50;
|
||||
|
||||
|
||||
const browser = new dnssd.Browser(dnssd.tcp("googlecast"));
|
||||
|
||||
// Local media server
|
||||
let mediaServer: MediaServer;
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
if (mediaServer) {
|
||||
mediaServer.stop();
|
||||
const argv = minimist(process.argv.slice(2), {
|
||||
boolean: [ "daemon" ]
|
||||
, default: {
|
||||
daemon: false
|
||||
}
|
||||
});
|
||||
|
||||
// stdin -> stdout
|
||||
process.stdin
|
||||
.pipe(transforms.decode)
|
||||
.pipe(transforms.response(handleMessage))
|
||||
.pipe(transforms.encode)
|
||||
.pipe(process.stdout);
|
||||
|
||||
/**
|
||||
* Encode and send a message to the extension.
|
||||
*/
|
||||
function sendMessage (message: object) {
|
||||
try {
|
||||
transforms.encode.write(message);
|
||||
} catch (err) {
|
||||
console.error("Failed to encode message");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
interface InitializeOptions {
|
||||
shouldWatchStatus?: boolean;
|
||||
}
|
||||
|
||||
// Existing counterpart Media/Session objects
|
||||
const existingSessions: Map<string, Session> = new Map();
|
||||
const existingMedia: Map<string, Media> = new Map();
|
||||
|
||||
let receiverSelectorApp: child_process.ChildProcess;
|
||||
|
||||
/**
|
||||
* Handle incoming messages from the extension and forward
|
||||
* them to the appropriate handlers.
|
||||
*
|
||||
* Initializes the counterpart objects and is responsible
|
||||
* for managing existing ones.
|
||||
*/
|
||||
async function handleMessage (message: Message) {
|
||||
if (message.subject.startsWith("bridge:/media/")) {
|
||||
if (existingMedia.has(message._id)) {
|
||||
// Forward message to instance message handler
|
||||
existingMedia.get(message._id).messageHandler(message);
|
||||
} else {
|
||||
if (message.subject.endsWith("/initialize")) {
|
||||
// Get Session object media belongs to
|
||||
const parentSession = existingSessions.get(
|
||||
message.data._internalSessionId);
|
||||
|
||||
// Create Media
|
||||
existingMedia.set(message._id, new Media(
|
||||
message.data.sessionId
|
||||
, message.data.mediaSessionId
|
||||
, message._id
|
||||
, parentSession
|
||||
, sendMessage));
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (message.subject.startsWith("bridge:/session/")) {
|
||||
if (existingSessions.has(message._id)) {
|
||||
// Forward message to instance message handler
|
||||
existingSessions.get(message._id).messageHandler(message);
|
||||
} else {
|
||||
if (message.subject.endsWith("/initialize")) {
|
||||
// Create Session
|
||||
existingSessions.set(message._id, new Session(
|
||||
message.data.address
|
||||
, message.data.port
|
||||
, message.data.appId
|
||||
, message.data.sessionId
|
||||
, message._id
|
||||
, sendMessage));
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
switch (message.subject) {
|
||||
case "bridge:/getInfo": {
|
||||
const extensionVersion = message.data;
|
||||
return __applicationVersion;
|
||||
}
|
||||
|
||||
case "bridge:/initialize": {
|
||||
const options: InitializeOptions = message.data;
|
||||
initialize(options);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
case "bridge:/receiverSelector/open": {
|
||||
const receiverSelectorData = message.data;
|
||||
|
||||
if (process.platform !== "darwin") {
|
||||
console.error("Invalid platform for native selector.");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (!receiverSelectorData) {
|
||||
console.error("Missing native selector data.");
|
||||
process.exit(1);
|
||||
} else {
|
||||
try {
|
||||
JSON.parse(receiverSelectorData);
|
||||
} catch (err) {
|
||||
console.error("Invalid native selector data.");
|
||||
}
|
||||
}
|
||||
|
||||
receiverSelectorApp = child_process.spawn(
|
||||
path.join(process.cwd(), "selector")
|
||||
, [ receiverSelectorData ]);
|
||||
|
||||
receiverSelectorApp.stdout.setEncoding("utf8");
|
||||
receiverSelectorApp.stdout.on("data", data => {
|
||||
sendMessage({
|
||||
subject: "main:/receiverSelector/selected"
|
||||
, data: JSON.parse(data)
|
||||
});
|
||||
});
|
||||
|
||||
receiverSelectorApp.addListener("error", err => {
|
||||
sendMessage({
|
||||
subject: "main:/receiverSelector/error"
|
||||
, data: err.message
|
||||
});
|
||||
});
|
||||
|
||||
receiverSelectorApp.on("close", () => {
|
||||
sendMessage({
|
||||
subject: "main:/receiverSelector/close"
|
||||
});
|
||||
});
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case "bridge:/receiverSelector/close": {
|
||||
receiverSelectorApp.kill();
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
case "bridge:/mediaServer/start": {
|
||||
const { filePath, port } = message.data;
|
||||
|
||||
mediaServer = new MediaServer(filePath, port);
|
||||
mediaServer.start();
|
||||
|
||||
mediaServer.on("started", () => {
|
||||
sendMessage({
|
||||
subject: "mediaCast:/mediaServer/started"
|
||||
});
|
||||
});
|
||||
|
||||
mediaServer.on("stopped", () => {
|
||||
sendMessage({
|
||||
subject: "mediaCast:/mediaServer/stopped"
|
||||
});
|
||||
});
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case "bridge:/mediaServer/stop": {
|
||||
if (mediaServer) {
|
||||
mediaServer.stop();
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
function initialize (options: InitializeOptions) {
|
||||
if (options.shouldWatchStatus) {
|
||||
browser.on("serviceUp", onStatusBrowserServiceUp);
|
||||
browser.on("serviceDown", onStatusBrowserServiceDown);
|
||||
}
|
||||
|
||||
browser.on("serviceUp", onBrowserServiceUp);
|
||||
browser.on("servicedown", onBrowserServiceDown);
|
||||
browser.start();
|
||||
|
||||
|
||||
function onBrowserServiceUp (service: dnssd.Service) {
|
||||
sendMessage({
|
||||
subject: "shim:/serviceUp"
|
||||
, data: {
|
||||
host: service.addresses[0]
|
||||
, port: service.port
|
||||
, id: service.txt.id
|
||||
, friendlyName: service.txt.fn
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function onBrowserServiceDown (service: dnssd.Service) {
|
||||
sendMessage({
|
||||
subject: "shim:/serviceDown"
|
||||
, data: {
|
||||
id: service.txt.id
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
// Receiver status listeners for status mode
|
||||
const statusListeners = new Map<string, StatusListener>();
|
||||
|
||||
function onStatusBrowserServiceUp (service: dnssd.Service) {
|
||||
const { id } = service.txt;
|
||||
|
||||
const listener = new StatusListener(
|
||||
service.addresses[0]
|
||||
, service.port);
|
||||
|
||||
listener.on("receiverStatus", (status: ReceiverStatus) => {
|
||||
const receiverStatusMessage: any = {
|
||||
subject: "receiverStatus"
|
||||
, data: {
|
||||
id
|
||||
, status: {
|
||||
volume: {
|
||||
level: status.volume.level
|
||||
, muted: status.volume.muted
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if ("applications" in status) {
|
||||
const application = status.applications[0];
|
||||
|
||||
receiverStatusMessage.data.status.application = {
|
||||
displayName: application.displayName
|
||||
, isIdleScreen: application.isIdleScreen
|
||||
, statusText: application.statusText
|
||||
};
|
||||
}
|
||||
|
||||
sendMessage(receiverStatusMessage);
|
||||
});
|
||||
|
||||
statusListeners.set(id, listener);
|
||||
}
|
||||
|
||||
function onStatusBrowserServiceDown (service: dnssd.Service) {
|
||||
const { id } = service.txt;
|
||||
|
||||
if (statusListeners.has(id)) {
|
||||
statusListeners.get(id).deregister();
|
||||
statusListeners.delete(id);
|
||||
}
|
||||
}
|
||||
if (argv.daemon) {
|
||||
import("./daemon");
|
||||
} else {
|
||||
import("./bridge");
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"use strict";
|
||||
|
||||
import { Transform } from "stream";
|
||||
import { Message } from "./types";
|
||||
import { Message } from "./bridge/types";
|
||||
|
||||
|
||||
type ResponseHandlerFunction = (message: Message) => Promise<any>;
|
||||
@@ -10,12 +10,21 @@ type ResponseHandlerFunction = (message: Message) => Promise<any>;
|
||||
* Takes a handler function that implements the transform
|
||||
* and calls the transform callback.
|
||||
*/
|
||||
export const response = (handler: ResponseHandlerFunction) => new Transform({
|
||||
readableObjectMode: true
|
||||
, writableObjectMode: true
|
||||
export class ResponseTransform extends Transform {
|
||||
constructor (private _handler: ResponseHandlerFunction) {
|
||||
super({
|
||||
readableObjectMode: true
|
||||
, writableObjectMode: true
|
||||
});
|
||||
}
|
||||
|
||||
, transform (chunk: Message, encoding, callback) {
|
||||
Promise.resolve(handler(chunk))
|
||||
public _transform (
|
||||
chunk: Message
|
||||
, encoding: string
|
||||
// tslint:disable-next-line:ban-types
|
||||
, callback: Function) {
|
||||
|
||||
Promise.resolve(this._handler(chunk))
|
||||
.then(res => {
|
||||
if (res) {
|
||||
callback(null, res);
|
||||
@@ -24,81 +33,100 @@ export const response = (handler: ResponseHandlerFunction) => new Transform({
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Takes input, decodes the message string, parses as JSON
|
||||
* and outputs the parsed result.
|
||||
*/
|
||||
export const decode = new Transform({
|
||||
readableObjectMode: true
|
||||
export class DecodeTransform extends Transform {
|
||||
// Message data
|
||||
private _messageBuffer = Buffer.alloc(0);
|
||||
private _messageLength: number = null;
|
||||
|
||||
, transform (chunk, encoding, callback) {
|
||||
const self = this as any;
|
||||
|
||||
// Setup persistent data
|
||||
if (!this.hasOwnProperty("_buf")
|
||||
&& !this.hasOwnProperty("_messageLength")) {
|
||||
|
||||
self._buf = Buffer.alloc(0);
|
||||
self._messageLength = null;
|
||||
}
|
||||
|
||||
// Append next chunk to buffer
|
||||
self._buf = Buffer.concat([ self._buf, chunk ]);
|
||||
|
||||
while (true) {
|
||||
if (self._messageLength === null) {
|
||||
if (self._buf.length >= 4) {
|
||||
|
||||
// Read message length
|
||||
self._messageLength = self._buf.readUInt32LE(0);
|
||||
|
||||
// Offset buffer
|
||||
self._buf = self._buf.slice(4);
|
||||
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
if (self._buf.length >= self._messageLength) {
|
||||
const message = JSON.parse(self._buf.slice(
|
||||
0, self._messageLength));
|
||||
|
||||
this.push(message);
|
||||
|
||||
// Cleanup persistent data
|
||||
self._buf = self._buf.slice(self._messageLength);
|
||||
self._messageLength = null;
|
||||
|
||||
// Parse next message
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// No complete messages left
|
||||
callback();
|
||||
break;
|
||||
}
|
||||
constructor () {
|
||||
super({
|
||||
readableObjectMode: true
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
public _transform (
|
||||
chunk: any
|
||||
, encoding: string
|
||||
// tslint:disable-next-line:ban-types
|
||||
, callback: Function) {
|
||||
|
||||
// Append next chunk to buffer
|
||||
this._messageBuffer = Buffer.concat([
|
||||
this._messageBuffer
|
||||
, chunk
|
||||
]);
|
||||
|
||||
for (;;) {
|
||||
if (this._messageLength === null) {
|
||||
if (this._messageBuffer.length >= 4) {
|
||||
// Read message length and offset buffer
|
||||
this._messageLength = this._messageBuffer.readUInt32LE(0);
|
||||
this._messageBuffer = this._messageBuffer.slice(4);
|
||||
|
||||
// Next message chunk
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
if (this._messageBuffer.length >= this._messageLength) {
|
||||
const message = JSON.parse(this._messageBuffer
|
||||
.slice(0, this._messageLength)
|
||||
.toString());
|
||||
|
||||
// Push message content
|
||||
this.push(message);
|
||||
|
||||
// Offset buffer to start of next message
|
||||
this._messageBuffer = this._messageBuffer.slice(
|
||||
this._messageLength);
|
||||
this._messageLength = null;
|
||||
|
||||
// Next message
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// No complete messages left
|
||||
callback();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Takes input, encodes the message length and content and
|
||||
* outputs the encoded result.
|
||||
*/
|
||||
export const encode = new Transform({
|
||||
writableObjectMode: true
|
||||
export class EncodeTransform extends Transform {
|
||||
constructor () {
|
||||
super({
|
||||
writableObjectMode: true
|
||||
});
|
||||
}
|
||||
|
||||
public _transform (
|
||||
chunk: any
|
||||
, encoding: string
|
||||
// tslint:disable-next-line:ban-types
|
||||
, callback: Function) {
|
||||
|
||||
, transform (chunk, encoding, callback) {
|
||||
const messageLength = Buffer.alloc(4);
|
||||
const message = Buffer.from(JSON.stringify(chunk));
|
||||
|
||||
// Write message length
|
||||
messageLength.writeUInt32LE(message.length, 0);
|
||||
|
||||
// Output joined message length and content
|
||||
callback(null, Buffer.concat([messageLength, message]));
|
||||
// Output joined length and content
|
||||
callback(null, Buffer.concat([
|
||||
messageLength
|
||||
, message
|
||||
]));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user