root/trunk/harvester/fetch.rb

Revision 31, 5.8 kB (checked in by astro, 3 years ago)

Move HTree link absolutizer from fetch.rb to generate.rb

  • Property svn:executable set to *
Line 
1 #!/usr/bin/env ruby
2
3 require 'dbi'
4 require 'yaml'
5 require 'uri'
6 require 'net/http'
7 begin
8   require 'net/https'
9 rescue LoadError
10   $stderr.puts "WARNING: No https support!"
11 end
12 begin
13   require 'fastthread'
14 rescue LoadError
15   require 'thread'
16 end
17
18 require 'mrss'
19
20
# Read the local configuration. File.read closes the file again, unlike the
# previous `File.new`, which left the handle open for the whole run.
config = YAML::load File.read('config.yaml')
timeout = config['settings']['timeout'].to_i
sizelimit = config['settings']['size limit'].to_i
dbi = DBI::connect(config['db']['driver'], config['db']['user'], config['db']['password'])

# Hack, an explicit lock would look much better.
# All fetch threads share this one `dbi` handle. Each transaction therefore
# runs with Thread::critical set, which stops other Ruby threads from being
# scheduled until it ends. (Ruby 1.8 API; Thread::critical= is gone in 1.9+.)
class << dbi
  def transaction(*a)
    Thread::critical = true
    begin
      super
    ensure
      # Must be reset even when the transaction raises: the fetch threads
      # rescue such errors and carry on. Without this, every other thread
      # would stay unscheduled.
      Thread::critical = false
    end
  end
end
34
#######################
# Database maintenance
#######################

# Remove every (collection, feed) pair stored in `sources` that config.yaml
# no longer lists. A collection whose feed list is left empty in the YAML
# loads as nil, hence the `|| []` guards below.
puts "Looking for sources to purge..."
purge = []
dbi.select_all("SELECT collection, rss FROM sources") { |dbc, dbr|
  purge << [dbc, dbr] unless (config['collections'][dbc] || []).include? dbr
}

purge_rss = []
purge.each { |c, r|
  puts "Removing #{c}:#{r}..."
  dbi.do "DELETE FROM sources WHERE collection=? AND rss=?", c, r
  purge_rss << r
}
# A feed dropped from several collections only needs to be purged once.
purge_rss.uniq!

# Items are stored per feed URL (the items table has no collection column),
# so a feed's items must survive while any configured collection lists it.
purge_rss.delete_if { |r|
  still_in = config['collections'].find { |cfc, cfr| (cfr || []).include? r }
  puts "Must keep #{r} because it's still in #{still_in.first}" if still_in
  !still_in.nil?
}

# NOTE(review): only `items` is cleaned here. Whether `enclosures` rows of
# these feeds go away too depends on the schema, which is not visible here.
purge_rss.each { |r|
  puts "Purging items from feed #{r}"
  dbi.do "DELETE FROM items WHERE rss=?", r
}
68
###########
# Fetching
###########

# Timeout::Error is rescued explicitly below. net/http already loads it;
# the require only makes the dependency explicit.
require 'timeout'

# Width of the longest configured feed URL, used to align log prefixes.
# A collection whose feed list is empty in config.yaml loads as nil.
maxurlsize = config['collections'].map { |collection, rss_urls|
  (rss_urls || []).map { |rss_url| rss_url.size }
}.flatten.max || 0

# From here on every feed is written inside an explicit transaction.
dbi['AutoCommit'] = false
# Bumped by each fetch thread right before its HTTP request. The main
# thread waits until `timeout` seconds have passed since the latest bump.
last_get_started = Time.new
# Feed URLs whose fetch thread has not finished yet (guarded by pending_lock).
pending = []
pending_lock = Mutex.new

