I have a section of Node.js code that uses a net (TCP) socket, and I would like to convert it from using a callback to Rx.
It looks like this in the feed.js module:
var net = require('net');
var server = net.createServer(function(socket) {
...
// Handle incoming messages from clients.
socket.on('data', function (data) {
broadcast(data, socket);
});
...
}
// Called from the socket 'data' handler with the received data and the socket
// it arrived on. Most of the body is elided in this excerpt; only the final
// notification is shown.
function broadcast(message, sender)
{
...
// Notify the callback registered through start(). `stock` is produced by the
// elided code above (presumably derived from `message` -- TODO confirm).
onChangeHandler(stock.symbol, 'stock', stock);
}
/**
 * Registers the callback that broadcast() invokes for each change.
 *
 * @param {Function} onChange - Stored in `onChangeHandler`; broadcast() calls
 *   it as onChange(symbol, 'stock', stock).
 */
function start(onChange) {
  // NOTE(review): no declaration of `onChangeHandler` is visible in this
  // excerpt; presumably it lives in elided module-level code -- confirm.
  onChangeHandler = onChange;
}
// Expose start() on the module so clients can register their change callback.
exports.start = start;
// Start listening for connections (the listen arguments are elided here).
server.listen(....);
Then the client of the above call registers a callback:
// Callback handed to the feed module; it receives (room, type, message).
var handleFeedChange = function (room, type, message) {
  //...Do something with the message
};

// Register the callback with the feed.
feed.start(handleFeedChange);
I would like to convert this to use an Rx Observable/Observer. I see that there is a way to make an observable stream from a WebSocket (although it uses a bidirectional Subject, which I don't need):
/**
 * Wraps a WebSocket connection in a bidirectional Rx Subject.
 *
 * The observable side forwards the socket's message, error and close events
 * to the subscriber; the observer side sends values over the socket while it
 * is open.
 *
 * @param {string} address - URL passed to the WebSocket constructor.
 * @param {string} protocol - Sub-protocol passed to the WebSocket constructor.
 * @returns {Rx.Subject} Subject built from the observer and the observable.
 */
function fromWebSocket(address, protocol) {
  var ws = new WebSocket(address, protocol);

  // Handle the data
  var observable = Rx.Observable.create(function (obs) {
    // Handle messages
    ws.onmessage = obs.onNext.bind(obs);
    ws.onerror = obs.onError.bind(obs);
    ws.onclose = obs.onCompleted.bind(obs);

    // Return way to unsubscribe
    return ws.close.bind(ws);
  });

  var observer = Rx.Observer.create(function (data) {
    // Values pushed while the socket is not open are dropped.
    if (ws.readyState === WebSocket.OPEN) { ws.send(data); }
  });

  return Rx.Subject.create(observer, observable);
}
// Build the subject over the WebSocket connection.
var socketSubject = fromWebSocket('ws://localhost:9999', 'sampleProtocol');

// Handlers for the three notification kinds delivered by the subject.
var handleSocketData = function (data) {
  // Do something with the data
};
var handleSocketError = function (error) {
  // Do something with the error
};
var handleSocketCompleted = function () {
  // Do something on completion
};

// Receive data
socketSubject.subscribe(handleSocketData, handleSocketError, handleSocketCompleted);

// Send data
socketSubject.onNext(42);
What is the equivalent for a net socket? If there is a standard library to use, that is OK.
via Ivan
No comments:
Post a Comment