DataX Secondary Development: Adding Parquet Read/Write Support to the HdfsReader and HdfsWriter Plugins
1. Background
The official open-source version of DataX supports reading and writing HDFS files, but as of now it does not support the Parquet format. Thanks to DataX's excellent synchronization performance, most of our company's projects adopted it as the data synchronization tool last year. However, two common scenarios remained unsupported: reading Parquet data out of a CDH cluster, and writing data from other sources into HDFS as Parquet. We therefore extended the HdfsReader and HdfsWriter plugins ourselves to add Parquet read and write support.
2. The HdfsReader Plugin
This plugin is fairly simple and consists of five classes. The classes and the changes made to each are listed below:
- DFSUtil: add a check for whether a file is a Parquet file, plus the Parquet read-and-convert logic (parquetFileStartRead / transportParquetRecord).
- HdfsConstant: add a constant for the Parquet file type.
- HdfsReader: add a condition branch for the case where fileType is configured as Parquet.
- HdfsReaderErrorCode: no changes required.
- Type: no changes required.
Only three of the classes need to be modified. The code is as follows:
DFSUtil
package com.alibaba.datax.plugin.reader.hdfsreader;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.alibaba.datax.common.base.Key; import com.alibaba.datax.common.element.BoolColumn; import com.alibaba.datax.common.element.BytesColumn; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.ColumnEntry; import com.alibaba.datax.common.element.DateColumn; import com.alibaba.datax.common.element.DoubleColumn; import com.alibaba.datax.common.element.LongColumn; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.StringColumn; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.storage.reader.StorageReaderErrorCode; import com.alibaba.datax.storage.reader.StorageReaderUtil; import org.apache.avro.Conversions; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.RCFileRecordReader; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.Reader; import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.orc.TypeDescription; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.example.data.Group; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.api.Binary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.math.BigDecimal; import java.math.RoundingMode; import java.nio.ByteBuffer; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import static com.alibaba.datax.common.base.Key.COLUMN; import static com.alibaba.datax.common.base.Key.NULL_FORMAT; public class DFSUtil { private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class); // the offset of julian, 2440588 is 1970/1/1 private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588; private static final 
long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1); private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); private static final int DIRECTORY_SIZE_GUESS = 16 * 1024; private final org.apache.hadoop.conf.Configuration hadoopConf; private final boolean haveKerberos; private final HashSet<String> sourceHDFSAllFilesList = new HashSet<>(); private String specifiedFileType = null; private String kerberosKeytabFilePath; private String kerberosPrincipal; public DFSUtil(Configuration taskConfig) { hadoopConf = new org.apache.hadoop.conf.Configuration(); //io.file.buffer.size 性能参数 //http://blog.csdn.net/yangjl38/article/details/7583374 Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG); JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG)); if (null != hadoopSiteParams) { Set<String> paramKeys = hadoopSiteParams.getKeys(); for (String each : paramKeys) { hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each)); } } hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS)); //是否有Kerberos认证 this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH); this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL); this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos"); } this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath); LOG.info("hadoopConfig details:{}", JSON.toJSONString(this.hadoopConf)); } private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) { if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) { UserGroupInformation.setConfiguration(hadoopConf); try { UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); } catch (Exception e) { String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确", kerberosKeytabFilePath, kerberosPrincipal); throw DataXException.asDataXException(HdfsReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e); } } } /** * 获取指定路径列表下符合条件的所有文件的绝对路径 * * @param srcPaths 路径列表 * @param specifiedFileType 指定文件类型 * @return set of string */ public Set<String> getAllFiles(List<String> srcPaths, String specifiedFileType) { this.specifiedFileType = specifiedFileType; if (!srcPaths.isEmpty()) { for (String eachPath : srcPaths) { LOG.info("get HDFS all files in path = [{}]", eachPath); getHDFSAllFiles(eachPath); } } return sourceHDFSAllFilesList; } private void addSourceFileIfNotEmpty(FileStatus f) { if (f.isFile()) { String filePath = f.getPath().toString(); if (f.getLen() > 0) { addSourceFileByType(filePath); } else { LOG.warn("文件[{}]长度为0,将会跳过不作处理!", filePath); } } } public void getHDFSAllFiles(String hdfsPath) { try { FileSystem hdfs = FileSystem.get(hadoopConf); //判断hdfsPath是否包含正则符号 if (hdfsPath.contains("*") || hdfsPath.contains("?")) { Path path = new Path(hdfsPath); FileStatus[] stats = hdfs.globStatus(path); for (FileStatus f : stats) { if (f.isFile()) { addSourceFileIfNotEmpty(f); } else if (f.isDirectory()) { getHDFSAllFilesNORegex(f.getPath().toString(), hdfs); } } } else { getHDFSAllFilesNORegex(hdfsPath, hdfs); } } catch (IOException e) { String message = String.format("无法读取路径[%s]下的所有文件,请确认您的配置项fs.defaultFS, path的值是否正确," + "是否有读写权限,网络是否已断开!", hdfsPath); LOG.error(message); throw 
DataXException.asDataXException(HdfsReaderErrorCode.PATH_CONFIG_ERROR, e); } } private void getHDFSAllFilesNORegex(String path, FileSystem hdfs) throws IOException { // 获取要读取的文件的根目录 Path listFiles = new Path(path); // If the network disconnected, this method will retry 45 times // each time the retry interval for 20 seconds // 获取要读取的文件的根目录的所有二级子文件目录 FileStatus[] stats = hdfs.listStatus(listFiles); for (FileStatus f : stats) { // 判断是不是目录,如果是目录,递归调用 if (f.isDirectory()) { LOG.info("[{}] 是目录, 递归获取该目录下的文件", f.getPath()); getHDFSAllFilesNORegex(f.getPath().toString(), hdfs); } else if (f.isFile()) { addSourceFileIfNotEmpty(f); } else { String message = String.format("该路径[%s]文件类型既不是目录也不是文件,插件自动忽略。", f.getPath()); LOG.info(message); } } } // 根据用户指定的文件类型,将指定的文件类型的路径加入sourceHDFSAllFilesList private void addSourceFileByType(String filePath) { // 检查file的类型和用户配置的fileType类型是否一致 boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType); if (isMatchedFileType) { String msg = String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType); LOG.info(msg); sourceHDFSAllFilesList.add(filePath); } else { String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致," + "请确认您配置的目录下面所有文件的类型均为[%s]" , filePath, this.specifiedFileType); LOG.error(message); throw DataXException.asDataXException( HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message); } } public InputStream getInputStream(String filepath) { InputStream inputStream; Path path = new Path(filepath); try { FileSystem fs = FileSystem.get(hadoopConf); //If the network disconnected, this method will retry 45 times //each time the retry interval for 20 seconds inputStream = fs.open(path); return inputStream; } catch (IOException e) { String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filepath, filepath); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e); } } public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read sequence file [{}].", sourceSequenceFilePath); Path seqFilePath = new Path(sourceSequenceFilePath); try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf, SequenceFile.Reader.file(seqFilePath))) { //获取SequenceFile.Reader实例 //获取key 与 value Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf); Text value = new Text(); while (reader.next(key, value)) { if (StringUtils.isNotBlank(value.toString())) { StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString()); } } } catch (Exception e) { String message = String.format("SequenceFile.Reader读取文件[%s]时出错", sourceSequenceFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_SEQUENCE_FILE_ERROR, message, e); } } public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read rc-file [{}].", sourceRcFilePath); List<ColumnEntry> column = StorageReaderUtil .getListColumnEntry(readerSliceConfig, COLUMN); // warn: no default value '\N' String nullFormat = readerSliceConfig.getString(NULL_FORMAT); Path rcFilePath = new Path(sourceRcFilePath); RCFileRecordReader recordReader = null; try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) { long fileLen = fs.getFileStatus(rcFilePath).getLen(); FileSplit split = new 
FileSplit(rcFilePath, 0, fileLen, (String[]) null); recordReader = new RCFileRecordReader(hadoopConf, split); LongWritable key = new LongWritable(); BytesRefArrayWritable value = new BytesRefArrayWritable(); Text txt = new Text(); while (recordReader.next(key, value)) { String[] sourceLine = new String[value.size()]; txt.clear(); for (int i = 0; i < value.size(); i++) { BytesRefWritable v = value.get(i); txt.set(v.getData(), v.getStart(), v.getLength()); sourceLine[i] = txt.toString(); } StorageReaderUtil.transportOneRecord(recordSender, column, sourceLine, nullFormat, taskPluginCollector); } } catch (IOException e) { String message = String.format("读取文件[%s]时出错", sourceRcFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_RCFILE_ERROR, message, e); } finally { try { if (recordReader != null) { recordReader.close(); LOG.info("Finally, Close RCFileRecordReader."); } } catch (IOException e) { LOG.warn(String.format("finally: 关闭RCFileRecordReader失败, %s", e.getMessage())); } } } public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read orc-file [{}].", sourceOrcFilePath); List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN); String nullFormat = readerSliceConfig.getString(NULL_FORMAT); try { Path orcFilePath = new Path(sourceOrcFilePath); Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf)); TypeDescription schema = reader.getSchema(); assert column != null; if (column.isEmpty()) { for (int i = 0; i < schema.getChildren().size(); i++) { ColumnEntry columnEntry = new ColumnEntry(); columnEntry.setIndex(i); columnEntry.setType(schema.getChildren().get(i).getCategory().getName()); column.add(columnEntry); } } VectorizedRowBatch rowBatch = schema.createRowBatch(1024); org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema)); while (rowIterator.nextBatch(rowBatch)) { transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat); } } catch (Exception e) { String message = String.format("从orc-file文件路径[%s]中读取数据发生异常,请联系系统管理员。" , sourceOrcFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender, TaskPluginCollector taskPluginCollector, String nullFormat) { Record record; for (int row = 0; row < rowBatch.size; row++) { record = recordSender.createRecord(); try { for (ColumnEntry column : columns) { Column columnGenerated; if (column.getValue() != null) { if (!"null".equals(column.getValue())) { columnGenerated = new StringColumn(column.getValue()); } else { columnGenerated = new StringColumn(); } record.addColumn(columnGenerated); continue; } int i = column.getIndex(); String columnType = column.getType().toUpperCase(); ColumnVector col = rowBatch.cols[i]; Type type = Type.valueOf(columnType); if (col.isNull[row]) { record.addColumn(new StringColumn(null)); continue; } switch (type) { case INT: case LONG: case BOOLEAN: case BIGINT: columnGenerated = new LongColumn(((LongColumnVector) col).vector[row]); break; case DATE: columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector[row])); break; case DOUBLE: columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]); break; case DECIMAL: columnGenerated = new 
DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue()); break; case BINARY: BytesColumnVector b = (BytesColumnVector) col; byte[] val = Arrays.copyOfRange(b.vector[row], b.start[row], b.start[row] + b.length[row]); columnGenerated = new BytesColumn(val); break; case TIMESTAMP: columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row)); break; default: // type is string or other String v = ((BytesColumnVector) col).toString(row); columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v); break; } record.addColumn(columnGenerated); } recordSender.sendToWriter(record); } catch (Exception e) { if (e instanceof DataXException) { throw (DataXException) e; } taskPluginCollector.collectDirtyRecord(record, e.getMessage()); } } } public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read parquet-file [{}].", sourceParquetFilePath); List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN); String nullFormat = readerSliceConfig.getString(NULL_FORMAT); Path parquetFilePath = new Path(sourceParquetFilePath); hadoopConf.set("parquet.avro.readInt96AsFixed", "true"); JobConf conf = new JobConf(hadoopConf); GenericData decimalSupport = new GenericData(); decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion()); try (ParquetReader<GenericData.Record> reader = AvroParquetReader .<GenericData.Record>builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf)) .withDataModel(decimalSupport) .withConf(conf) .build()) { GenericData.Record gRecord = reader.read(); Schema schema = gRecord.getSchema(); if (null == column || column.isEmpty()) { column = new ArrayList<>(schema.getFields().size()); String sType; // 用户没有填写具体的字段信息,需要从parquet文件构建 for (int i = 0; i < schema.getFields().size(); i++) { ColumnEntry columnEntry = new ColumnEntry(); columnEntry.setIndex(i); Schema type; if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) { type = schema.getFields().get(i).schema().getTypes().get(1); } else { type = schema.getFields().get(i).schema(); } sType = type.getProp("logicalType") != null ? 
type.getProp("logicalType") : type.getType().getName(); if (sType.startsWith("timestamp")) { columnEntry.setType("timestamp"); } else { columnEntry.setType(sType); } column.add(columnEntry); } } while (gRecord != null) { transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat); gRecord = reader.read(); } } catch (IOException e) { String message = String.format("从parquet file文件路径[%s]中读取数据发生异常,请联系系统管理员。" , sourceParquetFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } /* * create a transport record for Parquet file * * */ private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, RecordSender recordSender, TaskPluginCollector taskPluginCollector, String nullFormat) { Record record = recordSender.createRecord(); Column columnGenerated; int scale = 10; try { for (ColumnEntry columnEntry : columnConfigs) { String columnType = columnEntry.getType(); Integer columnIndex = columnEntry.getIndex(); String columnConst = columnEntry.getValue(); String columnValue = null; if (null != columnIndex) { if (null != gRecord.get(columnIndex)) { columnValue = gRecord.get(columnIndex).toString(); } else { record.addColumn(new StringColumn(null)); continue; } } else { columnValue = columnConst; } if (columnType.startsWith("decimal(")) { String ps = columnType.replace("decimal(", "").replace(")", ""); columnType = "decimal"; if (ps.contains(",")) { scale = Integer.parseInt(ps.split(",")[1].trim()); } else { scale = 0; } } Type type = Type.valueOf(columnType.toUpperCase()); if (StringUtils.equals(columnValue, nullFormat)) { columnValue = null; } try { switch (type) { case STRING: columnGenerated = new StringColumn(columnValue); break; case INT: case LONG: columnGenerated = new LongColumn(columnValue); break; case DOUBLE: columnGenerated = new DoubleColumn(columnValue); break; case DECIMAL: if (null == columnValue) { columnGenerated = new DoubleColumn((Double) null); } else { columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP)); } break; case BOOLEAN: columnGenerated = new BoolColumn(columnValue); break; case DATE: if (columnValue == null) { columnGenerated = new DateColumn((Date) null); } else { String formatString = columnEntry.getFormat(); if (StringUtils.isNotBlank(formatString)) { // 用户自己配置的格式转换 SimpleDateFormat format = new SimpleDateFormat( formatString); columnGenerated = new DateColumn( format.parse(columnValue)); } else { // 框架尝试转换 columnGenerated = new DateColumn(new StringColumn(columnValue).asDate()); } } break; case TIMESTAMP: if (null == columnValue) { columnGenerated = new DateColumn(); } else if (columnValue.startsWith("[")) { // INT96 https://github.com/apache/parquet-mr/pull/901 GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex); Date date = new Date(getTimestampMills(fixed.bytes())); columnGenerated = new DateColumn(date); } else { columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000); } break; case BINARY: columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array()); break; default: String errorMessage = String.format("您配置的列类型暂不支持 : [%s]", columnType); LOG.error(errorMessage); throw DataXException.asDataXException(StorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage); } } catch (Exception e) { throw new IllegalArgumentException(String.format( "类型转换错误, 无法将[%s] 转换为[%s], %s", columnValue, type, e)); } record.addColumn(columnGenerated); } // end for 
recordSender.sendToWriter(record); } catch (IllegalArgumentException | IndexOutOfBoundsException iae) { taskPluginCollector.collectDirtyRecord(record, iae.getMessage()); } catch (Exception e) { if (e instanceof DataXException) { throw (DataXException) e; } // 每一种转换失败都是脏数据处理,包括数字格式 & 日期格式 taskPluginCollector.collectDirtyRecord(record, e.getMessage()); } } private TypeDescription getOrcSchema(String filePath) { Path path = new Path(filePath); try { Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf)); // return reader.getTypes().get(0).getSubtypesCount() return reader.getSchema(); } catch (IOException e) { String message = "读取orc-file column列数失败,请联系系统管理员"; throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } public boolean checkHdfsFileType(String filepath, String specifiedFileType) { Path file = new Path(filepath); try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) { if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.ORC)) { return isORCFile(file, fs, in); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.RC)) { return isRCFile(filepath, in); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.SEQ)) { return isSequenceFile(file, in); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.PARQUET)) { return isParquetFile(file); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.CSV) || StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.TEXT)) { return true; } } catch (Exception e) { String message = String.format("检查文件[%s]类型失败,目前支持 %s 格式的文件," + "请检查您文件类型和文件是否正确。", filepath, HdfsConstant.SUPPORT_FILE_TYPE); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e); } return false; } // 判断file是否是ORC File private boolean isORCFile(Path file, FileSystem fs, FSDataInputStream in) { try { // figure out the size of the file using the option or filesystem long size = fs.getFileStatus(file).getLen(); //read last bytes into buffer to get PostScript int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS); in.seek(size - readSize); ByteBuffer buffer = ByteBuffer.allocate(readSize); in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); //read the PostScript //get length of PostScript int psLen = buffer.get(readSize - 1) & 0xff; String orcMagic = org.apache.orc.OrcFile.MAGIC; int len = orcMagic.length(); if (psLen < len + 1) { return false; } int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - len; byte[] array = buffer.array(); // now look for the magic string at the end of the postscript. if (Text.decode(array, offset, len).equals(orcMagic)) { return true; } else { // If it isn't there, this may be the 0.11.0 version of ORC. // Read the first 3 bytes of the file to check for the header in.seek(0); byte[] header = new byte[len]; in.readFully(header, 0, len); // if it isn't there, this isn't an ORC file if (Text.decode(header, 0, len).equals(orcMagic)) { return true; } } } catch (IOException e) { LOG.info("检查文件类型: [{}] 不是ORC File.", file); } return false; } // 判断file是否是RC file private boolean isRCFile(String filepath, FSDataInputStream in) { // The first version of RCFile used the sequence file header. 
final byte[] originalMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'}; // The 'magic' bytes at the beginning of the RCFile final byte[] rcMagic = {(byte) 'R', (byte) 'C', (byte) 'F'}; // the version that was included with the original magic, which is mapped // into ORIGINAL_VERSION final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6; // All the versions should be place in this list. final int ORIGINAL_VERSION = 0; // version with SEQ // version with RCF // final int NEW_MAGIC_VERSION = 1 // final int CURRENT_VERSION = NEW_MAGIC_VERSION final int CURRENT_VERSION = 1; byte version; byte[] magic = new byte[rcMagic.length]; try { in.seek(0); in.readFully(magic); if (Arrays.equals(magic, originalMagic)) { if (in.readByte() != ORIGINAL_MAGIC_VERSION_WITH_METADATA) { return false; } version = ORIGINAL_VERSION; } else { if (!Arrays.equals(magic, rcMagic)) { return false; } // Set 'version' version = in.readByte(); if (version > CURRENT_VERSION) { return false; } } if (version == ORIGINAL_VERSION) { try { Class<?> keyCls = hadoopConf.getClassByName(Text.readString(in)); Class<?> valCls = hadoopConf.getClassByName(Text.readString(in)); if (!keyCls.equals(RCFile.KeyBuffer.class) || !valCls.equals(RCFile.ValueBuffer.class)) { return false; } } catch (ClassNotFoundException e) { return false; } } // boolean decompress = in.readBoolean(); // is compressed? if (version == ORIGINAL_VERSION) { // is block-compressed? it should be always false. boolean blkCompressed = in.readBoolean(); return !blkCompressed; } return true; } catch (IOException e) { LOG.info("检查文件类型: [{}] 不是RC File.", filepath); } return false; } // 判断file是否是Sequence file private boolean isSequenceFile(Path filepath, FSDataInputStream in) { final byte[] seqMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'}; byte[] magic = new byte[seqMagic.length]; try { in.seek(0); in.readFully(magic); return Arrays.equals(magic, seqMagic); } catch (IOException e) { LOG.info("检查文件类型: [{}] 不是Sequence File.", filepath); } return false; } //判断是否为parquet(考虑判断parquet文件的schema是否不为空) private boolean isParquetFile(Path file) { try { GroupReadSupport readSupport = new GroupReadSupport(); ParquetReader.Builder<Group> reader = ParquetReader.builder(readSupport, file); ParquetReader<Group> build = reader.build(); if (build.read() != null) { return true; } } catch (IOException e) { LOG.info("检查文件类型: [{}] 不是Parquet File.", file); } return false; } /** * Returns GMT's timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos). * * @param timestampBinary INT96 parquet timestamp * @return timestamp in millis, GMT timezone */ public static long getTimestampMillis(Binary timestampBinary) { if (timestampBinary.length() != 12) { return 0; } byte[] bytes = timestampBinary.getBytes(); return getTimestampMills(bytes); } public static long getTimestampMills(byte[] bytes) { assert bytes.length == 12; // little endian encoding - need to invert byte order long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]); int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]); return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND); } private static long julianDayToMillis(int julianDay) { return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY; } }
HdfsConstant
package com.alibaba.datax.plugin.reader.hdfsreader;

import com.alibaba.datax.common.base.Constant;

import java.util.Arrays;
import java.util.List;

public class HdfsConstant extends Constant {

    public static final String SOURCE_FILES = "sourceFiles";
    public static final String TEXT = "TEXT";
    public static final String ORC = "ORC";
    public static final String CSV = "CSV";
    public static final String SEQ = "SEQ";
    public static final String RC = "RC";
    public static final String PARQUET = "PARQUET"; // added: the Parquet file type
    public static final String HDFS_DEFAULT_KEY = "fs.defaultFS";
    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";

    protected static final List<String> SUPPORT_FILE_TYPE =
            Arrays.asList(HdfsConstant.CSV, HdfsConstant.ORC, HdfsConstant.RC, HdfsConstant.SEQ, HdfsConstant.TEXT, HdfsConstant.PARQUET);

    private HdfsConstant() {}
}
HdfsReader
package com.alibaba.datax.plugin.reader.hdfsreader; import com.alibaba.datax.common.base.Key; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.storage.reader.StorageReaderUtil; import com.alibaba.datax.storage.util.FileHelper; import org.apache.commons.io.Charsets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.InputStream; import java.nio.charset.UnsupportedCharsetException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import static com.alibaba.datax.common.base.Key.COLUMN; import static com.alibaba.datax.common.base.Key.ENCODING; import static com.alibaba.datax.common.base.Key.INDEX; import static com.alibaba.datax.common.base.Key.TYPE; import static com.alibaba.datax.common.base.Key.VALUE; public class HdfsReader extends Reader { /** * Job 中的方法仅执行一次,Task 中方法会由框架启动多个 Task 线程并行执行。 * <p> * 整个 Reader 执行流程是: * <pre> * Job类init-->prepare-->split * Task类init-->prepare-->startRead-->post-->destroy * Task类init-->prepare-->startRead-->post-->destroy * Job类post-->destroy * </pre> */ public static class Job extends Reader.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration readerOriginConfig = null; private HashSet<String> sourceFiles; private String specifiedFileType = null; private DFSUtil dfsUtil = null; private List<String> path = null; @Override public void init() { LOG.info("init() begin..."); this.readerOriginConfig = getPluginJobConf(); validate(); dfsUtil = new DFSUtil(readerOriginConfig); LOG.info("init() ok and end..."); } public void validate() { readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR); // path check String pathInString = readerOriginConfig.getNecessaryValue(Key.PATH, HdfsReaderErrorCode.REQUIRED_VALUE); if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) { path = Collections.singletonList(pathInString); } else { path = readerOriginConfig.getList(Key.PATH, String.class); if (null == path || path.isEmpty()) { throw DataXException.asDataXException(HdfsReaderErrorCode.REQUIRED_VALUE, "您需要指定待读取的源目录或文件"); } for (String eachPath : path) { if (!eachPath.startsWith("/")) { String message = String.format("请检查参数path:[%s],需要配置为绝对路径", eachPath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.ILLEGAL_VALUE, message); } } } specifiedFileType = readerOriginConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE).toUpperCase(); if (!HdfsConstant.SUPPORT_FILE_TYPE.contains(specifiedFileType)) { String message = "HdfsReader插件目前支持 " + HdfsConstant.SUPPORT_FILE_TYPE + " 几种格式的文件"; throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_ERROR, message); } String encoding = this.readerOriginConfig.getString(ENCODING, "UTF-8"); try { Charsets.toCharset(encoding); } catch (UnsupportedCharsetException uce) { throw DataXException.asDataXException( HdfsReaderErrorCode.ILLEGAL_VALUE, String.format("不支持的编码格式 : [%s]", encoding), uce); } catch (Exception e) { throw DataXException.asDataXException( HdfsReaderErrorCode.ILLEGAL_VALUE, String.format("运行配置异常 : %s", e.getMessage()), e); } //check Kerberos boolean haveKerberos = readerOriginConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { readerOriginConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, 
HdfsReaderErrorCode.REQUIRED_VALUE); readerOriginConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsReaderErrorCode.REQUIRED_VALUE); } // validate the Columns validateColumns(); // validate compress String compress = readerOriginConfig.getString(Key.COMPRESS, "NONE"); if ("gzip".equalsIgnoreCase(compress)) { // correct to gz readerOriginConfig.set(Key.COMPRESS, "gz"); } } private void validateColumns() { // 检测是column 是否为 ["*"] 若是则填为空 List<Configuration> column = this.readerOriginConfig.getListConfiguration(COLUMN); if (null != column && 1 == column.size() && ("\"*\"".equals(column.get(0).toString()) || "'*'".equals(column.get(0).toString()))) { readerOriginConfig.set(COLUMN, new ArrayList<String>()); } else { // column: 1. index type 2.value type 3.when type is Data, may be has dateFormat value List<Configuration> columns = readerOriginConfig.getListConfiguration(COLUMN); if (null == columns || columns.isEmpty()) { throw DataXException.asDataXException(HdfsReaderErrorCode.CONFIG_INVALID_EXCEPTION, "您需要指定 columns"); } for (Configuration eachColumnConf : columns) { eachColumnConf.getNecessaryValue(TYPE, HdfsReaderErrorCode.REQUIRED_VALUE); Integer columnIndex = eachColumnConf.getInt(INDEX); String columnValue = eachColumnConf.getString(VALUE); if (null == columnIndex && null == columnValue) { throw DataXException.asDataXException( HdfsReaderErrorCode.NO_INDEX_VALUE, "由于您配置了type, 则至少需要配置 index 或 value, 当前配置为:" + eachColumnConf); } if (null != columnIndex && null != columnValue) { throw DataXException.asDataXException(HdfsReaderErrorCode.MIXED_INDEX_VALUE, "您混合配置了index, value, 每一列同时仅能选择其中一种"); } } } } @Override public void prepare() { LOG.info("prepare(), start to getAllFiles..."); this.sourceFiles = (HashSet<String>) dfsUtil.getAllFiles(path, specifiedFileType); LOG.info("您即将读取的文件数为: [{}], 列表为: [{}]", sourceFiles.size(), sourceFiles); } @Override public List<Configuration> split(int adviceNumber) { LOG.info("split() begin..."); List<Configuration> readerSplitConfigs = new ArrayList<>(); // warn:每个slice拖且仅拖一个文件, int splitNumber = sourceFiles.size(); if (0 == splitNumber) { throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION, String.format("未能找到待读取的文件,请确认您的配置项path: %s", readerOriginConfig.getString(Key.PATH))); } List<List<String>> splitSourceFiles = FileHelper.splitSourceFiles(new ArrayList<>(sourceFiles), splitNumber); for (List<String> files : splitSourceFiles) { Configuration splitConfig = readerOriginConfig.clone(); splitConfig.set(HdfsConstant.SOURCE_FILES, files); readerSplitConfigs.add(splitConfig); } return readerSplitConfigs; } @Override public void post() { // } @Override public void destroy() { // } } public static class Task extends Reader.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration taskConfig; private List<String> sourceFiles; private String specifiedFileType; private DFSUtil dfsUtil = null; @Override public void init() { this.taskConfig = getPluginJobConf(); this.sourceFiles = taskConfig.getList(HdfsConstant.SOURCE_FILES, String.class); this.specifiedFileType = taskConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE); this.dfsUtil = new DFSUtil(taskConfig); } @Override public void prepare() { // } @Override public void startRead(RecordSender recordSender) { LOG.info("read start"); for (String sourceFile : this.sourceFiles) { LOG.info("reading file : [{}]", sourceFile); if (specifiedFileType.equalsIgnoreCase(HdfsConstant.TEXT) || 
specifiedFileType.equalsIgnoreCase(HdfsConstant.CSV)) { InputStream inputStream = dfsUtil.getInputStream(sourceFile); StorageReaderUtil.readFromStream(inputStream, sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.ORC)) { dfsUtil.orcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.SEQ)) { dfsUtil.sequenceFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.RC)) { dfsUtil.rcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.PARQUET)) { dfsUtil.parquetFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else { String message = "HdfsReader插件目前支持ORC, TEXT, CSV, SEQUENCE, RC、PARQUET等六种格式的文件," + "请将fileType选项的值配置为ORC, TEXT, CSV, SEQUENCE, RC, PARQUET"; throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message); } if (recordSender != null) { recordSender.flush(); } } LOG.info("end read source files..."); } @Override public void post() { // } @Override public void destroy() { // } } }
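With these three classes modified, a job can set fileType to parquet on the reader side. The snippet below is only an illustrative reader configuration: the defaultFS address, path, and column list are placeholder values and must be adapted to the target cluster, and the surrounding job/setting/writer sections are omitted. Note that column may also be configured as ["*"] (which the plugin treats as empty), in which case the column list is built from the Parquet schema of the first record, as implemented in parquetFileStartRead.

"reader": {
    "name": "hdfsreader",
    "parameter": {
        "defaultFS": "hdfs://nameservice1:8020",
        "path": "/user/hive/warehouse/demo.db/demo_table/*",
        "fileType": "parquet",
        "column": [
            { "index": 0, "type": "long" },
            { "index": 1, "type": "string" },
            { "index": 2, "type": "decimal(16,2)" }
        ],
        "nullFormat": "\\N"
    }
}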
3. The HdfsWriter Plugin
This plugin is also fairly simple and consists of five classes. The classes and the changes made to each are listed below:
- HdfsHelper: add the Parquet write method (parquetFileStartWrite), the record-conversion method for Parquet output (transportParRecord), and Parquet schema generation from the column configuration.
- HdfsWriter: add the Parquet file type entry and the branch that dispatches to the Parquet write method.
- SupportHiveDataType: no changes required.
- HdfsWriterErrorCode: no changes required.
- Type: no changes required.
Only two of the classes need to be modified. The code is as follows:
HdfsHelper
package com.alibaba.datax.plugin.writer.hdfswriter; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.datax.common.base.Constant; import com.alibaba.datax.common.base.Key; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil; import com.alibaba.datax.unstructuredstorage.util.HdfsUtil; import org.apache.avro.Conversions; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.orc.CompressionKind; import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Field; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.*; public class HdfsHelper { public static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class); public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication"; public static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS"; private FileSystem fileSystem = null; private JobConf conf = null; private org.apache.hadoop.conf.Configuration hadoopConf = null; // Kerberos 
private boolean haveKerberos = false; private String kerberosKeytabFilePath; private String kerberosPrincipal; private String krb5ConfPath; public static MutablePair<Text, Boolean> transportOneRecord( Record record, char fieldDelimiter, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector) { MutablePair<List<Object>, Boolean> transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector); //保存<转换后的数据,是否是脏数据> MutablePair<Text, Boolean> transportResult = new MutablePair<>(); transportResult.setRight(false); Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter)); transportResult.setRight(transportResultList.getRight()); transportResult.setLeft(recordResult); return transportResult; } public static MutablePair<List<Object>, Boolean> transportOneRecord( Record record, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector) { MutablePair<List<Object>, Boolean> transportResult = new MutablePair<>(); transportResult.setRight(false); List<Object> recordList = new ArrayList<>(); int recordLength = record.getColumnNumber(); if (0 != recordLength) { Column column; for (int i = 0; i < recordLength; i++) { column = record.getColumn(i); if (null != column.getRawData()) { String rowData = column.getRawData().toString(); SupportHiveDataType columnType = SupportHiveDataType.valueOf( columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase()); //根据writer端类型配置做类型转换 try { switch (columnType) { case TINYINT: recordList.add(Byte.valueOf(rowData)); break; case SMALLINT: recordList.add(Short.valueOf(rowData)); break; case INT: case INTEGER: recordList.add(Integer.valueOf(rowData)); break; case BIGINT: recordList.add(column.asLong()); break; case FLOAT: recordList.add(Float.valueOf(rowData)); break; case DOUBLE: recordList.add(column.asDouble()); break; case STRING: case VARCHAR: case CHAR: recordList.add(column.asString()); break; case DECIMAL: recordList.add(HiveDecimal.create(column.asBigDecimal())); break; case BOOLEAN: recordList.add(column.asBoolean()); break; case DATE: recordList.add(org.apache.hadoop.hive.common.type.Date.valueOf(column.asString())); break; case TIMESTAMP: recordList.add(Timestamp.valueOf(column.asString())); break; case BINARY: recordList.add(column.asBytes()); break; default: throw DataXException .asDataXException( HdfsWriterErrorCode.ILLEGAL_VALUE, String.format( "您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. 
请修改表中该字段的类型或者不同步该字段.", columnsConfiguration.get(i).getString(Key.NAME), columnsConfiguration.get(i).getString(Key.TYPE))); } } catch (Exception e) { // warn: 此处认为脏数据 e.printStackTrace(); String message = String.format( "字段类型转换错误:你目标字段为[%s]类型,实际字段值为[%s].", columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData()); taskPluginCollector.collectDirtyRecord(record, message); transportResult.setRight(true); break; } } else { // warn: it's all ok if nullFormat is null recordList.add(null); } } } transportResult.setLeft(recordList); return transportResult; } public static GenericRecord transportParRecord( Record record, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector, GenericRecordBuilder builder) { int recordLength = record.getColumnNumber(); if (0 != recordLength) { Column column; for (int i = 0; i < recordLength; i++) { column = record.getColumn(i); String colName = columnsConfiguration.get(i).getString(Key.NAME); String typename = columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase(); if (null == column || column.getRawData() == null) { builder.set(colName, null); } else { String rowData = column.getRawData().toString(); SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename); //根据writer端类型配置做类型转换 try { switch (columnType) { case INT: case INTEGER: builder.set(colName, Integer.valueOf(rowData)); break; case LONG: builder.set(colName, column.asLong()); break; case FLOAT: builder.set(colName, Float.valueOf(rowData)); break; case DOUBLE: builder.set(colName, column.asDouble()); break; case STRING: builder.set(colName, column.asString()); break; case DECIMAL: builder.set(colName, new BigDecimal(column.asString()).setScale(columnsConfiguration.get(i).getInt(Key.SCALE), BigDecimal.ROUND_HALF_UP)); break; case BOOLEAN: builder.set(colName, column.asBoolean()); break; case BINARY: builder.set(colName, column.asBytes()); break; case TIMESTAMP: builder.set(colName, column.asLong() / 1000); break; default: throw DataXException .asDataXException( HdfsWriterErrorCode.ILLEGAL_VALUE, String.format( "您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. 
请修改表中该字段的类型或者不同步该字段.", columnsConfiguration.get(i).getString(Key.NAME), columnsConfiguration.get(i).getString(Key.TYPE))); } } catch (Exception e) { // warn: 此处认为脏数据 String message = String.format( "字段类型转换错误:目标字段为[%s]类型,实际字段值为[%s].", columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData()); taskPluginCollector.collectDirtyRecord(record, message); break; } } } } return builder.build(); } public static String generateParquetSchemaFromColumnAndType(List<Configuration> columns) { Map<String, ColumnTypeUtil.DecimalInfo> decimalColInfo = new HashMap<>(16); ColumnTypeUtil.DecimalInfo PARQUET_DEFAULT_DECIMAL_INFO = new ColumnTypeUtil.DecimalInfo(10, 2); Types.MessageTypeBuilder typeBuilder = Types.buildMessage(); for (Configuration column : columns) { String name = column.getString("name"); String colType = column.getString("type"); Validate.notNull(name, "column.name can't be null"); Validate.notNull(colType, "column.type can't be null"); switch (colType.toLowerCase()) { case "tinyint": case "smallint": case "int": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(name); break; case "bigint": case "long": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(name); break; case "float": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FLOAT).named(name); break; case "double": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).named(name); break; case "binary": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name); break; case "char": case "varchar": case "string": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(name); break; case "boolean": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN).named(name); break; case "timestamp": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT96).named(name); break; case "date": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).as(OriginalType.DATE).named(name); break; default: if (ColumnTypeUtil.isDecimalType(colType)) { ColumnTypeUtil.DecimalInfo decimalInfo = ColumnTypeUtil.getDecimalInfo(colType, PARQUET_DEFAULT_DECIMAL_INFO); typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) .as(OriginalType.DECIMAL) .precision(decimalInfo.getPrecision()) .scale(decimalInfo.getScale()) .length(HdfsUtil.computeMinBytesForPrecision(decimalInfo.getPrecision())) .named(name); decimalColInfo.put(name, decimalInfo); } else { typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name); } break; } } return typeBuilder.named("m").toString(); } public FileSystem getFileSystem(String defaultFS, Configuration taskConfig) { this.hadoopConf = new org.apache.hadoop.conf.Configuration(); Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG); JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG)); if (null != hadoopSiteParams) { Set<String> paramKeys = hadoopSiteParams.getKeys(); for (String each : paramKeys) { hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each)); } } this.hadoopConf.set(HDFS_DEFAULT_FS_KEY, defaultFS); //是否有Kerberos认证 this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { LOG.info("krb5.conf路径:【{}】 \n keytab路径:【{}】 \n principal:【{}】\n", taskConfig.getString(Key. KRB5_CONF_FILE_PATH), taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH), taskConfig.getString(Key.KERBEROS_PRINCIPAL)); this.krb5ConfPath = taskConfig.getString(Key. 
KRB5_CONF_FILE_PATH); this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH); this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL); LOG.info("检测到kerberos认证,正在进行认证"); } System.setProperty("java.security.krb5.conf",krb5ConfPath); System.setProperty("sun.security.krb5.Config",krb5ConfPath); refreshConfig(); this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath,hadoopConf,this.krb5ConfPath); conf = new JobConf(hadoopConf); try { fileSystem = FileSystem.get(conf); } catch (IOException e) { String message = String.format("获取FileSystem时发生网络IO异常,请检查您的网络是否正常!HDFS地址:[message:defaultFS = %s]", defaultFS); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } catch (Exception e) { String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]", "message:defaultFS =" + defaultFS); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } if (null == fileSystem) { String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [message:defaultFS = %s]", defaultFS); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, message); } return fileSystem; } /** 刷新krb内容信息 */ public static void refreshConfig() { try { sun.security.krb5.Config.refresh(); Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm"); defaultRealmField.setAccessible(true); defaultRealmField.set( null, org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm()); // reload java.security.auth.login.config javax.security.auth.login.Configuration.setConfiguration(null); } catch (Exception e) { LOG.warn( "resetting default realm failed, current default realm will still be used.", e); } } public void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5ConfPath) { hadoopConf.set("hadoop.security.authentication", "kerberos"); hadoopConf.set("hive.security.authentication", "kerberos"); hadoopConf.set("hadoop.security.authorization", "true"); hadoopConf.set("dfs.permissions","false"); hadoopConf.set("hadoop.security.auth_to_local","RULE:[2:$1@$0](.*@CDHDEV.COM)s/.*/hadoop/ \n" + " DEFAULT"); if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) { UserGroupInformation.setConfiguration(hadoopConf); KerberosName.resetDefaultRealm(); try { LOG.info("开始认证"); UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); } catch (Exception e) { LOG.info("kerberos认证失败"); String message = String.format("kerberos认证失败,请检查 " + "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]", kerberosKeytabFilePath, kerberosPrincipal); e.printStackTrace(); throw DataXException.asDataXException(HdfsWriterErrorCode.KERBEROS_LOGIN_ERROR, message, e); } } } /** * 获取指定目录下的文件列表 * * @param dir 需要搜索的目录 * @return 文件数组,文件是全路径, * eg:hdfs://10.101.204.12:9000/user/hive/warehouse/writer.db/text/test.textfile */ public Path[] hdfsDirList(String dir) { Path path = new Path(dir); Path[] files; try { FileStatus[] status = fileSystem.listStatus(path); files = new Path[status.length]; for (int i = 0; i < status.length; i++) { files[i] = status[i].getPath(); } } catch (IOException e) { String message = String.format("获取目录[%s]文件列表时发生网络IO异常,请检查您的网络是否正常!", dir); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } return files; } // 
public boolean isPathExists(String filePath) { // // Path path = new Path(filePath); // boolean exist; // try { // exist = fileSystem.exists(path); // } // catch (IOException e) { // String message = String.format("判断文件路径[%s]是否存在时发生网络IO异常,请检查您的网络是否正常!", // "message:filePath =" + filePath); // e.printStackTrace(); // LOG.error(message); // throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); // } // return exist; // } public boolean isPathDir(String filePath) { Path path = new Path(filePath); boolean isDir; try { isDir = fileSystem.getFileStatus(path).isDirectory(); } catch (IOException e) { String message = String.format("判断路径[%s]是否是目录时发生网络IO异常,请检查您的网络是否正常!", filePath); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } return isDir; } public void deleteFilesFromDir(Path dir) { try { final RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(dir, false); while (files.hasNext()) { final LocatedFileStatus next = files.next(); fileSystem.deleteOnExit(next.getPath()); } } catch (FileNotFoundException fileNotFoundException) { throw new DataXException(HdfsWriterErrorCode.FILE_NOT_FOUND, fileNotFoundException.getMessage()); } catch (IOException ioException) { throw new DataXException(HdfsWriterErrorCode.IO_ERROR, ioException.getMessage()); } } public void deleteDir(Path path) { LOG.info("start delete tmp dir [{}] .", path); try { if (fileSystem.exists(path)) { fileSystem.delete(path, true); } } catch (Exception e) { LOG.error("删除临时目录[{}]时发生IO异常,请检查您的网络是否正常!", path); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } LOG.info("finish delete tmp dir [{}] .", path); } /** * move all files in sourceDir to targetDir * * @param sourceDir the source directory * @param targetDir the target directory */ public void moveFilesToDest(Path sourceDir, Path targetDir) { try { final FileStatus[] fileStatuses = fileSystem.listStatus(sourceDir); for (FileStatus file : fileStatuses) { if (file.isFile() && file.getLen() > 0) { LOG.info("start move file [{}] to dir [{}].", file.getPath(), targetDir.getName()); fileSystem.rename(file.getPath(), new Path(targetDir, file.getPath().getName())); } } } catch (IOException e) { throw DataXException.asDataXException(HdfsWriterErrorCode.IO_ERROR, e); } LOG.info("finish move file(s)."); } //关闭FileSystem public void closeFileSystem() { try { fileSystem.close(); } catch (IOException e) { LOG.error("关闭FileSystem时发生IO异常,请检查您的网络是否正常!"); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } } // 写text file类型文件 public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName, TaskPluginCollector taskPluginCollector) { char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER); List<Configuration> columns = config.getListConfiguration(Key.COLUMN); String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase().trim(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm"); String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0"; conf.set(JobContext.TASK_ATTEMPT_ID, attempt); if (!"NONE".equals(compress)) { // fileName must remove suffix, because the FileOutputFormat will add suffix fileName = fileName.substring(0, fileName.lastIndexOf(".")); Class<? 
extends CompressionCodec> codecClass = getCompressCodec(compress); if (null != codecClass) { FileOutputFormat.setOutputCompressorClass(conf, codecClass); } } Path outputPath = new Path(fileName); FileOutputFormat.setOutputPath(conf, outputPath); FileOutputFormat.setWorkOutputPath(conf, outputPath); try { RecordWriter<NullWritable, Text> writer = new TextOutputFormat<NullWritable, Text>() .getRecordWriter(fileSystem, conf, outputPath.toString(), Reporter.NULL); Record record; while ((record = lineReceiver.getFromReader()) != null) { MutablePair<Text, Boolean> transportResult = transportOneRecord(record, fieldDelimiter, columns, taskPluginCollector); if (Boolean.FALSE.equals(transportResult.getRight())) { writer.write(NullWritable.get(), transportResult.getLeft()); } } writer.close(Reporter.NULL); } catch (Exception e) { LOG.error("写文件文件[{}]时发生IO异常,请检查您的网络是否正常!", fileName); Path path = new Path(fileName); deleteDir(path.getParent()); throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e); } } // compress 已经转为大写 public Class<? extends CompressionCodec> getCompressCodec(String compress) { compress = compress.toUpperCase(); Class<? extends CompressionCodec> codecClass; switch (compress) { case "GZIP": codecClass = org.apache.hadoop.io.compress.GzipCodec.class; break; case "BZIP2": codecClass = org.apache.hadoop.io.compress.BZip2Codec.class; break; case "SNAPPY": codecClass = org.apache.hadoop.io.compress.SnappyCodec.class; break; case "LZ4": codecClass = org.apache.hadoop.io.compress.Lz4Codec.class; break; case "ZSTD": codecClass = org.apache.hadoop.io.compress.ZStandardCodec.class; break; case "DEFLATE": case "ZLIB": codecClass = org.apache.hadoop.io.compress.DeflateCodec.class; break; default: throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("目前不支持您配置的 compress 模式 : [%s]", compress)); } return codecClass; } /* * 写Parquet file类型文件 * 一个parquet文件的schema类似如下: * { * "type": "record", * "name": "testFile", * "doc": "test records", * "fields": * [{ * "name": "id", * "type": ["null", "int"] * * }, * { * "name": "empName", * "type": "string" * } * ] * } * "null" 表示该字段允许为空 */ public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName, TaskPluginCollector taskPluginCollector) { List<Configuration> columns = config.getListConfiguration(Key.COLUMN); String compress = config.getString(Key.COMPRESS, "UNCOMPRESSED").toUpperCase().trim(); if ("NONE".equals(compress)) { compress = "UNCOMPRESSED"; } // construct parquet schema Schema schema = generateParquetSchema(columns); Path path = new Path(fileName); LOG.info("write parquet file {}", fileName); CompressionCodecName codecName = CompressionCodecName.fromConf(compress); GenericData decimalSupport = new GenericData(); decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion()); try (ParquetWriter<GenericRecord> writer = AvroParquetWriter .<GenericRecord>builder(path) .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE) .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE) .withSchema(schema) .withConf(hadoopConf) .withCompressionCodec(codecName) .withValidation(false) .withDictionaryEncoding(false) .withDataModel(decimalSupport) .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) .build()) { Record record; while ((record = lineReceiver.getFromReader()) != null) { GenericRecordBuilder builder = new GenericRecordBuilder(schema); GenericRecord transportResult = transportParRecord(record, columns, taskPluginCollector, builder); 
writer.write(transportResult); } } catch (Exception e) { LOG.error("写文件文件[{}]时发生IO异常,请检查您的网络是否正常!", fileName); deleteDir(path.getParent()); throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e); } } private Schema generateParquetSchema(List<Configuration> columns) { List<Schema.Field> fields = new ArrayList<>(); String fieldName; String type; List<Schema> unionList = new ArrayList<>(2); for (Configuration column : columns) { unionList.clear(); fieldName = column.getString(Key.NAME); type = column.getString(Key.TYPE).trim().toUpperCase(); unionList.add(Schema.create(Schema.Type.NULL)); switch (type) { case "DECIMAL": Schema dec = LogicalTypes .decimal(column.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION), column.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE)) .addToSchema(Schema.createFixed(fieldName, null, null, 16)); unionList.add(dec); break; case "DATE": Schema date = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); unionList.add(date); break; case "TIMESTAMP": Schema ts = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); unionList.add(ts); break; case "UUID": Schema uuid = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)); unionList.add(uuid); break; case "BINARY": unionList.add(Schema.create(Schema.Type.BYTES)); break; default: // other types unionList.add(Schema.create(Schema.Type.valueOf(type))); break; } fields.add(new Schema.Field(fieldName, Schema.createUnion(unionList), null, null)); } Schema schema = Schema.createRecord("dataxTestParquet", null, "parquet", false); schema.setFields(fields); return schema; } private void setRow(VectorizedRowBatch batch, int row, Record record, List<Configuration> columns, TaskPluginCollector taskPluginCollector) { for (int i = 0; i < columns.size(); i++) { Configuration eachColumnConf = columns.get(i); String type = eachColumnConf.getString(Key.TYPE).trim().toUpperCase(); SupportHiveDataType columnType; ColumnVector col = batch.cols[i]; if (type.startsWith("DECIMAL")) { columnType = SupportHiveDataType.DECIMAL; } else { columnType = SupportHiveDataType.valueOf(type); } if (record.getColumn(i) == null || record.getColumn(i).getRawData() == null) { col.isNull[row] = true; col.noNulls = false; continue; } try { switch (columnType) { case TINYINT: case SMALLINT: case INT: case BIGINT: case BOOLEAN: case DATE: ((LongColumnVector) col).vector[row] = record.getColumn(i).asLong(); break; case FLOAT: case DOUBLE: ((DoubleColumnVector) col).vector[row] = record.getColumn(i).asDouble(); break; case DECIMAL: HiveDecimalWritable hdw = new HiveDecimalWritable(); hdw.set(HiveDecimal.create(record.getColumn(i).asBigDecimal()) .setScale(eachColumnConf.getInt(Key.SCALE), HiveDecimal.ROUND_HALF_UP)); ((DecimalColumnVector) col).set(row, hdw); break; case TIMESTAMP: ((TimestampColumnVector) col).set(row, java.sql.Timestamp.valueOf(record.getColumn(i).asString())); break; case STRING: case VARCHAR: case CHAR: byte[] buffer; if (record.getColumn(i).getType() == Column.Type.BYTES) { //convert bytes to base64 string buffer = Base64.getEncoder().encode((byte[]) record.getColumn(i).getRawData()); } else { buffer = record.getColumn(i).getRawData().toString().getBytes(StandardCharsets.UTF_8); } ((BytesColumnVector) col).setRef(row, buffer, 0, buffer.length); break; case BINARY: byte[] content = (byte[]) record.getColumn(i).getRawData(); ((BytesColumnVector) col).setRef(row, content, 0, content.length); break; default: throw DataXException .asDataXException( 
HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("您的配置文件中的列配置信息有误. 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%s]. " + "请修改表中该字段的类型或者不同步该字段.", eachColumnConf.getString(Key.NAME), eachColumnConf.getString(Key.TYPE))); } } catch (Exception e) { taskPluginCollector.collectDirtyRecord(record, e.getMessage()); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("设置Orc数据行失败,源列类型: %s, 目的原始类型:%s, 目的列Hive类型: %s, 字段名称: %s, 源值: %s, 错误根源:%n%s", record.getColumn(i).getType(), columnType, eachColumnConf.getString(Key.TYPE), eachColumnConf.getString(Key.NAME), record.getColumn(i).getRawData(), e)); } } } /* * 写orcfile类型文件 */ public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName, TaskPluginCollector taskPluginCollector) { List<Configuration> columns = config.getListConfiguration(Key.COLUMN); String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase(); StringJoiner joiner = new StringJoiner(","); for (Configuration column : columns) { if ("decimal".equals(column.getString(Key.TYPE))) { joiner.add(String.format("%s:%s(%s,%s)", column.getString(Key.NAME), "decimal", column.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION), column.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE))); } else if ("date".equalsIgnoreCase(column.getString(Key.TYPE))) { joiner.add(String.format("%s:bigint", column.getString(Key.NAME))); } else { joiner.add(String.format("%s:%s", column.getString(Key.NAME), column.getString(Key.TYPE))); } } TypeDescription schema = TypeDescription.fromString("struct<" + joiner + ">"); try (Writer writer = OrcFile.createWriter(new Path(fileName), OrcFile.writerOptions(conf) .setSchema(schema) .compress(CompressionKind.valueOf(compress)))) { Record record; VectorizedRowBatch batch = schema.createRowBatch(1024); while ((record = lineReceiver.getFromReader()) != null) { int row = batch.size++; setRow(batch, row, record, columns, taskPluginCollector); if (batch.size == batch.getMaxSize()) { writer.addRowBatch(batch); batch.reset(); } } if (batch.size != 0) { writer.addRowBatch(batch); batch.reset(); } } catch (IOException e) { LOG.error("写文件文件[{}]时发生IO异常,请检查您的网络是否正常!", fileName); Path path = new Path(fileName); deleteDir(path.getParent()); throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e); } } }
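For readers who want to try the Parquet write path in isolation before wiring it into DataX, below is a minimal, self-contained sketch of the same AvroParquetWriter approach that parquetFileStartWrite above relies on. The output path, field names, and the Snappy compression choice are illustrative assumptions only (in the plugin they come from the task configuration), and the sketch assumes parquet-avro and the Hadoop client libraries are on the classpath.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetWriteDemo {
    public static void main(String[] args) throws Exception {
        // Nullable fields are modelled as a union of ["null", type],
        // which mirrors what generateParquetSchema builds from the column config.
        Schema schema = SchemaBuilder.record("demoRecord").namespace("datax.demo")
                .fields()
                .optionalInt("id")
                .optionalString("empName")
                .endRecord();

        // Illustrative path; in the plugin this is the task's fileName.
        Path target = new Path("/tmp/demo.parquet");

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(target)
                .withSchema(schema)
                .withConf(new Configuration())
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", 1);
            record.put("empName", "alice");
            writer.write(record);
        }
    }
}

SNAPPY here simply stands in for whatever the job's compress setting would map to through CompressionCodecName.fromConf in the method above.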
HdfsWriter
package com.alibaba.datax.plugin.writer.hdfswriter; import com.alibaba.datax.common.base.Constant; import com.alibaba.datax.common.base.Key; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.storage.util.FileHelper; import org.apache.commons.io.Charsets; import org.apache.commons.lang3.StringUtils; import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil; import com.alibaba.datax.unstructuredstorage.util.HdfsUtil; import org.apache.commons.lang3.Validate; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.orc.CompressionKind; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Paths; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; public class HdfsWriter extends Writer { public static class Job extends Writer.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); // 写入文件的临时目录,完成写入后,该目录需要删除 private String tmpStorePath; private Configuration writerSliceConfig = null; private String defaultFS; private String path; private String fileName; private String writeMode; private HdfsHelper hdfsHelper = null; private FileSystem filsSystem; public static final Set<String> SUPPORT_FORMAT = new HashSet<>(Arrays.asList("ORC", "PARQUET", "TEXT")); @Override public void init() { this.writerSliceConfig = this.getPluginJobConf(); this.validateParameter(); hdfsHelper = new HdfsHelper(); filsSystem = hdfsHelper.getFileSystem(defaultFS, this.writerSliceConfig); } private void validateParameter() { this.defaultFS = this.writerSliceConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsWriterErrorCode.REQUIRED_VALUE); //fileType check String fileType = this.writerSliceConfig.getNecessaryValue(Key.FILE_TYPE, HdfsWriterErrorCode.REQUIRED_VALUE).toUpperCase(); if (!SUPPORT_FORMAT.contains(fileType)) { String message = String.format("[%s] 文件格式不支持, HdfsWriter插件目前仅支持 %s, ", fileType, SUPPORT_FORMAT); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message); } //path this.path = this.writerSliceConfig.getNecessaryValue(Key.PATH, HdfsWriterErrorCode.REQUIRED_VALUE); if (!path.startsWith("/")) { String message = String.format("请检查参数path:[%s],需要配置为绝对路径", path); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message); } if (path.contains("*") || path.contains("?")) { String message = String.format("请检查参数path:[%s],不能包含*,?等特殊字符", path); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message); } //fileName this.fileName = this.writerSliceConfig.getNecessaryValue(Key.FILE_NAME, HdfsWriterErrorCode.REQUIRED_VALUE); //columns check List<Configuration> columns = this.writerSliceConfig.getListConfiguration(Key.COLUMN); if (null == columns || columns.isEmpty()) { throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE, "您需要指定 columns"); } else { boolean rewriteFlag = false; for (int i = 0; i < columns.size(); i++) { Configuration eachColumnConf = columns.get(i); 
eachColumnConf.getNecessaryValue(Key.NAME, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE); eachColumnConf.getNecessaryValue(Key.TYPE, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE); if (eachColumnConf.getString(Key.TYPE).toUpperCase().startsWith("DECIMAL")) { String type = eachColumnConf.getString(Key.TYPE); eachColumnConf.set(Key.TYPE, "decimal"); eachColumnConf.set(Key.PRECISION, getDecimalPrecision(type)); eachColumnConf.set(Key.SCALE, getDecimalScale(type)); columns.set(i, eachColumnConf); rewriteFlag = true; } } if (rewriteFlag) { this.writerSliceConfig.set(Key.COLUMN, columns); } } //writeMode check this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE); if (!Constant.SUPPORTED_WRITE_MODE.contains(writeMode)) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("仅支持append, nonConflict, overwrite三种模式, 不支持您配置的 writeMode 模式 : [%s]", writeMode)); } if ("TEXT".equals(fileType)) { //fieldDelimiter check String fieldDelimiter = this.writerSliceConfig.getString(Key.FIELD_DELIMITER, null); if (StringUtils.isEmpty(fieldDelimiter)) { throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE, String.format("写TEXT格式文件,必须提供有效的[%s] 参数.", Key.FIELD_DELIMITER)); } if (1 != fieldDelimiter.length()) { // warn: if it has, length must be one throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("仅仅支持单字符切分, 您配置的切分为 : [%s]", fieldDelimiter)); } } //compress check String compress = this.writerSliceConfig.getString(Key.COMPRESS, "NONE").toUpperCase().trim(); if ("ORC".equals(fileType)) { try { CompressionKind.valueOf(compress); } catch (IllegalArgumentException e) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("目前ORC 格式仅支持 %s 压缩,不支持您配置的 compress 模式 : [%s]", Arrays.toString(CompressionKind.values()), compress)); } } if ("PARQUET".equals(fileType)) { // parquet 默认的非压缩标志是 UNCOMPRESSED ,而不是常见的 NONE,这里统一为 NONE if ("NONE".equals(compress)) { compress = "UNCOMPRESSED"; } try { CompressionCodecName.fromConf(compress); } catch (Exception e) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("目前PARQUET 格式仅支持 %s 压缩, 不支持您配置的 compress 模式 : [%s]", Arrays.toString(CompressionCodecName.values()), compress)); } } boolean haveKerberos = this.writerSliceConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsWriterErrorCode.REQUIRED_VALUE); this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsWriterErrorCode.REQUIRED_VALUE); } // encoding check String encoding = this.writerSliceConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING); try { encoding = encoding.trim(); this.writerSliceConfig.set(Key.ENCODING, encoding); Charsets.toCharset(encoding); } catch (Exception e) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("不支持您配置的编码格式:[%s]", encoding), e); } } public boolean isPathExists(String filePath) { Path path = new Path(filePath); boolean exist; try { exist = hdfsHelper.getFileSystem(this.defaultFS,this.writerSliceConfig).exists(path); } catch (IOException e) { String message = String.format("判断文件路径[%s]是否存在时发生网络IO异常,请检查您的网络是否正常!", "message:filePath =" + filePath); e.printStackTrace(); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } return exist; } @Override public void prepare() { //临时存放路径 this.tmpStorePath = 
buildTmpFilePath(path); //若路径已经存在,检查path是否是目录 if (isPathExists(path)) { if (!hdfsHelper.isPathDir(path)) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("您配置的path: [%s] 不是一个合法的目录, 请您注意文件重名, 不合法目录名等情况.", path)); } //根据writeMode对目录下文件进行处理 // 写入之前,当前目录下已有的文件,根据writeMode判断是否覆盖 Path[] existFilePaths = hdfsHelper.hdfsDirList(path); boolean isExistFile = existFilePaths.length > 0; if ("append".equals(writeMode)) { LOG.info("由于您配置了writeMode = append, 写入前不做清理工作, [{}] 目录下写入相应文件名前缀 [{}] 的文件", path, fileName); } else if ("nonConflict".equals(writeMode) && isExistFile) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("由于您配置了writeMode= nonConflict,但您配置的 path: [%s] 目录不为空, 下面存在其他文件或文件夹: %s", path, String.join(",", Arrays.stream(existFilePaths).map(Path::getName).collect(Collectors.toSet())))); } } else { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("您配置的path: [%s] 不存在, 请先创建目录.", path)); } } @Override public void post() { if ("overwrite".equals(writeMode)) { hdfsHelper.deleteFilesFromDir(new Path(path)); } hdfsHelper.moveFilesToDest(new Path(this.tmpStorePath), new Path(this.path)); // 删除临时目录 hdfsHelper.deleteDir(new Path(tmpStorePath)); } @Override public void destroy() { hdfsHelper.closeFileSystem(); } @Override public List<Configuration> split(int mandatoryNumber) { LOG.info("begin splitting ..."); List<Configuration> writerSplitConfigs = new ArrayList<>(); String filePrefix = fileName; //获取该路径下的所有已有文件列表 Set<String> allFiles = Arrays.stream(hdfsHelper.hdfsDirList(path)).map(Path::toString).collect(Collectors.toSet()); String fileType = this.writerSliceConfig.getString(Key.FILE_TYPE, "txt").toLowerCase(); String tmpFullFileName; String endFullFileName; for (int i = 0; i < mandatoryNumber; i++) { // handle same file name Configuration splitTaskConfig = this.writerSliceConfig.clone(); tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, FileHelper.generateFileMiddleName(), fileType); endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, FileHelper.generateFileMiddleName(), fileType); // 如果文件已经存在,则重新生成文件名 while (allFiles.contains(endFullFileName)) { tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, FileHelper.generateFileMiddleName(), fileType); endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, FileHelper.generateFileMiddleName(), fileType); } allFiles.add(endFullFileName); splitTaskConfig.set(Key.FILE_NAME, tmpFullFileName); LOG.info("split wrote file name:[{}]", tmpFullFileName); writerSplitConfigs.add(splitTaskConfig); } LOG.info("end splitting."); return writerSplitConfigs; } /** * 创建临时目录 * 在给定目录的下,创建一个已点开头,uuid为名字的文件夹,用于临时存储写入的文件 * * @param userPath hdfs path * @return temporary path */ private String buildTmpFilePath(String userPath) { String tmpDir; String tmpFilePath; // while (true) { tmpDir = "." 
+ UUID.randomUUID().toString().replace('-', '_'); tmpFilePath = Paths.get(userPath, tmpDir).toString(); if (isPathExists(tmpFilePath)) { return tmpFilePath; } else { return null; } //} } /** * get decimal type precision * if not specified, use DECIMAL_DEFAULT_PRECISION as default * example: * <pre> * decimal -> 38 * decimal(10) -> 10 * </pre> * * @param type decimal type including precision and scale (if present) * @return decimal precision */ private static int getDecimalPrecision(String type) { if (!type.contains("(")) { return Constant.DEFAULT_DECIMAL_MAX_PRECISION; } else { String regEx = "[^0-9]"; Pattern p = Pattern.compile(regEx); Matcher m = p.matcher(type); return Integer.parseInt(m.replaceAll(" ").trim().split(" ")[0]); } } /** * get decimal type scale * if precision is not present, return DECIMAL_DEFAULT_SCALE * if precision is present and not specify scale, return 0 * example: * <pre> * decimal -> 10 * decimal(8) -> 0 * decimal(8,2) -> 2 * </pre> * * @param type decimal type string, including precision and scale (if present) * @return decimal scale */ private static int getDecimalScale(String type) { if (!type.contains("(")) { return Constant.DEFAULT_DECIMAL_MAX_SCALE; } if (!type.contains(",")) { return 0; } else { return Integer.parseInt(type.split(",")[1].replace(")", "").trim()); } } public void unitizeParquetConfig(Configuration writerSliceConfig) { String parquetSchema = writerSliceConfig.getString(Key.PARQUET_SCHEMA); if (StringUtils.isNotBlank(parquetSchema)) { LOG.info("parquetSchema has config. use parquetSchema:\n{}", parquetSchema); return; } List<Configuration> columns = writerSliceConfig.getListConfiguration(Key.COLUMN); if (columns == null || columns.isEmpty()) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_BLANK,"parquetSchema or column can't be blank!"); } parquetSchema = generateParquetSchemaFromColumn(columns); // 为了兼容历史逻辑,对之前的逻辑做保留,但是如果配置的时候报错,则走新逻辑 try { MessageTypeParser.parseMessageType(parquetSchema); } catch (Throwable e) { LOG.warn("The generated parquetSchema {} is illegal, try to generate parquetSchema in another way", parquetSchema); parquetSchema = HdfsHelper.generateParquetSchemaFromColumnAndType(columns); LOG.info("The last generated parquet schema is {}", parquetSchema); } writerSliceConfig.set(Key.PARQUET_SCHEMA, parquetSchema); LOG.info("DataxParquetMode use default fields."); writerSliceConfig.set(Key.PARQUET_MODE, "fields"); } private String generateParquetSchemaFromColumn(List<Configuration> columns) { StringBuffer parquetSchemaStringBuffer = new StringBuffer(); parquetSchemaStringBuffer.append("message m {"); for (Configuration column: columns) { String name = column.getString("name"); Validate.notNull(name, "column.name can't be null"); String type = column.getString("type"); Validate.notNull(type, "column.type can't be null"); String parquetColumn = String.format("optional %s %s;", type, name); parquetSchemaStringBuffer.append(parquetColumn); } parquetSchemaStringBuffer.append("}"); String parquetSchema = parquetSchemaStringBuffer.toString(); LOG.info("generate parquetSchema:\n{}", parquetSchema); return parquetSchema; } } public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration writerSliceConfig; private String fileType; private String fileName; private HdfsHelper hdfsHelper = null; @Override public void init() { this.writerSliceConfig = this.getPluginJobConf(); String defaultFS = this.writerSliceConfig.getString(Key.DEFAULT_FS); 
this.fileType = this.writerSliceConfig.getString(Key.FILE_TYPE).toUpperCase(); hdfsHelper = new HdfsHelper(); hdfsHelper.getFileSystem(defaultFS, writerSliceConfig); //得到的已经是绝对路径,eg:/user/hive/warehouse/writer.db/text/test.snappy this.fileName = this.writerSliceConfig.getString(Key.FILE_NAME); } @Override public void prepare() { // } @Override public void startWrite(RecordReceiver lineReceiver) { LOG.info("write to file : [{}]", this.fileName); if ("TEXT".equals(fileType)) { //写TEXT FILE hdfsHelper.textFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } else if ("ORC".equals(fileType)) { //写ORC FILE hdfsHelper.orcFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } else if ("PARQUET".equals(fileType)) { //写Parquet FILE hdfsHelper.parquetFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } LOG.info("end do write"); } @Override public void post() { // } @Override public void destroy() { // } } }
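The DECIMAL handling in validateParameter above rewrites a declared type such as decimal(10,2) into separate precision and scale entries via getDecimalPrecision and getDecimalScale. The following standalone sketch uses hypothetical class, method, and constant names (DemoDecimalParser, precisionOf, scaleOf, and defaults of 38/10 taken from the javadoc examples) but the same parsing idea, and shows the expected behaviour for the documented cases.

public class DemoDecimalParser {
    private static final int DEFAULT_PRECISION = 38; // matches the "decimal -> 38" example above
    private static final int DEFAULT_SCALE = 10;     // matches the "decimal -> 10" example above

    static int precisionOf(String type) {
        if (!type.contains("(")) {
            return DEFAULT_PRECISION;                      // "decimal"       -> 38
        }
        String digits = type.replaceAll("[^0-9,]", "");
        return Integer.parseInt(digits.split(",")[0]);     // "decimal(10,2)" -> 10
    }

    static int scaleOf(String type) {
        if (!type.contains("(")) {
            return DEFAULT_SCALE;                          // "decimal"       -> 10
        }
        if (!type.contains(",")) {
            return 0;                                      // "decimal(8)"    -> 0
        }
        return Integer.parseInt(type.split(",")[1].replace(")", "").trim()); // "decimal(8,2)" -> 2
    }

    public static void main(String[] args) {
        System.out.println(precisionOf("decimal(10,2)") + " " + scaleOf("decimal(10,2)")); // 10 2
        System.out.println(precisionOf("decimal(8)") + " " + scaleOf("decimal(8)"));       // 8 0
        System.out.println(precisionOf("decimal") + " " + scaleOf("decimal"));             // 38 10
    }
}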
The classes above only require the modified or added methods shown in order to support reading and writing Parquet files. This code has been running stably in our production environment for over a year without errors; if you run into any problems, feel free to contact me. A small read-back sketch follows for anyone who wants to spot-check the files the writer produces.
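The sketch below is not part of the plugin code path; it is just a quick way to verify output files using parquet-avro's AvroParquetReader (one of the classes the reader side also imports). The path is an illustrative assumption, and the field names match the writer sketch shown earlier.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ParquetReadDemo {
    public static void main(String[] args) throws Exception {
        // Illustrative path; point this at a file produced by the writer.
        Path source = new Path("/tmp/demo.parquet");

        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(source)
                .withConf(new Configuration())
                .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                System.out.println(record.get("id") + "\t" + record.get("empName"));
            }
        }
    }
}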