@ -1,3 +1,4 @@ | |||
out/* | |||
node_modules/* | |||
testdata/node_modules/* | |||
Dockerfile |
@ -0,0 +1,71 @@ | |||
package transformer_test | |||
import ( | |||
"os" | |||
"testing" | |||
"github.com/dop251/goja" | |||
"github.com/sirupsen/logrus" | |||
"code.dopame.me/veonik/squircy3/plugins/babel/transformer" | |||
"code.dopame.me/veonik/squircy3/vm" | |||
) | |||
func init() { | |||
if _, err := os.Stat("../../../testdata/node_modules"); os.IsNotExist(err) { | |||
panic("tests in this package require node dependencies to be installed in the testdata directory") | |||
} | |||
} | |||
func HandleRuntimeInit(vmp *vm.VM) func(*goja.Runtime) { | |||
return func(gr *goja.Runtime) { | |||
vmp.SetTransformer(nil) | |||
b, err := transformer.New(gr) | |||
if err != nil { | |||
logrus.Warnln("unable to run babel init script:", err) | |||
return | |||
} | |||
vmp.SetTransformer(b.Transform) | |||
} | |||
} | |||
var registry = vm.NewRegistry("../../../testdata") | |||
func TestBabel_Transform(t *testing.T) { | |||
vmp, err := vm.New(registry) | |||
if err != nil { | |||
t.Fatalf("unexpected error creating VM: %s", err) | |||
} | |||
// vmp.SetModule(&vm.Module{Name: "events", Path: "./events.js", Main: "index"}) | |||
vmp.OnRuntimeInit(HandleRuntimeInit(vmp)) | |||
if err = vmp.Start(); err != nil { | |||
t.Fatalf("unexpected error starting VM: %s", err) | |||
} | |||
res, err := vmp.RunString(`require('regenerator-runtime'); | |||
(async () => { | |||
let output = null; | |||
setTimeout(() => { | |||
output = "HELLO!"; | |||
}, 200); | |||
const sleep = async (d) => { | |||
return new Promise(resolve => { | |||
setTimeout(() => resolve(), d); | |||
}); | |||
}; | |||
await sleep(500); | |||
return output; | |||
})(); | |||
`).Await() | |||
if err != nil { | |||
t.Fatalf("error requiring module: %s", err) | |||
} | |||
expected := "HELLO!" | |||
if res.String() != expected { | |||
t.Fatalf("expected: %s\ngot: %s", expected, res.String()) | |||
} | |||
} |
@ -0,0 +1,120 @@ | |||
package main | |||
import "code.dopame.me/veonik/squircy3/vm" | |||
// Module Http is a polyfill for the node http module. | |||
var Http = &vm.Module{ | |||
Name: "http", | |||
Main: "index", | |||
Path: "http", | |||
Body: ` | |||
import EventEmitter from 'events'; | |||
import {Server as NetServer} from 'net'; | |||
export class Server extends NetServer { | |||
constructor(options, requestListener) {} | |||
setTimeout(timeout, callback = null) {} | |||
get maxHeadersCount() {} | |||
get timeout() {} | |||
get headersTimeout() {} | |||
get keepAliveTimeout() {} | |||
} | |||
export class OutgoingMessage { | |||
get upgrading() {} | |||
get chunkedEncoding() {} | |||
get shouldKeepAlive() {} | |||
get useChunkedEncodingByDefault() {} | |||
get sendDate() {} | |||
get finished() {} | |||
get headersSent() {} | |||
get connection() {} | |||
constructor() {} | |||
setTimeout(timeout, callback = null) {} | |||
setHeader(name, value) {} | |||
getHeader(name) {} | |||
getHeaders() {} | |||
getHeaderNames() {} | |||
hasHeader(name) {} | |||
removeHeader(name) {} | |||
addTrailers(headers) {} | |||
flushHeaders() {} | |||
} | |||
export class ServerResponse extends OutgoingMessage { | |||
get statusCode() {} | |||
get statusMessage() {} | |||
constructor(req) {} | |||
assignSocket(socket) {} | |||
detachSocket(socket) {} | |||
writeContinue(callback) {} | |||
writeHead(statusCode, reasonPhrase, headers = null) {} | |||
} | |||
export class ClientRequest extends OutgoingMessage { | |||
get connection() {} | |||
get socket() {} | |||
get aborted() {} | |||
constructor(uri, callback = null) {} | |||
get path() {} | |||
abort() {} | |||
onSocket(socket) {} | |||
setTimeout(timeout, callback = null) {} | |||
setNoDelay(noDelay) {} | |||
setSocketKeepAlive(enable, initialDelay = null) {} | |||
} | |||
class IncomingMessage { | |||
constructor(socket) {} | |||
get httpVersion() {} | |||
get httpVersionMajor() {} | |||
get httpVersionMinor() {} | |||
get connection() {} | |||
get headers() {} | |||
get rawHeaders() {} | |||
get trailers() {} | |||
get rawTrailers() {} | |||
setTimeout(timeout, callback = null) {} | |||
get method() {} | |||
get url() {} | |||
get statusCode() {} | |||
get statusMessage() {} | |||
get socket() {} | |||
destroy() {} | |||
} | |||
class Agent { | |||
get maxFreeSockets() { | |||
} | |||
get maxSockets() {} | |||
get sockets() {} | |||
get requests() {} | |||
constructor(options) {} | |||
destroy() {} | |||
} | |||
export const METHODS = []; | |||
export const STATUS_CODES = {}; | |||
export function createServer(options, requestListener) {} | |||
export function request(options, callback) {} | |||
export function get(options, callback) {} | |||
export let globalAgent; | |||
export const maxHeaderSize; | |||
`, | |||
} |
@ -0,0 +1,166 @@ | |||
package internal | |||
import ( | |||
"io" | |||
"net" | |||
"sync" | |||
"github.com/dop251/goja" | |||
"github.com/pkg/errors" | |||
"github.com/sirupsen/logrus" | |||
"code.dopame.me/veonik/squircy3/vm" | |||
) | |||
type NetConn struct { | |||
conn net.Conn | |||
buf []byte | |||
readable bool | |||
} | |||
type readResult struct { | |||
ready bool | |||
value string | |||
error error | |||
mu sync.Mutex | |||
} | |||
func (r *readResult) Ready() bool { | |||
r.mu.Lock() | |||
defer r.mu.Unlock() | |||
return r.ready | |||
} | |||
func (r *readResult) Value() (string, error) { | |||
r.mu.Lock() | |||
defer r.mu.Unlock() | |||
if !r.ready { | |||
return "", errors.New("not ready") | |||
} | |||
return r.value, r.error | |||
} | |||
func (r *readResult) resolve(val string, err error) { | |||
r.mu.Lock() | |||
defer r.mu.Unlock() | |||
if r.ready { | |||
return | |||
} | |||
r.value = val | |||
r.error = err | |||
r.ready = true | |||
} | |||
func NewNetConn(conn net.Conn) (*NetConn, error) { | |||
return &NetConn{ | |||
conn: conn, | |||
buf: make([]byte, 1024), | |||
readable: true, | |||
}, nil | |||
} | |||
func (c *NetConn) Write(s string) (n int, err error) { | |||
return c.conn.Write([]byte(s)) | |||
} | |||
// Read asynchronously reads from the connection. | |||
// A readResult is returned back to the js vm and once the read completes, | |||
// it can be read from. This allows the js vm to avoid blocking for reads. | |||
func (c *NetConn) Read(_ int) *readResult { | |||
res := &readResult{} | |||
if !c.readable { | |||
logrus.Warnln("reading from unreadable conn") | |||
return res | |||
} | |||
go func() { | |||
n, err := c.conn.Read(c.buf) | |||
if err != nil { | |||
if err != io.EOF { | |||
res.resolve("", err) | |||
return | |||
} else { | |||
// on the next call to read, we'll return nil to signal done. | |||
c.readable = false | |||
} | |||
} | |||
logrus.Warnln("read", n, "bytes") | |||
rb := make([]byte, n) | |||
copy(rb, c.buf) | |||
res.resolve(string(rb), nil) | |||
}() | |||
return res | |||
} | |||
func (c *NetConn) Close() error { | |||
return c.conn.Close() | |||
} | |||
func (c *NetConn) LocalAddr() net.Addr { | |||
return c.conn.LocalAddr() | |||
} | |||
func (c *NetConn) RemoteAddr() net.Addr { | |||
return c.conn.RemoteAddr() | |||
} | |||
func Dial(kind, addr string) (*NetConn, error) { | |||
c, err := net.Dial(kind, addr) | |||
if err != nil { | |||
return nil, err | |||
} | |||
return NewNetConn(c) | |||
} | |||
type Server struct { | |||
listener net.Listener | |||
vm *vm.VM | |||
onConnect goja.Callable | |||
} | |||
func (s *Server) accept() { | |||
for { | |||
conn, err := s.listener.Accept() | |||
if err != nil { | |||
logrus.Warnln("failed to accept new connection", err) | |||
return | |||
} | |||
s.vm.Do(func(gr *goja.Runtime) { | |||
nc, err := NewNetConn(conn) | |||
if err != nil { | |||
logrus.Warnln("failed to get NetConn from net.Conn", err) | |||
return | |||
} | |||
if _, err := s.onConnect(nil, gr.ToValue(nc)); err != nil { | |||
logrus.Warnln("error running on-connect callback", err) | |||
} | |||
}) | |||
} | |||
} | |||
func (s *Server) Close() error { | |||
defer func() { | |||
s.onConnect = nil | |||
}() | |||
return s.listener.Close() | |||
} | |||
func (s *Server) Addr() net.Addr { | |||
return s.listener.Addr() | |||
} | |||
func Listen(vmp *vm.VM, onConnect goja.Callable, kind, addr string) (*Server, error) { | |||
l, err := net.Listen(kind, addr) | |||
if err != nil { | |||
return nil, err | |||
} | |||
s := &Server{ | |||
listener: l, | |||
vm: vmp, | |||
onConnect: onConnect, | |||
} | |||
go s.accept() | |||
return s, nil | |||
} |
@ -0,0 +1,225 @@ | |||
package main | |||
import "code.dopame.me/veonik/squircy3/vm" | |||
// Module Net is a polyfill for the node net module. | |||
var Net = &vm.Module{ | |||
Name: "net", | |||
Main: "index", | |||
Path: "net", | |||
Body: ` | |||
import {Duplex} from 'stream'; | |||
import {Buffer} from 'buffer'; | |||
import {EventEmitter} from 'events'; | |||
const goAddrToNode = addr => { | |||
// todo: support ipv6 addresses, udp, ipc, etc | |||
let parts = addr.String().split(':'); | |||
return { | |||
host: parts[0], | |||
port: parseInt(parts[1]), | |||
family: addr.Network(), | |||
}; | |||
}; | |||
export class Socket extends Duplex { | |||
constructor(options = {}) { | |||
super(options); | |||
this.options = options || {}; | |||
this._connection = null; | |||
this._local = null; | |||
this._remote = null; | |||
this._connecting = false; | |||
this.on('ready', () => { | |||
this._connecting = false; | |||
this._local = goAddrToNode(this._connection.LocalAddr()); | |||
this._remote = goAddrToNode(this._connection.RemoteAddr()); | |||
}); | |||
} | |||
_read(size = null) { | |||
if(!this._connection) { | |||
return; | |||
} | |||
let result = this._connection.Read(size); | |||
let wait = 1; | |||
let check = () => { | |||
if(result.Ready()) { | |||
let data = result.Value(); | |||
if(data !== null && data.length) { | |||
this.push(data); | |||
} else { | |||
this.push(null); | |||
} | |||
} else { | |||
if(wait < 64) { | |||
wait *= 2; | |||
} | |||
setTimeout(check, wait); | |||
} | |||
}; | |||
check(); | |||
} | |||
_write(buffer, encoding, callback) { | |||
if(!this._connection) { | |||
callback(Error('not connected')); | |||
return; | |||
} | |||
let err = null; | |||
try { | |||
this._connection.Write(buffer); | |||
} catch(e) { | |||
err = e; | |||
} finally { | |||
callback(err); | |||
} | |||
} | |||
async connect(options, listener = null) { | |||
// todo: support ipc | |||
// todo: udp is defined in Node's dgram module | |||
if(listener !== null) { | |||
this.once('connect', listener); | |||
} | |||
let host = options.host || 'localhost'; | |||
let port = options.port; | |||
if(!port) { | |||
throw new Error('ipc connections are unsupported'); | |||
} | |||
console.log('dialing', host + ':' + port); | |||
this._connecting = true; | |||
this._connection = internal.Dial('tcp', host + ':' + port); | |||
this.emit('connect'); | |||
this.emit('ready'); | |||
} | |||
setEncoding(encoding) { | |||
this.encoding = encoding; | |||
} | |||
_destroy(callback) { | |||
let err = null; | |||
try { | |||
this._connection.Close(); | |||
} catch(e) { | |||
err = e; | |||
console.log('error destroying', err.toString()); | |||
} finally { | |||
this._connection = null; | |||
if(callback) { | |||
callback(err); | |||
} | |||
} | |||
} | |||
// setTimeout(timeout, callback = null) {} | |||
// setNoDelay(noDelay) {} | |||
// setKeepAlive(keepAlive) {} | |||
// address() {} | |||
// unref() {} | |||
// ref() {} | |||
// | |||
// get bufferSize() {} | |||
// get bytesRead() {} | |||
// get bytesWritten() {} | |||
get connecting() { | |||
return this._connecting; | |||
} | |||
get localAddress() { | |||
if(!this._connection) { | |||
return null; | |||
} | |||
return this._local.host; | |||
} | |||
get localPort() { | |||
if(!this._connection) { | |||
return null; | |||
} | |||
return this._local.port; | |||
} | |||
get remoteAddress() { | |||
if(!this._connection) { | |||
return null; | |||
} | |||
return this._remote.host; | |||
} | |||
get remoteFamily() { | |||
if(!this._connection) { | |||
return null; | |||
} | |||
return this._remote.family; | |||
} | |||
get remotePort() { | |||
if(!this._connection) { | |||
return null; | |||
} | |||
return this._remote.port; | |||
} | |||
} | |||
export class Server extends EventEmitter { | |||
constructor(listener = null) { | |||
super(); | |||
if(listener !== null) { | |||
this.on('connection', listener); | |||
} | |||
this._server = null; | |||
} | |||
listen(port, hostname, listener = null) { | |||
if(listener !== null) { | |||
this.on('connection', listener); | |||
} | |||
let addr = hostname + ':' + port; | |||
let accept = (conn) => { | |||
let socket = new Socket(); | |||
socket._connection = conn; | |||
socket.on('end', () => { | |||
console.log('server ended'); | |||
socket.destroy(); | |||
}); | |||
socket.emit('connect'); | |||
this.emit('connection', socket); | |||
socket.emit('ready'); | |||
}; | |||
this._server = internal.Listen(accept, 'tcp4', addr); | |||
this.emit('listening'); | |||
} | |||
close(callback = null) { | |||
this._server.Close(); | |||
this.emit('close'); | |||
if(callback !== null) { | |||
callback(); | |||
} | |||
} | |||
address() { | |||
return goAddrToNode(this._server.Addr()); | |||
} | |||
getConnections(callback) {} | |||
ref() {} | |||
unref() {} | |||
get maxConnections() {} | |||
get connections() {} | |||
get listening() {} | |||
} | |||
// | |||
// export function createServer(options = null, connectionListener = null) {} | |||
// | |||
// export function connect(options, connectionListener = null) {} | |||
// | |||
// export function createConnection(options, connectionListener = null) {} | |||
// | |||
// export function isIP(input) {} | |||
// | |||
// export function isIPv4(input) {} | |||
// | |||
// export function isIPv6(input) {} | |||
`, | |||
} |
@ -0,0 +1,173 @@ | |||
package main_test | |||
import ( | |||
"crypto/sha1" | |||
"fmt" | |||
"os" | |||
"testing" | |||
"github.com/dop251/goja" | |||
"github.com/pkg/errors" | |||
"github.com/sirupsen/logrus" | |||
babel "code.dopame.me/veonik/squircy3/plugins/babel/transformer" | |||
node_compat "code.dopame.me/veonik/squircy3/plugins/node_compat" | |||
"code.dopame.me/veonik/squircy3/plugins/node_compat/internal" | |||
"code.dopame.me/veonik/squircy3/vm" | |||
) | |||
func init() { | |||
if _, err := os.Stat("../../testdata/node_modules"); os.IsNotExist(err) { | |||
panic("tests in this package require node dependencies to be installed in the testdata directory") | |||
} | |||
} | |||
func HandleRuntimeInit(vmp *vm.VM) func(*goja.Runtime) { | |||
return func(gr *goja.Runtime) { | |||
vmp.SetTransformer(nil) | |||
b, err := babel.New(gr) | |||
if err != nil { | |||
logrus.Warnln("unable to run babel init script:", err) | |||
return | |||
} | |||
vmp.SetTransformer(b.Transform) | |||
v := gr.NewObject() | |||
if err := v.Set("Sum", func(b []byte) (string, error) { | |||
return fmt.Sprintf("%x", sha1.Sum(b)), nil | |||
}); err != nil { | |||
logrus.Warnf("%s: error initializing runtime: %s", node_compat.PluginName, err) | |||
} | |||
gr.Set("sha1", v) | |||
v = gr.NewObject() | |||
if err := v.Set("Dial", internal.Dial); err != nil { | |||
logrus.Warnf("%s: error initializing runtime: %s", node_compat.PluginName, err) | |||
} | |||
if err := v.Set("Listen", func(call goja.FunctionCall) goja.Value { | |||
if len(call.Arguments) != 3 { | |||
panic(gr.NewGoError(errors.New("expected exactly 3 arguments"))) | |||
} | |||
arg0 := call.Arguments[0] | |||
fn, ok := goja.AssertFunction(arg0) | |||
if !ok { | |||
panic(gr.NewGoError(errors.New("expected argument 0 to be callable"))) | |||
} | |||
kind := call.Arguments[1].String() | |||
addr := call.Arguments[2].String() | |||
srv, err := internal.Listen(vmp, fn, kind, addr) | |||
if err != nil { | |||
panic(gr.NewGoError(err)) | |||
} | |||
return gr.ToValue(srv) | |||
}); err != nil { | |||
logrus.Warnf("%s: error initializing runtime: %s", node_compat.PluginName, err) | |||
} | |||
gr.Set("internal", v) | |||
_, err = gr.RunString(`this.global = this.global || this; | |||
require('core-js-bundle'); | |||
this.process = this.process || require('process/browser'); | |||
require('regenerator-runtime');`) | |||
if err != nil { | |||
logrus.Warnf("%s: error initializing runtime: %s", node_compat.PluginName, err) | |||
} | |||
} | |||
} | |||
var registry = vm.NewRegistry("../../testdata") | |||
func TestNodeCompat_Net(t *testing.T) { | |||
vmp, err := vm.New(registry) | |||
if err != nil { | |||
t.Fatalf("unexpected error creating VM: %s", err) | |||
} | |||
vmp.SetModule(node_compat.EventEmitter) | |||
vmp.SetModule(node_compat.ChildProcess) | |||
vmp.SetModule(node_compat.Crypto) | |||
vmp.SetModule(node_compat.Stream) | |||
vmp.SetModule(node_compat.Net) | |||
vmp.SetModule(node_compat.Http) | |||
vmp.OnRuntimeInit(HandleRuntimeInit(vmp)) | |||
if err = vmp.Start(); err != nil { | |||
t.Fatalf("unexpected error starting VM: %s", err) | |||
} | |||
res, err := vmp.RunString(` | |||
import {Socket, Server} from 'net'; | |||
const sleep = async (d) => { | |||
return new Promise(resolve => { | |||
setTimeout(() => resolve(), d); | |||
}); | |||
}; | |||
let resolve; | |||
let output = ''; | |||
let result = new Promise(_resolve => { | |||
resolve = _resolve; | |||
}); | |||
// let originalLog = console.log; | |||
// console.log = function log() { | |||
// let args = Array.from(arguments).map(arg => arg.toString()); | |||
// originalLog(args.join(' ')); | |||
// }; | |||
(async () => { | |||
var srv = new Server(); | |||
srv.listen(3333, 'localhost', async conn => { | |||
console.log('connected'); | |||
conn.on('data', data => { | |||
console.log('server received', data.toString()); | |||
}); | |||
conn.on('close', () => console.log('server side disconnected')); | |||
conn.on('end', () => { | |||
console.log('ending server connection from user code!'); | |||
srv.close(); | |||
}); | |||
conn.on('ready', () => { | |||
console.log('server: ' + conn.localAddress + ':' + conn.localPort); | |||
console.log('client: ' + conn.remoteAddress + ':' + conn.remotePort); | |||
}); | |||
conn.write('hi'); | |||
await sleep(500); | |||
conn.write('exit\n'); | |||
}); | |||
srv.on('close', () => { | |||
resolve(output); | |||
}); | |||
console.log('listening on', srv.address()); | |||
})(); | |||
(async () => { | |||
let sock = new Socket(); | |||
console.log('wot'); | |||
sock.on('data', d => { | |||
let data = d.toString(); | |||
console.log('received', data); | |||
if(data.replace(/\n$/, '') === 'exit') { | |||
sock.end('peace!'); | |||
sock.destroy(); | |||
return; | |||
} else { | |||
output += d; | |||
} | |||
}); | |||
sock.on('close', () => console.log('client side disconnected')); | |||
await sock.connect({host: 'localhost', port: 3333}); | |||
sock.write('hello there!\r\n'); | |||
console.log('wot2'); | |||
})(); | |||
result; | |||
`).Await() | |||
if err != nil { | |||
t.Fatalf("error requiring module: %s", err) | |||
} | |||
expected := "hi" | |||
if res.String() != expected { | |||
t.Fatalf("expected: %s\ngot: %s", expected, res.String()) | |||
} | |||
} |
@ -0,0 +1,142 @@ | |||
package main | |||
import ( | |||
"code.dopame.me/veonik/squircy3/vm" | |||
) | |||
// Module Stream is based on stream-browserify and relies on readable-stream. | |||
// See https://github.com/browserify/stream-browserify/blob/v3.0.0/index.js | |||
var Stream = &vm.Module{ | |||
Name: "stream", | |||
Main: "index.js", | |||
Path: "stream", | |||
Body: `// Copyright Joyent, Inc. and other Node contributors. | |||
// | |||
// Permission is hereby granted, free of charge, to any person obtaining a | |||
// copy of this software and associated documentation files (the | |||
// "Software"), to deal in the Software without restriction, including | |||
// without limitation the rights to use, copy, modify, merge, publish, | |||
// distribute, sublicense, and/or sell copies of the Software, and to permit | |||
// persons to whom the Software is furnished to do so, subject to the | |||
// following conditions: | |||
// | |||
// The above copyright notice and this permission notice shall be included | |||
// in all copies or substantial portions of the Software. | |||
// | |||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN | |||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, | |||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR | |||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE | |||
// USE OR OTHER DEALINGS IN THE SOFTWARE. | |||
module.exports = Stream; | |||
var EE = require('events').EventEmitter; | |||
var inherits = require('inherits'); | |||
inherits(Stream, EE); | |||
Stream.Readable = require('readable-stream/lib/_stream_readable.js'); | |||
Stream.Writable = require('readable-stream/lib/_stream_writable.js'); | |||
Stream.Duplex = require('readable-stream/lib/_stream_duplex.js'); | |||
Stream.Transform = require('readable-stream/lib/_stream_transform.js'); | |||
Stream.PassThrough = require('readable-stream/lib/_stream_passthrough.js'); | |||
Stream.finished = require('readable-stream/lib/internal/streams/end-of-Stream.js') | |||
Stream.pipeline = require('readable-stream/lib/internal/streams/pipeline.js') | |||
// Backwards-compat with node 0.4.x | |||
Stream.Stream = Stream; | |||
// old-style streams. Note that the pipe method (the only relevant | |||
// part of this class) is overridden in the Readable class. | |||
function Stream() { | |||
EE.call(this); | |||
} | |||
Stream.prototype.pipe = function(dest, options) { | |||
var source = this; | |||
function ondata(chunk) { | |||
if (dest.writable) { | |||
if (false === dest.write(chunk) && source.pause) { | |||
source.pause(); | |||
} | |||
} | |||
} | |||
source.on('data', ondata); | |||
function ondrain() { | |||
if (source.readable && source.resume) { | |||
source.resume(); | |||
} | |||
} | |||
dest.on('drain', ondrain); | |||
// If the 'end' option is not supplied, dest.end() will be called when | |||
// source gets the 'end' or 'close' events. Only dest.end() once. | |||
if (!dest._isStdio && (!options || options.end !== false)) { | |||
source.on('end', onend); | |||
source.on('close', onclose); | |||
} | |||
var didOnEnd = false; | |||
function onend() { | |||
if (didOnEnd) return; | |||
didOnEnd = true; | |||
dest.end(); | |||
} | |||
function onclose() { | |||
if (didOnEnd) return; | |||
didOnEnd = true; | |||
if (typeof dest.destroy === 'function') dest.destroy(); | |||
} | |||
// don't leave dangling pipes when there are errors. | |||
function onerror(er) { | |||
cleanup(); | |||
if (EE.listenerCount(this, 'error') === 0) { | |||
throw er; // Unhandled Stream error in pipe. | |||
} | |||
} | |||
source.on('error', onerror); | |||
dest.on('error', onerror); | |||
// remove all the event listeners that were added. | |||
function cleanup() { | |||
source.removeListener('data', ondata); | |||
dest.removeListener('drain', ondrain); | |||
source.removeListener('end', onend); | |||
source.removeListener('close', onclose); | |||
source.removeListener('error', onerror); | |||
dest.removeListener('error', onerror); | |||
source.removeListener('end', cleanup); | |||
source.removeListener('close', cleanup); | |||
dest.removeListener('close', cleanup); | |||
} | |||
source.on('end', cleanup); | |||
source.on('close', cleanup); | |||
dest.on('close', cleanup); | |||
dest.emit('pipe', source); | |||
// Allow for unix-like usage: A.pipe(B).pipe(C) | |||
return dest; | |||
};`, | |||
} |
@ -0,0 +1,15 @@ | |||
{ | |||
"private": true, | |||
"dependencies": { | |||
"@babel/standalone": "^7.5.5", | |||
"assert": "^2.0.0", | |||
"assert-polyfill": "^0.0.0", | |||
"buffer": "^5.2.1", | |||
"core-js-bundle": "^3.1.4", | |||
"error-polyfill": "^0.1.2", | |||
"process": "^0.11.10", | |||
"readable-stream": "^3.6.0", | |||
"regenerator-runtime": "^0.13.3", | |||
"regenerator-transform": "^0.14.1" | |||
} | |||
} |
@ -0,0 +1,354 @@ | |||
package vm | |||
import ( | |||
"fmt" | |||
"math/rand" | |||
"regexp" | |||
"time" | |||
"github.com/dop251/goja" | |||
"github.com/pkg/errors" | |||
"github.com/sirupsen/logrus" | |||
) | |||
var ErrExecutionCancelled = errors.New("execution cancelled") | |||
// A Result is the output from executing synchronous code on a VM. | |||
type Result struct { | |||
// Closed when the result is ready. Read from this channel to detect when | |||
// the result has been populated and is safe to inspect. | |||
Ready chan struct{} | |||
// Error associated with the result, if any. Only read from this after | |||
// the result is ready. | |||
Error error | |||
// Value associated with the result if there is no error. Only read from | |||
// this after the result is ready. | |||
Value goja.Value | |||
// vmdone is a copy of the VM's done channel at the time Run* is called. | |||
// This removes the need to synchronize when reading from the channel | |||
// since the copy is made while the VM is locked. | |||
vmdone chan struct{} | |||
// cancel is closed to signal that the result is no longer needed. | |||
cancel chan struct{} | |||
} | |||
// resolve populates the result with the given value or error and signals ready. | |||
func newResult(vmdone chan struct{}) *Result { | |||
r := &Result{Ready: make(chan struct{}), cancel: make(chan struct{}), vmdone: vmdone} | |||
go func() { | |||
for { | |||
select { | |||
case <-r.Ready: | |||
// close the cancel channel if we need to | |||
r.Cancel() | |||
return | |||
case <-r.cancel: | |||
// signal to cancel received, resolve with an error | |||
r.resolve(nil, ErrExecutionCancelled) | |||
case <-r.vmdone: | |||
// VM shutdown without resolving, cancel execution | |||
r.Cancel() | |||
} | |||
} | |||
}() | |||
return r | |||
} | |||
// resolve populates the result with the given value or error and signals ready. | |||
func (r *Result) resolve(v goja.Value, err error) { | |||
select { | |||
case <-r.Ready: | |||
fmt.Println("resolve called on already finished Result") | |||
default: | |||
r.Error = err | |||
r.Value = v | |||
close(r.Ready) | |||
} | |||
} | |||
// Await blocks until the result is ready and returns the result or error. | |||
func (r *Result) Await() (goja.Value, error) { | |||
<-r.Ready | |||
return r.Value, r.Error | |||
} | |||
// Cancel the result to halt execution. | |||
func (r *Result) Cancel() { | |||
select { | |||
case <-r.cancel: | |||
// already cancelled, don't bother | |||
default: | |||
close(r.cancel) | |||
} | |||
} | |||
// runFunc is a proxy for the Do method on a VM. | |||
type runFunc func(func(*goja.Runtime)) | |||
// AsyncResult handles invocations of asynchronous code that returns promises. | |||
// An AsyncResult accepts any goja.Value; non-promises are supported so this | |||
// is safe (if maybe a bit inefficient) to wrap all results produced by using | |||
// one of the Run* methods on a VM. | |||
type AsyncResult struct { | |||
// Closed when the result is ready. Read from this channel to detect when | |||
// the result has been populated and is safe to inspect. | |||
Ready chan struct{} | |||
// Error associated with the result, if any. Only read from this after | |||
// the result is ready. | |||
Error error | |||
// Value associated with the result if there is no error. Only read from | |||
// this after the result is ready. | |||
Value goja.Value | |||
// syncResult contains the original synchronous result. | |||
// Its value may contain a Promise although other types are also handled. | |||
syncResult *Result | |||
// Unique javascript variable containing the result. | |||
stateVar string | |||
// vmdone is a copy of the VM's done channel at the time Run* is called. | |||
// This removes the need to synchronize when reading from the channel | |||
// since the copy is made while the VM is locked. | |||
vmdone chan struct{} | |||
// vmdo will be a pointer to the run method on a scheduler. | |||
// This isn't strictly necessary (a pointer to VM would be fine) but | |||
// this forced indirection reduces the chance of a Result trying to do | |||
// something it shouldn't. | |||
vmdo runFunc | |||
// cancel is closed to signal that the result is no longer needed. | |||
cancel chan struct{} | |||
// waiting is initialized in the run method and used to synchronize | |||
// the result-ready check. | |||
waiting chan struct{} | |||
// done is closed when the run method returns. | |||
// The goroutine spawned by newAsyncResult waits to return until this | |||
// channel is closed. | |||
done chan struct{} | |||
} | |||
func uniqueResultIdentifier() string { | |||
const chars = "abcdefghijklmnopqrstuvwxyzABZDEFGHIJKLMNOPQRSTUVWXYZ12345678890" | |||
b := make([]byte, 20) | |||
for i := range b { | |||
b[i] = chars[rand.Intn(len(chars))] | |||
} | |||
return "____squircy3_await_" + string(b) | |||
} | |||
func newAsyncResult(sr *Result, vmdone chan struct{}, vmdo runFunc) *AsyncResult { | |||
r := &AsyncResult{ | |||
Ready: make(chan struct{}), | |||
syncResult: sr, | |||
stateVar: uniqueResultIdentifier(), | |||
vmdo: vmdo, | |||
vmdone: vmdone, | |||
cancel: make(chan struct{}), | |||
done: make(chan struct{}), | |||
} | |||
go func() { | |||
// wait until the original Result is ready | |||
<-sr.Ready | |||
if sr.Error != nil { | |||
// already have an error, resolve immediately | |||
r.resolve(sr.Value, sr.Error) | |||
return | |||
} | |||
// schedule the job to resolve the value | |||
r.vmdo(r.run) | |||
// note that the scheduler may have been stopped since the | |||
// original Result was ready. | |||
// block until the result is cancelled, the VM is shut down, | |||
// or the result is ready. | |||
select { | |||
case <-r.cancel: | |||
r.resolve(nil, ErrExecutionCancelled) | |||
return | |||
case <-r.vmdone: | |||
r.resolve(nil, ErrExecutionCancelled) | |||
return | |||
case <-r.done: | |||
// carry on | |||
} | |||
}() | |||
return r | |||
} | |||
// resolve populates the result with the given value or error and signals ready. | |||
func (r *AsyncResult) resolve(v goja.Value, err error) { | |||
select { | |||
case <-r.Ready: | |||
logrus.Warnln("resolve called on already finished Result") | |||
default: | |||
r.Error = err | |||
r.Value = v | |||
close(r.Ready) | |||
r.vmdo(r.cleanup) | |||
} | |||
} | |||
// Await blocks until the result is ready and returns the result or error. | |||
func (r *AsyncResult) Await() (goja.Value, error) { | |||
<-r.Ready | |||
return r.Value, r.Error | |||
} | |||
// Cancel the result to halt execution. | |||
func (r *AsyncResult) Cancel() { | |||
select { | |||
case <-r.cancel: | |||
// already cancelled, don't bother | |||
default: | |||
close(r.cancel) | |||
} | |||
} | |||
func getOrCreateResultHandler(gr *goja.Runtime) goja.Callable { | |||
if v := gr.Get("_squircy_handle_result"); v != nil { | |||
fn, _ := goja.AssertFunction(v) | |||
return fn | |||
} | |||
// this script handles the resolution of the Promise if the value is a | |||
// Promise, sets the error if the value is an Error, or sets the result | |||
// as the value for anything else. | |||
v, err := gr.RunString(` | |||
var handler = function(state) { | |||
if(typeof Promise !== 'undefined' && state.value instanceof Promise) { | |||
state.value | |||
.then(function(result) { state.result = result; }) | |||
.catch(function(error) { state.error = error; }) | |||
.finally(function() { state.done = true; }); | |||
} else if(state.error instanceof Error) { | |||
state.error = state.value; | |||
state.done = true; | |||
} else { | |||
state.result = state.value; | |||
state.done = true; | |||
} | |||
} | |||
handler;`) | |||
if err != nil { | |||
logrus.Warnln("unable to set async result handler:", err.Error()) | |||
return nil | |||
} | |||
gr.Set("____squircy3_handle_result", v) | |||
fn, _ := goja.AssertFunction(v) | |||
return fn | |||
} | |||
func (r *AsyncResult) run(gr *goja.Runtime) { | |||
defer func() { | |||
select { | |||
case <-r.done: | |||
// already closed | |||
default: | |||
close(r.done) | |||
} | |||
}() | |||
o := gr.NewObject() | |||
_ = o.Set("value", r.syncResult.Value) | |||
_ = o.Set("result", goja.Undefined()) | |||
_ = o.Set("error", goja.Undefined()) | |||
_ = o.Set("done", false) | |||
gr.Set(r.stateVar, o) | |||
hr := getOrCreateResultHandler(gr) | |||
if hr == nil { | |||
r.resolve(nil, errors.New("unable to get result handler")) | |||
return | |||
} | |||
_, err := hr(nil, gr.Get(r.stateVar)) | |||
if err != nil { | |||
r.resolve(nil, err) | |||
return | |||
} | |||
go r.loop() | |||
} | |||
func (r *AsyncResult) cleanup(gr *goja.Runtime) { | |||
gr.Set(r.stateVar, goja.Undefined()) | |||
} | |||
func (r *AsyncResult) loop() { | |||
defer func() { | |||
select { | |||
case <-r.Ready: | |||
// already closed, no need to close it again | |||
return | |||
default: | |||
close(r.Ready) | |||
} | |||
}() |