需要分析来自许多Web服务器的Apache日志文件时。 虽然我们可以吧每个日志文件都复制到HDFS中, 但通常而言HADOOP处理单个大文件会比处理多个小文件更有效率。此外从分析的目的来看,我们吧日志数据视为一个大文件。 日志数据被分散在多个文件是由于WEB服务器采用分布式构架带来的副作用。一种解决办法是先将所有的文件合并,然后复制到HDFS。 可是文件合并需要占用本地计算机打来能干的磁盘空间爱你,如果我们能够在向HDFS复制的过程中合并他们 事情就简单的多。
应此,我们需要一个PutMerge类型的操作。Hadoop command tool supply 一个getmerge命令, 用于把一组HDFS文件复制到本地计算机以前进行合并。但我们想要的相反。故无法在Hadoop的文件爱你管理工具中获得。我们用HDFS API来变成实现它。
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class PutMerge {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FileSystem.getLocal(conf);
Path inputDir = new Path(args[0]); // 设定输入目录与输出文件
Path hdfsFile = new Path(args[1]);
try {
FileStatus[] inputFiles = local.listStatus(inputDir); //得到本地文件列表
FSDataOutputStream out = hdfs.create(hdfsFile); //生成HDFS输出流
for (int i=0; i
System.out.println(inputFiles[i].getPath().getName());
FSDataInputStream in = local.open(inputFiles[i].getPath()); //打开本地输入流
byte buffer[] = new byte[256];
int bytesRead = 0;
while( (bytesRead = in.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead);
}
in.close();
}
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
PS
测试:
直接在Eclipse上跑会出出错:
Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://localhost:8020/user/training, expected: file:///
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:410)
at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:56)
at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:335)
at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:493)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:378)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:365)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:584)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:565)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:472)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:464)
at my.utility.PutMerge.main(PutMerge.java:25)
查看源代码
static final URI NAME = URI.create("file:///");
是在比对的时候 总是会认为本地的scheme是 file:/// 而我给的是 hdfs:///就不匹配了
方法 1:
export 出class变成 jar 直接在 服务器上跑就可以了:
hadoop jar merge.jar my.utility.PutMerge /home/training/putmerge /user/training/mergedata
方法 2 (eclipse:)
package my.utility;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class PutMerge {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:8020/"); //设置fs配置
FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FileSystem.getLocal(conf);
Path inputDir = new Path(args[0]);//( "file://" + args[0]);
Path hdfsFile = new Path("hdfs://localhost:8020/" + args[1]);
try {
FileStatus[] inputFiles = local.listStatus(inputDir);
FSDataOutputStream out = hdfs.create(hdfsFile);
for (int i=0; i<inputFiles.length; i++) {
System.out.println(inputFiles[i].getPath().getName());
FSDataInputStream in = local.open(inputFiles[i].getPath());
byte buffer[] = new byte[256];
int bytesRead = 0;
while( (bytesRead = in.read(buffer)) > 0) {
out.write(buffer, 0, bytesRead);
}
in.close();
}
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}