Webmagic学习总结

WebMagic概览

WebMagic项目代码分为核心和扩展两部分。核心部分(webmagic-core)是一个精简的、模块化的爬虫实现,而扩展部分则包括一些便利的、实用性的功能。WebMagic的架构设计参照了Scrapy,目标是尽量的模块化,并体现爬虫的功能特点。

这部分提供非常简单、灵活的API,在基本不改变开发模式的情况下,编写一个爬虫。

扩展部分(webmagic-extension)提供一些便捷的功能,例如注解模式编写爬虫等。同时内置了一些常用的组件,便于爬虫开发。

另外WebMagic还包括一些外围扩展和一个正在开发的产品化项目webmagic-avalon。

核心组件

结构图

webmagic

四大组件

  • 1.Downloader:下载器
  • 2.PageProcessor:抽取器
  • 3.Scheduler:调度器
  • 4.Pipeline:结果处理器

源码分析(主类Spider)

各组件初始化及可扩展

初始化Scheduler

初始化Scheduler:(默认QueueScheduler)
protected Scheduler scheduler = new QueueScheduler();

采用新的Scheduler:

public Spider setScheduler(Scheduler scheduler) {
        checkIfRunning();
        Scheduler oldScheduler = this.scheduler;
        this.scheduler = scheduler;
        if (oldScheduler != null) {
            Request request;
            while ((request = oldScheduler.poll(this)) != null) {
            //复制原来的url到新的scheduler
                this.scheduler.push(request, this);
            }
        }
        return this;
    }

初始化Downloader

初始化Downloader:(默认HttpClientDownloader)

 protected void initComponent() {
        if (downloader == null) {
        //用户没有自定义Downloader,默认为HttpClientDownloader()
            this.downloader = new HttpClientDownloader();
        }
        if (pipelines.isEmpty()) {
        //用户没有自定义Pipeline,默认为ConsolePipeline()
            pipelines.add(new ConsolePipeline());
        }
        downloader.setThread(threadNum);
        if (threadPool == null || threadPool.isShutdown()) {
               //自定义线程池
            if (executorService != null && !executorService.isShutdown()) {
                threadPool = new CountableThreadPool(threadNum, executorService);
            } else {
                threadPool = new CountableThreadPool(threadNum);
            }
        }
        if (startRequests != null) {
            for (Request request : startRequests) {
                scheduler.push(request, this);
            }
            startRequests.clear();
        }
        startTime = new Date();
    }  

初始化Pipeline

初始化Pipeline:(默认ConsolePipeline)

初始化PageProcessor

初始化PageProcessor:(用户自定义完成,按需求抽取html)

如何实现多线程

初始化线程池

(默认Executors.newFixedThreadPool(threadNum))
Executors.newFixedThreadPool作用:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待.

 public CountableThreadPool(int threadNum) {
        this.threadNum = threadNum;
        this.executorService = Executors.newFixedThreadPool(threadNum);
    }

多线程并发控制

public void execute(final Runnable runnable) {
        if (threadAlive.get() >= threadNum) {
            try {
                reentrantLock.lock();//同步锁  下面为保护代码块
                while (threadAlive.get() >= threadNum) {
                    try {
                        condition.await();
                    } catch (InterruptedException e) {
                    }
                }
            } finally {
                reentrantLock.unlock();
            }
        }
        threadAlive.incrementAndGet();
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    runnable.run();
                } finally {
                    try {
                        reentrantLock.lock();
                        threadAlive.decrementAndGet();
                        //线程数量减少一个时,通过signal()方法通知前面condition.await()的线程
                        condition.signal();
                    } finally {
                        reentrantLock.unlock();
                    }
                }
            }
        });
    }

Java中的ReentrantLock和synchronized两种锁定机制的对比

ReentrantLock默认情况下为不公平锁

private ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁
private ReentrantLock lock = new ReentrantLock(true); //公平锁
try {
lock.lock(); //如果被其它资源锁定,会在此等待锁释放,达到暂停的效果
//操作
} finally {
lock.unlock();
}

不公平锁与公平锁的区别:
公平情况下,操作会排一个队按顺序执行,来保证执行顺序。(会消耗更多的时间来排队)
不公平情况下,是无序状态允许插队,jvm会自动计算如何处理更快速来调度插队。(如果不关心顺序,这个速度会更快)

AtomicInteger && CAS

AtomicInteger,一个提供原子操作的Integer的类。在Java语言中,++i和i++操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized关键字。而AtomicInteger则通过一种线程安全的加减操作接口。
首先要说一下,AtomicInteger类compareAndSet通过原子操作实现了CAS操作,最底层基于汇编语言实现
CAS是Compare And Set的一个简称,如下理解:
1,已知当前内存里面的值current和预期要修改成的值new传入
2,内存中AtomicInteger对象地址对应的真实值(因为有可能别修改)real与current对比,相等表示real未被修改过,是“安全”的,将new赋给real结束然后返回;不相等说明real已经被修改,结束并重新执行1直到修改成功

程序如何终止

