Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1707] feat(mysql): Support mysql index. #1715

Merged
merged 5 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ public class Indexes {

public static final Index[] EMPTY_INDEXES = new Index[0];

/** MySQL does not support setting the name of the primary key, so the default name is used. */
public static final String DEFAULT_MYSQL_PRIMARY_KEY_NAME = "PRIMARY";

/**
* Create a unique index on columns. Like unique (a) or unique (a, b), for complex like unique
*
Expand All @@ -20,6 +23,16 @@ public static Index unique(String name, String[][] fieldNames) {
return of(Index.IndexType.UNIQUE_KEY, name, fieldNames);
}

/**
* To create a MySQL primary key, you need to use the default primary key name.
*
* @param fieldNames The field names under the table contained in the index.
* @return The primary key index
*/
public static Index createMysqlPrimaryKey(String[][] fieldNames) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's odd to add a method here, especally createMysqlPrimaryKey only used for test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I plan to change the name to not a mandatory field

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please address the comment here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If mysql doesn't support specify the key name, why don't you either ignore the key name or throw the exception, instead of adding a method here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image The code will throw an exception. This is just an API utility class.

return primary(DEFAULT_MYSQL_PRIMARY_KEY_NAME, fieldNames);
}

/**
* Create a primary index on columns. Like primary (a), for complex like primary
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
// Remove id from comment
.withComment(StringIdentifier.removeIdFromComment(load.comment()))
.withProperties(properties)
.withIndexes(load.index())
.build();
}

Expand Down Expand Up @@ -361,7 +362,6 @@ public Table createTable(
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
Preconditions.checkArgument(indexes.length == 0, "Jdbc-catalog does not support indexes");
Preconditions.checkArgument(
null == distribution || distribution == Distributions.NONE,
"jdbc-catalog does not support distribution");
Expand Down Expand Up @@ -396,7 +396,8 @@ public Table createTable(
jdbcColumns,
StringIdentifier.addToComment(identifier, comment),
resultProperties,
partitioning);
partitioning,
indexes);

return new JdbcTable.Builder()
.withAuditInfo(
Expand All @@ -406,6 +407,7 @@ public Table createTable(
.withComment(comment)
.withProperties(jdbcTablePropertiesMetadata.convertFromJdbcProperties(resultProperties))
.withPartitioning(partitioning)
.withIndexes(indexes)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ protected JdbcTable internalBuild() {
jdbcTable.columns = columns;
jdbcTable.partitioning = partitioning;
jdbcTable.sortOrders = sortOrders;
jdbcTable.indexes = indexes;
return jdbcTable;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
Expand Down Expand Up @@ -63,13 +64,14 @@ public void create(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning)
Transform[] partitioning,
Index[] indexes)
throws TableAlreadyExistsException {
LOG.info("Attempting to create table {} in database {}", tableName, databaseName);
try (Connection connection = getConnection(databaseName)) {
JdbcConnectorUtils.executeUpdate(
connection,
generateCreateTableSql(tableName, columns, comment, properties, partitioning));
generateCreateTableSql(tableName, columns, comment, properties, partitioning, indexes));
LOG.info("Created table {} in database {}", tableName, databaseName);
} catch (final SQLException se) {
throw this.exceptionMapper.toGravitinoException(se);
Expand Down Expand Up @@ -215,7 +217,8 @@ protected abstract String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning);
Transform[] partitioning,
Index[] indexes);

protected abstract String generateRenameTableSql(String oldTableName, String newTableName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
Expand Down Expand Up @@ -41,14 +42,16 @@ void initialize(
* @param comment The comment of the table.
* @param properties The properties of the table.
* @param partitioning The partitioning of the table.
* @param indexes The indexes of the table.
*/
void create(
String databaseName,
String tableName,
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning)
Transform[] partitioning,
Index[] indexes)
throws TableAlreadyExistsException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.datastrato.gravitino.catalog.jdbc.JdbcColumn;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
Expand All @@ -20,7 +21,8 @@ protected String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning) {
Transform[] partitioning,
Index[] indexes) {
StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("CREATE TABLE ").append(tableName).append(" (");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.indexes.Indexes;
import com.datastrato.gravitino.rel.types.Type;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -121,7 +122,7 @@ public void testOperationTable() {
Assertions.assertDoesNotThrow(
() ->
JDBC_TABLE_OPERATIONS.create(
DATABASE_NAME, table1, jdbcColumns, null, properties, null));
DATABASE_NAME, table1, jdbcColumns, null, properties, null, Indexes.EMPTY_INDEXES));

// list table.
List<String> allTables = JDBC_TABLE_OPERATIONS.listTables(DATABASE_NAME);
Expand Down
1 change: 1 addition & 0 deletions catalogs/catalog-jdbc-mysql/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation(project(":core"))
implementation(project(":api"))
implementation(project(":catalogs:catalog-jdbc-common"))
implementation(libs.guava)
implementation(libs.bundles.log4j)
implementation(libs.commons.lang3)
implementation(libs.commons.collections4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.indexes.Indexes;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
Expand All @@ -31,6 +35,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -87,6 +92,9 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
comment = tableProperties.getOrDefault(COMMENT, comment);
}

