Sunday, 12 March 2017

Turn a net socket into an observable

I have a net tcp socket node.js section of code that I would like to convert from using a callback to rx.

I looks like this in feed.js module:

var net = require('net');
var server = net.createServer(function(socket) {
...

    // Handle incoming messages from clients.
    socket.on('data', function (data) {
        broadcast(data, socket);
    });

...
}

function broadcast(message, sender)
{  
   ...
   onChangeHandler(stock.symbol, 'stock', stock);
}

function start(onChange) {
    onChangeHandler = onChange;
}

exports.start = start;
server.listen(....);

Then the client of the above call registers a callback:

feed.start(function(room, type, message) {
   //...Do something with the message
});

I would like to convert this to use an Rx Observable/Observer. I see that there is a way to make an observable stream from the web socket (although it uses a bidirectional Subject which I don't need):

fromWebSocket(address, protocol) {
    var ws = new WebSocket(address, protocol);

    // Handle the data
    var osbervable = Rx.Observable.create (function (obs) {
        // Handle messages  
        ws.onmessage = obs.onNext.bind(obs);
        ws.onerror = obs.onError.bind(obs);
        ws.onclose = obs.onCompleted.bind(obs);

        // Return way to unsubscribe
        return ws.close.bind(ws);
    });

    var observer = Rx.Observer.create(function (data) {
        if (ws.readyState === WebSocket.OPEN) { ws.send(data); }
    });

    return Rx.Subject.create(observer, observable);
}

var socketSubject = fromWebSocket('ws://localhost:9999', 'sampleProtocol');

// Receive data
socketSubject.subscribe(
    function (data) {
        // Do something with the data
    },
    function (error) {
        // Do something with the error
    },
    function () {
        // Do something on completion
    });

// Send data
socketSubject.onNext(42);

What is the equivalent for a net socket? If there is a standard library to use that is ok.



via Ivan

No comments:

Post a Comment