//while循环结束,则程序完成任务并终止
 while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
            Request request = scheduler.poll(this);
            //当scheduler内目标URL为空时
            if (request == null) {
                //线程池中已经没有线程在运行了, exitWhenComplete默认为true
                if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
                    break;
                }
                // wait until new url added
                waitNewUrl();
            }

HttpClient使用http连接池发送http请求

将用户设置的线程数设置为httpclient最大连接池数

public void setThread(int thread) {
        httpClientGenerator.setPoolSize(thread);
    }
 public HttpClientGenerator setPoolSize(int poolSize) {
        // 将最大连接数增加为poolSize
        connectionManager.setMaxTotal(poolSize);
        return this;
    }

URL在Scheduler中去重

将下载结果页面中的链接抽取出来并放入scheduler中

 public void push(Request request, Task task) {
        logger.trace("get a candidate url {}", request.getUrl());
        if (!duplicatedRemover.isDuplicate(request, task) || shouldReserved(request)) {
            logger.debug("push to queue {}", request.getUrl());
            pushWhenNoDuplicate(request, task);
        }
    }

redischedulerURL去重复

boolean isDuplicate = jedis.sismember(getSetKey(task), request.getUrl());
//获得key值
protected String getSetKey(Task task) {
        return SET_PREFIX + task.getUUID();
    }
//生成唯一的UUID
public String getUUID() {
        if (uuid != null) {
            return uuid;
        }
        if (site != null) {
            return site.getDomain();
        }
        uuid = UUID.randomUUID().toString();
        return uuid;
    }


//RedisScheduler初始化方式,传入Redis的ip地址即可
public RedisScheduler(String host) {
//JedisPool使用JedisPoolConfig中默认的参数进行初始化
        this(new JedisPool(new JedisPoolConfig(), host));
    }

    public RedisScheduler(JedisPool pool) {
        this.pool = pool;
        setDuplicateRemover(this);
    }

RedisScheduler 中判断url是否重复的方法,因为一个Spider就是对应只有一个UUID,故上述的判断则是:判断当前的url是否是uuid集合的元素

System.out.println(jedis.sismember(“sname”, “minxr”));// 判断 minxr是否是sname集合的元素

bloomFilter URL去重复

boolean isDuplicate = bloomFilter.mightContain(getUrl(request));

Bloom-Filter,即布隆过滤器,1970年由Bloom中提出。它可以用于检索一个元素是否在一个集合中。
Bloom Filter(BF)是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合。它是一个判断元素是否存在集合的快速的概率算法。Bloom Filter有可能会出现错误判断,但不会漏掉判断。也就是Bloom Filter判断元素不再集合,那肯定不在。如果判断元素存在集合中,有一定的概率判断错误。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter比其他常见的算法(如hash,折半查找)极大节省了空间。
优点:
1)节约缓存空间(空值的映射),不再需要空值映射。
2)减少数据库或缓存的请求次数。
3)提升业务的处理效率以及业务隔离性。
缺点:
1)存在误判的概率。
2)传统的Bloom Filter不能作删除操作。

hashset URL去重复

//初始化set
private Set urls = Sets.newSetFromMap(new ConcurrentHashMap());
//判断url是否在set中
public boolean isDuplicate(Request request, Task task) {
    return !urls.add(getUrl(request));
}

抽取部分API

