Free Source Code – Twitter Database Server: parse_tweets.php
Twitter Database Server
Download
Install
Code Architecture
MySQL Database Schema
Phirehose Library
140dev_config.php
db_config.php
db_lib.php
db_test.php
get_tweets.php
monitor_tweets.php
parse_tweets.php
Twitter Display
Parse_tweets.php processes each new tweet that is added to the json_cache
table by get_tweets.php. The data for these tweets is extracted from the raw_tweet
field in the json_cache
table and inserted into the tables: tweets
, tweet_tags
, tweet_urls
, tweet_mentions
, and users
. If a user is already in the users
table, the user’s data is updated with the values in the most recent tweet.
The entities object in each tweet payload contains data on every @mention, tag, and URL in the tweet. This object is parsed and the individual @mentions, tags, and URLs are stored in the tweet_mentions
, tweet_tags
, and tweet_urls
tables. These tables can be data mined to gather statistics such as most frequently used tags and urls.
The program needs to run continuously to process all the tweets as quickly as possible, so they will be available for the rest of the 140dev framework. To make this possible it should be run as a background process. On *nix servers this is done with:
nohup php ./parse_tweets.php > /dev/null &
The details behind this command are described in the install instructions.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | <?php /** * parse_tweets.php * Populate the database with new tweet data from the json_cache table * Latest copy of this code: http://140dev.com/free-twitter-api-source-code-library/ * @author Adam Green <140dev@gmail.com> * @license GNU Public License * @version BETA 0.30 */ require_once('140dev_config.php'); require_once('db_lib.php'); $oDB = new db; // This should run continuously as a background process while (true) { // Process all new tweets $query = 'SELECT cache_id, raw_tweet ' . 'FROM json_cache'; $result = $oDB->select($query); while($row = mysqli_fetch_assoc($result)) { $cache_id = $row['cache_id']; // Each JSON payload for a tweet from the API was stored in the database // by serializing it as text and saving it as base64 raw data $tweet_object = unserialize(base64_decode($row['raw_tweet'])); // Delete cached copy of tweet $oDB->select("DELETE FROM json_cache WHERE cache_id = $cache_id"); // Limit tweets to a single language, // such as 'en' for English if ($tweet_object->lang <> 'en') {continue;} // The streaming API sometimes sends duplicates, // Test the tweet_id before inserting $tweet_id = $tweet_object->id_str; if ($oDB->in_table('tweets','tweet_id=' . $tweet_id )) {continue;} // Gather tweet data from the JSON object // $oDB->escape() escapes ' and " characters, and blocks characters that // could be used in a SQL injection attempt if (isset($tweet_object->retweeted_status)) { // This is a retweet // Use the original tweet's entities, they are more complete $entities = $tweet_object->retweeted_status->entities; $is_rt = 1; } else { $entities = $tweet_object->entities; $is_rt = 0; } $tweet_text = $oDB->escape($tweet_object->text); $created_at = $oDB->date($tweet_object->created_at); if (isset($tweet_object->geo)) { $geo_lat = $tweet_object->geo->coordinates[0]; $geo_long = $tweet_object->geo->coordinates[1]; } else { $geo_lat = $geo_long = 0; } $user_object = $tweet_object->user; $user_id = $user_object->id_str; $screen_name = $oDB->escape($user_object->screen_name); $name = $oDB->escape($user_object->name); $profile_image_url = $user_object->profile_image_url; // Add a new user row or update an existing one $field_values = 'screen_name = "' . $screen_name . '", ' . 'profile_image_url = "' . $profile_image_url . '", ' . 'user_id = ' . $user_id . ', ' . 'name = "' . $name . '", ' . 'location = "' . $oDB->escape($user_object->location) . '", ' . 'url = "' . $user_object->url . '", ' . 'description = "' . $oDB->escape($user_object->description) . '", ' . 'created_at = "' . $oDB->date($user_object->created_at) . '", ' . 'followers_count = ' . $user_object->followers_count . ', ' . 'friends_count = ' . $user_object->friends_count . ', ' . 'statuses_count = ' . $user_object->statuses_count . ', ' . 'time_zone = "' . $user_object->time_zone . '", ' . 'last_update = "' . $oDB->date($tweet_object->created_at) . '"' ; if ($oDB->in_table('users','user_id="' . $user_id . '"')) { $oDB->update('users',$field_values,'user_id = "' .$user_id . '"'); } else { $oDB->insert('users',$field_values); } // Add the new tweet $field_values = 'tweet_id = ' . $tweet_id . ', ' . 'tweet_text = "' . $tweet_text . '", ' . 'created_at = "' . $created_at . '", ' . 'geo_lat = ' . $geo_lat . ', ' . 'geo_long = ' . $geo_long . ', ' . 'user_id = ' . $user_id . ', ' . 'screen_name = "' . $screen_name . '", ' . 'name = "' . $name . '", ' . 'profile_image_url = "' . $profile_image_url . '", ' . 'is_rt = ' . $is_rt; $oDB->insert('tweets',$field_values); // The mentions, tags, and URLs from the entities object are also // parsed into separate tables so they can be data mined later foreach ($entities->user_mentions as $user_mention) { $where = 'tweet_id=' . $tweet_id . ' ' . 'AND source_user_id=' . $user_id . ' ' . 'AND target_user_id=' . $user_mention->id; if(! $oDB->in_table('tweet_mentions',$where)) { $field_values = 'tweet_id=' . $tweet_id . ', ' . 'source_user_id=' . $user_id . ', ' . 'target_user_id=' . $user_mention->id; $oDB->insert('tweet_mentions',$field_values); } } foreach ($entities->hashtags as $hashtag) { $where = 'tweet_id=' . $tweet_id . ' ' . 'AND tag="' . $hashtag->text . '"'; if(! $oDB->in_table('tweet_tags',$where)) { $field_values = 'tweet_id=' . $tweet_id . ', ' . 'tag="' . $hashtag->text . '"'; $oDB->insert('tweet_tags',$field_values); } } foreach ($entities->urls as $url) { if (empty($url->expanded_url)) { $url = $url->url; } else { $url = $url->expanded_url; } $where = 'tweet_id=' . $tweet_id . ' ' . 'AND url="' . $url . '"'; if(! $oDB->in_table('tweet_urls',$where)) { $field_values = 'tweet_id=' . $tweet_id . ', ' . 'url="' . $url . '"'; $oDB->insert('tweet_urls',$field_values); } } } // You can adjust the sleep interval to handle the tweet flow and // server load you experience sleep(1); } ?> |
streaming_framework