1. Wozu ein Small Object Allocator?
Die meisten C++ Objekte sind relativ klein, meistens nur wenige Bytes. Manchmal muss man auch Integer anfordern, beispielsweise für den Referenzzähler bei Smart-pointern:
template<typename T>
class shared_ptr
{
public:
explicit shared_ptr(T* ptr = 0):
ptr(ptr),
counter(new int(1))
{}
shared_ptr(const shared_ptr& other):
ptr(other.ptr),
counter(other.counter)
{}
~shared_ptr()
{
if(--*counter == 0)
{
delete counter;
delete ptr;
}
}
// Und weitere typische smart-pointer methoden...
T& operator*()
{
return *ptr;
}
private:
int* counter;
T* ptr;
};
Das ist eine unvollständige Implementierung eines shared_ptrs, ein Zeiger der ein Objekt automatisch löscht, sobald keine Referenzen mehr darauf existieren. Sie soll nur eines zeigen: Hier wird Speicher für ein sehr kleines Objekt angefordert, einen einfachen Integer.
Gut, wo ist jetzt das Problem?
Die Speicherverwaltung für C++ ist dafür ausgelegt, mit allem zu funktionieren. Auch mit großen Objekten. Dementsprechend ist sie nicht für kleine Objekte optimiert: Je nach Implementation werden 4 bis 32 Byte zusätzlich zum eigentlichen Speicherblock allokiert, für Daten wie z.B. ob der Speicherbereich belegt ist, wie groß er ist etc. Wenn ein int z.B. 4 Byte groß ist, und unsere Speicherverwaltung benötigt 32 Byte an zusätzlichen Daten, haben wir einen Overhead von 800%. Fordert man aber z.B. 4096 Byte an, liegt der maximale Overhead nur noch bei ~0.8%.
Also fordert man optimalerweise große Speicherblöcke über den Standardallokator an und verwaltet diesen selbst. Hier kommt der Small Object Allocator zum Einsatz, der diese Arbeit für uns übernimmt.
2. Die Arbeitsweise
Um den Speicher verwalten zu könnten, benötigen wir für jedes Objekt 2 Daten: Die Position im Speicher und die Objektgröße. Wenn wir allerdings jede Objektgröße erlauben, müssen wir zu jeder Speicherstelle noch eine Variable halten, die die Größe speichert. Im Fall von kleinen Objekten reicht ein Byte aus, aber der Overhead im Beispiel des shared_ptrs wäre immer noch bei 25%. Also lassen wir diese Information weg und erstellen uns einen Allokator, der immer nur Objekte einer bestimmten Größe anfordert.
Eine Information müssen wir allerdings immer noch speichern: wo der verfügbare Speicher überhaupt liegt. Ein Zeiger ist üblicherweise 4 Byte groß, das macht im bekannten shared_ptr Beispiel einen Overhead von 100% aus. Das ist eindeutig zuviel. Man könnte nun folgendes tun: Man speichert lediglich einen Zeiger auf den ersten verfügbaren Speicher. Der verfügbare Speicher selbst zeigt auf den Speicherbereich dahinter, bis irgendwann ein Speicher eine 0 beinhaltet. Wenn wir bei einer 0 ankommen, fordern wir einen neuen größeren Speicherblock an den wir entsprechend mit seinen Verweisen initialisieren. Da der angeforderte Speicher selbst die Informationen zur Verwaltung trägt, haben wir lediglich den Overhead eines einzigen Zeigers für alle Objekte einer Größe. Zur Verdeutlichung, folgende Asciigrafik:
+-------------+ +-------------+ +-------------+
|Speicherblock|-->|Speicherblock|-->|Speicherblock|-->
+-------------+ +-------------+ +-------------+
^
|
+---------------------+
|Zeiger auf den ersten|
|verfügbaren Speicher |
+---------------------+
Wird jetzt Speicher angefordert, wird der erste Speicherbereich ausgelesen, der Zeiger auf den verfügbaren Speicher eins weitergesetzt und die Speicherstelle zurückgegeben, auf die der Zeiger zuerst verwies. Dadurch verlieren wir jegliche Kenntnis vom vorherigen Speicher. Was passiert, wenn wir den Speicher zur Freigabe zurückbekommen? Nehmen wir an, wir hätten 2 mal Speicher angefordert und der erste Speicher wurde vom Nutzer an uns zurückgegeben. Folgende Situation wäre dann gegeben:
+-------------+ +-------------+ +-------------+
|Speicherblock| |Speicherblock| |Speicherblock|-->
+-------------+ +-------------+ +-------------+
^
|
+---------------------+ |
|Zeiger auf den ersten|------------------+
|verfügbaren Speicher |
+---------------------+
Nach der Freigabe des ersten Speicherblocks lassen wir den freigegebenen Speicher auf den nächsten verfügbaren Speicher verweisen und setzen unseren Zeiger auf den eben frei gewordenen Speicherblock. Dadurch können wir wieder freigewordenen Speicher wiederverwenden und haben kein Memoryleak das unendlich wächst. Nach dem Freigeben sieht unsere Struktur so aus:
+----------------------------------+
| v
+-------------+ +-------------+ +-------------+
|Speicherblock| |Speicherblock| |Speicherblock|-->
+-------------+ +-------------+ +-------------+
^
|
+---------------------+
|Zeiger auf den ersten|
|verfügbaren Speicher |
+---------------------+
Diesem System ist es auch egal, ob die Blöcke am Stück im Speicher liegen, oder getrennt sind. Wir können also jederzeit neue größere Speicherblöcke anfordern und in unsere Liste einfügen. Hier wird der Speicher nur angefordert, wenn unser Zeiger auf 0 zeigt, also kein Block mehr verfügbar ist.
3. Implementation
Zunächst sollten wir ein grobes Interface für den Allokator definieren. Das Prinzip das wir hier haben, heißt auch Pool. Dementsprechend benennen wir unsere Struktur:
struct pool
{
void init(std::size_t num_blocks, std::size_t object_size);
void* alloc();
void free(void* p);
void release();
};
init initialisiert das Objekt. Der Parameter num_objects gibt an, wieviele Objekte in einem Speicherblock Platz haben sollen. object_size ist die Größe der Objekte in Bytes. Das Gegenstück zu init, release, gibt alle angeforderten Speicherblöcke wieder frei. Dazu müssen wir zusätzlich Daten speichern; das wird erst am Ende dieses Abschnitts besprochen. (Ich verwende hier bewusst init und release statt Kon- und Destruktoren. Später wollen wir mehr Kontrolle über die Pools haben und selbst bestimmen, wann erzeugt und wann freigegeben wird. Es kommt im Prinzip auf das selbe heraus, wenn wir new und delete für einen Pool nutzen würden.)
alloc gibt den Speicherplatz für ein Objekt zurück, free gibt den übergebenen Speicher wieder frei, bzw. fügt ihn in unsere Liste ein.
Nun zur Implementation: Wir benötigen zuerst eine kleine Hilfsstruktur und 2 Methoden die uns das Leben etwas vereinfachen.
struct block
{
block* next;
};
static void set_next(void* p, void* next)
{
static_cast<block*>(p)->next = static_cast<block*>(next);
}
static void* get_next(void* p)
{
return static_cast<block*>(p)->next;
}
Diese sind direkt innerhalb der Poolstruktur definiert, wir benötigen sie auch sonst nirgends. Wenn ein neuer Speicherblock angelegt wird, wird er zuerst mit den Blöcken überschrieben und die Zeiger der Blöcke entsprechend gesetzt. Da wir normalerweise nur void-Zeiger auf den Speicher haben, gibt es die Funktionen set_next und get_next die uns das casten abnehmen. get_next liefert die Adresse des Speicherbereichs, auf die der Block zeigt, set_next setzt die Adresse neu.
Außerdem benötigen wir in der Struktur noch den Zeiger auf den Anfang unserer Liste. Wir nennen ihn head_of_free_list. Die Größe der Objekte und des Blocks werden auch in der Struktur gespeichert. Die init-Methode unseres Pools sieht so aus:
void pool::init(std::size_t num_objects, std::size_t object_size)
{
this->num_objects = num_objects;
this->object_size = std::max(object_size, sizeof(block));
head_of_free_list = 0;
}
Nichts besonders kompliziertes. Man mag sich fragen, wozu std::max hier gebraucht wird, aber das ist schnell erklärt. Die einzelnen Speicherstellen müssen mindestens so groß sein wie ein Objekt unserer Blockstruktur, damit wir unsere verkettete Liste aufbauen können. Man hat aber selten Objekte, die noch kleiner sind als die Blockstruktur, die schließlich nur einen Zeiger hält.
Jetzt wird es etwas schwieriger: die Implementation von alloc.
void* pool::alloc()
{
void* p = head_of_free_list;
if(p)
{
head_of_free_list = get_next(p);
}
else
{
char* new_block = new char[num_objects * object_size];
for(std::size_t i = object_size; i < (num_objects - 1) * object_size; i += object_size)
{
set_next(&new_block[i], &new_block[i + object_size]);
}
set_next(&new_block[(num_objects - 1) * object_size], 0);
p = new_block;
head_of_free_list = &new_block[object_size];
}
return p;
}
Ist noch Platz in der Liste, ist das ganze trivial: p ist nicht 0, also wird head_of_free_list eins weiter gesetzt und p zurückgegeben, das vorher den Wert von head_of_free_list angenommen hatte.
Etwas unverständlicher sieht dagegen der else-Teil aus. Zunächst werden für den neuen Block num_objects * object_size Bytes Speicher angefordert. Die seltsam anmutende Schleife beschreibt das zweite bis zum vorletzten Objekt mit jeweils einem Zeiger auf das nächste. Das erste Objekt wird nicht beschrieben, da es ja sowieso danach an den Anwender zurückgegeben wird. Das letzte benötigt auch eine Spezialbehandlung, da es ins Nirvana zeigen soll, um das Ende der Liste anzugeben. Danach wird noch head_of_free_list auf das 2. Element des Blockes gesetzt und das 1. Element zurückgegeben. Das wars schon.
free gestaltet sich relativ einfach:
void pool::free(void* p)
{
if(p == 0)
return;
block* dead_object = static_cast<block*>(p);
dead_object->next = static_cast<block*>(head_of_free_list);
head_of_free_list = dead_object;
}
Der freigegebene Speicher verweist nun auf das vorher erste Objekt in der Liste und der Zeiger auf den Listenanfang wurde neu gesetzt.
Übergebene 0-Zeiger ignorieren wir übrigens einfach aus Konsistenzgründen. In C++ ist schließlich auch ein delete 0; ohne weiteres möglich.
Zu release:
Das endgültige Freigeben des Speichers an das Betriebssystem beim Zerstören des Pools kann auch weggelassen werden. Dadurch haben wir nicht unbedingt ein Memoryleak. Memoryleaks entstehen, wenn man keinen Zugriff mehr auf Speicher hat, der noch nicht freigegeben wurde. Dadurch, dass wir aber die Objekte in unserer Liste verwalten, wächst der Speicherbedarf nicht ins unendliche, sondern erreicht einen Maximalwert. Es sei denn, jemand kommt auf die Idee, den pool immer wieder in einer Schleife anzulegen, dann entsteht natürlich ein Leak. Später in der Kapselung werden wir allerdings dafür sorgen, dass ein pool das komplette Programm überdauert. Sollte man dennoch den Speicher freigeben wollen, fügt man leider ein wenig Overhead hinzu: man muss beim Anfordern eines größeren Blockes einen Zeiger in einen std::vector einfügen, beim release durchlaufen und jedes Element mit delete [] löschen. Auf diese Weise kann man auch lokale Pools verwenden, falls man das unbedingt benötigt.
4. Kapselung des Pools
Zunächst sollten wir uns überlegen, wie die Anwendung aussehen soll. Es würde z.B. sehr einfach werden, wenn man Objekte einfach nur von einer Klasse small_object ableiten müsste, und schon wird bei jedem new automatisch dieser Allokator genutzt. Damit small_object die Größe des Typen kennt, werden wir daraus ein Template machen und als Parameter den Typen nehmen, der ableitet. In small_object reichen dann ein paar einfache Überladungen von operator new und delete und wir sind fast fertig. Zuerst benötigen wir einen Wrapper um den Pool, der automatisch init und release aufruft, und als dummy-Parameter für alloc und free die Größe des Speichers übernimmt. Wozu das gut ist, sehen wir später. Hier zuerst die einfache Implementation des fixed_size_allocators:
class fixed_size_allocator
{
public:
fixed_size_allocator(std::size_t num_objects, std::size_t object_size)
{
allocator.init(num_objects, object_size);
}
~fixed_size_allocator()
{
allocator.release();
}
void* alloc(std::size_t)
{
return allocator.alloc();
}
void free(void* p, std::size_t)
{
allocator.free(p);
}
private:
fixed_size_allocator(const fixed_size_allocator&);
fixed_size_allocator& operator=(const fixed_size_allocator&);
pool allocator;
};
Die Implementation von small_object kann dann so aussehen:
template<typename T>
class small_object
{
static void* operator new(std::size_t size)
{
return allocator.alloc(size);
}
static void* operator new(std::size_t size, void* p)
{
return p;
}
static void operator delete(void* p, std::size_t size)
{
allocator.free(p, size);
}
private:
static fixed_size_allocator allocator;
};
template<typename T>
fixed_size_allocator small_object<T>::allocator(64, sizeof(T));
Der 2. operator new sorgt übrigens nur dafür, dass placement-new immer noch möglich ist.
Die Anwendung sieht folgendermaßen aus:
struct foo : small_object<foo>
{
int i;
};
Mehr braucht man nicht. Nur ableiten. Ein Problem hat diese Implementierung allerdings: Für jeden Typen wird ein neuer pool angelegt. Auch wenn 2 Typen die selbe Größe haben. Um das zu umgehen, schreiben wir nun eine Klasse small_obj_allocator, die Objekte beliebiger Größe anfordern kann. Sie sammelt dazu lediglich alle pools in einem vector und sucht nach dem mit der passenden Größe.
class small_obj_allocator
{
static bool compare_pool_size(const pool& p, std::size_t size)
{
return p.object_size < size;
}
public:
small_obj_allocator(std::size_t num_objects, std::size_t max_object_size):
num_objects(num_objects),
max_object_size(max_object_size),
allocators(),
last_alloc(0),
last_free(0)
{
}
~small_obj_allocator()
{
std::for_each(allocators.begin(), allocators.end(),
std::mem_fun_ref(&pool::release));
}
void* alloc(std::size_t size)
{
if(size > max_object_size)
{
return ::operator new(size);
}
size = std::max(size, sizeof(pool::block));
if(last_alloc && last_alloc->object_size == size)
{
return last_alloc->alloc();
}
std::vector<pool>::iterator iter =
std::lower_bound(allocators.begin(),
allocators.end(),
size,
compare_pool_size);
if(iter == allocators.end() || iter->object_size != size)
{
iter = allocators.insert(iter, pool());
iter->init(num_objects, size);
last_free = &allocators.front();
}
last_alloc = &*iter;
return last_alloc->alloc();
}
void free(void* p, std::size_t size)
{
if(size > max_object_size)
{
::operator delete(p);
return;
}
if(last_free && last_free->object_size == size)
{
last_free->free(p);
return;
}
std::vector<pool>::iterator iter =
std::lower_bound(allocators.begin(),
allocators.end(),
size,
compare_pool_size);
last_free = &*iter;
last_free->free(p);
}
private:
std::size_t num_objects;
std::size_t max_object_size;
std::vector<pool> allocators;
pool* last_alloc;
pool* last_free;
};
Falls der Code jemandem bekannt vorkommt: Das ist die selbe Strategie, mit der Andrei Alexandrescus SmallObjAllocator die Anfragen an seine Pools in der
Loki-lib weiterleitet. Ich werde hier nicht näher darauf eingehen, nur erklären wie er ungefähr arbeitet: Da man häufig mehrere Objekte hintereinander mit gleicher Größe anfordert oder freigibt, speichert small_obj_allocator den zuletzt verwendeten pool. Ist dieser für die Freigabe oder Allokation nicht zuständig, wird mit einer Binären Suche der passende pool festgestellt. Das ist möglich, da der vector ständig sortiert wird.
Nun müssen wir das noch in unser small_object einbauen – wir sollten dem Benutzer aber die Wahl lassen. Die Suche nach dem passenden pool geht auf Kosten der Laufzeit, auf der anderen Seite benötigt der fixed_size_allocator wegen möglichen Doppelungen von pools mehr Speicher.
Wir testen einfach den an small_object übergebenen typen. Wenn er ein von uns vordefinierter dummy-typ ist, wird der small_obj_allocator verwendet, ansonsten der fixed_size_allocator.
Hier zeigt sich auch der Vorteil der Übergabe des size-Parameters an fixed_size_allocator: small_obj_allocator benötigt die Größe, fixed_size_allocator eigentlich nicht. Dadurch, dass aber beide nun die selbe Schnittstelle besitzen, müssen wir lediglich über ein typedef den Allokatortypen festlegen und der Rest geht von allein. Zusätzlich legen wir noch eine Konstante object_size fest. Bei dem vordefinierten dummy-typen wird sie auf 64 gesetzt, ansonsten auf sizeof(T). Das bedeutet, sobald vom small_obj_allocator ein Speicherbereich abgefragt wird, der größer als 64 Byte ist, wird die Anfrage an den globalen operator new weitergeleitet. Ebenso beim Freigeben an operator delete.
Unser neues small_object-Template:
struct empty_type {};
template<typename T = empty_type>
class small_object
{
typedef is_same<T, empty_type> is_empty_type;
public:
typedef typename if_
<
is_empty_type,
small_obj_allocator,
fixed_size_allocator
>::type allocator_type;
static const std::size_t object_size = is_empty_type::value ? 64 : sizeof(T);
static void* operator new(std::size_t size)
{
return allocator.alloc(size);
}
static void* operator new(std::size_t size, void* p)
{
return p;
}
static void operator delete(void* p, std::size_t size)
{
allocator.free(p, size);
}
private:
static allocator_type allocator;
};
template<typename T>
typename small_object<T>::allocator_type
small_object<T>::allocator(64, small_object<T>::object_size);
is_same ist ein einfaches Template mit einer statischen integralen Konstante, die true ist, wenn beide übergebenen Typen gleich sind, ansonsten false. if_ wertet den ersten Parameter als Bedingung aus, wenn dessen value true ist, wird ein typedef auf den 2. übergebenen Typen gemacht, ansonsten auf den 3. Ein Compile-time-if also.
Hier kurz ihre Definitionen:
template<bool If, typename Then, typename Else>
struct if_c
{
typedef Then type;
};
template<typename Then, typename Else>
struct if_c<false, Then, Else>
{
typedef Else type;
};
template<typename If, typename Then, typename Else>
struct if_ : if_c<If::value, Then, Else> {};
template<typename T, typename U>
struct is_same
{
static const bool value = false;
};
template<typename T>
struct is_same<T, T>
{
static const bool value = true;
};
Ende. Ein einsatzbereiter Small Object Allocator.