方法 说明 示例
xpath(String xpath) 使用XPath选择 html.xpath(“//div[@class=’title’]”)
$(String selector) 使用Css选择器选择 html.$(“div.title”)
css(String selector) 功能同$(),使用Css选择器选择 html.css(“div.title”)
regex(String regex) 使用正则表达式抽取 html.regex(“(.*?)\”)
replace(String regex, String replacement) 替换内容 html.replace(“\”,””)

这部分抽取API返回的都是一个Selectable接口,意思是说,抽取是支持链式调用的。

代理池

代理池初始化:

 
//从以往保存的本地文件中读取代理信息作为新的代理池
public SimpleProxyPool() {
        this(null, true);
    }
//以往保存的本地文件中读取代理+用户输入的httpProxyList合并为新的代理池
public SimpleProxyPool(List httpProxyList) {
        this(httpProxyList, true);
    }
//以往保存的本地文件中读取代理+用户输入的httpProxyList合并为新的代理池(后者可认为操控)
public SimpleProxyPool(List httpProxyList, boolean isUseLastProxy) {
        if (httpProxyList != null) {
            addProxy(httpProxyList.toArray(new String[httpProxyList.size()][]));
        }
        if (isUseLastProxy) {
            if (!new File(proxyFilePath).exists()) {
                setFilePath();
            }
            readProxyList();
            timer.schedule(saveProxyTask, 0, saveProxyInterval);
        }
    }

通过httpProxyList为代理池赋值

String[] source = { "::0.0.0.1:0", "::0.0.0.2:0", "::0.0.0.3:0", "::0.0.0.4:0" };
        for (String line : source) {
            httpProxyList.add(new String[] {line.split(":")[0], line.split(":")[1], line.split(":")[2], line.split(":")[3] });
        }

本地文件Proxy获存储与获取:定时任务

//定时任务
private TimerTask saveProxyTask = new TimerTask() {

        @Override
        public void run() {
            saveProxyList();
            logger.info(allProxyStatus());
        }
    };
//如果需要重复使用本地代理
if (isUseLastProxy) {
            if (!new File(proxyFilePath).exists()) {
                setFilePath();
            }
            readProxyList();
            timer.schedule(saveProxyTask, 0, saveProxyInterval);
        }

saveProxyTask()函数负责把最新的代理池ip写入到本地指定文件

使用JDK自带PriorityQueue管理Proxy

目的:可以根据compareTo方法制定的优先取出代理池中使用间隔较短的代理(一开始默认都为1.5s)优先取出并执行.

public int compareTo(Delayed o) {
        Proxy that = (Proxy) o;
        return canReuseTime > that.canReuseTime ? 1 : (canReuseTime < that.canReuseTime ? -1 : 0);
    }
 private void siftUpComparable(int k, E x) {
        Comparable key = (Comparable) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }
webmagic代理池的策略是: * 1. 在添加时连接相应端口做校验 * 2. 每个代理有1.5S的使用间隔 * 3. 每次失败后,下次取出代理的时间改为1.5S*失败次数 * 4. 如果代理失败次数超过20次,则直接丢弃
 public void returnProxy(HttpHost host, int statusCode) {
        Proxy p = allProxy.get(host.getAddress().getHostAddress());
        if (p == null) {
            return;
        }
        switch (statusCode) {
              //成功
            case Proxy.SUCCESS:
                p.setReuseTimeInterval(reuseInterval);
                p.setFailedNum(0);
                p.setFailedErrorType(new ArrayList());
                p.recordResponse();
                p.successNumIncrement(1);
                break;
            //失败
            case Proxy.ERROR_403:
                // banned,try longer interval
                p.fail(Proxy.ERROR_403);
                p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
                logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
                break;
            //代理被禁
            case Proxy.ERROR_BANNED:
                p.fail(Proxy.ERROR_BANNED);
                p.setReuseTimeInterval(10 * 60 * 1000 * p.getFailedNum());
                logger.warn("this proxy is banned >>>> " + p.getHttpHost());
                logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
                break;
            //404
            case Proxy.ERROR_404:
                // p.fail(Proxy.ERROR_404);
                // p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
                break;
            default:
                p.fail(statusCode);
                break;
        }
        //当前代理失败次数超过20:reviveTime = 2 * 60 * 60 * 1000;
        if (p.getFailedNum() > 20) {
            p.setReuseTimeInterval(reviveTime);
            logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
            return;
        }
        //检验代理ip符合下列要求的:当失败次数为5的倍数的时的校验
        if (p.getFailedNum() > 0 && p.getFailedNum() % 5 == 0) {
            if (!ProxyUtils.validateProxy(host)) {
                p.setReuseTimeInterval(reviveTime);
                logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
                return;
            }
        }
        try {
            proxyQueue.put(p);
        } catch (InterruptedException e) {
            logger.warn("proxyQueue return proxy error", e);
        }
    }
使用Socket来校验代理是否有效,客户端为本地.创建与代理的连接
public static boolean validateProxy(HttpHost p) {
        if (localAddr == null) {
            logger.error("cannot get local IP");
            return false;
        }
        boolean isReachable = false;
        Socket socket = null;
        try {
            socket = new Socket();
            socket.bind(new InetSocketAddress(localAddr, 0));
            InetSocketAddress endpointSocketAddr = new InetSocketAddress(p.getAddress().getHostAddress(), p.getPort());
            socket.connect(endpointSocketAddr, 3000);
            logger.debug("SUCCESS - connection established! Local: " + localAddr.getHostAddress() + " remote: " + p);
            isReachable = true;
        } catch (IOException e) {
            logger.warn("FAILRE - CAN not connect! Local: " + localAddr.getHostAddress() + " remote: " + p);
        } finally {
            if (socket != null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    logger.warn("Error occurred while closing socket of validating proxy", e);
                }
            }
        }
        return isReachable;
    }

OOSpider 使用注解配置化

注解的使用

//使用特有的抽取器
class ModelPageProcessor implements PageProcessor 
    @Override
    public void process(Page page) {
        for (PageModelExtractor pageModelExtractor : pageModelExtractorList) {
            extractLinks(page, pageModelExtractor.getHelpUrlRegionSelector(), pageModelExtractor.getHelpUrlPatterns());
            extractLinks(page, pageModelExtractor.getTargetUrlRegionSelector(), pageModelExtractor.getTargetUrlPatterns());
            Object process = pageModelExtractor.process(page);
            if (process == null || (process instanceof List && ((List) process).size() == 0)) {
                continue;
            }
            postProcessPageModel(pageModelExtractor.getClazz(), process);
            page.putField(pageModelExtractor.getClazz().getCanonicalName(), process);
        }
        if (page.getResultItems().getAll().size() == 0) {
            page.getResultItems().setSkip(true);
        }
    }

欢迎大家关注:huazi's微信公众号