-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathstreamer.js
More file actions
executable file
·103 lines (90 loc) · 2.76 KB
/
streamer.js
File metadata and controls
executable file
·103 lines (90 loc) · 2.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* Module dependencies.
*/
var config = require('./config');
var io = require('socket.io').listen(config.ws_port);
var sys = require('util');
var TwitterStreamClient = require(config.twitter_stream_client).TwitterStreamClient;
var dynode = require('dynode');
var redis = require('redis');
// Redis connection settings, read once from the shared config module.
// The same host/port pair is used for the module-wide publisher below and
// for the per-socket subscriber created in the 'i am' handler.
var redis_host = config.redis_host;
var redis_port = config.redis_port;
// DynamoDB client (dynode) configured with the region and credentials from
// config. Used by storeFeed (batch writes) and getTwitterToken (reads).
var dynamodb = new dynode.Client({
'region': config.aws_region,
'accessKeyId': config.aws_access_key,
'secretAccessKey': config.aws_secret_key
});
// Single Redis connection used by notifyUpdate to publish feeds on a
// channel named after the user.
var publisher = redis.createClient(redis_port, redis_host);
// NOTE(review): not referenced anywhere else in the visible code.
var twitterStreamClients = {};
// Cache of stream clients keyed by user; populated lazily by getClient.
var publicStreamClients = {};
// Stream endpoint handed to every TwitterStreamClient built by createClient.
var pubStreamUrl = "https://stream.twitter.com/1/statuses/sample.json";
// NOTE(review): not referenced anywhere else in the visible code.
var userStreamUrl = "https://userstream.twitter.com/2/user.json"
/**
 * Socket.IO connection wiring.
 *
 * Protocol, per connected socket:
 *   server -> client  'who'               asks the client to identify itself
 *   client -> server  'i am' (userId)     subscribes the socket to the Redis
 *                                         channel named userId and starts the
 *                                         user's stream client
 *   client -> server  'setPubStreamRate'  ({user, rate}) forwards the rate to
 *                                         the user's stream client
 *   server -> client  'feed'              one parsed feed per Redis message
 *
 * Each socket owns at most one Redis subscriber connection, kept on
 * socket.subscriber and released on re-identification and on disconnect.
 */
io.sockets.on('connection', function(socket){
    // Closes this socket's Redis subscriber connection, if it has one.
    function releaseSubscriber(){
        if(socket.subscriber != null){
            socket.subscriber.unsubscribe();
            socket.subscriber.end();
            socket.subscriber = null;
        }
    }

    socket.emit('who', {});

    socket.on('i am', function (userId){
        if(userId == null){
            console.error("Invalid 'i am' payload: missing user id");
            return;
        }
        console.log('subscribing to user update channel ' + userId);

        // A client may identify more than once on the same socket. Drop the
        // previous subscriber first; otherwise its Redis connection leaks and
        // keeps emitting duplicate 'feed' events.
        releaseSubscriber();

        socket.subscriber = redis.createClient(redis_port, redis_host);
        socket.subscriber.subscribe(userId);
        socket.subscriber.on("message", function(channel, message) {
            var feed;
            try {
                feed = JSON.parse(message);
            } catch (parseError) {
                // A malformed payload must not throw out of the event handler.
                console.error('Dropping malformed feed on channel ' + channel + ': ' + parseError.message);
                return;
            }
            socket.emit('feed', feed);
        });

        var client = getClient(userId);
        client.init();
    });

    socket.on('setPubStreamRate', function (args){
        // Validate before touching args: the payload comes from the client
        // and may be missing entirely.
        if(args == null || args.user === undefined || args.rate === undefined){
            console.error("Invalid args: " + JSON.stringify(args));
            return;
        }
        console.log('setting pubstream rate for user' + args.user);
        var client = getClient(args.user);
        client.setRate(args.rate);
    });

    socket.on('disconnect', releaseSubscriber);
});
/**
 * Returns the stream client cached for `user`, creating and caching one on
 * first use.
 *
 * Only own properties of the cache count as hits. `user` originates from
 * socket payloads, and a plain lookup would resolve keys such as
 * "constructor" or "toString" to Object.prototype members and hand back a
 * value that is not a stream client.
 *
 * @param {string} user - key identifying the user.
 * @returns {Object} the cached or newly created stream client.
 */
function getClient(user) {
    var isCached = Object.prototype.hasOwnProperty.call(publicStreamClients, user);
    if (isCached && publicStreamClients[user] != null) {
        return publicStreamClients[user];
    }

    var client = createClient(user);
    // defineProperty always creates an own data property, so a key such as
    // "__proto__" is stored like any other instead of replacing the cache
    // object's prototype.
    Object.defineProperty(publicStreamClients, user, {
        value: client,
        writable: true,
        enumerable: true,
        configurable: true
    });
    return client;
}
/**
 * Builds a new stream client for `user`, wired to the module's stream URL
 * (pubStreamUrl) and with storeFeed as its feed callback.
 *
 * @param {string} user - key identifying the user the client is created for.
 * @returns {TwitterStreamClient} a freshly constructed client.
 */
function createClient(user){
    var streamClient = new TwitterStreamClient(user, pubStreamUrl, storeFeed);
    return streamClient;
}
/**
 * Persists one feed item and then announces it to the user's channel.
 *
 * A single DynamoDB batch write stores two records: an index entry
 * (user, time, message id) in config.user_feeds_table and the feed itself in
 * config.feeds_table. When the write fails, the error is logged and the feed
 * is flagged with `error = 1`; the feed is passed to notifyUpdate either way.
 *
 * @param {string} user - key identifying the owner of the feed.
 * @param {Object} feed - feed item; its `time` and `id` are read here.
 */
function storeFeed(user, feed){
    console.log(feed);

    var batch = {};
    batch[config.user_feeds_table] = [
        {put: {id: user, time: feed.time, messageId: feed.id}}
    ];
    batch[config.feeds_table] = [
        {put: feed}
    ];

    function onBatchWritten(error, meta){
        if(error){
            console.error(error);
            feed.error = 1;
        }
        notifyUpdate(user, feed);
        console.log(meta);
    }

    dynamodb.batchWriteItem(batch, onBatchWritten);
}
/**
 * Looks up the stored Twitter credentials for `user` in
 * config.user_data_table and passes them to `callback`.
 *
 * `callback` is invoked only when a record is returned. Otherwise the reason
 * is logged: a failed DynamoDB request is reported separately from a user
 * that has no stored credentials.
 *
 * @param {string} user - key of the user's record.
 * @param {function(Object)} callback - receives the record, which is
 *     requested with only the `twitter_token` and `twitter_token_secret`
 *     attributes.
 */
function getTwitterToken(user, callback){
    var options = {AttributesToGet: ['twitter_token', 'twitter_token_secret']};
    dynamodb.getItem(config.user_data_table, user, options, function(error, data, meta){
        if(data != null){
            callback(data);
            return;
        }
        if(error){
            // The request itself failed; say so instead of blaming the user.
            console.error("Failed to load Twitter token for " + user + ":", error);
            return;
        }
        console.error(user + " has not authenticated with Twitter");
    });
}
/**
 * Publishes `feed`, serialized as JSON, on the Redis channel named after
 * `user` via the module-wide publisher connection.
 *
 * @param {string} user - channel name (the user's key).
 * @param {Object} feed - feed item to serialize and publish.
 */
function notifyUpdate(user, feed){
    var channel = user;
    publisher.publish(channel, JSON.stringify(feed));
}