4 changes: 4 additions & 0 deletions .gitignore
@@ -7,3 +7,7 @@ target
 .devbox
 /.envrc
 *.credentials
+/.idea
+/examples/docker-compose/data
+devbox.lock
+/var/cache
73 changes: 55 additions & 18 deletions ice/src/main/java/com/altinity/ice/cli/internal/cmd/Delete.java
@@ -9,17 +9,25 @@
  */
 package com.altinity.ice.cli.internal.cmd;

+import com.altinity.ice.cli.Main.PartitionFilter;
+import com.altinity.ice.cli.internal.iceberg.Partitioning;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.rest.RESTCatalog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,30 +41,57 @@ private Delete() {}
   public static void run(
       RESTCatalog catalog,
       TableIdentifier tableId,
-      List<com.altinity.ice.cli.Main.PartitionFilter> partitions,
+      List<PartitionFilter> partitions,
       boolean dryRun)
       throws IOException, URISyntaxException {

     Table table = catalog.loadTable(tableId);
-    TableScan scan = table.newScan();
-    if (partitions != null && !partitions.isEmpty()) {
-      org.apache.iceberg.expressions.Expression expr = null;
-      for (com.altinity.ice.cli.Main.PartitionFilter pf : partitions) {
-        org.apache.iceberg.expressions.Expression e = null;

+    Snapshot currentSnapshot = table.currentSnapshot();
+    if (currentSnapshot == null) {
+      logger.error("There are no snapshots in this table");
+      return;
+    }
+
+    FileIO io = table.io();
+    Map<Integer, PartitionSpec> specsById = table.specs();
+
+    List<ManifestFile> dataManifests = currentSnapshot.dataManifests(io);
+    List<DataFile> filesToDelete = new ArrayList<>();
+
+    Expression expression = null;
+
+    if (partitions != null) {
+      for (PartitionFilter pf : partitions) {
+        String fieldName = pf.name();
+
+        Expression fieldExpr = null;
         for (Object value : pf.values()) {
-          org.apache.iceberg.expressions.Expression valueExpr =
-              org.apache.iceberg.expressions.Expressions.equal(pf.name(), value);
-          e = (e == null) ? valueExpr : org.apache.iceberg.expressions.Expressions.or(e, valueExpr);
+          Integer transformed = Partitioning.applyTimestampTransform(table, fieldName, value);
+          if (transformed != null) {
+            value = transformed;
+          }
+
+          Expression singleValueExpr = Expressions.equal(fieldName, value);
+          fieldExpr =
+              fieldExpr == null ? singleValueExpr : Expressions.or(fieldExpr, singleValueExpr);
         }
+        if (fieldExpr == null) {
+          continue;
+        }
-        expr = (expr == null) ? e : org.apache.iceberg.expressions.Expressions.and(expr, e);
+        expression = expression == null ? fieldExpr : Expressions.and(expression, fieldExpr);
       }
-      scan = scan.filter(expr);
     }
-    List<DataFile> filesToDelete = new ArrayList<>();

-    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
-      for (FileScanTask task : tasks) {
-        filesToDelete.add(task.file());
+    for (ManifestFile manifest : dataManifests) {
+      ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, specsById);
+      if (expression != null) {
+        reader.filterPartitions(expression);
+      }
+      try (reader) {
+        for (DataFile dataFile : reader) {
+          filesToDelete.add(dataFile);
+        }
+      }
     }

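Note on the filter built above: values for one partition field are OR-ed together, and the resulting per-field expressions are AND-ed across fields. A standalone sketch of that pattern (the `Map`-based input shape and the class name are illustrative, not part of this PR):

```java
import java.util.List;
import java.util.Map;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

// Distills the loop above: (a = 1 OR a = 2) AND (b = "x") for
// filters {a -> [1, 2], b -> ["x"]}.
final class PartitionFilterSketch {
  static Expression build(Map<String, List<Object>> filters) {
    Expression result = null;
    for (Map.Entry<String, List<Object>> field : filters.entrySet()) {
      Expression fieldExpr = null;
      for (Object value : field.getValue()) {
        Expression eq = Expressions.equal(field.getKey(), value);
        fieldExpr = fieldExpr == null ? eq : Expressions.or(fieldExpr, eq);
      }
      if (fieldExpr == null) {
        continue; // a field with no values contributes nothing
      }
      result = result == null ? fieldExpr : Expressions.and(result, fieldExpr);
    }
    return result == null ? Expressions.alwaysTrue() : result;
  }
}
```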
@@ -73,6 +108,8 @@ public static void run(
         }
         rewrite.commit();
       }
+    } else {
+      logger.info("No files to delete");
     }
   }
 }
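Taken together, the new delete path reads data files directly from the current snapshot's manifests instead of planning a table scan, then removes the matches in a single commit. A hedged sketch of the overall flow; the middle of this diff is collapsed, so the `RewriteFiles` loop body here is an assumption based on the visible `rewrite.commit()` and the `RewriteFiles` import:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.FileIO;

final class ManifestDeleteSketch {
  static void deleteMatching(Table table, Expression expression, boolean dryRun) {
    FileIO io = table.io();
    Map<Integer, PartitionSpec> specsById = table.specs();
    List<DataFile> filesToDelete = new ArrayList<>();

    // Enumerate data files straight from the snapshot's manifests;
    // filterPartitions() prunes non-matching partitions up front.
    for (ManifestFile manifest : table.currentSnapshot().dataManifests(io)) {
      ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, specsById);
      if (expression != null) {
        reader.filterPartitions(expression);
      }
      try (reader) {
        for (DataFile dataFile : reader) {
          filesToDelete.add(dataFile);
        }
      }
    }

    if (!filesToDelete.isEmpty() && !dryRun) {
      RewriteFiles rewrite = table.newRewrite();
      for (DataFile f : filesToDelete) {
        rewrite.deleteFile(f); // assumes RewriteFiles#deleteFile(DataFile), present in recent Iceberg
      }
      rewrite.commit(); // one atomic snapshot removing every matched file
    }
  }
}
```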
ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java
@@ -16,6 +16,9 @@
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -27,6 +30,7 @@
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.UpdatePartitionSpec;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
@@ -37,6 +41,7 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SerializableFunction;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
@@ -49,6 +54,23 @@ public final class Partitioning {

   private Partitioning() {}

+  // Formatter with optional time component (2025-01-01 or 2025-01-01T00:00:00)
+  private static final DateTimeFormatter DATE_TIME_INPUT_FORMATTER =
+      new DateTimeFormatterBuilder()
+          .append(DateTimeFormatter.ISO_LOCAL_DATE)
+          .optionalStart()
+          .appendLiteral("T")
+          .append(DateTimeFormatter.ISO_LOCAL_TIME)
+          .optionalEnd()
+          .optionalStart()
+          .appendOffsetId()
+          .optionalEnd()
+          .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+          .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+          .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+          .parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
+          .toFormatter();
+
   public record InferPartitionKeyResult(
       @Nullable PartitionKey partitionKey, @Nullable String failureReason) {
     public boolean success() {
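The optional sections mean the formatter accepts a bare date, a date-time, or a date-time with offset, with missing time fields defaulted to zero. Illustrative behavior:

```java
// Assumes the DATE_TIME_INPUT_FORMATTER defined above (e.g. run inside this class).
LocalDateTime d1 = LocalDateTime.parse("2025-01-01", DATE_TIME_INPUT_FORMATTER);
// -> 2025-01-01T00:00 (time fields filled in by parseDefaulting)
LocalDateTime d2 = LocalDateTime.parse("2025-01-01T12:30:45", DATE_TIME_INPUT_FORMATTER);
// -> 2025-01-01T12:30:45
LocalDateTime d3 = LocalDateTime.parse("2025-01-01T12:30:45+02:00", DATE_TIME_INPUT_FORMATTER);
// -> 2025-01-01T12:30:45 (the offset is parsed, then discarded by LocalDateTime)
```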
@@ -347,4 +369,45 @@ public static long toEpochMicros(Object tsValue) {
       throw new UnsupportedOperationException("unexpected value type: " + tsValue.getClass());
     }
   }
+
+  /**
+   * Converts a datetime string input value to the table's partition transform unit if it is a
+   * timestamp transform. The transformed value is the Iceberg internal representation (e.g. days
+   * since Unix epoch).
+   *
+   * @return The timestamp converted to the partition unit as an integer, or null if not
+   *     convertible.
+   */
+  @Nullable
+  public static Integer applyTimestampTransform(Table table, String fieldName, Object value) {
+    PartitionField partitionField = getPartitionField(table, fieldName);
+    if (partitionField == null) return null;
+
+    Transform<?, ?> transform = partitionField.transform();
+    if (transform.isIdentity() || !(value instanceof String s)) {
+      return null;
+    }
+
+    Type sourceType = table.schema().findType(partitionField.sourceId());
+    if (!(sourceType instanceof Types.TimestampType)) {
+      return null;
+    }
+
+    long timestampMicros = toEpochMicros(LocalDateTime.parse(s, DATE_TIME_INPUT_FORMATTER));
+
+    @SuppressWarnings("unchecked")
+    Transform<Long, Integer> typedTransform = (Transform<Long, Integer>) transform;
+
+    return typedTransform.bind(sourceType).apply(timestampMicros);
+  }
+
+  @Nullable
+  private static PartitionField getPartitionField(Table table, String fieldName) {
+    for (PartitionField field : table.spec().fields()) {
+      if (field.name().equals(fieldName)) {
+        return field;
+      }
+    }
+    return null;
+  }
 }
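For a concrete sense of the ordinal this returns, a minimal sketch using Iceberg's public `Transforms`/`bind` API (the day transform and example date are illustrative assumptions, not taken from this PR):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializableFunction;

// A day transform maps epoch microseconds to the ordinal Iceberg stores in
// partition metadata: days since 1970-01-01.
final class TimestampTransformSketch {
  public static void main(String[] args) {
    long micros =
        LocalDateTime.parse("2025-01-01T00:00:00").toEpochSecond(ZoneOffset.UTC) * 1_000_000L;

    Transform<Long, Integer> day = Transforms.day();
    SerializableFunction<Long, Integer> toDays = day.bind(Types.TimestampType.withoutZone());

    System.out.println(toDays.apply(micros)); // 20089 days since the Unix epoch
  }
}
```

That integer is what `Delete.run` compares against via `Expressions.equal(fieldName, value)`, which is why string inputs must be converted before the partition filter is built.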