
integrate-suggested-solution-to-streaming-llm-phase-I-b-training-data #260

@david-thrower

Description


Add prototype for Phase I-B training with a tf.data.Dataset (from generator)

From #255

TLDR

Stream the operation performed by prepare_data() in https://github.com/david-thrower/cerebros-core-algorithm-alpha/blob/255-copy-of-branch-254-updated-hpo-script-for-cicd-scale-testing/generative-proof-of-concept-CPU-preprocessing-in-memory.py ... with numerous arbitrary constraints not listed here ...
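
For context, prepare_data() lives in the script linked above and is not reproduced in this issue. A minimal stand-in is sketched below so the suggested solution can be run end to end; the sliding-window expansion, the Hugging Face style tokenizer call, and the pad-token id are all assumptions about what the real function does, not its actual implementation:

import numpy as np

def prepare_data(raw_text_samples, max_seq_length=40):
    """Hypothetical stand-in: expand each raw text sample into
    (input_ids, one-hot next-token label) training pairs."""
    input_ids_list, labels_list = [], []
    for text in raw_text_samples:
        token_ids = tokenizer(text)["input_ids"]  # tokenizer assumed in scope
        for end in range(1, len(token_ids)):
            # Context window is everything before position `end`, clipped
            window = token_ids[max(0, end - max_seq_length):end]
            # Left-pad to a fixed length (pad id 0 is an assumption)
            padded = [0] * (max_seq_length - len(window)) + window
            # One-hot encode the next token as the label
            label = np.zeros(len(tokenizer), dtype=np.float32)
            label[token_ids[end]] = 1.0
            input_ids_list.append(np.asarray(padded, dtype=np.int32))
            labels_list.append(label)
    return input_ids_list, labels_list, None  # third value is unused below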

AI Suggested solution

import tensorflow as tf
import numpy as np

# Assuming these are defined globally or passed in
MAX_SEQ_LENGTH = 40
VOCABULARY_SIZE = len(tokenizer)  # e.g., 128260

class SampleExpansionGenerator:
    def __init__(self, raw_text_samples, tokenizer, sample_expansion_batch_size=100):
        self.raw_text_samples = raw_text_samples
        self.tokenizer = tokenizer
        self.sample_expansion_batch_size = sample_expansion_batch_size
        self.data = []
        self.labels = []
        self.current_index = 0

    def _expand_next_batch(self):
        # Determine the next meta-batch
        start_idx = self.current_index
        end_idx = min(start_idx + self.sample_expansion_batch_size, len(self.raw_text_samples))
        if start_idx >= end_idx:
            raise StopIteration("No more raw samples to process.")

        batch_samples = self.raw_text_samples[start_idx:end_idx]
        self.current_index = end_idx

        # Run prepare_data() (defined in the script linked above) on this batch
        input_ids_list, labels_list, _ = prepare_data(batch_samples, max_seq_length=MAX_SEQ_LENGTH)

        # Refill the internal queues as lists so pop(0) in __next__ is valid
        self.data = list(input_ids_list)
        self.labels = list(labels_list)

    def __iter__(self):
        # tf.data calls iter() on the generator each time the dataset is
        # iterated; resetting state here lets model.fit() run multiple epochs
        # instead of exhausting the stream after the first one.
        self.current_index = 0
        self.data = []
        self.labels = []
        return self

    def __next__(self):
        # Check for mismatched state
        if (len(self.data) == 0) != (len(self.labels) == 0):
            raise ValueError("Data and labels queues are out of sync.")

        # If queues are empty, expand next batch
        if len(self.data) == 0:
            self._expand_next_batch()

        # Pop and return one sample
        input_sample = [self.data.pop(0)]  # Nested as per model input spec
        label_sample = [self.labels.pop(0)]  # Nested as per model output spec

        return (input_sample, label_sample)


# Create the tf.data.Dataset
def create_dataset(raw_text_samples, tokenizer, sample_expansion_batch_size=100):
    generator = SampleExpansionGenerator(raw_text_samples, tokenizer, sample_expansion_batch_size)

    dataset = tf.data.Dataset.from_generator(
        # from_generator invokes this callable (then iter() on its result)
        # once per epoch, so __iter__ above resets the stream each time
        lambda: generator,
        output_signature=(
            tf.TensorSpec(shape=(1, MAX_SEQ_LENGTH), dtype=tf.int32),     # Nested input
            tf.TensorSpec(shape=(1, VOCABULARY_SIZE), dtype=tf.float32)   # Nested one-hot label
        )
    )
    return dataset


# Best model from the Cerebros NAS run (as in the linked script)
model = cerebros.get_best_model()

# Create dataset
dataset = create_dataset(non_instruct_samples, tokenizer, sample_expansion_batch_size=5)

# Optional: Batch the dataset for training (e.g., model batch size = 64)
model_batch_size = 64
dataset = dataset.batch(model_batch_size).prefetch(tf.data.AUTOTUNE)

# Train the model
model.fit(dataset, epochs=3)
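
Before launching a long fit() run, it is cheap to sanity-check one batch against the output_signature (a quick sketch; shapes assume the batching above):

# Pull a single model batch and confirm the shapes match the signature
for inputs, labels in dataset.take(1):
    print(inputs.shape)  # expected: (model_batch_size, 1, MAX_SEQ_LENGTH)
    print(labels.shape)  # expected: (model_batch_size, 1, VOCABULARY_SIZE)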