// 4.Get index information
List<Index> indexes = getIndexes(databaseName, tableName, metaData);

return new JdbcTable.Builder()
.withName(tableName)
.withColumns(jdbcColumns.toArray(new JdbcColumn[0]))
Expand All @@ -102,13 +110,51 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
}
}
}))
.withIndexes(indexes.toArray(new Index[0]))
.withAuditInfo(AuditInfo.EMPTY)
.build();
} catch (SQLException e) {
throw exceptionMapper.toGravitinoException(e);
}
}

public List<Index> getIndexes(String databaseName, String tableName, DatabaseMetaData metaData)
throws SQLException {
List<Index> indexes = new ArrayList<>();

// Get primary key information
SetMultimap<String, String> primaryKeyGroupByName = HashMultimap.create();
ResultSet primaryKeys = metaData.getPrimaryKeys(databaseName, null, tableName);
while (primaryKeys.next()) {
String columnName = primaryKeys.getString("COLUMN_NAME");
primaryKeyGroupByName.put(primaryKeys.getString("PK_NAME"), columnName);
}
for (String key : primaryKeyGroupByName.keySet()) {
indexes.add(Indexes.primary(key, convertIndexFieldNames(primaryKeyGroupByName.get(key))));
}

// Get unique key information
SetMultimap<String, String> indexGroupByName = HashMultimap.create();
ResultSet indexInfo = metaData.getIndexInfo(databaseName, null, tableName, false, false);
while (indexInfo.next()) {
String indexName = indexInfo.getString("INDEX_NAME");
if (!indexInfo.getBoolean("NON_UNIQUE")
&& !StringUtils.equalsIgnoreCase(Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME, indexName)) {
String columnName = indexInfo.getString("COLUMN_NAME");
indexGroupByName.put(indexName, columnName);
}
}
for (String key : indexGroupByName.keySet()) {
indexes.add(Indexes.unique(key, convertIndexFieldNames(indexGroupByName.get(key))));
}

return indexes;
}

private String[][] convertIndexFieldNames(Set<String> fieldNames) {
return fieldNames.stream().map(colName -> new String[] {colName}).toArray(String[][]::new);
}

private Map<String, String> loadTablePropertiesFromSql(Connection connection, String tableName)
throws SQLException {
try (PreparedStatement statement = connection.prepareStatement("SHOW TABLE STATUS LIKE ?")) {
Expand Down Expand Up @@ -178,7 +224,8 @@ protected String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning) {
Transform[] partitioning,
Index[] indexes) {
if (ArrayUtils.isNotEmpty(partitioning)) {
throw new UnsupportedOperationException("Currently we do not support Partitioning in mysql");
}
Expand All @@ -201,6 +248,9 @@ protected String generateCreateTableSql(
sqlBuilder.append(",\n");
}
}

appendIndexesSql(indexes, sqlBuilder);

sqlBuilder.append("\n)");

// Add table comment if specified
Expand All @@ -223,6 +273,42 @@ protected String generateCreateTableSql(
return result;
}

public static void appendIndexesSql(Index[] indexes, StringBuilder sqlBuilder) {
for (Index index : indexes) {
String fieldStr =
Arrays.stream(index.fieldNames())
.map(
colNames -> {
if (colNames.length > 1) {
throw new IllegalArgumentException(
"Index does not support complex fields in MySQL");
}
return BACK_QUOTE + colNames[0] + BACK_QUOTE;
})
.collect(Collectors.joining(", "));
sqlBuilder.append(",\n");
switch (index.type()) {
case PRIMARY_KEY:
if (null != index.name()
&& !StringUtils.equalsIgnoreCase(
index.name(), Indexes.DEFAULT_MYSQL_PRIMARY_KEY_NAME)) {
throw new IllegalArgumentException("Primary key name must be PRIMARY in MySQL");
}
sqlBuilder.append("CONSTRAINT ").append("PRIMARY KEY (").append(fieldStr).append(")");
break;
case UNIQUE_KEY:
sqlBuilder.append("CONSTRAINT ");
if (null != index.name()) {
sqlBuilder.append(BACK_QUOTE).append(index.name()).append(BACK_QUOTE);
}
sqlBuilder.append(" UNIQUE (").append(fieldStr).append(")");
break;
default:
throw new IllegalArgumentException("MySQL doesn't support index : " + index.type());
}
}
}

@Override
protected JdbcColumn extractJdbcColumnFromResultSet(ResultSet resultSet) {
// We have rewritten the `load` method, so there is no need to implement this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.types.Types;
import com.google.common.base.Preconditions;
import java.sql.Connection;
Expand Down Expand Up @@ -225,7 +226,8 @@ protected String generateCreateTableSql(
JdbcColumn[] columns,
String comment,
Map<String, String> properties,
Transform[] partitioning) {
Transform[] partitioning,
Index[] indexes) {
if (ArrayUtils.isNotEmpty(partitioning)) {
throw new UnsupportedOperationException(
"Currently we do not support Partitioning in PostgreSQL");
Expand Down
Loading