From d6331fee8902bb55c5e2c9fe9f651927d0786ec8 Mon Sep 17 00:00:00 2001 From: Geequlim Date: Tue, 21 May 2019 10:47:34 +0800 Subject: [PATCH] Implement MessageTransports for the websocket connection Drop stream wrap fro the websocket connection --- package.json | 2 - src/lsp/GDScriptLanguageClient.ts | 84 ++++---------- src/lsp/MessageBuffer.ts | 87 ++++++++++++++ src/lsp/MessageIO.ts | 184 ++++++++++++++++++++++++++++++ 4 files changed, 291 insertions(+), 66 deletions(-) create mode 100644 src/lsp/MessageBuffer.ts create mode 100644 src/lsp/MessageIO.ts diff --git a/package.json b/package.json index bfab4ed..31dba3e 100644 --- a/package.json +++ b/package.json @@ -78,9 +78,7 @@ "devDependencies": { "@types/mocha": "^2.2.42", "@types/node": "^10.12.21", - "@types/socket.io": "^2.1.2", "@types/ws": "^6.0.1", - "stream-mock": "^2.0.2", "tslint": "^5.16.0", "typescript": "^3.4.5", "vscode": "^1.1.33", diff --git a/src/lsp/GDScriptLanguageClient.ts b/src/lsp/GDScriptLanguageClient.ts index 43b81a0..b2694b0 100644 --- a/src/lsp/GDScriptLanguageClient.ts +++ b/src/lsp/GDScriptLanguageClient.ts @@ -2,9 +2,10 @@ import { workspace } from "vscode"; import { LanguageClient, LanguageClientOptions, ServerOptions } from "vscode-languageclient"; import { is_debug_mode, get_configuration } from "../utils"; import logger from "../loggger"; -import {DuplexMock} from 'stream-mock'; import * as WebSocket from 'ws'; import * as vscode from 'vscode'; +import { EventEmitter } from "events"; +import { MessageIO, MessageIOReader, MessageIOWriter } from "./MessageIO"; function getClientOptions(): LanguageClientOptions { return { @@ -20,76 +21,31 @@ function getClientOptions(): LanguageClientOptions { }; } -class MessageIO { - stream: DuplexMock = null; - socket: WebSocket = null; - - public get server_uri() : string { - let port = get_configuration("gdscript_lsp_server_port", 6008); - return `ws://localhost:${port}`; - } - - constructor() { - this.stream = new DuplexMock(); - const origin_write = this.stream._write.bind(this.stream); - this.stream._write = (chunk: any, encoding: string, callback: (error?: Error | null) => void) => { - this.send_message(chunk); - origin_write(chunk, encoding, callback); - }; - } - - protected send_message(chunk: Buffer) { - let message = chunk.toString(); - if (this.socket) { - this.socket.send(message); - this.stream.pause(); - } - logger.log("[client]", message); - } - - protected on_recive_message(chunk: WebSocket.Data) { - let message = chunk.toString(); - this.stream.emit('data',message); - this.stream.resume(); - logger.log("[server]", message); - } - - connect_to_language_server():Promise { - return new Promise((resolve, reject) => { - this.socket = null; - const ws = new WebSocket(this.server_uri); - ws.on('open', ()=>{ this.on_connected(ws); resolve(); }); - ws.on('message', this.on_recive_message.bind(this)); - ws.on('error', this.on_disconnected.bind(this)); - ws.on('close', this.on_disconnected.bind(this)); +function get_server_uri() : string { + let port = get_configuration("gdscript_lsp_server_port", 6008); + return `ws://localhost:${port}`; +} + +const io = new MessageIO(get_server_uri()); +const serverOptions: ServerOptions = () => { + return new Promise((resolve, reject) => { + io.connect_to_language_server().then(()=>{ + resolve({reader: new MessageIOReader(io), writer: new MessageIOWriter(io)}); }); - } - - private on_connected(socket: WebSocket) { - this.socket = socket; + }); +}; + +export default class GDScriptLanguageClient extends LanguageClient { + constructor() { + super(`GDScriptLanguageClient`, serverOptions, getClientOptions()); + io.on('disconnected', this.on_disconnected.bind(this)); } private on_disconnected() { - this.socket = null; vscode.window.showErrorMessage(`Failed connect to GDScript Language Server`, 'Retry', 'Close').then(item=>{ if (item == 'Retry') { - this.connect_to_language_server(); + io.connect_to_language_server(); } }); } }; - - -const io = new MessageIO(); -const serverOptions: ServerOptions = () => { - return new Promise((resolve, reject) => { - io.connect_to_language_server().then(()=>{ - resolve({reader: io.stream, writer: io.stream}); - }); - }); -}; -export default class GDScriptLanguageClient extends LanguageClient { - constructor() { - super(`GDScriptLanguageClient`, serverOptions, getClientOptions()); - } -}; diff --git a/src/lsp/MessageBuffer.ts b/src/lsp/MessageBuffer.ts new file mode 100644 index 0000000..2ac1041 --- /dev/null +++ b/src/lsp/MessageBuffer.ts @@ -0,0 +1,87 @@ +/* -------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + * ------------------------------------------------------------------------------------------ */ + +const DefaultSize: number = 8192; +const CR: number = Buffer.from('\r', 'ascii')[0]; +const LF: number = Buffer.from('\n', 'ascii')[0]; +const CRLF: string = '\r\n'; + +export default class MessageBuffer { + + private encoding: string; + private index: number; + private buffer: Buffer; + + constructor(encoding: string = 'utf8') { + this.encoding = encoding; + this.index = 0; + this.buffer = Buffer.allocUnsafe(DefaultSize); + } + + public append(chunk: Buffer | String): void { + var toAppend: Buffer = chunk; + if (typeof (chunk) === 'string') { + var str = chunk; + var bufferLen = Buffer.byteLength(str, this.encoding); + toAppend = Buffer.allocUnsafe(bufferLen); + toAppend.write(str, 0, bufferLen, this.encoding); + } + if (this.buffer.length - this.index >= toAppend.length) { + toAppend.copy(this.buffer, this.index, 0, toAppend.length); + } else { + var newSize = (Math.ceil((this.index + toAppend.length) / DefaultSize) + 1) * DefaultSize; + if (this.index === 0) { + this.buffer = Buffer.allocUnsafe(newSize); + toAppend.copy(this.buffer, 0, 0, toAppend.length); + } else { + this.buffer = Buffer.concat([this.buffer.slice(0, this.index), toAppend], newSize); + } + } + this.index += toAppend.length; + } + + public tryReadHeaders(): { [key: string]: string; } | undefined { + let result: { [key: string]: string; } | undefined = undefined; + let current = 0; + while (current + 3 < this.index && (this.buffer[current] !== CR || this.buffer[current + 1] !== LF || this.buffer[current + 2] !== CR || this.buffer[current + 3] !== LF)) { + current++; + } + // No header / body separator found (e.g CRLFCRLF) + if (current + 3 >= this.index) { + return result; + } + result = Object.create(null); + let headers = this.buffer.toString('ascii', 0, current).split(CRLF); + headers.forEach((header) => { + let index: number = header.indexOf(':'); + if (index === -1) { + throw new Error('Message header must separate key and value using :'); + } + let key = header.substr(0, index); + let value = header.substr(index + 1).trim(); + result![key] = value; + }) + + let nextStart = current + 4; + this.buffer = this.buffer.slice(nextStart); + this.index = this.index - nextStart; + return result; + } + + public tryReadContent(length: number): string | null { + if (this.index < length) { + return null; + } + let result = this.buffer.toString(this.encoding, 0, length); + let nextStart = length; + this.buffer.copy(this.buffer, 0, nextStart); + this.index = this.index - nextStart; + return result; + } + + public get numberOfBytes(): number { + return this.index; + } +}; diff --git a/src/lsp/MessageIO.ts b/src/lsp/MessageIO.ts new file mode 100644 index 0000000..206d079 --- /dev/null +++ b/src/lsp/MessageIO.ts @@ -0,0 +1,184 @@ +import { AbstractMessageReader, MessageReader, DataCallback } from "vscode-jsonrpc/lib/messageReader"; +import { EventEmitter } from "events"; +import * as WebSocket from 'ws'; +import MessageBuffer from "./MessageBuffer"; +import logger from "../loggger"; +import { AbstractMessageWriter, MessageWriter } from "vscode-jsonrpc/lib/messageWriter"; +import { Message } from "vscode-jsonrpc"; + +export class MessageIO extends EventEmitter { + + private socket: WebSocket = null; + private url: string = ""; + + constructor(url: string) { + super(); + this.url = url; + } + + public send_message(message: string) { + if (this.socket) { + this.socket.send(message); + } + logger.log("[client]", message); + } + + protected on_message(chunk: WebSocket.Data) { + let message = chunk.toString(); + this.emit('data', message); + logger.log("[server]", message); + } + + connect_to_language_server():Promise { + return new Promise((resolve, reject) => { + this.socket = null; + const ws = new WebSocket(this.url); + ws.on('open', ()=>{ this.on_connected(ws); resolve(); }); + ws.on('message', this.on_message.bind(this)); + ws.on('error', this.on_disconnected.bind(this)); + ws.on('close', this.on_disconnected.bind(this)); + }); + } + + private on_connected(socket: WebSocket) { + this.socket = socket; + this.emit("connected"); + } + + private on_disconnected() { + this.socket = null; + this.emit('disconnected'); + } +}; + + +export class MessageIOReader extends AbstractMessageReader implements MessageReader { + + private io: MessageIO; + private callback: DataCallback; + private buffer: MessageBuffer; + private nextMessageLength: number; + private messageToken: number; + private partialMessageTimer: NodeJS.Timer | undefined; + private _partialMessageTimeout: number; + + public constructor(io: MessageIO, encoding: string = 'utf8') { + super(); + this.io = io; + this.buffer = new MessageBuffer(encoding); + this._partialMessageTimeout = 10000; + } + + public set partialMessageTimeout(timeout: number) { + this._partialMessageTimeout = timeout; + } + + public get partialMessageTimeout(): number { + return this._partialMessageTimeout; + } + + public listen(callback: DataCallback): void { + this.nextMessageLength = -1; + this.messageToken = 0; + this.partialMessageTimer = undefined; + this.callback = callback; + this.io.on('data', (data: Buffer) => { + this.onData(data); + }); + this.io.on('error', (error: any) => this.fireError(error)); + this.io.on('close', () => this.fireClose()); + } + + private onData(data: Buffer | String): void { + this.buffer.append(data); + while (true) { + if (this.nextMessageLength === -1) { + let headers = this.buffer.tryReadHeaders(); + if (!headers) { + return; + } + let contentLength = headers['Content-Length']; + if (!contentLength) { + throw new Error('Header must provide a Content-Length property.'); + } + let length = parseInt(contentLength); + if (isNaN(length)) { + throw new Error('Content-Length value must be a number.'); + } + this.nextMessageLength = length; + // Take the encoding form the header. For compatibility + // treat both utf-8 and utf8 as node utf8 + } + var msg = this.buffer.tryReadContent(this.nextMessageLength); + if (msg === null) { + /** We haven't received the full message yet. */ + this.setPartialMessageTimer(); + return; + } + this.clearPartialMessageTimer(); + this.nextMessageLength = -1; + this.messageToken++; + var json = JSON.parse(msg); + this.callback(json); + } + } + + private clearPartialMessageTimer(): void { + if (this.partialMessageTimer) { + clearTimeout(this.partialMessageTimer); + this.partialMessageTimer = undefined; + } + } + + private setPartialMessageTimer(): void { + this.clearPartialMessageTimer(); + if (this._partialMessageTimeout <= 0) { + return; + } + this.partialMessageTimer = setTimeout((token, timeout) => { + this.partialMessageTimer = undefined; + if (token === this.messageToken) { + this.firePartialMessage({ messageToken: token, waitingTime: timeout }); + this.setPartialMessageTimer(); + } + }, this._partialMessageTimeout, this.messageToken, this._partialMessageTimeout); + } +} + +const ContentLength: string = 'Content-Length: '; +const CRLF = '\r\n'; +export class MessageIOWriter extends AbstractMessageWriter implements MessageWriter { + + private io: MessageIO; + private encoding: string; + private errorCount: number; + + public constructor(io: MessageIO, encoding: string = 'utf8') { + super(); + this.io = io; + this.encoding = encoding; + this.errorCount = 0; + this.io.on('error', (error: any) => this.fireError(error)); + this.io.on('close', () => this.fireClose()); + } + + public write(msg: Message): void { + let json = JSON.stringify(msg); + let contentLength = Buffer.byteLength(json, this.encoding); + + let headers: string[] = [ + ContentLength, contentLength.toString(), CRLF, + CRLF + ]; + try { + // Header must be written in ASCII encoding + this.io.send_message(headers.join('')); + // Now write the content. This can be written in any encoding + this.io.send_message(json); + this.errorCount = 0; + } catch (error) { + this.errorCount++; + this.fireError(error, msg, this.errorCount); + } + } +}