Big Memory

This section provides a brief overview of Big Memory Pile; please refer to the NFX documentation for details.

Big Memory Pile is a part of the NFX library and is used throughout Agni OS. The technology consists of the following sub-components:

  • Pile - a custom memory manager/allocator written in 100% managed code
  • Cache - a dictionary-like structure that holds data objects in Pile (a usage sketch follows this list)
  • Custom "Big" Data Structures - custom structures that can be developed to store hundreds of millions of nodes in RAM using Pile, e.g. a radix tree, a linked list, a convolutional net, etc.
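
A minimal cache usage sketch (illustrative only; the concrete type and member names - LocalCache, DefaultPile, GetOrCreateTable, Put, Get - are assumptions based on NFX samples and may differ between NFX versions; Product is a hypothetical business class):

    // assumption: both the pile and the cache are NFX services that must be started;
    // configuration details are omitted here
    var pile  = new NFX.ApplicationModel.Pile.DefaultPile();
    var cache = new NFX.ApplicationModel.Pile.LocalCache { Pile = pile };
    pile.Start();
    cache.Start();

    // a cache table is addressed by name and keyed by a chosen key type (SKU string here)
    var products = cache.GetOrCreateTable<string>("products");
    products.Put("SKU-123", new Product { Sku = "SKU-123", Name = "Widget" });

    var hit = products.Get("SKU-123") as Product;  // null when the entry expired or was evicted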

The Problem

Managed runtimes cannot handle more than about 10 million objects without pausing for garbage collection; sometimes even 10 million resident objects are enough to start pausing the process. GC heap defragmentation kills the application by adding unpredictable stalls. Adding physical RAM makes the behavior even less predictable, as the GC can now postpone its full sweep for longer. In practice it is therefore nearly impossible to fill 64/128/256 GB of RAM with small CLR objects - the process just stalls.

All of the modern GC techniques (concurrent, background, parallel, server mode) try to solve this problem, but without significant success. This is because the combinatorics of reference updates explodes with the number of objects and their inter-dependencies, especially when memory needs to be shifted and millions of scattered references must be updated to point to the new location. Secondly, CLR objects consume lots of RAM: for example, the string "ABC" occupies around 30 bytes instead of 4 (with a null terminator or size prefix). This is by design and expected - field alignment, Unicode strings and the object header (pointers to the VMT, lock/sync block, GC flags) all take space. Java has the same issue; although its VMs offer far more options to control the GC, that is still not enough for storing hundreds of millions of objects. The root of the problem: the GC "sees" references and has to traverse them to determine what is still reachable and what is not. This obviously takes time and slows down the application, since some of the work cannot be done by parallel threads.
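
As a rough illustration of this overhead (plain .NET, independent of NFX; exact numbers vary by runtime and platform), the following snippet measures the managed-heap cost of ten million tiny strings - the per-object cost is typically an order of magnitude larger than the few bytes of actual payload:

    using System;

    class HeapOverheadDemo
    {
        static void Main()
        {
            const int N = 10000000; // ten million small objects

            var items = new string[N];
            long before = GC.GetTotalMemory(forceFullCollection: true);

            for (int i = 0; i < N; i++)
                items[i] = new string('A', 3); // a 3-character payload per object

            long after = GC.GetTotalMemory(forceFullCollection: true);

            // On a 64-bit CLR this typically prints tens of bytes per string
            Console.WriteLine("~{0:F1} bytes per object", (after - before) / (double)N);
            GC.KeepAlive(items);
        }
    }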

Why Keep That Many Objects in RAM?

Many resident data objects are needed for different types of applications: neural networks, caches, pre-computed data, etc. Purely stateless designs are really rare - the majority of business applications need state. A common technique is to keep that state in an external component (such as an RDBMS).

Application state is not limited to user session state - the state of the database itself (business data) is also needed. For example, in an eCommerce application it is beneficial to cache all products with their SKU specifications for quick access while building the checkout page.

A social app needs to store a graph of objects with the ability to traverse connections between users for various purposes.

While many of the problems described above are solvable using external stores (e.g. Redis), they have the following drawbacks:

  • Adds an extra component to the solution (maintenance, etc.), increasing complexity
  • Adds IPC/context switching and possibly network traffic
  • Custom serialization - business domain objects need to be serialized so they can be stored in external systems. This usually introduces extra DTOs (data transfer objects) just to move data around*

* For some reason, most benchmarks completely disregard the problem domain: they measure performance using byte[] or strings, forgetting that raw strings and byte[] are useless for business problems - the payload still needs to be serialized/deserialized, and that time is usually left outside of the benchmarking scope. Our experience points at improper serialization choices (e.g. XML text for sending arrays) as the major performance-constraining factor

The Solution

NFX solves the problem by "hiding" data from the GC. Until recently there wasn't any general-purpose library implementation of this concept in .NET; we called this technology Pile. Pile is a custom memory manager written in 100% managed code. It stores native CLR objects in a tightly-serialized form, freeing the garbage collector from having to deal with these objects. In addition, piles can be either local (allocating RAM on the local server) or distributed (allocating RAM on many servers, e.g. in an Agni cluster). Contrary to common concerns about performance degradation due to serialization, Pile yields better results in terms of both time and space than out-of-process stores (e.g. Redis/Memcached). Every object stored in a pile is addressed by a PilePointer value:

                                            
    public struct PilePointer : IEquatable<PilePointer>
    {
        // Distributed Node ID. The local pile sets this to -1 rendering
        // this pointer as !DistributedValid
        public readonly int NodeID;

        // Segment # within pile
        public readonly int Segment;

        // Address within the segment (offset)
        public readonly int Address;
    }
                                            
                                        

Usage example:

                                            
    var person = new Person { LastName="a", FirstName="b"....};

    var ptr =  pile.Put(person); // ptr is a value type
    ...
    var originalPerson = pile.Get(ptr);
    pile.Delete(ptr);
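
Building on the Put/Get calls above, the following sketch illustrates the custom "Big" data structure idea from the component list: nodes link to one another through PilePointer values instead of CLR references, so the GC never has to traverse the chain. The node type is illustrative, and the IPile interface name/namespace is an assumption - consult the NFX documentation for the exact API:

    using System;
    using System.Collections.Generic;
    using NFX.ApplicationModel.Pile; // assumed namespace for IPile/PilePointer

    // Illustrative node type (not part of NFX): the link is a PilePointer value,
    // so the garbage collector never sees or walks the chain
    [Serializable]
    public class BigListNode
    {
        public string Value;
        public bool HasNext;        // explicit flag instead of a "null" pointer
        public PilePointer Next;    // location of the next node inside the pile
    }

    public static class BigList
    {
        // Prepend a value, returning the pointer to the new head node
        public static PilePointer Prepend(IPile pile, PilePointer? head, string value)
        {
            var node = new BigListNode { Value = value, HasNext = head.HasValue };
            if (head.HasValue) node.Next = head.Value;
            return pile.Put(node); // the node is serialized into the pile, hidden from the GC
        }

        // Walk the chain; only one small node object is materialized at a time
        public static IEnumerable<string> Walk(IPile pile, PilePointer head)
        {
            var current = head;
            while (true)
            {
                var node = (BigListNode)pile.Get(current);
                yield return node.Value;
                if (!node.HasNext) yield break;
                current = node.Next;
            }
        }
    }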
                                            
                                        

Key-Value DB (KDB)

Agni provides a simple key-value store called "KDB". The store is sharded and can accommodate very large numbers of records. The primary purpose of KDB is to store short key associations. This is beneficial in many ways, especially in systems where data is sharded "by hand". The manual sharding approach provides unmatched performance, as it allows for designs where related data is co-located on the same physical hardware, reducing cross-shard joins by orders of magnitude. The drawbacks of "manual" sharding are usually the increased complexity of the initial design and the dependence of data location on a single primary key. KDB is used in this case as a fast look-aside translation mechanism that maps external IDs to internal ones.

The KDB store is organized into tables, each having a byte[] primary key and storing either raw byte[] values or Rows.

Data supports optional sliding and/or absolute expiration timestamps - this is used in many business cases where old mappings need to be auto-purged. The Get* family of methods supports a "dontTouch" flag which fetches the data without updating its last-use time.

The Data abstraction:

                                            
    public struct KDBRecord<TResult> where TResult : class
    {
        public readonly TResult Value;
        public readonly int SlidingExpirationDays;
        public readonly DateTime LastUseDate;
        public readonly DateTime? AbsoluteExpirationDateUTC;

        public bool IsAssigned { get { return Value != null; } }
    }
                                            
                                        

The store contract:

                                            
    // Stipulates a contract for KDBDataStore
    public interface IKDBDataStore : IDataStore
    {
        // Gets a row of data by key, or null if row with such key was not found or data is not Row
        // table: Required must be non-null valid identifier string less than 32 chars
        // key: Byte array key, must be non-null array with at least one element
        Row Get(string table, byte[] key);

        // Gets a row of data projected in the specified typed model, or null if row
        // with such key was not found or data is not Row
        // table: Required must be non-null valid identifier string less than 32 chars
        // key: Byte array key, must be non-null array with at least one element
        // dontTouch: If true then does not update the item's last use time
        KDBRecord<TRow> Get<TRow>(string table, 
                                  byte[] key,
                                  bool dontTouch = false) where TRow : Row;

        // Gets a raw byte[] of data or null if data does not exist or data is not
        // raw byte[] but Row
        // table: Required must be non-null valid identifier string less than 32 chars
        // key: Byte array key, must be non-null array with at least one element
        // dontTouch: If true then does not update the item's last use time
        KDBRecord<byte[]> GetRaw(string table, byte[] key, bool dontTouch = false);

        // Puts a row of data under the specified key
        // table: Required must be non-null valid identifier string less than 32 chars
        // key:  Byte array key, must be non-null array with at least one element
        // value: Data object must be non-null
        // slidingExpirationDays:
        //  When set, specifies the sliding expiration of the entry in days.
        //  The system DOES NOT guarantee the instantaneous deletion of expired data
        // absoluteExpirationDateUtc:
        //  When set, specifies when garbage collector should auto-delete the value.
        //  It does not guarantee that the value is deleted right at that date
        void Put(string table,
                 byte[] key,
                 Row value, 
                 int slidingExpirationDays = -1, 
                 DateTime? absoluteExpirationDateUtc = null);

        // Puts a raw byte[] value under the specified key
        // table: Required must be non-null valid identifier string less than 32 chars
        // key: Byte array key, must be non-null array with at least one element
        // value: byte[] must be non-null
        // slidingExpirationDays:
        // When set, specifies the sliding expiration of the entry in days.
        // The system DOES NOT guarantee the instantaneous deletion of expired data
        // absoluteExpirationDateUtc:
        // When set, specifies when garbage collector should auto-delete the value.
        // It does not guarantee that the value is deleted right at that date
        void PutRaw(string table, 
                    byte[] key,
                    byte[] value,
                    int slidingExpirationDays = -1,
                    DateTime? absoluteExpirationDateUtc = null);

        // Deletes a row of data under the specified key returning true if 
        // deletion succeeded
        // table: Required must be non-null valid identifier string less than 32 chars
        // key: Byte array key, must be non-null array with at least one element
        bool Delete(string table, byte[] key);
    }
                                            
                                        

The store sharding scheme controls the distribution of read/write requests; once shards are added or removed, the system rebalances the data automatically. The KDB store uses a static sharding scheme with fallbacks, that is, the shards are pre-set in the config file. This design is very simple and works very well for a key-value store, as the number of shards is usually known beforehand and {key:value} mappings take little space: for example, a shard set of 5 servers with 500 GB drives may hold around 10+ billion associations when using a backend like MongoDB.
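
The shard selection idea can be sketched as follows. This is a simplification for illustration only - the hash function, the shard-ordering rules and the exact fallback semantics of the real Agni.KDB router may differ:

    using System.Collections.Generic;
    using System.Linq;

    // Illustrative static shard router: the shard list mirrors the 'shard' sections
    // of the config, the fallback list mirrors the 'fallback' section
    public sealed class StaticShardRouter
    {
        private readonly string[] _shards;    // current shard connect strings, in 'order'
        private readonly string[] _fallback;  // previous shard set, consulted when a key is not found

        public StaticShardRouter(IEnumerable<string> shards, IEnumerable<string> fallback)
        {
            _shards   = shards.ToArray();
            _fallback = fallback.ToArray();
        }

        public string ShardForKey(byte[] key)         { return Pick(_shards,   key); }
        public string FallbackShardForKey(byte[] key) { return Pick(_fallback, key); }

        // Deterministically map a byte[] key onto one of the configured shards (FNV-1a hash)
        private static string Pick(string[] set, byte[] key)
        {
            unchecked
            {
                uint hash = 2166136261;
                foreach (var b in key) hash = (hash ^ b) * 16777619;
                return set[hash % (uint)set.Length];
            }
        }
    }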

The following example is taken from the KDB integration test.

Config:

                                            
    app
    {
        data-store
        {
            type='Agni.KDB.DefaultKDBStore, Agni.MongoDB'

            shard
            {
                order=0
                primary-cs='mongo{server="mongo://localhost:27017" db="UT_ACKDB0"}'
                secondary-cs='mongo{server="mongo://localhost:27017" db="UT_ACKDB0"}'
            }

            shard
            {
                order=1
                primary-cs='mongo{server="mongo://localhost:27017" db="UT_ACKDB1"}'
                secondary-cs='mongo{server="mongo://localhost:27017" db="UT_ACKDB1"}'
            }

            fallback
            {
                shard
                {
                    order=0
                    primary-cs='mongo{server="mongo://localhost:27017" db="UT_ACKDB0"}'
                    secondary-cs='mongo{server="mongo://localhost:27017" db="UT_ACKDB0"}'
                }
            }
        }
    }
                                            
                                        

Example of Put, Get and Delete:

                                            
    var kds = MyApp.KDBStore;
    var key = new byte[] {1,2,3};
    var absExp = new DateTime(2300, 1, 1, 14, 00, 00, DateTimeKind.Utc);
    kds.Put("table1", key, new TestRow { Name="SomeName", Description="SomeDescr" }, 1234, absExp);
    ...
    var result = kds.Get<TestRow>("table1", key);
    Console.WriteLine(result.Value.Description); // SomeDescr
    ...
    kds.Delete("table1", key); //true
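
The KDBRecord wrapper and the dontTouch flag from the contract above can be used like this:

    // peek at the value without refreshing its last-use (sliding expiration) time
    var rec = kds.Get<TestRow>("table1", key, dontTouch: true);
    if (rec.IsAssigned)
        Console.WriteLine(rec.Value.Name); // SomeName

    // raw access: returns an unassigned record here because the entry stores a Row, not a raw byte[]
    var raw = kds.GetRaw("table1", key);
    Console.WriteLine(raw.IsAssigned); // false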

                                            
                                        

MDB Sharding Router

Agni OS provides an API model for organizing "Medium Databases" - aka MDB. The term "Medium" is used in the sense of a "middleware agent" through which the actual data is transacted. MDB relies on Global Distributed IDs (GDIDs, see the definition) being consecutive integers, which allows for range partitioning.

MDB is designed to store relational and document data organized by sharding ID in "briefcases" within areas.

The data is organized into named namespaces called "areas". Areas provide logical separation of data and a physical scheme for partitioning and sharding. The purpose of each area is to provide the best possible load accommodation for a particular data type. For example, the User area holds user-related data (profile, transactions, balances), partitioned by GDID. This approach allows traditional query methods to be used within one "briefcase" of data.

Within an area, a "briefcase" is a segment of relational tables which is keyed on the same partition GDID - a "briefcase key". Briefcases represent a logical set of data needed for business logic.

For example, a User briefcase stores all user-related tables on the same RDBMS server. This allows for quick traversal/joins of data within the briefcase identified by its GDID. Briefcases are not physical entities; they are logical sets of rows in various tables within the same DB server. An area may hold MORE THAN ONE type of briefcase (as its name should reflect).

Internally, briefcases are usually stored in hierarchical table structures. For example:
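
A hypothetical user-area layout (table and column names are illustrative only):

    USER                   (G_USER = briefcase GDID, ...)
      ├── USER_PROFILE     (G_USER, ...)
      ├── USER_TRANSACTION (G_USER, G_TRANSACTION, ...)
      └── USER_BALANCE     (G_USER, G_ACCOUNT, ...)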

All areas except "CENTRAL" store data in partitions that are range-based on GDID; within a partition, data is further sharded across a fixed shard count.

The SHARD COUNT within a partition is normally immutable (pre-arranged). If the shard count needs to change within a partition, briefcase re-sharding within that partition has to be performed (see below).

Partitioning IDs are always GDIDs (Global Distributed IDs), not strings; this is because MDB uses range partitioning in all but the CENTRAL area.

The CENTRAL Area DOES NOT use any partitioning. It is a special area used for global definitions/indexing.
It still uses sub-shards. The CENTRAL Area is the only area whose SHARDING KEY is an object (not a GDID), which allows lookups by strings and other shard keys.

When a STRING ID (e.g. a user email) needs to be mapped to a GDID (e.g. a user briefcase GDID), the CENTRAL Area is queried (index tables). Most of the data in the CENTRAL area is static, so it gets aggressively cached.

All other areas use Range partitioning. Range partitioning works as follows:

  • All sharding IDs are monotonically-increasing GDIDs (with the authority bits discarded: Era 32 bit + Counter 60 bit)
  • A sharding GDID is requested
  • The system needs to locate the shard that holds that data within the area
  • The system looks up the system config that maps the GDID start {Era, Counter} to a partition
  • The partition is further broken down into shards; this is needed so that the read/write load of current data does not create a hotspot on one server. The number of shards within a partition is not expected to change (otherwise briefcase data rebalancing would need to take place using 3rd-party tools, see below). A simplified routing sketch follows this list
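
The partition/shard lookup sketched below is a simplification for illustration - the real router reads the partition map from the system configuration, and the types and field names here are hypothetical:

    using System.Collections.Generic;
    using System.Linq;

    // A partition covers all GDIDs starting at {StartEra, StartCounter} and is
    // pre-divided into a fixed set of shards (servers)
    public sealed class Partition
    {
        public uint     StartEra;
        public ulong    StartCounter;
        public string[] Shards; // connect strings of the shard servers
    }

    public static class MdbRouter
    {
        // 'partitions' must be ordered by their starting {Era, Counter},
        // with the first partition starting at {0, 0}
        public static string Route(IList<Partition> partitions, uint era, ulong counter)
        {
            // 1. range partitioning: take the last partition whose start <= the sharding GDID
            var p = partitions.Last(x => x.StartEra < era ||
                                        (x.StartEra == era && x.StartCounter <= counter));

            // 2. spread briefcases across the fixed shard count within that partition
            //    (a simple modulo here; the real scheme may differ)
            var shard = (int)(counter % (ulong)p.Shards.Length);
            return p.Shards[shard];
        }
    }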

Benefits:

  • Data does not need to be physically moved between partitions on partition addition. Once a partition has been assigned, data remains there
  • Quick mapping of GDID_SHARDING_ID -> physical server
  • Ability to gradually increase capacity: start business with one partition, assess the load and add more partitions when necessary
  • Fast in-area queries - as data is physically pre-grouped in "briefcases" by GDID_SHARDING_ID (all briefcase-related data is on the same physical server)

Drawbacks:

  • If "older GDID" data gets stale*, the older shards experience less load
  • Possibly uneven distribution, as "newer/hotter" data gravitates toward the most recent partitions
  • Theoretically, distribution is not 100% even, as some USERS (or other briefcases) may have more data than others: 100 users on one server != 100 users on another. This should not happen with proper design, which should keep briefcases of roughly uniform size; in case of non-uniform load, a briefcase should contain pointers to other briefcases (a la balanced tree nodes)

* Keep in mind that "older" users still have new transactions coming into their shard, as transactions/balances are co-located with the user

Archiving

With time (after X years), some data may get deleted from the MDB. Older customer data may get archived and moved out into long-term storage. Instead of adding new partitions, we can set a GDID breakpoint (a single number) after which the range partitioning tables start over - that is, GDIDs below the breakpoint get routed according to the first range set, GDIDs after the breakpoint according to another, and so on.
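
A minimal sketch of the breakpoint routing described above (illustrative types only): counters below the breakpoint resolve against the original range set, counters at or above it against the set that "starts over":

    using System;

    public static class ArchiveRouter
    {
        // resolveOld/resolveNew map a GDID counter to a shard using the pre- and
        // post-breakpoint range tables respectively (see the routing sketch above)
        public static string Route(ulong counter, ulong breakpoint,
                                   Func<ulong, string> resolveOld,
                                   Func<ulong, string> resolveNew)
        {
            return counter < breakpoint ? resolveOld(counter)
                                        : resolveNew(counter);
        }
    }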

How to re-shard the data within the partition (briefcase move)

A business-logic-aware tool (script/utility) needs to be constructed to move briefcases (all logically-grouped data) between DB servers. It is important to note that AUTO-INC IDs SHOULD NOT be used because of possible collisions; instead, GDIDs need to be used throughout all tables.

Parcel DB

Parcel DB (PDB) is an archi-global data store that stores data in Parcels - hence the name. A Parcel is the unit of data change and replication in the PDB system. Parcels are version-controlled, GDID-addressable data units that auto-replicate between multiple geo-spread data regions. The following describes the features of ParcelDB (PDB):

  • A Parcel - a document with a flexible schema, supporting:
    • Schema upgrade
    • Version control
    • Version conflict resolution via Merge()
    • Domain entity design - C# high-level format (a class with data properties)
    • Tight data format for serialization/moving. Supported data types: CLR primitives, nullables, system types (GDID, NLSMap), etc.
  • Multi-data-center, multi-master design
  • Regulatory replication constraint (i.e. keep data for US citizens in US data centers only)
  • Self-healing, fault-tolerant, self-replicating shard silos. Resolves data corruption by majority vote. Hot-swap data servers (i.e. replace servers without restart/shutdown) - the system detects the imbalance and replicates the necessary data parcels
  • Built-in caching layer, supports various data veracity (fidelity modes)
  • Flexible shard technology - plug-in MongoDB, MySQL, or any other data backend. Uses storage as pods/devices
  • Consistent-hashing sharding (see the sketch after this list)
  • Designed to handle data capacities limited only by available hardware
  • Runs on Agni Cluster OS - shares same semantic, configuration, and operational requirements/standards
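
As a general illustration of the consistent-hashing bullet above (this is not the actual ParcelDB implementation; node naming and hashing details are assumptions), a hash ring can be built so that adding or removing a storage node remaps only a small fraction of parcels:

    using System;
    using System.Collections.Generic;
    using System.Security.Cryptography;
    using System.Text;

    // A generic consistent-hash ring; each physical node is placed on the ring
    // multiple times ("virtual nodes") to smooth the distribution
    public sealed class HashRing
    {
        private readonly SortedDictionary<uint, string> _ring = new SortedDictionary<uint, string>();

        public HashRing(IEnumerable<string> nodes, int replicasPerNode = 64)
        {
            foreach (var node in nodes)
                for (var i = 0; i < replicasPerNode; i++)
                    _ring[Hash(node + "#" + i)] = node;
        }

        // Map a parcel address (e.g. its GDID rendered as a string) to the first node clockwise
        public string NodeFor(string parcelAddress)
        {
            var h = Hash(parcelAddress);
            foreach (var kvp in _ring)
                if (kvp.Key >= h) return kvp.Value;
            foreach (var kvp in _ring) return kvp.Value; // wrap around to the first node
            throw new InvalidOperationException("empty ring");
        }

        private static uint Hash(string s)
        {
            using (var md5 = MD5.Create())
                return BitConverter.ToUInt32(md5.ComputeHash(Encoding.UTF8.GetBytes(s)), 0);
        }
    }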

BFS - Big File System

The BFS file system is implemented on top of Parcel DB (PDB); consequently it inherits all of the multi-master replication and fault tolerance guarantees that PDB provides. BFS provides a high-level functional abstraction exposed as an NFX.IO.FileSystem implementation (a usage sketch follows the list below), supporting the following functions:

  • Geo-aware data access
  • Random access files (seek)
  • Optimized streaming access
  • Transactions (cancel multiple file changes)
  • Granular user rights
  • Strong file permission structure (supports thousands of permission handles)
  • Support metadata streams (additional metadata auxiliary to file data)
  • Detailed access logs (create, last access, modify)
  • File links, sharing between users
  • Unlimited capacity - limits are hardware only (i.e. supports hundreds of millions of users with gigabytes of data)
  • Supports large files (up to 2^64 bytes in length)
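
Because BFS is surfaced through the generic NFX.IO.FileSystem abstraction, working with it should look roughly like working with any other NFX file system. The sketch below is an assumption: the session/directory/file member names (StartSession, the path indexer, CreateFile, WriteAllText, ReadAllText) are recalled from the generic NFX contract and may not match BFS exactly - consult the NFX/Agni documentation for the real API:

    // 'bfs' is assumed to be an NFX.IO.FileSystem.FileSystem instance configured for BFS
    using (var session = bfs.StartSession())
    {
        var home = session["/users/jdoe"] as FileSystemDirectory; // resolve a directory by path
        var file = home.CreateFile("notes.txt");                  // create a file inside it
        file.WriteAllText("hello from BFS");                      // write the file content

        var again = session["/users/jdoe/notes.txt"] as FileSystemFile;
        Console.WriteLine(again.ReadAllText());                   // read it back
    }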

The BFS system is similar to Google Drive in the public file storage sense - both systems provide a strong sharing/permission model with public folders.