diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java
index 1595799c..57190211 100644
--- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java
+++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java
@@ -1,7 +1,10 @@
 package org.hypertrace.core.documentstore;
 
 import static org.hypertrace.core.documentstore.utils.Utils.readFileFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -11,10 +14,16 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.hypertrace.core.documentstore.expression.impl.DataType;
+import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
+import org.hypertrace.core.documentstore.expression.impl.RelationalExpression;
+import org.hypertrace.core.documentstore.expression.operators.RelationalOperator;
 import org.hypertrace.core.documentstore.postgres.PostgresDatastore;
+import org.hypertrace.core.documentstore.query.Query;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -169,16 +178,115 @@ public static void shutdown() {
   class UpsertTests {
 
     @Test
-    @DisplayName("Should throw UnsupportedOperationException for upsert")
-    void testUpsertNewDocument() {
+    @DisplayName("Should upsert a new document with TypedDocument")
+    void testUpsertNewDocument() throws IOException {
+      // Create TypedDocument with value and type coupled together
+      TypedDocument typedDoc =
+          TypedDocument.builder()
+              .field("item", "NewItem", DataType.STRING)
+              .field("price", 99, DataType.INTEGER)
+              .field("quantity", 50, DataType.INTEGER)
+              .field("in_stock", true, DataType.BOOLEAN)
+              .arrayField("tags", List.of("new", "test"), DataType.STRING)
+              .jsonbField("props", Map.of("brand", "TestBrand", "size", "medium"))
+              .build();
+
+      Key key = new SingleValueKey("default", "100");
+
+      // Perform upsert
+      boolean result = flatCollection.upsert(key, typedDoc);
+      assertTrue(result, "Upsert should return true for new document");
+
+      // Verify the document was inserted by querying it
+      Query query =
+          Query.builder()
+              .setFilter(
+                  RelationalExpression.of(
+                      IdentifierExpression.of("item"),
+                      RelationalOperator.EQ,
+                      org.hypertrace.core.documentstore.expression.impl.ConstantExpression.of(
+                          "NewItem")))
+              .build();
+
+      Iterator<Document> results = flatCollection.find(query);
+      assertTrue(results.hasNext(), "Should find the inserted document");
+
+      Document foundDoc = results.next();
+      assertNotNull(foundDoc);
+
+      JsonNode foundJson = OBJECT_MAPPER.readTree(foundDoc.toJson());
+      assertEquals("NewItem", foundJson.get("item").asText());
+      assertEquals(99, foundJson.get("price").asInt());
+      assertEquals(50, foundJson.get("quantity").asInt());
+      assertTrue(foundJson.get("in_stock").asBoolean());
+
+      // Verify array field
+      JsonNode tagsNode = foundJson.get("tags");
+      assertNotNull(tagsNode);
+      assertTrue(tagsNode.isArray());
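+      // List.of("new", "test") was bound as a Postgres array via createArrayOf,
+      // so both elements should come back.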
+      assertEquals(2, tagsNode.size());
+
+      // Verify JSONB field
+      JsonNode propsResult = foundJson.get("props");
+      assertNotNull(propsResult);
+      assertEquals("TestBrand", propsResult.get("brand").asText());
+    }
+
+    @Test
+    @DisplayName("Should update existing document on upsert with TypedDocument")
+    void testUpsertExistingDocument() throws IOException {
+      // First, get an existing document ID from the initial data
+      String existingId = "1"; // ID 1 exists in initial data
+
+      // Create TypedDocument with updated values
+      TypedDocument typedDoc =
+          TypedDocument.builder()
+              .field("item", "UpdatedSoap", DataType.STRING)
+              .field("price", 999, DataType.INTEGER)
+              .field("quantity", 100, DataType.INTEGER)
+              .field("in_stock", false, DataType.BOOLEAN)
+              .build();
+
+      Key key = new SingleValueKey("default", existingId);
+
+      // Perform upsert (should update existing)
+      boolean result = flatCollection.upsert(key, typedDoc);
+      assertTrue(result, "Upsert should return true for existing document update");
+
+      // Verify the document was updated
+      Query query =
+          Query.builder()
+              .setFilter(
+                  RelationalExpression.of(
+                      IdentifierExpression.of("item"),
+                      RelationalOperator.EQ,
+                      org.hypertrace.core.documentstore.expression.impl.ConstantExpression.of(
+                          "UpdatedSoap")))
+              .build();
+
+      Iterator<Document> results = flatCollection.find(query);
+      assertTrue(results.hasNext(), "Should find the updated document");
+
+      Document foundDoc = results.next();
+      JsonNode foundJson = OBJECT_MAPPER.readTree(foundDoc.toJson());
+      assertEquals("UpdatedSoap", foundJson.get("item").asText());
+      assertEquals(999, foundJson.get("price").asInt());
+      assertEquals(100, foundJson.get("quantity").asInt());
+    }
+
+    @Test
+    @DisplayName("Should throw IllegalArgumentException for non-TypedDocument")
+    void testUpsertWithoutTypedDocument() {
       ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
-      objectNode.put("_id", 100);
       objectNode.put("item", "NewItem");
       objectNode.put("price", 99);
       Document document = new JSONDocument(objectNode);
       Key key = new SingleValueKey("default", "100");
 
-      assertThrows(UnsupportedOperationException.class, () -> flatCollection.upsert(key, document));
+      assertThrows(
+          IllegalArgumentException.class,
+          () -> flatCollection.upsert(key, document),
+          "Should throw IllegalArgumentException when not using TypedDocument");
     }
 
     @Test
diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/TypedDocument.java b/document-store/src/main/java/org/hypertrace/core/documentstore/TypedDocument.java
new file mode 100644
index 00000000..fd36188a
--- /dev/null
+++ b/document-store/src/main/java/org/hypertrace/core/documentstore/TypedDocument.java
@@ -0,0 +1,177 @@
+package org.hypertrace.core.documentstore;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.hypertrace.core.documentstore.expression.impl.DataType;
+
+/**
+ * A Document implementation where each field carries its value and type information.
+ *
+ * <p>For flat PostgreSQL tables, type information is needed to correctly set values in prepared
+ * statements, especially for arrays and JSONB columns.
+ *
+ * <p>Usage:
+ *
+ * <pre>{@code
+ * TypedDocument typedDoc = TypedDocument.builder()
+ *     .field("item", "Soap", DataType.STRING)
+ *     .field("price", 99, DataType.INTEGER)
+ *     .field("in_stock", true, DataType.BOOLEAN)
+ *     .arrayField("tags", List.of("hygiene", "bath"), DataType.STRING)
+ *     .jsonbField("props", Map.of("brand", "Dove", "size", "large"))
+ *     .build();
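+ *
+ * // A sketch of the write path (the collection variable and key values here are
+ * // illustrative): the document is passed to a flat collection's upsert, which
+ * // keys on the table's "id" column.
+ * flatCollection.upsert(new SingleValueKey("tenant", "42"), typedDoc);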
+ * }</pre>
+ */
+public class TypedDocument implements Document {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private final Map<String, TypedField> fields;
+
+  private TypedDocument(Map<String, TypedField> fields) {
+    this.fields = Collections.unmodifiableMap(fields);
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  @Override
+  public String toJson() {
+    Map<String, Object> jsonMap = new LinkedHashMap<>();
+    for (Map.Entry<String, TypedField> entry : fields.entrySet()) {
+      jsonMap.put(entry.getKey(), entry.getValue().getValue());
+    }
+    try {
+      return MAPPER.writeValueAsString(jsonMap);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize TypedDocument to JSON", e);
+    }
+  }
+
+  @Override
+  public DocumentType getDocumentType() {
+    return DocumentType.FLAT;
+  }
+
+  /** Returns all fields in this document. */
+  public Map<String, TypedField> getFields() {
+    return fields;
+  }
+
+  /** Returns a specific field by name, or null if not present. */
+  public TypedField getField(String fieldName) {
+    return fields.get(fieldName);
+  }
+
+  /** Represents a field's value and type information. */
+  public static class TypedField {
+    private final Object value;
+    private final FieldKind kind;
+    private final DataType dataType; // For SCALAR or ARRAY element type
+
+    private TypedField(Object value, FieldKind kind, DataType dataType) {
+      this.value = value;
+      this.kind = kind;
+      this.dataType = dataType;
+    }
+
+    public static TypedField scalar(Object value, DataType dataType) {
+      return new TypedField(value, FieldKind.SCALAR, dataType);
+    }
+
+    public static TypedField array(Collection<?> value, DataType elementType) {
+      return new TypedField(value, FieldKind.ARRAY, elementType);
+    }
+
+    public static TypedField jsonb(Object value) {
+      return new TypedField(value, FieldKind.JSONB, null);
+    }
+
+    public Object getValue() {
+      return value;
+    }
+
+    public FieldKind getKind() {
+      return kind;
+    }
+
+    /** Returns the DataType for scalar fields, or the element type for array fields. */
+    public DataType getDataType() {
+      return dataType;
+    }
+
+    public boolean isScalar() {
+      return kind == FieldKind.SCALAR;
+    }
+
+    public boolean isArray() {
+      return kind == FieldKind.ARRAY;
+    }
+
+    public boolean isJsonb() {
+      return kind == FieldKind.JSONB;
+    }
+  }
+
+  /** The kind of field (scalar, array, or JSONB). */
+  public enum FieldKind {
+    SCALAR,
+    ARRAY,
+    JSONB
+  }
+
+  /** Builder for TypedDocument */
+  public static class Builder {
+    private final Map<String, TypedField> fields = new LinkedHashMap<>();
+
+    private Builder() {}
+
+    /**
+     * Adds a scalar field with its value and type.
+     *
+     * @param name the column name
+     * @param value the field value
+     * @param type the data type
+     * @return this builder
+     */
+    public Builder field(String name, Object value, DataType type) {
+      fields.put(name, TypedField.scalar(value, type));
+      return this;
+    }
+
+    /**
+     * Adds an array field with its values and element type.
+     *
+     * @param name the column name
+     * @param values the array values
+     * @param elementType the data type of array elements
+     * @return this builder
+     */
+    public Builder arrayField(String name, List<?> values, DataType elementType) {
+      fields.put(name, TypedField.array(values, elementType));
+      return this;
+    }
+
+    /**
+     * Adds a JSONB field with its value (will be serialized as JSON).
+     *
+     * @param name the column name
+     * @param value the value (typically a Map or object that can be serialized to JSON)
+     * @return this builder
+     */
+    public Builder jsonbField(String name, Object value) {
+      fields.put(name, TypedField.jsonb(value));
+      return this;
+    }
+
+    public TypedDocument build() {
+      return new TypedDocument(fields);
+    }
+  }
+}
diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java
index 0b7afc62..1020ad93 100644
--- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java
+++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java
@@ -1,6 +1,16 @@
 package org.hypertrace.core.documentstore.postgres;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -14,13 +24,17 @@
 import org.hypertrace.core.documentstore.Document;
 import org.hypertrace.core.documentstore.Filter;
 import org.hypertrace.core.documentstore.Key;
+import org.hypertrace.core.documentstore.TypedDocument;
 import org.hypertrace.core.documentstore.UpdateResult;
+import org.hypertrace.core.documentstore.expression.impl.DataType;
 import org.hypertrace.core.documentstore.model.options.QueryOptions;
 import org.hypertrace.core.documentstore.model.options.UpdateOptions;
 import org.hypertrace.core.documentstore.model.subdoc.SubDocumentUpdate;
 import org.hypertrace.core.documentstore.postgres.query.v1.PostgresQueryParser;
+import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType;
 import org.hypertrace.core.documentstore.postgres.query.v1.transformer.FlatPostgresFieldTransformer;
 import org.hypertrace.core.documentstore.query.Query;
+import org.postgresql.util.PGobject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,8 +48,10 @@ public class FlatPostgresCollection extends PostgresCollection {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FlatPostgresCollection.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final String WRITE_NOT_SUPPORTED =
       "Write operations are not supported for flat collections yet!";
+  private static final String ID_COLUMN = "id";
 
   FlatPostgresCollection(final PostgresClient client, final String collectionName) {
     super(client, collectionName);
   }
@@ -76,7 +92,175 @@ private PostgresQueryParser createParser(Query query) {
 
   @Override
   public boolean upsert(Key key, Document document) throws IOException {
-    throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED);
+    if (!(document instanceof TypedDocument)) {
+      throw new IllegalArgumentException(
+          "FlatPostgresCollection.upsert requires a TypedDocument. "
+              + "Use TypedDocument.builder().field(...).build()");
+    }
+
+    TypedDocument typedDoc = (TypedDocument) document;
+    return upsertTypedDocument(key, typedDoc);
+  }
+
+  private boolean upsertTypedDocument(Key key, TypedDocument typedDoc) throws IOException {
+    try {
+      List<String> columns = new ArrayList<>();
+      List<TypedDocument.TypedField> typedFields = new ArrayList<>();
+
+      // Add the id column
+      columns.add(ID_COLUMN);
+      typedFields.add(TypedDocument.TypedField.scalar(key.toString(), DataType.STRING));
+
+      // Extract fields from TypedDocument
+      for (Map.Entry<String, TypedDocument.TypedField> entry : typedDoc.getFields().entrySet()) {
+        String fieldName = entry.getKey();
+
+        // Skip id as we already added it
+        if (ID_COLUMN.equals(fieldName)) {
+          continue;
+        }
+
+        columns.add(fieldName);
+        typedFields.add(entry.getValue());
+      }
+
+      // Build and execute the upsert SQL
+      String sql = buildUpsertSql(columns);
+      return executeUpsert(sql, columns, typedFields);
+
+    } catch (SQLException | JsonProcessingException e) {
+      LOGGER.error("Exception during upsert for key: {}", key, e);
+      throw new IOException(e);
+    }
+  }
+
+  private String buildUpsertSql(List<String> columns) {
+    StringBuilder sql = new StringBuilder();
+    sql.append("INSERT INTO ").append(tableIdentifier).append(" (");
+
+    // Column names
+    for (int i = 0; i < columns.size(); i++) {
+      if (i > 0) sql.append(", ");
+      sql.append("\"").append(columns.get(i)).append("\"");
+    }
+
+    sql.append(") VALUES (");
+
+    // Placeholders
+    for (int i = 0; i < columns.size(); i++) {
+      if (i > 0) sql.append(", ");
+      sql.append("?");
+    }
+
+    sql.append(") ON CONFLICT (\"").append(ID_COLUMN).append("\") DO UPDATE SET ");
+
+    // Update clause (skip the id column)
+    boolean first = true;
+    for (int i = 1; i < columns.size(); i++) {
+      if (!first) sql.append(", ");
+      sql.append("\"")
+          .append(columns.get(i))
+          .append("\" = EXCLUDED.\"")
+          .append(columns.get(i))
+          .append("\"");
+      first = false;
+    }
+
+    return sql.toString();
+  }
+
+  private boolean executeUpsert(
+      String sql, List<String> columns, List<TypedDocument.TypedField> typedFields)
+      throws SQLException, JsonProcessingException {
+
+    Connection connection = client.getConnection();
+    try (PreparedStatement ps = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
+
+      for (int i = 0; i < typedFields.size(); i++) {
+        int paramIndex = i + 1;
+        TypedDocument.TypedField field = typedFields.get(i);
+        Object value = field.getValue();
+
+        if (value == null) {
+          ps.setObject(paramIndex, null);
+        } else if (field.isJsonb()) {
+          // JSONB column - serialize value to JSON string
+          PGobject jsonObj = new PGobject();
+          jsonObj.setType("jsonb");
+          String jsonValue =
+              (value instanceof String) ? (String) value : MAPPER.writeValueAsString(value);
+          jsonObj.setValue(jsonValue);
+          ps.setObject(paramIndex, jsonObj);
+        } else if (field.isArray()) {
+          // Array column
+          DataType elementType = field.getDataType();
+          PostgresDataType pgType = PostgresDataType.fromDataType(elementType);
+          Object[] arrayValues = ((Collection<?>) value).toArray();
+          Array sqlArray = connection.createArrayOf(pgType.getSqlType(), arrayValues);
+          ps.setArray(paramIndex, sqlArray);
+        } else {
+          // Scalar column
+          setScalarParameter(ps, paramIndex, value, field.getDataType());
+        }
+      }
+
+      int result = ps.executeUpdate();
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upsert result: {}, SQL: {}", result, sql);
+      }
+      return result > 0;
+    }
+  }
+
+  private void setScalarParameter(PreparedStatement ps, int index, Object value, DataType type)
+      throws SQLException {
+    if (value == null) {
+      ps.setObject(index, null);
+      return;
+    }
+
+    if (type == null) {
+      // No type specified, let JDBC infer
+      ps.setObject(index, value);
+      return;
+    }
+
+    switch (type) {
+      case STRING:
+        ps.setString(index, value.toString());
+        break;
+      case INTEGER:
+        ps.setInt(index, ((Number) value).intValue());
+        break;
+      case LONG:
+        ps.setLong(index, ((Number) value).longValue());
+        break;
+      case FLOAT:
+        ps.setFloat(index, ((Number) value).floatValue());
+        break;
+      case DOUBLE:
+        ps.setDouble(index, ((Number) value).doubleValue());
+        break;
+      case BOOLEAN:
+        ps.setBoolean(index, (Boolean) value);
+        break;
+      case TIMESTAMPTZ:
+        if (value instanceof Timestamp) {
+          ps.setTimestamp(index, (Timestamp) value);
+        } else {
+          ps.setObject(index, value);
+        }
+        break;
+      case DATE:
+        if (value instanceof java.sql.Date) {
+          ps.setDate(index, (java.sql.Date) value);
+        } else {
+          ps.setObject(index, value);
+        }
+        break;
+      default:
+        ps.setObject(index, value);
+    }
   }
 
   @Override