From 6347a00d4dbe581f6a2427c6c92141516a2a3557 Mon Sep 17 00:00:00 2001 From: isamu-isozaki Date: Mon, 1 Jul 2024 20:53:10 -0400 Subject: [PATCH 1/6] Dynamic generation with outlines --- llm_exl2_client_multi_outlines.py | 648 ++++++++++++++++++------------ 1 file changed, 391 insertions(+), 257 deletions(-) diff --git a/llm_exl2_client_multi_outlines.py b/llm_exl2_client_multi_outlines.py index fc43250..fbfd8a5 100644 --- a/llm_exl2_client_multi_outlines.py +++ b/llm_exl2_client_multi_outlines.py @@ -1,20 +1,12 @@ import asyncio import json import os -import logging import time import configparser import argparse -import tiktoken -import torch -import random from typing import AsyncIterable, List, Generator, Union, Optional import traceback -from typing import Mapping -import requests -import sseclient import subprocess -import re from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware @@ -23,11 +15,11 @@ from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, TextStreamer, TextIteratorStreamer from threading import Thread import queue -import numpy as np +import traceback +import re + import sys, os -import outlines -from outlines.samplers import multinomial sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from exllamav2 import( @@ -39,11 +31,11 @@ ExLlamaV2Tokenizer, ) -from exllamav2.generator import ( - ExLlamaV2StreamingGenerator, - ExLlamaV2Sampler -) +from exllamav2.generator import ExLlamaV2DynamicGenerator, ExLlamaV2DynamicJob, ExLlamaV2Sampler import uuid +from blessed import Terminal +import textwrap +from outlines.integrations.exllamav2 import RegexFilter, TextFilter, JSONFilter, ChoiceFilter def generate_unique_id(): return uuid.uuid4() @@ -96,13 +88,16 @@ class ChatCompletionRequest(BaseModel): # Add argument for port with default type as integer parser.add_argument('--port', type=int, help='Port to run the server on.') -parser.add_argument('--use_outlines', action='store_true', help='Use outlines.') -parser.add_argument('--gpu_split', type=str, default="17,19,19,19", help='GPU splits.') -parser.add_argument('--max_context', type=int, default=12288, help='Context length.') -parser.add_argument('--cache_8bit', action='store_true', help='Use 8 bit cache.') -parser.add_argument('--cache_q4', action='store_true', help='Use 4 bit cache.') +parser.add_argument('--max_context', type=int, default=8192, help='Context length.') parser.add_argument('--repo_str', type=str, default='llama3-70b-instruct', help='The model repository name') -parser.add_argument('--outlines_device', type=int, default=2, help='The cuda device to which the outlines device is set') +parser.add_argument('--total_context', type=int, default=32768, help="Total context length") +parser.add_argument('--max_chunk_size', type=int, default=2048, help='Max chunk size.') +parser.add_argument('--max_new_tokens', type=int, default=2048, help='Max new tokens.') +parser.add_argument('--display_mode', type=int, default=1, help='Display mode.') +parser.add_argument('--use_draft_model', action="store_true", help='Do speculative decoding') +parser.add_argument('--not_paged', action="store_true", help='Do not do paged attention') + + # Parse the arguments args = parser.parse_args() @@ -115,87 +110,246 @@ class ChatCompletionRequest(BaseModel): host = config.get('settings', 'host') port = args.port if args.port is not None else config.getint('settings', 'port') +class StatusArea: + def __init__(self, num_lines): + self.num_lines = num_lines + 
self.messages = [""] * num_lines + + def update(self, message, line=None): + if line is not None: + # Update a specific line + if 0 <= line < self.num_lines: + self.messages[line] = message + else: + # Handle multi-line message + lines = message.split('\n') + if len(lines) > self.num_lines: + # Truncate to last num_lines if exceeds num_lines + lines = lines[-self.num_lines:] + + # Update messages, padding with empty strings if necessary + self.messages = lines + [""] * (self.num_lines - len(lines)) + + self.display() + + def display(self): + for i, message in enumerate(self.messages): + wrapped_message = textwrap.shorten(message, width=term.width, placeholder="...") + print(term.move_xy(0, i) + term.clear_eol + wrapped_message) + + # Move cursor below the status area + print(term.move_xy(0, self.num_lines), end='', flush=True) + + +class JobStatusDisplay: + + def __init__(self, job, status_lines): + #self.console_line = console_line + status_lines + self.console_line = None + self.job = job + self.prefill = 0 + self.max_prefill = 0 + self.collected_output = "" + self.tokens = 0 + self.spaces = " " * 150 + self.status_lines = status_lines + self.display_text = "" + #text = term.black(f"{self.console_line:3}:") + #text += term.blue("enqueued") + #print(term.move_xy(0, self.console_line) + text) -# only allow one client at a time -busy = False -condition = asyncio.Condition() + def update_position(self, index): + self.console_line = self.status_lines + index + self.init_display_text() -config = ExLlamaV2Config() -config.model_dir = repo_id -config.prepare() + def init_display_text(self): + self.display_text = term.black(f"{self.console_line:3}:") + term.blue("enqueued") -use_dynamic_rope_scaling = False -dynamic_rope_mult = 1.5 -dynamic_rope_offset = 0.0 -ropescale = 1.0 + def update(self, r): + if self.console_line is None: + return # Skip update if position hasn't been set yet + stage = r["stage"] + stage = r.get("eos_reason", stage) + + self.collected_output += r.get("text", "").replace("\n", "\\n") + + token_ids = r.get("token_ids", None) + if token_ids is not None: self.tokens += token_ids.shape[-1] + + self.prefill = r.get("curr_progress", self.prefill) + self.max_prefill = r.get("max_progress", self.max_prefill) + + text = term.black(f"{self.console_line:3}:") + text += term.blue(f"{stage:16}") + text += "prefill [ " + term.yellow(f"{self.prefill: 5} / {self.max_prefill: 5}")+" ]" + text += " " + text += term.green(f"{self.tokens: 5} t") + text += term.black(" -> ") + text += (self.spaces + self.collected_output)[-150:].replace("\t", " ") + + if "accepted_draft_tokens" in r: + acc = r["accepted_draft_tokens"] + rej = r["rejected_draft_tokens"] + eff = acc / (acc + rej) * 100.0 + text += term.bright_magenta(f" SD eff.: {eff:6.2f}%") + + #print(term.move_xy(0, self.console_line) + text) + self.display_text = text + + def display(self): + if self.console_line is not None: + print(term.move_xy(0, self.console_line) + self.display_text) + + +config = configparser.ConfigParser() +config.read('config.ini') + +repo_id = config.get(repo_str, 'repo') +specrepo_id = config.get(repo_str, 'specrepo') +host = config.get('settings', 'host') + +port = args.port if args.port is not None else config.getint('settings', 'port') +# Display modes for this demo: +# 1: One line per job, updated continuously +# 2: Print completions as jobs finish +# 3: Step over output iteration by iteration +# 4: Space heater mode (no output) +display_mode = args.display_mode + +# Whether to use paged mode or not. 
The generator is very handicapped in unpaged mode, does not support batching +# or CFG, but it will work without flash-attn 2.5.7+ +paged = not args.not_paged + +# Where to find our model +model_dir = repo_id + +# Total number of tokens to allocate space for. This is not the max_seq_len supported by the model but +# the total to distribute dynamically over however many jobs are active at once +total_context = total_context = args.total_context + +# Max individual context max_context = args.max_context -config.scale_alpha_value = ropescale -config.max_seq_len = max_context -base_model_native_max = 8192 -cache_8bit = args.cache_8bit -cache_q4 = args.cache_q4 - -if args.use_outlines: - model = outlines.models.exl2( - config.model_dir, - f"cuda:{args.outlines_device}", - max_seq_len = config.max_seq_len, - scale_pos_emb = config.scale_pos_emb, - scale_alpha_value = config.scale_alpha_value, - no_flash_attn = config.no_flash_attn, - num_experts_per_token = config.num_experts_per_token, - cache_8bit = cache_8bit, - cache_q4 = cache_q4, - tokenizer_kwargs = {}, - gpu_split = args.gpu_split, # we might be able to make this auto - low_mem = None, - verbose = None + +# N-gram or draft model speculative decoding. Largely detrimental to performance at higher batch sizes. +use_ngram = False +use_draft_model = args.use_draft_model +if use_draft_model: + model_dir = repo_id + draft_model_dir = specrepo_id + +# Max number of batches to run at once, assuming the sequences will fit within total_context. +max_batch_size = 4 if paged else 1 + +# Max chunk size. Determines the size of prefill operations. Can be reduced to reduce pauses whenever a +# new job is started, but at the expense of overall prompt ingestion speed. +max_chunk_size = args.max_chunk_size + +# Max new tokens per completion. For this example applies to all jobs. +max_new_tokens = args.max_new_tokens + +# Demonstrate token healing +healing = True + + +term = Terminal() + +if use_draft_model: + + draft_config = ExLlamaV2Config(draft_model_dir) + draft_config.scale_alpha_value = 6.0 + draft_config.max_seq_len = max_context + draft_model = ExLlamaV2(draft_config) + + draft_cache = ExLlamaV2Cache_Q4( + draft_model, + max_seq_len = total_context, + lazy = True ) + + draft_model.load_autosplit(draft_cache, progress = True) + else: - model = ExLlamaV2(config) -print("Loading model: " + repo_id) -#cache = ExLlamaV2Cache(model, lazy=True, max_seq_len = 20480) -#model.load_autosplit(cache) -if not args.use_outlines: - model.load([int(gpu_memory) for gpu_memory in args.gpu_split.split(",")]) + draft_model = None + draft_cache = None + +# Create config. We use the default max_batch_size of 1 for the model and the default max_input_len of +# 2048, which will also be the limit of the chunk size for prefill used by the dynamic generator. + +config = ExLlamaV2Config(model_dir) +config.max_input_len = max_chunk_size +config.max_attention_size = max_chunk_size ** 2 + +#ropescale = 2.5 +#config.scale_alpha_value = ropescale +config.max_seq_len = max_context +model = ExLlamaV2(config) + +# Configure the cache. The dynamic generator expects a batch size of 1 and a max_seq_len equal to +# the total number of cached tokens. 
The flat cache will be split dynamically + +cache = ExLlamaV2Cache_Q4( + model, + max_seq_len = total_context, + lazy = True +) + +model.load_autosplit(cache, progress = True) +# Also, tokenizer + +print("Loading tokenizer...") tokenizer = ExLlamaV2Tokenizer(config) +hf_tokenizer_kwargs = {} +hf_tokenizer_kwargs.setdefault("padding_side", "left") +hf_tokenizer = AutoTokenizer.from_pretrained(model_dir, **hf_tokenizer_kwargs) + + + +# Model Merge +#model = ExLlamaV2MergePassthrough(model) -# Cache mode +#lora_directory = "../Documents/trained_llama3_lr2e4_r64/" +#lora = ExLlamaV2Lora.from_directory(model, lora_directory) +lora = None +#cache = ExLlamaV2Cache_Q4( +# model, +# max_seq_len = total_context, + #lazy = True +#) -settings_proto = ExLlamaV2Sampler.Settings() -settings_proto.temperature = 0 -settings_proto.top_k = 50 -settings_proto.top_p = 0.8 -settings_proto.top_a = 0.0 -settings_proto.token_repetition_penalty = 1.1 -#settings.disallow_tokens(tokenizer, [tokenizer.eos_token_id]) +# Initialize the generator + +generator = ExLlamaV2DynamicGenerator( + model = model, + cache = cache, + draft_model = draft_model, + draft_cache = draft_cache, + tokenizer = tokenizer, + max_batch_size = max_batch_size, + use_ngram_draft = use_ngram, + max_chunk_size = max_chunk_size, + paged = paged, +) + +if lora is not None: + generator.set_loras(lora) # Active sequences and corresponding caches and settings prompts = queue.Queue() responses = {} - input_ids = [] prompt_length = [] -prompt_ids = [] -streamer = [] -caches = [] -input_prompts = [] -generators = [] -generations = [] -settings = [] -future_tokens = [] -future_logits = [] -sin_arr = [] -cos_arr = [] - # Global variable for storing partial responses partial_responses = {} -max_parallel_seqs = 3 -num_of_gpus = len(args.gpu_split.split(",")) +# Create jobs +STATUS_LINES = 40 # Number of lines to dedicate for status messages +LLM_LINES = max_batch_size +status_area = StatusArea(STATUS_LINES) +displays = {} +prompt_ids2jobs = {} print("*** Loaded.. 
now Inference...:") @@ -205,12 +359,7 @@ def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): - global generators - global prompt_ids - global input_prompts - global generations - global caches - global streamer + global prompt_ids2jobs if scope["type"] != "http": await self.app(scope, receive, send) return @@ -248,15 +397,8 @@ async def message_poller(sentinel, handler_task): # TODO: FIgure out how to get prompt id that disconnected while len(cancelled_request_ids) > 0: cancelled_id = cancelled_request_ids.pop() - for i in range(len(prompt_ids)): - if cancelled_id == prompt_ids[i]: - break - generators.pop(i) - prompt_ids.pop(i) - input_prompts.pop(i) - generations.pop(i) - caches.pop(i) - streamer.pop(i) + generator.cancel(prompt_ids2jobs[cancelled_id]) + del prompt_ids2jobs[cancelled_id] app = FastAPI(title="EXL2") @@ -280,178 +422,175 @@ async def stream_response(prompt_id, timeout=180): yield f'data: {{"id":"chatcmpl-{prompt_id}","object":"chat.completion.chunk","created":{int(time.time())},"model":"{repo_str}","choices":[{{"index":0,"delta":{{}},"finish_reason":"stop"}}]}}\n\n' break -def process_eos(i): - global generators - global prompt_ids - global input_prompts - global generations - global caches - global streamer - output = generations[i].strip() - prompt = input_prompts[i] - #output = tokenizer.decode(input_ids[i])[0] - print("-----") - print(output) - generated_text = output - # Calculate token counts - completion_tokens = (tokenizer.encode(generated_text)).shape[-1] - prompt_tokens = (tokenizer.encode(prompt)).shape[-1] - full_tokens = completion_tokens + prompt_tokens - eos_prompt_id = prompt_ids.pop(i) - if(streamer[i]): - ## Generator, yield here.. - partial_response_data = { - "finish_reason": "stop" - } - - responses[eos_prompt_id] = partial_response_data - else:# Construct the response based on the format - response_data = { - "id": f"chatcmpl-{eos_prompt_id}", - "object": "chat.completion", - "created": int(time.time()), - "model": repo_str, - "choices": [{ - "index": 0, - "message": { - "role": "assistant", - "content": generated_text, - }, - "finish_reason": "stop" - }], - "usage": { - "prompt_tokens": prompt_tokens, - "completion_tokens": completion_tokens, - "total_tokens": full_tokens - } - } - responses[eos_prompt_id] = response_data - # Clean up - generators.pop(i) - input_prompts.pop(i) - generations.pop(i) - caches.pop(i) - streamer.pop(i) -# Worker thread function -def process_outline_prompts(): + +def process_prompts(): global partial_responses - global generators - global prompt_ids - global input_prompts - global generations - global caches - global streamer - assert args.use_outlines - assert not use_dynamic_rope_scaling, "Currently ROPE scaling is not supported with outlines" - base_model = model.model - while True: - try: - while not prompts.empty() or len(prompt_ids): - while len(prompt_ids) < max_parallel_seqs and not prompts.empty(): + global prompt_ids2jobs + try: + + while True: + while not prompts.empty() or len(input_ids): + while not prompts.empty(): prompt_id, prompt, max_tokens, stream, temperature, outlines_dict = prompts.get() - print(f"got prompt with outlines dict {outlines_dict}") - sampler = multinomial(top_k=50, top_p=1.0, temperature=temperature) - ids = tokenizer.encode(prompt) + stop_at = outlines_dict.get("stop_at", None) + if outlines_dict["type"] == "choices": + filters = [ChoiceFilter(outlines_dict["choices"], hf_tokenizer)] + elif outlines_dict["type"] == "json": + filters = 
[JSONFilter(outlines_dict["json"], hf_tokenizer)] + elif outlines_dict["type"] == "regex": + filters = [RegexFilter(outlines_dict["regex"], hf_tokenizer)] + else: + filters = [] + ids = tokenizer.encode(prompt, encode_special_tokens = True) prompt_tokens = ids.shape[-1] - max_tokens=min(max_tokens, max_context-prompt_tokens-1) - full_tokens = prompt_tokens + max_tokens - print("Processing prompt: " + str(prompt_id) + " Req tokens: " + str(full_tokens)) + new_tokens = prompt_tokens + max_tokens + #print("Processing prompt: " + str(prompt_id) + " Req tokens: " + str(new_tokens)) + status_area.update(f"Processing prompt: {prompt_id} Req tokens: {new_tokens}", line=STATUS_LINES-1) # Truncate if new_tokens exceed max_context - if full_tokens >= max_context: + if new_tokens > max_context: # Calculate how many tokens to truncate ids = tokenizer.encode("Say, 'Prompt exceeds allowed length. Please try again.'") # Update new_tokens after truncation prompt_tokens = ids.shape[-1] - full_tokens = prompt_tokens + max_tokens - print("Truncating prompt: " + str(prompt_id) + " Req tokens: " + str(full_tokens)) - if cache_8bit: - ncache = ExLlamaV2Cache_8bit(base_model, lazy=not base_model.loaded, max_seq_len = full_tokens) # (max_seq_len could be different for each cache) - elif cache_q4: - ncache = ExLlamaV2Cache_Q4(base_model, lazy=not base_model.loaded, max_seq_len = full_tokens) - else: - ncache = ExLlamaV2Cache(base_model, lazy=not base_model.loaded, max_seq_len = full_tokens) # (max_seq_len could be different for each cache) - model.cache = ncache - model.past_seq = None - stop_at = outlines_dict.get("stop_at", None) - if outlines_dict["type"] == "choices": - generator = outlines.generate.choice(model, outlines_dict["choices"], sampler=sampler) - elif outlines_dict["type"] == "json": - generator = outlines.generate.json(model, outlines_dict["json"], sampler=sampler) - elif outlines_dict["type"] == "regex": - generator = outlines.generate.regex(model, outlines_dict["regex"], sampler=sampler) - else: - generator = outlines.generate.text(model, sampler=sampler) - generators.append(generator.stream(prompt, stop_at=stop_at, max_tokens=max_tokens)) - prompt_ids.append(prompt_id) - input_prompts.append(prompt) - generations.append("") - caches.append(ncache) - streamer.append(stream) - print(len(generators), len(prompt_ids), len(input_prompts), len(generations), len(caches), len(streamer)) - if(len(prompt_ids)): - eos = [] - for i in range(len(prompt_ids)): - model.cache = caches[i] - is_finished = False - decoded_response_token = "" - try: - decoded_response_token = next(generators[i]) - generations[i] += decoded_response_token - except Exception as e: - print(e) - is_finished = True - reason = None - if(streamer[i]): - outcontent = decoded_response_token - if is_finished: - outcontent = "" - reason = "stop" + new_tokens = prompt_tokens + max_tokens + print("Truncating prompt: " + str(prompt_id) + " Req tokens: " + str(new_tokens)) + prompt_length.append(prompt_tokens) + input_ids.append(ids) + #streamer.append(stream) + #prompt_ids.append(prompt_id) + + job = ExLlamaV2DynamicJob( + input_ids = ids, + max_new_tokens = max_tokens, + stop_conditions = [tokenizer.eos_token_id] if stop_at is None else [tokenizer.eos_token_id, stop_at], + gen_settings = ExLlamaV2Sampler.Settings(), + filters = filters, + token_healing = healing + ) + + job.prompt_length = prompt_tokens + job.input_ids = ids + job.streamer = stream + job.prompt_ids = prompt_id + job.stop_at = stop_at + + generator.enqueue(job) + #displays = { 
job: JobStatusDisplay(job, line, STATUS_LINES) for line, job in enumerate(jobs) } + displays[job] = JobStatusDisplay(job, STATUS_LINES) + + for index, (job, display) in enumerate(list(displays.items())): + display.update_position(index%LLM_LINES) # Set position before updating + prompt_ids2jobs[prompt_id] = job + results = generator.iterate() + for r in results: + job = r["job"] + displays[job].update(r) + displays[job].display() + stage = r["stage"] + stage = r.get("eos_reason", stage) + outcontent = r.get("text", "") + reason = None + if(job.streamer): + if r["eos"] and job.stop_at is not None: + outcontent += job.stop_at + partial_response_data = { + "id": f"chatcmpl-{job.prompt_ids}", + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": repo_str, + "choices": [ + { + "index": 0, + "delta": { + "content": outcontent + }, + "finish_reason": reason + } + ] + } + + # Initialize a list for new prompt_id or append to existing one + if job.prompt_ids not in partial_responses: + partial_responses[job.prompt_ids] = [] + partial_responses[job.prompt_ids].append(partial_response_data) + + if r['eos'] == True: + total_time = r['time_generate'] + total_tokens = r['new_tokens'] + tokens_per_second = total_tokens / total_time if total_time > 0 else 0 + status_area.update(f"EOS detected: {stage}, Generated Tokens: {total_tokens}, Tokens per second: {tokens_per_second}/s", line=STATUS_LINES-2) + + #generated_part = job.input_ids[:, job.prompt_length:] + #output = tokenizer.decode(generated_part[0]).strip() + #output = tokenizer.decode(input_ids[i])[0] + generated_text = r['full_completion'] + + # Calculate token counts + completion_tokens_old = (tokenizer.encode(generated_text)).shape[-1] + + completion_tokens = r['new_tokens'] + prompt_tokens = r['prompt_tokens'] + + full_tokens = completion_tokens + prompt_tokens + status_area.update(f"Completion Tokens: {completion_tokens_old}, New Completion Tokens: {completion_tokens}", line=STATUS_LINES-3) + + + eos_prompt_id = job.prompt_ids + if(job.streamer): + ## Generator, yield here.. 
partial_response_data = { - "id": f"chatcmpl-{prompt_ids[i]}", - "object": "chat.completion.chunk", + "finish_reason": "stop" + } + + responses[eos_prompt_id] = partial_response_data + else:# Construct the response based on the format + if job.stop_at is not None: + generated_text += job.stop_at + response_data = { + "id": f"chatcmpl-{prompt_id}", + "object": "chat.completion", "created": int(time.time()), "model": repo_str, - "choices": [ - { - "index": 0, - "delta": { - "content": outcontent - }, - "finish_reason": reason - } - ] + "choices": [{ + "index": 0, + "message": { + "role": "assistant", + "content": generated_text, + }, + "finish_reason": "stop" + }], + "usage": { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": full_tokens + } } + responses[eos_prompt_id] = response_data + del prompt_ids2jobs[eos_prompt_id] - # Initialize a list for new prompt_id or append to existing one - if prompt_ids[i] not in partial_responses: - partial_responses[prompt_ids[i]] = [] - partial_responses[prompt_ids[i]].append(partial_response_data) - - if is_finished: - eos.insert(0, i) - - # Generate and store response - for i in eos: - process_eos(i) else: # Sleep for a short duration when there's no work time.sleep(0.1) # Sleep for 100 milliseconds - except Exception as e: - for i in range(len(prompt_ids)): - process_eos(i) - generators = [] - prompt_ids = [] - input_prompts = [] - generations = [] - caches = [] - streamer = [] - print("Reset server due to ", e) - - + except Exception as e: + print("Reset server due to ", e) + print(traceback.format_exc()) + for prompt_id in prompt_ids2jobs: + job = prompt_ids2jobs[prompt_id] + if(job.streamer): + ## Generator, yield here.. + partial_response_data = { + "finish_reason": "stop" + } + + responses[prompt_id] = partial_response_data + else: + print("Error handling for full generation current not implemented") + generator.cancel(job) + prompt_ids2jobs = {} # Start worker thread -worker = Thread(target=process_outline_prompts) +worker = Thread(target=process_prompts) worker.start() @@ -673,10 +812,8 @@ async def mainchat(request: ChatCompletionRequest): outlines_dict["stop_at"] = request.stop_at if request.outlines_type is not None: outlines_dict["type"] = request.outlines_type - elif args.use_outlines: - outlines_dict["type"] = "text" else: - raise Exception("Enable outlines") + outlines_dict["type"] = "text" if outlines_dict["type"] == "choices": assert request.choices is not None outlines_dict["choices"] = request.choices @@ -687,11 +824,8 @@ async def mainchat(request: ChatCompletionRequest): assert request.regex is not None outlines_dict["regex"] = request.regex else: - assert (outlines_dict["type"] == "text") or not args.outlines - if not args.use_outlines: - prompts.put((prompt_id, prompt, request.max_tokens, request.stream, request.temperature)) - else: - prompts.put((prompt_id, prompt, request.max_tokens, request.stream, request.temperature, outlines_dict)) + assert outlines_dict["type"] == "text" + prompts.put((prompt_id, prompt, request.max_tokens, request.stream, request.temperature, outlines_dict)) if request.stream: #response = StreamingResponse(streaming_request(prompt, request.max_tokens, tempmodel=repo_str, response_format='chat_completion'), media_type="text/event-stream") From 6062beb4121f7317a2ad50c05238cc6edd726064 Mon Sep 17 00:00:00 2001 From: isamu-isozaki Date: Wed, 3 Jul 2024 15:43:41 -0400 Subject: [PATCH 2/6] Change name --- llm_exl2_client_multi_outlines.py | 882 
------------------------------ llm_exl2_dynamic_gen.py | 614 ++++++++++++++------- 2 files changed, 427 insertions(+), 1069 deletions(-) delete mode 100644 llm_exl2_client_multi_outlines.py diff --git a/llm_exl2_client_multi_outlines.py b/llm_exl2_client_multi_outlines.py deleted file mode 100644 index fbfd8a5..0000000 --- a/llm_exl2_client_multi_outlines.py +++ /dev/null @@ -1,882 +0,0 @@ -import asyncio -import json -import os -import time -import configparser -import argparse -from typing import AsyncIterable, List, Generator, Union, Optional -import traceback -import subprocess - -from fastapi import FastAPI, HTTPException -from fastapi.middleware.cors import CORSMiddleware -from fastapi.responses import StreamingResponse -from pydantic import BaseModel -from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, TextStreamer, TextIteratorStreamer -from threading import Thread -import queue -import traceback -import re - - -import sys, os -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from exllamav2 import( - ExLlamaV2, - ExLlamaV2Config, - ExLlamaV2Cache, - ExLlamaV2Cache_8bit, - ExLlamaV2Cache_Q4, - ExLlamaV2Tokenizer, -) - -from exllamav2.generator import ExLlamaV2DynamicGenerator, ExLlamaV2DynamicJob, ExLlamaV2Sampler -import uuid -from blessed import Terminal -import textwrap -from outlines.integrations.exllamav2 import RegexFilter, TextFilter, JSONFilter, ChoiceFilter - -def generate_unique_id(): - return uuid.uuid4() - -class CompletionRequest(BaseModel): - model: str - prompt: Union[str, List[str]] - stop: Optional[Union[str, List[str]]] = None - max_tokens: Optional[int] = 100 # default value of 100 - temperature: Optional[float] = 0.0 # default value of 0.0 - stream: Optional[bool] = False # default value of False - best_of: Optional[int] = 1 - echo: Optional[bool] = False - frequency_penalty: Optional[float] = 0.0 # default value of 0.0 - presence_penalty: Optional[float] = 0.0 # default value of 0.0 - log_probs: Optional[int] = 0 # default value of 0.0 - n: Optional[int] = 1 # default value of 1, batch size - suffix: Optional[str] = None - top_p: Optional[float] = 0.0 # default value of 0.0 - user: Optional[str] = None - -class Message(BaseModel): - role: str - content: str - -class ChatCompletionRequest(BaseModel): - model: str - messages: List[Message] - stop: Optional[Union[str, List[str]]] = None - max_tokens: Optional[int] = 100 # default value of 100 - temperature: Optional[float] = 0.0 # default value of 0.0 - stream: Optional[bool] = False # default value of False - frequency_penalty: Optional[float] = 0.0 # default value of 0.0 - presence_penalty: Optional[float] = 0.0 # default value of 0.0 - log_probs: Optional[int] = 0 # default value of 0.0 - n: Optional[int] = 1 # default value of 1, batch size - top_p: Optional[float] = 0.0 # default value of 0.0 - user: Optional[str] = None - stop_at: Optional[str] = None - outlines_type: Optional[str] = None - choices: Optional[list[str]] = None - regex: Optional[str] = None - json: Optional[str] = None - request_id: Optional[str] = None - partial_generation: Optional[str] = None - -#repo_str = 'theprofessor-exl2-speculative' - -parser = argparse.ArgumentParser(description='Run server with specified port.') - -# Add argument for port with default type as integer -parser.add_argument('--port', type=int, help='Port to run the server on.') -parser.add_argument('--max_context', type=int, default=8192, help='Context length.') -parser.add_argument('--repo_str', type=str, 
default='llama3-70b-instruct', help='The model repository name') -parser.add_argument('--total_context', type=int, default=32768, help="Total context length") -parser.add_argument('--max_chunk_size', type=int, default=2048, help='Max chunk size.') -parser.add_argument('--max_new_tokens', type=int, default=2048, help='Max new tokens.') -parser.add_argument('--display_mode', type=int, default=1, help='Display mode.') -parser.add_argument('--use_draft_model', action="store_true", help='Do speculative decoding') -parser.add_argument('--not_paged', action="store_true", help='Do not do paged attention') - - - -# Parse the arguments -args = parser.parse_args() -repo_str = args.repo_str - -config = configparser.ConfigParser() -config.read('config.ini') - -repo_id = config.get(repo_str, 'repo') -host = config.get('settings', 'host') - -port = args.port if args.port is not None else config.getint('settings', 'port') -class StatusArea: - def __init__(self, num_lines): - self.num_lines = num_lines - self.messages = [""] * num_lines - - def update(self, message, line=None): - if line is not None: - # Update a specific line - if 0 <= line < self.num_lines: - self.messages[line] = message - else: - # Handle multi-line message - lines = message.split('\n') - if len(lines) > self.num_lines: - # Truncate to last num_lines if exceeds num_lines - lines = lines[-self.num_lines:] - - # Update messages, padding with empty strings if necessary - self.messages = lines + [""] * (self.num_lines - len(lines)) - - self.display() - - def display(self): - for i, message in enumerate(self.messages): - wrapped_message = textwrap.shorten(message, width=term.width, placeholder="...") - print(term.move_xy(0, i) + term.clear_eol + wrapped_message) - - # Move cursor below the status area - print(term.move_xy(0, self.num_lines), end='', flush=True) - - -class JobStatusDisplay: - - def __init__(self, job, status_lines): - #self.console_line = console_line + status_lines - self.console_line = None - self.job = job - self.prefill = 0 - self.max_prefill = 0 - self.collected_output = "" - self.tokens = 0 - self.spaces = " " * 150 - self.status_lines = status_lines - self.display_text = "" - #text = term.black(f"{self.console_line:3}:") - #text += term.blue("enqueued") - #print(term.move_xy(0, self.console_line) + text) - - def update_position(self, index): - self.console_line = self.status_lines + index - self.init_display_text() - - def init_display_text(self): - self.display_text = term.black(f"{self.console_line:3}:") + term.blue("enqueued") - - - def update(self, r): - if self.console_line is None: - return # Skip update if position hasn't been set yet - stage = r["stage"] - stage = r.get("eos_reason", stage) - - self.collected_output += r.get("text", "").replace("\n", "\\n") - - token_ids = r.get("token_ids", None) - if token_ids is not None: self.tokens += token_ids.shape[-1] - - self.prefill = r.get("curr_progress", self.prefill) - self.max_prefill = r.get("max_progress", self.max_prefill) - - text = term.black(f"{self.console_line:3}:") - text += term.blue(f"{stage:16}") - text += "prefill [ " + term.yellow(f"{self.prefill: 5} / {self.max_prefill: 5}")+" ]" - text += " " - text += term.green(f"{self.tokens: 5} t") - text += term.black(" -> ") - text += (self.spaces + self.collected_output)[-150:].replace("\t", " ") - - if "accepted_draft_tokens" in r: - acc = r["accepted_draft_tokens"] - rej = r["rejected_draft_tokens"] - eff = acc / (acc + rej) * 100.0 - text += term.bright_magenta(f" SD eff.: {eff:6.2f}%") - - 
#print(term.move_xy(0, self.console_line) + text) - self.display_text = text - - def display(self): - if self.console_line is not None: - print(term.move_xy(0, self.console_line) + self.display_text) - - -config = configparser.ConfigParser() -config.read('config.ini') - -repo_id = config.get(repo_str, 'repo') -specrepo_id = config.get(repo_str, 'specrepo') -host = config.get('settings', 'host') - -port = args.port if args.port is not None else config.getint('settings', 'port') -# Display modes for this demo: -# 1: One line per job, updated continuously -# 2: Print completions as jobs finish -# 3: Step over output iteration by iteration -# 4: Space heater mode (no output) -display_mode = args.display_mode - -# Whether to use paged mode or not. The generator is very handicapped in unpaged mode, does not support batching -# or CFG, but it will work without flash-attn 2.5.7+ -paged = not args.not_paged - -# Where to find our model -model_dir = repo_id - -# Total number of tokens to allocate space for. This is not the max_seq_len supported by the model but -# the total to distribute dynamically over however many jobs are active at once -total_context = total_context = args.total_context - -# Max individual context -max_context = args.max_context - -# N-gram or draft model speculative decoding. Largely detrimental to performance at higher batch sizes. -use_ngram = False -use_draft_model = args.use_draft_model -if use_draft_model: - model_dir = repo_id - draft_model_dir = specrepo_id - -# Max number of batches to run at once, assuming the sequences will fit within total_context. -max_batch_size = 4 if paged else 1 - -# Max chunk size. Determines the size of prefill operations. Can be reduced to reduce pauses whenever a -# new job is started, but at the expense of overall prompt ingestion speed. -max_chunk_size = args.max_chunk_size - -# Max new tokens per completion. For this example applies to all jobs. -max_new_tokens = args.max_new_tokens - -# Demonstrate token healing -healing = True - - -term = Terminal() - -if use_draft_model: - - draft_config = ExLlamaV2Config(draft_model_dir) - draft_config.scale_alpha_value = 6.0 - draft_config.max_seq_len = max_context - draft_model = ExLlamaV2(draft_config) - - draft_cache = ExLlamaV2Cache_Q4( - draft_model, - max_seq_len = total_context, - lazy = True - ) - - draft_model.load_autosplit(draft_cache, progress = True) - -else: - - draft_model = None - draft_cache = None - -# Create config. We use the default max_batch_size of 1 for the model and the default max_input_len of -# 2048, which will also be the limit of the chunk size for prefill used by the dynamic generator. - -config = ExLlamaV2Config(model_dir) -config.max_input_len = max_chunk_size -config.max_attention_size = max_chunk_size ** 2 - -#ropescale = 2.5 -#config.scale_alpha_value = ropescale -config.max_seq_len = max_context -model = ExLlamaV2(config) - -# Configure the cache. The dynamic generator expects a batch size of 1 and a max_seq_len equal to -# the total number of cached tokens. 
The flat cache will be split dynamically - -cache = ExLlamaV2Cache_Q4( - model, - max_seq_len = total_context, - lazy = True -) - -model.load_autosplit(cache, progress = True) -# Also, tokenizer - -print("Loading tokenizer...") -tokenizer = ExLlamaV2Tokenizer(config) -hf_tokenizer_kwargs = {} -hf_tokenizer_kwargs.setdefault("padding_side", "left") -hf_tokenizer = AutoTokenizer.from_pretrained(model_dir, **hf_tokenizer_kwargs) - - - -# Model Merge -#model = ExLlamaV2MergePassthrough(model) - -#lora_directory = "../Documents/trained_llama3_lr2e4_r64/" -#lora = ExLlamaV2Lora.from_directory(model, lora_directory) -lora = None - -#cache = ExLlamaV2Cache_Q4( -# model, -# max_seq_len = total_context, - #lazy = True -#) - -# Initialize the generator - -generator = ExLlamaV2DynamicGenerator( - model = model, - cache = cache, - draft_model = draft_model, - draft_cache = draft_cache, - tokenizer = tokenizer, - max_batch_size = max_batch_size, - use_ngram_draft = use_ngram, - max_chunk_size = max_chunk_size, - paged = paged, -) - -if lora is not None: - generator.set_loras(lora) - -# Active sequences and corresponding caches and settings -prompts = queue.Queue() -responses = {} -input_ids = [] -prompt_length = [] -# Global variable for storing partial responses -partial_responses = {} - -# Create jobs -STATUS_LINES = 40 # Number of lines to dedicate for status messages -LLM_LINES = max_batch_size -status_area = StatusArea(STATUS_LINES) -displays = {} -prompt_ids2jobs = {} - -print("*** Loaded.. now Inference...:") - -# take from https://github.com/tiangolo/fastapi/discussions/11360 -class RequestCancelledMiddleware: - def __init__(self, app): - self.app = app - - async def __call__(self, scope, receive, send): - global prompt_ids2jobs - if scope["type"] != "http": - await self.app(scope, receive, send) - return - - # Let's make a shared queue for the request messages - queue = asyncio.Queue() - cancelled_request_ids = [] - async def message_poller(sentinel, handler_task): - nonlocal queue - request_id = str(generate_unique_id()) - while True: - message = await receive() - print(message) - if "body" in message: - message["body"] = json.loads(message["body"].decode('utf8')) - message["body"]["request_id"] = request_id - message["body"] = str.encode(json.dumps(message["body"])) - print(message) - if message["type"] == "http.disconnect": - cancelled_request_ids.append(request_id) - handler_task.cancel() - return sentinel # Break the loop - - # Puts the message in the queue - await queue.put(message) - - sentinel = object() - handler_task = asyncio.create_task(self.app(scope, queue.get, send)) - asyncio.create_task(message_poller(sentinel, handler_task)) - - try: - return await handler_task - except asyncio.CancelledError: - print("Cancelling request due to disconnect") - # TODO: FIgure out how to get prompt id that disconnected - while len(cancelled_request_ids) > 0: - cancelled_id = cancelled_request_ids.pop() - generator.cancel(prompt_ids2jobs[cancelled_id]) - del prompt_ids2jobs[cancelled_id] - - -app = FastAPI(title="EXL2") -app.add_middleware(RequestCancelledMiddleware) - -async def stream_response(prompt_id, timeout=180): - global partial_responses - while True: - await asyncio.sleep(0.05) # Sleep to yield control to the event loop - - # Check if prompt_id exists in partial_responses - if prompt_id in partial_responses: - # Stream partial responses - while partial_responses[prompt_id]: - response_chunk = partial_responses[prompt_id].pop(0) - yield f"data: {json.dumps(response_chunk)}\n\n" - - # 
Check for final response or timeout - if prompt_id in responses: - final_response = responses.pop(prompt_id) - yield f'data: {{"id":"chatcmpl-{prompt_id}","object":"chat.completion.chunk","created":{int(time.time())},"model":"{repo_str}","choices":[{{"index":0,"delta":{{}},"finish_reason":"stop"}}]}}\n\n' - break - - -def process_prompts(): - global partial_responses - global prompt_ids2jobs - try: - - while True: - while not prompts.empty() or len(input_ids): - while not prompts.empty(): - prompt_id, prompt, max_tokens, stream, temperature, outlines_dict = prompts.get() - stop_at = outlines_dict.get("stop_at", None) - if outlines_dict["type"] == "choices": - filters = [ChoiceFilter(outlines_dict["choices"], hf_tokenizer)] - elif outlines_dict["type"] == "json": - filters = [JSONFilter(outlines_dict["json"], hf_tokenizer)] - elif outlines_dict["type"] == "regex": - filters = [RegexFilter(outlines_dict["regex"], hf_tokenizer)] - else: - filters = [] - ids = tokenizer.encode(prompt, encode_special_tokens = True) - prompt_tokens = ids.shape[-1] - new_tokens = prompt_tokens + max_tokens - #print("Processing prompt: " + str(prompt_id) + " Req tokens: " + str(new_tokens)) - status_area.update(f"Processing prompt: {prompt_id} Req tokens: {new_tokens}", line=STATUS_LINES-1) - # Truncate if new_tokens exceed max_context - if new_tokens > max_context: - # Calculate how many tokens to truncate - ids = tokenizer.encode("Say, 'Prompt exceeds allowed length. Please try again.'") - # Update new_tokens after truncation - prompt_tokens = ids.shape[-1] - new_tokens = prompt_tokens + max_tokens - print("Truncating prompt: " + str(prompt_id) + " Req tokens: " + str(new_tokens)) - prompt_length.append(prompt_tokens) - input_ids.append(ids) - #streamer.append(stream) - #prompt_ids.append(prompt_id) - - job = ExLlamaV2DynamicJob( - input_ids = ids, - max_new_tokens = max_tokens, - stop_conditions = [tokenizer.eos_token_id] if stop_at is None else [tokenizer.eos_token_id, stop_at], - gen_settings = ExLlamaV2Sampler.Settings(), - filters = filters, - token_healing = healing - ) - - job.prompt_length = prompt_tokens - job.input_ids = ids - job.streamer = stream - job.prompt_ids = prompt_id - job.stop_at = stop_at - - generator.enqueue(job) - #displays = { job: JobStatusDisplay(job, line, STATUS_LINES) for line, job in enumerate(jobs) } - displays[job] = JobStatusDisplay(job, STATUS_LINES) - - for index, (job, display) in enumerate(list(displays.items())): - display.update_position(index%LLM_LINES) # Set position before updating - prompt_ids2jobs[prompt_id] = job - results = generator.iterate() - for r in results: - job = r["job"] - displays[job].update(r) - displays[job].display() - stage = r["stage"] - stage = r.get("eos_reason", stage) - outcontent = r.get("text", "") - reason = None - if(job.streamer): - if r["eos"] and job.stop_at is not None: - outcontent += job.stop_at - partial_response_data = { - "id": f"chatcmpl-{job.prompt_ids}", - "object": "chat.completion.chunk", - "created": int(time.time()), - "model": repo_str, - "choices": [ - { - "index": 0, - "delta": { - "content": outcontent - }, - "finish_reason": reason - } - ] - } - - # Initialize a list for new prompt_id or append to existing one - if job.prompt_ids not in partial_responses: - partial_responses[job.prompt_ids] = [] - partial_responses[job.prompt_ids].append(partial_response_data) - - if r['eos'] == True: - total_time = r['time_generate'] - total_tokens = r['new_tokens'] - tokens_per_second = total_tokens / total_time if total_time > 0 else 
0 - status_area.update(f"EOS detected: {stage}, Generated Tokens: {total_tokens}, Tokens per second: {tokens_per_second}/s", line=STATUS_LINES-2) - - #generated_part = job.input_ids[:, job.prompt_length:] - #output = tokenizer.decode(generated_part[0]).strip() - #output = tokenizer.decode(input_ids[i])[0] - generated_text = r['full_completion'] - - # Calculate token counts - completion_tokens_old = (tokenizer.encode(generated_text)).shape[-1] - - completion_tokens = r['new_tokens'] - prompt_tokens = r['prompt_tokens'] - - full_tokens = completion_tokens + prompt_tokens - status_area.update(f"Completion Tokens: {completion_tokens_old}, New Completion Tokens: {completion_tokens}", line=STATUS_LINES-3) - - - eos_prompt_id = job.prompt_ids - if(job.streamer): - ## Generator, yield here.. - partial_response_data = { - "finish_reason": "stop" - } - - responses[eos_prompt_id] = partial_response_data - else:# Construct the response based on the format - if job.stop_at is not None: - generated_text += job.stop_at - response_data = { - "id": f"chatcmpl-{prompt_id}", - "object": "chat.completion", - "created": int(time.time()), - "model": repo_str, - "choices": [{ - "index": 0, - "message": { - "role": "assistant", - "content": generated_text, - }, - "finish_reason": "stop" - }], - "usage": { - "prompt_tokens": prompt_tokens, - "completion_tokens": completion_tokens, - "total_tokens": full_tokens - } - } - responses[eos_prompt_id] = response_data - del prompt_ids2jobs[eos_prompt_id] - - - else: - # Sleep for a short duration when there's no work - time.sleep(0.1) # Sleep for 100 milliseconds - except Exception as e: - print("Reset server due to ", e) - print(traceback.format_exc()) - for prompt_id in prompt_ids2jobs: - job = prompt_ids2jobs[prompt_id] - if(job.streamer): - ## Generator, yield here.. 
- partial_response_data = { - "finish_reason": "stop" - } - - responses[prompt_id] = partial_response_data - else: - print("Error handling for full generation current not implemented") - generator.cancel(job) - prompt_ids2jobs = {} - -# Start worker thread -worker = Thread(target=process_prompts) -worker.start() - - -async def format_prompt(messages): - formatted_prompt = "" - for message in messages: - if message.role == "system": - formatted_prompt += f"{message.content}\n\n" - elif message.role == "user": - formatted_prompt += f"### User:\n{message.content}\n\n" - elif message.role == "assistant": - formatted_prompt += f"### Assistant:\n{message.content}\n\n" - # Add the final "### Assistant:\n" to prompt for the next response - formatted_prompt += "### Assistant:\n" - return formatted_prompt - -async def format_prompt_llama3(messages): - formatted_prompt = "" - system_message_found = False - - # Check for a system message first - for message in messages: - if message.role == "system": - system_message_found = True - break - - # If no system message was found, prepend a default one - if not system_message_found: - formatted_prompt = "<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\nYou are a helpful AI assistant.<|eot_id|>" - for message in messages: - if message.role == "system": - formatted_prompt += f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{message.content}<|eot_id|>" - elif message.role == "user": - formatted_prompt += f"<|start_header_id|>user<|end_header_id|>\n\n{message.content}<|eot_id|>" - elif message.role == "assistant": - formatted_prompt += f"<|start_header_id|>assistant<|end_header_id|>\n\n{message.content}<|eot_id|>" - # Add the final "### Assistant:\n" to prompt for the next response - formatted_prompt += "<|start_header_id|>assistant<|end_header_id|>\n\n" - return formatted_prompt - -async def format_prompt_yi(messages): - formatted_prompt = "" - system_message_found = False - - # Check for a system message first - for message in messages: - if message.role == "system": - system_message_found = True - break - - # If no system message was found, prepend a default one - if not system_message_found: - formatted_prompt = "<|im_start|>system\nYou are a helpful AI assistant.<|im_end|>\n" - for message in messages: - if message.role == "system": - formatted_prompt += f"<|im_start|>system\n{message.content}<|im_end|>\n" - elif message.role == "user": - formatted_prompt += f"<|im_start|>user\n{message.content}<|im_end|>\n" - elif message.role == "assistant": - formatted_prompt += f"<|im_start|>assistant\n{message.content}<|im_end|>\n" - # Add the final "### Assistant:\n" to prompt for the next response - formatted_prompt += "<|im_start|>assistant\n" - return formatted_prompt - -async def format_prompt_nous(messages): - formatted_prompt = "" - for message in messages: - if message.role == "system": - formatted_prompt += f"{message.content}\n" - elif message.role == "user": - formatted_prompt += f"USER: {message.content}\n" - elif message.role == "assistant": - formatted_prompt += f"ASSISTANT: {message.content}\n" - # Add the final "### Assistant:\n" to prompt for the next response - formatted_prompt += "ASSISTANT: " - return formatted_prompt - -async def format_prompt_tess(messages): - formatted_prompt = "" - for message in messages: - if message.role == "system": - formatted_prompt += f"SYSTEM: {message.content}\n" - elif message.role == "user": - formatted_prompt += f"USER: {message.content}\n" - elif message.role == "assistant": - 
formatted_prompt += f"ASSISTANT: {message.content}\n" - # Add the final "### Assistant:\n" to prompt for the next response - formatted_prompt += "ASSISTANT: " - return formatted_prompt - -async def format_prompt_code(messages): - formatted_prompt = "" - for message in messages: - if message.role == "system": - formatted_prompt += f"### System Prompt\nYou are an intelligent programming assistant.\n\n" - elif message.role == "user": - formatted_prompt += f"### User Message\n{message.content}\n\n" - elif message.role == "assistant": - formatted_prompt += f"### Assistant\n{message.content}\n\n" - # Add the final "### Assistant" with ellipsis to prompt for the next response - formatted_prompt += "### Assistant\n..." - return formatted_prompt - -async def format_prompt_zephyr(messages): - formatted_prompt = "" - for message in messages: - if message.role == "system": - formatted_prompt += f"<|system|>\n{message.content}\n" - elif message.role == "user": - formatted_prompt += f"<|user|>\n{message.content}\n" - elif message.role == "assistant": - formatted_prompt += f"<|assistant|>\n{message.content}\n" - # Add the final "### Assistant:\n" to prompt for the next response - formatted_prompt += "<|assistant|>\n" - return formatted_prompt - -async def format_prompt_starling(messages): - formatted_prompt = "" - system_message = "" - for message in messages: - if message.role == "system": - # Save system message to prepend to the first user message - system_message += f"{message.content}\n\n" - elif message.role == "user": - # Prepend system message if it exists - if system_message: - formatted_prompt += f"GPT4 Correct User: {system_message}{message.content}<|end_of_turn|>" - system_message = "" # Clear system message after prepending - else: - formatted_prompt += f"GPT4 Correct User: {message.content}<|end_of_turn|>" - elif message.role == "assistant": - formatted_prompt += f"GPT4 Correct Assistant: {message.content}<|end_of_turn|>" # Prep for user follow-up - formatted_prompt += "GPT4 Correct Assistant: \n\n" - return formatted_prompt - -async def format_prompt_mixtral(messages): - formatted_prompt = " " - system_message = "" - for message in messages: - if message.role == "system": - # Save system message to prepend to the first user message - system_message += f"{message.content}\n\n" - elif message.role == "user": - # Prepend system message if it exists - if system_message: - formatted_prompt += f"[INST] {system_message}{message.content} [/INST] " - system_message = "" # Clear system message after prepending - else: - formatted_prompt += f"[INST] {message.content} [/INST] " - elif message.role == "assistant": - formatted_prompt += f" {message.content} " # Prep for user follow-up - return formatted_prompt - - -async def format_prompt_commandr(messages): - formatted_prompt = "" - system_message_found = False - - # Check for a system message first - for message in messages: - if message.role == "system": - system_message_found = True - break - - # If no system message was found, prepend a default one - if not system_message_found: - formatted_prompt += f"<|START_OF_TURN_TOKEN|><|SYSTEM_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" - - for message in messages: - if message.role == "system": - formatted_prompt += f"<|START_OF_TURN_TOKEN|><|SYSTEM_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" - elif message.role == "user": - formatted_prompt += f"<|START_OF_TURN_TOKEN|><|USER_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" - elif message.role == "assistant": - formatted_prompt += 
f"<|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" - # Add the final "### Assistant:\n" to prompt for the next response - formatted_prompt += "<|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>" - return formatted_prompt - - -@app.post('/v1/chat/completions') -async def mainchat(request: ChatCompletionRequest): - try: - prompt = '' - if repo_str == 'Phind-CodeLlama-34B-v2': - prompt = await format_prompt_code(request.messages) - elif repo_str == 'zephyr-7b-beta': - prompt = await format_prompt_zephyr(request.messages) - elif repo_str == 'llama3-70b-instruct': - prompt = await format_prompt_llama3(request.messages) - elif repo_str == 'Starling-LM-7B-alpha': - prompt = await format_prompt_starling(request.messages) - elif repo_str == 'Mixtral-8x7B-Instruct-v0.1-GPTQ': - prompt = await format_prompt_mixtral(request.messages) - elif repo_str == 'Yi-34B-Chat-GPTQ' or repo_str == 'Nous-Hermes-2-Yi-34B-GPTQ' or repo_str == 'theprofessor-exl2-speculative' or repo_str == 'dbrx-instruct-exl2': - prompt = await format_prompt_yi(request.messages) - elif repo_str == 'Nous-Capybara-34B-GPTQ' or repo_str == 'goliath-120b-GPTQ' or repo_str == 'goliath-120b-exl2' or repo_str == 'goliath-120b-exl2-rpcal': - prompt = await format_prompt_nous(request.messages) - elif repo_str == 'tess-xl-exl2' or repo_str == 'tess-xl-exl2-speculative': - prompt = await format_prompt_tess(request.messages) - elif repo_str == 'commandr-exl2' or repo_str == 'commandr-exl2-speculative': - prompt = await format_prompt_commandr(request.messages) - else: - prompt = await format_prompt(request.messages) - if request.partial_generation is not None: - prompt += request.partial_generation - print(prompt) - - timeout = 180 # seconds - start_time = time.time() - prompt_id = request.request_id # Replace with a function to generate unique IDs - outlines_dict = {} - - # Adjust temperature if it is 0 - if request.temperature == 0: - request.temperature = 0.001 - - if request.stop_at is not None: - outlines_dict["stop_at"] = request.stop_at - if request.outlines_type is not None: - outlines_dict["type"] = request.outlines_type - else: - outlines_dict["type"] = "text" - if outlines_dict["type"] == "choices": - assert request.choices is not None - outlines_dict["choices"] = request.choices - elif outlines_dict["type"] == "json": - assert request.json is not None - outlines_dict["json"] = request.json - elif outlines_dict["type"] == "regex": - assert request.regex is not None - outlines_dict["regex"] = request.regex - else: - assert outlines_dict["type"] == "text" - prompts.put((prompt_id, prompt, request.max_tokens, request.stream, request.temperature, outlines_dict)) - - if request.stream: - #response = StreamingResponse(streaming_request(prompt, request.max_tokens, tempmodel=repo_str, response_format='chat_completion'), media_type="text/event-stream") - return StreamingResponse(stream_response(prompt_id), media_type="text/event-stream") - else: - #response_data = non_streaming_request(prompt, request.max_tokens, tempmodel=repo_str, response_format='chat_completion') - #response = response_data # This will return a JSON response - while prompt_id not in responses: - await asyncio.sleep(0.1) # Sleep to yield control to the event loop - if time.time() - start_time > timeout: - return {"error": "Response timeout"} - - return responses.pop(prompt_id) - - except Exception as e: - print(traceback.format_exc()) - raise HTTPException(status_code=500, detail=str(e)) - - - - - -@app.get('/ping') -async def get_status(): - return 
{"ping": sum(prompt_length)} - -@app.get("/nvidia-smi") -async def get_nvidia_smi(): - # Execute the nvidia-smi command - result = subprocess.run( - ["nvidia-smi", "--query-gpu=utilization.gpu,memory.used,memory.total", "--format=csv,noheader"], - capture_output=True, text=True - ) - nvidia_smi_output = result.stdout.strip() # Remove any extra whitespace - # Split the output by lines and then by commas - gpu_data = [] - for line in nvidia_smi_output.split("\n"): - utilization, memory_used, memory_total = line.split(", ") - # Strip the '%' and 'MiB' and convert to appropriate types - utilization = float(utilization.strip(' %')) - memory_used = int(memory_used.strip(' MiB')) - memory_total = int(memory_total.strip(' MiB')) - gpu_data.append({ - "utilization": utilization, - "memory_used": memory_used, - "memory_total": memory_total - }) - return gpu_data - - -if __name__ == "__main__": - import uvicorn - - uvicorn.run(app, host=host, port=port, log_level="debug") diff --git a/llm_exl2_dynamic_gen.py b/llm_exl2_dynamic_gen.py index 1e170fd..fbfd8a5 100644 --- a/llm_exl2_dynamic_gen.py +++ b/llm_exl2_dynamic_gen.py @@ -1,48 +1,45 @@ -import sys, os -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from exllamav2 import ExLlamaV2, ExLlamaV2Config, ExLlamaV2Cache_Q4, ExLlamaV2Tokenizer, ExLlamaV2Lora -from exllamav2.generator import ExLlamaV2DynamicGenerator, ExLlamaV2DynamicJob, ExLlamaV2Sampler -from blessed import Terminal -import pprint - import asyncio import json import os -import logging import time import configparser import argparse -import tiktoken -import torch -import random from typing import AsyncIterable, List, Generator, Union, Optional - -import requests -import sseclient +import traceback import subprocess -import textwrap - -from fastapi import FastAPI +from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel -import uuid -import threading +from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, TextStreamer, TextIteratorStreamer from threading import Thread import queue -import uvicorn -from io import StringIO -from util import format_prompt_llama3, format_prompt, format_prompt_tess, format_prompt_commandr -from util_merge import ExLlamaV2MergePassthrough +import traceback +import re + + +import sys, os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from exllamav2 import( + ExLlamaV2, + ExLlamaV2Config, + ExLlamaV2Cache, + ExLlamaV2Cache_8bit, + ExLlamaV2Cache_Q4, + ExLlamaV2Tokenizer, +) + +from exllamav2.generator import ExLlamaV2DynamicGenerator, ExLlamaV2DynamicJob, ExLlamaV2Sampler +import uuid +from blessed import Terminal +import textwrap +from outlines.integrations.exllamav2 import RegexFilter, TextFilter, JSONFilter, ChoiceFilter def generate_unique_id(): return uuid.uuid4() -# This is a demo and small stress to showcase some of the features of the dynamic batching generator. 
-repo_str = 'commandr-exl2' - class CompletionRequest(BaseModel): model: str prompt: Union[str, List[str]] @@ -77,7 +74,42 @@ class ChatCompletionRequest(BaseModel): n: Optional[int] = 1 # default value of 1, batch size top_p: Optional[float] = 0.0 # default value of 0.0 user: Optional[str] = None + stop_at: Optional[str] = None + outlines_type: Optional[str] = None + choices: Optional[list[str]] = None + regex: Optional[str] = None + json: Optional[str] = None + request_id: Optional[str] = None + partial_generation: Optional[str] = None + +#repo_str = 'theprofessor-exl2-speculative' +parser = argparse.ArgumentParser(description='Run server with specified port.') + +# Add argument for port with default type as integer +parser.add_argument('--port', type=int, help='Port to run the server on.') +parser.add_argument('--max_context', type=int, default=8192, help='Context length.') +parser.add_argument('--repo_str', type=str, default='llama3-70b-instruct', help='The model repository name') +parser.add_argument('--total_context', type=int, default=32768, help="Total context length") +parser.add_argument('--max_chunk_size', type=int, default=2048, help='Max chunk size.') +parser.add_argument('--max_new_tokens', type=int, default=2048, help='Max new tokens.') +parser.add_argument('--display_mode', type=int, default=1, help='Display mode.') +parser.add_argument('--use_draft_model', action="store_true", help='Do speculative decoding') +parser.add_argument('--not_paged', action="store_true", help='Do not do paged attention') + + + +# Parse the arguments +args = parser.parse_args() +repo_str = args.repo_str + +config = configparser.ConfigParser() +config.read('config.ini') + +repo_id = config.get(repo_str, 'repo') +host = config.get('settings', 'host') + +port = args.port if args.port is not None else config.getint('settings', 'port') class StatusArea: def __init__(self, num_lines): self.num_lines = num_lines @@ -170,14 +202,6 @@ def display(self): print(term.move_xy(0, self.console_line) + self.display_text) -parser = argparse.ArgumentParser(description='Run server with specified port.') - -# Add argument for port with default type as integer -parser.add_argument('--port', type=int, help='Port to run the server on.') - -# Parse the arguments -args = parser.parse_args() - config = configparser.ConfigParser() config.read('config.ini') @@ -191,25 +215,25 @@ def display(self): # 2: Print completions as jobs finish # 3: Step over output iteration by iteration # 4: Space heater mode (no output) -display_mode = 1 +display_mode = args.display_mode # Whether to use paged mode or not. The generator is very handicapped in unpaged mode, does not support batching # or CFG, but it will work without flash-attn 2.5.7+ -paged = True +paged = not args.not_paged # Where to find our model model_dir = repo_id # Total number of tokens to allocate space for. This is not the max_seq_len supported by the model but # the total to distribute dynamically over however many jobs are active at once -total_context = 32768 +total_context = total_context = args.total_context # Max individual context -max_context = 8192 +max_context = args.max_context # N-gram or draft model speculative decoding. Largely detrimental to performance at higher batch sizes. use_ngram = False -use_draft_model = False +use_draft_model = args.use_draft_model if use_draft_model: model_dir = repo_id draft_model_dir = specrepo_id @@ -219,24 +243,14 @@ def display(self): # Max chunk size. Determines the size of prefill operations. 
Can be reduced to reduce pauses whenever a # new job is started, but at the expense of overall prompt ingestion speed. -max_chunk_size = 2048 +max_chunk_size = args.max_chunk_size # Max new tokens per completion. For this example applies to all jobs. -max_new_tokens = 2048 - -# Use LMFE to constrain the output to JSON format. See schema and details below. -json_mode = False +max_new_tokens = args.max_new_tokens # Demonstrate token healing healing = True -# Ban some phrases maybe -ban_strings = None -# ban_strings = [ -# "person to person", -# "one person to another" -# ] - term = Terminal() @@ -282,11 +296,14 @@ def display(self): ) model.load_autosplit(cache, progress = True) -#model.load([16,18,18,20]) # Also, tokenizer print("Loading tokenizer...") tokenizer = ExLlamaV2Tokenizer(config) +hf_tokenizer_kwargs = {} +hf_tokenizer_kwargs.setdefault("padding_side", "left") +hf_tokenizer = AutoTokenizer.from_pretrained(model_dir, **hf_tokenizer_kwargs) + # Model Merge @@ -332,48 +349,65 @@ def display(self): LLM_LINES = max_batch_size status_area = StatusArea(STATUS_LINES) displays = {} +prompt_ids2jobs = {} +print("*** Loaded.. now Inference...:") -if json_mode: - print("Creating jobs... (initializing JSON filters could take a moment.)") - - -def get_stop_conditions(prompt_format, tokenizer): - if prompt_format == "llama": - return [tokenizer.eos_token_id] - elif prompt_format == "llama3": - return [tokenizer.single_id("<|eot_id|>")] - elif prompt_format == "granite": - return [tokenizer.eos_token_id, "\n\nQuestion:"] - - -# Only import lmfe if json_mode is set - -if json_mode: - import json - from lmformatenforcer.integrations.exllamav2 import ExLlamaV2TokenEnforcerFilter - from lmformatenforcer import JsonSchemaParser - from exllamav2.generator.filters import ExLlamaV2PrefixFilter - from pydantic import BaseModel - from typing import Literal - - class JSONResponse(BaseModel): - response: str - confidence: Literal["low", "medium", "high"] - is_subjective: Literal["no", "yes", "possibly"] - - schema_parser = JsonSchemaParser(JSONResponse.schema()) - - +# take from https://github.com/tiangolo/fastapi/discussions/11360 +class RequestCancelledMiddleware: + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + global prompt_ids2jobs + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + # Let's make a shared queue for the request messages + queue = asyncio.Queue() + cancelled_request_ids = [] + async def message_poller(sentinel, handler_task): + nonlocal queue + request_id = str(generate_unique_id()) + while True: + message = await receive() + print(message) + if "body" in message: + message["body"] = json.loads(message["body"].decode('utf8')) + message["body"]["request_id"] = request_id + message["body"] = str.encode(json.dumps(message["body"])) + print(message) + if message["type"] == "http.disconnect": + cancelled_request_ids.append(request_id) + handler_task.cancel() + return sentinel # Break the loop + + # Puts the message in the queue + await queue.put(message) + + sentinel = object() + handler_task = asyncio.create_task(self.app(scope, queue.get, send)) + asyncio.create_task(message_poller(sentinel, handler_task)) + + try: + return await handler_task + except asyncio.CancelledError: + print("Cancelling request due to disconnect") + # TODO: FIgure out how to get prompt id that disconnected + while len(cancelled_request_ids) > 0: + cancelled_id = cancelled_request_ids.pop() + generator.cancel(prompt_ids2jobs[cancelled_id]) 
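+                # The ExLlamaV2 job for this request was cancelled above; drop the
+                # bookkeeping entry too so the worker thread stops tracking this request_id.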
+ del prompt_ids2jobs[cancelled_id] -print("*** Loaded.. now Inference...:") app = FastAPI(title="EXL2") +app.add_middleware(RequestCancelledMiddleware) async def stream_response(prompt_id, timeout=180): global partial_responses while True: - await asyncio.sleep(0.001) # Sleep to yield control to the event loop + await asyncio.sleep(0.05) # Sleep to yield control to the event loop # Check if prompt_id exists in partial_responses if prompt_id in partial_responses: @@ -388,73 +422,67 @@ async def stream_response(prompt_id, timeout=180): yield f'data: {{"id":"chatcmpl-{prompt_id}","object":"chat.completion.chunk","created":{int(time.time())},"model":"{repo_str}","choices":[{{"index":0,"delta":{{}},"finish_reason":"stop"}}]}}\n\n' break -# Worker thread function + def process_prompts(): - global partial_responses + global partial_responses + global prompt_ids2jobs + try: - # To see what's going on, mode 1 - while True: - while not prompts.empty() or len(input_ids): - while len(input_ids) < max_batch_size and not prompts.empty(): - prompt_id, prompt, max_tokens, stream, temperature = prompts.get() - if json_mode: - prompt += "\n\n Answer in JSON syntax." - filters = [ - ExLlamaV2TokenEnforcerFilter(schema_parser, tokenizer), - ExLlamaV2PrefixFilter(model, tokenizer, "{") - ] - else: - filters = None - ids = tokenizer.encode(prompt, encode_special_tokens = True) - prompt_tokens = ids.shape[-1] - new_tokens = prompt_tokens + max_tokens - #print("Processing prompt: " + str(prompt_id) + " Req tokens: " + str(new_tokens)) - status_area.update(f"Processing prompt: {prompt_id} Req tokens: {new_tokens}", line=STATUS_LINES-1) - # Truncate if new_tokens exceed max_context - if new_tokens > max_context: - # Calculate how many tokens to truncate - ids = tokenizer.encode("Say, 'Prompt exceeds allowed length. 
Please try again.'") - # Update new_tokens after truncation + while True: + while not prompts.empty() or len(input_ids): + while not prompts.empty(): + prompt_id, prompt, max_tokens, stream, temperature, outlines_dict = prompts.get() + stop_at = outlines_dict.get("stop_at", None) + if outlines_dict["type"] == "choices": + filters = [ChoiceFilter(outlines_dict["choices"], hf_tokenizer)] + elif outlines_dict["type"] == "json": + filters = [JSONFilter(outlines_dict["json"], hf_tokenizer)] + elif outlines_dict["type"] == "regex": + filters = [RegexFilter(outlines_dict["regex"], hf_tokenizer)] + else: + filters = [] + ids = tokenizer.encode(prompt, encode_special_tokens = True) prompt_tokens = ids.shape[-1] new_tokens = prompt_tokens + max_tokens - print("Truncating prompt: " + str(prompt_id) + " Req tokens: " + str(new_tokens)) - prompt_length.append(prompt_tokens) - input_ids.append(ids) - #streamer.append(stream) - #prompt_ids.append(prompt_id) - - job = ExLlamaV2DynamicJob( - input_ids = ids, - max_new_tokens = max_tokens, - stop_conditions = get_stop_conditions('llama', tokenizer), - gen_settings = ExLlamaV2Sampler.Settings(), - banned_strings = ban_strings, - filters = filters, - filter_prefer_eos = True, - token_healing = healing - ) - - job.prompt_length = prompt_tokens - job.input_ids = ids - job.streamer = stream - job.prompt_ids = prompt_id - - generator.enqueue(job) - #displays = { job: JobStatusDisplay(job, line, STATUS_LINES) for line, job in enumerate(jobs) } - displays[job] = JobStatusDisplay(job, STATUS_LINES) - - for index, (job, display) in enumerate(list(displays.items())): - display.update_position(index%LLM_LINES) # Set position before updating - - - if(len(input_ids)): - #inputs = torch.cat([x[:, -1:] for x in input_ids], dim = 0) - #logits = model.forward(inputs, caches, input_mask = None).float().cpu() - + #print("Processing prompt: " + str(prompt_id) + " Req tokens: " + str(new_tokens)) + status_area.update(f"Processing prompt: {prompt_id} Req tokens: {new_tokens}", line=STATUS_LINES-1) + # Truncate if new_tokens exceed max_context + if new_tokens > max_context: + # Calculate how many tokens to truncate + ids = tokenizer.encode("Say, 'Prompt exceeds allowed length. 
Please try again.'") + # Update new_tokens after truncation + prompt_tokens = ids.shape[-1] + new_tokens = prompt_tokens + max_tokens + print("Truncating prompt: " + str(prompt_id) + " Req tokens: " + str(new_tokens)) + prompt_length.append(prompt_tokens) + input_ids.append(ids) + #streamer.append(stream) + #prompt_ids.append(prompt_id) + + job = ExLlamaV2DynamicJob( + input_ids = ids, + max_new_tokens = max_tokens, + stop_conditions = [tokenizer.eos_token_id] if stop_at is None else [tokenizer.eos_token_id, stop_at], + gen_settings = ExLlamaV2Sampler.Settings(), + filters = filters, + token_healing = healing + ) + + job.prompt_length = prompt_tokens + job.input_ids = ids + job.streamer = stream + job.prompt_ids = prompt_id + job.stop_at = stop_at + + generator.enqueue(job) + #displays = { job: JobStatusDisplay(job, line, STATUS_LINES) for line, job in enumerate(jobs) } + displays[job] = JobStatusDisplay(job, STATUS_LINES) + + for index, (job, display) in enumerate(list(displays.items())): + display.update_position(index%LLM_LINES) # Set position before updating + prompt_ids2jobs[prompt_id] = job results = generator.iterate() for r in results: - #for i in range(len(input_ids)): - #r = results[i] job = r["job"] displays[job].update(r) displays[job].display() @@ -463,21 +491,23 @@ def process_prompts(): outcontent = r.get("text", "") reason = None if(job.streamer): + if r["eos"] and job.stop_at is not None: + outcontent += job.stop_at partial_response_data = { - "id": f"chatcmpl-{job.prompt_ids}", - "object": "chat.completion.chunk", - "created": int(time.time()), - "model": repo_str, - "choices": [ - { - "index": 0, - "delta": { - "content": outcontent - }, - "finish_reason": reason - } - ] - } + "id": f"chatcmpl-{job.prompt_ids}", + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": repo_str, + "choices": [ + { + "index": 0, + "delta": { + "content": outcontent + }, + "finish_reason": reason + } + ] + } # Initialize a list for new prompt_id or append to existing one if job.prompt_ids not in partial_responses: @@ -497,7 +527,6 @@ def process_prompts(): # Calculate token counts completion_tokens_old = (tokenizer.encode(generated_text)).shape[-1] - prompt_tokens_old = (tokenizer.encode(prompt)).shape[-1] completion_tokens = r['new_tokens'] prompt_tokens = r['prompt_tokens'] @@ -515,6 +544,8 @@ def process_prompts(): responses[eos_prompt_id] = partial_response_data else:# Construct the response based on the format + if job.stop_at is not None: + generated_text += job.stop_at response_data = { "id": f"chatcmpl-{prompt_id}", "object": "chat.completion", @@ -534,56 +565,267 @@ def process_prompts(): "total_tokens": full_tokens } } - responses[eos_prompt_id] = response_data + del prompt_ids2jobs[eos_prompt_id] - # Clean up - input_ids.pop() - prompt_length.pop() - #streamer.pop(i) - else: - # Sleep for a short duration when there's no work - time.sleep(0.1) # Sleep for 100 milliseconds + else: + # Sleep for a short duration when there's no work + time.sleep(0.1) # Sleep for 100 milliseconds + except Exception as e: + print("Reset server due to ", e) + print(traceback.format_exc()) + for prompt_id in prompt_ids2jobs: + job = prompt_ids2jobs[prompt_id] + if(job.streamer): + ## Generator, yield here.. 
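+                # On an unexpected generator error every in-flight streaming job receives a
+                # final "stop" record so stream_response() can unblock and close its stream.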
+ partial_response_data = { + "finish_reason": "stop" + } + + responses[prompt_id] = partial_response_data + else: + print("Error handling for full generation current not implemented") + generator.cancel(job) + prompt_ids2jobs = {} # Start worker thread worker = Thread(target=process_prompts) worker.start() +async def format_prompt(messages): + formatted_prompt = "" + for message in messages: + if message.role == "system": + formatted_prompt += f"{message.content}\n\n" + elif message.role == "user": + formatted_prompt += f"### User:\n{message.content}\n\n" + elif message.role == "assistant": + formatted_prompt += f"### Assistant:\n{message.content}\n\n" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "### Assistant:\n" + return formatted_prompt + +async def format_prompt_llama3(messages): + formatted_prompt = "" + system_message_found = False + + # Check for a system message first + for message in messages: + if message.role == "system": + system_message_found = True + break + + # If no system message was found, prepend a default one + if not system_message_found: + formatted_prompt = "<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\nYou are a helpful AI assistant.<|eot_id|>" + for message in messages: + if message.role == "system": + formatted_prompt += f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{message.content}<|eot_id|>" + elif message.role == "user": + formatted_prompt += f"<|start_header_id|>user<|end_header_id|>\n\n{message.content}<|eot_id|>" + elif message.role == "assistant": + formatted_prompt += f"<|start_header_id|>assistant<|end_header_id|>\n\n{message.content}<|eot_id|>" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "<|start_header_id|>assistant<|end_header_id|>\n\n" + return formatted_prompt + +async def format_prompt_yi(messages): + formatted_prompt = "" + system_message_found = False + + # Check for a system message first + for message in messages: + if message.role == "system": + system_message_found = True + break + + # If no system message was found, prepend a default one + if not system_message_found: + formatted_prompt = "<|im_start|>system\nYou are a helpful AI assistant.<|im_end|>\n" + for message in messages: + if message.role == "system": + formatted_prompt += f"<|im_start|>system\n{message.content}<|im_end|>\n" + elif message.role == "user": + formatted_prompt += f"<|im_start|>user\n{message.content}<|im_end|>\n" + elif message.role == "assistant": + formatted_prompt += f"<|im_start|>assistant\n{message.content}<|im_end|>\n" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "<|im_start|>assistant\n" + return formatted_prompt + +async def format_prompt_nous(messages): + formatted_prompt = "" + for message in messages: + if message.role == "system": + formatted_prompt += f"{message.content}\n" + elif message.role == "user": + formatted_prompt += f"USER: {message.content}\n" + elif message.role == "assistant": + formatted_prompt += f"ASSISTANT: {message.content}\n" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "ASSISTANT: " + return formatted_prompt + +async def format_prompt_tess(messages): + formatted_prompt = "" + for message in messages: + if message.role == "system": + formatted_prompt += f"SYSTEM: {message.content}\n" + elif message.role == "user": + formatted_prompt += f"USER: {message.content}\n" + elif message.role == "assistant": + formatted_prompt 
+= f"ASSISTANT: {message.content}\n" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "ASSISTANT: " + return formatted_prompt + +async def format_prompt_code(messages): + formatted_prompt = "" + for message in messages: + if message.role == "system": + formatted_prompt += f"### System Prompt\nYou are an intelligent programming assistant.\n\n" + elif message.role == "user": + formatted_prompt += f"### User Message\n{message.content}\n\n" + elif message.role == "assistant": + formatted_prompt += f"### Assistant\n{message.content}\n\n" + # Add the final "### Assistant" with ellipsis to prompt for the next response + formatted_prompt += "### Assistant\n..." + return formatted_prompt + +async def format_prompt_zephyr(messages): + formatted_prompt = "" + for message in messages: + if message.role == "system": + formatted_prompt += f"<|system|>\n{message.content}\n" + elif message.role == "user": + formatted_prompt += f"<|user|>\n{message.content}\n" + elif message.role == "assistant": + formatted_prompt += f"<|assistant|>\n{message.content}\n" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "<|assistant|>\n" + return formatted_prompt + +async def format_prompt_starling(messages): + formatted_prompt = "" + system_message = "" + for message in messages: + if message.role == "system": + # Save system message to prepend to the first user message + system_message += f"{message.content}\n\n" + elif message.role == "user": + # Prepend system message if it exists + if system_message: + formatted_prompt += f"GPT4 Correct User: {system_message}{message.content}<|end_of_turn|>" + system_message = "" # Clear system message after prepending + else: + formatted_prompt += f"GPT4 Correct User: {message.content}<|end_of_turn|>" + elif message.role == "assistant": + formatted_prompt += f"GPT4 Correct Assistant: {message.content}<|end_of_turn|>" # Prep for user follow-up + formatted_prompt += "GPT4 Correct Assistant: \n\n" + return formatted_prompt + +async def format_prompt_mixtral(messages): + formatted_prompt = " " + system_message = "" + for message in messages: + if message.role == "system": + # Save system message to prepend to the first user message + system_message += f"{message.content}\n\n" + elif message.role == "user": + # Prepend system message if it exists + if system_message: + formatted_prompt += f"[INST] {system_message}{message.content} [/INST] " + system_message = "" # Clear system message after prepending + else: + formatted_prompt += f"[INST] {message.content} [/INST] " + elif message.role == "assistant": + formatted_prompt += f" {message.content} " # Prep for user follow-up + return formatted_prompt + + +async def format_prompt_commandr(messages): + formatted_prompt = "" + system_message_found = False + + # Check for a system message first + for message in messages: + if message.role == "system": + system_message_found = True + break + + # If no system message was found, prepend a default one + if not system_message_found: + formatted_prompt += f"<|START_OF_TURN_TOKEN|><|SYSTEM_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" + + for message in messages: + if message.role == "system": + formatted_prompt += f"<|START_OF_TURN_TOKEN|><|SYSTEM_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" + elif message.role == "user": + formatted_prompt += f"<|START_OF_TURN_TOKEN|><|USER_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" + elif message.role == "assistant": + formatted_prompt += 
f"<|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "<|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>" + return formatted_prompt + + @app.post('/v1/chat/completions') async def mainchat(request: ChatCompletionRequest): - try: prompt = '' if repo_str == 'Phind-CodeLlama-34B-v2': prompt = await format_prompt_code(request.messages) elif repo_str == 'zephyr-7b-beta': prompt = await format_prompt_zephyr(request.messages) - elif repo_str == 'llama3-70b-instruct' or repo_str == 'llama3-70b-instruct-speculative': + elif repo_str == 'llama3-70b-instruct': prompt = await format_prompt_llama3(request.messages) elif repo_str == 'Starling-LM-7B-alpha': prompt = await format_prompt_starling(request.messages) - elif repo_str == 'Mixtral-8x7B-Instruct-v0.1-GPTQ' or repo_str == 'miqu-exl2-speculative': + elif repo_str == 'Mixtral-8x7B-Instruct-v0.1-GPTQ': prompt = await format_prompt_mixtral(request.messages) - elif repo_str == 'Yi-34B-Chat-GPTQ' or repo_str == 'Nous-Hermes-2-Yi-34B-GPTQ' or repo_str == 'theprofessor-exl2-speculative' or repo_str == 'Yi-34B-Chat': + elif repo_str == 'Yi-34B-Chat-GPTQ' or repo_str == 'Nous-Hermes-2-Yi-34B-GPTQ' or repo_str == 'theprofessor-exl2-speculative' or repo_str == 'dbrx-instruct-exl2': prompt = await format_prompt_yi(request.messages) elif repo_str == 'Nous-Capybara-34B-GPTQ' or repo_str == 'goliath-120b-GPTQ' or repo_str == 'goliath-120b-exl2' or repo_str == 'goliath-120b-exl2-rpcal': prompt = await format_prompt_nous(request.messages) - elif repo_str == 'tess-xl-exl2' or repo_str == 'tess-xl-exl2-speculative' or repo_str == 'venus-exl2-speculative': + elif repo_str == 'tess-xl-exl2' or repo_str == 'tess-xl-exl2-speculative': prompt = await format_prompt_tess(request.messages) - elif repo_str == 'tinyllama-exl2-speculative': - prompt = await format_prompt_zephyr(request.messages) elif repo_str == 'commandr-exl2' or repo_str == 'commandr-exl2-speculative': prompt = await format_prompt_commandr(request.messages) else: prompt = await format_prompt(request.messages) - status_area.update(f"Prompt: {prompt}") + if request.partial_generation is not None: + prompt += request.partial_generation + print(prompt) timeout = 180 # seconds start_time = time.time() - prompt_id = generate_unique_id() # Replace with a function to generate unique IDs - prompts.put((prompt_id, prompt, request.max_tokens, request.stream, request.temperature)) + prompt_id = request.request_id # Replace with a function to generate unique IDs + outlines_dict = {} + + # Adjust temperature if it is 0 + if request.temperature == 0: + request.temperature = 0.001 + + if request.stop_at is not None: + outlines_dict["stop_at"] = request.stop_at + if request.outlines_type is not None: + outlines_dict["type"] = request.outlines_type + else: + outlines_dict["type"] = "text" + if outlines_dict["type"] == "choices": + assert request.choices is not None + outlines_dict["choices"] = request.choices + elif outlines_dict["type"] == "json": + assert request.json is not None + outlines_dict["json"] = request.json + elif outlines_dict["type"] == "regex": + assert request.regex is not None + outlines_dict["regex"] = request.regex + else: + assert outlines_dict["type"] == "text" + prompts.put((prompt_id, prompt, request.max_tokens, request.stream, request.temperature, outlines_dict)) if request.stream: #response = StreamingResponse(streaming_request(prompt, request.max_tokens, tempmodel=repo_str, 
response_format='chat_completion'), media_type="text/event-stream") @@ -599,9 +841,11 @@ async def mainchat(request: ChatCompletionRequest): return responses.pop(prompt_id) except Exception as e: + print(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) - return response + + @app.get('/ping') @@ -633,10 +877,6 @@ async def get_nvidia_smi(): if __name__ == "__main__": + import uvicorn - uvicorn.run(app, host=host, port=port, log_level="error") - - print(term.enter_fullscreen()) - - - + uvicorn.run(app, host=host, port=port, log_level="debug") From 53700a112da6c396f2bed277c50c2755a4b596ab Mon Sep 17 00:00:00 2001 From: isamu-isozaki Date: Wed, 3 Jul 2024 15:45:10 -0400 Subject: [PATCH 3/6] Returned to original outlines file --- llm_exl2_client_multi_outlines.py | 748 ++++++++++++++++++++++++++++++ 1 file changed, 748 insertions(+) create mode 100644 llm_exl2_client_multi_outlines.py diff --git a/llm_exl2_client_multi_outlines.py b/llm_exl2_client_multi_outlines.py new file mode 100644 index 0000000..b89fcb1 --- /dev/null +++ b/llm_exl2_client_multi_outlines.py @@ -0,0 +1,748 @@ +import asyncio +import json +import os +import logging +import time +import configparser +import argparse +import tiktoken +import torch +import random +from typing import AsyncIterable, List, Generator, Union, Optional +import traceback +from typing import Mapping +import requests +import sseclient +import subprocess +import re + +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import StreamingResponse +from pydantic import BaseModel +from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, TextStreamer, TextIteratorStreamer +from threading import Thread +import queue +import numpy as np + +import sys, os +import outlines +from outlines.samplers import multinomial +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from exllamav2 import( + ExLlamaV2, + ExLlamaV2Config, + ExLlamaV2Cache, + ExLlamaV2Cache_8bit, + ExLlamaV2Cache_Q4, + ExLlamaV2Tokenizer, +) + +from exllamav2.generator import ( + ExLlamaV2StreamingGenerator, + ExLlamaV2Sampler +) +import uuid + +def generate_unique_id(): + return uuid.uuid4() + +class CompletionRequest(BaseModel): + model: str + prompt: Union[str, List[str]] + stop: Optional[Union[str, List[str]]] = None + max_tokens: Optional[int] = 100 # default value of 100 + temperature: Optional[float] = 0.0 # default value of 0.0 + stream: Optional[bool] = False # default value of False + best_of: Optional[int] = 1 + echo: Optional[bool] = False + frequency_penalty: Optional[float] = 0.0 # default value of 0.0 + presence_penalty: Optional[float] = 0.0 # default value of 0.0 + log_probs: Optional[int] = 0 # default value of 0.0 + n: Optional[int] = 1 # default value of 1, batch size + suffix: Optional[str] = None + top_p: Optional[float] = 0.0 # default value of 0.0 + user: Optional[str] = None + +class Message(BaseModel): + role: str + content: str + +class ChatCompletionRequest(BaseModel): + model: str + messages: List[Message] + stop: Optional[Union[str, List[str]]] = None + max_tokens: Optional[int] = 100 # default value of 100 + temperature: Optional[float] = 0.0 # default value of 0.0 + stream: Optional[bool] = False # default value of False + frequency_penalty: Optional[float] = 0.0 # default value of 0.0 + presence_penalty: Optional[float] = 0.0 # default value of 0.0 + log_probs: Optional[int] = 0 # default value of 0.0 + n: Optional[int] = 1 # 
default value of 1, batch size + top_p: Optional[float] = 0.0 # default value of 0.0 + user: Optional[str] = None + stop_at: Optional[str] = None + outlines_type: Optional[str] = None + choices: Optional[list[str]] = None + regex: Optional[str] = None + json: Optional[str] = None + request_id: Optional[str] = None + partial_generation: Optional[str] = None + +#repo_str = 'theprofessor-exl2-speculative' + +parser = argparse.ArgumentParser(description='Run server with specified port.') + +# Add argument for port with default type as integer +parser.add_argument('--port', type=int, help='Port to run the server on.') +parser.add_argument('--use_outlines', action='store_true', help='Use outlines.') +parser.add_argument('--gpu_split', type=str, default="17,19,19,19", help='GPU splits.') +parser.add_argument('--max_context', type=int, default=12288, help='Context length.') +parser.add_argument('--cache_8bit', action='store_true', help='Use 8 bit cache.') +parser.add_argument('--cache_q4', action='store_true', help='Use 4 bit cache.') +parser.add_argument('--repo_str', type=str, default='llama3-70b-instruct', help='The model repository name') +parser.add_argument('--outlines_device', type=int, default=2, help='The cuda device to which the outlines device is set') + +# Parse the arguments +args = parser.parse_args() +repo_str = args.repo_str + +config = configparser.ConfigParser() +config.read('config.ini') + +repo_id = config.get(repo_str, 'repo') +host = config.get('settings', 'host') + +port = args.port if args.port is not None else config.getint('settings', 'port') + +# only allow one client at a time +busy = False +condition = asyncio.Condition() + +config = ExLlamaV2Config() +config.model_dir = repo_id +config.prepare() + +use_dynamic_rope_scaling = False +dynamic_rope_mult = 1.5 +dynamic_rope_offset = 0.0 + +ropescale = 1.0 +max_context = args.max_context +config.scale_alpha_value = ropescale +config.max_seq_len = max_context +base_model_native_max = 8192 +cache_8bit = args.cache_8bit +cache_q4 = args.cache_q4 + +if args.use_outlines: + model = outlines.models.exl2( + config.model_dir, + f"cuda:{args.outlines_device}", + max_seq_len = config.max_seq_len, + scale_pos_emb = config.scale_pos_emb, + scale_alpha_value = config.scale_alpha_value, + no_flash_attn = config.no_flash_attn, + num_experts_per_token = config.num_experts_per_token, + cache_8bit = cache_8bit, + cache_q4 = cache_q4, + tokenizer_kwargs = {}, + gpu_split = args.gpu_split, # we might be able to make this auto + low_mem = None, + verbose = None + ) +else: + model = ExLlamaV2(config) +print("Loading model: " + repo_id) +#cache = ExLlamaV2Cache(model, lazy=True, max_seq_len = 20480) +#model.load_autosplit(cache) +if not args.use_outlines: + model.load([int(gpu_memory) for gpu_memory in args.gpu_split.split(",")]) + +tokenizer = ExLlamaV2Tokenizer(config) + +# Cache mode + + +settings_proto = ExLlamaV2Sampler.Settings() +settings_proto.temperature = 0 +settings_proto.top_k = 50 +settings_proto.top_p = 0.8 +settings_proto.top_a = 0.0 +settings_proto.token_repetition_penalty = 1.1 +#settings.disallow_tokens(tokenizer, [tokenizer.eos_token_id]) + +# Active sequences and corresponding caches and settings +prompts = queue.Queue() +responses = {} + +input_ids = [] +prompt_length = [] +prompt_ids = [] +streamer = [] +caches = [] +input_prompts = [] +generators = [] +generations = [] +settings = [] +future_tokens = [] +future_logits = [] +sin_arr = [] +cos_arr = [] + +# Global variable for storing partial responses +partial_responses = {} 
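+# partial_responses maps prompt_id -> list of streamed chunks not yet delivered to the
+# client; the worker thread appends to it and stream_response() drains it.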
+ +max_parallel_seqs = 3 +num_of_gpus = len(args.gpu_split.split(",")) + +print("*** Loaded.. now Inference...:") + +# take from https://github.com/tiangolo/fastapi/discussions/11360 +class RequestCancelledMiddleware: + def __init__(self, app): + self.app = app + + async def __call__(self, scope, receive, send): + global generators + global prompt_ids + global input_prompts + global generations + global caches + global streamer + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + # Let's make a shared queue for the request messages + queue = asyncio.Queue() + cancelled_request_ids = [] + async def message_poller(sentinel, handler_task): + nonlocal queue + request_id = str(generate_unique_id()) + while True: + message = await receive() + print(message) + if "body" in message: + message["body"] = json.loads(message["body"].decode('utf8')) + message["body"]["request_id"] = request_id + message["body"] = str.encode(json.dumps(message["body"])) + print(message) + if message["type"] == "http.disconnect": + cancelled_request_ids.append(request_id) + handler_task.cancel() + return sentinel # Break the loop + + # Puts the message in the queue + await queue.put(message) + + sentinel = object() + handler_task = asyncio.create_task(self.app(scope, queue.get, send)) + asyncio.create_task(message_poller(sentinel, handler_task)) + + try: + return await handler_task + except asyncio.CancelledError: + print("Cancelling request due to disconnect") + # TODO: FIgure out how to get prompt id that disconnected + while len(cancelled_request_ids) > 0: + cancelled_id = cancelled_request_ids.pop() + for i in range(len(prompt_ids)): + if cancelled_id == prompt_ids[i]: + break + generators.pop(i) + prompt_ids.pop(i) + input_prompts.pop(i) + generations.pop(i) + caches.pop(i) + streamer.pop(i) + + +app = FastAPI(title="EXL2") +app.add_middleware(RequestCancelledMiddleware) + +async def stream_response(prompt_id, timeout=180): + global partial_responses + while True: + await asyncio.sleep(0.05) # Sleep to yield control to the event loop + + # Check if prompt_id exists in partial_responses + if prompt_id in partial_responses: + # Stream partial responses + while partial_responses[prompt_id]: + response_chunk = partial_responses[prompt_id].pop(0) + yield f"data: {json.dumps(response_chunk)}\n\n" + + # Check for final response or timeout + if prompt_id in responses: + final_response = responses.pop(prompt_id) + yield f'data: {{"id":"chatcmpl-{prompt_id}","object":"chat.completion.chunk","created":{int(time.time())},"model":"{repo_str}","choices":[{{"index":0,"delta":{{}},"finish_reason":"stop"}}]}}\n\n' + break + +def process_eos(i): + global generators + global prompt_ids + global input_prompts + global generations + global caches + global streamer + output = generations[i].strip() + prompt = input_prompts[i] + #output = tokenizer.decode(input_ids[i])[0] + print("-----") + print(output) + generated_text = output + # Calculate token counts + completion_tokens = (tokenizer.encode(generated_text)).shape[-1] + prompt_tokens = (tokenizer.encode(prompt)).shape[-1] + full_tokens = completion_tokens + prompt_tokens + eos_prompt_id = prompt_ids.pop(i) + if(streamer[i]): + ## Generator, yield here.. 
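+        # For streaming requests only a terminal marker is stored in responses here; the
+        # final SSE chunk with finish_reason "stop" is emitted by stream_response() itself.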
+ partial_response_data = { + "finish_reason": "stop" + } + + responses[eos_prompt_id] = partial_response_data + else:# Construct the response based on the format + response_data = { + "id": f"chatcmpl-{eos_prompt_id}", + "object": "chat.completion", + "created": int(time.time()), + "model": repo_str, + "choices": [{ + "index": 0, + "message": { + "role": "assistant", + "content": generated_text, + }, + "finish_reason": "stop" + }], + "usage": { + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": full_tokens + } + } + responses[eos_prompt_id] = response_data + # Clean up + generators.pop(i) + input_prompts.pop(i) + generations.pop(i) + caches.pop(i) + streamer.pop(i) +# Worker thread function +def process_outline_prompts(): + global partial_responses + global generators + global prompt_ids + global input_prompts + global generations + global caches + global streamer + assert args.use_outlines + assert not use_dynamic_rope_scaling, "Currently ROPE scaling is not supported with outlines" + base_model = model.model + while True: + try: + while not prompts.empty() or len(prompt_ids): + while len(prompt_ids) < max_parallel_seqs and not prompts.empty(): + prompt_id, prompt, max_tokens, stream, temperature, outlines_dict = prompts.get() + print(f"got prompt with outlines dict {outlines_dict}") + sampler = multinomial(top_k=50, top_p=1.0, temperature=temperature) + ids = tokenizer.encode(prompt) + prompt_tokens = ids.shape[-1] + max_tokens=min(max_tokens, max_context-prompt_tokens-1) + full_tokens = prompt_tokens + max_tokens + print("Processing prompt: " + str(prompt_id) + " Req tokens: " + str(full_tokens)) + # Truncate if new_tokens exceed max_context + if full_tokens >= max_context: + # Calculate how many tokens to truncate + ids = tokenizer.encode("Say, 'Prompt exceeds allowed length. 
Please try again.'") + # Update new_tokens after truncation + prompt_tokens = ids.shape[-1] + full_tokens = prompt_tokens + max_tokens + print("Truncating prompt: " + str(prompt_id) + " Req tokens: " + str(full_tokens)) + if cache_8bit: + ncache = ExLlamaV2Cache_8bit(base_model, lazy=not base_model.loaded, max_seq_len = full_tokens) # (max_seq_len could be different for each cache) + elif cache_q4: + ncache = ExLlamaV2Cache_Q4(base_model, lazy=not base_model.loaded, max_seq_len = full_tokens) + else: + ncache = ExLlamaV2Cache(base_model, lazy=not base_model.loaded, max_seq_len = full_tokens) # (max_seq_len could be different for each cache) + model.cache = ncache + model.past_seq = None + stop_at = outlines_dict.get("stop_at", None) + if outlines_dict["type"] == "choices": + generator = outlines.generate.choice(model, outlines_dict["choices"], sampler=sampler) + elif outlines_dict["type"] == "json": + generator = outlines.generate.json(model, outlines_dict["json"], sampler=sampler) + elif outlines_dict["type"] == "regex": + generator = outlines.generate.regex(model, outlines_dict["regex"], sampler=sampler) + else: + generator = outlines.generate.text(model, sampler=sampler) + generators.append(generator.stream(prompt, stop_at=stop_at, max_tokens=max_tokens)) + prompt_ids.append(prompt_id) + input_prompts.append(prompt) + generations.append("") + caches.append(ncache) + streamer.append(stream) + print(len(generators), len(prompt_ids), len(input_prompts), len(generations), len(caches), len(streamer)) + if(len(prompt_ids)): + eos = [] + for i in range(len(prompt_ids)): + model.cache = caches[i] + is_finished = False + decoded_response_token = "" + try: + decoded_response_token = next(generators[i]) + generations[i] += decoded_response_token + except Exception as e: + print(e) + is_finished = True + reason = None + if(streamer[i]): + outcontent = decoded_response_token + if is_finished: + outcontent = "" + reason = "stop" + partial_response_data = { + "id": f"chatcmpl-{prompt_ids[i]}", + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": repo_str, + "choices": [ + { + "index": 0, + "delta": { + "content": outcontent + }, + "finish_reason": reason + } + ] + } + + # Initialize a list for new prompt_id or append to existing one + if prompt_ids[i] not in partial_responses: + partial_responses[prompt_ids[i]] = [] + partial_responses[prompt_ids[i]].append(partial_response_data) + + if is_finished: + eos.insert(0, i) + + # Generate and store response + for i in eos: + process_eos(i) + + else: + # Sleep for a short duration when there's no work + time.sleep(0.1) # Sleep for 100 milliseconds + except Exception as e: + for i in range(len(prompt_ids)): + process_eos(i) + generators = [] + prompt_ids = [] + input_prompts = [] + generations = [] + caches = [] + streamer = [] + print("Reset server due to ", e) + + + +# Start worker thread +worker = Thread(target=process_outline_prompts) +worker.start() + + +async def format_prompt(messages): + formatted_prompt = "" + for message in messages: + if message.role == "system": + formatted_prompt += f"{message.content}\n\n" + elif message.role == "user": + formatted_prompt += f"### User:\n{message.content}\n\n" + elif message.role == "assistant": + formatted_prompt += f"### Assistant:\n{message.content}\n\n" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "### Assistant:\n" + return formatted_prompt + +async def format_prompt_llama3(messages): + formatted_prompt = "" + system_message_found = 
False + + # Check for a system message first + for message in messages: + if message.role == "system": + system_message_found = True + break + + # If no system message was found, prepend a default one + if not system_message_found: + formatted_prompt = "<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\nYou are a helpful AI assistant.<|eot_id|>" + for message in messages: + if message.role == "system": + formatted_prompt += f"<|begin_of_text|><|start_header_id|>system<|end_header_id|>\n\n{message.content}<|eot_id|>" + elif message.role == "user": + formatted_prompt += f"<|start_header_id|>user<|end_header_id|>\n\n{message.content}<|eot_id|>" + elif message.role == "assistant": + formatted_prompt += f"<|start_header_id|>assistant<|end_header_id|>\n\n{message.content}<|eot_id|>" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "<|start_header_id|>assistant<|end_header_id|>\n\n" + return formatted_prompt + +async def format_prompt_yi(messages): + formatted_prompt = "" + system_message_found = False + + # Check for a system message first + for message in messages: + if message.role == "system": + system_message_found = True + break + + # If no system message was found, prepend a default one + if not system_message_found: + formatted_prompt = "<|im_start|>system\nYou are a helpful AI assistant.<|im_end|>\n" + for message in messages: + if message.role == "system": + formatted_prompt += f"<|im_start|>system\n{message.content}<|im_end|>\n" + elif message.role == "user": + formatted_prompt += f"<|im_start|>user\n{message.content}<|im_end|>\n" + elif message.role == "assistant": + formatted_prompt += f"<|im_start|>assistant\n{message.content}<|im_end|>\n" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "<|im_start|>assistant\n" + return formatted_prompt + +async def format_prompt_nous(messages): + formatted_prompt = "" + for message in messages: + if message.role == "system": + formatted_prompt += f"{message.content}\n" + elif message.role == "user": + formatted_prompt += f"USER: {message.content}\n" + elif message.role == "assistant": + formatted_prompt += f"ASSISTANT: {message.content}\n" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "ASSISTANT: " + return formatted_prompt + +async def format_prompt_tess(messages): + formatted_prompt = "" + for message in messages: + if message.role == "system": + formatted_prompt += f"SYSTEM: {message.content}\n" + elif message.role == "user": + formatted_prompt += f"USER: {message.content}\n" + elif message.role == "assistant": + formatted_prompt += f"ASSISTANT: {message.content}\n" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "ASSISTANT: " + return formatted_prompt + +async def format_prompt_code(messages): + formatted_prompt = "" + for message in messages: + if message.role == "system": + formatted_prompt += f"### System Prompt\nYou are an intelligent programming assistant.\n\n" + elif message.role == "user": + formatted_prompt += f"### User Message\n{message.content}\n\n" + elif message.role == "assistant": + formatted_prompt += f"### Assistant\n{message.content}\n\n" + # Add the final "### Assistant" with ellipsis to prompt for the next response + formatted_prompt += "### Assistant\n..." 
+ return formatted_prompt + +async def format_prompt_zephyr(messages): + formatted_prompt = "" + for message in messages: + if message.role == "system": + formatted_prompt += f"<|system|>\n{message.content}\n" + elif message.role == "user": + formatted_prompt += f"<|user|>\n{message.content}\n" + elif message.role == "assistant": + formatted_prompt += f"<|assistant|>\n{message.content}\n" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "<|assistant|>\n" + return formatted_prompt + +async def format_prompt_starling(messages): + formatted_prompt = "" + system_message = "" + for message in messages: + if message.role == "system": + # Save system message to prepend to the first user message + system_message += f"{message.content}\n\n" + elif message.role == "user": + # Prepend system message if it exists + if system_message: + formatted_prompt += f"GPT4 Correct User: {system_message}{message.content}<|end_of_turn|>" + system_message = "" # Clear system message after prepending + else: + formatted_prompt += f"GPT4 Correct User: {message.content}<|end_of_turn|>" + elif message.role == "assistant": + formatted_prompt += f"GPT4 Correct Assistant: {message.content}<|end_of_turn|>" # Prep for user follow-up + formatted_prompt += "GPT4 Correct Assistant: \n\n" + return formatted_prompt + +async def format_prompt_mixtral(messages): + formatted_prompt = " " + system_message = "" + for message in messages: + if message.role == "system": + # Save system message to prepend to the first user message + system_message += f"{message.content}\n\n" + elif message.role == "user": + # Prepend system message if it exists + if system_message: + formatted_prompt += f"[INST] {system_message}{message.content} [/INST] " + system_message = "" # Clear system message after prepending + else: + formatted_prompt += f"[INST] {message.content} [/INST] " + elif message.role == "assistant": + formatted_prompt += f" {message.content} " # Prep for user follow-up + return formatted_prompt + + +async def format_prompt_commandr(messages): + formatted_prompt = "" + system_message_found = False + + # Check for a system message first + for message in messages: + if message.role == "system": + system_message_found = True + break + + # If no system message was found, prepend a default one + if not system_message_found: + formatted_prompt += f"<|START_OF_TURN_TOKEN|><|SYSTEM_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" + + for message in messages: + if message.role == "system": + formatted_prompt += f"<|START_OF_TURN_TOKEN|><|SYSTEM_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" + elif message.role == "user": + formatted_prompt += f"<|START_OF_TURN_TOKEN|><|USER_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" + elif message.role == "assistant": + formatted_prompt += f"<|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>{message.content}<|END_OF_TURN_TOKEN|>" + # Add the final "### Assistant:\n" to prompt for the next response + formatted_prompt += "<|START_OF_TURN_TOKEN|><|CHATBOT_TOKEN|>" + return formatted_prompt + + +@app.post('/v1/chat/completions') +async def mainchat(request: ChatCompletionRequest): + try: + prompt = '' + if repo_str == 'Phind-CodeLlama-34B-v2': + prompt = await format_prompt_code(request.messages) + elif repo_str == 'zephyr-7b-beta': + prompt = await format_prompt_zephyr(request.messages) + elif repo_str == 'llama3-70b-instruct': + prompt = await format_prompt_llama3(request.messages) + elif repo_str == 'Starling-LM-7B-alpha': + prompt = await format_prompt_starling(request.messages) 
+ elif repo_str == 'Mixtral-8x7B-Instruct-v0.1-GPTQ': + prompt = await format_prompt_mixtral(request.messages) + elif repo_str == 'Yi-34B-Chat-GPTQ' or repo_str == 'Nous-Hermes-2-Yi-34B-GPTQ' or repo_str == 'theprofessor-exl2-speculative' or repo_str == 'dbrx-instruct-exl2': + prompt = await format_prompt_yi(request.messages) + elif repo_str == 'Nous-Capybara-34B-GPTQ' or repo_str == 'goliath-120b-GPTQ' or repo_str == 'goliath-120b-exl2' or repo_str == 'goliath-120b-exl2-rpcal': + prompt = await format_prompt_nous(request.messages) + elif repo_str == 'tess-xl-exl2' or repo_str == 'tess-xl-exl2-speculative': + prompt = await format_prompt_tess(request.messages) + elif repo_str == 'commandr-exl2' or repo_str == 'commandr-exl2-speculative': + prompt = await format_prompt_commandr(request.messages) + else: + prompt = await format_prompt(request.messages) + if request.partial_generation is not None: + prompt += request.partial_generation + print(prompt) + + timeout = 180 # seconds + start_time = time.time() + prompt_id = request.request_id # Replace with a function to generate unique IDs + outlines_dict = {} + + # Adjust temperature if it is 0 + if request.temperature == 0: + request.temperature = 0.001 + + if request.stop_at is not None: + outlines_dict["stop_at"] = request.stop_at + if request.outlines_type is not None: + outlines_dict["type"] = request.outlines_type + elif args.use_outlines: + outlines_dict["type"] = "text" + else: + raise Exception("Enable outlines") + if outlines_dict["type"] == "choices": + assert request.choices is not None + outlines_dict["choices"] = request.choices + elif outlines_dict["type"] == "json": + assert request.json is not None + outlines_dict["json"] = request.json + elif outlines_dict["type"] == "regex": + assert request.regex is not None + outlines_dict["regex"] = request.regex + else: + assert (outlines_dict["type"] == "text") or not args.outlines + if not args.use_outlines: + prompts.put((prompt_id, prompt, request.max_tokens, request.stream, request.temperature)) + else: + prompts.put((prompt_id, prompt, request.max_tokens, request.stream, request.temperature, outlines_dict)) + + if request.stream: + #response = StreamingResponse(streaming_request(prompt, request.max_tokens, tempmodel=repo_str, response_format='chat_completion'), media_type="text/event-stream") + return StreamingResponse(stream_response(prompt_id), media_type="text/event-stream") + else: + #response_data = non_streaming_request(prompt, request.max_tokens, tempmodel=repo_str, response_format='chat_completion') + #response = response_data # This will return a JSON response + while prompt_id not in responses: + await asyncio.sleep(0.1) # Sleep to yield control to the event loop + if time.time() - start_time > timeout: + return {"error": "Response timeout"} + + return responses.pop(prompt_id) + + except Exception as e: + print(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + + + + + +@app.get('/ping') +async def get_status(): + return {"ping": sum(prompt_length)} + +@app.get("/nvidia-smi") +async def get_nvidia_smi(): + # Execute the nvidia-smi command + result = subprocess.run( + ["nvidia-smi", "--query-gpu=utilization.gpu,memory.used,memory.total", "--format=csv,noheader"], + capture_output=True, text=True + ) + nvidia_smi_output = result.stdout.strip() # Remove any extra whitespace + # Split the output by lines and then by commas + gpu_data = [] + for line in nvidia_smi_output.split("\n"): + utilization, memory_used, memory_total = line.split(", ") + # 
Strip the '%' and 'MiB' and convert to appropriate types + utilization = float(utilization.strip(' %')) + memory_used = int(memory_used.strip(' MiB')) + memory_total = int(memory_total.strip(' MiB')) + gpu_data.append({ + "utilization": utilization, + "memory_used": memory_used, + "memory_total": memory_total + }) + return gpu_data + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host=host, port=port, log_level="debug") \ No newline at end of file From 340d4ede82266ecd7daf58a8a6af3be64cb5847a Mon Sep 17 00:00:00 2001 From: isamu-isozaki Date: Wed, 3 Jul 2024 19:08:47 -0400 Subject: [PATCH 4/6] Fixed token id --- llm_exl2_dynamic_gen.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/llm_exl2_dynamic_gen.py b/llm_exl2_dynamic_gen.py index fbfd8a5..27bab6f 100644 --- a/llm_exl2_dynamic_gen.py +++ b/llm_exl2_dynamic_gen.py @@ -201,6 +201,13 @@ def display(self): if self.console_line is not None: print(term.move_xy(0, self.console_line) + self.display_text) +def get_stop_conditions(prompt_format, tokenizer): + if prompt_format == "llama": + return [tokenizer.eos_token_id] + elif prompt_format == "llama3": + return [tokenizer.single_id("<|eot_id|>")] + elif prompt_format == "granite": + return [tokenizer.eos_token_id, "\n\nQuestion:"] config = configparser.ConfigParser() config.read('config.ini') @@ -462,7 +469,7 @@ def process_prompts(): job = ExLlamaV2DynamicJob( input_ids = ids, max_new_tokens = max_tokens, - stop_conditions = [tokenizer.eos_token_id] if stop_at is None else [tokenizer.eos_token_id, stop_at], + stop_conditions = get_stop_conditions() if stop_at is None else [*get_stop_conditions(), stop_at], gen_settings = ExLlamaV2Sampler.Settings(), filters = filters, token_healing = healing From 38557341deee95cbadea13e834909cfe71b79d2e Mon Sep 17 00:00:00 2001 From: isamu-isozaki Date: Wed, 3 Jul 2024 19:11:45 -0400 Subject: [PATCH 5/6] Remove unnecessary --- llm_exl2_dynamic_gen.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/llm_exl2_dynamic_gen.py b/llm_exl2_dynamic_gen.py index 27bab6f..08c6d95 100644 --- a/llm_exl2_dynamic_gen.py +++ b/llm_exl2_dynamic_gen.py @@ -469,7 +469,7 @@ def process_prompts(): job = ExLlamaV2DynamicJob( input_ids = ids, max_new_tokens = max_tokens, - stop_conditions = get_stop_conditions() if stop_at is None else [*get_stop_conditions(), stop_at], + stop_conditions = [tokenizer.eos_token_id] if stop_at is None else [tokenizer.eos_token_id, stop_at], gen_settings = ExLlamaV2Sampler.Settings(), filters = filters, token_healing = healing From c4b4c8f0723baf5205ca8a65a4525b8c3590b06d Mon Sep 17 00:00:00 2001 From: ManilShrestha Date: Wed, 3 Jul 2024 22:43:43 -0400 Subject: [PATCH 6/6] Pass temperature with ExLlamaV2Sampler.Settings() and also made the EOS compatible with Llama3 and 'stop_at' string from outlines --- llm_exl2_dynamic_gen.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/llm_exl2_dynamic_gen.py b/llm_exl2_dynamic_gen.py index 08c6d95..7bca8b4 100644 --- a/llm_exl2_dynamic_gen.py +++ b/llm_exl2_dynamic_gen.py @@ -201,13 +201,14 @@ def display(self): if self.console_line is not None: print(term.move_xy(0, self.console_line) + self.display_text) -def get_stop_conditions(prompt_format, tokenizer): - if prompt_format == "llama": +def get_stop_conditions(tokenizer): + # get_stop_condition special case if model is llama3 + if "llama3" in repo_str: + return [tokenizer.single_id("<|eot_id|>"), tokenizer.eos_token_id] + # elif prompt_format == 
"granite": + # return [tokenizer.eos_token_id, "\n\nQuestion:"] + else: return [tokenizer.eos_token_id] - elif prompt_format == "llama3": - return [tokenizer.single_id("<|eot_id|>")] - elif prompt_format == "granite": - return [tokenizer.eos_token_id, "\n\nQuestion:"] config = configparser.ConfigParser() config.read('config.ini') @@ -466,11 +467,19 @@ def process_prompts(): #streamer.append(stream) #prompt_ids.append(prompt_id) + preferred_eos = get_stop_conditions(tokenizer) + + if stop_at is not None: + preferred_eos.append(stop_at) + + gen_settings = ExLlamaV2Sampler.Settings() + gen_settings.temperature = 1.0 if temperature>1 else temperature # To make sure the temperature value does not exceed 1 + job = ExLlamaV2DynamicJob( input_ids = ids, max_new_tokens = max_tokens, - stop_conditions = [tokenizer.eos_token_id] if stop_at is None else [tokenizer.eos_token_id, stop_at], - gen_settings = ExLlamaV2Sampler.Settings(), + stop_conditions = preferred_eos if stop_at is None else [tokenizer.eos_token_id, stop_at], + gen_settings = gen_settings, filters = filters, token_healing = healing )