S&B Volcano vaporizer remote control with Pi Pico W
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

flow.py 2.4KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. #!/usr/bin/env python
  2. import uasyncio as asyncio
  3. import sys
  4. import time
  5. from poll import (
  6. ble_conn,
  7. get_current_temp,
  8. get_target_temp, set_target_temp,
  9. get_unit_is_fahrenheit,
  10. get_state, set_state
  11. )
  def print_bar(value, start, end, unit):
      """Print one progress line of the form ``start -> value -> end``.

      Every number is immediately followed by ``unit``, so the caller decides
      whether a separating space is wanted (e.g. ``"s"`` vs ``" degC"``).

      Args:
          value: Current position, printed in the middle.
          start: Lower bound, printed on the left.
          end: Upper bound, printed on the right.
          unit: Suffix appended verbatim to each of the three numbers.
      """
      template = "{}{} -> {}{} -> {}{}"
      print(template.format(start, unit, value, unit, end, unit))
  def sleep(t):
      """Block for ``t`` seconds while printing a step-by-step progress line.

      The wait is split into equal steps: one per whole second, capped at the
      terminal width. After every step a progress line is printed through
      ``print_bar``; a final empty line terminates the output.

      This is a synchronous helper built on ``time.sleep``; it does not yield
      to the asyncio event loop while waiting.

      Args:
          t: Duration in seconds. Values <= 0 print the bar and return
             immediately without sleeping.
      """
      # ``terminal_width`` is expected as a module-level global. Fall back to
      # a conventional 80 columns when it has not been defined, instead of
      # failing with NameError.
      try:
          width = terminal_width
      except NameError:
          width = 80

      steps = width
      if t < steps:
          steps = int(t)
      # A positive duration shorter than one second truncates to zero steps,
      # which would skip the wait entirely; use one step for the whole span.
      if steps < 1 and t > 0:
          steps = 1

      print_bar(0, 0, steps, "s")
      for done in range(steps):
          time.sleep(t / steps)
          print_bar(done + 1, 0, steps, "s")
      print()
  async def wait_for_temp(client, temp):
      """Set the target temperature and wait until the device has reached it.

      The current temperature is polled once per second and each reading is
      printed as a progress line running from the first reading to ``temp``.

      Args:
          client: Connection handle passed through to the ``poll`` helpers.
          temp: Target temperature; progress is labelled in degC.
      """
      print("Setting temperature {}".format(temp))
      await set_target_temp(client, temp)

      print("Waiting for temperature to rise...")
      start = await get_current_temp(client)
      curr = start
      print_bar(curr, start, temp, " degC")
      while curr < temp:
          # Use the cooperative sleep: a blocking time.sleep() inside a
          # coroutine would stall the event loop for the whole second.
          await asyncio.sleep(1.0)
          curr = await get_current_temp(client)
          print_bar(curr, start, temp, " degC")
      print()
      print("Reached temperature {}".format(temp))
  async def flow_step(client, temp, t_wait, t_pump):
      """Run one heat-settle-pump stage of the workflow.

      Heats to ``temp``, waits ``t_wait`` seconds, then runs the pump for
      ``t_pump`` seconds and switches it off again. The heater stays on
      throughout.

      Args:
          client: Connection handle passed through to the ``poll`` helpers.
          temp: Target temperature for this stage.
          t_wait: Seconds to wait after the target temperature is reached.
          t_pump: Seconds to keep the pump running.
      """
      await wait_for_temp(client, temp)

      print("Waiting {}s for heat to settle...".format(t_wait))
      sleep(t_wait)  # synchronous countdown helper, not awaited

      print("Pumping for {}s".format(t_pump))
      await set_state(client, (True, True))  # turn on pump
      sleep(t_pump)
      await set_state(client, (True, False))  # turn off pump
  async def flow(client):
      """Run the complete session: three heating stages, a signal, shutdown.

      Switches the heater on, runs ``flow_step`` at 190, 205 and 220 degrees,
      pulses the pump three times as an audible notification, and finally
      switches heater and pump off.

      Args:
          client: Connection handle passed through to the ``poll`` helpers.
      """
      print("Turning on heater")
      await set_state(client, (True, False))

      await flow_step(client, 190.0, 20.0, 5.0)
      await flow_step(client, 205.0, 10.0, 20.0)
      await flow_step(client, 220.0, 10.0, 20.0)

      print("Notification by pumping three times...")
      for _ in range(3):
          # Cooperative one-second pauses; a blocking time.sleep() here would
          # stall the event loop between the state writes.
          await asyncio.sleep(1.0)
          await set_state(client, (True, True))  # turn on pump
          await asyncio.sleep(1.0)
          await set_state(client, (True, False))  # turn off pump

      print("Turning heater off")
      await set_state(client, (False, False))  # turn off heater and pump
  if __name__ == "__main__":
      async def main(address):
          """Connect to the device and run the workflow, failing safe.

          Args:
              address: Passed unchanged to ``ble_conn``. The script supplies
                  None; NOTE(review): presumably that lets ``ble_conn`` pick
                  the device itself - confirm against ``poll``.

          Raises:
              RuntimeError: If the device is configured for Fahrenheit.
          """
          client = await ble_conn(address)
          try:
              if await get_unit_is_fahrenheit(client):
                  raise RuntimeError("Imperial American scum is currently not supported :P")
              print("Starting Workflow")
              await flow(client)
          except BaseException:
              # Deliberately catches everything, including KeyboardInterrupt,
              # so the heater is never left on after an abort. The original
              # error is re-raised once the device has been switched off.
              print("\nTurning heater off")
              await set_state(client, (False, False))  # turn off heater and pump
              raise

      asyncio.run(main(None))