@@ -615,17 +615,10 @@ private boolean mayTsFileContainUnprocessedData(final TsFileResource resource) {
     }

     if (startIndex instanceof StateProgressIndex) {
-      // Some different tsFiles may share the same max progressIndex, thus tsFiles with an
-      // "equals" max progressIndex must be transmitted to avoid data loss
-      final ProgressIndex innerProgressIndex =
-          ((StateProgressIndex) startIndex).getInnerProgressIndex();
-      return !innerProgressIndex.isAfter(resource.getMaxProgressIndexAfterClose())
-          && !innerProgressIndex.equals(resource.getMaxProgressIndexAfterClose());
+      startIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex();
     }
-
-    // Some different tsFiles may share the same max progressIndex, thus tsFiles with an
-    // "equals" max progressIndex must be transmitted to avoid data loss
-    return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
+    return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+        && !startIndex.equals(resource.getMaxProgressIndexAfterClose());
   }

   private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource resource) {
@@ -37,6 +37,7 @@
 import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
 import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.conf.CommonConfig;
@@ -517,14 +518,18 @@ public TLoadResp sendTsFilePieceNode(final TTsFilePieceReq req) {

   @Override
   public TLoadResp sendLoadCommand(TLoadCommandReq req) {
-    final ProgressIndex progressIndex;
-    if (req.isSetProgressIndex()) {
-      progressIndex = ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
+    final Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap = new HashMap<>();
+    if (req.isSetTimePartition2ProgressIndex()) {
+      for (Map.Entry<TTimePartitionSlot, ByteBuffer> entry :
+          req.getTimePartition2ProgressIndex().entrySet()) {
+        timePartitionProgressIndexMap.put(
+            entry.getKey(), ProgressIndexType.deserializeFrom(entry.getValue()));
+      }
     } else {
-      // fallback to use local generated progress index for compatibility
-      progressIndex = PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad();
-      LOGGER.info(
-          "Use local generated load progress index {} for uuid {}.", progressIndex, req.uuid);
+      final TSStatus status = new TSStatus();
+      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+      status.setMessage("Load command requires time partition to progress index map");
+      return createTLoadResp(status);
     }

     return createTLoadResp(
@@ -533,7 +538,7 @@ public TLoadResp sendLoadCommand(TLoadCommandReq req) {
             LoadTsFileScheduler.LoadCommand.values()[req.commandType],
             req.uuid,
             req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
-            progressIndex));
+            timePartitionProgressIndexMap));
   }

   @Override
@@ -23,6 +23,7 @@
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -56,8 +57,10 @@
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -219,7 +222,7 @@ private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint)
   }

   public Future<FragInstanceDispatchResult> dispatchCommand(
-      TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
+      TLoadCommandReq originalLoadCommandReq, Set<TRegionReplicaSet> replicaSets) {
     Set<TEndPoint> allEndPoint = new HashSet<>();
     for (TRegionReplicaSet replicaSet : replicaSets) {
       for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
@@ -228,23 +231,27 @@ public Future<FragInstanceDispatchResult> dispatchCommand(
     }

     for (TEndPoint endPoint : allEndPoint) {
+      // duplicate for progress index binary serialization
+      final TLoadCommandReq duplicatedLoadCommandReq = originalLoadCommandReq.deepCopy();
       try (SetThreadName threadName =
           new SetThreadName(
               "load-dispatcher"
                   + "-"
-                  + LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType]
+                  + LoadTsFileScheduler.LoadCommand.values()[duplicatedLoadCommandReq.commandType]
                   + "-"
-                  + loadCommandReq.uuid)) {
+                  + duplicatedLoadCommandReq.uuid)) {
         if (isDispatchedToLocal(endPoint)) {
-          dispatchLocally(loadCommandReq);
+          dispatchLocally(duplicatedLoadCommandReq);
         } else {
-          dispatchRemote(loadCommandReq, endPoint);
+          dispatchRemote(duplicatedLoadCommandReq, endPoint);
         }
       } catch (FragmentInstanceDispatchException e) {
-        LOGGER.warn("Cannot dispatch LoadCommand for load operation {}", loadCommandReq, e);
+        LOGGER.warn(
+            "Cannot dispatch LoadCommand for load operation {}", duplicatedLoadCommandReq, e);
         return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
       } catch (Exception t) {
-        LOGGER.warn("Cannot dispatch LoadCommand for load operation {}", loadCommandReq, t);
+        LOGGER.warn(
+            "Cannot dispatch LoadCommand for load operation {}", duplicatedLoadCommandReq, t);
         return immediateFuture(
             new FragInstanceDispatchResult(
                 RpcUtils.getStatus(
@@ -256,17 +263,18 @@

   private void dispatchLocally(TLoadCommandReq loadCommandReq)
       throws FragmentInstanceDispatchException {
-    final ProgressIndex progressIndex;
-    if (loadCommandReq.isSetProgressIndex()) {
-      progressIndex =
-          ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
+    final Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap = new HashMap<>();
+    if (loadCommandReq.isSetTimePartition2ProgressIndex()) {
+      for (Map.Entry<TTimePartitionSlot, ByteBuffer> entry :
+          loadCommandReq.getTimePartition2ProgressIndex().entrySet()) {
+        timePartitionProgressIndexMap.put(
+            entry.getKey(), ProgressIndexType.deserializeFrom(entry.getValue()));
+      }
     } else {
-      // fallback to use local generated progress index for compatibility
-      progressIndex = PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad();
-      LOGGER.info(
-          "Use local generated load progress index {} for uuid {}.",
-          progressIndex,
-          loadCommandReq.uuid);
+      final TSStatus status = new TSStatus();
+      status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
+      status.setMessage("Load command requires time partition to progress index map");
+      throw new FragmentInstanceDispatchException(status);
     }

     final TSStatus resultStatus =
@@ -275,7 +283,7 @@ private void dispatchLocally(TLoadCommandReq loadCommandReq)
             LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
             loadCommandReq.uuid,
             loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe,
-            progressIndex);
+            timePartitionProgressIndexMap);
     if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
       throw new FragmentInstanceDispatchException(resultStatus);
     }
@@ -29,6 +29,7 @@
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -140,6 +141,7 @@ public class LoadTsFileScheduler implements IScheduler {
   private final PlanFragmentId fragmentId;
   private final Set<TRegionReplicaSet> allReplicaSets;
   private final boolean isGeneratedByPipe;
+  private final Map<TTimePartitionSlot, ProgressIndex> timePartitionSlotToProgressIndex;
   private final LoadTsFileDataCacheMemoryBlock block;

   public LoadTsFileScheduler(
@@ -158,6 +160,7 @@ public LoadTsFileScheduler(
     this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
     this.allReplicaSets = new HashSet<>();
     this.isGeneratedByPipe = isGeneratedByPipe;
+    this.timePartitionSlotToProgressIndex = new HashMap<>();
     this.block = LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock();

     for (FragmentInstance fragmentInstance : distributedQueryPlan.getInstances()) {
@@ -408,7 +411,26 @@ private boolean secondPhase(

     try {
       loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
-      loadCommandReq.setProgressIndex(assignProgressIndex(tsFileResource));
+      loadCommandReq.setTimePartition2ProgressIndex(
+          timePartitionSlotToProgressIndex.entrySet().stream()
+              .collect(
+                  Collectors.toMap(
+                      Map.Entry::getKey,
+                      entry -> {
+                        try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+                            final DataOutputStream dataOutputStream =
+                                new DataOutputStream(byteArrayOutputStream)) {
+                          entry.getValue().serialize(dataOutputStream);
+                          return ByteBuffer.wrap(
+                              byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+                        } catch (final IOException e) {
+                          throw new RuntimeException(
+                              String.format(
+                                  "Serialize Progress Index error, isFirstPhaseSuccess: %s, uuid: %s, tsFile: %s",
+                                  isFirstPhaseSuccess, uuid, tsFile.getAbsolutePath()),
+                              e);
+                        }
+                      })));
       Future<FragInstanceDispatchResult> dispatchResultFuture =
           dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);

@@ -431,14 +453,6 @@
         stateMachine.transitionToFailed(status);
         return false;
       }
-    } catch (IOException e) {
-      LOGGER.warn(
-          "Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {}, tsFile: {}",
-          isFirstPhaseSuccess,
-          uuid,
-          tsFile.getAbsolutePath());
-      stateMachine.transitionToFailed(e);
-      return false;
     } catch (InterruptedException | ExecutionException e) {
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
@@ -639,6 +653,12 @@ public FragmentInfo getFragmentInfo() {
     return null;
   }

+  private void computeTimePartitionSlotToProgressIndexIfAbsent(
+      final TTimePartitionSlot timePartitionSlot) {
+    timePartitionSlotToProgressIndex.putIfAbsent(
+        timePartitionSlot, PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad());
+  }
+
   public enum LoadCommand {
     EXECUTE,
     ROLLBACK
@@ -686,6 +706,7 @@ private boolean addOrSendChunkData(ChunkData chunkData) throws LoadFileException
       nonDirectionalChunkData.add(chunkData);
       dataSize += chunkData.getDataSize();
       block.addMemoryUsage(chunkData.getDataSize());
+      scheduler.computeTimePartitionSlotToProgressIndexIfAbsent(chunkData.getTimePartitionSlot());

       if (!isMemoryEnough()) {
         routeChunkData();
@@ -22,6 +22,7 @@
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -980,13 +981,13 @@ public TSStatus executeLoadCommand(
       LoadTsFileScheduler.LoadCommand loadCommand,
       String uuid,
       boolean isGeneratedByPipe,
-      ProgressIndex progressIndex) {
+      Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap) {
     TSStatus status = new TSStatus();

     try {
       switch (loadCommand) {
         case EXECUTE:
-          if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe, progressIndex)) {
+          if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe, timePartitionProgressIndexMap)) {
             status = RpcUtils.SUCCESS_STATUS;
           } else {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
@@ -24,6 +24,7 @@
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
@@ -273,7 +274,10 @@ private String getNextFolder() throws DiskSpaceInsufficientException {
     return FOLDER_MANAGER.get().getNextFolder();
   }

-  public boolean loadAll(String uuid, boolean isGeneratedByPipe, ProgressIndex progressIndex)
+  public boolean loadAll(
+      String uuid,
+      boolean isGeneratedByPipe,
+      Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap)
       throws IOException, LoadFileException {
     if (!uuid2WriterManager.containsKey(uuid)) {
       return false;
@@ -282,7 +286,7 @@ public boolean loadAll(String uuid, boolean isGeneratedByPipe, ProgressIndex pro
     final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid));
     cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
     try {
-      uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
+      uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, timePartitionProgressIndexMap);
     } finally {
       cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
     }
@@ -497,7 +501,9 @@ private void writeDeletion(DataRegion dataRegion, DeletionData deletionData)
       }
     }

-    private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
+    private void loadAll(
+        boolean isGeneratedByPipe,
+        Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap)
         throws IOException, LoadFileException {
       if (isClosed) {
         throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
@@ -516,7 +522,11 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)

         final DataRegion dataRegion = entry.getKey().getDataRegion();
         final TsFileResource tsFileResource = dataPartition2Resource.get(entry.getKey());
-        endTsFileResource(writer, tsFileResource, progressIndex);
+        endTsFileResource(
+            writer,
+            tsFileResource,
+            timePartitionProgressIndexMap.getOrDefault(
+                entry.getKey().getTimePartitionSlot(), MinimumProgressIndex.INSTANCE));
         dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe);

         // Metrics
@@ -681,6 +691,10 @@ public DataRegion getDataRegion() {
       return dataRegion;
     }

+    public TTimePartitionSlot getTimePartitionSlot() {
+      return timePartitionSlot;
+    }
+
    @Override
    public String toString() {
      return String.join(
@@ -253,7 +253,7 @@ public class CommonConfig {
       (int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);

   private boolean isSeperatedPipeHeartbeatEnabled = true;
-  private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
+  private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 30;
   private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
   private long pipeMetaSyncerSyncIntervalMinutes = 3;
   private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;
@@ -396,7 +396,7 @@ struct TLoadCommandReq {
   1: required i32 commandType
   2: required string uuid
   3: optional bool isGeneratedByPipe
-  4: optional binary progressIndex
+  4: optional map<common.TTimePartitionSlot, binary> timePartition2ProgressIndex
 }

 struct TAttributeUpdateReq {
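For reference, a minimal sketch (not part of this PR) of how the new timePartition2ProgressIndex payload of TLoadCommandReq is produced and consumed, mirroring the serialization added in LoadTsFileScheduler and the deserialization added in the DataNode RPC service and LoadTsFileDispatcher above. The helper class name is illustrative, and the ProgressIndexType and PublicBAOS import paths are assumed rather than taken from this diff.

// Hedged sketch: helper name is illustrative; ProgressIndexType / PublicBAOS
// import paths are assumed and may differ in the actual codebase.
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.tsfile.utils.PublicBAOS;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public final class LoadProgressIndexCodec {

  // Sender side (cf. LoadTsFileScheduler.secondPhase): one serialized
  // ProgressIndex per time partition slot, wrapped as thrift binaries.
  public static Map<TTimePartitionSlot, ByteBuffer> encode(
      final Map<TTimePartitionSlot, ProgressIndex> slotToProgressIndex) throws IOException {
    final Map<TTimePartitionSlot, ByteBuffer> encoded = new HashMap<>();
    for (final Map.Entry<TTimePartitionSlot, ProgressIndex> entry :
        slotToProgressIndex.entrySet()) {
      try (final PublicBAOS buffer = new PublicBAOS();
          final DataOutputStream out = new DataOutputStream(buffer)) {
        entry.getValue().serialize(out);
        encoded.put(entry.getKey(), ByteBuffer.wrap(buffer.getBuf(), 0, buffer.size()));
      }
    }
    return encoded;
  }

  // Receiver side (cf. sendLoadCommand / dispatchLocally): rebuild the
  // ProgressIndex objects from the binaries carried by TLoadCommandReq.
  public static Map<TTimePartitionSlot, ProgressIndex> decode(
      final Map<TTimePartitionSlot, ByteBuffer> encoded) {
    final Map<TTimePartitionSlot, ProgressIndex> decoded = new HashMap<>();
    for (final Map.Entry<TTimePartitionSlot, ByteBuffer> entry : encoded.entrySet()) {
      decoded.put(entry.getKey(), ProgressIndexType.deserializeFrom(entry.getValue()));
    }
    return decoded;
  }

  private LoadProgressIndexCodec() {}
}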