2020import static org .apache .hadoop .hbase .backup .BackupInfo .withState ;
2121import static org .apache .hadoop .hbase .backup .BackupRestoreConstants .CONF_CONTINUOUS_BACKUP_WAL_DIR ;
2222import static org .apache .hadoop .hbase .backup .BackupRestoreConstants .JOB_NAME_CONF_KEY ;
23+ import static org .apache .hadoop .hbase .mapreduce .HFileOutputFormat2 .MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY ;
2324
2425import java .io .IOException ;
2526import java .net .URI ;
3334import java .util .stream .Collectors ;
3435import org .apache .commons .io .FilenameUtils ;
3536import org .apache .commons .lang3 .StringUtils ;
37+ import org .apache .hadoop .conf .Configuration ;
3638import org .apache .hadoop .fs .FileSystem ;
3739import org .apache .hadoop .fs .LocatedFileStatus ;
3840import org .apache .hadoop .fs .Path ;
@@ -251,8 +253,12 @@ private void mergeSplitAndCopyBulkloadedHFiles(List<String> activeFiles,
251253 private void mergeSplitAndCopyBulkloadedHFiles (List <String > files , TableName tn , FileSystem tgtFs )
252254 throws IOException {
253255 MapReduceHFileSplitterJob player = new MapReduceHFileSplitterJob ();
256+ Configuration conf = new Configuration (this .conf );
254257 conf .set (MapReduceHFileSplitterJob .BULK_OUTPUT_CONF_KEY ,
255258 getBulkOutputDirForTable (tn ).toString ());
259+ if (backupInfo .isContinuousBackupEnabled ()) {
260+ conf .setBoolean (MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY , false );
261+ }
256262 player .setConf (conf );
257263
258264 String inputDirs = StringUtils .join (files , "," );
@@ -361,10 +367,26 @@ public void execute() throws IOException, ColumnFamilyMismatchException {
361367 setupRegionLocator ();
362368 // convert WAL to HFiles and copy them to .tmp under BACKUP_ROOT
363369 convertWALsToHFiles (tablesToWALFileList , tablesToPrevBackupTs );
364- incrementalCopyHFiles (new String [] { getBulkOutputDir ().toString () },
365- backupInfo .getBackupRootDir ());
370+
371+ String [] bulkOutputFiles ;
372+ String backupDest = backupInfo .getBackupRootDir ();
373+ if (backupInfo .isContinuousBackupEnabled ()) {
374+ // For the continuous backup case, the WALs have been converted to HFiles in a separate
375+ // map-reduce job for each table. In order to prevent MR job failures due to HBASE-29891,
376+ // these HFiles were sent to a different output directory for each table. This means
377+ // continuous backups require a list of source directories and a different destination
378+ // directory when copying HFiles to the incremental backup directory.
379+ List <String > uniqueNamespaces = tablesToWALFileList .keySet ().stream ()
380+ .map (TableName ::getNamespaceAsString ).distinct ().toList ();
381+ bulkOutputFiles = uniqueNamespaces .stream ()
382+ .map (ns -> new Path (getBulkOutputDir (), ns ).toString ()).toArray (String []::new );
383+ backupDest = backupDest + Path .SEPARATOR + backupId ;
384+ } else {
385+ bulkOutputFiles = new String [] { getBulkOutputDir ().toString () };
386+ }
387+ incrementalCopyHFiles (bulkOutputFiles , backupDest );
366388 } catch (Exception e ) {
367- String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId ;
389+ String msg = "Unexpected exception in incremental-backup: incremental copy " + backupId + " " ;
368390 // fail the overall backup and return
369391 failBackup (conn , backupInfo , backupManager , e , msg , BackupType .INCREMENTAL , conf );
370392 throw new IOException (e );
@@ -418,7 +440,8 @@ protected void incrementalCopyHFiles(String[] files, String backupDest) throws I
418440 System .arraycopy (files , 0 , strArr , 0 , files .length );
419441 strArr [strArr .length - 1 ] = backupDest ;
420442
421- String jobname = "Incremental_Backup-HFileCopy-" + backupInfo .getBackupId ();
443+ String jobname = "Incremental_Backup-HFileCopy-" + backupInfo .getBackupId () + "-"
444+ + System .currentTimeMillis ();
422445 if (LOG .isDebugEnabled ()) {
423446 LOG .debug ("Setting incremental copy HFiles job name to : " + jobname );
424447 }
@@ -517,23 +540,25 @@ protected boolean tableExists(TableName table, Connection conn) throws IOExcepti
517540 protected void walToHFiles (List <String > dirPaths , List <String > tableList , long previousBackupTs )
518541 throws IOException {
519542 Tool player = new WALPlayer ();
543+ Configuration conf = new Configuration (this .conf );
520544
521545 // Player reads all files in arbitrary directory structure and creates
522546 // a Map task for each file. We use ';' as separator
523547 // because WAL file names contains ','
524548 String dirs = StringUtils .join (dirPaths , ';' );
525- String jobname = "Incremental_Backup-" + backupId ;
549+ String jobname = "Incremental_Backup-" + backupId + "-" + System . currentTimeMillis () ;
526550
527- Path bulkOutputPath = getBulkOutputDir ();
528- conf .set (WALPlayer .BULK_OUTPUT_CONF_KEY , bulkOutputPath .toString ());
551+ setBulkOutputPath (conf , tableList );
529552 conf .set (WALPlayer .INPUT_FILES_SEPARATOR_KEY , ";" );
530553 conf .setBoolean (WALPlayer .MULTI_TABLES_SUPPORT , true );
531554 conf .set (JOB_NAME_CONF_KEY , jobname );
532- boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2 .diskBasedSortingEnabled (conf );
533555 conf .setBoolean (HFileOutputFormat2 .DISK_BASED_SORTING_ENABLED_KEY , true );
534556 if (backupInfo .isContinuousBackupEnabled ()) {
535557 conf .set (WALInputFormat .START_TIME_KEY , Long .toString (previousBackupTs ));
536558 conf .set (WALInputFormat .END_TIME_KEY , Long .toString (backupInfo .getIncrCommittedWalTs ()));
559+ // We do not want a multi-table HFile format here because continuous backups run the WALPlayer
560+ // individually on each table in the backup set.
561+ conf .setBoolean (MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY , false );
537562 }
538563 String [] playerArgs = { dirs , StringUtils .join (tableList , "," ) };
539564
@@ -548,43 +573,70 @@ protected void walToHFiles(List<String> dirPaths, List<String> tableList, long p
548573 } catch (Exception ee ) {
549574 throw new IOException ("Can not convert from directory " + dirs
550575 + " (check Hadoop, HBase and WALPlayer M/R job logs) " , ee );
551- } finally {
552- conf .setBoolean (HFileOutputFormat2 .DISK_BASED_SORTING_ENABLED_KEY ,
553- diskBasedSortingEnabledOriginalValue );
554- conf .unset (WALPlayer .INPUT_FILES_SEPARATOR_KEY );
555- conf .unset (JOB_NAME_CONF_KEY );
556576 }
557577 }
558578
579+ private void setBulkOutputPath (Configuration conf , List <String > tableList ) {
580+ Path bulkOutputPath = getBulkOutputDir ();
581+ if (backupInfo .isContinuousBackupEnabled ()) {
582+ if (tableList .size () != 1 ) {
583+ // Continuous backups run the WALPlayer job on one table at a time, so the list of tables
584+ // should have only one element.
585+ throw new RuntimeException (
586+ "Expected table list to have only one element, but got: " + tableList );
587+ }
588+ bulkOutputPath = getTmpBackupDirForTable (TableName .valueOf (tableList .get (0 )));
589+ }
590+ conf .set (WALPlayer .BULK_OUTPUT_CONF_KEY , bulkOutputPath .toString ());
591+ }
592+
559593 private void incrementalCopyBulkloadHFiles (FileSystem tgtFs , TableName tn ) throws IOException {
560594 Path bulkOutDir = getBulkOutputDirForTable (tn );
595+ Configuration conf = new Configuration (this .conf );
561596
562597 if (tgtFs .exists (bulkOutDir )) {
563598 conf .setInt (MapReduceBackupCopyJob .NUMBER_OF_LEVELS_TO_PRESERVE_KEY , 2 );
564599 Path tgtPath = getTargetDirForTable (tn );
565- try {
566- RemoteIterator <LocatedFileStatus > locatedFiles = tgtFs .listFiles (bulkOutDir , true );
567- List <String > files = new ArrayList <>();
568- while (locatedFiles .hasNext ()) {
569- LocatedFileStatus file = locatedFiles .next ();
570- if (file .isFile () && HFile .isHFileFormat (tgtFs , file .getPath ())) {
571- files .add (file .getPath ().toString ());
572- }
600+ RemoteIterator <LocatedFileStatus > locatedFiles = tgtFs .listFiles (bulkOutDir , true );
601+ List <String > files = new ArrayList <>();
602+ while (locatedFiles .hasNext ()) {
603+ LocatedFileStatus file = locatedFiles .next ();
604+ if (file .isFile () && HFile .isHFileFormat (tgtFs , file .getPath ())) {
605+ files .add (file .getPath ().toString ());
573606 }
574- incrementalCopyHFiles (files .toArray (files .toArray (new String [0 ])), tgtPath .toString ());
575- } finally {
576- conf .unset (MapReduceBackupCopyJob .NUMBER_OF_LEVELS_TO_PRESERVE_KEY );
577607 }
608+ incrementalCopyHFiles (files .toArray (files .toArray (new String [0 ])), tgtPath .toString ());
578609 }
579610 }
580611
612+ /**
613+ * Creates a path to the bulk load output directory for a table. This directory will look like:
614+ * .../backupRoot/.tmp/backupId/namespace/table/data
615+ * @param table The table whose HFiles are being bulk loaded
616+ * @return A Path object representing the directory
617+ */
581618 protected Path getBulkOutputDirForTable (TableName table ) {
619+ Path tablePath = getTmpBackupDirForTable (table );
620+ return new Path (tablePath , "data" );
621+ }
622+
623+ /**
624+ * Creates a path to a table's directory within the temporary directory. This directory will look
625+ * like: .../backupRoot/.tmp/backupId/namespace/table
626+ * @param table The table whose HFiles are being bulk loaded
627+ * @return A Path object representing the directory
628+ */
629+ protected Path getTmpBackupDirForTable (TableName table ) {
582630 Path tablePath = getBulkOutputDir ();
583631 tablePath = new Path (tablePath , table .getNamespaceAsString ());
584- tablePath = new Path (tablePath , table .getQualifierAsString ());
585- return new Path (tablePath , "data" );
632+ return new Path (tablePath , table .getQualifierAsString ());
586633 }
587634
635+ /**
636+ * Creates a path to a temporary backup directory. This directory will look like:
637+ * .../backupRoot/.tmp/backupId
638+ * @return A Path object representing the directory
639+ */
588640 protected Path getBulkOutputDir () {
589641 String backupId = backupInfo .getBackupId ();
590642 Path path = new Path (backupInfo .getBackupRootDir ());
@@ -593,6 +645,12 @@ protected Path getBulkOutputDir() {
593645 return path ;
594646 }
595647
648+ /**
649+ * Creates a path to a destination directory for bulk loaded HFiles. This directory will look
650+ * like: .../backupRoot/backupID/namespace/table
651+ * @param table The table whose HFiles are being bulk loaded
652+ * @return A Path object representing the directory
653+ */
596654 private Path getTargetDirForTable (TableName table ) {
597655 Path path = new Path (backupInfo .getBackupRootDir () + Path .SEPARATOR + backupInfo .getBackupId ());
598656 path = new Path (path , table .getNamespaceAsString ());
0 commit comments