123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468 |
-
-
-
-
-
-
-
-
import json
import operator
import socket
import time
import urllib.error
import urllib.parse
import urllib.request
-
class APIMoonraker():
    """HTTP client for a printer host that is addressed through Moonraker.

    All requests are plain HTTP requests to ``http://<host>/<path>``.  The
    ``parent`` object supplies ``networkTimeout`` (used for every request)
    and ``showDialog`` (used for confirmations and the status dialog).
    """

    # Lower-case printer states in which the user is asked for confirmation
    # before a potentially disruptive action is carried out.
    statesWithWarning = [
        "printing", "pausing", "paused"
    ]

    def __init__(self, parent, host, webcam):
        """Store the owning object, the host name and the webcam index."""
        self.parent = parent
        self.host = host
        self.webcamIndex = webcam
        # Filled by getAvailableCommands(); initialised here so getMethod(),
        # turnOn() and turnOff() are safe to call before that happened.
        self.devices = []

    # --------------------------------------------------------------------
    # Command discovery
    # --------------------------------------------------------------------

    def getAvailableCommands(self):
        """Refresh the device list and return the matching menu commands.

        Returns:
            A list of ``(name, callback)`` tuples with a "Turn on <device>"
            and a "Turn off <device>" entry for every known device.  The
            callback is always ``toggleDevice``, which decodes the name.
        """
        self.devices = self.getDeviceList()
        return [
            (action + " " + device, self.toggleDevice)
            for device in self.devices
            for action in ("Turn on", "Turn off")
        ]

    # --------------------------------------------------------------------
    # HTTP plumbing
    # --------------------------------------------------------------------

    def sendRequest(self, headers, path, content = None):
        """Send one HTTP request and return the raw response body.

        Args:
            headers: Dict of HTTP headers to send.
            path: Path (and query string) relative to the host root.
            content: Request body as text.  ``None`` sends no body; any
                string (even an empty one) is sent as the request data.

        Returns:
            The response body as ``bytes`` on success, or the string
            ``"error"`` / ``"timeout"`` when the request failed.
        """
        url = "http://" + self.host + "/" + path
        # UTF-8 instead of ASCII so non-ASCII bodies do not raise.
        data = None if content is None else content.encode("utf-8")
        request = urllib.request.Request(url, data, headers)

        try:
            with urllib.request.urlopen(request, None, self.parent.networkTimeout) as response:
                return response.read()
        except socket.timeout:
            print("Timeout waiting for response to \"" + url + "\"")
            return "timeout"
        except urllib.error.URLError as error:
            # A timeout while connecting arrives wrapped in a URLError.
            if isinstance(getattr(error, "reason", None), socket.timeout):
                print("Timeout waiting for response to \"" + url + "\"")
                return "timeout"
            print("Error requesting URL \"" + url + "\": \"" + str(error) + "\"")
            return "error"

    def sendPostRequest(self, path, content):
        """Send ``content`` to ``path`` with a JSON content type header."""
        headers = {
            "Content-Type": "application/json"
        }
        return self.sendRequest(headers, path, content)

    def sendGetRequest(self, path):
        """Request ``path`` without a body and without extra headers."""
        return self.sendRequest({}, path)

    def _fetchJson(self, path):
        """Request ``path`` and return the decoded JSON object.

        Returns an empty dict when the request failed, the body is not
        valid JSON, or the decoded value is not a JSON object.
        """
        raw = self.sendGetRequest(path)
        if (raw == "timeout") or (raw == "error"):
            return {}
        try:
            decoded = json.loads(raw)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            return {}
        return decoded if isinstance(decoded, dict) else {}

    @staticmethod
    def _dig(data, *keys):
        """Follow ``keys`` through nested dicts; ``None`` if a step is missing."""
        for key in keys:
            if not isinstance(data, dict) or key not in data:
                return None
            data = data[key]
        return data

    @staticmethod
    def _toFloat(value, default = 0.0):
        """Convert ``value`` to float, falling back to ``default``."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _quote(value):
        """Percent-encode ``value`` for use as a query string value."""
        return urllib.parse.quote(str(value), safe = "")

    # --------------------------------------------------------------------
    # Power devices
    # --------------------------------------------------------------------

    def getDeviceList(self):
        """Return the names of all power devices reported by the host.

        Returns an empty list when the request fails or the response does
        not have the expected ``result -> devices`` structure.
        """
        entries = self._dig(self._fetchJson("machine/device_power/devices"), "result", "devices")
        if not isinstance(entries, list):
            return []
        return [
            entry["device"]
            for entry in entries
            if isinstance(entry, dict) and "device" in entry
        ]

    def getMethod(self):
        """Return ``"moonraker"`` if any device is known, else ``"unknown"``."""
        if len(self.devices) <= 0:
            return "unknown"
        return "moonraker"

    # --------------------------------------------------------------------
    # Safety checks
    # --------------------------------------------------------------------

    def _userDeclined(self, headline, actionString):
        """Ask for confirmation; return True when the dialog returned False."""
        answer = self.parent.showDialog(
            "OctoTray Warning",
            headline,
            "Do you really want to " + actionString + "?",
            True,
            True
        )
        # Explicit comparison kept on purpose: the return type of showDialog
        # is not visible here, and only a False-equal answer means "declined".
        return answer == False

    def stateSafetyCheck(self, actionString):
        """Return True when the action must be aborted because of the state.

        The user is only asked when the current state is one of
        ``statesWithWarning``; the action is aborted when they decline.
        """
        if self.getState().lower() in self.statesWithWarning:
            return self._userDeclined("The printer seems to be running currently!", actionString)
        return False

    def tempSafetyCheck(self, actionString):
        """Return True when the action must be aborted because of the heat.

        The user is only asked when ``getTemperatureIsSafe`` reports an
        unsafe temperature; the action is aborted when they decline.
        """
        if not self.getTemperatureIsSafe():
            return self._userDeclined("The printer seems to still be hot!", actionString)
        return False

    def safetyCheck(self, actionString):
        """Run the state check, then the temperature check.

        Returns True as soon as one of them asks to abort the action.
        """
        return self.stateSafetyCheck(actionString) or self.tempSafetyCheck(actionString)

    # --------------------------------------------------------------------
    # Power switching
    # --------------------------------------------------------------------

    def toggleDevice(self, name):
        """Switch a power device according to a command name.

        Args:
            name: Command text of the form "Toggle <device>",
                "Turn on <device>" or "Turn off <device>".  Any other
                text is ignored, because no valid action can be derived.
        """
        prefixes = (
            ("Toggle ", "toggle"),
            ("Turn on ", "on"),
            ("Turn off ", "off"),
        )
        for prefix, action in prefixes:
            if name.startswith(prefix):
                device = name[len(prefix):]
                # The device name is user-defined text and may contain
                # characters that are not allowed verbatim in a URL.
                self.sendPostRequest(
                    "machine/device_power/device?device=" + self._quote(device) + "&action=" + action,
                    ""
                )
                return

    def turnOn(self):
        """Turn on the first known device, if there is one."""
        if len(self.devices) > 0:
            self.toggleDevice("Turn on " + self.devices[0])

    def turnOff(self):
        """Turn off the first known device, if there is one."""
        if len(self.devices) > 0:
            self.toggleDevice("Turn off " + self.devices[0])

    # --------------------------------------------------------------------
    # Status queries
    # --------------------------------------------------------------------

    def getState(self):
        """Return the ``state`` text from ``api/job``, or ``"Unknown"``."""
        state = self._fetchJson("api/job").get("state")
        # Callers lower-case the result, so only hand out real strings.
        return state if isinstance(state, str) else "Unknown"

    def getTemperatureIsSafe(self, limit = 50.0):
        """Return True when the extruder temperature is below ``limit``.

        When the temperature cannot be read (request failure or unexpected
        response) it is treated as 0.0, so the result is True for any
        positive limit.
        """
        response = self._fetchJson("printer/objects/query?extruder=temperature")
        temp = self._toFloat(self._dig(response, "result", "status", "extruder", "temperature"))
        return temp < limit

    def getTemperatureString(self):
        """Return ``"<temperature> / <target>"`` for the extruder.

        Missing values are shown as 0.0.  Returns ``"Unknown"`` when the
        response contains no extruder object.
        """
        response = self._fetchJson("printer/objects/query?extruder=temperature,target")
        extruder = self._dig(response, "result", "status", "extruder")
        if not isinstance(extruder, dict):
            return "Unknown"

        temp = self._toFloat(extruder.get("temperature"))
        target = self._toFloat(extruder.get("target"))
        return str(temp) + " / " + str(target)

    def getName(self):
        """Return the reported hostname, falling back to the configured host."""
        hostname = self._dig(self._fetchJson("printer/info"), "result", "hostname")
        return hostname if isinstance(hostname, str) else self.host

    def getProgress(self):
        """Return the ``progress`` value from ``api/job``, or ``"Unknown"``."""
        response = self._fetchJson("api/job")
        if "progress" in response:
            return response["progress"]
        return "Unknown"

    @staticmethod
    def _progressFields(progress):
        """Classify a progress value.

        Returns ``"complete"`` when completion, printTime and printTimeLeft
        are all present and not None, ``"empty"`` when the keys exist but at
        least one value is None, and ``"invalid"`` otherwise.
        """
        keys = ("completion", "printTime", "printTimeLeft")
        if not isinstance(progress, dict) or not all(key in progress for key in keys):
            return "invalid"
        if all(progress[key] is not None for key in keys):
            return "complete"
        return "empty"

    def getProgressString(self):
        """Return a one-line progress summary, or ``""`` without a job."""
        progress = self.getProgress()
        if self._progressFields(progress) != "complete":
            return ""

        runtime = time.strftime("%H:%M:%S", time.gmtime(progress["printTime"]))
        remaining = time.strftime("%H:%M:%S", time.gmtime(progress["printTimeLeft"]))
        return ("%.1f%%" % progress["completion"]) + " - runtime " + runtime + " - " + remaining + " left"

    # --------------------------------------------------------------------
    # Printer control
    # --------------------------------------------------------------------

    def sendGCode(self, cmd):
        """Send one G-code command line to the printer.

        The command is percent-encoded because it normally contains spaces,
        which are not allowed verbatim in a request URL.
        """
        self.sendPostRequest("printer/gcode/script?script=" + self._quote(cmd), "")

    def isPaused(self):
        """Return the ``is_paused`` flag of ``pause_resume`` (default False)."""
        response = self._fetchJson("printer/objects/query?pause_resume")
        return bool(self._dig(response, "result", "status", "pause_resume", "is_paused"))

    def isPositioningAbsolute(self):
        """Return the ``absolute_coordinates`` flag of ``gcode_move``.

        Defaults to True when the value cannot be read.
        """
        response = self._fetchJson("printer/objects/query?gcode_move=absolute_coordinates")
        flag = self._dig(response, "result", "status", "gcode_move", "absolute_coordinates")
        return True if flag is None else bool(flag)

    def callHoming(self, axes = "xyz"):
        """Home every axis named in ``axes`` (lower-case letters x, y, z)."""
        if self.stateSafetyCheck("home it"):
            return

        for axis in ("x", "y", "z"):
            if axis in axes:
                self.sendGCode("G28 " + axis.upper())

    def callMove(self, axis, dist, speed, relative = True):
        """Move one axis and restore the previous positioning mode.

        Args:
            axis: "x", "y" or "z" (case-insensitive); other values send no
                move command.
            dist: Distance or coordinate, inserted verbatim after the axis.
            speed: Feed rate, inserted verbatim after "F".
            relative: Whether ``dist`` is a relative distance.
        """
        if self.stateSafetyCheck("move it"):
            return

        # The mode only has to be switched when the printer's current mode
        # differs from the requested one; it is switched back afterwards.
        switchMode = self.isPositioningAbsolute() == bool(relative)
        if switchMode:
            self.sendGCode("G91" if relative else "G90")

        letter = axis.upper()
        if letter in ("X", "Y", "Z"):
            self.sendGCode("G0 " + letter + str(dist) + " F" + str(speed))

        if switchMode:
            self.sendGCode("G90" if relative else "G91")

    def callPauseResume(self):
        """Resume a paused print, or pause a running one."""
        if self.stateSafetyCheck("pause/resume"):
            return

        if self.isPaused():
            self.sendPostRequest("printer/print/resume", "")
        else:
            self.sendPostRequest("printer/print/pause", "")

    def callJobCancel(self):
        """Cancel the current print job after the state safety check."""
        if self.stateSafetyCheck("cancel"):
            return

        self.sendPostRequest("printer/print/cancel", "")

    def statusDialog(self):
        """Show a dialog with name, job progress and extruder temperature.

        The dialog is flagged as a warning when the progress information
        could not be read.
        """
        progress = self.getProgress()
        text = self.getName() + "\n"
        warning = False

        kind = self._progressFields(progress)
        if kind == "complete":
            text += "%.1f%% Completion\n" % progress["completion"]
            text += "Printing since " + time.strftime("%H:%M:%S", time.gmtime(progress["printTime"])) + "\n"
            text += time.strftime("%H:%M:%S", time.gmtime(progress["printTimeLeft"])) + " left"
        elif kind == "empty":
            text += "No job is currently running"
        else:
            text += "Could not read printer status!"
            warning = True

        temperature = self.getTemperatureString()
        if len(temperature) > 0:
            text += "\n" + temperature
        self.parent.showDialog("OctoTray Status", text, None, False, warning)

    # --------------------------------------------------------------------
    # Files
    # --------------------------------------------------------------------

    def getRecentFiles(self, count):
        """Return the ``count`` most recently modified files.

        Returns:
            A list of ``(filename, filename)`` tuples, newest first.  The
            list is empty when the request fails or lists no files.
        """
        entries = self._dig(self._fetchJson("server/files/directory"), "result", "files")
        if not isinstance(entries, list):
            return []

        files = [
            (entry["filename"], entry["modified"])
            for entry in entries
            if isinstance(entry, dict) and "filename" in entry and "modified" in entry
        ]
        files.sort(reverse = True, key = operator.itemgetter(1))
        return [(name, name) for name, _ in files[:count]]

    def printFile(self, path):
        """Start printing the file ``path`` (percent-encoded for the URL)."""
        self.sendPostRequest("printer/print/start?filename=" + self._quote(path), "")

    # --------------------------------------------------------------------
    # Temperatures
    # --------------------------------------------------------------------

    def setTemperature(self, cmd, temp):
        """Send ``cmd`` followed by ``temp`` truncated to an integer."""
        self.sendGCode(cmd + " " + str(int(temp)))

    def printerHeatTool(self, temp):
        """Send M104 with ``temp`` as its argument."""
        self.setTemperature("M104", temp)

    def printerHeatBed(self, temp):
        """Send M140 with ``temp`` as its argument."""
        self.setTemperature("M140", temp)

    def printerCooldown(self):
        """Send M104 and M140 with 0 after the state safety check."""
        if self.stateSafetyCheck("cool it down"):
            return

        self.printerHeatTool(0)
        self.printerHeatBed(0)

    # --------------------------------------------------------------------
    # Webcam
    # --------------------------------------------------------------------

    def getWebcamURL(self):
        """Return the snapshot URL of the configured webcam, or ``""``.

        A URL starting with "/" is turned into an absolute URL on the host.
        """
        webcams = self._dig(self._fetchJson("server/webcams/list"), "result", "webcams")
        url = ""
        if isinstance(webcams, list) and (0 <= self.webcamIndex < len(webcams)):
            snapshot = self._dig(webcams[self.webcamIndex], "snapshot_url")
            if isinstance(snapshot, str):
                url = snapshot

        if url.startswith("/"):
            url = "http://" + self.host + url

        return url
|