A Coding Implementation on Qwen 3.6-35B-A3B Covering Multimodal Inference, Thinking Control, Tool Calling, MoE Routing, RAG, and Session Persistence


class QwenChat:
    """Stateful chat session around a Qwen model/processor pair.

    Maintains the running message history, renders it through the
    processor's chat template, and exposes blocking (`generate`) and
    streaming (`stream`) decoding plus JSON session persistence
    (`save`/`load`).
    """

    def __init__(self, model, processor, system=None, tools=None):
        self.model, self.processor = model, processor
        self.tokenizer = processor.tokenizer
        self.history: list[dict] = []
        if system:
            self.history.append({"role": "system", "content": system})
        self.tools = tools

    def user(self, content):
        """Append a user turn; returns self for chaining."""
        self.history.append({"role": "user", "content": content})
        return self

    def assistant(self, content, reasoning=""):
        """Append an assistant turn, optionally keeping its reasoning trace."""
        m = {"role": "assistant", "content": content}
        if reasoning:
            m["reasoning_content"] = reasoning
        self.history.append(m)
        return self

    def tool_result(self, name, result):
        """Append a tool-result turn; non-string results are JSON-encoded."""
        self.history.append({
            "role": "tool", "name": name,
            "content": result if isinstance(result, str) else json.dumps(result),
        })
        return self

    def _pad_id(self):
        # pad_token_id may legitimately be 0, so compare against None rather
        # than relying on truthiness (the original `pad or eos` bug).
        pad = self.tokenizer.pad_token_id
        return pad if pad is not None else self.tokenizer.eos_token_id

    def _inputs(self, enable_thinking, preserve_thinking):
        # Render history (plus tool schemas) into model-ready tensors.
        return self.processor.apply_chat_template(
            self.history, tools=self.tools, tokenize=True,
            add_generation_prompt=True, return_dict=True, return_tensors="pt",
            enable_thinking=enable_thinking, preserve_thinking=preserve_thinking,
        ).to(self.model.device)

    def generate(self, *, enable_thinking=True, preserve_thinking=False,
                 max_new_tokens=2048, preset="thinking_general",
                 stopping_criteria=None, append_to_history=True):
        """Run one blocking generation; returns (thinking_text, answer_text)."""
        inp = self._inputs(enable_thinking, preserve_thinking)
        cfg = SAMPLING[preset]
        gk = dict(**inp, max_new_tokens=max_new_tokens, do_sample=True,
                  temperature=cfg["temperature"], top_p=cfg["top_p"], top_k=cfg["top_k"],
                  repetition_penalty=1.0,
                  pad_token_id=self._pad_id())
        if stopping_criteria is not None:
            gk["stopping_criteria"] = stopping_criteria
        with torch.inference_mode():
            out = self.model.generate(**gk)
        # Decode only the newly generated suffix, not the echoed prompt.
        raw = self.tokenizer.decode(out[0, inp["input_ids"].shape[-1]:],
                                    skip_special_tokens=True)
        think, ans = split_thinking(raw)
        if append_to_history:
            self.assistant(ans, reasoning=think)
        return think, ans

    def stream(self, *, enable_thinking=True, preserve_thinking=False,
               max_new_tokens=2048, preset="thinking_general",
               on_thinking=None, on_answer=None):
        """Stream one generation, routing thinking/answer deltas to callbacks.

        Generation runs on a background thread feeding a
        TextIteratorStreamer; pieces before the think-close marker go to
        `on_thinking`, the rest to `on_answer`. Returns the stripped
        (thinking_text, answer_text) and records the answer in history.
        """
        inp = self._inputs(enable_thinking, preserve_thinking)
        cfg = SAMPLING[preset]
        streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True,
                                        skip_special_tokens=True)
        gk = dict(**inp, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=True,
                  temperature=cfg["temperature"], top_p=cfg["top_p"], top_k=cfg["top_k"],
                  pad_token_id=self._pad_id())
        t = threading.Thread(target=self.model.generate, kwargs=gk)
        t.start()
        buf, in_think = "", enable_thinking
        think_text, answer_text = "", ""
        for piece in streamer:
            buf += piece
            if in_think:
                if THINK_CLOSE in buf:
                    # The close marker may straddle stream pieces, so split on
                    # the accumulated buffer, not on this piece alone.
                    close_at = buf.index(THINK_CLOSE)
                    resid = buf[:close_at]
                    if on_thinking:
                        on_thinking(resid[len(think_text):])
                    think_text = resid
                    buf = buf[close_at + len(THINK_CLOSE):]
                    in_think = False
                    if buf and on_answer:
                        on_answer(buf)
                    answer_text = buf
                    buf = ""
                else:
                    if on_thinking:
                        on_thinking(piece)
                    think_text += piece
            else:
                if on_answer:
                    on_answer(piece)
                answer_text += piece
        t.join()
        self.assistant(answer_text.strip(), reasoning=think_text.strip())
        return think_text.strip(), answer_text.strip()

    def save(self, path):
        """Persist history and tool schemas as pretty-printed JSON."""
        with open(path, "w", encoding="utf-8") as f:
            json.dump({"history": self.history, "tools": self.tools}, f, indent=2)

    @classmethod
    def load(cls, model, processor, path):
        """Rebuild a session previously written by `save` onto a live model."""
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
        c = cls(model, processor, tools=data.get("tools"))
        c.history = data["history"]
        return c

