JAVA基础之七-Collection和它的并行和流处理

Collection 翻下词典,有许多含义:

收集;聚集;(常指同类的)收藏品;募捐,募集;作品集;聚积;取走;一群人;拿走;(常为季节性推出的)系列时装(或家用品);一批物品

选择“集合”作为翻译名,我觉得可行,除非我们现在重新创造一个汉语词组。

 

对于CRUD和非CRUD,集合都是一个无比重要的东西,因为计算机的本质是对信息的处理。

信息一般不是单个,是一堆,一堆堆,一块块,一个个....

 

网上关于集合的资料无比多,所以本文主要是做一个简要的介绍,并添加一些注意事项和个人感悟。

 

一、简介

不过Collection的子孙过于多,用现有词汇命名这些子孙并不容易,有待创建新的词汇。

常用知名子孙有:

List  --  列表,javaDoc的释义是:有序集合。

   --ArrayList   动态大小列表 ,这是crud中最常用的类型 。不保证顺序

   --LinkedList  双链列表,可以固定成员顺序。本身实现了Deque的接口,可用于辅助实现FiLo的算法

Set  - 无重复集合,允许有一个null成员

  ---TreeSet   有序集合

  -- HastSet  哈希集合  ,主要是操作的性能好一些

    -- LinkedHashSet  双向链哈希集合,保持了插入顺序,又具有对应的性能

Queue -队列

   --Deque  双端操作队列。它有一个著名的实现  LinkedList

Buffer --缓冲

  不过这个主要是阿帕奇的实现org.apache.commons.collections.Buffer,算不得java的基础类型

 

如果是初级程序员,或者以CRUD为主的,那么只要学些掌握ArrayList就差不多了,因为现在的大部分的ORM或者JDBC的上级实现都适用ArrayList来存储数据集。

 

二、集合的基本方法

仅仅介绍Collection的接口方法,为了便于理解,以LinkedList为例子。

这些方法都极其简单,也没有什么特别好解释的,直接上例子吧!

package study.base.types.collection.list;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * 演示Collection接口的基本操作和LinkedList的一些典型操作
 * @author lto
 */
public class TestLinkedList {
    private LinkedList<MoneyJar> list;
    private String[] givers = new String[]{"爸爸","妈妈","哥哥","姐姐","爷爷","奶奶"};
    private Random random = new Random();
    private Map<String,Long> realGivers;

    public TestLinkedList(int size) {
        this.list = new LinkedList<>();
        this.realGivers = new HashMap<>();
        //插入100个MoneyJar,金额和日期都是随机的,giver是随机
        for (int i = 0; i < size; i++) {
            String giver = givers[random.nextInt(givers.length)];
            int amount = random.nextInt(100);
            this.list.add(new MoneyJar(giver, amount, new Date()));
        }
        //按照giver分组统计个数,并赋值给realGivers
        this.list.stream().collect(Collectors.groupingBy(MoneyJar::giver,
                Collectors.counting())).forEach((k,v)->{
            realGivers.put(k,(long)v);});
        //打印realGivers
        this.realGivers.forEach((k,v)->{System.out.println(String.format("%s共有%d个", k, v));});
    }

    public void count(){
        long start = System.currentTimeMillis();
        final long[] total = {0};
        this.list.spliterator().forEachRemaining(mj-> total[0] += mj.amount());
        System.out.println(String.format("总共%d元",total[0]));
        System.out.println("耗费时间:"+(System.currentTimeMillis()-start));
    }

