# async-file **Repository Path**: yunjiao-source/async-file ## Basic Information - **Project Name**: async-file - **Description**: Java异步文件读写工具(Asynchronous File Reader and Writer tools for Java) - **Primary Language**: Java - **License**: MIT - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2021-11-16 - **Last Updated**: 2022-08-14 ## Categories & Tags **Categories**: Uncategorized **Tags**: NIO, BufferReader, AsynchronousFileChannel, completionhandler ## README # async-file [![Gitee stars](https://gitee.com/pinweiwan/async-file/badge/star.svg)](https://gitee.com/pinweiwan/async-file/stargazers) [![Gitee forks](https://gitee.com/pinweiwan/async-file/badge/fork.svg)](https://gitee.com/pinweiwan/async-file/members) [![Maven central](https://img.shields.io/maven-central/v/io.github.kavahub/kavahub-async-file.svg)](https://search.maven.org/artifact/io.github.kavahub/kavahub-async-file) [![License](https://img.shields.io/github/license/kavahub/async-file.svg)](https://github.com/kavahub/async-file/blob/main/LICENSE) [![GitHub stars](https://img.shields.io/github/stars/kavahub/async-file?style=flat-square&logo=GitHub)](https://github.com/kavahub/async-file/stargazers) [![GitHub forks](https://img.shields.io/github/forks/kavahub/async-file?style=flat-square&logo=GitHub)](https://github.com/kavahub/async-file/network/members) [![GitHub watchers](https://img.shields.io/github/watchers/kavahub/async-file?style=flat-square&logo=GitHub)](https://github.com/kavahub/async-file/watchers) [![GitHub release](https://img.shields.io/github/release/kavahub/async-file?style=flat-square&logo=GitHub?color=blu)](https://github.com/kavahub/async-file/releases) [Gitee](https://gitee.com/pinweiwan/async-file) [GitHub](https://github.com/kavahub/async-file) #### 介绍 async-file工具提供Java异步读写文件的能力,使用Java NIO 库开发。Java应用程序引入框架可以简单的,异步和非阻塞的读写文件。框架包含三个工具类: - [`AIOFileReader`](src/main/java/io/github/kavahub/file/reader/AIOFileReader.java) 异步读取文件,使用Java NIO库 [`AsynchronousFileChannel`](https://docs.oracle.com/javase/10/docs/api/java/nio/channels/AsynchronousFileChannel.html) 和 [`CompletionHandler`](https://docs.oracle.com/javase/10/docs/api/java/nio/channels/CompletionHandler.html) 实现。 - [`AIOFileWriter`](src/main/java/io/github/kavahub/file/writer/AIOFileWriter.java) 异步写入文件,使用Java NIO库 [`AsynchronousFileChannel`](https://docs.oracle.com/javase/10/docs/api/java/nio/channels/AsynchronousFileChannel.html) 和 [`CompletionHandler`](https://docs.oracle.com/javase/10/docs/api/java/nio/channels/CompletionHandler.html) 实现。 - [`NIOFileLineReader`](src/main/java/io/github/kavahub/file/reader/NIOFileLineReader.java) 非阻塞读取文件,使用 [`ForkJoinPool`](https://docs.oracle.com/javase/10/docs/api/java/util/concurrent/ForkJoinPool.html) 和 [`BufferedReader`](https://docs.oracle.com/javase/10/docs/api/java/io/BufferedReader.html) 实现 提示:Java提供的 [`Files`](https://docs.oracle.com/javase/10/docs/api/java/nio/file/Files.html) 文件读取功能是阻塞的。 #### 安装教程 首先,如果项目使用Maven工具,在项目的pom.xml文件中添加依赖 ```xml io.github.kavahub kava-async-file 1.0.1.RELEASE ``` 如果是Gradle项目,需要添加依赖: ```groovy implementation 'io.github.kavahub:kava-async-file:1.0.1.RELEASE' ``` #### AIOFileReader使用说明 [`AIOFileReader`](src/main/java/io/github/kavahub/file/reader/AIOFileReader.java)方法列表: - `Query bytes(Path file)` : 读取文件,返回文件数据字节数组,每次读取的大小由默认缓冲区决定。 - `Query allBytes(Path file)` : 读取文件,返回文件所有数据字节数组。每次按默认缓冲区读取文件,完成后合并。 - `Query line(Path file)` : 读取文件,返回文件行字符串。每次按默认缓冲区读取文件数据字节数组,按换行符分割字节数组。 - `Query allLines(Path file)` : 读取文件,返回文件所有数据字符串。每次按默认缓冲区读取文件数据字节数组,合并后转换成字符串。 默认缓冲区大小定义: ```java public static final int BUFFER_SIZE = 4096 * 4; ``` 示例: ```java // 按行读取文件,并输出到控制台 final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt"); AIOFileReader.line(FILE) // 订阅行数据 .subscribe(data -> { // 文件行处理,如输出到控制台 System.out.println(data); // doSomethingData(data) }, err -> { // 异常处理 err.printStackTrace(); // doSomethingError(err) }) // 等待文件读取完成 .join(); // 也可以这样写 AIOFileReader.line(FILE) // 订阅数据 .onNext(data -> { // 文件行处理,如输出到控制台 System.out.println(data); // doSomethingData(data) }) // 订阅异常 .onError(err -> { // 异常处理 err.printStackTrace(); // doSomethingError(err) }) // 等待文件读取完成 .blockingSubscribe(); ``` 这种适用广泛,我们经常读取文本文件,统计相关信息,如:单词统计等,只需要编写 `doSomethingData` 代码。`doSomethingError` 方法,处理读或者业务处理中的异常,我们建议将异常输出到日志 `doSomethingData` 业务中的异常,会导致读文件中断,整个文件操作终止,所有的异常信息可以在 `doSomethingError` 中处理 示例: ```java // 按行读取文件,并输出到控制台 final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt"); CompletableFuture future = AIOFileReader.line(FILE) // 订阅行数据 .subscribe(data -> { // 文件行处理,如输出到控制台 System.out.println(data); // doSomethingData(data) }, err -> { // 异常处理 err.printStackTrace(); // doSomethingError(err) }); // 处理其他业务逻辑 TimeUnit.SECONDS.sleep(2); // 模拟业务 // 循环,直到文件处理完成 while (!future.isDone()) { } System.out.println("文件处理完成"); ``` `AIOFileReader` 读文件是异步的,所以可以处理写其他的业务 示例: ```java // 统计文件中单词个数,并找出次数最多的单词 final Path FILE = Paths.get("src", "test", "resources", "fileToCount.txt"); final int MIN = 5; final int MAX = 10; ConcurrentHashMap words = new ConcurrentHashMap<>(); AIOFileReader.line(FILE) // 过滤掉前14行 .filter(line -> !line.trim().isEmpty()).skip(14) // 使用空格分隔 .flatMapMerge(line -> Query.of(line.split(" "))) // 过滤单词 .filter(word -> word.length() > MIN && word.length() < MAX) // 统计单词次数 .onNext((w) -> words.merge(w, 1, Integer::sum)) // 异常处理 .onError(err -> err.printStackTrace()) // 阻塞,直到文件统计完毕 .blockingSubscribe(); Map.Entry common = Collections.max(words.entrySet(), Comparator.comparingInt(e -> e.getValue().intValue())); assertEquals("Hokosa", common.getKey()); assertEquals(183, common.getValue().intValue()); ``` 示例: ```java // 统计“*** END OF ”行之前所有单词的数量 // 当读取到"*** END OF "行时,读线程会取消读操作,避免继续读取不需要处理的数据 final Path FILE = Paths.get("src", "test", "resources", "fileToCount.txt"); int[] count = { 0 }; AIOFileReader.line(FILE) // 过滤空行 .filter(line -> !line.trim().isEmpty()) // 忽略前14行 .skip(14) // 忽略掉‘*** END OF ’以后的行 .takeWhile(line -> !line.contains("*** END OF ")) // 行按空格切割成单词 .flatMapMerge(line -> Query.of(line.split("\\W+"))) // 去重 .distinct() // 统计数量 .onNext((word) -> { count[0]++; }) // 异常处理 .onError(err -> err.printStackTrace()) // 阻塞,知道文件读取完成 .blockingSubscribe(); assertEquals(5206, count[0]); ``` 示例: ```java // 详细演示takeWhile的功能: // 1. 控制台输出前部文件内容,框架日志提示[Cancel file reading. [16384 bytes] has been readed],读取操作取消,不在读取文件数据。 // 2. [16384 bytes] 信息中,16384是框架默认读取缓冲区大小,由此可以判断:文件只读取了一次 final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt"); AIOFileReader.line(FILE) // 控制台输出 .onNext((data) -> { System.out.println("before:" + data); }) // 终止文件读操纵 .takeWhile(line -> false) // .onNext((data) -> { System.out.println("after:" + data); }) // 异常处理 .onError(err -> err.printStackTrace()) .blockingSubscribe(); ``` 示例: ```java // 也可以使用cancel方法中断读文件操作 final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt"); CompletableFuture future = AIOFileReader.line(FILE).subscribe(data -> { try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }, err -> err.printStackTrace()); TimeUnit.MILLISECONDS.sleep(1000); future.cancel(false); ``` 示例: ```java // 显示读文件线程的名称 final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt"); AIOFileReader.bytes(FILE).subscribe(data -> { System.out.println(Thread.currentThread().getName()); }, err -> err.printStackTrace()).join(); ``` 输出结果如下: ```text Thread-8 Thread-7 Thread-8 Thread-7 Thread-8 Thread-7 Thread-8 Thread-7 Thread-8 Thread-7 ... ``` 其结果表明:有两个线程读取文件,线程交替读取以保证读取文件数据的顺序,这是 [`AsynchronousFileChannel`](https://docs.oracle.com/javase/10/docs/api/java/nio/channels/AsynchronousFileChannel.html) 实现的 #### AIOFileWriter使用说明 [`AIOFileWriter`](src/main/java/io/github/kavahub/file/writer/AIOFileWriter.java)方法列表: - `CompletableFuture write(Path file, byte[] bytes)` : 字节数组数据写入文件。 - `CompletableFuture write(Path file, String line)` : 字符串数据写入文件。 - `CompletableFuture write(Path file, Query lines)` : 字符串流数据写入文件。 - `CompletableFuture write(Path file, Iterable lines)` : 字符串集合数据写入文件。 示例: ```java // 写入字符串 AIOFileWriter.write(Paths.get(FILE_TO_WRITE), "This is file content:你好").join(); ``` 示例: ```java // 分割字符串写入 final String content = "This is file content:你好"; AIOFileWriter.write(Paths.get(FILE_TO_WRITE), String.join(System.lineSeparator(), content.split(" "))).join(); ``` 示例: ```java // 边度边写 final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt"); try (CompletableFileWriter writer = AIOFileWriter.of(FILE_TO_WRITE)) { AIOFileReader.line(FILE) // 忽略前2行 .skip(2) // 过滤掉空行 .filter(line -> !line.isBlank()) // 转换成大写 .map(String::toUpperCase) // 加入换行符 .map(line -> line + System.lineSeparator()).subscribe(data -> { writer.write(data); }, err -> err.printStackTrace()).join(); // 等待写入完成 writer.getPosition().whenComplete((size, error) -> { if (error != null) { error.printStackTrace(); } System.out.println("总共写入字节数:" + size); }).join(); } ``` #### NIOFileLineReader使用说明 [`NIOFileLineReader`](src/main/java/io/github/kavahub/file/reader/NIOFileLineReader.java) 方法列表: `Query read(Path file)` : 读取文件行。 ```java // 读取文件行并过滤 final Path FILE = Paths.get("src", "test", "resources", "fileWithmanyOfLine.txt"); NIOFileLineReader.read(FILE).subscribe(data -> { System.out.println(Thread.currentThread().getName()); }, err -> err.printStackTrace()).join(); ``` #### 使用建议 - 文件的异步读写,并不是为了提高文件的读取性能,而是提高文件读取的吞吐量(读取更多的文件,并保持性能,使JVM可以稳定运行)。 - 在大多数情况下,使用Jdk提供的[`Files`](https://docs.oracle.com/javase/10/docs/api/java/nio/file/Files.html)或许更合适。 - 不要为了异步而异步,找到问题所在,也许解决问题的关键不是异步。 建议使用优先级: `Java NIO Files` > `NIOFileLineReader` > `AIOFileReader` #### 性能 性能测试,参考 [`ReadLineBenchmark`](src/test/java/io/github/kavahub/file/performance/ReadLineBenchmark.java) 。 其他开源项目文件读写的性能测试 [`ReadFileBenchmark`](https://gitee.com/yangyunjiao/learn-java/blob/master/core-java/core-java-io/src/main/java/net/learnjava/ReadFileBenchmark.java) #### 构建项目 克隆代码到本地,然后运行mvn命令, 执行编译,测试,打包项目: ```text mvn clean install ``` #### 发布项目 首先,确保项目可以正确构建。然后执行下面的命令(发布的文件要以release结尾,如:kavahub-async-file-1.0.0.RELEASE.jar): ```text mvn release:prepare -Prelease mvn release:perform -Prelease ``` 上面操作将包上传到了Staging Repository,需要转入Release Repository,执行命令: ```text mvn nexus-staging:release ``` 以上操作全部成功,发布完成。 发布SNAPSHOT包到仓库,命令如下: ```text mvn clean deploy -Prelease ``` 取消Staging Repository中的包,命令如下: ```text mvn nexus-staging:drop ``` #### 其他开源项目 - [RxIo](https://github.com/javasync/RxIo) : Asynchronous non-blocking File Reader and Writer library for Java #### 历史版本 [1.0.0-RELEASE](./README-1.0.0.RELEASE.md) #### 参考文档 - [vsCode利用git连接github](https://www.jianshu.com/p/f836da434e18) - [如何将自己的代码发布到Maven中央仓库](https://www.cnblogs.com/songyz/p/11387978.html) - [Deploying to OSSRH with Apache Maven - Introduction](https://central.sonatype.org/publish/publish-maven/)