Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ private void getAnySatisfiedTimestamp(
alignedTVList.getMinTime(), alignedTVList.getMaxTime())) {
return;
}
BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap();
int rowCount = alignedTVList.rowCount();
List<int[]> valueColumnDeleteCursor = new ArrayList<>();
if (valueColumnsDeletionList != null) {
Expand Down Expand Up @@ -379,8 +378,9 @@ private void getAnySatisfiedTimestamp(
int limit = (i == timestampsList.size() - 1) ? rowCount - i * ARRAY_SIZE : ARRAY_SIZE;
for (int j = 0; j < limit; j++) {
row++;
// the row is deleted
if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) {
// the row is deleted or has no value columns (unwritten columns count as null)
int valueIndex = indices == null ? row : indices[j];
if (ignoreAllNullRows && alignedTVList.isEmptyValueRowAtValueIndex(valueIndex)) {
continue;
}
long timestamp = timestamps[j];
Expand Down Expand Up @@ -551,12 +551,8 @@ public void encodeWorkingAlignedTVList(
BlockingQueue<Object> ioTaskQueue,
long maxNumberOfPointsInChunk,
int maxNumberOfPointsInPage) {
BitMap allValueColDeletedMap;
AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush;

allValueColDeletedMap =
ignoreAllNullRows ? alignedWorkingListForFlush.getAllValueColDeletedMap() : null;

boolean[] timeDuplicateInfo = null;

List<List<Integer>> chunkRange = new ArrayList<>();
Expand Down Expand Up @@ -592,8 +588,8 @@ public void encodeWorkingAlignedTVList(

int nextRowIndex = sortedRowIndex + 1;
while (nextRowIndex < alignedWorkingListForFlush.rowCount()
&& ((allValueColDeletedMap != null
&& allValueColDeletedMap.isMarked(
&& ((ignoreAllNullRows
&& alignedWorkingListForFlush.isEmptyValueRowAtValueIndex(
alignedWorkingListForFlush.getValueIndex(nextRowIndex)))
|| alignedWorkingListForFlush.isTimeDeleted(nextRowIndex))) {
nextRowIndex++;
Expand All @@ -615,15 +611,13 @@ public void encodeWorkingAlignedTVList(
chunkRange.add(pageRange);
}

handleEncoding(
ioTaskQueue, chunkRange, timeDuplicateInfo, allValueColDeletedMap, maxNumberOfPointsInPage);
handleEncoding(ioTaskQueue, chunkRange, timeDuplicateInfo, maxNumberOfPointsInPage);
}

private void handleEncoding(
BlockingQueue<Object> ioTaskQueue,
List<List<Integer>> chunkRange,
boolean[] timeDuplicateInfo,
BitMap allValueColDeletedMap,
int maxNumberOfPointsInPage) {
AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush;
List<TSDataType> dataTypes = alignedWorkingListForFlush.getTsDataTypes();
Expand All @@ -643,8 +637,8 @@ private void handleEncoding(
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
sortedRowIndex++) {
// skip empty row
if (allValueColDeletedMap != null
&& allValueColDeletedMap.isMarked(
if (ignoreAllNullRows
&& alignedWorkingListForFlush.isEmptyValueRowAtValueIndex(
alignedWorkingListForFlush.getValueIndex(sortedRowIndex))) {
continue;
}
Expand Down Expand Up @@ -753,8 +747,8 @@ private void handleEncoding(
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
sortedRowIndex++) {
// skip empty row
if (((allValueColDeletedMap != null
&& allValueColDeletedMap.isMarked(
if (((ignoreAllNullRows
&& alignedWorkingListForFlush.isEmptyValueRowAtValueIndex(
alignedWorkingListForFlush.getValueIndex(sortedRowIndex)))
|| (alignedWorkingListForFlush.isTimeDeleted(sortedRowIndex)))) {
continue;
Expand Down Expand Up @@ -893,6 +887,70 @@ public boolean isEmpty() {
return false;
}

/**
* Extra memory for allocating value arrays in the current (last) chunk when columns are written
* for the first time in that chunk.
*/
public long getTvListArrayMemCostIncrement(
String[] insertingMeasurements, TSDataType[] insertingTypes, Object[] insertingValues) {
long memCostIncrement = 0;
for (int i = 0; i < insertingMeasurements.length; i++) {
if (insertingTypes[i] == null || insertingMeasurements[i] == null) {
continue;
}
Integer columnIndex = measurementIndexMap.get(insertingMeasurements[i]);
if (columnIndex == null) {
continue;
}
if (!list.isLastValueArrayUnallocated(columnIndex)) {
continue;
}
if (insertingValues != null && insertingValues[i] != null) {
memCostIncrement += AlignedTVList.valueListArrayMemCost(insertingTypes[i]);
}
}
return memCostIncrement;
}

/**
* Extra memory for tablet insertion: allocate value arrays only when the column has non-null
* values in the inserting range of the last chunk.
*/
public long getTvListArrayMemCostIncrementForTablet(
String[] insertingMeasurements,
TSDataType[] insertingTypes,
Object[] columns,
BitMap[] bitMaps,
int start,
int end) {
long memCostIncrement = 0;
for (int i = 0; i < insertingMeasurements.length; i++) {
if (insertingTypes[i] == null || insertingMeasurements[i] == null || columns[i] == null) {
continue;
}
Integer columnIndex = measurementIndexMap.get(insertingMeasurements[i]);
if (columnIndex == null || !list.isLastValueArrayUnallocated(columnIndex)) {
continue;
}
if (columnHasNonNullInRange(columns[i], bitMaps == null ? null : bitMaps[i], start, end)) {
memCostIncrement += AlignedTVList.valueListArrayMemCost(insertingTypes[i]);
}
}
return memCostIncrement;
}

private static boolean columnHasNonNullInRange(Object column, BitMap bitMap, int start, int end) {
if (bitMap == null) {
return true;
}
for (int i = start; i < end; i++) {
if (!bitMap.isMarked(i)) {
return true;
}
}
return false;
}

@Override
public int serializedSize() {
int size = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -544,6 +545,7 @@ private long[] checkAlignedMemCost(
insertTabletNode.getMeasurements(),
insertTabletNode.getDataTypes(),
insertTabletNode.getColumns(),
insertTabletNode.getBitMaps(),
insertTabletNode.getColumnCategories(),
splitStart,
splitEnd,
Expand Down Expand Up @@ -784,11 +786,11 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow(
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
* dataTypes.length;
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes, columnCategories);
memTableIncrement +=
AlignedTVList.alignedTvListArrayMemCost(dataTypes, columnCategories, values);
} else {
// For existed device of this mem table
AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk;
List<TSDataType> dataTypesInTVList = new ArrayList<>();
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
if (dataTypes[i] == null
Expand All @@ -804,15 +806,19 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRow(
+ (alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE > 0
? 1
: 0);
memTableIncrement += currentArrayNum * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
dataTypesInTVList.add(dataTypes[i]);
long columnArrayCost =
values[i] != null
? AlignedTVList.valueListArrayMemCost(dataTypes[i])
: AlignedTVList.emptyValueListArrayMemCost();
memTableIncrement += currentArrayNum * columnArrayCost;
}
}
// this insertion will result in a new array
if ((alignedMemChunk.alignedListSize() % PrimitiveArrayManager.ARRAY_SIZE) == 0) {
dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes());
memTableIncrement += alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost();
}
memTableIncrement +=
alignedMemChunk.getTvListArrayMemCostIncrement(measurements, dataTypes, values);
}

for (int i = 0; i < dataTypes.length; i++) {
Expand Down Expand Up @@ -848,7 +854,7 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List<InsertRowNode> ins
chunkMetadataIncrement +=
ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR)
* dataTypes.length;
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes, null);
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes, null, values);
for (int i = 0; i < dataTypes.length; i++) {
// Skip failed Measurements
if (dataTypes[i] == null
Expand All @@ -867,7 +873,6 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List<InsertRowNode> ins
// For existed device of this mem table
AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk;
int currentChunkPointNum = alignedMemChunk == null ? 0 : alignedMemChunk.alignedListSize();
List<TSDataType> dataTypesInTVList = new ArrayList<>();
Pair<Map<String, TSDataType>, Integer> addingPointNumInfo =
increasingMemTableInfo.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 0));
for (int i = 0; i < dataTypes.length; i++) {
Expand All @@ -892,22 +897,25 @@ private long[] checkAlignedMemCostAndAddToTspInfoForRows(List<InsertRowNode> ins
> 0
? 1
: 0);
memTableIncrement +=
currentArrayNum * AlignedTVList.valueListArrayMemCost(dataTypes[i]);
long columnArrayCost =
values[i] != null
? AlignedTVList.valueListArrayMemCost(dataTypes[i])
: AlignedTVList.emptyValueListArrayMemCost();
memTableIncrement += currentArrayNum * columnArrayCost;
}
}
int addingPointNum = addingPointNumInfo.right;
// Here currentChunkPointNum + addingPointNum >= 1
if (((currentChunkPointNum + addingPointNum) % PrimitiveArrayManager.ARRAY_SIZE) == 0) {
if (alignedMemChunk != null) {
dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes());
memTableIncrement += alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost();
} else {
memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes, null, values);
}
dataTypesInTVList.addAll(addingPointNumInfo.left.values());
}
if (alignedMemChunk != null) {
memTableIncrement +=
alignedMemChunk != null
? alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost()
: AlignedTVList.alignedTvListArrayMemCost(
dataTypesInTVList.toArray(new TSDataType[0]), null);
alignedMemChunk.getTvListArrayMemCostIncrement(measurements, dataTypes, values);
}
addingPointNumInfo.setRight(addingPointNum + 1);
}
Expand Down Expand Up @@ -962,6 +970,7 @@ private long[] checkAlignedMemCostAndAddToTspForTablet(
String[] measurements,
TSDataType[] dataTypes,
Object[] columns,
BitMap[] bitMaps,
TsTableColumnCategory[] columnCategories,
int start,
int end,
Expand All @@ -981,6 +990,7 @@ private long[] checkAlignedMemCostAndAddToTspForTablet(
end,
memIncrements,
columns,
bitMaps,
columnCategories,
noFailure,
results);
Expand Down Expand Up @@ -1038,6 +1048,7 @@ private void updateAlignedMemCost(
int end,
long[] memIncrements,
Object[] columns,
BitMap[] bitMaps,
TsTableColumnCategory[] columnCategories,
boolean noFailure,
TSStatus[] results) {
Expand Down Expand Up @@ -1078,11 +1089,14 @@ private void updateAlignedMemCost(
int numArraysToAdd =
incomingPointNum / PrimitiveArrayManager.ARRAY_SIZE
+ (incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0);
boolean[] columnHasNonNull =
buildAlignedColumnHasNonNull(dataTypes, columns, bitMaps, columnCategories, start, end);
memIncrements[0] +=
numArraysToAdd * AlignedTVList.alignedTvListArrayMemCost(dataTypes, columnCategories);
numArraysToAdd
* AlignedTVList.alignedTvListArrayMemCost(
dataTypes, columnCategories, null, columns, columnHasNonNull);
} else {
AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk;
List<TSDataType> dataTypesInTVList = new ArrayList<>();
int currentPointNum = alignedMemChunk.alignedListSize();
int newPointNum = currentPointNum + incomingPointNum;
for (int i = 0; i < dataTypes.length; i++) {
Expand All @@ -1097,11 +1111,17 @@ private void updateAlignedMemCost(
}

if (!alignedMemChunk.containsMeasurement(measurementIds[i])) {
// add a new column in the TVList, the new column should be as long as existing ones
memIncrements[0] +=
(currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + 1)
* AlignedTVList.valueListArrayMemCost(dataType);
dataTypesInTVList.add(dataType);
int newColumnArrayNum =
currentPointNum / PrimitiveArrayManager.ARRAY_SIZE
+ (currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0)
+ incomingPointNum / PrimitiveArrayManager.ARRAY_SIZE
+ (incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0);
long columnArrayCost =
alignedColumnHasNonNullInRange(
column, bitMaps == null ? null : bitMaps[i], start, end)
? AlignedTVList.valueListArrayMemCost(dataType)
: AlignedTVList.emptyValueListArrayMemCost();
memIncrements[0] += newColumnArrayNum * columnArrayCost;
}
}

Expand All @@ -1115,11 +1135,12 @@ private void updateAlignedMemCost(
long acquireArray = newArrayCnt - currentArrayCnt;

if (acquireArray != 0) {
// memory of extending the TVList
dataTypesInTVList.addAll(alignedMemChunk.getWorkingTVList().getTsDataTypes());
memIncrements[0] +=
acquireArray * alignedMemChunk.getWorkingTVList().alignedTvListArrayMemCost();
}
memIncrements[0] +=
alignedMemChunk.getTvListArrayMemCostIncrementForTablet(
measurementIds, dataTypes, columns, bitMaps, start, end);
}

// flexible-length data size
Expand All @@ -1141,6 +1162,41 @@ private void updateAlignedMemCost(
}
}

private static boolean[] buildAlignedColumnHasNonNull(
TSDataType[] dataTypes,
Object[] columns,
BitMap[] bitMaps,
TsTableColumnCategory[] columnCategories,
int start,
int end) {
boolean[] columnHasNonNull = new boolean[dataTypes.length];
for (int i = 0; i < dataTypes.length; i++) {
if (dataTypes[i] == null || columns[i] == null) {
continue;
}
if (columnCategories != null && columnCategories[i] != TsTableColumnCategory.FIELD) {
continue;
}
columnHasNonNull[i] =
alignedColumnHasNonNullInRange(
columns[i], bitMaps == null ? null : bitMaps[i], start, end);
}
return columnHasNonNull;
}

private static boolean alignedColumnHasNonNullInRange(
Object column, BitMap bitMap, int start, int end) {
if (bitMap == null) {
return true;
}
for (int i = start; i < end; i++) {
if (!bitMap.isMarked(i)) {
return true;
}
}
return false;
}

private void updateMemoryInfo(
long memTableIncrement, long chunkMetadataIncrement, long textDataIncrement)
throws WriteProcessRejectException {
Expand Down
Loading
Loading