    public void sortByAmount(){
        this.list.sort((o1, o2) -> o1.amount().compareTo(o2.amount()));
    }
    /**
     * 统计每个giver给的钱,并打印结果
     */
    public void sumByGiver(){
        System.out.println("--------------------****************-----------------------------");
        //根据giver分组统计每个giver给的钱,并返回一个ListMap
        long start = System.currentTimeMillis();
        Map<String, Integer> result= this.list.stream().collect(Collectors.groupingBy(MoneyJar::giver,
                Collectors.summingInt(MoneyJar::amount)));
        //打印统计结果
        result.forEach((k,v)->{System.out.println(String.format("%s给的钱是%d", k, v));});
        System.out.println("耗费时间:"+(System.currentTimeMillis()-start));

        //采用for循环的方式,分组统计
        System.out.println("采用for循环的方式,分组统计-----------------------------");
        long start1 = System.currentTimeMillis();
        Map<String,List<Integer>> result1= new HashMap<>();
        //初始化result1,把realGivers的每个元素作为key,初始值为0
         this.realGivers.forEach((k,v)->{
            result1.put(k,new ArrayList<>());
        });
        //遍历list,计算每个giver给的钱

        for (MoneyJar moneyJar : list) {
           result1.get(moneyJar.giver()).add(moneyJar.amount());
        }

        //根据result1的成员个数,创建对应的线程,然后在线程中计算每个giver给的钱,并计算总和
        int numThreads=result1.size();
        CountDownLatch latch = new CountDownLatch(numThreads);

        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        result1.forEach((k,v)->{
            Runnable worker = () -> {
                try {
                    long sum=0;
                    for (int i : v) {
                        sum+=i;
                    }
                    System.out.println(String.format("%s给的钱是%d", k, sum));
                } finally {
                    latch.countDown(); // 计数减一
                }
            };
            // 使用executor提交任务,而不是直接启动Thread
            executor.submit(worker);
        });
        try {
            // 等待所有线程完成
            latch.await();
            System.out.println("All threads have finished.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 关闭executor,释放资源
        executor.shutdown();
        System.out.println("耗费时间:"+(System.currentTimeMillis()-start1));
    }


    public void splitToSum(){
        //把钱罐的钱分为n份,分别统计,然后再合并总的金额,并统计耗费时间
        System.out.println("--   采用并行流的方法");
        long start = System.currentTimeMillis();
        Long total=list.parallelStream().mapToLong(MoneyJar::amount).sum();
        System.out.println("耗费时间:"+(System.currentTimeMillis()-start));
        System.out.println("总金额是"+total.toString());

        //采用传统的for循环方式累积
        System.out.println("--   采用传统的for循环的方法");
        start = System.currentTimeMillis();
        long sum=0;
        for (MoneyJar moneyJar : list) {
            sum+=moneyJar.amount();
        }
        System.out.println("总金额是"+sum);
        System.out.println("耗费时间:"+(System.currentTimeMillis()-start));
    }

    /**
     * 把小于等于指定的金额的钱都清理掉
     * @param amount
     */
    public void purgeSmallMoney(int amount){
        this.list.removeIf(moneyJar -> moneyJar.amount()<=amount);
    }

    record MoneyJar(String giver,Integer amount,
                    Date putDay){
    }

    public static void main(String[] args) {
        //当10万个的时候,并行的速度反而是for的3倍左右。
        TestLinkedList test = new TestLinkedList(200);
        test.splitToSum();
        test.sortByAmount();
        System.out.println("-- 排序后 -----");
        for (MoneyJar moneyJar : test.list) {
            System.out.println(moneyJar);
        }
        //测试100万的情况
        TestLinkedList test100 = new TestLinkedList(1000000);
        test100.splitToSum();

        //测试2000万的情况
        TestLinkedList test1000 = new TestLinkedList(20000000);
        test1000.splitToSum();

        //以上三个例子,哈无例外,都是简单的循环胜出。那么parametrizedStream的效率就值得怀疑了。
        //是否因为没有正确设置并行度,还是计算机的环境存在问题
        test1000.sumByGiver();
        test1000.count();
    }
}

以上例子并没有测试每一个接口方法,是因为有些太简单不值得浪费篇幅。

三、并行处理和流处理

在J8之前,如果把一个集合,以ArrayList为例子,进行并行处理,那么必须自己来动手,过程可能是这样的:

1.分隔集合为n个子集

2.创建n个线程,用于分别处理n个子集

3.如果需要合并处理,还需要特定注意线程的等待和合并

写起来还是相对比较麻烦的。当然,现在借助于ai,没有那么复杂。但和J8之后提供的特性相比,自然还是麻烦一些。

至于流,更不用说了,J8之前并没有这个概念。

 

在JDK17中,可以看到Collection接口和并发以及流有关的方法:

default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }
@Override
    default Spliterator<E> spliterator() {
        return Spliterators.spliterator(this, 0);
    }
default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

三个都是默认方法,可以直接使用

parallelStream可以提供并行流处理。

--

根据已知的一些报告和我几次不是很严谨的测试,Stream和for相比并没有什么优势。

由此可以得出一个不是很严谨的结论:

在相当大的业务场景(crud为主的信息系统)中,甚至可以说,在大部分的业务场景中,Stream其实居于下风。

stream的作用仅仅是为了节约工程师的精力和体力

 

只有数据集巨大,且cpu充足的情况下,例如千万级别左右,并行流才会有一些可见的优势。但是,又有多少面向

企业基别的信息系统,会在应用级别这样疯狂地处理千万级别的数据,难道不怕jvm爆了吗?

用数据库的集合运算功能不是更好更简单吗?

 

四、工具类

