#!/usr/bin/env python3 from collections import defaultdict import re from pprint import pprint import tempfile from subprocess import check_call reRecord = re.compile(r'''^\s*(\d+) ([^\[]+)\[([^\]]+)\][^\d]+([\d\.]+)''', re.MULTILINE) class Glpsol: outfile = None def __init__(self, modfile): self.modfile = modfile self.sequence = [] def add(self, data): self.sequence.append(data) def solve(self): datafile = self.render_data() self.outfile = "/tmp/o" check_call(["glpsol", "--math", self.modfile, "--data", datafile, "--tmlim", "25", "-o", self.outfile]) def parse(self, variables_of_interest): result = defaultdict(dict) with open(self.outfile, "rt") as f: for match in reRecord.finditer(f.read()): # print(match.groups()) var_name = match.group(2) parameter = match.group(3) value = match.group(4) if var_name in variables_of_interest: result[var_name][parameter] = value return result def render_data(self): f = tempfile.NamedTemporaryFile(delete=False, mode="w+t") for s in self.sequence: s.render(f) f.write("\n") f.write("end;\n") f.close() return f.name class Data: def render(self, destination): raise NotImplementedError("whoops") class Param1D(Data): def __init__(self, param, mapping): self.param = param self.mapping = mapping def render(self, destination): destination.write("param {} :=\n".format(self.param)) for k, v in self.mapping.items(): destination.write("\t[{}]\t{}\n".format(k, v)) destination.write(";") class Set(Data): def __init__(self, name, values): self.name = name self.values = sorted(values) def render(self, destination): destination.write("set {} :=".format(self.name)) for v in self.values: destination.write("\t{}".format(v)) destination.write(";") class Param0D(Data): def __init__(self, name, value): self.name = name self.value = value def render(self, destination): destination.write("param {} := {};\n".format(self.name, self.value)) class Param2D(Data): def __init__(self, name, rows, columns, default=None): self.default = default self.name = name self.rows = sorted(rows) self.columns = sorted(columns) self.values = dict() # key is (r, c) def insert(self, row, column, value): assert row in self.rows assert column in self.columns self.values[row, column] = value def render(self, destination): destination.write("param {} : {} :=\n".format(self.name, "\t".join(self.columns))) for r in self.rows: destination.write("\t\t{}".format(r)) for c in self.columns: if self.default is None: destination.write("\t{}".format(self.values[r, c])) else: destination.write("\t{}".format( self.values.get((r, c), self.default))) destination.write("\n") destination.write(";") class MultiParam(Data): def __init__(self, name, columns, data=None): self.name = name assert isinstance(columns, list) self.columns = columns self.data = [] if data is not None: for d in data: assert len(d) == len(columns) self.data.append(d) def append(self, datum): assert len(datum) == len(self.columns) self.data.append(datum) def render(self, destination): destination.write("param : {} : {} :=\n".format(self.name, "\t".join(self.columns))) for i, datum in enumerate(self.data): destination.write("\t\t{}".format(i)) for d in datum: destination.write("\t{}".format(d)) destination.write("\n") destination.write(";") if __name__ == '__main__': LINACS = ["linac{}".format(i) for i in range(2)] PATIENTS = ["MRN{:03d}".format(i) for i in range(10)] patient_index = range(1, len(PATIENTS) + 1) CAPABILITIES = {"IMRT", "VMAT", "MRI"} g = Glpsol("schedule.mod") g.add(Set(name="C", values=CAPABILITIES)) g.add(Param0D(name="nPatients", value=len(PATIENTS))) g.add(Set(name="L", values=LINACS)) c = Param2D("capabilities", LINACS, CAPABILITIES, default=0) for i, l in enumerate(LINACS): c.insert(l, "IMRT", 1) if i % 2 == 0: c.insert(l, "VMAT", 1) if i % 4 == 0: c.insert(l, "MRI", 1) g.add(c) r = Param2D("requirements", patient_index, CAPABILITIES, default=0) for p in patient_index: if p % 2 == 0: r.insert(p, "VMAT", 1) else: r.insert(p, "IMRT", 1) g.add(r) g.add(Param1D("firstTime", {l: 480 + 120 * i for i, l in enumerate(LINACS)})) g.add(Param1D("lastTime", {l: 1200 for l in LINACS})) times = MultiParam("TIMES", ["patient", "startWindow", "endWindow"]) for p in patient_index: if p % 2 == 0: times.append((p, 5 * 60, 12 * 60)) # not available in the morning if p % 3 == 0: times.append((p, 12 * 60, 22 * 60)) # not available after noon if p % 5 == 0: times.append((p, 8 * 60, 18 * 60)) # not available during work g.add(times) g.add(Param1D("duration", {p: 20 for p in patient_index})) g.solve() parsed = g.parse(["x"]) pprint(parsed)