Current state: Accepted

...

To reduce network transmission and skip Pulsar management, the new interface will let users pass the paths of data files (JSON, NumPy, etc.) on MinIO/S3 storage, and the data nodes will read these files directly and parse them into segments. The internal logic of the process becomes:

        1. client calls import() to pass some file paths to Milvus proxy node  

        2. proxy node passes the file paths to data coordinator

        3. data coordinator picks one or more data nodes (according to the file count) to parse the files; each file can be parsed into one or more segments.
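
The dispatch in step 3 can be pictured with a small sketch. The proposal only says that the number of data nodes depends on the file count; the round-robin policy, the `files_per_node` threshold, and the function name `assign_files` below are assumptions made purely for illustration, not the data coordinator's actual logic.

```python
from typing import Dict, List


def assign_files(file_paths: List[str], data_nodes: List[str],
                 files_per_node: int = 2) -> Dict[str, List[str]]:
    """Hypothetical policy: use ceil(len(files) / files_per_node) nodes,
    capped by the nodes available, and deal the files out round-robin."""
    if not data_nodes:
        raise ValueError("no data node available")
    if not file_paths:
        return {}
    wanted = -(-len(file_paths) // files_per_node)  # ceiling division
    chosen = data_nodes[:min(wanted, len(data_nodes))]
    plan: Dict[str, List[str]] = {node: [] for node in chosen}
    for i, path in enumerate(file_paths):
        plan[chosen[i % len(chosen)]].append(path)
    return plan


# Three files and three idle nodes: only two nodes are needed here.
plan = assign_files(["a.json", "b.json", "c.npy"],
                    ["datanode-1", "datanode-2", "datanode-3"])
for node, files in plan.items():
    print(node, files)
```

Each data node would then parse its assigned files into one or more segments independently of the others.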


Some points to consider:

  • JSON format is flexible. Ideally, the import() API ought to parse users' JSON files without asking them to reformat the files according to a strict rule.
  • Users can store scalar fields and vector fields in a JSON file, in either a row-based or a column-based layout. The import() API can support both.
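
To make the two layouts concrete, here is a minimal sketch that normalizes both into the same list of rows. The exact file shapes are assumptions: the row-based document is taken to be `{"rows": [{...}, ...]}` and the column-based one to be `{"field": [values...]}`. The helper name `to_rows` is hypothetical as well.

```python
from typing import Any, Dict, List


def to_rows(doc: Dict[str, Any], layout: str) -> List[Dict[str, Any]]:
    """Normalize a parsed JSON document into a list of row dicts."""
    layout = layout.lower()  # the "type" option is described as case insensitive
    if layout == "row_based":
        # Assumed shape: {"rows": [{"id": 1, ...}, {"id": 2, ...}]}
        return [dict(row) for row in doc["rows"]]
    if layout == "column_based":
        # Assumed shape: {"id": [1, 2], "vector": [[...], [...]]}
        lengths = {len(values) for values in doc.values()}
        if len(lengths) > 1:
            raise ValueError("columns have different lengths")
        count = lengths.pop() if lengths else 0
        return [{name: values[i] for name, values in doc.items()}
                for i in range(count)]
    raise ValueError(f"unknown layout: {layout}")


row_doc = {"rows": [{"id": 1, "vector": [0.1, 0.2]},
                    {"id": 2, "vector": [0.3, 0.4]}]}
col_doc = {"id": [1, 2], "vector": [[0.1, 0.2], [0.3, 0.4]]}

# Both layouts describe the same two entities.
print(to_rows(row_doc, "row_based") == to_rows(col_doc, "Column_Based"))
```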

...

Code Block
{
	"data_source": {			          // required
		"type": "minio",		          // required, "minio" or "s3", case insensitive
		"address": "localhost:9000",	  // optional, milvus server uses its own minio/s3 configuration if this value is omitted
		"accesskey_id": "minioadmin",	  // optional, milvus server uses its own minio/s3 configuration if this value is omitted
		"accesskey_secret": "minioadmin", // optional, milvus server uses its own minio/s3 configuration if this value is omitted
		"use_ssl": false,		          // optional, milvus server uses its own minio/s3 configuration if this value is omitted
		"bucket_name": "aaa"		      // optional, milvus server uses its own minio/s3 configuration if this value is omitted
	},

	"internal_data": {		     // optional, external_data or internal_data. (external files include json, npy, etc. internal files are exported by milvus)
		"path": "xxx/xxx/xx",	 // required, relative path in the source storage where the exported data is stored
		"collections_mapping": { // optional, rename collections during import
			"coll_a": "coll_b",	     // collection name mapping, key is the source collection name, value is a new collection name
			"coll_c": "coll_d"
		}
	},

	"external_data": {			                     // optional, external_data or internal_data. (external files include json, npy, etc. internal files are exported by milvus)
		"target_collection": "xxx",	                 // required, target collection name
		"chunks": [{			                     // required, chunk list, each chunk can be imported as one segment or split into multiple segments
				"files": [{	                         // required, files that provide data of a chunk
					"path": "xxxx / xx.json", 		 // required, relative path under the storage source defined by data_source, currently supports json/npy
					"type": "row_based",	  		 // required, "row_based" or "column_based", tells milvus how to parse this json file, case insensitive
					"from": 0,                       // optional, import only part of the file, starting from this number
					"to": 1000,                      // optional, import only part of the file, ending at this number
					"fields_mapping": {			     // optional, specify the target fields which should be imported. Milvus will import all fields if this list is empty
						"table.rows.id": "uid",		 // field name mapping, tells milvus which field to insert the data into: key is a json node path, value is a field name of the collection. For a numpy file, the key is a collection field name and is the same as the value.
						"table.rows.year": "year",
						"table.rows.vector": "vector"
					}
				}]
			}
		],
		"default_fields": { // optional, use default value to fill some fields
			"age": 0,       // key is a field name, value is default value of this field, can be number or string
			"weight": 0.0
		}
	}
}
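
Note that the sample above is annotated with `//` comments, which standard JSON parsers reject, so the actual options string has to be sent without them. The sketch below builds a minimal options string and shows one plausible reading of a `fields_mapping` key such as `table.rows.id`. The helper names `build_options` and `extract_by_path`, and the rule that a list node applies the rest of the path to each of its elements, are assumptions for illustration and not confirmed Milvus behavior.

```python
import json
from typing import Any, Dict, List


def build_options(target_collection: str, path: str, file_type: str,
                  fields_mapping: Dict[str, str]) -> str:
    """Assemble a minimal external_data options string (hypothetical helper)."""
    options = {
        "data_source": {"type": "minio"},  # other keys fall back to server config
        "external_data": {
            "target_collection": target_collection,
            "chunks": [{"files": [{"path": path,
                                   "type": file_type,
                                   "fields_mapping": fields_mapping}]}],
        },
    }
    return json.dumps(options)


def extract_by_path(doc: Any, node_path: str) -> List[Any]:
    """Walk a dotted JSON node path. Assumption: when a list is met,
    the remaining keys are applied to each of its elements."""
    def walk(node: Any, keys: List[str]) -> List[Any]:
        if not keys:
            return [node]  # end of path: keep the value whole, even if it is a list
        if isinstance(node, list):
            values: List[Any] = []
            for item in node:
                values.extend(walk(item, keys))
            return values
        return walk(node[keys[0]], keys[1:])
    return walk(doc, node_path.split("."))


mapping = {"table.rows.id": "uid", "table.rows.vector": "vector"}
options = build_options("demo_collection", "data/file.json", "row_based", mapping)

doc = {"table": {"rows": [{"id": 1, "vector": [0.1, 0.2]},
                          {"id": 2, "vector": [0.3, 0.4]}]}}
columns = {field: extract_by_path(doc, node_path)
           for node_path, field in mapping.items()}
print(columns)
```

The resulting string is what would travel in the request's options field; the collection name and file path used here are placeholders.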

...

Code Block
rpc Import(ImportRequest) returns (MutationResult) {}

message ImportRequest {
  common.MsgBase base = 1;
  string options = 2;      // the json options string
}


message MutationResult {
  common.Status status = 1;
  schema.IDs IDs = 2;             // return auto-id for insert/import, deleted id for delete
  repeated uint32 succ_index = 3; // indexes that succeeded for insert/import
  repeated uint32 err_index = 4;  // indexes that failed for insert/import
  bool acknowledged = 5;
  int64 insert_cnt = 6;           // how many entities were inserted or imported
  int64 delete_cnt = 7;
  int64 upsert_cnt = 8;
  uint64 timestamp = 9;
}

...