 4.1官定工具- Collections

这是集合最重要的工具类。

全路径:java.util.Collections

 

需要特别申明的是,Collections不仅仅会处理Colletion的子子孙孙,也会处理Map,所以不能被它的名称骗了。

由于存在JAVADOC,且这个Colllections的成员巨多,所以不逐一列出,避免浪费篇幅。

Collections方法大体包含三类:

1.运算

例如排序(sort)、翻转(reverse)、打乱(shuffle)、交换元素(swap)、填充元素(fill),经典聚集(min,max),集合运算等等

其中和经典集合运算有关的:

frequency -频率

disjoint-判断是否有交集

总之,结合Collecion自身的实现和Collections工具,要实现两个集合的并集、交集、差集、是否包含等等都是可以的,只不过有点麻烦。

   

2.构造特定类型的对象  

   a.不可修改集合(含map)

   b.线程同步集合(含map)

   c.锁定类型集合

   d.空集合(无元素集合)

   e.单元素集合(Singleton

  前四个都容易理解,最后一个Singleton有点迷惑,就是为了返回只有一个成员的集合?

3.其它杂项

诸如复制、替换等等。

不过没有提供深度复制的方法。

4.2阿帕奇集合工具(CollectionUtils

相比java自带的集合工具,阿帕奇的工具主要集中在以下几个用途:

1.集合运算

这个比java官方的强大多了,所以还是用这个把。看看都有什么:

union(并集),intersection(交集),disjunction(!交集,或者独立并集),substract(移除子集),containAny(是否有交集)

isSubCollection(是否子集),isEqualCollection(是否相等),retainAll(交集),以及其它。

注意:retainAll和intersection都可以用于获取交集,但是二者还是有明显区别的,后者(intersection)会给出不重复的结果,而前者(retainAll)会给出重复的结果

以下是关于这些本人重视的集合运算方法的示例:

public void testApacheCollectionUtils(){
        List<Integer>  me = Arrays.asList(90, 80, 70,90,92,88);
        List<Integer>  mother = Arrays.asList(90, 80, 70,90,92,88);
        List<Integer>  auntScore = Arrays.asList(90, 80, 70,90,92,88);
        List<Integer>  fatherScore = Arrays.asList(99, 81, 71,90,98,88);
        List<Integer>  赵云 = Arrays.asList(90,80);
        List<Integer>  崔颢 = Arrays.asList(77);

        List<Integer>  myNewScore = (List<Integer>) CollectionUtils.union(me, 赵云);
        System.out.println("我和赵云的合并∪="+myNewScore);
        List<Integer>  myIntersectionScore = (List<Integer>) CollectionUtils.intersection(me, fatherScore);
        System.out.println("我和爸爸交集="+myIntersectionScore);
        //差集
        List<Integer>  myDifferenceScore = (List<Integer>) CollectionUtils.subtract(me, fatherScore);
        System.out.println("我和爸爸的差集="+myDifferenceScore);
        //非公共部分
        List<Integer>  myDisJointScore = (List<Integer>) CollectionUtils.disjunction(me, fatherScore);
        System.out.println("我和爸爸的非公共部分="+myDisJointScore);

        //我和爸爸是否有交集
        if(!CollectionUtils.containsAny(me, fatherScore)) {
            System.out.println("我和爸爸没有交集");
        } else {
            System.out.println("我和爸爸有交集");
        }
        //我和崔颢是否有交集
        if(!CollectionUtils.containsAny(me, 崔颢)) {
            System.out.println("我和崔颢没有交集");
        } else {
            System.out.println("我和崔颢有交集");
        }
        //我和赵云的交集
        List<Integer>  myIntersectionScore2 = (List<Integer>) CollectionUtils.retainAll(me, 赵云);
        System.out.println("我和赵云的交集(retainAll)="+myIntersectionScore2);
        System.out.println("我和赵云的交集(inter)="+ CollectionUtils.intersection(me, 赵云));
        //和崔颢Score的交集
        List<Integer>  myIntersectionScore3 = (List<Integer>) CollectionUtils.retainAll(崔颢, 赵云);
        System.out.println("和崔颢的交集="+myIntersectionScore3);

        //赵云是否是me的子集
        if(CollectionUtils.isSubCollection(赵云, me)) {
            System.out.println("赵云是me的子集");
        } else {
            System.out.println("赵云不是me的子集");
        }
        //崔颢Score是否是me的子集
        if(CollectionUtils.isSubCollection(崔颢, me)) {
            System.out.println("崔颢是me的子集");
        } else {
            System.out.println("崔颢不是me的子集");
        }

        //妈妈和阿姨是否一致
        if(CollectionUtils.isEqualCollection(mother, auntScore)) {
            System.out.println("妈妈和阿姨一致");
        } else {
            System.out.println("妈妈和阿姨不一致");
        }
    }

 

输出结果:

我和赵云的合并∪=[80, 70, 88, 90, 90, 92]
我和爸爸交集=[88, 90]
我和爸爸的差集=[80, 70, 90, 92]
我和爸爸的非公共部分=[80, 81, 98, 99, 70, 71, 90, 92]
我和爸爸有交集
我和崔颢没有交集
我和赵云的交集(retainAll)=[90, 80, 90]
我和赵云的交集(inter)=[80, 90]
和崔颢的交集=[]
赵云是me的子集
崔颢不是me的子集
妈妈和阿姨一致

 

 

2.元素处理

find,filter,exists,countMatches、select、collect、get、

3.构造特定类型集合

  • synchronizedCollection
  • unmodifiableCollection
  • predicatedCollection
  • typedCollection

需要注意的是,这里的几个方法,个人倾向于少用,尽量用java标准的Collections。

4.杂项

isEmpty,isNotEmpty,cardinality...

4.3其它杂项工具

现在工具有点泛滥了。这是因为复制工具代码已经很简单,再加上实在有一些个性化的需要,所以越做越多。

Spring有,JSON有,mybatis有...

这些已经泛滥的就不提了,它们主要用于一些极其个性化的,或者自认为更有效率更安全(存疑)。

4.4 小结

为安全起见,我个人都是尽量用官方的Collections和阿帕奇的CollectionUtils。

从工程角度出发,尽量少依赖也是一个大体正确的选择。

 

其它的不是万不得已不要用。当然各个组织也完全可以自行创建工具。

只不过,这两个工具集已包含绝大部分集合有关的操作,再结合Stream和Colllection自有的功能,应该很够用了。

五、CRUD和集合

编写crud的时候,我们可能会常常使用以下几种基于jdbc的方式创建集合:

1.使用基于jdbc的orm,例如典型的mybatis

2.基于sping的jdbcTemplate

实际是对原生jdbc的封装

3.基于原生jdbc

现在已经很少人用jpa来访问处理数据。

在绝大部分CRUD项目中,一般都用mytabis之类的Orm

 

所以,这里主要讨论mybatis(或者类似的框架工具即可)。

当返回集合的时候,mytais支持返回List(ArrayList),Set ,对这两个类型的支持是很友好的。

以下是方法(org.apache.ibatis.jdbc.SqlRunner#getResults,selectAll)的部分


public List<Map<String, Object>> selectAll(String sql, Object... args) throws SQLException {
try (PreparedStatement ps = connection.prepareStatement(sql)) {
setParameters(ps, args);
try (ResultSet rs = ps.executeQuery()) {
return getResults(rs);
}
}
}

private
List<Map<String, Object>> getResults(ResultSet rs) throws SQLException { List<Map<String, Object>> list = new ArrayList<>(); ..... while (rs.next()) { Map<String, Object> row = new HashMap<>(); for (int i = 0, n = columns.size(); i < n; i++) { String name = columns.get(i); TypeHandler<?> handler = typeHandlers.get(i); row.put(name.toUpperCase(Locale.ENGLISH), handler.getResult(rs, name)); } list.add(row); } return list; }

 

可以看出,在mybatis的底层是用ArrayList来承接原生数据集的结果的。用ArrayList是因为一个性能较好,另外一个是因为集合的数量不可测的缘故。

在不考虑极端性能的要求下,用mybatis还是不错的,因为它提供了主要的类型转换和spring的集成。

很少有人考虑使用LinkedList等其它集合来承接数据即可。

由于List实现了Collection接口,所以可以使用mybatis在获得List之后,再做流处理。

六、适用场景和挑战

集合的子孙巨多,有不同的业务场景对应,以最常见的来说:

ArrayList -- crud,随机访问性能高。但crud很少随机访问某个,一般都丢到前端处理了。

   如前,Colletions提供了大量构建特定用途的集合的方法,可以让动态列表用于线程安全等场景。

LinkedList -- 双向链表用途很广,一般不CRUD的时候,常常会考虑用它,它的优缺点:

   频繁进行插入和删除更高效;可以用作用作栈(Stack)和队列(Queue);保持元素插入顺序的场景实现双向遍历

  缺点:随机访问慢

Set   -大量的非crud的,需要保持元素唯一的情况

Queue -队列,主要用于需要堆栈操作的情况

再结合线程同步、不可修改、指定类型等等,可以细分为更多的子场景。

由于子孙太多,如果个人对每个类型的优缺点不是太明白,那么至少要知道大类的适用场景,然后再查看javaDoc/ai即可。

 

6.1 挑战-线程安全

如果,java工具Colleections已经提供了适用于大部分业务场景的并发集合对象,以便在线程操作情况下,能够保证安全。

以非常典型的java.util.Collections.synchronizedList(List<T>)为例子,下面是相关代码:

 public static <T> List<T> synchronizedList(List<T> list) {
        return (list instanceof RandomAccess ?
                new SynchronizedRandomAccessList<>(list) :
                new SynchronizedList<>(list));
    }

static class SynchronizedList<E>
        extends SynchronizedCollection<E>
        implements List<E> {
        @java.io.Serial
        private static final long serialVersionUID = -7754090372962971524L;

        @SuppressWarnings("serial") // Conditionally serializable
        final List<E> list;

        SynchronizedList(List<E> list) {
            super(list);
            this.list = list;
        }
        SynchronizedList(List<E> list, Object mutex) {
            super(list, mutex);
            this.list = list;
        }

        public boolean equals(Object o) {
            if (this == o)
                return true;
            synchronized (mutex) {return list.equals(o);}
        }
        public int hashCode() {
            synchronized (mutex) {return list.hashCode();}
        }

        public E get(int index) {
            synchronized (mutex) {return list.get(index);}
        }
        public E set(int index, E element) {
            synchronized (mutex) {return list.set(index, element);}
        }
        public void add(int index, E element) {
            synchronized (mutex) {list.add(index, element);}
        }
        public E remove(int index) {
            synchronized (mutex) {return list.remove(index);}
        }

        public int indexOf(Object o) {
            synchronized (mutex) {return list.indexOf(o);}
        }
        public int lastIndexOf(Object o) {
            synchronized (mutex) {return list.lastIndexOf(o);}
        }

        public boolean addAll(int index, Collection<? extends E> c) {
            synchronized (mutex) {return list.addAll(index, c);}
        }

        public ListIterator<E> listIterator() {
            return list.listIterator(); // Must be manually synched by user
        }

        public ListIterator<E> listIterator(int index) {
            return list.listIterator(index); // Must be manually synched by user
        }

        public List<E> subList(int fromIndex, int toIndex) {
            synchronized (mutex) {
                return new SynchronizedList<>(list.subList(fromIndex, toIndex),
                                            mutex);
            }
        }

        @Override
        public void replaceAll(UnaryOperator<E> operator) {
            synchronized (mutex) {list.replaceAll(operator);}
        }
        @Override
        public void sort(Comparator<? super E> c) {
            synchronized (mutex) {list.sort(c);}
        }   
        @java.io.Serial
        private Object readResolve() {
            return (list instanceof RandomAccess
                    ? new SynchronizedRandomAccessList<>(list)
                    : this);
        }
    }

 

从代码可以看出,这个SynchronizedList对大部分的集合操作都使用关键字synchronized,包括基本的get,add,indexOf...

但是需要注意,并不是所有的操作都是上同步锁,例如获得迭代器(iterator())就不会。具体哪些不会,需要工程师自己去阅读代码。

实现单个jvm内的线程安全问题不大,工程师主要的调整来自于性能要求,需要谨慎地分辨这些上锁的代价是否过于大,大到不如直接使用串行的

方式进行处理。

通常而言,如果锁内操作很短,而锁外的操作相对长的多,那么还是值得那样进行操作的。

七、小结

1.集合的子孙比较多,建议先认识一遍,这样有助于开发,不要浪费自己的时间

2.应付一般的CRUD,依靠JAVA和阿帕奇的已经基本够了用了。

如果实在不够可以自己额外编写工具集,不推荐采用三方的工具集(存在安全和更新问题)当然类似阿帕奇这样的可以例外。

如果是开发产品,更不推荐采用非知名的小组织/个人的工具包。

3.需要注意线程安全情况下的用法,这个有赖于个人实践之后的体验,虽然JAVADOC有一些说明,但是不够。

4.使用ai辅助编写代码的时候,应该有适当的辨别能力,避免每个集合都是stream()之后再操作

最简单的,例如 list.filter(),没有必要list.stream().filter,除非filter后还挂着其它操作。sort()也是类似。

不能太机械。