class ThinkingBudget(StoppingCriteria):
    """Stopping criterion that caps the length of the <think> span.

    Arms itself once the think-open token sequence appears in the generated
    ids, stands down if the model closes thinking on its own, and otherwise
    trips after `budget` tokens of thinking.
    """

    def __init__(self, tokenizer, budget: int):
        self.budget = budget
        self.open_ids = tokenizer.encode(THINK_OPEN, add_special_tokens=False)
        self.close_ids = tokenizer.encode(THINK_CLOSE, add_special_tokens=False)
        # Token index just past the open marker, set once it is observed.
        self.start = None

    def _find(self, seq, needle):
        """Return the first index of token subsequence `needle` in `seq`, or None."""
        # Naive O(n*m) scan; sequences here are short enough that this is fine.
        n = len(needle)
        for i in range(len(seq) - n + 1):
            if seq[i:i + n] == needle:
                return i
        return None

    def __call__(self, input_ids, scores, **kwargs):
        seq = input_ids[0].tolist()
        if self.start is None:
            idx = self._find(seq, self.open_ids)
            if idx is not None:
                self.start = idx + len(self.open_ids)
            return False
        # Thinking already closed on its own: never force a stop.
        if self._find(seq[self.start:], self.close_ids) is not None:
            return False
        # NOTE: original source had an en-dash here (`len(seq) – self.start`),
        # which is a syntax error; restored to a true minus.
        return (len(seq) - self.start) >= self.budget

# Extracts the JSON payload of a <tool_call>…</tool_call> block emitted by the
# model; re.S lets the payload span multiple lines.
TOOL_CALL_RE = re.compile(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", re.S)

def run_calculate(expr: str) -> str:
    """Evaluate a simple arithmetic expression; return a JSON string.

    Returns {"result": ...} on success or {"error": ...} on rejection or
    evaluation failure. A character whitelist plus an empty-__builtins__
    eval restricts input to basic arithmetic. NOTE(review): the whitelist
    still admits `**` (two `*` chars), so pathological powers can be
    expensive — acceptable for a demo tool, flagged for awareness.
    """
    if any(c not in "0123456789+-*/().% " for c in expr):
        return json.dumps({"error": "illegal chars"})
    try:
        # eval only ever sees whitelist-filtered input; never feed raw text here.
        return json.dumps({"result": eval(expr, {"__builtins__": {}}, {})})
    except Exception as e:
        return json.dumps({"error": str(e)})

# Tiny in-memory corpus backing the search_docs demo tool.
_DOCS = {
    "qwen3.6": "Qwen3.6-35B-A3B is a 35B MoE with 3B active params and 262k native context.",
    "deltanet": "Gated DeltaNet is a linear-attention variant used in Qwen3.6's hybrid layers.",
    "moe": "Qwen3.6 uses 256 experts with 8 routed + 1 shared per token.",
}


def run_search_docs(q):
    """Return docs whose key occurs in the lowercased query, as a JSON string.

    Falls back to {"results": ["no hits"]} when nothing matches.
    """
    hits = [v for k, v in _DOCS.items() if k in q.lower()]
    return json.dumps({"results": hits or ["no hits"]})
def run_get_time():
    """Return the current UTC time as a JSON string, e.g. {"iso": "...Z"}."""
    import datetime as dt
    # datetime.utcnow() is deprecated (Python 3.12+); build the same
    # naive-UTC "...Z" string from an aware timestamp instead.
    now = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
    return json.dumps({"iso": now.isoformat() + "Z"})

# Dispatch table: tool name -> callable taking the parsed arguments dict.
TOOL_FNS = {
    "calculate": lambda a: run_calculate(a["expression"]),
    "search_docs": lambda a: run_search_docs(a["query"]),
    "get_time": lambda a: run_get_time(),
}

# OpenAI-style function schemas advertised to the model via the chat template;
# names and required params must stay in sync with TOOL_FNS above.
TOOLS_SCHEMA = [
    {"type": "function", "function": {"name": "calculate", "description": "Evaluate arithmetic.",
     "parameters": {"type": "object", "properties": {"expression": {"type": "string"}},
                    "required": ["expression"]}}},
    {"type": "function", "function": {"name": "search_docs", "description": "Search internal docs.",
     "parameters": {"type": "object", "properties": {"query": {"type": "string"}},
                    "required": ["query"]}}},
    {"type": "function", "function": {"name": "get_time", "description": "Get current UTC time.",
     "parameters": {"type": "object", "properties": {}}}},
]