config['collections'].each { |collection, rss_urls|
  (rss_urls || []).each { |rss_url|
    pending_lock.synchronize { pending << rss_url }
    Thread.new {
      begin
        db_rss, last = dbi.select_one "SELECT rss, last FROM sources WHERE collection=? AND rss=?", collection, rss_url
        is_new = db_rss.nil?

        uri = URI::parse rss_url
        logprefix = "[#{uri.to_s.ljust maxurlsize}]"

        http = Net::HTTP.new uri.host, uri.port
        http.use_ssl = (uri.kind_of? URI::HTTPS) if defined? OpenSSL
        # Use a conditional GET when a Last-Modified value is on record.
        request = (if is_new || last.nil?
          puts "#{logprefix} GET"
          Net::HTTP::Get.new uri.request_uri
        else
          puts "#{logprefix} GET with If-Modified-Since: #{last}"
          Net::HTTP::Get.new uri.request_uri, {'If-Modified-Since'=>last}
        end)
        request.basic_auth(uri.user, uri.password) if uri.user

        last_get_started = Time.new
        begin
          response = http.request request
        rescue StandardError, Timeout::Error
          # Timeout::Error is listed on its own because on Ruby 1.8 it derives
          # from Interrupt, which a bare `rescue` does not catch.
          puts "#{logprefix} Skipped (request error: #{$!.class}: #{$!})"
          # There is no response to process. Leave the thread; the ensure
          # clause below still clears this feed from `pending`.
          next
        end
        puts "#{logprefix} #{response.code} #{response.message}"

        if response.kind_of? Net::HTTPOK
          if response.body.size > sizelimit
            puts "#{logprefix} #{response.body.size} bytes big!"
          else
            begin
              dbi.transaction do
                rss = MRSS::parse response.body

                # Update source
                if is_new
                  dbi.do "INSERT INTO sources (collection, rss, last, title, link, description) VALUES (?, ?, ?, ?, ?, ?)",
                    collection, rss_url, response['Last-Modified'], rss.title, rss.link, rss.description
                  puts "#{logprefix} Source added"
                else
                  dbi.do "UPDATE sources SET last=?, title=?, link=?, description=? WHERE collection=? AND rss=?",
                    response['Last-Modified'], rss.title, rss.link, rss.description, collection, rss_url
                  puts "#{logprefix} Source updated"
                end

                items_new, items_updated = 0, 0
                rss.items.each { |item|
                  description = item.description

                  # Link mangling: resolve the item link against the feed's
                  # own link, or against the feed URL when it declares none.
                  begin
                    link = URI::join((rss.link.to_s == '') ? uri.to_s : rss.link.to_s, item.link || rss.link).to_s
                  rescue URI::Error
                    link = item.link
                  end

                  # Push into database
                  db_title = dbi.select_one "SELECT title FROM items WHERE rss=? AND link=?", rss_url, link
                  item_is_new = db_title.nil?

                  if item_is_new
                    begin
                      dbi.do "INSERT INTO items (rss, title, link, date, description) VALUES (?, ?, ?, ?, ?)",
                        rss_url, item.title, link, item.date, description
                      items_new += 1
                    rescue DBI::ProgrammingError
                      puts description
                      puts "#{$!.class}: #{$!}\n#{$!.backtrace.join("\n")}"
                    end
                  else
                    dbi.do "UPDATE items SET title=?, description=? WHERE rss=? AND link=?",
                      item.title, description, rss_url, link
                    items_updated += 1
                  end

                  # Remove all enclosures
                  dbi.do "DELETE FROM enclosures WHERE rss=? AND link=?", rss_url, link
                  # Re-add all enclosures
                  item.enclosures.each do |enclosure|
                    begin
                      href = URI::join((rss.link.to_s == '') ? link.to_s : rss.link.to_s, enclosure['href']).to_s
                    rescue URI::Error
                      # Same fallback as for item links: keep the raw value
                      # instead of rolling back the whole feed.
                      href = enclosure['href']
                    end
                    dbi.do "INSERT INTO enclosures (rss, link, href, mime, title, length) VALUES (?, ?, ?, ?, ?, ?)",
                      rss_url, link, href, enclosure['type'], enclosure['title'], enclosure['length']
                  end
                }
                puts "#{logprefix} New: #{items_new} Updated: #{items_updated}"
              end
            rescue
              puts "#{logprefix} Error: #{$!.class}: #{$!}\n#{$!.backtrace.join("\n")}"
            end
          end
        end
      rescue
        # Errors before or around the request (bad URL, DB lookup). Report
        # them instead of letting the thread die silently.
        puts "[#{rss_url.ljust maxurlsize}] Error: #{$!.class}: #{$!}"
      ensure
        pending_lock.synchronize {
          # Remove exactly one entry. Array#delete would drop *every* equal
          # URL, losing track of the same feed still being fetched for
          # another collection.
          idx = pending.index rss_url
          pending.delete_at idx if idx
        }
      end
    }
  }
}
183
# Poll once per second until every fetch thread has finished, or until
# `timeout` seconds have passed since the most recently started request.
until pending.empty? || Time.new >= last_get_started + timeout
  sleep 1
end

# Anything still listed never completed in time; report each one.
pending_lock.synchronize do
  pending.each do |unfinished_url|
    puts "[#{unfinished_url.ljust maxurlsize}] Timed out"
  end
end
191 }
Note: See TracBrowser for help on using the browser.