...
To solve Problem 3, we redefine the interface currently implemented by minio. Every file storage backend must implement this interface.
ChunkManager interface

```go
type ChunkManager interface {
	GetPath(filePath string) (string, error)
	GetSize(filePath string) (int64, error)
	Write(filePath string, content []byte) error
	MultiWrite(contents map[string][]byte) error
	Exist(filePath string) bool
	Read(filePath string) ([]byte, error)
	MultiRead(filePaths []string) ([][]byte, error)
	ReadWithPrefix(prefix string) ([]string, [][]byte, error)
	ReadAt(filePath string, off int64, length int64) (p []byte, err error)
	Remove(filePath string) error
	MultiRemove(filePaths []string) error
	RemoveWithPrefix(prefix string) error
}
```
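To make the contract more concrete, here is a minimal in-memory sketch covering a subset of the methods (Write, Exist, Read, ReadAt, ReadWithPrefix, RemoveWithPrefix). `memChunkManager`, `newMemChunkManager` and `errNotFound` are hypothetical names used only for illustration, and this type is not one of the planned implementations. The bounds check in `ReadAt` and the sorted order returned by `ReadWithPrefix` are assumptions about the intended semantics, not something this proposal specifies.

```go
package main

import (
	"errors"
	"sort"
	"strings"
	"sync"
)

// errNotFound is a hypothetical sentinel error for missing files.
var errNotFound = errors.New("file not found")

// memChunkManager is a hypothetical in-memory store, for illustration only.
type memChunkManager struct {
	mu    sync.RWMutex
	files map[string][]byte
}

func newMemChunkManager() *memChunkManager {
	return &memChunkManager{files: make(map[string][]byte)}
}

// Write stores a copy of content under filePath.
func (m *memChunkManager) Write(filePath string, content []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.files[filePath] = append([]byte(nil), content...)
	return nil
}

// Exist reports whether filePath has been written.
func (m *memChunkManager) Exist(filePath string) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	_, ok := m.files[filePath]
	return ok
}

// Read returns a copy of the whole file.
func (m *memChunkManager) Read(filePath string) ([]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	data, ok := m.files[filePath]
	if !ok {
		return nil, errNotFound
	}
	return append([]byte(nil), data...), nil
}

// ReadAt returns length bytes starting at off (assumed semantics:
// a range that exceeds the file is an error rather than a short read).
func (m *memChunkManager) ReadAt(filePath string, off int64, length int64) ([]byte, error) {
	data, err := m.Read(filePath)
	if err != nil {
		return nil, err
	}
	if off < 0 || length < 0 || off+length > int64(len(data)) {
		return nil, errors.New("range out of bounds")
	}
	return data[off : off+length], nil
}

// ReadWithPrefix returns all paths starting with prefix, sorted, with their contents.
func (m *memChunkManager) ReadWithPrefix(prefix string) ([]string, [][]byte, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	var paths []string
	for p := range m.files {
		if strings.HasPrefix(p, prefix) {
			paths = append(paths, p)
		}
	}
	sort.Strings(paths)
	contents := make([][]byte, 0, len(paths))
	for _, p := range paths {
		contents = append(contents, append([]byte(nil), m.files[p]...))
	}
	return paths, contents, nil
}

// RemoveWithPrefix deletes every file whose path starts with prefix.
func (m *memChunkManager) RemoveWithPrefix(prefix string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	for p := range m.files {
		if strings.HasPrefix(p, prefix) {
			delete(m.files, p)
		}
	}
	return nil
}
```

A store like this could also serve as a test double for code that depends only on these methods.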
This interface will have three implementations: LocalChunkManager, MinioChunkManager and VectorChunkManager.
VectorChunkManager is a manager optimized for vector reads in distributed Milvus. It uses minio as its storageChunkManager and the local file system as its vectorChunkStorage. To read a file, it first downloads the file from minio to local disk and then reads the relevant data from the local copy. In the standalone version, localChunkManager replaces minioChunkManager as the implementation of storageChunkManager.

- ChunkManagerFactory
For Problem 1, we add a chunkManagerFactory, similar to msgstream.Factory, that generates chunkManagers with different configurations.
ChunkManager Factory

```go
type ChunkManagerFactory struct {
	ChunkStorage       string
	VectorCacheStorage string
}

func NewChunkManagerFactory(ChunkStorage, VectorCacheStorage string) *ChunkManagerFactory {}

func (cmf *ChunkManagerFactory) NewChunkStorage(opts ...storage.Options) {
	switch cmf.ChunkStorage {
	case "s3":
	case "minio":
		...
	}
}

func (cmf *ChunkManagerFactory) NewVectorCacheManager(opts ...storage.Options) {}
```
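The dispatch inside the factory could look roughly like the sketch below. Several things here are assumptions and not part of the proposal: the `(ChunkManager, error)` return type, the `"local"` key, the stub types `localStub` and `minioStub`, the one-method `ChunkManager` used to keep the example short, and the decision to let `"s3"` and `"minio"` share one backend. Note that Go `switch` cases do not fall through, so two keys that share a branch must be listed in the same `case`.

```go
package main

import "fmt"

// ChunkManager is reduced to a single method to keep the sketch short.
type ChunkManager interface {
	Exist(filePath string) bool
}

// localStub and minioStub are hypothetical stand-ins for real backends.
type localStub struct{}

func (localStub) Exist(string) bool { return false }

type minioStub struct{}

func (minioStub) Exist(string) bool { return false }

type ChunkManagerFactory struct {
	ChunkStorage       string
	VectorCacheStorage string
}

func NewChunkManagerFactory(chunkStorage, vectorCacheStorage string) *ChunkManagerFactory {
	return &ChunkManagerFactory{
		ChunkStorage:       chunkStorage,
		VectorCacheStorage: vectorCacheStorage,
	}
}

// NewChunkStorage picks a backend from the configured storage name.
// Returning an error for unknown names is an assumption of this sketch.
func (cmf *ChunkManagerFactory) NewChunkStorage() (ChunkManager, error) {
	switch cmf.ChunkStorage {
	case "local":
		return localStub{}, nil
	case "minio", "s3": // listed together because Go cases do not fall through
		return minioStub{}, nil
	default:
		return nil, fmt.Errorf("unsupported chunk storage %q", cmf.ChunkStorage)
	}
}
```

Returning an error for an unknown storage name surfaces a misconfiguration when the node is built, not on the first read.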
Options are needed when generating a new chunkManager. They may look like this:
ChunkManager Config

```go
type config struct {
	address           string
	bucketName        string
	accessKeyID       string
	secretAccessKeyID string
	useSSL            bool
	createBucket      bool
	rootPath          string
}

type Option func(*config)

func Address(addr string) Option {
	return func(c *config) {
		c.address = addr
	}
}
```
This config structure carries some redundancy: local storage, for example, does not need parameters such as address and bucketName. In exchange, it is easier to reuse.
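As a rough illustration of how such options would be applied, the sketch below builds a config from a variadic list of options. Only `Address` appears in the proposal. `BucketName`, `RootPath`, `newConfig` and the default root path are hypothetical additions for the example, and the config struct is trimmed to the fields used here.

```go
package main

// config is trimmed to the fields this sketch touches.
type config struct {
	address    string
	bucketName string
	rootPath   string
}

type Option func(*config)

// Address sets the storage endpoint (taken from the proposal).
func Address(addr string) Option {
	return func(c *config) { c.address = addr }
}

// BucketName is a hypothetical option following the same pattern.
func BucketName(name string) Option {
	return func(c *config) { c.bucketName = name }
}

// RootPath is a hypothetical option following the same pattern.
func RootPath(path string) Option {
	return func(c *config) { c.rootPath = path }
}

// newConfig starts from defaults (the default value is an assumption)
// and applies each option in order, so later options win.
func newConfig(opts ...Option) *config {
	c := &config{rootPath: "/tmp/chunks"}
	for _, opt := range opts {
		opt(c)
	}
	return c
}
```

A local backend would pass only `RootPath(...)` and leave the remote fields at their zero values, which is the redundancy mentioned above.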
Use a more generic factory instead of the existing msgFactory to build nodes.
Interface extended

```go
// Before
factory := newMsgFactory(localMsg)
rc, err := components.NewRootCoord(ctx, factory)

// After
factory := newFactory(localMsg)
rc, err := components.NewRootCoord(ctx, factory)
```
The factory will look like this:
Factory Struct

```go
type Factory struct {
	msgF     msgstream.Factory
	storageF *storage.ChunkManagerFactory
}

func newDefaultFactory(opts ...Option) *Factory {
	c := newDefaultConfig()
	// range yields (index, value); the option is the value.
	for _, opt := range opts {
		opt(c)
	}
	return &Factory{
		msgF: msgstream.NewFactory(c.msgStream),
		// Argument order follows NewChunkManagerFactory(ChunkStorage, VectorCacheStorage).
		storageF: storage.NewChunkManagerFactory(c.chunkStorage, c.vectorCacheStorage),
	}
}

type config struct {
	msgStream          string
	vectorCacheStorage string
	chunkStorage       string
}

func newDefaultConfig() *config {
	// Default values are not specified in this proposal.
	return &config{}
}

type Option func(*config)

func vectorCacheStorage(vectorCacheStorage string) Option {
	return func(c *config) {
		c.vectorCacheStorage = vectorCacheStorage
	}
}
```
...