Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,13 @@ alterTimeseries
;

alterClause
: RENAME beforeName=attributeKey TO currentName=attributeKey
| SET attributePair (COMMA attributePair)*
| DROP attributeKey (COMMA attributeKey)*
| ADD TAGS attributePair (COMMA attributePair)*
| ADD ATTRIBUTES attributePair (COMMA attributePair)*
| UPSERT aliasClause? tagClause? attributeClause?
: RENAME TO newPath=fullPath #renameTimeseriesPath
| RENAME beforeName=attributeKey TO currentName=attributeKey #renameTagOrAttribute
| SET attributePair (COMMA attributePair)* #setAlter
| DROP attributeKey (COMMA attributeKey)* #dropAlter
| ADD TAGS attributePair (COMMA attributePair)* #addTagsAlter
| ADD ATTRIBUTES attributePair (COMMA attributePair)* #addAttributesAlter
| UPSERT aliasClause? tagClause? attributeClause? #upsertAlter
;

alterEncodingCompressor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ public enum CnToDnAsyncRequestType {
DELETE_DATA_FOR_DELETE_SCHEMA,
DELETE_TIMESERIES,

LOCK_ALIAS,
CREATE_ALIAS_SERIES,
MARK_SERIES_DISABLED,
UPDATE_PHYSICAL_ALIAS_REF,
DROP_ALIAS_SERIES,
ENABLE_PHYSICAL_SERIES,
UNLOCK_FOR_ALIAS,

ALTER_ENCODING_COMPRESSOR,

CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.iotdb.commons.client.request.DataNodeInternalServiceRequestManager;
import org.apache.iotdb.commons.client.request.TestConnectionUtils;
import org.apache.iotdb.commons.exception.UncheckedStartupException;
import org.apache.iotdb.confignode.client.async.handlers.rpc.AliasTimeSeriesRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.CheckTimeSeriesExistenceRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.CountPathsUsingTemplateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.DataNodeAsyncRequestRPCHandler;
Expand All @@ -48,6 +49,7 @@
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAliasTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
Expand All @@ -57,6 +59,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListWithTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructViewSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateAliasSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
Expand All @@ -69,16 +72,20 @@
import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteViewSchemaReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeviceViewReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropAliasSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TEnablePhysicalSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateColumnCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateTableCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TKillQueryInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TLockAndGetSchemaInfoForAliasReq;
import org.apache.iotdb.mpp.rpc.thrift.TMarkSeriesDisabledReq;
import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
Expand All @@ -98,6 +105,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternAndFilterReq;
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceDeletionWithPatternOrModReq;
import org.apache.iotdb.mpp.rpc.thrift.TTableDeviceInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdatePhysicalAliasRefReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
Expand Down Expand Up @@ -310,6 +318,39 @@ protected void initActionMapBuilder() {
CnToDnAsyncRequestType.DELETE_TIMESERIES,
(req, client, handler) ->
client.deleteTimeSeries((TDeleteTimeSeriesReq) req, (SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.LOCK_ALIAS,
(req, client, handler) ->
client.lockAndGetSchemaInfoForAlias(
(TLockAndGetSchemaInfoForAliasReq) req, (AliasTimeSeriesRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.CREATE_ALIAS_SERIES,
(req, client, handler) ->
client.createAliasSeries(
(TCreateAliasSeriesReq) req, (SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.MARK_SERIES_DISABLED,
(req, client, handler) ->
client.markSeriesDisabled(
(TMarkSeriesDisabledReq) req, (SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.UPDATE_PHYSICAL_ALIAS_REF,
(req, client, handler) ->
client.updatePhysicalAliasRef(
(TUpdatePhysicalAliasRefReq) req, (SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.DROP_ALIAS_SERIES,
(req, client, handler) ->
client.dropAliasSeries((TDropAliasSeriesReq) req, (SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.ENABLE_PHYSICAL_SERIES,
(req, client, handler) ->
client.enablePhysicalSeries(
(TEnablePhysicalSeriesReq) req, (SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.UNLOCK_FOR_ALIAS,
(req, client, handler) ->
client.unlockForAlias((TAliasTimeSeriesReq) req, (SchemaUpdateRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.ALTER_ENCODING_COMPRESSOR,
(req, client, handler) ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under this License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.client.async.handlers.rpc;

import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import org.apache.iotdb.mpp.rpc.thrift.TAliasTimeSeriesResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class AliasTimeSeriesRPCHandler
extends DataNodeAsyncRequestRPCHandler<TAliasTimeSeriesResp> {

private static final Logger LOGGER = LoggerFactory.getLogger(AliasTimeSeriesRPCHandler.class);

public AliasTimeSeriesRPCHandler(
CnToDnAsyncRequestType requestType,
int requestId,
TDataNodeLocation targetDataNode,
Map<Integer, TDataNodeLocation> dataNodeLocationMap,
Map<Integer, TAliasTimeSeriesResp> responseMap,
CountDownLatch countDownLatch) {
super(requestType, requestId, targetDataNode, dataNodeLocationMap, responseMap, countDownLatch);
}

@Override
public void onComplete(TAliasTimeSeriesResp resp) {
TSStatus tsStatus = resp.getStatus();
responseMap.put(requestId, resp);
if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
nodeLocationMap.remove(requestId);
LOGGER.info("Successfully executed alias time series operation on DataNode: {}", targetNode);
} else if (tsStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
nodeLocationMap.remove(requestId);
LOGGER.error(
"Failed to execute alias time series operation on DataNode {}, {}", targetNode, tsStatus);
} else {
LOGGER.error(
"Failed to execute alias time series operation on DataNode {}, {}", targetNode, tsStatus);
}
countDownLatch.countDown();
}

@Override
public void onError(Exception e) {
String errorMsg =
"Alias time series operation error on DataNode: {id="
+ targetNode.getDataNodeId()
+ ", internalEndPoint="
+ targetNode.getInternalEndPoint()
+ "}"
+ e.getMessage();
LOGGER.error(errorMsg);

countDownLatch.countDown();
TAliasTimeSeriesResp resp = new TAliasTimeSeriesResp();
resp.setStatus(
new TSStatus(
RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode(), errorMsg)));
responseMap.put(requestId, resp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.CheckSchemaRegionUsingTemplateRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
import org.apache.iotdb.mpp.rpc.thrift.TAliasTimeSeriesResp;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
Expand Down Expand Up @@ -184,6 +185,27 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
dataNodeLocationMap,
(Map<Integer, TTestConnectionResp>) responseMap,
countDownLatch);
case LOCK_ALIAS:
return new AliasTimeSeriesRPCHandler(
requestType,
requestId,
targetDataNode,
dataNodeLocationMap,
(Map<Integer, TAliasTimeSeriesResp>) responseMap,
countDownLatch);
case CREATE_ALIAS_SERIES:
case MARK_SERIES_DISABLED:
case UPDATE_PHYSICAL_ALIAS_REF:
case DROP_ALIAS_SERIES:
case ENABLE_PHYSICAL_SERIES:
case UNLOCK_FOR_ALIAS:
return new SchemaUpdateRPCHandler(
requestType,
requestId,
targetDataNode,
dataNodeLocationMap,
(Map<Integer, TSStatus>) responseMap,
countDownLatch);
case DETECT_TREE_DEVICE_VIEW_FIELD_TYPE:
return new TreeDeviceViewFieldDetectionHandler(
requestType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.path.PathPatternUtil;
import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest;
Expand Down Expand Up @@ -134,6 +135,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRestartResp;
import org.apache.iotdb.confignode.rpc.thrift.TAliasTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterEncodingCompressorReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterOrDropTableReq;
Expand Down Expand Up @@ -2218,6 +2220,22 @@ public TSStatus alterEncodingCompressor(final TAlterEncodingCompressorReq req) {
}
}

@Override
public TSStatus aliasTimeSeries(TAliasTimeSeriesReq req) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
String queryId = req.getQueryId();
PartialPath oldPath =
(PartialPath) PathDeserializeUtil.deserialize(ByteBuffer.wrap(req.getOldPath()));
PartialPath newPath =
(PartialPath) PathDeserializeUtil.deserialize(ByteBuffer.wrap(req.getNewPath()));
boolean isGeneratedByPipe = req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe();
return procedureManager.aliasTimeSeries(queryId, oldPath, newPath, isGeneratedByPipe);
} else {
return status;
}
}

@Override
public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) {
TSStatus status = confirmLeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRestartResp;
import org.apache.iotdb.confignode.rpc.thrift.TAliasTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterEncodingCompressorReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterOrDropTableReq;
Expand Down Expand Up @@ -701,6 +702,9 @@ TDataPartitionTableResp getOrCreateDataPartition(

TSStatus alterEncodingCompressor(TAlterEncodingCompressorReq req);

/** Alias timeseries. */
TSStatus aliasTimeSeries(TAliasTimeSeriesReq req);

/** Delete timeseries. */
TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
import org.apache.iotdb.confignode.procedure.impl.region.RegionOperationProcedure;
import org.apache.iotdb.confignode.procedure.impl.region.RemoveRegionPeerProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.AliasTimeSeriesProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.AlterEncodingCompressorProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.AlterLogicalViewProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
Expand Down Expand Up @@ -386,6 +387,46 @@ public TSStatus deleteTimeSeries(
return waitingProcedureFinished(procedure);
}

public TSStatus aliasTimeSeries(
String queryId, PartialPath oldPath, PartialPath newPath, boolean isGeneratedByPipe) {
AliasTimeSeriesProcedure procedure = null;
synchronized (this) {
boolean hasOverlappedTask = false;
ProcedureType type;
AliasTimeSeriesProcedure aliasTimeSeriesProcedure;
for (Procedure<?> runningProcedure : executor.getProcedures().values()) {
type = ProcedureFactory.getProcedureType(runningProcedure);
if (type == null || !type.equals(ProcedureType.ALIAS_TIMESERIES_PROCEDURE)) {
continue;
}
aliasTimeSeriesProcedure = ((AliasTimeSeriesProcedure) runningProcedure);
if (queryId.equals(aliasTimeSeriesProcedure.getQueryId())) {
procedure = aliasTimeSeriesProcedure;
break;
}
// Check if there's overlap with old path or new path
if (oldPath.equals(aliasTimeSeriesProcedure.getOldPath())
|| oldPath.equals(aliasTimeSeriesProcedure.getNewPath())
|| newPath.equals(aliasTimeSeriesProcedure.getOldPath())
|| newPath.equals(aliasTimeSeriesProcedure.getNewPath())) {
hasOverlappedTask = true;
break;
}
}

if (procedure == null) {
if (hasOverlappedTask) {
return RpcUtils.getStatus(
TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
"Some other task is aliasing some target timeseries.");
}
procedure = new AliasTimeSeriesProcedure(queryId, oldPath, newPath, isGeneratedByPipe);
this.executor.submitProcedure(procedure);
}
}
return waitingProcedureFinished(procedure);
}

public TSStatus deleteLogicalView(TDeleteLogicalViewReq req) {
String queryId = req.getQueryId();
PathPatternTree patternTree =
Expand Down
Loading
Loading