使用httpClient可模拟请求Url获取资源,使用单线程的请求速度上会有一定的限制,参考了Apache给出的例子,自己做了测试实现多线程并发请求,以下代码需要HttpClient 4.2的包,可以在下载
1、并发请求
package generate.httpclient;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.apache.http.HttpEntity;import org.apache.http.HttpResponse;import org.apache.http.client.HttpClient;import org.apache.http.client.methods.HttpGet;import org.apache.http.conn.ClientConnectionManager;import org.apache.http.conn.params.ConnManagerParams;import org.apache.http.conn.scheme.PlainSocketFactory;import org.apache.http.conn.scheme.Scheme;import org.apache.http.conn.scheme.SchemeRegistry;import org.apache.http.impl.client.DefaultHttpClient;import org.apache.http.impl.conn.PoolingClientConnectionManager;import org.apache.http.params.BasicHttpParams;import org.apache.http.params.HttpConnectionParams;import org.apache.http.params.HttpParams;import org.apache.http.protocol.BasicHttpContext;import org.apache.http.protocol.HttpContext;import org.apache.http.util.EntityUtils;public class ThreadPoolHttpClient { // 线程池 private ExecutorService exe = null; // 线程池的容量 private static final int POOL_SIZE = 20; private HttpClient client = null; String[] urls=null; public ThreadPoolHttpClient(String[] urls){ this.urls=urls; } public void test() throws Exception { exe = Executors.newFixedThreadPool(POOL_SIZE); HttpParams params =new BasicHttpParams(); /* 从连接池中取连接的超时时间 */ ConnManagerParams.setTimeout(params, 1000); /* 连接超时 */ HttpConnectionParams.setConnectionTimeout(params, 2000); /* 请求超时 */ HttpConnectionParams.setSoTimeout(params, 4000); SchemeRegistry schemeRegistry = new SchemeRegistry(); schemeRegistry.register( new Scheme("http", 80, PlainSocketFactory.getSocketFactory())); //ClientConnectionManager cm = new PoolingClientConnectionManager(schemeRegistry); PoolingClientConnectionManager cm=new PoolingClientConnectionManager(schemeRegistry); cm.setMaxTotal(10); final HttpClient httpClient = new DefaultHttpClient(cm,params); // URIs to perform GETs on final String[] urisToGet = urls; /* 有多少url创建多少线程,url多时机子撑不住 // create a thread for each URI GetThread[] threads = new GetThread[urisToGet.length]; for (int i = 0; i < threads.length; i++) { HttpGet httpget = new HttpGet(urisToGet[i]); threads[i] = new GetThread(httpClient, httpget); } // start the threads for (int j = 0; j < threads.length; j++) { threads[j].start(); } // join the threads,等待所有请求完成 for (int j = 0; j < threads.length; j++) { threads[j].join(); } 使用线程池*/ for (int i = 0; i < urisToGet.length; i++) { final int j=i; System.out.println(j); HttpGet httpget = new HttpGet(urisToGet[i]); exe.execute( new GetThread(httpClient, httpget)); } //创建线程池,每次调用POOL_SIZE /* for (int i = 0; i < urisToGet.length; i++) { final int j=i; System.out.println(j); exe.execute(new Thread() { @Override public void run() { this.setName("threadsPoolClient"+j); try { this.sleep(100); System.out.println(j); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } HttpGet httpget = new HttpGet(urisToGet[j]); new GetThread(httpClient, httpget).get(); } }); } */ //exe.shutdown(); System.out.println("Done"); } static class GetThread extends Thread{ private final HttpClient httpClient; private final HttpContext context; private final HttpGet httpget; public GetThread(HttpClient httpClient, HttpGet httpget) { this.httpClient = httpClient; this.context = new BasicHttpContext(); this.httpget = httpget; } @Override public void run(){ this.setName("threadsPoolClient"); try { Thread.sleep(5000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } get(); } public void get() { try { HttpResponse response = this.httpClient.execute(this.httpget, this.context); HttpEntity entity = response.getEntity(); if (entity != null) { System.out.println(this.httpget.getURI()+": status"+response.getStatusLine().toString()); } // ensure the connection gets released to the manager EntityUtils.consume(entity); } catch (Exception ex) { this.httpget.abort(); }finally{ httpget.releaseConnection(); } } }}
2、多线程异步请求
package generate.httpclient;import java.io.ByteArrayOutputStream;import java.io.IOException;import java.io.InputStream;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.concurrent.CountDownLatch;import org.apache.http.HttpResponse;import org.apache.http.client.methods.HttpGet;import org.apache.http.concurrent.FutureCallback;import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;import org.apache.http.nio.client.HttpAsyncClient;import org.apache.http.nio.reactor.IOReactorException;public class AsynClient{ /** * @param args * @throws IOReactorException * @throws InterruptedException */ private Listurls; private HandlerFailThread failHandler; public AsynClient(List list){ failHandler=new HandlerFailThread(); urls=list; } public Map asynGet() throws IOReactorException, InterruptedException { final HttpAsyncClient httpclient = new DefaultHttpAsyncClient(); httpclient.start(); int urlLength=urls.size(); HttpGet[] requests = new HttpGet[urlLength]; int i=0; for(String url : urls){ requests[i]=new HttpGet(url); i++; } final CountDownLatch latch = new CountDownLatch(requests.length); final Map responseMap=new HashMap (); try { for (final HttpGet request : requests) { httpclient.execute(request, new FutureCallback () { public void completed(final HttpResponse response) { latch.countDown(); responseMap.put(request.getURI().toString(), response.getStatusLine().toString()); try { System.out.println(request.getRequestLine() + "->" + response.getStatusLine()+"->"); //+readInputStream(response.getEntity().getContent()) } catch (IllegalStateException e) { failHandler.putFailUrl(request.getURI().toString(), response.getStatusLine().toString()); e.printStackTrace(); } catch (Exception e) { failHandler.putFailUrl(request.getURI().toString(), response.getStatusLine().toString()); e.printStackTrace(); } } public void failed(final Exception ex) { latch.countDown(); ex.printStackTrace(); failHandler.putFailUrl(request.getURI().toString(), ex.getMessage()); } public void cancelled() { latch.countDown(); } }); } System.out.println("Doing..."); } finally { latch.await(); httpclient.shutdown(); } System.out.println("Done"); failHandler.printFailUrl(); return responseMap; } private String readInputStream(InputStream input) throws IOException{ byte[] buffer = new byte[128]; int len = 0; ByteArrayOutputStream bytes = new ByteArrayOutputStream(); while((len = input.read(buffer)) >= 0) { bytes.write(buffer, 0, len); } return bytes.toString(); } /** * Test * @param args */ public static void main(String[] args) { List urls=new ArrayList (); urls.add("http://127.0.0.1/examples/servlets/"); urls.add("http://127.0.0.1/examples/servlets/"); urls.add("http://127.0.0.1/examples/servlets/"); for(int i=0;i<10;i++){ urls.addAll(urls); } System.out.println(urls.size()); AsynClient client=new AsynClient(urls); try { client.asynGet(); } catch (IOReactorException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("done"); }}
3、创建一个线程记录失败的请求
package generate.httpclient;import java.util.HashMap;import java.util.Map;public class HandlerFailThread extends Thread{ MapfailUrl=new HashMap (); public void putFailUrl(String url,String status){ synchronized (failUrl) { failUrl.put(url,status); } } @Override public void run() { while(true){ } } public void printFailUrl(){ for(Map.Entry m: failUrl.entrySet()){ System.out.println("****fail:url:"+m.getKey()+ " code :"+m.getValue()); } }}
异步请求,也可通过pool管理,例如
ConnectingIOReactor nio=new DefaultConnectingIOReactor();
PoolingClientAsyncConnectionManager manager=new PoolingClientAsyncConnectionManager(nio); manager.setMaxTotal(1000); manager.setDefaultMaxPerRoute(100); HttpParams params=new BasicHttpParams(); /* 连接超时 */ HttpConnectionParams.setConnectionTimeout(params, 10000); /* 请求超时 */ HttpConnectionParams.setSoTimeout(params, 60*1000); DefaultHttpAsyncClient.setDefaultHttpParams(params); final HttpAsyncClient httpclient = new DefaultHttpAsyncClient(manager); httpclient.start();
HttpClient相关可参看,里面有很多说明与例子